PySpark Kafka Streaming in Depth — Offsets, Backpressure, and Exactly-Once
A hands-on guide to consuming Kafka reliably with Structured Streaming. We cover offset management and checkpoints, maxOffsetsPerTrigger backpressure, schema parsing, exactly-once ingestion, and operational concerns such as consumer lag and reprocessing.
Kafka is the de facto bus for real-time data, and Spark Structured Streaming is the most common tool for consuming it and loading the results into a Lakehouse. Yet behind the seemingly simple task of "read from Kafka and write" hides a pile of operational details — offset management, backpressure, schema parsing, exactly-once, consumer lag.
This post lays out the practical patterns for consuming and ingesting Kafka reliably with PySpark. (For the general theory of streaming state management and watermarks, see the separate post "PySpark Structured Streaming State Management and Exactly-Once.")
1. Reading the Kafka Source — the Basics
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# The Kafka source needs the spark-sql-kafka-0-10 connector on the classpath
spark = SparkSession.builder.getOrCreate()

raw = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "events")             # topic
    .option("startingOffsets", "latest")       # or "earliest"
    .option("maxOffsetsPerTrigger", "500000")  # backpressure (see below)
    .load())

# Kafka messages are raw bytes: key and value arrive as binary columns
# Columns of raw: key, value, topic, partition, offset, timestamp, timestampType
```

| Kafka column | Meaning |
|---|---|
| key, value | Message (binary) |
| topic, partition, offset | Position info |
| timestamp, timestampType | Kafka timestamp and its type (create time or log-append time) |
2. Offset Management — the Checkpoint Is Everything
"How far have we read" (the offset) is managed by the checkpoint. The source of truth is the Spark checkpoint, not the Kafka consumer group's `__consumer_offsets`. The Structured Streaming Kafka source does not commit its offsets back to Kafka.
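```python
query = (parsed.writeStream
    .option("checkpointLocation", "s3://bucket/ckpt/events")  # required
    # ... sink format and target omitted for brevity ...
    .start())
```

| startingOffsets | Behavior |
|---|---|
| latest (default) | Only messages that arrive after the query starts (existing messages are ignored) |
| earliest | From the beginning of the topic (oldest retained offset) |
| JSON spec | From specific offsets per partition, e.g. `{"events":{"0":23,"1":-2}}`, where -2 means earliest and -1 means latest |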
Key point:
`startingOffsets` applies only on the first run, when no checkpoint exists. On restart, the query resumes from the offsets stored in the checkpoint and ignores `startingOffsets`. So be careful: deleting the checkpoint sends the query back to `startingOffsets`. That means either re-reading the whole topic (earliest, which produces duplicates) or skipping everything not yet processed (latest). The checkpoint is a critical asset that holds offsets, state, and progress all in one place.
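You may need a JSON spec, for example for a replay job with a fresh checkpoint. Building it programmatically avoids quoting mistakes. The sketch below is a small hypothetical helper, `starting_offsets_json`, and is not part of PySpark. It relies on the Kafka source's documented convention that inside the JSON, -2 means earliest and -1 means latest.

```python
import json
from typing import Dict

# Special offsets accepted inside the startingOffsets JSON
EARLIEST = -2
LATEST = -1


def starting_offsets_json(offsets: Dict[str, Dict[int, int]]) -> str:
    """Build a startingOffsets JSON string from {topic: {partition: offset}}.

    Hypothetical helper: partition keys in the JSON must be strings,
    so integer partition numbers are converted here.
    """
    spec = {
        topic: {str(partition): offset for partition, offset in partitions.items()}
        for topic, partitions in offsets.items()
    }
    return json.dumps(spec, separators=(",", ":"))


# Usage on the reader: .option("startingOffsets", starting_offsets_json(...))
# It only takes effect when the checkpoint location is empty.
spec = starting_offsets_json({"events": {0: 23, 1: EARLIEST, 2: LATEST}})
print(spec)  # {"events":{"0":23,"1":-2,"2":-1}}
```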
3. Backpressure — maxOffsetsPerTrigger
When a streaming job resumes after a long pause or traffic spikes, a single micro-batch may try to process tens of millions of records at once and hit an OOM. Use maxOffsetsPerTrigger to cap the number of messages per batch.
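```python
.option("maxOffsetsPerTrigger", "500000")  # at most 500K records per trigger
```

```
Without backpressure: 100M backlogged records in one batch → OOM 💥
With backpressure:    processed in chunks of 500K per batch → catches up steadily
```

Tune this value to your per-batch processing capacity. If it is too small, the job cannot outpace the incoming rate and the lag never shrinks. If it is too large, each batch becomes heavy again. The cap applies to the whole trigger and is split across topic partitions in proportion to their volume.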
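A back-of-the-envelope model shows why the value matters. Each batch drains at most `maxOffsetsPerTrigger` records, while new records keep arriving for the length of the batch cycle. The function below is a rough sketch under simplifying assumptions: a constant incoming rate, every capped batch taking the same time, and no failures. `catch_up_seconds` and its parameters are hypothetical names, not Spark settings.

```python
import math
from typing import Optional


def catch_up_seconds(backlog: int, incoming_per_sec: float, max_offsets_per_trigger: int,
                     trigger_sec: float, batch_sec: float) -> Optional[float]:
    """Rough time to drain a backlog when every batch is capped at max_offsets_per_trigger.

    batch_sec is the (assumed) time to process one full capped batch.
    Returns None when the cap cannot outpace the incoming rate, i.e. the lag never shrinks.
    """
    # With a processingTime trigger, the next batch starts at the next interval,
    # or right away if the previous batch overran the interval.
    cycle = max(trigger_sec, batch_sec)
    net_drain_per_batch = max_offsets_per_trigger - incoming_per_sec * cycle
    if net_drain_per_batch <= 0:
        return None
    return math.ceil(backlog / net_drain_per_batch) * cycle


print(catch_up_seconds(100_000_000, 5_000, 500_000, trigger_sec=60, batch_sec=40))   # 30000
print(catch_up_seconds(100_000_000, 10_000, 500_000, trigger_sec=60, batch_sec=40))  # None
```

Suppose 5,000 msg/s arrive and the cap is 500K on a 1-minute trigger. A 100M backlog then drains in 500 batches, about 8.3 hours. At 10,000 msg/s, more records arrive each minute than the same cap admits, so the lag only grows.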
4. Schema Parsing — binary value → Structured Data
Kafka values are usually JSON or Avro. Parse them with from_json/from_avro.
```python
from pyspark.sql import functions as F
from pyspark.sql import types as T

schema = T.StructType([
    T.StructField("user_id", T.LongType()),
    T.StructField("event_type", T.StringType()),
    T.StructField("event_time", T.TimestampType()),
])

parsed = (raw
    .select(
        F.col("key").cast("string").alias("key"),
        F.from_json(F.col("value").cast("string"), schema).alias("data"),
        F.col("timestamp").alias("kafka_ts"),
        F.col("offset"))
    .select("key", "data.*", "kafka_ts", "offset"))
```

With Avro + Schema Registry, use from_avro and the registry integration. (For Avro schema fundamentals, see the separate post "The Complete Guide to Apache Avro Schemas.") In its default permissive mode, `from_json` does not fail on malformed input; it returns nulls instead. Broken messages therefore flow downstream silently unless you quarantine them (see the quarantine pattern in the separate post "Nested Semi-Structured Data").
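In Spark, the quarantine split is a filter on the parsed columns, because a null in a required field is the signal that parsing failed. The routing decision is easy to get wrong, so here it is modeled as a sketch in plain Python (not Spark code). `parse_or_quarantine` and `REQUIRED_FIELDS` are hypothetical. The required fields are assumed to be the MERGE keys from section 5: a null key never matches in a MERGE `ON` clause, so such a row would be re-inserted on every replay.

```python
import json
from typing import Any, Dict, List, Optional, Tuple

# Hypothetical: fields that must be present for a record to be usable (the MERGE keys)
REQUIRED_FIELDS = ("user_id", "event_time")


def parse_or_quarantine(
    values: List[Optional[bytes]],
) -> Tuple[List[Dict[str, Any]], List[Optional[bytes]]]:
    """Split raw Kafka values into parsed records and quarantined raw payloads."""
    good: List[Dict[str, Any]] = []
    quarantine: List[Optional[bytes]] = []
    for raw in values:
        try:
            record = json.loads(raw.decode("utf-8"))  # mirrors value.cast("string") + from_json
        except (AttributeError, UnicodeDecodeError, json.JSONDecodeError):
            # None payload, non-UTF-8 bytes, or invalid JSON: keep the original bytes
            quarantine.append(raw)
            continue
        if not isinstance(record, dict) or any(record.get(f) is None for f in REQUIRED_FIELDS):
            quarantine.append(raw)  # parsed, but a required field is missing or null
        else:
            good.append(record)
    return good, quarantine


good, bad = parse_or_quarantine([
    b'{"user_id": 1, "event_type": "click", "event_time": "2024-01-01T00:00:00"}',
    b'{"user_id": 2',                          # truncated JSON
    None,                                      # tombstone (null value)
    b'{"user_id": 3, "event_type": "view"}',   # missing event_time
])
print(len(good), len(bad))  # 1 3
```

The quarantine keeps the raw bytes rather than the half-parsed record. That is what makes reprocessing possible later, once the schema or the producer is fixed.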
5. Exactly-Once Ingestion — foreachBatch + MERGE
For Kafka→Lakehouse ingestion, exactly-once comes from combining a replayable source, whose position is tracked by the checkpoint, with an idempotent sink. Instead of appending files, use foreachBatch with an Iceberg/Delta MERGE.
```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window


def upsert(batch_df, batch_id):
    # Deduplicate keys within the batch first: with INSERT-only MERGE, two source rows
    # for the same new key would both be inserted. Keep the highest Kafka offset per key.
    w = Window.partitionBy("user_id", "event_time").orderBy(F.col("offset").desc())
    deduped = (batch_df
        .withColumn("rn", F.row_number().over(w))
        .where("rn = 1")
        .drop("rn"))
    deduped.createOrReplaceTempView("u")
    # Insert only keys the target does not have yet, so replaying a batch is a no-op
    batch_df.sparkSession.sql("""
        MERGE INTO analytics.events t USING u s
        ON t.user_id = s.user_id AND t.event_time = s.event_time
        WHEN NOT MATCHED THEN INSERT *
    """)


query = (parsed.writeStream
    .option("checkpointLocation", "s3://bucket/ckpt/events")
    .foreachBatch(upsert)
    .trigger(processingTime="1 minute")
    .start())
```

A restart may cause the same batch to be reprocessed. In that case the rows inserted the first time now match on the key and are skipped, so no duplicates appear (idempotency). The Kafka offset works as a tiebreaker only if the same key always lands in the same partition, because offsets are ordered only within a partition. For a dedup key that is unique across the whole topic, use (topic, partition, offset) together.
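You can check the idempotency argument without a cluster. The sketch below is a plain-Python model (not Spark or Delta/Iceberg code) of the two steps in `upsert`. First, keep the highest-offset row per `(user_id, event_time)`. Second, insert only the keys that the target, as it stood before the MERGE, does not have. The list-based "table" and the function names are hypothetical. The offsets are assumed to come from a single partition, so that comparing them is meaningful.

```python
from typing import Any, Dict, List, Tuple

Record = Dict[str, Any]


def key_of(record: Record) -> Tuple[Any, Any]:
    return (record["user_id"], record["event_time"])


def dedup_by_offset(batch: List[Record]) -> List[Record]:
    """Keep the highest-offset record per key, like row_number() over offset desc."""
    best: Dict[Tuple[Any, Any], Record] = {}
    for record in batch:
        k = key_of(record)
        if k not in best or record["offset"] > best[k]["offset"]:
            best[k] = record
    return list(best.values())


def merge_insert_only(table: List[Record], source: List[Record]) -> int:
    """Model of MERGE ... WHEN NOT MATCHED THEN INSERT *; returns rows inserted."""
    existing = {key_of(r) for r in table}  # matched against the target as it was before the MERGE
    new_rows = [r for r in source if key_of(r) not in existing]
    table.extend(new_rows)
    return len(new_rows)


batch = [
    {"user_id": 1, "event_time": "10:00", "event_type": "click", "offset": 10},
    {"user_id": 1, "event_time": "10:00", "event_type": "click", "offset": 11},  # duplicate delivery
    {"user_id": 2, "event_time": "10:00", "event_type": "view", "offset": 12},
]

no_dedup: List[Record] = []
print(merge_insert_only(no_dedup, batch))  # 3: both copies of user 1 are "not matched"

table: List[Record] = []
print(merge_insert_only(table, dedup_by_offset(batch)))  # 2 on the first run
print(merge_insert_only(table, dedup_by_offset(batch)))  # 0 when the same batch is replayed
```

The first call shows why the in-batch dedup matters. Both copies of user 1 are checked against the same (empty) target and both get inserted. The replay shows why the key match matters.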
6. Trigger Modes and Small Files
| Trigger | Behavior | Notes |
|---|---|---|
| processingTime="1 minute" | Micro-batch every minute | Most common |
| availableNow=True | Process the backlog, then stop | Periodic batch-style streaming |
| continuous (experimental) | Ultra-low latency | Many restrictions |
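Short trigger intervals (e.g. 5 seconds) make small files explode, because every micro-batch that receives data commits new files of its own. Ingestion streams typically use 1–5 minute triggers to grow the batches. Any small files that still accumulate are cleaned up with periodic compaction (see the separate post "PySpark Small Files Problem").
`availableNow` is a practical mode for running "periodic batches with streaming code". A scheduled run processes everything that has arrived since the last run and then stops, and the checkpoint records where it left off. Unlike the older `once` trigger, it still honors `maxOffsetsPerTrigger`, so a large backlog is split into several batches.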
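A rough count shows why the trigger interval dominates. The sketch assumes that every micro-batch has data and writes the same number of files. `files_per_batch` is a hypothetical figure you would measure on your own sink. Real counts also depend on the number of write tasks and the table partitions each batch touches.

```python
SECONDS_PER_DAY = 86_400


def files_per_day(trigger_sec: int, files_per_batch: int) -> int:
    """Approximate new data files per day for one stream (assumes every batch writes)."""
    batches_per_day = SECONDS_PER_DAY // trigger_sec
    return batches_per_day * files_per_batch


for trigger in (5, 60, 300):
    print(f"{trigger:>4}s trigger -> {files_per_day(trigger, 8):,} files/day")
```

With 8 files per batch, a 5-second trigger produces 138,240 files a day versus 2,304 at 5 minutes, a 60x difference. That gap is what compaction would otherwise have to absorb.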
7. Multiple Topics, Partitions, and Parallelism
```python
.option("subscribe", "events,clicks,orders")  # multiple topics
# or, instead (use only one of subscribe / subscribePattern / assign):
.option("subscribePattern", "logs-.*")        # pattern subscription
```

- Parallelism is bound to the number of Kafka partitions. By default, each Kafka partition maps to one Spark task, so a topic with few partitions gives Spark only that many tasks. If throughput won't scale, you may need to add topic partitions.
- `minPartitions` can split partitions further on the Spark side to raise parallelism (useful when each partition carries a lot of data).
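Here is a simplified model of what `minPartitions` buys on a skewed batch. When the requested minimum exceeds the number of Kafka partitions with new data, each partition's offset range is cut into pieces roughly in proportion to its share of the batch. This sketch is based on our reading of Spark's behavior, not on its actual implementation, which also handles executor placement and may differ in rounding details. `plan_tasks` is a hypothetical name.

```python
import math
from typing import List, Optional, Tuple

# (kafka_partition, from_offset, until_offset) for one micro-batch
OffsetRange = Tuple[int, int, int]


def plan_tasks(ranges: List[OffsetRange], min_partitions: Optional[int] = None) -> List[OffsetRange]:
    """Simplified model: turn one batch's Kafka offset ranges into Spark read tasks."""
    ranges = [r for r in ranges if r[2] > r[1]]  # partitions with no new data get no task
    if min_partitions is None or len(ranges) >= min_partitions:
        return ranges  # default: one task per Kafka partition
    total = sum(until - start for _, start, until in ranges)
    tasks: List[OffsetRange] = []
    for partition, start, until in ranges:
        size = until - start
        # Split in proportion to this partition's share of the batch (round half up, at least 1)
        pieces = max(math.floor(size / total * min_partitions + 0.5), 1)
        remaining = size
        for i in range(pieces):
            chunk = remaining // (pieces - i)  # the last piece absorbs the rounding remainder
            tasks.append((partition, start, start + chunk))
            start += chunk
            remaining -= chunk
    return tasks


batch = [(0, 0, 1_000), (1, 0, 100)]  # skewed: partition 0 holds most of the data
print(len(plan_tasks(batch)))                     # 2
print(len(plan_tasks(batch, min_partitions=10)))  # 10
```

Without `minPartitions`, one task reads all 1,000 records of partition 0 while the other finishes quickly. With `minPartitions=10`, partition 0 is spread over nine tasks of about 111 records each.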
8. Operations — Consumer Lag and Reprocessing
| Concern | Approach |
|---|---|
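| Consumer lag | Compare the latest Kafka offsets with the offsets the query has processed (the Kafka source does not commit offsets to Kafka, so consumer-group lag tools cannot see its position) |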
| Processing delay | StreamingQueryProgress (inputRowsPerSecond, etc.) |
| Reprocessing | New checkpoint path + explicit startingOffsets |
| Schema evolution | New fields nullable, quarantine absorbs breakage |
| Query changes | Watch checkpoint compatibility (state/schema changes) |
```python
# Monitor progress
for q in spark.streams.active:
    # lastProgress is None until the first batch completes; afterwards it is a dict with
    # durationMs, inputRowsPerSecond, processedRowsPerSecond, and per-source offsets
    print(q.lastProgress)
```

Reprocessing (replaying from a specific point): leave the existing checkpoint alone and run a separate job with a new checkpoint path + `startingOffsets`. Deleting the existing checkpoint destroys progress and state.
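Lag for a Structured Streaming query is most reliably computed from the query's own progress, since that is where its offsets live. The sketch below assumes, as in Spark 3.x progress reports, that each Kafka entry in `sources` carries `endOffset` (processed up to) and `latestOffset` (available in Kafka) as `{topic: {partition: offset}}` objects. `offsets_behind` is a hypothetical helper. Recent Spark versions may also report summary figures such as `maxOffsetsBehindLatest` in that source entry's `metrics` map.

```python
import json
from typing import Any, Dict


def _as_offsets(value: Any) -> Dict[str, Any]:
    """Offsets may arrive as a dict or as a JSON string; normalize to a dict."""
    if isinstance(value, str):
        value = json.loads(value)
    return value if isinstance(value, dict) else {}


def offsets_behind(progress: Dict[str, Any]) -> Dict[str, int]:
    """Per-topic count of records available in Kafka but not yet processed."""
    behind: Dict[str, int] = {}
    for source in progress.get("sources", []):
        end = _as_offsets(source.get("endOffset"))
        latest = _as_offsets(source.get("latestOffset"))
        for topic, partitions in latest.items():
            done = end.get(topic)
            if not isinstance(partitions, dict) or not isinstance(done, dict):
                continue  # not a Kafka-style {topic: {partition: offset}} entry
            behind[topic] = behind.get(topic, 0) + sum(
                max(int(offset) - int(done[p]), 0)
                for p, offset in partitions.items()
                if p in done
            )
    return behind


# Usage against live queries:
#   for q in spark.streams.active:
#       print(q.name, offsets_behind(q.lastProgress or {}))
sample = {
    "sources": [{
        "endOffset": {"events": {"0": 100, "1": 250}},
        "latestOffset": {"events": {"0": 180, "1": 250}},
    }]
}
print(offsets_behind(sample))  # {'events': 80}
```

A value that keeps rising across batches means the query is falling behind. That is the point at which to revisit `maxOffsetsPerTrigger`, the trigger interval, or the partition count.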
9. Summary
| Area | Key point |
|---|---|
| Offsets | The checkpoint is the source of truth |
| Backpressure | Cap batches with maxOffsetsPerTrigger |
| Parsing | from_json/from_avro, quarantine breakage |
| Exactly-once | foreachBatch + idempotent MERGE |
| Small files | Longer trigger interval + periodic compaction |
| Parallelism | Bound by Kafka partition count |
The key to Kafka streaming is keeping one fact front and center: "the checkpoint manages offsets, state, and progress — all of it." Stop the floods with backpressure, parse safely with from_json, and achieve exactly-once with foreachBatch + an idempotent MERGE — and you can run a 24/7 real-time pipeline without duplicates or data loss. Tame small files with trigger intervals and compaction, and your real-time Lakehouse ingestion is complete.
This post was written against Spark 3.5 + Kafka. If you need help designing a real-time Kafka-Lakehouse ingestion pipeline, feel free to reach out.
— The Data Dynamics Engineering Team