Incremental Processing in PySpark — Iceberg/Delta CDC and Change Data Feed
Building incremental pipelines that read "only what changed" and propagate it downstream instead of reprocessing everything. We cover tracking insert/update/delete with Iceberg incremental reads and Delta Change Data Feed, applying changes idempotently to downstream tables, and incremental propagation through a medallion architecture.
Re-reading and reprocessing an entire table every time the data changes becomes impossible as data grows. You can't rescan multiple terabytes from scratch every day. The answer is incremental processing — reading "only the rows that changed since the last run" and propagating them downstream.
Traditionally, incremental processing was painfully implemented with updated_at watermarks, but modern Lakehouse formats build in change tracking itself (Iceberg incremental read, Delta Change Data Feed). This post walks through the patterns for reading change data with PySpark and applying it idempotently downstream.
1. What Is Incremental Processing
Full reprocessing: Scan the entire table every run → transform → overwrite (unrealistic at TB scale)
Incremental: Read "only changed rows" → transform → merge downstream (efficient)
The key is knowing "what changed (insert/update/delete)". There are two ways to get this.
| Approach | Tracks |
|---|---|
| Iceberg incremental read | Data appended between snapshots |
| Delta Change Data Feed (CDF) | Row-level changes (insert/update/delete) |
2. Iceberg — Snapshot-Based Incremental Reads
In Iceberg, every commit is a snapshot. You can read only the data newly added between two snapshots.
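# Read only data appended after a given snapshot (targets append snapshots).
# start-snapshot-id is exclusive; end-snapshot-id is inclusive and defaults to the current snapshot.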
incr = (spark.read
.format("iceberg")
.option("start-snapshot-id", last_processed_snapshot)
.option("end-snapshot-id", current_snapshot)
.load("analytics.events"))
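# Or timestamp-based (epoch milliseconds). Check that your Iceberg version accepts
# start-timestamp on a plain batch read; some releases may honor it only for changelog scans.
incr = (spark.read
    .format("iceberg")
    .option("start-timestamp", last_run_ts_ms)
    .load("analytics.events"))
After processing, persist the last snapshot ID you saw and use it as the starting point for the next run.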
# After processing, record the current snapshot ID in a checkpoint table (starting point for the next run)
current_snapshot = (spark.sql(
    "SELECT snapshot_id FROM analytics.events.snapshots ORDER BY committed_at DESC LIMIT 1")
    .first()["snapshot_id"])
Iceberg incremental reads cover appended data only. To track updates and deletes at the row level, you need a separate mechanism, such as Iceberg's changelog view (the create_changelog_view procedure) or, on Delta tables, CDF.
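
The checkpoint loop described above can be sketched as follows. This is a minimal sketch, not a definitive implementation: the helper names `plan_incremental_read` and `read_increment` are hypothetical, and it assumes the last processed snapshot ID is stored somewhere durable between runs. Only the `start-snapshot-id` and `end-snapshot-id` option names come from Iceberg.

```python
from typing import Optional


def plan_incremental_read(last_processed: Optional[int],
                          current: Optional[int]) -> Optional[dict]:
    """Return Iceberg read options for the next run, or None if nothing is new.

    Hypothetical helper: `last_processed` is the snapshot ID stored by the
    previous run, `current` is the table's latest snapshot ID.
    """
    if current is None or current == last_processed:
        return None  # empty table, or no new commit since the last run
    if last_processed is None:
        # First run: there is no starting point yet, so read the full table.
        return {}
    return {
        "start-snapshot-id": str(last_processed),  # exclusive
        "end-snapshot-id": str(current),           # inclusive
    }


def read_increment(spark, table: str, options: dict):
    """Apply the planned options to an Iceberg batch read."""
    reader = spark.read.format("iceberg")
    for key, value in options.items():
        reader = reader.option(key, value)
    return reader.load(table)
```

Store `current` as the new checkpoint only after the downstream write has committed, so that a failed run retries the same snapshot range.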
3. Delta Change Data Feed (CDF)
Delta's CDF records insert/update/delete at the row level. It even tells you the change type, which makes it powerful.
# Enable CDF on a table (at creation time or via ALTER)
spark.sql("ALTER TABLE analytics.users SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
# Read changes for a version (or timestamp) range
changes = (spark.read
.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", last_version)  # inclusive: pass the first version not yet processed
.table("analytics.users"))
# changes includes special columns:
# _change_type: insert / update_preimage / update_postimage / delete
# _commit_version, _commit_timestamp
| _change_type | Meaning |
|---|---|
| insert | Newly inserted row |
| update_preimage | Value before the update |
| update_postimage | Value after the update |
| delete | Deleted row |
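
For batch reads, the version range has to be carried from one run to the next. The sketch below shows one way to plan it. It rests on a few assumptions: `next_cdf_options` is a hypothetical helper, the last processed version is stored by your own pipeline, and the latest table version is looked up separately (for example from `DESCRIBE HISTORY`). The `startingVersion` and `endingVersion` options are both inclusive.

```python
from typing import Optional


def next_cdf_options(last_processed_version: Optional[int],
                     latest_version: int,
                     cdf_enabled_version: int = 0) -> Optional[dict]:
    """Plan the next Change Data Feed batch read (hypothetical helper).

    Both bounds are inclusive, so the next run starts one version after
    the last one already processed. On the first run it starts at the
    version where CDF was enabled (an assumption the caller must supply).
    """
    if last_processed_version is None:
        start = cdf_enabled_version
    else:
        start = last_processed_version + 1
    if start > latest_version:
        return None  # nothing new; skip the read entirely
    return {
        "readChangeFeed": "true",
        "startingVersion": str(start),
        "endingVersion": str(latest_version),
    }
```

Pinning `endingVersion` means the run knows exactly which version to record as processed, even if new commits land while it is running.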
You can also subscribe to changes as a stream.
stream = (spark.readStream
.format("delta")
.option("readChangeFeed", "true")
.table("analytics.users"))
4. Applying Changes Downstream (Idempotent MERGE)
Apply the changes you read to a downstream table. Inserts/updates become upserts; deletes become deletions.
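from pyspark.sql import Window, functions as F

def apply_changes(batch_df, batch_id):
    # For updates, use only the postimage; process together with deletes.
    # Keep only the latest change per id: MERGE fails if several source rows match one target row.
    w = Window.partitionBy("id").orderBy(F.col("_commit_version").desc())
    latest = (batch_df
        .filter("_change_type IN ('insert','update_postimage','delete')")
        .withColumn("_rn", F.row_number().over(w))
        .filter("_rn = 1")
        .drop("_rn"))
    latest.createOrReplaceTempView("changes")
    batch_df.sparkSession.sql("""
        MERGE INTO marts.users t USING changes s
        ON t.id = s.id
        WHEN MATCHED AND s._change_type = 'delete' THEN DELETE
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED AND s._change_type != 'delete' THEN INSERT *
    """)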
(stream.writeStream
    .option("checkpointLocation", "s3://bucket/ckpt/users-cdc")
    .foreachBatch(apply_changes)
    .start())
CDF + MERGE is the standard pattern for propagating source changes downstream faithfully. Because the MERGE is keyed on id, replaying the same batch after a failure leaves the target unchanged, and because deletes are applied too, the downstream table stays consistent with the source.
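
To see why a keyed apply is idempotent, here is a small pure-Python model of the same logic, with a dict standing in for the downstream table. It is an illustration only: the `name` column, the sample rows, and the function names are made up for this sketch, and a real pipeline would run the MERGE in Spark.

```python
def collapse_changes(changes):
    """Keep only the latest non-preimage change per id."""
    latest = {}
    for row in changes:
        if row["_change_type"] == "update_preimage":
            continue  # the old value is not needed for an upsert
        seen = latest.get(row["id"])
        if seen is None or row["_commit_version"] >= seen["_commit_version"]:
            latest[row["id"]] = row
    return latest


def apply_to_target(target, changes):
    """Upsert or delete each key, mirroring the three MERGE clauses."""
    for key, row in collapse_changes(changes).items():
        if row["_change_type"] == "delete":
            target.pop(key, None)  # deleting a missing key is a no-op
        else:
            target[key] = {"id": key, "name": row["name"]}
    return target


changes = [
    {"id": 1, "name": "ann", "_change_type": "insert", "_commit_version": 1},
    {"id": 2, "name": "bob", "_change_type": "insert", "_commit_version": 1},
    {"id": 1, "name": "ann", "_change_type": "update_preimage", "_commit_version": 2},
    {"id": 1, "name": "anna", "_change_type": "update_postimage", "_commit_version": 2},
    {"id": 2, "name": "bob", "_change_type": "delete", "_commit_version": 3},
]

once = apply_to_target({}, changes)
twice = apply_to_target(dict(once), changes)  # replay the same batch
```

Applying the batch a second time yields the same table as applying it once, which is the property that makes retries after a failure safe.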
5. Medallion Architecture — Incremental Propagation
Incremental processing really shines when you chain multiple layers (Bronze→Silver→Gold) incrementally.
Bronze (raw ingestion) ──CDF/incremental──> Silver (cleansed) ──CDF──> Gold (aggregated)
Each stage propagates "only what changed" to the next → no full recomputation
# Silver: read only Bronze's changes, cleanse, then merge into Silver
bronze_changes = read_changes("bronze.events", last_version)
cleaned = transform(bronze_changes)
merge_into("silver.events", cleaned)
# Gold: read only Silver's changes and refresh aggregates (incremental aggregation needs care; see section 6)
Since each layer processes only the changes, you don't have to recompute multiple terabytes from scratch every day. Note that Silver must itself have CDF enabled for Gold to read its changes.
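
The `read_changes` and `merge_into` helpers in the snippet above are left undefined. One possible shape for them is sketched below, under several assumptions: the tables are Delta tables with CDF enabled, `last_version` is the last version already processed, the Spark session is passed in explicitly (unlike the shorter calls above), the key column is named by the caller, and the transform step keeps the `_change_type` column and leaves at most one row per key. `build_merge_sql` is a hypothetical helper introduced only for this sketch.

```python
def read_changes(spark, table: str, last_version: int):
    """Read row-level changes committed after `last_version` (Delta CDF)."""
    return (spark.read
            .format("delta")
            .option("readChangeFeed", "true")
            # startingVersion is inclusive, so begin one past the last processed version.
            .option("startingVersion", last_version + 1)
            .table(table))


def build_merge_sql(target: str, source_view: str, key: str = "id") -> str:
    """Render the upsert/delete MERGE statement for one layer."""
    return f"""
        MERGE INTO {target} t USING {source_view} s
        ON t.{key} = s.{key}
        WHEN MATCHED AND s._change_type = 'delete' THEN DELETE
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED AND s._change_type != 'delete' THEN INSERT *
    """


def merge_into(target: str, changes_df, key: str = "id", view: str = "changes"):
    """Register the changes as a temp view and merge them into `target`."""
    changes_df.createOrReplaceTempView(view)
    changes_df.sparkSession.sql(build_merge_sql(target, view, key))
```

Keeping the SQL rendering in its own function makes the statement easy to inspect or unit-test without a running cluster.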
6. Pitfall — Correctness of Incremental Aggregation
Propagating upserts incrementally is easy, but incremental aggregation is tricky. "Yesterday's total + today's changes" is not always correct (especially when updates/deletes alter past aggregates).
| Aggregation | Incremental feasibility |
|---|---|
| count/sum (insert only) | Just add |
| sum (with update/delete) | Must apply the before/after delta |
| distinct | Hard to do incrementally (needs sketches) |
| min/max (with delete) | May require recomputation |
Rule of thumb: aggregating insert-only streams incrementally is easy, but when updates/deletes rewrite the past, it's safer to re-aggregate only the affected partitions/groups. Consider a design that picks out only the impacted keys for partial recomputation.
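
The "before/after delta" rule for sums can be illustrated with a small pure-Python model. This is a sketch with made-up data: the `country` and `amount` columns and both function names are assumptions for the example. Preimages and deletes subtract from a group's total, while inserts and postimages add to it.

```python
from collections import defaultdict

# Sign applied to the measure for each CDF change type.
SIGN = {"insert": 1, "update_postimage": 1, "delete": -1, "update_preimage": -1}


def apply_sum_delta(totals, changes):
    """Adjust per-group sums in place using before/after images."""
    for row in changes:
        totals[row["country"]] += SIGN[row["_change_type"]] * row["amount"]
    return totals


def affected_groups(changes):
    """Groups touched by the changes: candidates for partial recomputation."""
    return {row["country"] for row in changes}


totals = defaultdict(int, {"KR": 100, "US": 50})
changes = [
    # One update that moves a row worth 30 from KR to US
    {"country": "KR", "amount": 30, "_change_type": "update_preimage"},
    {"country": "US", "amount": 30, "_change_type": "update_postimage"},
    {"country": "KR", "amount": 20, "_change_type": "delete"},
    {"country": "JP", "amount": 5, "_change_type": "insert"},
]
apply_sum_delta(totals, changes)
```

After the call, the totals are KR 50, US 80 and JP 5. Unlike the MERGE upsert, this additive update is not idempotent: replaying the same changes would double-count them, so each change must be applied exactly once. For min/max or distinct counts, `affected_groups` gives the set of groups to re-aggregate from the underlying rows instead.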
7. Choosing Between Incremental and Full Reprocessing
| Situation | Recommendation |
|---|---|
| Large volume, frequent updates | Incremental (CDF/incremental) |
| Small volume, simple | Full reprocessing (simplicity wins) |
| Complex aggregations (past changes) | Recompute only affected partitions |
| Append-only events | Iceberg incremental read |
| Tracking update/delete | Delta CDF |
Incremental is not always the right answer. With small data, full reprocessing is simpler and less bug-prone.
8. Summary
| Tool | Tracks | Use case |
|---|---|---|
| Iceberg incremental read | Appends between snapshots | Append-only increments |
| Delta CDF | Row-level i/u/d | Change propagation |
| MERGE | Idempotent application | Downstream apply |
| Snapshot/version checkpoint | Progress position | Next starting point |
The essence of incremental processing is: "don't re-read everything — read only what changed and propagate it". Iceberg's snapshot increments and Delta's Change Data Feed provide change tracking at the table-format level, so the incremental logic you used to struggle with via updated_at becomes declaratively simple. Apply insert/update/delete downstream idempotently with CDF + MERGE, chain your medallion layers incrementally — and you can keep data fresh without recomputing multiple terabytes every day. Just handle the correctness of incremental aggregation with care.
This post is based on Spark 3.5 + Iceberg/Delta. If you need help with incremental processing, CDC pipelines, or medallion architecture design, feel free to reach out.
— The Data Dynamics Engineering Team