Blog
pysparksparktime-seriesgap-fillwindowdata-engineering

PySpark 시계열 처리 — 갭필, 리샘플링, 그리고 큰 파티션 윈도우

센서·로그·금융 시계열에서 빠진 구간을 채우고(gap fill), 간격을 재조정하고(resampling), 큰 파티션 윈도우 OOM 을 피하는 법. forward fill, 시간 버킷 집계, unbounded window 의 위험과 대안을 PySpark 코드로 정리합니다.

Data Dynamics2026年6月5日9 min read
This post is not yet translated. The original Korean version is shown below.

시계열 데이터는 비어 있는 게 정상입니다. 센서는 가끔 값을 안 보내고, 거래는 특정 시각에만 발생하며, 로그는 불규칙하게 찍힙니다. 그런데 분석·ML 은 보통 규칙적인 시계열을 전제합니다. 빠진 구간을 채우고(gap fill), 간격을 재조정하고(resampling), 그러면서도 큰 파티션이 메모리를 터뜨리지 않게 하는 것 — 이것이 PySpark 시계열 처리의 핵심 난제입니다.

이 글은 갭필, 리샘플링, forward fill, 그리고 윈도우 함수의 메모리 함정을 실전 패턴으로 정리합니다.

1. 시계열의 세 가지 과제

① 갭필(Gap Fill)   : 빠진 시각의 행을 만들어 채우기 (없는 행 생성)
② 리샘플링         : 불규칙/세밀한 간격 → 규칙적 간격으로 (1초 → 1분)
③ Forward Fill     : 빠진 값을 직전 값으로 채우기 (LOCF)

각각 도구가 다릅니다.

과제핵심 도구
갭필sequence + explode 로 시각 격자 생성 후 left join
리샘플링window/date_trunc 시간 버킷 집계
forward filllast(ignorenulls) over window

2. 리샘플링 — 시간 버킷 집계

가장 흔한 작업. 세밀한 데이터를 일정 간격(1분, 1시간)으로 집계합니다.

from pyspark.sql import functions as F
 
# 1분 단위로 리샘플 (텀블링 윈도우)
resampled = (df
    .groupBy(
        "sensor_id",
        F.window("event_time", "1 minute"))
    .agg(
        F.avg("value").alias("avg_value"),
        F.max("value").alias("max_value"),
        F.count("*").alias("n")))
 
# window 구조체를 평탄화
resampled = resampled.select(
    "sensor_id",
    F.col("window.start").alias("ts"),
    "avg_value", "max_value", "n")

date_trunc 로도 가능합니다(겹치지 않는 단순 버킷).

df.withColumn("minute", F.date_trunc("minute", "event_time")) \
  .groupBy("sensor_id", "minute").agg(F.avg("value"))

리샘플링은 데이터를 줄이는 방향이라 안전합니다. 문제는 다음 단계 — 줄였더니 중간에 빈 버킷이 생기는 갭입니다.

3. 갭필 — 없는 행 만들기

리샘플 후 "센서가 값을 안 보낸 1분"은 행 자체가 없습니다. 분석을 위해 모든 시각 격자를 만들고 left join 해서 빈 행을 채웁니다.

# 1) 전체 시간 격자 생성: 시작~끝을 1분 간격으로
bounds = df.agg(
    F.min("event_time").alias("t0"),
    F.max("event_time").alias("t1")).collect()[0]
 
grid = (spark.sql(f"""
    SELECT explode(sequence(
        timestamp '{bounds.t0}',
        timestamp '{bounds.t1}',
        interval 1 minute)) AS ts
"""))
 
# 2) 센서 × 시간격자 교차 (모든 센서가 모든 시각을 갖도록)
sensors = df.select("sensor_id").distinct()
full_grid = sensors.crossJoin(grid)        # 센서 수 × 시각 수
 
# 3) 실제 데이터를 격자에 left join → 없는 시각은 NULL
filled = full_grid.join(resampled, ["sensor_id", "ts"], "left")

sequence(start, stop, interval) + explode 가 시각 격자를 만드는 핵심입니다. crossJoin 은 센서 수가 많으면 커지므로, 센서별로 시간 범위가 다르면 범위를 좁혀 만드세요.

4. Forward Fill (LOCF) — 빈 값을 직전 값으로

갭필로 행은 만들었지만 값이 NULL 입니다. 시계열에서는 보통 직전 관측값으로 채웁니다(Last Observation Carried Forward).

from pyspark.sql.window import Window
 
w = (Window.partitionBy("sensor_id")
            .orderBy("ts")
            .rowsBetween(Window.unboundedPreceding, Window.currentRow))
 
# 직전의 NULL 아닌 값으로 채우기
filled = filled.withColumn(
    "value_ffill",
    F.last("value", ignorenulls=True).over(w))

F.last(..., ignorenulls=True) 가 forward fill 의 핵심입니다. (이 패턴은 as-of join 과 같은 원리입니다 — 별도 글 "PySpark As-of Join" 참고.)

Backward fill / 보간

# backward fill: 역순 정렬 후 last, 또는 first(ignorenulls) with following 프레임
w_back = Window.partitionBy("sensor_id").orderBy("ts") \
    .rowsBetween(Window.currentRow, Window.unboundedFollowing)
filled = filled.withColumn("value_bfill", F.first("value", ignorenulls=True).over(w_back))
 
# 선형 보간은 직전·직후 값과 시간 거리로 직접 계산

5. 가장 위험한 함정 — 큰 파티션 윈도우 OOM

시계열 윈도우의 치명적 함정: partitionBy 키의 한 파티션이 메모리에 다 들어가야 합니다. 센서가 적고 데이터가 길면, 한 센서의 전체 시계열이 한 익스큐터로 모여 OOM 이 납니다.

partitionBy("sensor_id")  +  센서 10개 × 각 1억 포인트
→ 한 파티션 = 1억 행이 한 익스큐터 메모리에 → OOM 💥

특히 orderBy 가 있는 윈도우 + unbounded 프레임은 정렬을 위해 파티션 전체를 들고 있어야 해서 위험합니다.

대응:

상황대응
파티션 키 카디널리티 낮음시간(일/월)을 파티션 키에 추가해 잘게
unbounded 누적 필요범위를 시간 청크로 분할 처리
forward fill 거리 무제한합리적 최대 채움 거리로 프레임 제한
# 파티션을 잘게: sensor_id + 날짜로 (단, fill 이 날짜 경계를 못 넘는 점 주의)
w = Window.partitionBy("sensor_id", F.to_date("ts")).orderBy("ts")...
 
# 또는 forward fill 거리를 제한 (최근 60분만 채움)
w = (Window.partitionBy("sensor_id").orderBy(F.col("ts").cast("long"))
     .rangeBetween(-3600, 0))   # 직전 1시간 이내만

핵심 트레이드오프: 파티션을 잘게 쪼개면 OOM 은 피하지만, fill 이 청크 경계를 넘지 못합니다(날짜가 바뀌면 직전 값을 못 가져옴). 경계를 넘는 fill 이 필요하면 청크 끝값을 다음 청크로 전파하는 추가 처리가 필요합니다.

6. 세션화 (시계열 패턴)

"30분 이상 비활동 시 새 세션"같은 세션 구분도 시계열 윈도우의 응용입니다.

w = Window.partitionBy("user_id").orderBy("ts")
 
sessionized = (df
    .withColumn("prev_ts", F.lag("ts").over(w))
    .withColumn("gap_min",
        (F.col("ts").cast("long") - F.col("prev_ts").cast("long")) / 60)
    .withColumn("is_new",
        F.when((F.col("gap_min") > 30) | F.col("prev_ts").isNull(), 1).otherwise(0))
    .withColumn("session_id",
        F.sum("is_new").over(w.rowsBetween(Window.unboundedPreceding, Window.currentRow))))

7. 성능 체크리스트

  • 리샘플링으로 먼저 데이터 줄이기(윈도우 전에)
  • 윈도우 partitionBy 키 카디널리티 확인(낮으면 OOM)
  • unbounded 프레임 남발 금지 — 필요한 범위로 제한
  • forward fill 거리 제한(rangeBetween 시간)
  • 시각 격자 crossJoin 범위 최소화
  • AQE 켜기, 스큐 점검

8. 정리

과제도구함정
리샘플링window/date_trunc 집계(안전)
갭필sequence+explode+left joincrossJoin 폭발
forward filllast(ignorenulls) over window큰 파티션 OOM
세션화lag+조건부 누적합파티션 스큐

시계열 처리의 핵심은 "리샘플링으로 줄이고, 격자로 채우고, last 로 메우되, 파티션 크기를 항상 의식하는 것"입니다. 윈도우 함수는 강력하지만 partitionBy 한 파티션을 통째로 메모리에 올린다는 사실을 잊으면, 센서 몇 개짜리 데이터에서도 OOM 을 만납니다. 채움 거리와 청크 경계의 트레이드오프를 이해하면, 수십억 포인트의 시계열도 안정적으로 규칙화할 수 있습니다.


이 글은 Spark 3.5 기준으로 작성되었습니다. 대규모 시계열·IoT 데이터 파이프라인 설계가 필요하시면 언제든 문의해 주세요.

— Data Dynamics 엔지니어링 팀