Tags: pyspark, spark, kafka, structured-streaming, exactly-once, data-engineering

PySpark Kafka Streaming in Depth — Offsets, Backpressure, and Exactly-Once

A hands-on guide to consuming Kafka reliably with Structured Streaming. We cover offset management and checkpoints, maxOffsetsPerTrigger backpressure, schema parsing, exactly-once ingestion, and operating consumer lag and reprocessing.

Data Dynamics · June 5, 2026 · 6 min read

Kafka is the de facto bus for real-time data, and Spark Structured Streaming is the most common tool for consuming it and loading the results into a Lakehouse. Yet behind the seemingly simple task of "read from Kafka and write" hides a pile of operational details — offset management, backpressure, schema parsing, exactly-once, consumer lag.

This post lays out the practical patterns for consuming and ingesting Kafka reliably with PySpark. (For the general theory of streaming state management and watermarks, see the separate post "PySpark Structured Streaming State Management and Exactly-Once.")

1. Reading the Kafka Source — the Basics

from pyspark.sql import functions as F
 
raw = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "events")                  # topic
    .option("startingOffsets", "latest")            # or earliest
    .option("maxOffsetsPerTrigger", "500000")       # backpressure (see below)
    .load())
 
# Kafka messages are binary: key and value both arrive as binary columns
# Columns of raw: key, value, topic, partition, offset, timestamp

| Kafka column | Meaning |
| --- | --- |
| key, value | Message key and payload (binary) |
| topic, partition, offset | Position info |
| timestamp | Kafka timestamp |

2. Offset Management — the Checkpoint Is Everything

"How far have we read" (the offset) is managed by the checkpoint. The source of truth is the Spark checkpoint, not the Kafka consumer group's __consumer_offsets.

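query = (parsed.writeStream
    .format("console")                                        # placeholder sink for illustration; section 5 shows the real one
    .option("checkpointLocation", "s3://bucket/ckpt/events")  # required
    .start())

| startingOffsets | Behavior |
| --- | --- |
| latest | Only messages that arrive after the query starts (existing messages are skipped) |
| earliest | From the beginning of the topic |
| JSON spec | From specific per-partition offsets |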

Key point: startingOffsets applies only on the first run, when no checkpoint exists. On restart, the query resumes from the offsets recorded in the checkpoint and ignores startingOffsets. Deleting the checkpoint therefore sends the next run back to startingOffsets: with earliest it rereads the whole topic, and with latest it skips everything that arrived while the query was down. The checkpoint is a critical asset that holds offsets, state, and progress all in one place.
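The JSON form of startingOffsets maps each topic to a partition-to-offset dictionary, where -2 stands for earliest and -1 for latest. Below is a minimal sketch of building that string. The helper name build_starting_offsets and the sample offsets are illustrative assumptions, not part of the pipeline above.

```python
import json

# Special offset values understood by the Kafka source in JSON specs.
EARLIEST = -2
LATEST = -1


def build_starting_offsets(offsets_by_topic):
    """Serialize {topic: {partition: offset}} into the JSON string that
    the startingOffsets option accepts. Partition numbers become string keys."""
    spec = {
        topic: {str(partition): int(offset) for partition, offset in partitions.items()}
        for topic, partitions in offsets_by_topic.items()
    }
    return json.dumps(spec, sort_keys=True)


# Hypothetical example: resume partition 0 at offset 1200, partition 1 from the start.
starting_offsets = build_starting_offsets({"events": {0: 1200, 1: EARLIEST}})
print(starting_offsets)
```

The resulting string would be passed as .option("startingOffsets", starting_offsets). Like the named values, it only takes effect when the checkpoint is empty, and the spec generally needs to list every partition of the subscribed topics.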

3. Backpressure — maxOffsetsPerTrigger

When a streaming job resumes after a long pause, or when traffic spikes, a single micro-batch may try to process tens of millions of records at once and hit an OOM. Use maxOffsetsPerTrigger to cap the total number of offsets read per micro-batch; Spark splits that total proportionally across the topic's partitions.

.option("maxOffsetsPerTrigger", "500000")   # at most 500K records per trigger
Without backpressure: 100M backlogged records in one batch → OOM 💥
With backpressure: processed in chunks of 500K per batch → catches up steadily

Tune this value to your batch processing capacity. Too small and the lag never shrinks; too large and each batch becomes heavy.
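To make "too small and the lag never shrinks" concrete, here is a rough back-of-the-envelope sketch in plain Python. It assumes every batch reads the full cap and takes a fixed time end to end. The function name and all numbers are hypothetical, so treat the result as an estimate rather than a guarantee.

```python
import math


def estimate_catch_up_seconds(backlog, max_offsets_per_trigger, batch_seconds, incoming_per_second):
    """Estimate how long a capped stream needs to drain a backlog.

    Assumes every batch reads the full cap and takes batch_seconds end to end.
    Returns math.inf when the cap cannot keep up with incoming traffic.
    """
    drain_per_second = max_offsets_per_trigger / batch_seconds
    net_per_second = drain_per_second - incoming_per_second
    if net_per_second <= 0:
        return math.inf
    return backlog / net_per_second


# Hypothetical numbers: 100M backlog, 500K cap, 60 s per batch, 5,000 new records/s.
seconds = estimate_catch_up_seconds(100_000_000, 500_000, 60, 5_000)
print(f"catch-up takes about {seconds / 3600:.1f} hours")
```

With these numbers the stream drains about 8,333 records per second against 5,000 arriving, so the backlog clears in a little over eight hours. If the cap were low enough that the drain rate fell below the arrival rate, the function returns infinity, which is the "lag never shrinks" case.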

4. Schema Parsing — binary value → Structured Data

Kafka values are usually JSON or Avro. Parse them with from_json/from_avro.

from pyspark.sql import types as T
 
schema = T.StructType([
    T.StructField("user_id", T.LongType()),
    T.StructField("event_type", T.StringType()),
    T.StructField("event_time", T.TimestampType()),
])
 
parsed = (raw
    .select(
        F.col("key").cast("string").alias("key"),
        F.from_json(F.col("value").cast("string"), schema).alias("data"),
        F.col("timestamp").alias("kafka_ts"),
        F.col("offset"))
    .select("key", "data.*", "kafka_ts", "offset"))

With Avro + Schema Registry, use from_avro and the registry integration. (For Avro schema fundamentals, see the separate post "The Complete Guide to Apache Avro Schemas.") Quarantine malformed messages (see the quarantine pattern in the separate post "Nested Semi-Structured Data").
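As a rough illustration of the quarantine idea for JSON payloads, the sketch below splits parsed rows into valid and quarantined sets. Depending on the Spark version and parser mode, from_json yields either a NULL struct or a struct whose fields are all NULL for a malformed payload, so this sketch checks a field that should never be NULL. The function name, the DDL schema string, the raw_value column, and the choice of user_id as the required field are all assumptions made for this example.

```python
# DDL form of the schema from this section (assumed to match the StructType above).
EVENT_SCHEMA_DDL = "user_id BIGINT, event_type STRING, event_time TIMESTAMP"


def split_valid_and_quarantine(raw, schema_ddl=EVENT_SCHEMA_DDL, required_field="user_id"):
    """Parse Kafka values and split rows into (valid, quarantined).

    A row is treated as malformed when the required field is NULL after
    parsing, which covers both a NULL struct and a struct of NULL fields.
    """
    parsed = raw.selectExpr(
        "CAST(key AS STRING) AS key",
        "CAST(value AS STRING) AS raw_value",   # keep the original payload for inspection
        f"from_json(CAST(value AS STRING), '{schema_ddl}') AS data",
        "topic",
        "`partition`",
        "`offset`",
        "`timestamp` AS kafka_ts",
    )
    valid = parsed.where(f"data.{required_field} IS NOT NULL")
    quarantined = parsed.where(f"data.{required_field} IS NULL")
    return valid, quarantined
```

The valid DataFrame can continue through the select("key", "data.*", ...) step, while quarantined would be written, with raw_value and the Kafka position columns, to a separate table for later inspection.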

5. Exactly-Once Ingestion — foreachBatch + MERGE

For Kafka→Lakehouse ingestion, exactly-once is achieved by combining a checkpoint (replayable source) + an idempotent sink. Instead of appending files, use foreachBatch with an Iceberg/Delta MERGE.

from pyspark.sql.window import Window

def upsert(batch_df, batch_id):
    # Deduplicate keys within the batch (the MERGE source must have one row per key)
    w = Window.partitionBy("user_id", "event_time").orderBy(F.col("offset").desc())
    deduped = batch_df.withColumn("rn", F.row_number().over(w)).where("rn = 1").drop("rn")
    deduped.createOrReplaceTempView("u")
 
    batch_df.sparkSession.sql("""
        MERGE INTO analytics.events t USING u s
        ON t.user_id = s.user_id AND t.event_time = s.event_time
        WHEN NOT MATCHED THEN INSERT *
    """)
 
query = (parsed.writeStream
    .option("checkpointLocation", "s3://bucket/ckpt/events")
    .foreachBatch(upsert)
    .trigger(processingTime="1 minute")
    .start())

Even if a restart causes the same batch to be reprocessed, the MERGE's key matching ensures no duplicates (idempotency). Within a partition the Kafka offset is unique and increasing, so using it as a dedup key or tiebreaker gives a deterministic choice of which row wins.
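The idempotency argument can be checked with a small pure-Python model of "insert when not matched". This is not Spark code. It is a sketch that stands in for the MERGE, keyed on (topic, partition, offset), with a dict playing the role of the target table. All names and sample rows are hypothetical.

```python
def merge_insert_if_absent(table, batch):
    """Model of MERGE ... WHEN NOT MATCHED THEN INSERT on a dict-backed table.

    table maps (topic, partition, offset) -> row; rows whose key already
    exists are left untouched, so replaying a batch changes nothing.
    """
    inserted = 0
    for row in batch:
        key = (row["topic"], row["partition"], row["offset"])
        if key not in table:
            table[key] = row
            inserted += 1
    return inserted


# Hypothetical micro-batch of two Kafka messages.
batch = [
    {"topic": "events", "partition": 0, "offset": 41, "user_id": 7},
    {"topic": "events", "partition": 0, "offset": 42, "user_id": 9},
]
table = {}
first_run = merge_insert_if_absent(table, batch)   # initial write
replay = merge_insert_if_absent(table, batch)      # same batch again after a restart
print(first_run, replay, len(table))
```

The first call inserts both rows and the replay inserts none, which is the property the MERGE relies on when a micro-batch is retried.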

6. Trigger Modes and Small Files

| Trigger | Behavior | Notes |
| --- | --- | --- |
| processingTime="1 minute" | Micro-batch every minute | Most common |
| availableNow=True | Process the backlog, then stop | Periodic batch-style streaming |
| continuous (experimental) | Ultra-low latency | Many restrictions |

Short trigger intervals (for example, 5 seconds) produce large numbers of small files, because every micro-batch writes its own files. Ingestion streams typically use 1–5 minute triggers to grow the batches, and any small files that still accumulate are cleaned up with periodic compaction (see the separate post "PySpark Small Files Problem"). availableNow is a practical mode for running "periodic batches with streaming code."
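A quick calculation shows why the trigger interval matters so much for file counts. The sketch below assumes every trigger fires with data and each micro-batch writes a fixed number of files. The figure of 8 files per batch is a made-up example, not a measurement.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def files_per_day(trigger_seconds, files_per_batch):
    """Upper-bound estimate of data files written per day, assuming every
    trigger fires with data and each batch writes files_per_batch files."""
    batches_per_day = SECONDS_PER_DAY // trigger_seconds
    return batches_per_day * files_per_batch


# Hypothetical sink that writes 8 files per micro-batch.
for trigger_seconds in (5, 60, 300):
    print(trigger_seconds, files_per_day(trigger_seconds, files_per_batch=8))
```

Under these assumptions a 5-second trigger writes 138,240 files a day, a 1-minute trigger 11,520, and a 5-minute trigger 2,304, which is why longer triggers plus compaction keep the table manageable.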

7. Multiple Topics, Partitions, and Parallelism

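# Use one subscription option per query; subscribe and subscribePattern cannot be combined
.option("subscribe", "events,clicks,orders")      # multiple topics, comma-separated
.option("subscribePattern", "logs-.*")            # or: every topic matching a regex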
  • Parallelism is bound to the number of Kafka partitions. If the topic has few partitions, Spark gets only that many tasks — if throughput won't scale, you may need to add topic partitions.
  • minPartitions can split partitions further on the Spark side to raise parallelism (useful when each partition carries a lot of data).
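As a sketch of the second point, the reader below asks Spark for more input partitions than the topic has. The function name, its parameters, and the value 48 in the usage note are illustrative assumptions. minPartitions is a desired minimum, so the actual task count may differ.

```python
def read_events(spark, bootstrap_servers, topic, min_partitions):
    """Build a Kafka streaming reader that asks Spark for at least
    min_partitions input partitions, splitting large Kafka partitions if needed."""
    return (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)
        .option("minPartitions", str(min_partitions))   # desired minimum, not an exact count
        .load())
```

For a 12-partition topic, calling read_events(spark, "broker1:9092,broker2:9092", "events", 48) would request roughly four Spark tasks per Kafka partition. This raises parallelism for the Spark-side work only; throughput limits on the Kafka brokers themselves are unchanged.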

8. Operations — Consumer Lag and Reprocessing

| Concern | Approach |
| --- | --- |
| Consumer lag | Kafka lag monitoring (detect processing delays) |
| Processing delay | StreamingQueryProgress (inputRowsPerSecond, etc.) |
| Reprocessing | New checkpoint path + explicit startingOffsets |
| Schema evolution | New fields nullable, quarantine absorbs breakage |
| Query changes | Watch checkpoint compatibility (state/schema changes) |

# Monitor progress
for q in spark.streams.active:
    print(q.lastProgress)   # batch duration, input rate, processing rate, lag
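One way to turn the progress output into a single lag number is to compare what the last batch consumed against the latest offsets available. The sketch below assumes the progress dict carries endOffset and latestOffset per source, shaped as topic to partition to offset. The function name and the sample dict are hand-written for illustration and are not real query output.

```python
def total_lag(progress):
    """Sum latestOffset - endOffset over every Kafka partition in a progress dict.

    Assumes each source entry carries 'endOffset' and 'latestOffset' shaped as
    {topic: {partition: offset}}; sources without both fields are skipped.
    Partitions missing from endOffset are counted from offset 0.
    """
    lag = 0
    for source in progress.get("sources", []):
        end, latest = source.get("endOffset"), source.get("latestOffset")
        if not isinstance(end, dict) or not isinstance(latest, dict):
            continue
        for topic, partitions in latest.items():
            for partition, latest_offset in partitions.items():
                consumed = end.get(topic, {}).get(partition, 0)
                lag += max(latest_offset - consumed, 0)
    return lag


# Hand-written sample in the assumed shape, not real query output.
sample_progress = {
    "batchId": 12,
    "sources": [{
        "endOffset": {"events": {"0": 1500, "1": 900}},
        "latestOffset": {"events": {"0": 1800, "1": 900}},
    }],
}
print(total_lag(sample_progress))
```

With a live query this would be called as total_lag(q.lastProgress) once lastProgress is no longer None, and the value could be exported to whatever alerting system watches the pipeline.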

Reprocessing (replaying from a specific point): leave the existing checkpoint alone and run a separate job with a new checkpoint path + startingOffsets. Deleting the existing checkpoint destroys progress and state.

9. Summary

| Area | Key point |
| --- | --- |
| Offsets | The checkpoint is the source of truth |
| Backpressure | Cap batches with maxOffsetsPerTrigger |
| Parsing | from_json/from_avro, quarantine breakage |
| Exactly-once | foreachBatch + idempotent MERGE |
| Small files | Longer trigger interval + periodic compaction |
| Parallelism | Bound by Kafka partition count |

The key to Kafka streaming is keeping one fact front and center: "the checkpoint manages offsets, state, and progress — all of it." Stop the floods with backpressure, parse safely with from_json, and achieve exactly-once with foreachBatch + an idempotent MERGE — and you can run a 24/7 real-time pipeline without duplicates or data loss. Tame small files with trigger intervals and compaction, and your real-time Lakehouse ingestion is complete.


This post was written against Spark 3.5 + Kafka. If you need help designing a real-time Kafka-Lakehouse ingestion pipeline, feel free to reach out.

— The Data Dynamics Engineering Team