
Large-Scale Deduplication and SCD Type 2 in PySpark — Getting Consistency Right with MERGE

How to implement exactly-once deduplication across billions of rows and SCD Type 2 change-history tracking with PySpark + Iceberg/Delta MERGE. Covers the pitfalls of dropDuplicates, window-based latest-record selection, and idempotent upsert patterns.

Data Dynamics | June 5, 2026 | 7 min read

Two of the trickiest problems in data pipelines are deduplication and change-history management (SCD Type 2). CDC streams deliver the same record multiple times, and dimension data changes over time. Handle them naively and you end up with a full-scan pipeline that recomputes everything, including yesterday's data, on every run, which quickly becomes impractical at scale.

This article walks through patterns for exactly-once deduplication at the scale of billions of rows and history-preserving SCD2, implemented efficiently with PySpark + Iceberg/Delta MERGE.

1. The dropDuplicates Trap

dropDuplicates, the first thing everyone reaches for, has a trap.

# No guarantee of WHICH row survives — non-deterministic!
df.dropDuplicates(["user_id"])

dropDuplicates(["user_id"]) keeps one row per user_id, but it does not guarantee which of the multiple versions survives. If you need to keep "the latest version," as with CDC, this is the wrong tool.
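To see why the survivor is not guaranteed, here is a minimal plain-Python model (not Spark's implementation) in which the surviving row is simply whichever one arrives first. In Spark, arrival order depends on partitioning and task scheduling, so the same input can keep different rows on different runs. The keep_first_seen helper and the sample rows are hypothetical.

```python
# Plain-Python model: the surviving row per key is whichever row is seen first.
# In Spark, "first seen" depends on partitioning and task scheduling.


def keep_first_seen(rows, key):
    """Keep the first row encountered for each key value (hypothetical helper)."""
    survivors = {}
    for row in rows:
        survivors.setdefault(row[key], row)  # later rows for the same key are ignored
    return list(survivors.values())


rows = [
    {"user_id": 1, "tier": "silver", "updated_at": "2026-01-01"},
    {"user_id": 1, "tier": "gold", "updated_at": "2026-05-01"},
]

# Same two rows, two different arrival orders.
run_a = keep_first_seen(rows, "user_id")
run_b = keep_first_seen(list(reversed(rows)), "user_id")

print(run_a[0]["tier"])  # silver
print(run_b[0]["tier"])  # gold
```

Both runs keep exactly one row per user_id, yet only one of them keeps the latest version.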

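Requirement                      | dropDuplicates | Right tool
---------------------------------+----------------+------------------------------
Remove fully identical rows      | OK             | distinct() / dropDuplicates()
Pick the latest row per key      | Not guaranteed | window row_number
Incremental upsert (consistency) | Unsuitable     | MERGE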

2. Keeping Only the Latest Record — the Window Pattern

The standard pattern for deterministically picking the single "latest" row per key is a window function.

from pyspark.sql import functions as F
from pyspark.sql.window import Window
 
# Per user_id, newest updated_at first. Tie-break on op_seq
w = Window.partitionBy("user_id").orderBy(
    F.col("updated_at").desc(),
    F.col("op_seq").desc(),
)
 
latest = (df
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)
    .drop("rn"))

Key points:

  • Make the tie-breaker explicit. With updated_at alone, the order of records sharing the same timestamp is non-deterministic. Add a monotonically increasing value such as a sequence number or offset (op_seq, or a Kafka offset, which is only ordered within a single partition).
  • This pattern triggers a shuffle and slows down when user_id is skewed (see the separate article "Conquering PySpark Data Skew").
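The window above can be modeled in plain Python to show what the tie-breaker buys you. This is a sketch, not Spark code: latest_per_key is a hypothetical helper that takes, per key, the row with the largest (updated_at, op_seq) tuple, which is the row row_number() numbers 1 under the descending order. Reversing the input does not change the result because op_seq breaks the timestamp tie; if two rows tied on every ordering column, the pick would again depend on input order, which is why the tie-breaker should be unique per key.

```python
from itertools import groupby
from operator import itemgetter


def latest_per_key(rows, key="user_id", order=("updated_at", "op_seq")):
    """Plain-Python model of
    row_number() OVER (PARTITION BY key ORDER BY updated_at DESC, op_seq DESC) = 1.
    The max of the (updated_at, op_seq) tuple is the first row of the descending sort."""
    by_key = sorted(rows, key=itemgetter(key))
    return [max(group, key=itemgetter(*order))
            for _, group in groupby(by_key, key=itemgetter(key))]


# ISO-8601 strings compare correctly as plain strings.
rows = [
    {"user_id": 1, "tier": "silver", "updated_at": "2026-05-01T10:00", "op_seq": 7},
    {"user_id": 1, "tier": "gold", "updated_at": "2026-05-01T10:00", "op_seq": 9},
    {"user_id": 2, "tier": "bronze", "updated_at": "2026-04-01T09:00", "op_seq": 3},
]

picked = latest_per_key(rows)
print({r["user_id"]: r["tier"] for r in picked})  # {1: 'gold', 2: 'bronze'}

# Input order no longer matters: op_seq breaks the updated_at tie.
print(picked == latest_per_key(list(reversed(rows))))  # True
```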

Deduplication in Streaming

# Watermark-based dedup (prevents unbounded state growth)
dedup = (stream
    .withWatermark("event_time", "1 hour")
    .dropDuplicates(["event_id", "event_time"]))

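Streaming dedup without a watermark keeps every key it has ever seen in state, so state grows without bound and eventually ends in OOM. Always limit the state retention window with a watermark, and include the watermark column (event_time here) in the dedup columns so Spark can expire old state. The side effect is that two copies of the same event_id with different event_time values are not treated as duplicates; if that matters, Spark 3.5 adds dropDuplicatesWithinWatermark(["event_id"]), which dedups on event_id alone within the watermark delay.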
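To make the state bound concrete, here is a simplified single-process model (WatermarkDedup is a hypothetical class, not Spark's state store). The watermark trails the maximum event time seen by the delay, events older than the watermark are dropped as late, and state entries older than it are evicted. Spark advances the watermark between micro-batches, which the model mimics by updating it at the end of each batch.

```python
class WatermarkDedup:
    """Simplified model of
    withWatermark("event_time", delay).dropDuplicates(["event_id", "event_time"])."""

    def __init__(self, delay):
        self.delay = delay
        self.max_event_time = None
        self.watermark = float("-inf")
        self.state = set()  # (event_id, event_time) pairs already emitted

    def process_batch(self, events):
        out = []
        for event_id, event_time in events:
            if event_time < self.watermark:
                continue  # older than the watermark: dropped as late
            pair = (event_id, event_time)
            if pair in self.state:
                continue  # duplicate of an event already emitted
            self.state.add(pair)
            out.append(pair)
            if self.max_event_time is None or event_time > self.max_event_time:
                self.max_event_time = event_time
        # Advance the watermark after the batch, then evict state behind it.
        if self.max_event_time is not None:
            self.watermark = max(self.watermark, self.max_event_time - self.delay)
        self.state = {p for p in self.state if p[1] >= self.watermark}
        return out


# Event times in minutes; a 60-minute delay plays the role of "1 hour".
d = WatermarkDedup(delay=60)
print(d.process_batch([("a", 0), ("a", 0), ("b", 30)]))  # [('a', 0), ('b', 30)]
print(d.process_batch([("a", 0), ("c", 120)]))           # [('c', 120)]
print(d.process_batch([("a", 0)]))                       # [] (late, dropped)
print(d.state)                                           # {('c', 120)}
```

State holds only the events still inside the watermark window, no matter how long the stream runs.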

3. Incremental Upsert — MERGE Is the Answer

Instead of rewriting everything on every run, merge (upsert) only the changes into the target table. Iceberg/Delta's MERGE INTO handles this transactionally.

SQL Common to Iceberg / Delta

# Trim the source (changes) down to the latest records, then MERGE
latest.createOrReplaceTempView("updates")
 
spark.sql("""
MERGE INTO analytics.users AS t
USING updates AS s
  ON t.user_id = s.user_id
WHEN MATCHED AND s.updated_at > t.updated_at THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
""")
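Clause                                             | Behavior
---------------------------------------------------+---------------------------------------------
WHEN MATCHED AND s.updated_at > t.updated_at       | Update only when newer (ignore stale events)
WHEN NOT MATCHED                                   | Insert new rows
(Optional) WHEN MATCHED AND s.op = 'D' THEN DELETE | Apply CDC deletes

If you add the delete clause, list it before the UPDATE clause (WHEN MATCHED clauses are evaluated in order and the first one that applies wins), and add AND s.op <> 'D' to WHEN NOT MATCHED so that a delete event for an unknown key is never inserted.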

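The idempotency key: thanks to the s.updated_at > t.updated_at condition, running the same batch twice produces the same result. On the second run every key already matches a target row with an equal (or newer) updated_at, so neither clause fires, and a late retry of an older batch cannot overwrite newer data. Even if the pipeline retries, the data never drifts; this is the heart of consistency.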
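The idempotency argument can be checked with a small plain-Python model of the MERGE above (merge_upsert is a hypothetical helper, not an Iceberg or Delta API). Applying the same batch twice gives the same table, and a stale event with an older updated_at leaves the row untouched. The model assumes the source already has one row per key and does not reproduce what a real MERGE does with duplicate source keys.

```python
def merge_upsert(target, updates, key="user_id", ts="updated_at"):
    """Plain-Python model of
        MERGE INTO t USING s ON t.key = s.key
        WHEN MATCHED AND s.ts > t.ts THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    Assumes `updates` already has one row per key."""
    result = {row[key]: dict(row) for row in target}
    for s in updates:
        t = result.get(s[key])
        if t is None:
            result[s[key]] = dict(s)   # NOT MATCHED -> INSERT *
        elif s[ts] > t[ts]:
            result[s[key]] = dict(s)   # MATCHED and newer -> UPDATE SET *
        # MATCHED but not newer: no clause fires, the target row is kept
    return sorted(result.values(), key=lambda r: r[key])


target = [{"user_id": 1, "tier": "silver", "updated_at": "2026-05-01"}]
batch = [
    {"user_id": 1, "tier": "gold", "updated_at": "2026-05-02"},
    {"user_id": 2, "tier": "bronze", "updated_at": "2026-05-02"},
]

once = merge_upsert(target, batch)
twice = merge_upsert(once, batch)  # retry of the same batch
stale = merge_upsert(once, [{"user_id": 1, "tier": "silver", "updated_at": "2026-04-30"}])

print([r["tier"] for r in once])     # ['gold', 'bronze']
print(twice == once, stale == once)  # True True
```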

Always Dedup the Source Before MERGE

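# If the MERGE source has 2+ rows for the same key: keys that match a target row
# typically make the MERGE fail (multiple source rows match one target row), and
# keys that do not match are inserted more than once.
# → Guarantee one row per key with the window pattern from section 2, then MERGE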

The most common MERGE incident is duplicate keys left in the source. Guarantee one row per key with a window before the MERGE.
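A cheap guard before every MERGE turns this failure from silent to loud. Below is a plain-Python sketch of the check (assert_one_row_per_key is a hypothetical name); the docstring notes the PySpark form we would expect to use on a real DataFrame, built only from groupBy, count, and filter.

```python
from collections import Counter


def assert_one_row_per_key(rows, key="user_id"):
    """Fail fast if a would-be MERGE source has more than one row per key.

    Plain-Python stand-in; on a DataFrame the equivalent check would be something like
    updates.groupBy(key).count().filter("count > 1").limit(1).count() == 0
    """
    counts = Counter(row[key] for row in rows)
    dupes = sorted(k for k, n in counts.items() if n > 1)
    if dupes:
        raise ValueError(f"MERGE source has duplicate keys: {dupes}")


assert_one_row_per_key([{"user_id": 1}, {"user_id": 2}])  # passes silently

try:
    assert_one_row_per_key([{"user_id": 1}, {"user_id": 1}, {"user_id": 2}])
except ValueError as err:
    print(err)  # MERGE source has duplicate keys: [1]
```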

4. SCD Type 2 — Preserving Change History

SCD2 is the pattern where, when a dimension changes, you keep the old row as history instead of overwriting it. Each row carries a validity period and a current flag.

user_id | tier   | valid_from | valid_to   | is_current
--------+--------+------------+------------+-----------
123     | silver | 2026-01-01 | 2026-05-01 | false      ← past
123     | gold   | 2026-05-01 | 9999-12-31 | true       ← current

Implementing SCD2 with MERGE

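SCD2 is tricky because every change needs two writes: close out the existing current row and insert the new version. A common pattern does both in a single MERGE by feeding each changed record in twice: once with its real key, so it matches and closes out the current row, and once with a NULL merge key, so it never matches and falls through to WHEN NOT MATCHED THEN INSERT. The PySpark code in this section takes a simpler two-step route instead: a MERGE that only closes out changed rows, followed by an append of the new versions. Because that is two commits, a reader can briefly see a changed key with no current row between them.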
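The "each changed record twice" idea can be modeled in plain Python. This sketch covers only the single-MERGE variant; the PySpark code in this section uses a MERGE plus a separate append instead. stage_scd2, merge_scd2, merge_key, and the sample rows are hypothetical. The staged source contains every update once with merge_key set to its key (it closes out the current row if attributes changed, and does nothing if they did not) plus a second copy with merge_key = None for changed keys, which can never match and therefore becomes the new current version. Brand-new keys do not match anything either, so they are inserted too.

```python
TODAY = "2026-05-01"
OPEN_END = "9999-12-31"


def stage_scd2(src, current, key="user_id", attrs=("tier", "region")):
    """Every update once with merge_key = key, plus a copy with merge_key = None
    for keys whose tracked attributes changed."""
    cur = {r[key]: r for r in current}
    staged = [dict(r, merge_key=r[key]) for r in src]
    for r in src:
        c = cur.get(r[key])
        if c is not None and any(c[a] != r[a] for a in attrs):
            staged.append(dict(r, merge_key=None))  # NULL key: never matches
    return staged


def merge_scd2(dim, staged, today, key="user_id", attrs=("tier", "region")):
    """Model of one MERGE ... ON t.key = s.merge_key
    WHEN MATCHED AND t.is_current AND <attrs differ> THEN close out the row
    WHEN NOT MATCHED THEN insert the new current version.
    Python's != treats None as a value, i.e. a NULL-safe comparison."""
    out = [dict(r) for r in dim]
    inserts = []
    for s in staged:
        matches = [t for t in out
                   if s["merge_key"] is not None and t[key] == s["merge_key"]]
        if not matches:
            inserts.append({key: s[key], **{a: s[a] for a in attrs},
                            "valid_from": today, "valid_to": OPEN_END,
                            "is_current": True})
        for t in matches:
            if t["is_current"] and any(t[a] != s[a] for a in attrs):
                t["is_current"] = False
                t["valid_to"] = today
    return out + inserts


dim = [
    {"user_id": 123, "tier": "silver", "region": "eu", "valid_from": "2026-01-01",
     "valid_to": OPEN_END, "is_current": True},
    {"user_id": 7, "tier": "gold", "region": "us", "valid_from": "2026-01-01",
     "valid_to": OPEN_END, "is_current": True},
]
src = [
    {"user_id": 123, "tier": "gold", "region": "eu"},      # changed
    {"user_id": 7, "tier": "gold", "region": "us"},        # unchanged
    {"user_id": 999, "tier": "bronze", "region": "apac"},  # brand new
]

staged = stage_scd2(src, [r for r in dim if r["is_current"]])
new_dim = merge_scd2(dim, staged, TODAY)
print({r["user_id"]: r["tier"] for r in new_dim if r["is_current"]})
# {7: 'gold', 999: 'bronze', 123: 'gold'}
```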

from pyspark.sql import functions as F
 
target = spark.table("analytics.dim_users")     # is_current=true marks the current version
src = latest                                     # latest source (one row per key)
 
# Compare against the current versions to keep only keys that actually changed.
# <=> is NULL-safe equality: plain <> returns NULL (not true) when either side is
# NULL, so a change to or from NULL would be silently missed.
current = target.filter("is_current = true")
changed = (src.alias("s")
    .join(current.alias("t"), "user_id")
    .where("NOT (s.tier <=> t.tier) OR NOT (s.region <=> t.region)")   # change-detection columns
    .select("s.*"))

# (A) new keys + (B) changed keys → insert as new versions
# (C) changed existing rows get valid_to closed out + is_current=false

# Rows to insert: changed keys + brand-new keys
to_insert = changed.unionByName(
    src.join(current, "user_id", "left_anti")    # keys that never existed before
)

to_insert.createOrReplaceTempView("scd_updates")

spark.sql("""
MERGE INTO analytics.dim_users AS t
USING scd_updates AS s
  ON t.user_id = s.user_id AND t.is_current = true
WHEN MATCHED AND (NOT (t.tier <=> s.tier) OR NOT (t.region <=> s.region)) THEN
  UPDATE SET t.is_current = false, t.valid_to = current_date()
""")
 
# Append the new version rows (valid_from=today, valid_to=9999, is_current=true)
new_versions = to_insert.select(
    "user_id", "tier", "region",
    F.current_date().alias("valid_from"),
    F.lit("9999-12-31").cast("date").alias("valid_to"),
    F.lit(True).alias("is_current"),
)
new_versions.writeTo("analytics.dim_users").append()

Flow summary:

1. Get the single latest row per key from the source (window dedup)
2. Compare with current versions → identify keys that actually changed
3. MERGE: close out changed current rows with is_current=false + valid_to
4. append: insert changed keys + new keys as the new current versions
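The four steps can be modeled in plain Python to check two properties of the MERGE + append approach. scd2_two_step and the sample rows are hypothetical, and the model ignores Spark specifics such as transactions and lazy evaluation. Re-running with the same source changes nothing, because every source row now equals its current version. And if the job dies after step 3 but before step 4, the closed-out key has no current row, so in this model the next run's left_anti logic treats it as a new key and appends it, converging on the same table.

```python
OPEN_END = "9999-12-31"


def scd2_two_step(dim, src, today, key="user_id", attrs=("tier", "region")):
    """Model of steps 2-4: detect changes against current versions, close out
    changed current rows, append changed + brand-new keys.
    `src` is assumed to be deduplicated already (step 1)."""
    dim = [dict(r) for r in dim]
    current = {r[key]: r for r in dim if r["is_current"]}  # same dicts as in dim
    changed = [s for s in src
               if s[key] in current and any(current[s[key]][a] != s[a] for a in attrs)]
    brand_new = [s for s in src if s[key] not in current]  # the left_anti join
    for s in changed:                                       # step 3: the MERGE
        current[s[key]]["is_current"] = False
        current[s[key]]["valid_to"] = today
    for s in changed + brand_new:                           # step 4: the append
        dim.append({key: s[key], **{a: s[a] for a in attrs},
                    "valid_from": today, "valid_to": OPEN_END, "is_current": True})
    return dim


dim = [
    {"user_id": 123, "tier": "silver", "region": "eu", "valid_from": "2026-01-01",
     "valid_to": OPEN_END, "is_current": True},
    {"user_id": 7, "tier": "gold", "region": "us", "valid_from": "2026-01-01",
     "valid_to": OPEN_END, "is_current": True},
]
src = [
    {"user_id": 123, "tier": "gold", "region": "eu"},
    {"user_id": 7, "tier": "gold", "region": "us"},
    {"user_id": 999, "tier": "bronze", "region": "apac"},
]

once = scd2_two_step(dim, src, "2026-05-01")
rerun = scd2_two_step(once, src, "2026-05-01")
print(len(once), rerun == once)  # 4 True

# Crash after step 3 but before step 4: key 123 is closed out with no current row.
partial = [dict(dim[0], is_current=False, valid_to="2026-05-01"), dict(dim[1])]
print(scd2_two_step(partial, src, "2026-05-01") == once)  # True
```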

Querying SCD2

-- Current values
SELECT * FROM analytics.dim_users WHERE is_current = true;
 
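-- Values as of a specific date (point-in-time).
-- Half-open interval: on a change date, valid_to of the old row equals valid_from
-- of the new row, so an inclusive BETWEEN would return both versions.
SELECT * FROM analytics.dim_users
WHERE valid_from <= DATE '2026-03-15' AND DATE '2026-03-15' < valid_to;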
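On a change date the old row's valid_to equals the new row's valid_from (2026-05-01 in the example table), so the choice of comparison matters. This plain-Python sketch (hypothetical helpers) contrasts an inclusive BETWEEN-style check, which returns both versions on the boundary date, with a half-open check (valid_from <= d < valid_to), which returns exactly one.

```python
from datetime import date

history = [
    {"user_id": 123, "tier": "silver",
     "valid_from": date(2026, 1, 1), "valid_to": date(2026, 5, 1)},
    {"user_id": 123, "tier": "gold",
     "valid_from": date(2026, 5, 1), "valid_to": date(9999, 12, 31)},
]


def as_of_inclusive(rows, d):
    # Mirrors: d BETWEEN valid_from AND valid_to (both ends inclusive)
    return [r["tier"] for r in rows if r["valid_from"] <= d <= r["valid_to"]]


def as_of_half_open(rows, d):
    # Mirrors: valid_from <= d AND d < valid_to
    return [r["tier"] for r in rows if r["valid_from"] <= d < r["valid_to"]]


boundary = date(2026, 5, 1)
print(as_of_inclusive(history, boundary))            # ['silver', 'gold']
print(as_of_half_open(history, boundary))            # ['gold']
print(as_of_half_open(history, date(2026, 3, 15)))   # ['silver']
```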

5. Performance and Consistency Checklist

  • Guarantee one row per key in the MERGE source (window dedup) — the most common bug
  • Deterministic latest-record selection with a tie-breaker (sequence/offset)
  • Idempotency via s.updated_at > t.updated_at
  • Check join/MERGE key skew (especially NULL/default-value keys)
  • Bound streaming dedup state with a watermark
  • Regular compaction after MERGE (small files — see the separate article)
  • Decide whether to apply CDC deletes (op='D')

6. Choosing the Right Tool

Task                        | Tool
----------------------------+------------------------------------------------
Remove fully identical rows | distinct()
Latest row per key          | window row_number + tie-breaker
Incremental upsert          | Iceberg/Delta MERGE INTO (idempotent condition)
Preserve change history     | SCD2 (MERGE close-out + append)
Streaming duplicates        | watermarked dropDuplicates

7. Wrapping Up

Deduplication and SCD2 boil down to three things. First, dropDuplicates cannot guarantee "latest," so pick deterministically with window + tie-breaker. Second, instead of a full scan every run, do an incremental upsert with MERGE, and secure idempotency with the updated_at comparison. Third, handle SCD2 as "close out existing rows + insert new versions" using the MERGE+append combination, and always remove duplicate keys from the MERGE source first.

Thanks to transactional MERGE in Iceberg/Delta, the complex history management we used to hand-roll in the Hive era collapses into a few declarative statements. It is the key tool for achieving consistency and efficiency at the same time; get these patterns into muscle memory and your CDC and dimension-management pipelines become much simpler.


This article is based on Spark 3.5 + Iceberg/Delta. If you need help designing CDC incremental ingestion or SCD2 dimension-management pipelines, feel free to reach out.

— The Data Dynamics Engineering Team