PySpark Kafka 스트리밍 심화 — 오프셋, 백프레셔, Exactly-Once
Structured Streaming 으로 Kafka 를 안정적으로 소비하는 실전 가이드. 오프셋 관리와 체크포인트, maxOffsetsPerTrigger 백프레셔, 스키마 파싱, exactly-once 적재, 그리고 컨슈머 랙·재처리 운영까지 정리합니다.
Kafka 는 실시간 데이터의 사실상 표준 버스이고, Spark Structured Streaming 은 이를 소비해 Lakehouse 에 적재하는 가장 흔한 도구입니다. 그런데 "Kafka 에서 읽어서 쓴다"는 단순해 보이는 작업에 — 오프셋 관리, 백프레셔, 스키마 파싱, exactly-once, 컨슈머 랙 — 운영의 디테일이 잔뜩 숨어 있습니다.
이 글은 PySpark 로 Kafka 를 안정적으로 소비·적재하는 실전 패턴을 정리합니다. (스트리밍 상태 관리·워터마크 일반론은 별도 글 "PySpark Structured Streaming 상태 관리와 Exactly-Once"를 참고하세요.)
1. Kafka 소스 읽기 기본
from pyspark.sql import functions as F
raw = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
.option("subscribe", "events") # 토픽
.option("startingOffsets", "latest") # 또는 earliest
.option("maxOffsetsPerTrigger", "500000") # 백프레셔(아래)
.load())
# Kafka 메시지는 바이너리 — key/value 는 binary
# raw 의 컬럼: key, value, topic, partition, offset, timestamp| Kafka 컬럼 | 의미 |
|---|---|
key, value | 메시지 (binary) |
topic, partition, offset | 위치 정보 |
timestamp | Kafka 타임스탬프 |
2. 오프셋 관리 — 체크포인트가 전부
"어디까지 읽었나"(오프셋)를 체크포인트가 관리합니다. Kafka 컨슈머 그룹의 __consumer_offsets 가 아니라, Spark 체크포인트가 진실의 원천입니다.
query = (parsed.writeStream
.option("checkpointLocation", "s3://bucket/ckpt/events") # 필수
.start())startingOffsets | 동작 |
|---|---|
latest | 쿼리 시작 이후 메시지만(기존 무시) |
earliest | 토픽 처음부터 |
| JSON 지정 | 특정 파티션·오프셋부터 |
핵심:
startingOffsets는 체크포인트가 없는 첫 실행에만 적용됩니다. 재시작 시에는 체크포인트의 오프셋에서 이어갑니다. 그래서 체크포인트를 지우면 처음부터(또는 latest 부터) 다시 읽게 되니 주의하세요. 체크포인트는 오프셋·상태·진행상황을 모두 담은 핵심 자산입니다.
3. 백프레셔 — maxOffsetsPerTrigger
스트리밍이 한참 멈췄다 재개되거나 트래픽이 폭증하면, 한 마이크로배치가 수천만 건을 한 번에 처리하려다 OOM 이 납니다. maxOffsetsPerTrigger 로 배치당 최대 메시지 수를 제한합니다.
.option("maxOffsetsPerTrigger", "500000") # 트리거당 최대 50만 건백프레셔 없음: 밀린 1억 건을 한 배치에 → OOM 💥
백프레셔 있음: 배치당 50만 건씩 나눠 처리 → 안정적으로 따라잡음이 값은 배치 처리 능력에 맞춰 정합니다. 너무 작으면 지연(랙)이 안 줄고, 너무 크면 배치가 무거워집니다.
4. 스키마 파싱 — binary value → 구조화
Kafka value 는 보통 JSON 또는 Avro 입니다. from_json/from_avro 로 파싱합니다.
from pyspark.sql import types as T
schema = T.StructType([
T.StructField("user_id", T.LongType()),
T.StructField("event_type", T.StringType()),
T.StructField("event_time", T.TimestampType()),
])
parsed = (raw
.select(
F.col("key").cast("string").alias("key"),
F.from_json(F.col("value").cast("string"), schema).alias("data"),
F.col("timestamp").alias("kafka_ts"),
F.col("offset"))
.select("key", "data.*", "kafka_ts", "offset"))Avro + Schema Registry 를 쓰면 from_avro 와 레지스트리 연동을 사용합니다. (Avro 스키마 일반론은 별도 글 "Apache Avro Schema 완벽 가이드" 참고.) 깨진 메시지는 격리하세요(별도 글 "중첩 반정형 데이터"의 quarantine 패턴).
5. Exactly-Once 적재 — foreachBatch + MERGE
Kafka→Lakehouse 적재에서 exactly-once 는 체크포인트(재생 가능 소스) + 멱등 싱크의 조합으로 성립합니다. 파일 append 대신 foreachBatch 로 Iceberg/Delta MERGE 를 씁니다.
def upsert(batch_df, batch_id):
# 배치 내 키 중복 제거 (MERGE source 는 키당 1행)
from pyspark.sql.window import Window
w = Window.partitionBy("user_id", "event_time").orderBy(F.col("offset").desc())
deduped = batch_df.withColumn("rn", F.row_number().over(w)).where("rn=1").drop("rn")
deduped.createOrReplaceTempView("u")
batch_df.sparkSession.sql("""
MERGE INTO analytics.events t USING u s
ON t.user_id = s.user_id AND t.event_time = s.event_time
WHEN NOT MATCHED THEN INSERT *
""")
query = (parsed.writeStream
.option("checkpointLocation", "s3://bucket/ckpt/events")
.foreachBatch(upsert)
.trigger(processingTime="1 minute")
.start())재시작으로 같은 배치가 재처리돼도, MERGE 의 키 매칭으로 중복이 안 생깁니다(멱등). Kafka offset 을 dedup 키나 타이브레이커로 활용하면 정합성이 더 견고해집니다.
6. 트리거 모드와 작은 파일
| 트리거 | 동작 | 비고 |
|---|---|---|
processingTime="1 minute" | 1분마다 마이크로배치 | 가장 흔함 |
availableNow=True | 밀린 데이터 처리 후 종료 | 주기 배치식 스트리밍 |
continuous(실험적) | 초저지연 | 제약 많음 |
트리거 간격이 짧으면(5초) 작은 파일이 폭증합니다. 적재 스트리밍은 보통 1~5분 트리거로 배치를 키우고, 그래도 쌓이는 작은 파일은 정기 컴팩션으로 정리합니다(별도 글 "PySpark Small Files Problem").
availableNow는 "스트리밍 코드로 주기 배치"를 돌리는 실용적 모드입니다.
7. 멀티 토픽·파티션과 병렬성
.option("subscribe", "events,clicks,orders") # 여러 토픽
.option("subscribePattern", "logs-.*") # 패턴 구독- 병렬성은 Kafka 파티션 수에 묶입니다. 토픽 파티션이 적으면 Spark task 도 그만큼만 — 처리량이 안 늘면 토픽 파티션을 늘려야 할 수 있습니다.
minPartitions로 Spark 쪽 파티션을 더 쪼개 병렬도를 높일 수 있습니다(파티션당 데이터가 클 때).
8. 운영 — 컨슈머 랙과 재처리
| 관심사 | 방법 |
|---|---|
| 컨슈머 랙 | Kafka 랙 모니터링(처리 지연 감지) |
| 처리 지연 | StreamingQueryProgress(inputRowsPerSecond 등) |
| 재처리 | 체크포인트 새 경로 + startingOffsets 지정 |
| 스키마 진화 | 신규 필드 nullable, 격리로 깨짐 흡수 |
| 쿼리 변경 | 체크포인트 호환성 주의(상태/스키마 변경) |
# 진행 상황 모니터링
for q in spark.streams.active:
print(q.lastProgress) # 배치 시간, 입력 속도, 처리 속도, 랙재처리(특정 시점부터 다시): 기존 체크포인트를 건드리지 말고 새 체크포인트 경로 +
startingOffsets로 별도 실행하세요. 기존 체크포인트를 지우면 진행상황·상태가 사라집니다.
9. 정리
| 영역 | 핵심 |
|---|---|
| 오프셋 | 체크포인트가 진실의 원천 |
| 백프레셔 | maxOffsetsPerTrigger 로 배치 제한 |
| 파싱 | from_json/from_avro, 깨짐 격리 |
| Exactly-once | foreachBatch + 멱등 MERGE |
| 작은 파일 | 트리거 간격↑ + 정기 컴팩션 |
| 병렬성 | Kafka 파티션 수에 의존 |
Kafka 스트리밍의 핵심은 "체크포인트가 오프셋·상태·진행상황을 모두 관리한다"는 사실을 중심에 두는 것입니다. 백프레셔로 폭주를 막고, from_json 으로 안전하게 파싱하며, foreachBatch + 멱등 MERGE 로 exactly-once 를 달성하면 — 24시간 흐르는 실시간 파이프라인을 중복·유실 없이 운영할 수 있습니다. 트리거 간격과 컴팩션으로 작은 파일까지 다스리면 Lakehouse 실시간 적재가 완성됩니다.
이 글은 Spark 3.5 + Kafka 기준으로 작성되었습니다. 실시간 Kafka-Lakehouse 적재 파이프라인 설계가 필요하시면 언제든 문의해 주세요.
— Data Dynamics 엔지니어링 팀