PySpark 대규모 중복 제거와 SCD Type 2 — MERGE 로 정합성 잡기
수십억 행에서 정확히 한 번만 남기는 중복 제거, 그리고 변경 이력을 관리하는 SCD Type 2 를 PySpark + Iceberg/Delta MERGE 로 구현하는 법을 다룹니다. dropDuplicates 의 함정, window 기반 최신 레코드 선택, idempotent upsert 패턴까지.
데이터 파이프라인에서 가장 까다로운 두 가지가 중복 제거와 변경 이력 관리(SCD Type 2) 입니다. CDC 스트림에는 같은 레코드가 여러 번 들어오고, 차원 데이터는 시간에 따라 바뀝니다. 이걸 손으로 처리하려다 보면 "어제 것까지 다 다시 계산"하는 비현실적인 풀스캔 파이프라인이 됩니다.
이 글은 수십억 행 규모에서 정확히 한 번만 남기는 중복 제거와 이력을 보존하는 SCD2를, PySpark + Iceberg/Delta 의 MERGE 로 효율적으로 구현하는 패턴을 정리합니다.
1. dropDuplicates 의 함정
가장 먼저 손이 가는 dropDuplicates 는 함정이 있습니다.
# "어떤" 행이 남는지 보장되지 않음 — 비결정적!
df.dropDuplicates(["user_id"])dropDuplicates(["user_id"]) 는 user_id 별로 한 행을 남기지만, 여러 버전 중 무엇이 남는지 보장하지 않습니다. CDC 처럼 "가장 최신 버전"을 남겨야 한다면 이건 틀린 도구입니다.
| 요구사항 | dropDuplicates | 올바른 도구 |
|---|---|---|
| 완전히 동일한 행 제거 | OK | distinct() / dropDuplicates() |
| 키별 최신 행 선택 | 보장 못 함 | window row_number |
| 증분 upsert(정합성) | 부적합 | MERGE |
2. 최신 레코드만 남기기 — window 패턴
키별로 "가장 최신" 한 행을 결정적으로 고르는 표준 패턴은 window 함수입니다.
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# user_id 별, updated_at 최신순. 동률이면 op_seq 로 타이브레이크
w = Window.partitionBy("user_id").orderBy(
F.col("updated_at").desc(),
F.col("op_seq").desc(),
)
latest = (df
.withColumn("rn", F.row_number().over(w))
.filter(F.col("rn") == 1)
.drop("rn"))핵심:
- 타이브레이커를 명시하세요.
updated_at만으로는 같은 시각 레코드 순서가 비결정적입니다. 시퀀스/오프셋(op_seq, Kafka offset) 같은 단조 증가 값을 더합니다. - 이 패턴은 셔플을 유발하고,
user_id가 스큐하면 느려집니다(별도 글 "PySpark 데이터 스큐 완전 정복" 참고).
스트리밍에서의 중복 제거
# 워터마크 기반 dedup (상태 무한 증가 방지)
dedup = (stream
.withWatermark("event_time", "1 hour")
.dropDuplicates(["event_id", "event_time"]))워터마크 없이 스트리밍 dedup 을 하면 상태가 무한히 커져 OOM 이 납니다. 반드시 워터마크로 상태 보존 기간을 제한하세요.
3. 증분 Upsert — MERGE 가 정답
매번 전체를 다시 쓰는 대신, 변경분만 대상 테이블에 병합(upsert)합니다. Iceberg/Delta 의 MERGE INTO 가 이를 트랜잭션으로 처리합니다.
Iceberg / Delta 공통 SQL
# source(변경분)에서 최신 레코드만 추린 뒤 MERGE
latest.createOrReplaceTempView("updates")
spark.sql("""
MERGE INTO analytics.users AS t
USING updates AS s
ON t.user_id = s.user_id
WHEN MATCHED AND s.updated_at > t.updated_at THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
""")| 절 | 동작 |
|---|---|
MATCHED AND s.updated_at > t.updated_at | 더 최신일 때만 갱신(오래된 이벤트 무시) |
NOT MATCHED | 신규 행 삽입 |
(선택) MATCHED AND s.op = 'D' THEN DELETE | CDC 삭제 반영 |
idempotent(멱등) 핵심:
s.updated_at > t.updated_at조건 덕분에, 같은 배치를 두 번 실행해도 결과가 동일합니다. 파이프라인이 재시도돼도 데이터가 틀어지지 않습니다 — 정합성의 핵심입니다.
MERGE 전에 source 를 반드시 dedup
# MERGE 의 source 에 같은 키가 2개 이상이면 에러 또는 비결정적 결과!
# → 2장의 window 패턴으로 키당 1행 보장한 뒤 MERGEMERGE 의 가장 흔한 사고는 source 에 키 중복이 남아있는 것입니다. MERGE 전에 window 로 키당 한 행을 보장하세요.
4. SCD Type 2 — 변경 이력 보존
SCD2 는 차원이 바뀔 때 기존 행을 덮어쓰지 않고, 이력으로 남기는 패턴입니다. 각 행에 유효 기간과 현재 여부 플래그를 둡니다.
user_id | tier | valid_from | valid_to | is_current
--------+--------+-----------+------------+-----------
123 | silver | 2026-01-01 | 2026-05-01 | false ← 과거
123 | gold | 2026-05-01 | 9999-12-31 | true ← 현재MERGE 로 SCD2 구현
SCD2 는 한 번의 MERGE 로 "변경된 기존 행 마감 + 새 버전 삽입" 두 가지를 해야 해서 까다롭습니다. 표준 패턴은 변경 대상을 두 번 표현(마감용 + 삽입용) 하는 것입니다.
from pyspark.sql import functions as F
target = spark.table("analytics.dim_users") # is_current=true 가 현재 버전
src = latest # 최신 소스(키당 1행)
# 현재 버전과 비교해 '실제로 바뀐' 키만 추림
current = target.filter("is_current = true")
changed = (src.alias("s")
.join(current.alias("t"), "user_id")
.where("s.tier <> t.tier OR s.region <> t.region") # 변경 감지 컬럼
.select("s.*"))
# (A) 신규 키 + (B) 변경 키 → 새 버전으로 삽입
# (C) 변경된 기존 행은 valid_to 마감 + is_current=false
# 삽입용: 변경 키 + 신규 키
to_insert = changed.unionByName(
src.join(current, "user_id", "left_anti") # 아예 없던 신규 키
)
to_insert.createOrReplaceTempView("scd_updates")
spark.sql("""
MERGE INTO analytics.dim_users AS t
USING scd_updates AS s
ON t.user_id = s.user_id AND t.is_current = true
WHEN MATCHED AND (t.tier <> s.tier OR t.region <> s.region) THEN
UPDATE SET t.is_current = false, t.valid_to = current_date()
""")
# 새 버전 행 append (valid_from=오늘, valid_to=9999, is_current=true)
new_versions = to_insert.select(
"user_id", "tier", "region",
F.current_date().alias("valid_from"),
F.lit("9999-12-31").cast("date").alias("valid_to"),
F.lit(True).alias("is_current"),
)
new_versions.writeTo("analytics.dim_users").append()흐름 요약:
1. 소스에서 키당 최신 1행 확보 (window dedup)
2. 현재 버전과 비교 → 실제 변경된 키 식별
3. MERGE: 변경된 현재 행을 is_current=false + valid_to 마감
4. append: 변경 키 + 신규 키를 새 현재 버전으로 삽입SCD2 조회
-- 현재 값
SELECT * FROM analytics.dim_users WHERE is_current = true;
-- 특정 시점의 값 (point-in-time)
SELECT * FROM analytics.dim_users
WHERE '2026-03-15' BETWEEN valid_from AND valid_to;5. 성능·정합성 체크리스트
- MERGE source 는 키당 1행 보장(window dedup) — 가장 흔한 버그
- 타이브레이커(시퀀스/오프셋)로 결정적 최신 선택
-
s.updated_at > t.updated_at로 멱등성 확보 - 조인/MERGE 키 스큐 점검(특히 NULL·기본값 키)
- 스트리밍 dedup 은 워터마크로 상태 제한
- MERGE 후 정기 컴팩션(작은 파일 — 별도 글 참고)
- CDC 삭제(
op='D') 반영 여부 결정
6. 도구 선택
| 작업 | 도구 |
|---|---|
| 완전 동일 행 제거 | distinct() |
| 키별 최신 1행 | window row_number + 타이브레이커 |
| 증분 upsert | Iceberg/Delta MERGE INTO (멱등 조건) |
| 변경 이력 보존 | SCD2 (MERGE 마감 + append) |
| 스트리밍 중복 | 워터마크 dropDuplicates |
7. 정리
중복 제거와 SCD2 의 핵심은 세 가지입니다. 첫째, dropDuplicates 는 "최신"을 보장하지 못하므로 window + 타이브레이커로 결정적으로 고를 것. 둘째, 매번 풀스캔 대신 MERGE 로 증분 upsert 하되 updated_at 비교로 멱등성을 확보할 것. 셋째, SCD2 는 "기존 행 마감 + 새 버전 삽입"을 MERGE+append 조합으로 처리하고, MERGE source 의 키 중복을 반드시 먼저 제거할 것.
Iceberg/Delta 의 트랜잭션 MERGE 덕분에, 과거 Hive 시대에 손으로 짜던 복잡한 이력 관리가 선언적인 SQL 한 블록으로 정리됩니다. 정합성과 효율을 동시에 잡는 핵심 도구이니, 패턴을 손에 익혀두면 CDC·차원 관리 파이프라인이 훨씬 단순해집니다.
이 글은 Spark 3.5 + Iceberg/Delta 기준으로 작성되었습니다. CDC 증분 적재나 SCD2 차원 관리 파이프라인 설계가 필요하시면 언제든 문의해 주세요.
— Data Dynamics 엔지니어링 팀