
PySpark Structured Streaming State Management and Exactly-Once Ingestion

We tackle the two hardest problems in streaming — unbounded state growth and duplicates/data loss after failures. Learn how to tame state with watermarks, implement exactly-once with foreachBatch + idempotent Iceberg/Delta MERGE, and handle late-arriving data.

Data Dynamics · June 5, 2026 · 6 min read

When a batch job fails, you just rerun it. A streaming job runs 24/7 and has to survive failures without duplicating or losing data. That makes unbounded state growth and exactly-once consistency the two hardest problems in PySpark Structured Streaming.

This post covers practical patterns: taming state with watermarks, handling late-arriving data, and implementing exactly-once ingestion with foreachBatch + idempotent Lakehouse MERGE.

1. The Two Core Challenges

Challenge #1  Unbounded state growth
  Streaming aggregations, joins, and dedup keep state in memory/checkpoints
  -> Without cleanup, state grows forever and you hit OOM
 
Challenge #2  Exactly-once
  Failure -> restart -> last batch gets reprocessed
  -> Done wrong, you write the same data twice (duplicates) or skip it (loss)

We solve them with watermarks and idempotent sinks, respectively.

2. Watermarks — the Key to Taming State

A watermark is a moving event-time threshold: the largest event time Spark has seen so far minus the delay you configure. Records older than the threshold are treated as too late and may be dropped, which is what lets Spark safely clean up state older than the watermark.

from pyspark.sql import functions as F
 
agg = (stream
    .withWatermark("event_time", "10 minutes")    # allow up to 10 minutes of lateness
    .groupBy(
        F.window("event_time", "5 minutes"),       # 5-minute tumbling window
        "user_id")
    .agg(F.count("*").alias("cnt")))

How it works:

  • Once the watermark (the maximum event time seen minus the 10-minute delay) passes a window's end time, the state for that window is evicted. The clock here is event time, not wall-clock time.
  • Aggregating/joining/deduping without a watermark means state accumulates forever and eventually OOMs.
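
The eviction rule above can be modeled in a few lines of plain Python. This is a simplified sketch, not Spark's implementation: `WatermarkTracker` and its methods are hypothetical names, times are plain integer minutes, and the watermark is updated after every event, whereas Spark advances it only between micro-batches.

```python
class WatermarkTracker:
    """Toy model of tumbling-window counts with watermark-based state cleanup."""

    def __init__(self, delay, window_size):
        self.delay = delay              # allowed lateness, in minutes
        self.window_size = window_size  # tumbling window length, in minutes
        self.max_event_time = None      # largest event time seen so far
        self.state = {}                 # window start -> running count
        self.finalized = {}             # windows whose state was evicted
        self.dropped = []               # events rejected as too late

    @property
    def watermark(self):
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def process(self, event_time):
        start = event_time - event_time % self.window_size
        end = start + self.window_size
        wm = self.watermark
        # An event is too late when its window already closed behind the watermark.
        if wm is not None and end <= wm:
            self.dropped.append(event_time)
            return
        self.state[start] = self.state.get(start, 0) + 1
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        self._evict()

    def _evict(self):
        wm = self.watermark
        # sorted() copies the keys, so popping inside the loop is safe.
        for start in sorted(self.state):
            if start + self.window_size <= wm:
                self.finalized[start] = self.state.pop(start)


tracker = WatermarkTracker(delay=10, window_size=5)
for t in [1, 3, 7, 12, 2, 21, 4]:
    tracker.process(t)
```

In this run the event at minute 2 arrives late but still inside the delay, so it is counted in window [0, 5). The event at minute 21 then pushes the watermark to 11, which evicts windows [0, 5) and [5, 10), and the following event at minute 4 is dropped because its window is already gone.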

| Watermark delay | Trade-off |
| --- | --- |
| Short (e.g. 1 minute) | Less state, drops more late data |
| Long (e.g. 1 hour) | Captures late data well, larger state |

Set the watermark delay based on the actual distribution of how late your data arrives. Too short and you lose legitimate data; too long and state bloats.
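
One way to ground that choice is to measure lateness (arrival time minus event time) on a sample of records and pick the delay that covers a target share of them. The helper below is a sketch under that assumption: `suggest_watermark_delay` is a hypothetical name, the sample values are made up, and it uses a simple nearest-rank percentile.

```python
import math


def suggest_watermark_delay(lateness_seconds, coverage=0.99):
    """Return the smallest delay (in seconds) that covers `coverage` of the samples.

    `lateness_seconds` holds arrival time minus event time for a sample of
    records, measured outside of Spark. Uses the nearest-rank percentile.
    """
    if not lateness_seconds:
        raise ValueError("need at least one lateness sample")
    if not 0 < coverage <= 1:
        raise ValueError("coverage must be in (0, 1]")
    ordered = sorted(lateness_seconds)
    rank = math.ceil(coverage * len(ordered))   # 1-based nearest rank
    return ordered[rank - 1]


samples = [2, 5, 3, 8, 4, 600, 6, 7, 5, 3]      # one straggler at 10 minutes
delay_90 = suggest_watermark_delay(samples, coverage=0.90)
delay_100 = suggest_watermark_delay(samples, coverage=1.0)
```

Here covering 90% of the sample needs only 8 seconds, while covering the single straggler needs 600 seconds of state retention. The chosen value would then be passed to withWatermark as an interval string such as "600 seconds".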

3. Operations That Accumulate State

| Operation | State | Watermark required |
| --- | --- | --- |
| Windowed aggregation | Accumulation per window | Required (for cleanup) |
| Stream-stream join | Buffers on both sides | Required |
| dropDuplicates | Set of seen keys | Required (unbounded without it) |
| flatMapGroupsWithState | User-defined state | Manage expiry yourself |

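# Streaming dedup: state grows without bound unless a watermark is set.
# event_time is part of the key so Spark can expire entries older than the watermark.
dedup = (stream
    .withWatermark("event_time", "1 hour")
    .dropDuplicates(["event_id", "event_time"]))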

4. Custom State — flatMapGroupsWithState / transformWithState

Logic that can't be expressed with windows or dedup, such as sessionization or anomaly detection, requires managing state yourself. (flatMapGroupsWithState is the Scala/Java API; PySpark provides applyInPandasWithState, and Spark 4.0 adds transformWithStateInPandas.)

# Concept: hold state per group (key), update on each event, expire via timeout
# - Session window: close and emit the session after 30 minutes of inactivity
# - Keep last_seen in state, expire it through timeout handling

The key is to always set a timeout (expiry) on the state. Custom state without expiry is the most common cause of streaming OOMs.
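
The session pattern from the comments above can be modeled in plain Python to show where expiry fits. This is not the Spark API: `SessionStore` and its methods are hypothetical, and times are integer minutes. In a real job the same logic would sit in the function passed to applyInPandasWithState, with the timeout configured on the group state.

```python
class SessionStore:
    """Toy per-key session state with inactivity-based expiry."""

    def __init__(self, timeout):
        self.timeout = timeout   # inactivity gap that closes a session
        self.sessions = {}       # key -> {"start", "last_seen", "events"}

    def on_event(self, key, event_time):
        """Update the state for `key`; return a closed session if the gap expired."""
        closed = None
        current = self.sessions.get(key)
        if current and event_time - current["last_seen"] > self.timeout:
            closed = self._close(key)
            current = None
        if current is None:
            current = {"start": event_time, "last_seen": event_time, "events": 0}
            self.sessions[key] = current
        current["last_seen"] = max(current["last_seen"], event_time)
        current["events"] += 1
        return closed

    def expire(self, now):
        """Close every session idle for longer than the timeout (the timeout path)."""
        idle = [k for k, s in self.sessions.items()
                if now - s["last_seen"] > self.timeout]
        return [self._close(k) for k in idle]

    def _close(self, key):
        s = self.sessions.pop(key)
        return {"key": key, "start": s["start"], "end": s["last_seen"],
                "events": s["events"]}


store = SessionStore(timeout=30)
store.on_event("u1", 0)
store.on_event("u1", 10)
store.on_event("u2", 5)
reopened = store.on_event("u1", 50)   # 40-minute gap closes u1's first session
expired = store.expire(now=60)        # u2 idle for 55 minutes, u1 only for 10
```

Without the `expire` path, the entry for u2 would stay in `sessions` forever, because u2 never sends another event to trigger the gap check. That is the leak the timeout is there to prevent.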

5. Exactly-Once — Checkpoints + Idempotent Sinks

Exactly-once in Structured Streaming arises from the combination of two conditions.

1) Replayable source: like Kafka offsets, you can recover how far you have read
2) Idempotent/transactional sink: rewriting the same batch yields the same result
        +
   Checkpoint: progress (offsets, state) is saved to reliable storage

query = (agg.writeStream
    .option("checkpointLocation", "s3://bucket/checkpoints/agg")  # required
    .outputMode("update")
    .trigger(processingTime="1 minute")
    .foreachBatch(upsert_to_iceberg)      # idempotent sink (section 6); for this aggregate the MERGE key would be the window plus user_id
    .start())

The checkpoint records "how far we have processed." After a failure and restart, the query resumes from the checkpoint, but the last batch may be executed again. That is why the sink must be idempotent to avoid duplicates.
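
A small plain-Python simulation makes the replay problem concrete. Everything here is a hypothetical model, not Spark code: the source is a list, the checkpoint is a dict holding one offset, and the crash is injected between the sink write and the offset commit.

```python
def run_once(source, checkpoint, sink_write, batch_size=2, crash_before_commit=False):
    """Process one micro-batch: read from the committed offset, write, then commit."""
    start = checkpoint.get("offset", 0)
    batch = source[start:start + batch_size]
    if not batch:
        return False
    sink_write(batch)
    if crash_before_commit:
        # Failure after the sink write but before progress is recorded.
        raise RuntimeError("simulated crash")
    checkpoint["offset"] = start + len(batch)
    return True


def run_with_one_crash(source, sink_write):
    """Drive the loop to completion, crashing once on the second batch."""
    checkpoint = {}
    run_once(source, checkpoint, sink_write)
    try:
        run_once(source, checkpoint, sink_write, crash_before_commit=True)
    except RuntimeError:
        pass  # restart: the uncommitted batch will be read again
    while run_once(source, checkpoint, sink_write):
        pass
    return checkpoint


source = [("a", 1), ("b", 2), ("c", 3), ("d", 4), ("e", 5)]  # (key, value) records

append_sink = []                       # append-only sink: replays add rows again
run_with_one_crash(source, append_sink.extend)

keyed_sink = {}                        # keyed sink: replays overwrite the same keys
final_checkpoint = run_with_one_crash(source, keyed_sink.update)
```

Both runs read every record and finish at the same offset, but the append-only sink ends up with the replayed batch written twice, while the keyed sink holds each record once.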

6. foreachBatch + Idempotent Iceberg/Delta MERGE

File sinks (.format("parquet")) are append-only, so they can't do precise upserts. With foreachBatch you treat each micro-batch like a regular DataFrame and load it idempotently via a transactional MERGE into Iceberg/Delta.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

def upsert_to_iceberg(micro_batch_df, batch_id):
    # 1) Keep only the newest row per key (MERGE source must have one row per key)
    w = Window.partitionBy("id").orderBy(F.col("updated_at").desc())
    deduped = (micro_batch_df
        .withColumn("rn", F.row_number().over(w))
        .where("rn = 1").drop("rn"))
 
    deduped.createOrReplaceTempView("updates")
 
    # 2) Idempotent MERGE: update only when newer -> rerunning the same batch yields the same result
    micro_batch_df.sparkSession.sql("""
        MERGE INTO analytics.events AS t
        USING updates AS s
          ON t.id = s.id
        WHEN MATCHED AND s.updated_at > t.updated_at THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
 
query = (stream.writeStream
    .option("checkpointLocation", "s3://bucket/checkpoints/events")
    .foreachBatch(upsert_to_iceberg)
    .start())

Why this achieves exactly-once:

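  • Even if the same batch_id is reprocessed after a restart, the result is unchanged (idempotent). On a replay, every row in the batch already exists in the target with an equal or newer updated_at, so the strict s.updated_at > t.updated_at comparison skips the update and the NOT MATCHED branch inserts nothing.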
  • MERGE is transactional, so there is no corrupted state from partial writes.
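
The same two steps (dedupe within the batch, then update only when strictly newer) can be checked in plain Python. This is a model of the MERGE semantics only: `merge_newest` is a hypothetical helper, the table is a dict keyed by id, and the rows are made-up sample data.

```python
def merge_newest(table, batch):
    """Apply `batch` to `table` (id -> row) with update-only-when-newer semantics."""
    # Step 1: keep only the newest row per id within the batch.
    newest = {}
    for row in batch:
        seen = newest.get(row["id"])
        if seen is None or row["updated_at"] > seen["updated_at"]:
            newest[row["id"]] = row
    # Step 2: insert unmatched ids, update matched ids only when strictly newer.
    for key, row in newest.items():
        existing = table.get(key)
        if existing is None or row["updated_at"] > existing["updated_at"]:
            table[key] = dict(row)
    return table


batch = [
    {"id": 1, "updated_at": 10, "status": "new"},
    {"id": 1, "updated_at": 20, "status": "paid"},
    {"id": 2, "updated_at": 15, "status": "new"},
]
table = {2: {"id": 2, "updated_at": 30, "status": "shipped"}}

once = dict(merge_newest(table, batch))
twice = dict(merge_newest(table, batch))   # replay of the same micro-batch
```

Applying the batch a second time leaves the table unchanged, and the older update for id 2 never overwrites the newer row already in the target.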

This pattern (foreachBatch + idempotent MERGE) is a widely used way to get exactly-once ingestion in PySpark streaming. Idempotency comes from two things: per-batch deduplication and the updated_at comparison. It is the same principle used in batch pipelines (see the separate post "Large-Scale Deduplication and SCD Type 2 in PySpark").

7. Late-Arriving Data

Late data whose event time is still within the watermark delay is folded into its window. Data older than the watermark is no longer guaranteed to be processed and is normally dropped.

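# If you want to retain late data separately (for audit/reprocessing),
# branch records older than your lateness cutoff into a separate table inside a
# foreachBatch on the raw stream; rows the watermark drops inside an aggregation
# never reach that aggregation's sink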
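
The routing decision itself is simple and can be sketched in plain Python. `split_by_cutoff` is a hypothetical helper, and the cutoff it uses (largest event time seen so far minus the delay) is an approximation you maintain yourself, not the watermark value tracked by the engine.

```python
def split_by_cutoff(records, delay, previous_max=None):
    """Return (on_time, late, new_max) using max event time seen minus `delay`."""
    cutoff = None if previous_max is None else previous_max - delay
    on_time, late = [], []
    for rec in records:
        if cutoff is not None and rec["event_time"] < cutoff:
            late.append(rec)        # candidate for the audit/reprocessing table
        else:
            on_time.append(rec)     # candidate for the main table
    times = [r["event_time"] for r in records]
    if previous_max is not None:
        times.append(previous_max)
    new_max = max(times) if times else None
    return on_time, late, new_max


batch1 = [{"id": "a", "event_time": 100}, {"id": "b", "event_time": 130}]
batch2 = [{"id": "c", "event_time": 125}, {"id": "d", "event_time": 90}]

on1, late1, max1 = split_by_cutoff(batch1, delay=10)
on2, late2, max2 = split_by_cutoff(batch2, delay=10, previous_max=max1)
```

In the second batch the cutoff is 120, so record c stays on the main path and record d is routed to the late side. In a real job the two lists would be two DataFrame filters written to two tables.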

Policy decisions:

  • How much lateness does the business tolerate? -> determines the watermark delay.
  • Whether to discard dropped data, or store it separately and reprocess it later, either with a batch job (Lambda-style) or by replaying the stream (Kappa-style).

8. Operations Checklist

  • Set watermarks on stateful operations (aggregation/join/dedup)
  • Set timeouts (expiry) on custom state
  • Put checkpointLocation on reliable storage (S3/HDFS)
  • Make the sink idempotent/transactional (foreachBatch + MERGE)
  • Dedupe keys within the MERGE source batch
  • Control small files via trigger interval (+ regular compaction)
  • Monitor state size and processing lag
  • Watch checkpoint schema compatibility (be careful when changing the query)
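
For the monitoring item, a small helper can pull the relevant numbers out of a progress report. This is a sketch: `summarize_progress` is a hypothetical name, the sample dict is made-up data shaped like what query.lastProgress returns in PySpark 3.5, and the field names should be checked against your Spark version. The lag check is a rough heuristic, not a precise measure.

```python
def summarize_progress(progress):
    """Extract state size and lag indicators from a streaming progress dict."""
    ops = progress.get("stateOperators", [])
    input_rate = progress.get("inputRowsPerSecond") or 0.0
    processed_rate = progress.get("processedRowsPerSecond") or 0.0
    return {
        "batch_id": progress.get("batchId"),
        "state_rows": sum(op.get("numRowsTotal", 0) for op in ops),
        "state_bytes": sum(op.get("memoryUsedBytes", 0) for op in ops),
        "rows_dropped_by_watermark": sum(
            op.get("numRowsDroppedByWatermark", 0) for op in ops),
        # Heuristic: input arriving faster than it is processed means lag is building.
        "falling_behind": input_rate > processed_rate,
    }


# Made-up sample shaped like the dict returned by query.lastProgress.
sample = {
    "batchId": 42,
    "inputRowsPerSecond": 1200.0,
    "processedRowsPerSecond": 900.0,
    "stateOperators": [
        {"numRowsTotal": 50000, "memoryUsedBytes": 8000000,
         "numRowsDroppedByWatermark": 12},
        {"numRowsTotal": 1500, "memoryUsedBytes": 250000,
         "numRowsDroppedByWatermark": 0},
    ],
}
summary = summarize_progress(sample)
```

Tracking state_rows over time shows whether watermarks and timeouts are actually bounding state, and a steadily rising rows_dropped_by_watermark suggests the delay may be too short for the data.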

9. Summary

| Challenge | Solution |
| --- | --- |
| Unbounded state growth | Clean up old state with watermarks |
| Custom state explosion | Timeout-based expiry is mandatory |
| Duplicates/loss after failures | Checkpoints + idempotent sink |
| Exactly-once ingestion | foreachBatch + Iceberg/Delta MERGE |
| Late data | Define the tolerance window via watermark delay |

The two great challenges of streaming ultimately converge on two tools — watermarks for state, idempotent sinks for consistency. Sloppy file appends blow up with duplicates, small files, and consistency issues all at once, while the foreachBatch + idempotent MERGE pattern gives streaming the same consistency guarantees as batch pipelines. The longer your pipeline runs nonstop, the more these fundamentals determine its stability.


This post is based on Spark 3.5 + Iceberg/Delta. If you need help designing the consistency and state management of a real-time ingestion pipeline, feel free to reach out.

— Data Dynamics Engineering Team