Blog
pysparksparkdeduplicationscd2mergeicebergdelta

PySpark 대규모 중복 제거와 SCD Type 2 — MERGE 로 정합성 잡기

수십억 행에서 정확히 한 번만 남기는 중복 제거, 그리고 변경 이력을 관리하는 SCD Type 2 를 PySpark + Iceberg/Delta MERGE 로 구현하는 법을 다룹니다. dropDuplicates 의 함정, window 기반 최신 레코드 선택, idempotent upsert 패턴까지.

Data Dynamics2026年6月5日10 min read
This post is not yet translated. The original Korean version is shown below.

데이터 파이프라인에서 가장 까다로운 두 가지가 중복 제거변경 이력 관리(SCD Type 2) 입니다. CDC 스트림에는 같은 레코드가 여러 번 들어오고, 차원 데이터는 시간에 따라 바뀝니다. 이걸 손으로 처리하려다 보면 "어제 것까지 다 다시 계산"하는 비현실적인 풀스캔 파이프라인이 됩니다.

이 글은 수십억 행 규모에서 정확히 한 번만 남기는 중복 제거이력을 보존하는 SCD2를, PySpark + Iceberg/Delta 의 MERGE 로 효율적으로 구현하는 패턴을 정리합니다.

1. dropDuplicates 의 함정

가장 먼저 손이 가는 dropDuplicates 는 함정이 있습니다.

# "어떤" 행이 남는지 보장되지 않음 — 비결정적!
df.dropDuplicates(["user_id"])

dropDuplicates(["user_id"])user_id 별로 한 행을 남기지만, 여러 버전 중 무엇이 남는지 보장하지 않습니다. CDC 처럼 "가장 최신 버전"을 남겨야 한다면 이건 틀린 도구입니다.

요구사항dropDuplicates올바른 도구
완전히 동일한 행 제거OKdistinct() / dropDuplicates()
키별 최신 행 선택보장 못 함window row_number
증분 upsert(정합성)부적합MERGE

2. 최신 레코드만 남기기 — window 패턴

키별로 "가장 최신" 한 행을 결정적으로 고르는 표준 패턴은 window 함수입니다.

from pyspark.sql import functions as F
from pyspark.sql.window import Window
 
# user_id 별, updated_at 최신순. 동률이면 op_seq 로 타이브레이크
w = Window.partitionBy("user_id").orderBy(
    F.col("updated_at").desc(),
    F.col("op_seq").desc(),
)
 
latest = (df
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)
    .drop("rn"))

핵심:

  • 타이브레이커를 명시하세요. updated_at 만으로는 같은 시각 레코드 순서가 비결정적입니다. 시퀀스/오프셋(op_seq, Kafka offset) 같은 단조 증가 값을 더합니다.
  • 이 패턴은 셔플을 유발하고, user_id 가 스큐하면 느려집니다(별도 글 "PySpark 데이터 스큐 완전 정복" 참고).

스트리밍에서의 중복 제거

# 워터마크 기반 dedup (상태 무한 증가 방지)
dedup = (stream
    .withWatermark("event_time", "1 hour")
    .dropDuplicates(["event_id", "event_time"]))

워터마크 없이 스트리밍 dedup 을 하면 상태가 무한히 커져 OOM 이 납니다. 반드시 워터마크로 상태 보존 기간을 제한하세요.

3. 증분 Upsert — MERGE 가 정답

매번 전체를 다시 쓰는 대신, 변경분만 대상 테이블에 병합(upsert)합니다. Iceberg/Delta 의 MERGE INTO 가 이를 트랜잭션으로 처리합니다.

Iceberg / Delta 공통 SQL

# source(변경분)에서 최신 레코드만 추린 뒤 MERGE
latest.createOrReplaceTempView("updates")
 
spark.sql("""
MERGE INTO analytics.users AS t
USING updates AS s
  ON t.user_id = s.user_id
WHEN MATCHED AND s.updated_at > t.updated_at THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
""")
동작
MATCHED AND s.updated_at > t.updated_at더 최신일 때만 갱신(오래된 이벤트 무시)
NOT MATCHED신규 행 삽입
(선택) MATCHED AND s.op = 'D' THEN DELETECDC 삭제 반영

idempotent(멱등) 핵심: s.updated_at > t.updated_at 조건 덕분에, 같은 배치를 두 번 실행해도 결과가 동일합니다. 파이프라인이 재시도돼도 데이터가 틀어지지 않습니다 — 정합성의 핵심입니다.

MERGE 전에 source 를 반드시 dedup

# MERGE 의 source 에 같은 키가 2개 이상이면 에러 또는 비결정적 결과!
# → 2장의 window 패턴으로 키당 1행 보장한 뒤 MERGE

MERGE 의 가장 흔한 사고는 source 에 키 중복이 남아있는 것입니다. MERGE 전에 window 로 키당 한 행을 보장하세요.

4. SCD Type 2 — 변경 이력 보존

SCD2 는 차원이 바뀔 때 기존 행을 덮어쓰지 않고, 이력으로 남기는 패턴입니다. 각 행에 유효 기간과 현재 여부 플래그를 둡니다.

user_id | tier   | valid_from | valid_to   | is_current
--------+--------+-----------+------------+-----------
123     | silver | 2026-01-01 | 2026-05-01 | false      ← 과거
123     | gold   | 2026-05-01 | 9999-12-31 | true       ← 현재

MERGE 로 SCD2 구현

SCD2 는 한 번의 MERGE 로 "변경된 기존 행 마감 + 새 버전 삽입" 두 가지를 해야 해서 까다롭습니다. 표준 패턴은 변경 대상을 두 번 표현(마감용 + 삽입용) 하는 것입니다.

from pyspark.sql import functions as F
 
target = spark.table("analytics.dim_users")     # is_current=true 가 현재 버전
src = latest                                     # 최신 소스(키당 1행)
 
# 현재 버전과 비교해 '실제로 바뀐' 키만 추림
current = target.filter("is_current = true")
changed = (src.alias("s")
    .join(current.alias("t"), "user_id")
    .where("s.tier <> t.tier OR s.region <> t.region")   # 변경 감지 컬럼
    .select("s.*"))
 
# (A) 신규 키 + (B) 변경 키 → 새 버전으로 삽입
# (C) 변경된 기존 행은 valid_to 마감 + is_current=false
 
# 삽입용: 변경 키 + 신규 키
to_insert = changed.unionByName(
    src.join(current, "user_id", "left_anti")    # 아예 없던 신규 키
)
 
to_insert.createOrReplaceTempView("scd_updates")
 
spark.sql("""
MERGE INTO analytics.dim_users AS t
USING scd_updates AS s
  ON t.user_id = s.user_id AND t.is_current = true
WHEN MATCHED AND (t.tier <> s.tier OR t.region <> s.region) THEN
  UPDATE SET t.is_current = false, t.valid_to = current_date()
""")
 
# 새 버전 행 append (valid_from=오늘, valid_to=9999, is_current=true)
new_versions = to_insert.select(
    "user_id", "tier", "region",
    F.current_date().alias("valid_from"),
    F.lit("9999-12-31").cast("date").alias("valid_to"),
    F.lit(True).alias("is_current"),
)
new_versions.writeTo("analytics.dim_users").append()

흐름 요약:

1. 소스에서 키당 최신 1행 확보 (window dedup)
2. 현재 버전과 비교 → 실제 변경된 키 식별
3. MERGE: 변경된 현재 행을 is_current=false + valid_to 마감
4. append: 변경 키 + 신규 키를 새 현재 버전으로 삽입

SCD2 조회

-- 현재 값
SELECT * FROM analytics.dim_users WHERE is_current = true;
 
-- 특정 시점의 값 (point-in-time)
SELECT * FROM analytics.dim_users
WHERE '2026-03-15' BETWEEN valid_from AND valid_to;

5. 성능·정합성 체크리스트

  • MERGE source 는 키당 1행 보장(window dedup) — 가장 흔한 버그
  • 타이브레이커(시퀀스/오프셋)로 결정적 최신 선택
  • s.updated_at > t.updated_at 로 멱등성 확보
  • 조인/MERGE 키 스큐 점검(특히 NULL·기본값 키)
  • 스트리밍 dedup 은 워터마크로 상태 제한
  • MERGE 후 정기 컴팩션(작은 파일 — 별도 글 참고)
  • CDC 삭제(op='D') 반영 여부 결정

6. 도구 선택

작업도구
완전 동일 행 제거distinct()
키별 최신 1행window row_number + 타이브레이커
증분 upsertIceberg/Delta MERGE INTO (멱등 조건)
변경 이력 보존SCD2 (MERGE 마감 + append)
스트리밍 중복워터마크 dropDuplicates

7. 정리

중복 제거와 SCD2 의 핵심은 세 가지입니다. 첫째, dropDuplicates 는 "최신"을 보장하지 못하므로 window + 타이브레이커로 결정적으로 고를 것. 둘째, 매번 풀스캔 대신 MERGE 로 증분 upsert 하되 updated_at 비교로 멱등성을 확보할 것. 셋째, SCD2 는 "기존 행 마감 + 새 버전 삽입"을 MERGE+append 조합으로 처리하고, MERGE source 의 키 중복을 반드시 먼저 제거할 것.

Iceberg/Delta 의 트랜잭션 MERGE 덕분에, 과거 Hive 시대에 손으로 짜던 복잡한 이력 관리가 선언적인 SQL 한 블록으로 정리됩니다. 정합성과 효율을 동시에 잡는 핵심 도구이니, 패턴을 손에 익혀두면 CDC·차원 관리 파이프라인이 훨씬 단순해집니다.


이 글은 Spark 3.5 + Iceberg/Delta 기준으로 작성되었습니다. CDC 증분 적재나 SCD2 차원 관리 파이프라인 설계가 필요하시면 언제든 문의해 주세요.

— Data Dynamics 엔지니어링 팀