PySpark 시계열 처리 — 갭필, 리샘플링, 그리고 큰 파티션 윈도우
센서·로그·금융 시계열에서 빠진 구간을 채우고(gap fill), 간격을 재조정하고(resampling), 큰 파티션 윈도우 OOM 을 피하는 법. forward fill, 시간 버킷 집계, unbounded window 의 위험과 대안을 PySpark 코드로 정리합니다.
시계열 데이터는 비어 있는 게 정상입니다. 센서는 가끔 값을 안 보내고, 거래는 특정 시각에만 발생하며, 로그는 불규칙하게 찍힙니다. 그런데 분석·ML 은 보통 규칙적인 시계열을 전제합니다. 빠진 구간을 채우고(gap fill), 간격을 재조정하고(resampling), 그러면서도 큰 파티션이 메모리를 터뜨리지 않게 하는 것 — 이것이 PySpark 시계열 처리의 핵심 난제입니다.
이 글은 갭필, 리샘플링, forward fill, 그리고 윈도우 함수의 메모리 함정을 실전 패턴으로 정리합니다.
1. 시계열의 세 가지 과제
① 갭필(Gap Fill) : 빠진 시각의 행을 만들어 채우기 (없는 행 생성)
② 리샘플링 : 불규칙/세밀한 간격 → 규칙적 간격으로 (1초 → 1분)
③ Forward Fill : 빠진 값을 직전 값으로 채우기 (LOCF)각각 도구가 다릅니다.
| 과제 | 핵심 도구 |
|---|---|
| 갭필 | sequence + explode 로 시각 격자 생성 후 left join |
| 리샘플링 | window/date_trunc 시간 버킷 집계 |
| forward fill | last(ignorenulls) over window |
2. 리샘플링 — 시간 버킷 집계
가장 흔한 작업. 세밀한 데이터를 일정 간격(1분, 1시간)으로 집계합니다.
from pyspark.sql import functions as F
# 1분 단위로 리샘플 (텀블링 윈도우)
resampled = (df
.groupBy(
"sensor_id",
F.window("event_time", "1 minute"))
.agg(
F.avg("value").alias("avg_value"),
F.max("value").alias("max_value"),
F.count("*").alias("n")))
# window 구조체를 평탄화
resampled = resampled.select(
"sensor_id",
F.col("window.start").alias("ts"),
"avg_value", "max_value", "n")date_trunc 로도 가능합니다(겹치지 않는 단순 버킷).
df.withColumn("minute", F.date_trunc("minute", "event_time")) \
.groupBy("sensor_id", "minute").agg(F.avg("value"))리샘플링은 데이터를 줄이는 방향이라 안전합니다. 문제는 다음 단계 — 줄였더니 중간에 빈 버킷이 생기는 갭입니다.
3. 갭필 — 없는 행 만들기
리샘플 후 "센서가 값을 안 보낸 1분"은 행 자체가 없습니다. 분석을 위해 모든 시각 격자를 만들고 left join 해서 빈 행을 채웁니다.
# 1) 전체 시간 격자 생성: 시작~끝을 1분 간격으로
bounds = df.agg(
F.min("event_time").alias("t0"),
F.max("event_time").alias("t1")).collect()[0]
grid = (spark.sql(f"""
SELECT explode(sequence(
timestamp '{bounds.t0}',
timestamp '{bounds.t1}',
interval 1 minute)) AS ts
"""))
# 2) 센서 × 시간격자 교차 (모든 센서가 모든 시각을 갖도록)
sensors = df.select("sensor_id").distinct()
full_grid = sensors.crossJoin(grid) # 센서 수 × 시각 수
# 3) 실제 데이터를 격자에 left join → 없는 시각은 NULL
filled = full_grid.join(resampled, ["sensor_id", "ts"], "left")sequence(start, stop, interval) + explode 가 시각 격자를 만드는 핵심입니다. crossJoin 은 센서 수가 많으면 커지므로, 센서별로 시간 범위가 다르면 범위를 좁혀 만드세요.
4. Forward Fill (LOCF) — 빈 값을 직전 값으로
갭필로 행은 만들었지만 값이 NULL 입니다. 시계열에서는 보통 직전 관측값으로 채웁니다(Last Observation Carried Forward).
from pyspark.sql.window import Window
w = (Window.partitionBy("sensor_id")
.orderBy("ts")
.rowsBetween(Window.unboundedPreceding, Window.currentRow))
# 직전의 NULL 아닌 값으로 채우기
filled = filled.withColumn(
"value_ffill",
F.last("value", ignorenulls=True).over(w))F.last(..., ignorenulls=True) 가 forward fill 의 핵심입니다. (이 패턴은 as-of join 과 같은 원리입니다 — 별도 글 "PySpark As-of Join" 참고.)
Backward fill / 보간
# backward fill: 역순 정렬 후 last, 또는 first(ignorenulls) with following 프레임
w_back = Window.partitionBy("sensor_id").orderBy("ts") \
.rowsBetween(Window.currentRow, Window.unboundedFollowing)
filled = filled.withColumn("value_bfill", F.first("value", ignorenulls=True).over(w_back))
# 선형 보간은 직전·직후 값과 시간 거리로 직접 계산5. 가장 위험한 함정 — 큰 파티션 윈도우 OOM
시계열 윈도우의 치명적 함정: partitionBy 키의 한 파티션이 메모리에 다 들어가야 합니다. 센서가 적고 데이터가 길면, 한 센서의 전체 시계열이 한 익스큐터로 모여 OOM 이 납니다.
partitionBy("sensor_id") + 센서 10개 × 각 1억 포인트
→ 한 파티션 = 1억 행이 한 익스큐터 메모리에 → OOM 💥특히 orderBy 가 있는 윈도우 + unbounded 프레임은 정렬을 위해 파티션 전체를 들고 있어야 해서 위험합니다.
대응:
| 상황 | 대응 |
|---|---|
| 파티션 키 카디널리티 낮음 | 시간(일/월)을 파티션 키에 추가해 잘게 |
| unbounded 누적 필요 | 범위를 시간 청크로 분할 처리 |
| forward fill 거리 무제한 | 합리적 최대 채움 거리로 프레임 제한 |
# 파티션을 잘게: sensor_id + 날짜로 (단, fill 이 날짜 경계를 못 넘는 점 주의)
w = Window.partitionBy("sensor_id", F.to_date("ts")).orderBy("ts")...
# 또는 forward fill 거리를 제한 (최근 60분만 채움)
w = (Window.partitionBy("sensor_id").orderBy(F.col("ts").cast("long"))
.rangeBetween(-3600, 0)) # 직전 1시간 이내만핵심 트레이드오프: 파티션을 잘게 쪼개면 OOM 은 피하지만, fill 이 청크 경계를 넘지 못합니다(날짜가 바뀌면 직전 값을 못 가져옴). 경계를 넘는 fill 이 필요하면 청크 끝값을 다음 청크로 전파하는 추가 처리가 필요합니다.
6. 세션화 (시계열 패턴)
"30분 이상 비활동 시 새 세션"같은 세션 구분도 시계열 윈도우의 응용입니다.
w = Window.partitionBy("user_id").orderBy("ts")
sessionized = (df
.withColumn("prev_ts", F.lag("ts").over(w))
.withColumn("gap_min",
(F.col("ts").cast("long") - F.col("prev_ts").cast("long")) / 60)
.withColumn("is_new",
F.when((F.col("gap_min") > 30) | F.col("prev_ts").isNull(), 1).otherwise(0))
.withColumn("session_id",
F.sum("is_new").over(w.rowsBetween(Window.unboundedPreceding, Window.currentRow))))7. 성능 체크리스트
- 리샘플링으로 먼저 데이터 줄이기(윈도우 전에)
- 윈도우
partitionBy키 카디널리티 확인(낮으면 OOM) - unbounded 프레임 남발 금지 — 필요한 범위로 제한
- forward fill 거리 제한(
rangeBetween시간) - 시각 격자 crossJoin 범위 최소화
- AQE 켜기, 스큐 점검
8. 정리
| 과제 | 도구 | 함정 |
|---|---|---|
| 리샘플링 | window/date_trunc 집계 | (안전) |
| 갭필 | sequence+explode+left join | crossJoin 폭발 |
| forward fill | last(ignorenulls) over window | 큰 파티션 OOM |
| 세션화 | lag+조건부 누적합 | 파티션 스큐 |
시계열 처리의 핵심은 "리샘플링으로 줄이고, 격자로 채우고, last 로 메우되, 파티션 크기를 항상 의식하는 것"입니다. 윈도우 함수는 강력하지만 partitionBy 한 파티션을 통째로 메모리에 올린다는 사실을 잊으면, 센서 몇 개짜리 데이터에서도 OOM 을 만납니다. 채움 거리와 청크 경계의 트레이드오프를 이해하면, 수십억 포인트의 시계열도 안정적으로 규칙화할 수 있습니다.
이 글은 Spark 3.5 기준으로 작성되었습니다. 대규모 시계열·IoT 데이터 파이프라인 설계가 필요하시면 언제든 문의해 주세요.
— Data Dynamics 엔지니어링 팀