Blog
pysparksparkkafkastructured-streamingexactly-oncedata-engineering

PySpark Kafka 스트리밍 심화 — 오프셋, 백프레셔, Exactly-Once

Structured Streaming 으로 Kafka 를 안정적으로 소비하는 실전 가이드. 오프셋 관리와 체크포인트, maxOffsetsPerTrigger 백프레셔, 스키마 파싱, exactly-once 적재, 그리고 컨슈머 랙·재처리 운영까지 정리합니다.

Data Dynamics2026년 6월 5일9 min read

Kafka 는 실시간 데이터의 사실상 표준 버스이고, Spark Structured Streaming 은 이를 소비해 Lakehouse 에 적재하는 가장 흔한 도구입니다. 그런데 "Kafka 에서 읽어서 쓴다"는 단순해 보이는 작업에 — 오프셋 관리, 백프레셔, 스키마 파싱, exactly-once, 컨슈머 랙 — 운영의 디테일이 잔뜩 숨어 있습니다.

이 글은 PySpark 로 Kafka 를 안정적으로 소비·적재하는 실전 패턴을 정리합니다. (스트리밍 상태 관리·워터마크 일반론은 별도 글 "PySpark Structured Streaming 상태 관리와 Exactly-Once"를 참고하세요.)

1. Kafka 소스 읽기 기본

from pyspark.sql import functions as F
 
raw = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "events")                  # 토픽
    .option("startingOffsets", "latest")            # 또는 earliest
    .option("maxOffsetsPerTrigger", "500000")       # 백프레셔(아래)
    .load())
 
# Kafka 메시지는 바이너리 — key/value 는 binary
# raw 의 컬럼: key, value, topic, partition, offset, timestamp
Kafka 컬럼의미
key, value메시지 (binary)
topic, partition, offset위치 정보
timestampKafka 타임스탬프

2. 오프셋 관리 — 체크포인트가 전부

"어디까지 읽었나"(오프셋)를 체크포인트가 관리합니다. Kafka 컨슈머 그룹의 __consumer_offsets 가 아니라, Spark 체크포인트가 진실의 원천입니다.

query = (parsed.writeStream
    .option("checkpointLocation", "s3://bucket/ckpt/events")  # 필수
    .start())
startingOffsets동작
latest쿼리 시작 이후 메시지만(기존 무시)
earliest토픽 처음부터
JSON 지정특정 파티션·오프셋부터

핵심: startingOffsets체크포인트가 없는 첫 실행에만 적용됩니다. 재시작 시에는 체크포인트의 오프셋에서 이어갑니다. 그래서 체크포인트를 지우면 처음부터(또는 latest 부터) 다시 읽게 되니 주의하세요. 체크포인트는 오프셋·상태·진행상황을 모두 담은 핵심 자산입니다.

3. 백프레셔 — maxOffsetsPerTrigger

스트리밍이 한참 멈췄다 재개되거나 트래픽이 폭증하면, 한 마이크로배치가 수천만 건을 한 번에 처리하려다 OOM 이 납니다. maxOffsetsPerTrigger 로 배치당 최대 메시지 수를 제한합니다.

.option("maxOffsetsPerTrigger", "500000")   # 트리거당 최대 50만 건
백프레셔 없음: 밀린 1억 건을 한 배치에 → OOM 💥
백프레셔 있음: 배치당 50만 건씩 나눠 처리 → 안정적으로 따라잡음

이 값은 배치 처리 능력에 맞춰 정합니다. 너무 작으면 지연(랙)이 안 줄고, 너무 크면 배치가 무거워집니다.

4. 스키마 파싱 — binary value → 구조화

Kafka value 는 보통 JSON 또는 Avro 입니다. from_json/from_avro 로 파싱합니다.

from pyspark.sql import types as T
 
schema = T.StructType([
    T.StructField("user_id", T.LongType()),
    T.StructField("event_type", T.StringType()),
    T.StructField("event_time", T.TimestampType()),
])
 
parsed = (raw
    .select(
        F.col("key").cast("string").alias("key"),
        F.from_json(F.col("value").cast("string"), schema).alias("data"),
        F.col("timestamp").alias("kafka_ts"),
        F.col("offset"))
    .select("key", "data.*", "kafka_ts", "offset"))

Avro + Schema Registry 를 쓰면 from_avro 와 레지스트리 연동을 사용합니다. (Avro 스키마 일반론은 별도 글 "Apache Avro Schema 완벽 가이드" 참고.) 깨진 메시지는 격리하세요(별도 글 "중첩 반정형 데이터"의 quarantine 패턴).

5. Exactly-Once 적재 — foreachBatch + MERGE

Kafka→Lakehouse 적재에서 exactly-once 는 체크포인트(재생 가능 소스) + 멱등 싱크의 조합으로 성립합니다. 파일 append 대신 foreachBatch 로 Iceberg/Delta MERGE 를 씁니다.

def upsert(batch_df, batch_id):
    # 배치 내 키 중복 제거 (MERGE source 는 키당 1행)
    from pyspark.sql.window import Window
    w = Window.partitionBy("user_id", "event_time").orderBy(F.col("offset").desc())
    deduped = batch_df.withColumn("rn", F.row_number().over(w)).where("rn=1").drop("rn")
    deduped.createOrReplaceTempView("u")
 
    batch_df.sparkSession.sql("""
        MERGE INTO analytics.events t USING u s
        ON t.user_id = s.user_id AND t.event_time = s.event_time
        WHEN NOT MATCHED THEN INSERT *
    """)
 
query = (parsed.writeStream
    .option("checkpointLocation", "s3://bucket/ckpt/events")
    .foreachBatch(upsert)
    .trigger(processingTime="1 minute")
    .start())

재시작으로 같은 배치가 재처리돼도, MERGE 의 키 매칭으로 중복이 안 생깁니다(멱등). Kafka offset 을 dedup 키나 타이브레이커로 활용하면 정합성이 더 견고해집니다.

6. 트리거 모드와 작은 파일

트리거동작비고
processingTime="1 minute"1분마다 마이크로배치가장 흔함
availableNow=True밀린 데이터 처리 후 종료주기 배치식 스트리밍
continuous(실험적)초저지연제약 많음

트리거 간격이 짧으면(5초) 작은 파일이 폭증합니다. 적재 스트리밍은 보통 1~5분 트리거로 배치를 키우고, 그래도 쌓이는 작은 파일은 정기 컴팩션으로 정리합니다(별도 글 "PySpark Small Files Problem"). availableNow 는 "스트리밍 코드로 주기 배치"를 돌리는 실용적 모드입니다.

7. 멀티 토픽·파티션과 병렬성

.option("subscribe", "events,clicks,orders")      # 여러 토픽
.option("subscribePattern", "logs-.*")            # 패턴 구독
  • 병렬성은 Kafka 파티션 수에 묶입니다. 토픽 파티션이 적으면 Spark task 도 그만큼만 — 처리량이 안 늘면 토픽 파티션을 늘려야 할 수 있습니다.
  • minPartitions 로 Spark 쪽 파티션을 더 쪼개 병렬도를 높일 수 있습니다(파티션당 데이터가 클 때).

8. 운영 — 컨슈머 랙과 재처리

관심사방법
컨슈머 랙Kafka 랙 모니터링(처리 지연 감지)
처리 지연StreamingQueryProgress(inputRowsPerSecond 등)
재처리체크포인트 새 경로 + startingOffsets 지정
스키마 진화신규 필드 nullable, 격리로 깨짐 흡수
쿼리 변경체크포인트 호환성 주의(상태/스키마 변경)
# 진행 상황 모니터링
for q in spark.streams.active:
    print(q.lastProgress)   # 배치 시간, 입력 속도, 처리 속도, 랙

재처리(특정 시점부터 다시): 기존 체크포인트를 건드리지 말고 새 체크포인트 경로 + startingOffsets 로 별도 실행하세요. 기존 체크포인트를 지우면 진행상황·상태가 사라집니다.

9. 정리

영역핵심
오프셋체크포인트가 진실의 원천
백프레셔maxOffsetsPerTrigger 로 배치 제한
파싱from_json/from_avro, 깨짐 격리
Exactly-onceforeachBatch + 멱등 MERGE
작은 파일트리거 간격↑ + 정기 컴팩션
병렬성Kafka 파티션 수에 의존

Kafka 스트리밍의 핵심은 "체크포인트가 오프셋·상태·진행상황을 모두 관리한다"는 사실을 중심에 두는 것입니다. 백프레셔로 폭주를 막고, from_json 으로 안전하게 파싱하며, foreachBatch + 멱등 MERGE 로 exactly-once 를 달성하면 — 24시간 흐르는 실시간 파이프라인을 중복·유실 없이 운영할 수 있습니다. 트리거 간격과 컴팩션으로 작은 파일까지 다스리면 Lakehouse 실시간 적재가 완성됩니다.


이 글은 Spark 3.5 + Kafka 기준으로 작성되었습니다. 실시간 Kafka-Lakehouse 적재 파이프라인 설계가 필요하시면 언제든 문의해 주세요.

— Data Dynamics 엔지니어링 팀