Large-Scale Deduplication and SCD Type 2 in PySpark — Getting Consistency Right with MERGE
How to implement exactly-once deduplication across billions of rows and SCD Type 2 change-history tracking with PySpark + Iceberg/Delta MERGE. Covers the pitfalls of dropDuplicates, window-based latest-record selection, and idempotent upsert patterns.
Two of the trickiest problems in data pipelines are deduplication and change-history management (SCD Type 2). CDC streams deliver the same record multiple times, and dimension data changes over time. Handle this naively and you end up with an impractical full-scan pipeline that "recomputes everything, including yesterday's data," on every run.
This article walks through patterns for exactly-once deduplication at the scale of billions of rows and history-preserving SCD2, implemented efficiently with PySpark + Iceberg/Delta MERGE.
1. The dropDuplicates Trap
dropDuplicates, the first thing everyone reaches for, has a trap.
# No guarantee of WHICH row survives: non-deterministic!
df.dropDuplicates(["user_id"])

dropDuplicates(["user_id"]) keeps one row per user_id, but it does not guarantee which of the multiple versions survives. If you need to keep "the latest version," as with CDC, this is the wrong tool.
| Requirement | dropDuplicates | Right tool |
|---|---|---|
| Remove fully identical rows | OK | distinct() / dropDuplicates() |
| Pick the latest row per key | Not guaranteed | window row_number |
| Incremental upsert (consistency) | Unsuitable | MERGE |
2. Keeping Only the Latest Record — the Window Pattern
The standard pattern for deterministically picking the single "latest" row per key is a window function.
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Per user_id, newest updated_at first. Tie-break on op_seq
w = Window.partitionBy("user_id").orderBy(
    F.col("updated_at").desc(),
    F.col("op_seq").desc(),
)
latest = (df
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)
    .drop("rn"))

Key points:
- Make the tie-breaker explicit. With updated_at alone, the order of records sharing the same timestamp is non-deterministic. Add a monotonically increasing value such as a sequence/offset (op_seq, Kafka offset).
- This pattern triggers a shuffle and slows down when user_id is skewed (see the separate article "Conquering PySpark Data Skew").
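To see why the tie-breaker matters without starting a Spark session, here is a minimal plain-Python model of the same selection rule. It is only a sketch: `latest_per_key` and the sample rows are hypothetical, and the dictionaries stand in for DataFrame rows.

```python
import random

def latest_per_key(rows, key="user_id", order=("updated_at", "op_seq")):
    """Keep the row with the greatest (updated_at, op_seq) for each key.

    Mirrors row_number() == 1 over a window ordered descending by both columns.
    """
    best = {}
    for row in rows:
        rank = tuple(row[c] for c in order)
        k = row[key]
        if k not in best or rank > best[k][0]:
            best[k] = (rank, row)
    return {k: row for k, (rank, row) in best.items()}

# Hypothetical CDC rows: two versions of user 1 share the same timestamp.
rows = [
    {"user_id": 1, "updated_at": "2026-05-01T10:00:00", "op_seq": 7, "tier": "silver"},
    {"user_id": 1, "updated_at": "2026-05-01T10:00:00", "op_seq": 8, "tier": "gold"},
    {"user_id": 2, "updated_at": "2026-04-30T09:00:00", "op_seq": 3, "tier": "bronze"},
]

# The result does not depend on arrival order.
shuffled = rows[:]
random.shuffle(shuffled)
assert latest_per_key(rows) == latest_per_key(shuffled)
print(latest_per_key(rows)[1]["tier"])  # prints "gold"
```

Without op_seq in the ordering, the two versions of user 1 would tie, and the survivor would depend on the order in which rows happen to arrive.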
Deduplication in Streaming
# Watermark-based dedup (prevents unbounded state growth)
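dedup = (stream
    .withWatermark("event_time", "1 hour")
    .dropDuplicates(["event_id", "event_time"]))

Streaming dedup without a watermark grows state without bound and eventually ends in OOM. Always limit the state retention window with a watermark. The event-time column has to be part of the dedup key for old state to be evicted, which also means that two copies of an event_id carrying different event_time values are both kept.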
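The effect of the watermark on state size can be illustrated with a toy model in plain Python. This is a deliberate simplification and not Spark's implementation: Spark advances the watermark between micro-batches, while this sketch updates it on every event. The class `WatermarkDedup` is hypothetical.

```python
from datetime import datetime, timedelta

class WatermarkDedup:
    """Toy model of watermarked dedup: state keeps only entries at or after the watermark."""

    def __init__(self, delay):
        self.delay = delay
        self.max_event_time = None
        self.seen = set()  # {(event_id, event_time)}

    def process(self, event_id, event_time):
        """Return True if the event is emitted, False if it is late or a duplicate."""
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        watermark = self.max_event_time - self.delay
        # Evict entries older than the watermark; this is what bounds the state.
        self.seen = {(eid, ts) for (eid, ts) in self.seen if ts >= watermark}
        if event_time < watermark:
            return False  # arrived too late
        if (event_id, event_time) in self.seen:
            return False  # duplicate inside the retention window
        self.seen.add((event_id, event_time))
        return True

t0 = datetime(2026, 5, 1, 12, 0)
dedup_model = WatermarkDedup(delay=timedelta(hours=1))
results = [
    dedup_model.process("e1", t0),                        # first sighting
    dedup_model.process("e1", t0),                        # duplicate
    dedup_model.process("e2", t0 + timedelta(hours=3)),   # moves the watermark to 14:00
    dedup_model.process("e1", t0),                        # now older than the watermark
]
print(results, len(dedup_model.seen))  # prints [True, False, True, False] 1
```

The state ends with a single entry because everything older than the watermark was evicted; without the eviction step the set would grow with every distinct event ever seen.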
3. Incremental Upsert — MERGE Is the Answer
Instead of rewriting everything on every run, merge (upsert) only the changes into the target table. Iceberg/Delta's MERGE INTO handles this transactionally.
SQL Common to Iceberg / Delta
# Trim the source (changes) down to the latest records, then MERGE
latest.createOrReplaceTempView("updates")
spark.sql("""
    MERGE INTO analytics.users AS t
    USING updates AS s
    ON t.user_id = s.user_id
    WHEN MATCHED AND s.updated_at > t.updated_at THEN
      UPDATE SET *
    WHEN NOT MATCHED THEN
      INSERT *
""")

| Clause | Behavior |
|---|---|
| MATCHED AND s.updated_at > t.updated_at | Update only when newer (ignore stale events) |
| NOT MATCHED | Insert new rows |
| (Optional) MATCHED AND s.op = 'D' THEN DELETE | Apply CDC deletes; list it before the UPDATE clause, since the first matching clause wins |
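The idempotency key: thanks to the s.updated_at > t.updated_at condition, running the same batch twice produces the same result. On the second run every source row matches a target row whose updated_at is equal or newer, so nothing is updated or inserted. Even if the pipeline retries, the data never drifts; this is the heart of consistency.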
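The retry argument can be checked with a small plain-Python model of the MERGE semantics. This is a sketch under stated assumptions: `merge_upsert` is a hypothetical helper, a dict keyed by user_id stands in for the target table, and the source is assumed to hold one row per key.

```python
def merge_upsert(target, updates):
    """Apply MERGE-like semantics to a dict keyed by user_id and return it.

    Matched and strictly newer: update. Not matched: insert. Otherwise: ignore.
    """
    for row in updates:
        current = target.get(row["user_id"])
        if current is None:
            target[row["user_id"]] = dict(row)   # WHEN NOT MATCHED THEN INSERT *
        elif row["updated_at"] > current["updated_at"]:
            target[row["user_id"]] = dict(row)   # WHEN MATCHED AND newer THEN UPDATE SET *
    return target

table = {1: {"user_id": 1, "tier": "silver", "updated_at": "2026-04-01"}}
batch = [
    {"user_id": 1, "tier": "gold", "updated_at": "2026-05-01"},    # newer: applied
    {"user_id": 2, "tier": "bronze", "updated_at": "2026-05-01"},  # new key: inserted
]
stale = [{"user_id": 1, "tier": "silver", "updated_at": "2026-03-01"}]  # older: ignored

once = merge_upsert(dict(table), batch)
twice = merge_upsert(merge_upsert(dict(table), batch), batch)  # simulated retry
assert once == twice
assert merge_upsert(dict(once), stale) == once
```

Note that the comparison is strict, so a source row with the same updated_at as the target is ignored even if its other columns differ.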
Always Dedup the Source Before MERGE
# If the MERGE source has 2+ rows for the same key: error or non-deterministic result!
# → Guarantee one row per key with the window pattern from section 2, then MERGE

The most common MERGE incident is duplicate keys left in the source. Guarantee one row per key with a window before the MERGE.
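A cheap guard is to fail fast when the source still contains duplicate keys. The following is a minimal plain-Python sketch of that check; `duplicate_keys` and `assert_unique_keys` are hypothetical helpers, and the list of dicts stands in for the MERGE source.

```python
from collections import Counter

def duplicate_keys(rows, key="user_id"):
    """Return the keys that appear more than once in the would-be MERGE source."""
    counts = Counter(row[key] for row in rows)
    return sorted(k for k, n in counts.items() if n > 1)

def assert_unique_keys(rows, key="user_id"):
    """Raise before the MERGE runs if any key is duplicated."""
    dupes = duplicate_keys(rows, key)
    if dupes:
        raise ValueError(f"MERGE source has duplicate keys: {dupes[:10]}")

source = [{"user_id": 1}, {"user_id": 2}, {"user_id": 1}]
print(duplicate_keys(source))  # prints [1]
```

In PySpark the equivalent check would be a groupBy on the key followed by count() and a filter on count > 1, run on the deduplicated DataFrame just before the MERGE.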
4. SCD Type 2 — Preserving Change History
SCD2 is the pattern where, when a dimension changes, you keep the old row as history instead of overwriting it. Each row carries a validity period and a current flag.
user_id | tier   | valid_from | valid_to   | is_current
--------+--------+------------+------------+-----------
123     | silver | 2026-01-01 | 2026-05-01 | false  ← past
123     | gold   | 2026-05-01 | 9999-12-31 | true   ← current

Implementing SCD2 with MERGE
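SCD2 is tricky because every change needs two writes: closing out the existing current row and inserting the new version. A single MERGE can do both only if each changed record appears in the source twice (once to match and close out, once to fall through to the insert). The PySpark code in this section takes the simpler route: a MERGE that closes out changed rows, followed by an append of the new versions. The two steps commit separately, so a failure between them leaves changed keys without a current row until the job is rerun.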
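The two writes can be modeled in plain Python before looking at Spark code. This is a sketch, not the article's implementation: `apply_scd2`, `OPEN_END`, and the sample regions are hypothetical, and lists of dicts stand in for the dimension table and the deduplicated source.

```python
from datetime import date

OPEN_END = date(9999, 12, 31)

def apply_scd2(dim, latest, today, tracked=("tier", "region")):
    """Return a new dimension after applying one batch of latest-per-key rows.

    dim:    rows with user_id, tracked columns, valid_from, valid_to, is_current
    latest: rows with user_id and tracked columns (one row per key)
    """
    current = {r["user_id"]: r for r in dim if r["is_current"]}
    out = [dict(r) for r in dim]
    for src in latest:
        cur = current.get(src["user_id"])
        changed = cur is not None and any(cur[c] != src[c] for c in tracked)
        if cur is not None and not changed:
            continue  # unchanged key: nothing to write
        if changed:
            # Write 1: close out the existing current row.
            for r in out:
                if r["user_id"] == src["user_id"] and r["is_current"]:
                    r["is_current"] = False
                    r["valid_to"] = today
        # Write 2: insert the new current version (changed or brand-new key).
        out.append({"user_id": src["user_id"], **{c: src[c] for c in tracked},
                    "valid_from": today, "valid_to": OPEN_END, "is_current": True})
    return out

dim = [{"user_id": 123, "tier": "silver", "region": "EU",
        "valid_from": date(2026, 1, 1), "valid_to": OPEN_END, "is_current": True}]
latest = [{"user_id": 123, "tier": "gold", "region": "EU"},
          {"user_id": 456, "tier": "bronze", "region": "US"}]

after = apply_scd2(dim, latest, today=date(2026, 5, 1))
# Re-applying the same batch changes nothing, because no tracked column differs anymore.
assert apply_scd2(after, latest, today=date(2026, 5, 2)) == after
```

The PySpark code that follows performs the same two writes against the real table.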
from pyspark.sql import functions as F

target = spark.table("analytics.dim_users")  # is_current=true marks the current version
src = latest                                  # latest source (one row per key)

# Compare against the current versions to keep only keys that actually changed.
# <=> is Spark's NULL-safe equality, so a change to or from NULL also counts.
current = target.filter("is_current = true")
changed = (src.alias("s")
    .join(current.alias("t"), F.col("s.user_id") == F.col("t.user_id"))
    .where("NOT (s.tier <=> t.tier) OR NOT (s.region <=> t.region)")  # change-detection columns
    .select("s.*"))

# Rows to insert as new versions: (A) brand-new keys + (B) changed keys.
# (C) The existing current rows of changed keys get valid_to closed out + is_current=false.
to_insert = changed.unionByName(
    src.join(current, "user_id", "left_anti")  # keys with no current row
)
to_insert.createOrReplaceTempView("scd_updates")

spark.sql("""
    MERGE INTO analytics.dim_users AS t
    USING scd_updates AS s
    ON t.user_id = s.user_id AND t.is_current = true
    WHEN MATCHED AND (NOT (t.tier <=> s.tier) OR NOT (t.region <=> s.region)) THEN
      UPDATE SET t.is_current = false, t.valid_to = current_date()
""")

# Append the new version rows (valid_from=today, valid_to=9999, is_current=true)
new_versions = to_insert.select(
    "user_id", "tier", "region",
    F.current_date().alias("valid_from"),
    F.lit("9999-12-31").cast("date").alias("valid_to"),
    F.lit(True).alias("is_current"),
)
new_versions.writeTo("analytics.dim_users").append()

Flow summary:
1. Get the single latest row per key from the source (window dedup)
2. Compare with current versions → identify keys that actually changed
3. MERGE: close out changed current rows with is_current=false + valid_to
4. append: insert changed keys + new keys as the new current versions

Querying SCD2
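One boundary detail matters for point-in-time lookups on a table shaped like the example in section 4: a closed row's valid_to equals the next row's valid_from, so a range test that is inclusive at both ends matches two rows on the change date. The sketch below uses Python's built-in sqlite3 purely as a stand-in for Spark SQL to compare an inclusive test with a half-open one; the helper `tiers_as_of` is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE dim_users (
        user_id INTEGER, tier TEXT,
        valid_from TEXT, valid_to TEXT, is_current INTEGER
    )
""")
conn.executemany(
    "INSERT INTO dim_users VALUES (?, ?, ?, ?, ?)",
    [
        (123, "silver", "2026-01-01", "2026-05-01", 0),
        (123, "gold",   "2026-05-01", "9999-12-31", 1),
    ],
)

def tiers_as_of(day, inclusive_end):
    """Return the tiers valid on `day` under an inclusive or half-open validity range."""
    upper = "<=" if inclusive_end else "<"
    sql = (
        "SELECT tier FROM dim_users "
        f"WHERE valid_from <= ? AND ? {upper} valid_to ORDER BY valid_from"
    )
    return [tier for (tier,) in conn.execute(sql, (day, day))]

print(tiers_as_of("2026-03-15", inclusive_end=True))   # ['silver']
print(tiers_as_of("2026-05-01", inclusive_end=True))   # ['silver', 'gold']: both rows match
print(tiers_as_of("2026-05-01", inclusive_end=False))  # ['gold']
```

ISO-formatted date strings compare correctly as text, which is why the stand-in table can store them in TEXT columns.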
-- Current values
SELECT * FROM analytics.dim_users WHERE is_current = true;
-- Values as of a specific date (point-in-time).
-- The range is half-open because a closed row's valid_to equals the next row's valid_from.
SELECT * FROM analytics.dim_users
WHERE valid_from <= '2026-03-15' AND '2026-03-15' < valid_to;

5. Performance and Consistency Checklist
- Guarantee one row per key in the MERGE source (window dedup) — the most common bug
- Deterministic latest-record selection with a tie-breaker (sequence/offset)
- Idempotency via s.updated_at > t.updated_at
- Check join/MERGE key skew (especially NULL/default-value keys)
- Bound streaming dedup state with a watermark
- Regular compaction after MERGE (small files; see the separate article)
- Decide whether to apply CDC deletes (op='D')
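For the last checklist item, clause order matters once deletes are applied: matched clauses are evaluated in the order written, so the delete clause has to come before the update clause, and delete records should be kept out of the insert branch so that a retry does not re-insert a deleted key. The helper below is a hypothetical sketch that only assembles the SQL text; the op column and its 'D' marker follow the optional clause in section 3, and the statement has not been run against a specific Iceberg or Delta version here.

```python
def build_merge_sql(target, source, key, apply_deletes=False):
    """Build a MERGE statement; the delete clause, if any, is listed first."""
    clauses = []
    if apply_deletes:
        clauses.append("WHEN MATCHED AND s.op = 'D' THEN DELETE")
    clauses.append("WHEN MATCHED AND s.updated_at > t.updated_at THEN UPDATE SET *")
    # With deletes enabled, never insert a row that is itself a delete marker.
    insert_guard = " AND s.op <> 'D'" if apply_deletes else ""
    clauses.append(f"WHEN NOT MATCHED{insert_guard} THEN INSERT *")
    return "\n".join([
        f"MERGE INTO {target} AS t",
        f"USING {source} AS s",
        f"ON t.{key} = s.{key}",
        *clauses,
    ])

sql = build_merge_sql("analytics.users", "updates", "user_id", apply_deletes=True)
print(sql)
```

The resulting string would be passed to spark.sql(...) in place of the hand-written statement from section 3.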
6. Choosing the Right Tool
| Task | Tool |
|---|---|
| Remove fully identical rows | distinct() |
| Latest row per key | window row_number + tie-breaker |
| Incremental upsert | Iceberg/Delta MERGE INTO (idempotent condition) |
| Preserve change history | SCD2 (MERGE close-out + append) |
| Streaming duplicates | watermarked dropDuplicates |
7. Wrapping Up
Deduplication and SCD2 boil down to three things. First, dropDuplicates cannot guarantee "latest," so pick deterministically with window + tie-breaker. Second, instead of a full scan every run, do an incremental upsert with MERGE, and secure idempotency with the updated_at comparison. Third, handle SCD2 as "close out existing rows + insert new versions" using the MERGE+append combination, and always remove duplicate keys from the MERGE source first.
Thanks to transactional MERGE in Iceberg/Delta, the complex history management we used to hand-roll in the Hive era collapses into a single declarative SQL block. It is the key tool for achieving consistency and efficiency at the same time — get these patterns into muscle memory and your CDC and dimension-management pipelines become much simpler.
This article is based on Spark 3.5 + Iceberg/Delta. If you need help designing CDC incremental ingestion or SCD2 dimension-management pipelines, feel free to reach out.
— The Data Dynamics Engineering Team