Blog
pysparksparkstructured-streamingexactly-oncewatermarkdata-engineering

PySpark Structured Streaming 상태 관리와 Exactly-Once 적재

스트리밍에서 가장 어려운 두 가지 — 무한히 커지는 상태와 장애 후 중복/유실 — 를 다룹니다. 워터마크와 상태 정리, foreachBatch + Iceberg/Delta 멱등 MERGE 로 exactly-once 를 구현하는 법, 그리고 늦게 도착한 데이터 처리까지 정리합니다.

Data Dynamics2026年6月5日10 min read
This post is not yet translated. The original Korean version is shown below.

배치 잡은 실패하면 다시 돌리면 됩니다. 하지만 스트리밍은 24시간 멈추지 않고, 장애가 나도 중복 없이, 유실 없이 이어가야 합니다. PySpark Structured Streaming 에서 가장 어려운 두 가지가 바로 이것입니다 — 무한히 커지는 상태(state)exactly-once 정합성.

이 글은 워터마크로 상태를 길들이는 법, 늦게 도착한(late) 데이터 처리, 그리고 foreachBatch + Lakehouse 멱등 MERGE 로 exactly-once 적재를 구현하는 실전 패턴을 정리합니다.

1. 두 가지 핵심 난제

난제 ①  상태 무한 증가
  스트리밍 집계·조인·dedup 은 상태를 메모리/체크포인트에 보관
  → 정리 안 하면 상태가 영원히 커져 OOM
 
난제 ②  Exactly-once
  장애 → 재시작 → 마지막 배치 재처리
  → 잘못하면 같은 데이터를 두 번 쓰거나(중복) 빠뜨림(유실)

이 둘을 각각 워터마크멱등 싱크로 해결합니다.

2. 워터마크 — 상태를 길들이는 핵심

워터마크는 "이 시각보다 더 늦게 오는 데이터는 버린다"는 경계입니다. 이 경계 덕분에 Spark 는 그보다 오래된 상태를 안전하게 정리할 수 있습니다.

from pyspark.sql import functions as F
 
agg = (stream
    .withWatermark("event_time", "10 minutes")    # 10분까지 지각 허용
    .groupBy(
        F.window("event_time", "5 minutes"),       # 5분 텀블링 윈도우
        "user_id")
    .agg(F.count("*").alias("cnt")))

동작:

  • 윈도우 종료 시각 + 워터마크 지연(10분)이 지나면, 그 윈도우의 상태를 메모리에서 제거합니다.
  • 워터마크 없이 집계/조인/dedup 을 하면 상태가 무한히 쌓여 결국 OOM 입니다.
워터마크 지연트레이드오프
짧게(예: 1분)상태 적음, 지각 데이터 많이 버림
길게(예: 1시간)지각 데이터 잘 포착, 상태 큼

워터마크 지연은 "실제로 데이터가 얼마나 늦게 오는가"의 분포로 정합니다. 너무 짧으면 정상 데이터를 유실하고, 너무 길면 상태가 비대해집니다.

3. 상태가 쌓이는 연산들

연산상태워터마크 필요
윈도우 집계윈도우별 누적필수(정리용)
스트림-스트림 조인양쪽 버퍼필수
dropDuplicates본 키 집합필수(없으면 무한)
flatMapGroupsWithState사용자 정의 상태직접 만료 관리
# 스트리밍 dedup — 워터마크 없으면 상태 폭발
dedup = (stream
    .withWatermark("event_time", "1 hour")
    .dropDuplicates(["event_id", "event_time"]))

4. 커스텀 상태 — flatMapGroupsWithState / transformWithState

윈도우·dedup 으로 표현 안 되는 세션화·이상탐지 같은 로직은 직접 상태를 관리합니다. (PySpark 에서는 applyInPandasWithState, 최신 버전은 transformWithStateInPandas 를 제공합니다.)

# 개념: 그룹(키)별로 상태를 들고, 이벤트마다 갱신, 타임아웃으로 만료
# - 세션 윈도우: 30분 비활동 시 세션 종료·출력
# - 상태에 last_seen 보관, 타임아웃 처리로 만료

핵심은 상태에 만료(timeout)를 반드시 설정하는 것입니다. 만료 없는 커스텀 상태는 가장 흔한 스트리밍 OOM 원인입니다.

5. Exactly-Once — 체크포인트 + 멱등 싱크

Structured Streaming 의 exactly-once 는 두 조건의 조합으로 성립합니다.

① 재생 가능한 소스(replayable): Kafka offset 처럼 어디까지 읽었는지 복원 가능
② 멱등/트랜잭션 싱크(idempotent sink): 같은 배치를 다시 써도 결과 동일
        +
   체크포인트(checkpoint): 진행 상황(offset, 상태)을 신뢰 스토리지에 저장
query = (agg.writeStream
    .option("checkpointLocation", "s3://bucket/checkpoints/agg")  # 필수
    .outputMode("update")
    .trigger(processingTime="1 minute")
    .foreachBatch(upsert_to_iceberg)      # 멱등 싱크 (아래)
    .start())

체크포인트는 "어디까지 처리했나"를 기록합니다. 장애 후 재시작하면 체크포인트에서 이어가되, 마지막 배치는 다시 실행될 수 있습니다. 그래서 싱크가 멱등이어야 중복이 안 생깁니다.

6. foreachBatch + Iceberg/Delta 멱등 MERGE

파일 싱크(.format("parquet"))는 append-only 라 정교한 upsert 가 안 됩니다. foreachBatch 로 각 마이크로배치를 일반 DataFrame 처럼 다뤄, Iceberg/Delta 의 트랜잭션 MERGE 로 멱등하게 적재합니다.

def upsert_to_iceberg(micro_batch_df, batch_id):
    # 1) 배치 내 키 중복 제거 (MERGE source 는 키당 1행이어야 함)
    from pyspark.sql.window import Window
    w = Window.partitionBy("id").orderBy(F.col("updated_at").desc())
    deduped = (micro_batch_df
        .withColumn("rn", F.row_number().over(w))
        .where("rn = 1").drop("rn"))
 
    deduped.createOrReplaceTempView("updates")
 
    # 2) 멱등 MERGE: 더 최신일 때만 갱신 → 같은 배치 재실행해도 결과 동일
    micro_batch_df.sparkSession.sql("""
        MERGE INTO analytics.events AS t
        USING updates AS s
          ON t.id = s.id
        WHEN MATCHED AND s.updated_at > t.updated_at THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
 
query = (stream.writeStream
    .option("checkpointLocation", "s3://bucket/checkpoints/events")
    .foreachBatch(upsert_to_iceberg)
    .start())

왜 exactly-once 가 되는가:

  • 재시작으로 같은 batch_id 가 재처리돼도, MERGE 의 s.updated_at > t.updated_at 조건 덕에 두 번 써도 결과가 동일(멱등)합니다.
  • MERGE 는 트랜잭션이라 부분 쓰기로 인한 깨진 상태가 없습니다.

이 패턴(foreachBatch + 멱등 MERGE)이 PySpark 스트리밍 exactly-once 적재의 사실상 표준입니다. 멱등성의 비결은 배치 단위 중복 제거 + updated_at 비교라는, 배치 파이프라인과 똑같은 원리입니다(별도 글 "PySpark 대규모 중복 제거와 SCD Type 2" 참고).

7. 늦게 도착한 데이터(Late Data)

워터마크 경계 안에 도착한 지각 데이터는 해당 윈도우에 반영됩니다. 경계를 넘은 데이터는 버려집니다(드롭).

# 드롭된 지각 데이터를 따로 보관하고 싶다면 (감사·재처리용)
# foreachBatch 안에서 워터마크 기준 이전 데이터를 별도 테이블로 분기

정책 결정:

  • 비즈니스가 지각을 얼마나 허용하나 → 워터마크 지연 결정.
  • 드롭 데이터를 버릴지, 별도 보관 후 배치 재처리할지(람다/카파 아키텍처).

8. 운영 체크리스트

  • 상태 연산(집계/조인/dedup)에 워터마크 설정
  • 커스텀 상태에 타임아웃(만료) 설정
  • checkpointLocation 을 신뢰 스토리지(S3/HDFS)에
  • 싱크는 멱등/트랜잭션(foreachBatch + MERGE)
  • MERGE source 배치 내 키 중복 제거
  • 트리거 간격으로 작은 파일 제어(+ 정기 컴팩션)
  • 상태 크기·처리 지연(lag) 모니터링
  • 체크포인트 스키마 호환성(쿼리 변경 시 주의)

9. 정리

난제해법
상태 무한 증가워터마크로 오래된 상태 정리
커스텀 상태 폭발타임아웃 만료 필수
장애 후 중복/유실체크포인트 + 멱등 싱크
Exactly-once 적재foreachBatch + Iceberg/Delta MERGE
지각 데이터워터마크 지연으로 허용 범위 정의

스트리밍의 두 난제는 결국 두 도구로 수렴합니다 — 상태는 워터마크로, 정합성은 멱등 싱크로. 파일 append 로 대충 쓰면 중복·작은 파일·정합성 문제가 한꺼번에 터지지만, foreachBatch + 멱등 MERGE 패턴을 쓰면 배치 파이프라인과 동일한 정합성 보장을 스트리밍에서도 얻습니다. 24시간 멈추지 않는 파이프라인일수록, 이 기본기가 안정성을 결정합니다.


이 글은 Spark 3.5 + Iceberg/Delta 기준으로 작성되었습니다. 실시간 적재 파이프라인의 정합성·상태 관리 설계가 필요하시면 언제든 문의해 주세요.

— Data Dynamics 엔지니어링 팀