PySpark 증분 처리 — Iceberg/Delta CDC와 Change Data Feed
전체 재처리 대신 "바뀐 것만" 읽어 하류로 전파하는 증분 파이프라인. Iceberg incremental read와 Delta Change Data Feed로 insert/update/delete를 추적하고, 다운스트림 테이블에 멱등하게 반영하는 패턴, 그리고 메달리온 아키텍처 증분 전파를 정리합니다.
데이터가 바뀔 때마다 전체 테이블을 다시 읽어 재처리하는 것은 데이터가 커지면 불가능해집니다. 매일 수 TB 를 통째로 다시 스캔할 수는 없습니다. 해법은 증분 처리 — "마지막 처리 이후 바뀐 행만" 읽어 하류로 전파하는 것입니다.
전통적으로 증분 처리는 updated_at 워터마크로 어렵게 구현했지만, 현대 Lakehouse 포맷은 변경 자체를 추적하는 기능(Iceberg incremental read, Delta Change Data Feed)을 내장합니다. 이 글은 PySpark 로 변경분을 읽어 다운스트림에 멱등하게 반영하는 패턴을 정리합니다.
1. 증분 처리란
전체 재처리: 매번 전체 테이블 스캔 → 변환 → 덮어쓰기 (TB 단위면 비현실적)
증분 처리: "바뀐 행만" 읽기 → 변환 → 다운스트림에 머지 (효율적)핵심은 "무엇이 바뀌었는가(insert/update/delete)"를 아는 것입니다. 이를 두 가지 방식으로 얻습니다.
| 방식 | 추적 |
|---|---|
| Iceberg incremental read | 스냅샷 사이 추가된 데이터 |
| Delta Change Data Feed(CDF) | 행 단위 변경(insert/update/delete) |
2. Iceberg — 스냅샷 기반 증분 읽기
Iceberg 는 매 커밋이 스냅샷입니다. 두 스냅샷 사이에 새로 추가된 데이터만 읽을 수 있습니다.
# 특정 스냅샷 이후 추가분만 읽기 (append 스냅샷 대상)
incr = (spark.read
.format("iceberg")
.option("start-snapshot-id", last_processed_snapshot)
.option("end-snapshot-id", current_snapshot)
.load("analytics.events"))
# 또는 타임스탬프 기반
incr = (spark.read
.format("iceberg")
.option("start-timestamp", last_run_ts_ms)
.load("analytics.events"))처리 후, 마지막으로 본 스냅샷 ID 를 저장해 다음 실행의 시작점으로 씁니다.
# 처리 후 현재 스냅샷 ID 를 체크포인트 테이블에 기록 (다음 실행 시작점)
current = spark.sql("SELECT snapshot_id FROM analytics.events.snapshots ORDER BY committed_at DESC LIMIT 1")Iceberg incremental read 는 주로 append(추가) 를 다룹니다. update/delete 까지 행 단위로 추적하려면 Delta CDF 나 별도 설계가 필요합니다.
3. Delta Change Data Feed (CDF)
Delta 의 CDF 는 insert/update/delete 를 행 단위로 기록합니다. 변경 유형까지 알 수 있어 강력합니다.
# 테이블에 CDF 활성화 (생성 시 또는 ALTER)
spark.sql("ALTER TABLE analytics.users SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")# 버전(또는 타임스탬프) 범위의 변경분 읽기
changes = (spark.read
.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", last_version)
.table("analytics.users"))
# changes 에는 특별 컬럼이 추가됨:
# _change_type: insert / update_preimage / update_postimage / delete
# _commit_version, _commit_timestamp_change_type | 의미 |
|---|---|
insert | 새로 삽입된 행 |
update_preimage | 업데이트 전 값 |
update_postimage | 업데이트 후 값 |
delete | 삭제된 행 |
스트리밍으로도 변경분을 구독할 수 있습니다.
stream = (spark.readStream
.format("delta")
.option("readChangeFeed", "true")
.table("analytics.users"))4. 변경분을 다운스트림에 반영 (멱등 MERGE)
읽은 변경분을 하류 테이블에 적용합니다. insert/update 는 upsert 로, delete 는 삭제로 반영합니다.
def apply_changes(batch_df, batch_id):
# update 는 postimage 만 사용, delete 와 함께 처리
latest = (batch_df
.filter("_change_type IN ('insert','update_postimage','delete')"))
latest.createOrReplaceTempView("changes")
batch_df.sparkSession.sql("""
MERGE INTO marts.users t USING changes s
ON t.id = s.id
WHEN MATCHED AND s._change_type = 'delete' THEN DELETE
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED AND s._change_type != 'delete' THEN INSERT *
""")
(stream.writeStream
.option("checkpointLocation", "s3://bucket/ckpt/users-cdc")
.foreachBatch(apply_changes)
.start())CDF + MERGE 는 소스의 변경을 하류에 정확히 전파하는 표준 패턴입니다. delete 까지 반영되므로 하류가 소스와 일관되게 유지됩니다.
5. 메달리온 아키텍처 — 증분 전파
증분 처리의 진가는 여러 레이어(Bronze→Silver→Gold)를 증분으로 연결할 때 드러납니다.
Bronze(원천 적재) ──CDF/incremental──> Silver(정제) ──CDF──> Gold(집계)
각 단계가 "바뀐 것만" 다음 단계로 전파 → 전체 재계산 없음# Silver: Bronze 의 변경분만 읽어 정제 후 Silver 에 머지
bronze_changes = read_changes("bronze.events", last_version)
cleaned = transform(bronze_changes)
merge_into("silver.events", cleaned)
# Gold: Silver 의 변경분만 읽어 집계 갱신 (증분 집계는 주의 — 아래)각 레이어가 변경분만 처리하니, 매일 수 TB 를 통째로 재계산하지 않아도 됩니다.
6. 함정 — 증분 집계의 정합성
증분으로 upsert 를 전파하는 건 쉽지만, 증분 집계는 까다롭습니다. "어제 합계 + 오늘 변경분"이 항상 맞지는 않습니다(특히 update/delete 가 과거 집계를 바꿀 때).
| 집계 | 증분 가능성 |
|---|---|
| count/sum (insert만) | 더하면 됨 |
| sum (update/delete 포함) | 변경 전후 차이를 반영해야 |
| distinct | 증분 어려움(스케치 필요) |
| min/max (delete 포함) | 재계산 필요할 수 있음 |
원칙: insert-only 스트림의 집계는 증분이 쉽지만, update/delete 가 과거를 바꾸면 해당 파티션/그룹만 재집계하는 게 안전합니다. 영향받는 키만 골라 부분 재계산하는 설계를 고려하세요.
7. 증분 vs 전체 재처리 선택
| 상황 | 권장 |
|---|---|
| 대용량·잦은 갱신 | 증분(CDF/incremental) |
| 소량·단순 | 전체 재처리(단순함이 이김) |
| 복잡한 집계(과거 변경) | 영향 파티션만 재계산 |
| append-only 이벤트 | Iceberg incremental read |
| update/delete 추적 | Delta CDF |
증분이 항상 옳은 건 아닙니다. 데이터가 작으면 전체 재처리가 단순하고 버그가 적습니다.
8. 정리
| 도구 | 추적 | 용도 |
|---|---|---|
| Iceberg incremental read | 스냅샷 간 추가분 | append 증분 |
| Delta CDF | 행 단위 i/u/d | 변경 전파 |
| MERGE | 멱등 반영 | 다운스트림 적용 |
| 스냅샷/버전 체크포인트 | 진행 위치 | 다음 시작점 |
증분 처리의 핵심은 "전체를 다시 읽지 말고, 바뀐 것만 읽어 전파하라"입니다. Iceberg 의 스냅샷 증분과 Delta 의 Change Data Feed 가 변경 추적을 테이블 포맷 차원에서 제공하므로, updated_at 으로 고생하던 증분 로직이 선언적으로 단순해집니다. CDF + MERGE 로 insert/update/delete 를 하류에 멱등하게 반영하고, 메달리온 레이어를 증분으로 연결하면 — 수 TB 를 매일 재계산하지 않고도 신선한 데이터를 유지할 수 있습니다. 단, 증분 집계의 정합성만은 신중히 다루세요.
이 글은 Spark 3.5 + Iceberg/Delta 기준으로 작성되었습니다. 증분 처리·CDC 파이프라인이나 메달리온 아키텍처 설계가 필요하시면 언제든 문의해 주세요.
— Data Dynamics 엔지니어링 팀