Tags: pyspark, spark, cdc, change-data-feed, iceberg, delta, data-engineering

Incremental Processing in PySpark — Iceberg/Delta CDC and Change Data Feed

Building incremental pipelines that read "only what changed" and propagate it downstream instead of reprocessing everything. We cover tracking insert/update/delete with Iceberg incremental reads and Delta Change Data Feed, applying changes idempotently to downstream tables, and incremental propagation through a medallion architecture.

Data Dynamics · June 5, 2026 · 6 min read

Re-reading and reprocessing an entire table every time the data changes becomes impractical as data grows: you can't rescan multiple terabytes from scratch every day. The answer is incremental processing, which means reading only the rows that changed since the last run and propagating them downstream.

Traditionally, incremental processing had to be hand-built on updated_at watermarks, which is tedious and misses hard deletes. Modern lakehouse formats build change tracking into the table format itself (Iceberg incremental reads, Delta Change Data Feed). This post walks through the patterns for reading change data with PySpark and applying it idempotently downstream.

1. What Is Incremental Processing

Full reprocessing:    Scan the entire table every run → transform → overwrite  (unrealistic at TB scale)
Incremental:          Read "only changed rows" → transform → merge downstream  (efficient)

The key is knowing "what changed (insert/update/delete)". There are two ways to get this.

| Approach | Tracks |
|---|---|
| Iceberg incremental read | Data appended between snapshots |
| Delta Change Data Feed (CDF) | Row-level changes (insert/update/delete) |

2. Iceberg — Snapshot-Based Incremental Reads

In Iceberg, every commit is a snapshot. You can read only the data newly added between two snapshots.

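# Read only data appended after a given snapshot (append snapshots only)
incr = (spark.read
    .format("iceberg")
    .option("start-snapshot-id", last_processed_snapshot)  # exclusive
    .option("end-snapshot-id", current_snapshot)           # inclusive
    .load("analytics.events"))
 
# Or timestamp-based: batch incremental reads are keyed on snapshot IDs, so resolve
# the last run time (epoch ms) to the newest snapshot committed at or before it
start_snapshot = spark.sql(f"""
    SELECT snapshot_id FROM analytics.events.snapshots
    WHERE committed_at <= timestamp_millis({last_run_ts_ms})
    ORDER BY committed_at DESC LIMIT 1
""").first()["snapshot_id"]
incr = (spark.read
    .format("iceberg")
    .option("start-snapshot-id", start_snapshot)
    .load("analytics.events"))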

After processing, persist the last snapshot ID you saw and use it as the starting point for the next run.

# Capture the current snapshot ID before reading (use it as end-snapshot-id), then record it
# in a checkpoint table once processing succeeds (starting point for the next run)
current_snapshot = spark.sql(
    "SELECT snapshot_id FROM analytics.events.snapshots ORDER BY committed_at DESC LIMIT 1"
).first()["snapshot_id"]
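
The "persist, then resume" step can be sketched as a small checkpoint helper. This is a minimal illustration rather than anything Iceberg provides: it assumes a local JSON file stands in for the checkpoint table, and the names load_last_snapshot, save_last_snapshot, and run_increment are hypothetical. What it tries to show is the ordering: the new snapshot ID is written only after the batch has been processed, so a failed run restarts from the old position.

```python
import json
import os
import tempfile


def load_last_snapshot(path):
    """Return the last processed snapshot ID, or None on the first run."""
    if not os.path.exists(path):
        return None
    with open(path) as f:
        return json.load(f)["last_snapshot_id"]


def save_last_snapshot(path, snapshot_id):
    """Atomically record the snapshot ID (write a temp file, then rename)."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as f:
        json.dump({"last_snapshot_id": snapshot_id}, f)
    os.replace(tmp_path, path)


def run_increment(path, current_snapshot, process):
    """Process the range (last, current] and advance the checkpoint on success."""
    last = load_last_snapshot(path)
    if last == current_snapshot:
        return False  # nothing new since the previous run
    process(last, current_snapshot)  # raises on failure, so the checkpoint stays put
    save_last_snapshot(path, current_snapshot)
    return True
```

Here process would wrap the Iceberg read, passing last as start-snapshot-id and current_snapshot as end-snapshot-id. On the first run last is None, and a full read is the natural fallback.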

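Iceberg incremental reads cover appends only. To track updates and deletes at the row level, you need a different mechanism: Delta CDF (next section), Iceberg's changelog view (the create_changelog_view procedure), or a separate design.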

3. Delta Change Data Feed (CDF)

Delta's CDF records insert/update/delete at the row level. It even tells you the change type, which makes it powerful.

# Enable CDF on a table (at creation time or via ALTER)
spark.sql("ALTER TABLE analytics.users SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

# Read changes for a version (or timestamp) range.
# startingVersion is inclusive: pass the last processed version + 1 to avoid re-reading it.
changes = (spark.read
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", last_version)
    .table("analytics.users"))
 
# changes includes special columns:
#   _change_type: insert / update_preimage / update_postimage / delete
#   _commit_version, _commit_timestamp

| _change_type | Meaning |
|---|---|
| insert | Newly inserted row |
| update_preimage | Value before the update |
| update_postimage | Value after the update |
| delete | Deleted row |
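
To turn the read above into a repeatable batch job, it can help to bound each read explicitly. The sketch below assumes you track the last processed version yourself. cdf_read_options and read_new_changes are hypothetical helper names, while readChangeFeed, startingVersion, and endingVersion are Delta reader options. Both bounds are inclusive, which is why the start is the last processed version plus one.

```python
def cdf_read_options(last_processed_version, latest_version):
    """Build Delta CDF reader options for the versions not yet processed.

    Returns None when there is nothing new, because Delta typically rejects
    a starting version that is beyond the table's latest version.
    """
    start = last_processed_version + 1
    if start > latest_version:
        return None
    return {
        "readChangeFeed": "true",
        "startingVersion": str(start),          # inclusive
        "endingVersion": str(latest_version),   # inclusive
    }


def read_new_changes(spark, table, last_processed_version):
    """Read the unprocessed change rows, or return None if the table is unchanged."""
    # DESCRIBE HISTORY lists the newest commit first
    latest = spark.sql(f"DESCRIBE HISTORY {table} LIMIT 1").first()["version"]
    options = cdf_read_options(last_processed_version, latest)
    if options is None:
        return None
    return spark.read.format("delta").options(**options).table(table)
```

Pinning endingVersion also gives you the exact value to store as the next checkpoint, instead of inferring it from the rows you happened to read.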

You can also subscribe to changes as a stream.

stream = (spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .table("analytics.users"))

4. Applying Changes Downstream (Idempotent MERGE)

Apply the changes you read to a downstream table. Inserts/updates become upserts; deletes become deletions.

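from pyspark.sql import Window, functions as F

def apply_changes(batch_df, batch_id):
    # Drop update_preimage rows: only the final state of each row matters downstream
    changes = batch_df.filter("_change_type IN ('insert','update_postimage','delete')")

    # A micro-batch can span several commits, so keep only the newest change per id;
    # MERGE fails when more than one source row matches the same target row
    newest_first = Window.partitionBy("id").orderBy(F.col("_commit_version").desc())
    latest = (changes
        .withColumn("_rn", F.row_number().over(newest_first))
        .filter("_rn = 1")
        .drop("_rn"))
    latest.createOrReplaceTempView("changes")
 
    # Deletes remove the row, everything else is an upsert of the latest state
    batch_df.sparkSession.sql("""
        MERGE INTO marts.users t USING changes s
        ON t.id = s.id
        WHEN MATCHED AND s._change_type = 'delete' THEN DELETE
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED AND s._change_type != 'delete' THEN INSERT *
    """)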
 
(stream.writeStream
    .option("checkpointLocation", "s3://bucket/ckpt/users-cdc")
    .foreachBatch(apply_changes)
    .start())

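CDF + MERGE is a common pattern for propagating source changes downstream faithfully. Because deletes are applied too, the downstream table stays consistent with the source. And because the MERGE is keyed on id and writes each row's final state, replaying a batch after a failure leaves the table unchanged.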
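
To see why a keyed MERGE can be replayed safely, here is a pure-Python model of its semantics. It is a toy, not Delta code: the target table is a dict keyed by id, the change rows are plain dicts with a hypothetical row field holding the payload, and apply_changes_to_dict is an illustrative name. Applying the same batch twice gives the same result as applying it once.

```python
def apply_changes_to_dict(target, changes):
    """Model of the MERGE: target maps id -> row, changes are CDF-like dicts."""
    # Keep only the newest relevant change per id (one source row per key)
    latest = {}
    for change in changes:
        if change["_change_type"] == "update_preimage":
            continue
        seen = latest.get(change["id"])
        if seen is None or change["_commit_version"] >= seen["_commit_version"]:
            latest[change["id"]] = change

    result = dict(target)
    for key, change in latest.items():
        if change["_change_type"] == "delete":
            result.pop(key, None)        # matched delete removes the row
        else:
            result[key] = change["row"]  # insert or update writes the final state
    return result


changes = [
    {"id": 1, "_change_type": "insert", "_commit_version": 5, "row": {"name": "Ann"}},
    {"id": 1, "_change_type": "update_preimage", "_commit_version": 6, "row": {"name": "Ann"}},
    {"id": 1, "_change_type": "update_postimage", "_commit_version": 6, "row": {"name": "Anna"}},
    {"id": 2, "_change_type": "delete", "_commit_version": 6, "row": {"name": "Bob"}},
]
once = apply_changes_to_dict({2: {"name": "Bob"}}, changes)
twice = apply_changes_to_dict(once, changes)  # replay of the same batch
```

The replay is harmless because every operation sets or removes a key's final state. An append-style write would duplicate rows instead.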

5. Medallion Architecture — Incremental Propagation

Incremental processing really shines when you chain multiple layers (Bronze→Silver→Gold) incrementally.

Bronze (raw ingestion) ──CDF/incremental──> Silver (cleansed) ──CDF──> Gold (aggregated)
   Each stage propagates "only what changed" to the next → no full recomputation

# Silver: read only Bronze's changes, cleanse, then merge into Silver
bronze_changes = read_changes("bronze.events", last_version)
cleaned = transform(bronze_changes)
merge_into("silver.events", cleaned)
 
# Gold: read only Silver's changes and refresh aggregates (incremental aggregation needs care — see below)
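
The read_changes and merge_into helpers above are not defined in this post. One possible shape is sketched below, under several assumptions: the tables are Delta tables with CDF enabled, id is the merge key, last_version means the last version already processed, and a SparkSession is already active. build_merge_sql is a hypothetical helper added so the statement can be inspected without a cluster.

```python
def build_merge_sql(target, source_view, key_cols=("id",)):
    """Render the upsert/delete MERGE statement for a CDF-shaped source view."""
    on_clause = " AND ".join(f"t.{c} = s.{c}" for c in key_cols)
    return (
        f"MERGE INTO {target} t USING {source_view} s\n"
        f"ON {on_clause}\n"
        "WHEN MATCHED AND s._change_type = 'delete' THEN DELETE\n"
        "WHEN MATCHED THEN UPDATE SET *\n"
        "WHEN NOT MATCHED AND s._change_type != 'delete' THEN INSERT *"
    )


def read_changes(table, last_version):
    """Read CDF rows committed after last_version (the last version already processed)."""
    # Imported lazily so build_merge_sql can be used without Spark installed
    from pyspark.sql import SparkSession

    spark = SparkSession.getActiveSession()
    return (spark.read
        .format("delta")
        .option("readChangeFeed", "true")
        .option("startingVersion", last_version + 1)
        .table(table))


def merge_into(target, changes_df, key_cols=("id",)):
    """Merge change rows into target; expects at most one row per key in changes_df."""
    changes_df.createOrReplaceTempView("changes")
    changes_df.sparkSession.sql(build_merge_sql(target, "changes", key_cols))
```

With this shape, transform has to keep the _change_type column, since the MERGE uses it to tell deletes from upserts. It should also leave at most one row per key.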

Since each layer processes only the changes, you don't have to recompute multiple terabytes from scratch every day.

6. Pitfall — Correctness of Incremental Aggregation

Propagating upserts incrementally is easy, but incremental aggregation is tricky. "Yesterday's total + today's changes" is not always correct (especially when updates/deletes alter past aggregates).

| Aggregation | Incremental feasibility |
|---|---|
| count/sum (insert only) | Just add |
| sum (with update/delete) | Must apply the before/after delta |
| distinct | Hard to do incrementally (needs sketches) |
| min/max (with delete) | May require recomputation |
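
The "before/after delta" row for sums can be made concrete with a small pure-Python sketch. It assumes CDF-shaped rows as plain dicts, and the column names country and amount are made up for illustration. Each change type contributes its value with a sign: inserts and postimages add, preimages and deletes subtract.

```python
from collections import defaultdict

# Sign each change type contributes to a running sum
SIGN = {"insert": 1, "update_postimage": 1, "update_preimage": -1, "delete": -1}


def sum_deltas(changes, group_key, value_key):
    """Net change per group to add to a previously computed sum."""
    deltas = defaultdict(int)
    for row in changes:
        deltas[row[group_key]] += SIGN[row["_change_type"]] * row[value_key]
    return dict(deltas)


changes = [
    {"_change_type": "insert", "country": "KR", "amount": 50},
    {"_change_type": "update_preimage", "country": "KR", "amount": 100},
    {"_change_type": "update_postimage", "country": "KR", "amount": 70},
    {"_change_type": "delete", "country": "US", "amount": 30},
]
deltas = sum_deltas(changes, "country", "amount")  # KR: 50 - 100 + 70, US: -30
```

Unlike the MERGE path, this needs every change row, preimages included, and must not be deduplicated per key. It also handles an update that moves a row between groups, since the preimage subtracts from the old group and the postimage adds to the new one.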

Rule of thumb: aggregating insert-only streams incrementally is easy, but when updates/deletes rewrite the past, it's safer to re-aggregate only the affected partitions/groups. Consider a design that picks out only the impacted keys for partial recomputation.
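
A minimal sketch of that partial recomputation, again in pure Python with made-up column names (country, amount) and hypothetical function names: collect the groups touched by any change, then recompute the aggregate for those groups only from the current source rows. Max is used here because a delete can remove the current maximum, which a running value cannot recover from.

```python
def affected_keys(changes, group_key):
    """Groups touched by any change; preimages count, since an update can move a row between groups."""
    return {row[group_key] for row in changes}


def recompute_max(aggregates, source_rows, keys, group_key, value_key):
    """Recompute max only for the given groups; other groups keep their stored value."""
    result = {k: v for k, v in aggregates.items() if k not in keys}
    for row in source_rows:
        k = row[group_key]
        if k in keys:
            result[k] = max(result.get(k, row[value_key]), row[value_key])
    return result


aggregates = {"KR": 100, "US": 80, "JP": 60}
changes = [{"_change_type": "delete", "country": "KR", "amount": 100}]
source_rows = [  # source table after the delete was applied
    {"country": "KR", "amount": 70},
    {"country": "KR", "amount": 40},
    {"country": "US", "amount": 80},
    {"country": "JP", "amount": 60},
]
keys = affected_keys(changes, "country")
refreshed = recompute_max(aggregates, source_rows, keys, "country", "amount")
```

In a real pipeline the source scan would be filtered to the affected keys or partitions, so the cost scales with the size of the change rather than the size of the table. A group whose rows were all deleted simply drops out of the result.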

7. Choosing Between Incremental and Full Reprocessing

| Situation | Recommendation |
|---|---|
| Large volume, frequent updates | Incremental (CDF/incremental) |
| Small volume, simple | Full reprocessing (simplicity wins) |
| Complex aggregations (past changes) | Recompute only affected partitions |
| Append-only events | Iceberg incremental read |
| Tracking update/delete | Delta CDF |

Incremental is not always the right answer. With small data, full reprocessing is simpler and less bug-prone.

8. Summary

| Tool | Tracks | Use case |
|---|---|---|
| Iceberg incremental read | Appends between snapshots | Append-only increments |
| Delta CDF | Row-level insert/update/delete | Change propagation |
| MERGE | Idempotent application | Downstream apply |
| Snapshot/version checkpoint | Progress position | Next starting point |

The essence of incremental processing is: "don't re-read everything — read only what changed and propagate it". Iceberg's snapshot increments and Delta's Change Data Feed provide change tracking at the table-format level, so the incremental logic you used to struggle with via updated_at becomes declaratively simple. Apply insert/update/delete downstream idempotently with CDF + MERGE, chain your medallion layers incrementally — and you can keep data fresh without recomputing multiple terabytes every day. Just handle the correctness of incremental aggregation with care.


This post is based on Spark 3.5 + Iceberg/Delta. If you need help with incremental processing, CDC pipelines, or medallion architecture design, feel free to reach out.

— The Data Dynamics Engineering Team