Blog
pysparksparkasof-jointime-seriespoint-in-timedata-engineering

PySpark As-of Join — "그 시점의 값"을 붙이는 Point-in-Time 조인

거래 시각마다 그 직전의 환율·가격·상태를 붙이는 as-of(point-in-time) 조인을 PySpark 로 구현하는 법. 일반 조인으로 안 되는 이유, range join 폭발 문제, union+window 패턴, 그리고 데이터 누수(미래 정보) 방지까지 정리합니다.

Data Dynamics2026년 6월 5일9 min read

"각 거래 시점에 유효했던 환율을 붙여라." "센서 이벤트마다 그 직전 보정값을 매칭하라." "피처 생성 시점의 고객 등급은?" — 이런 요구는 모두 as-of join(point-in-time join) 입니다. 키가 정확히 일치하는 조인이 아니라, "기준 시각 이전의 가장 가까운 값" 을 찾는 시간 기반 조인입니다.

문제는 PySpark(Spark SQL)에 as-of join 이 1급 구문으로 없다는 것입니다. 잘못 구현하면 range join 으로 데이터가 폭발하거나, 미래 정보가 새어 들어가 ML 파이프라인을 망칩니다. 이 글은 올바른 구현 패턴과 함정을 정리합니다.

1. As-of Join 이란

거래(trades)                환율(rates, 시각별로 변함)
time      symbol            time      rate
10:00:05  KRW               10:00:00  1330
10:00:25  KRW               10:00:10  1331
                            10:00:20  1332
 
→ 10:00:05 거래에는 10:00:00 의 1330 (그 시점 이전 최신)
→ 10:00:25 거래에는 10:00:20 의 1332

일반 equi-join 은 time 이 정확히 같아야 매칭됩니다. 하지만 거래 시각과 환율 갱신 시각은 절대 정확히 안 맞습니다. "≤ 기준시각 중 최대" 를 찾아야 합니다 — 이것이 as-of 의 핵심입니다.

2. 왜 일반 조인으로 안 되나

시도 ① equi-join — 매칭 실패

trades.join(rates, "time")   # 시각이 정확히 같은 경우만 → 대부분 누락

시도 ② range join — 폭발

# 거래시각 이전의 모든 환율과 조인 → 거래당 수많은 행
joined = trades.join(rates,
    (trades.symbol == rates.symbol) & (rates.time <= trades.time))
# 그 후 거래별로 가장 최근 1건만 남기기 → 중간 결과가 거대해짐

range join 은 <= 조건이라 거래 하나가 그 이전 모든 환율과 매칭됩니다. 환율이 하루 수만 건이면 거래당 수만 행이 생겨 셔플·메모리가 폭발합니다. Spark 는 range join 을 잘 최적화하지 못해 사실상 cross join 에 가까워집니다.

3. 올바른 패턴 — Union + Window

검증된 PySpark as-of 패턴은 두 테이블을 union 한 뒤 window 로 직전 값을 끌어오는 것입니다. range join 의 폭발 없이 한 번의 정렬로 해결합니다.

from pyspark.sql import functions as F
from pyspark.sql.window import Window
 
# 1) 두 데이터셋을 공통 스키마로 union (출처 구분 + 값 컬럼)
t = trades.select("symbol", "time",
                  F.lit("trade").alias("src"),
                  F.col("trade_id"),
                  F.lit(None).cast("double").alias("rate"))
 
r = rates.select("symbol", "time",
                 F.lit("rate").alias("src"),
                 F.lit(None).cast("long").alias("trade_id"),
                 F.col("rate"))
 
unioned = t.unionByName(r)
 
# 2) symbol 별 시간순으로 정렬하고, 직전까지의 마지막 rate 를 끌어옴
w = (Window.partitionBy("symbol")
            .orderBy("time")
            .rowsBetween(Window.unboundedPreceding, Window.currentRow))
 
filled = unioned.withColumn(
    "asof_rate",
    F.last("rate", ignorenulls=True).over(w))   # 핵심: 직전 최신 rate 채우기
 
# 3) 거래 행만 남기면 각 거래에 as-of rate 가 붙어 있음
result = filled.where("src = 'trade'").select("trade_id", "symbol", "time", "asof_rate")

핵심은 F.last("rate", ignorenulls=True) 입니다. 시간순으로 보면서 마지막으로 본 NULL 이 아닌 환율을 각 행에 채웁니다. 거래 행에는 원래 rate 가 NULL 이므로, 직전 환율 행의 값이 들어옵니다.

비교range joinunion+window
중간 결과거래 × 이전 환율 전부 (폭발)union(단순 합)
연산조인 + 재필터1회 정렬 + window
확장성나쁨좋음

4. 동시각 처리 — 타이브레이커

거래와 환율이 같은 시각일 때 어느 것을 먼저 볼지 정해야 합니다. "거래 시점 이전(같은 시각 포함)"이라면 환율을 거래보다 먼저 정렬해야 합니다.

# src 에 순서를 부여: 같은 time 이면 rate(0) 가 trade(1) 보다 먼저
w = (Window.partitionBy("symbol")
            .orderBy("time", F.when(F.col("src") == "rate", 0).otherwise(1)))

이 디테일을 놓치면 "같은 시각의 환율"이 거래에 안 붙는 미묘한 버그가 생깁니다.

5. 가장 위험한 함정 — 데이터 누수(미래 정보)

ML 피처 생성에서 as-of join 의 존재 이유가 바로 데이터 누수 방지입니다. 피처 시점 이후의 정보가 학습에 들어가면, 검증에서는 좋아 보이지만 실전에서 무너집니다.

잘못된 조인: 라벨 시점의 "현재" 고객 등급을 붙임 → 미래 정보 누수
올바른 as-of: 피처 시점에 "유효했던" 등급만 → 누수 없음

방어 규칙:

  • 조건은 반드시 rate.time <= trade.time (strict 하게 미래 배제). < vs <= 를 비즈니스 정의에 맞게 명확히.
  • window 정렬에서 동시각 처리를 점검(4장).
  • "현재값 테이블(SCD2 의 is_current)"을 그냥 조인하지 말 것 — 그건 미래값입니다. 반드시 유효기간 기반으로.

as-of join 을 쓰는 진짜 이유의 절반은 성능이고 절반은 정확성(미래 정보 차단) 입니다. 특히 피처 스토어·백테스팅에서 이 누수는 치명적입니다.

6. SCD2 차원과의 as-of 조인

이력 차원(SCD2)에 유효기간이 있으면, as-of 는 유효기간 조인으로 표현됩니다.

# dim_users: valid_from, valid_to (SCD2)
result = (events.alias("e")
    .join(dim.alias("d"),
        (F.col("e.user_id") == F.col("d.user_id")) &
        (F.col("e.event_time") >= F.col("d.valid_from")) &
        (F.col("e.event_time") <  F.col("d.valid_to")))
)

유효기간이 겹치지 않게 관리되면(별도 글 "PySpark 대규모 중복 제거와 SCD Type 2" 참고) 이 조인은 이벤트당 정확히 한 행을 매칭합니다. 다만 이것도 범위 조인이라, 큰 데이터에서는 valid_from 으로 파티션 프루닝이 되도록 설계하거나 union+window 로 전환을 고려합니다.

7. 성능 팁

항목권장
정렬 비용symbol 카디널리티로 파티션 분산, 스큐 주의
메모리window 가 큰 파티션을 메모리에 — 파티션 키 적절히
데이터 축소조인 전 필요한 시간 범위로 필터(불필요 과거 제외)
AQE켜기
단방향성한쪽만 채우면 되므로 양방향 fill 금지

symbol(파티션 키)이 적으면 한 파티션이 거대해져 window 가 OOM 납니다. 키 분포를 확인하세요(스큐는 별도 글 참고).

8. 정리

항목핵심
정의"기준 시각 이전의 가장 가까운 값" 매칭
안티패턴equi-join(누락), range join(폭발)
정석union + last(ignorenulls) over window
동시각정렬 타이브레이커 명시
최대 위험미래 정보 누수 — <= 엄격, SCD2 유효기간

As-of join 은 "Spark 에 없는 구문을 직접 구현"해야 하는 대표적 난제입니다. range join 으로 정면 돌파하면 폭발하지만, union + window 의 last(ignorenulls) 패턴이면 한 번의 정렬로 깔끔히 풀립니다. 그리고 성능만큼 중요한 것이 미래 정보 차단 — 피처·백테스팅에서 as-of 를 쓰는 진짜 이유가 여기에 있다는 점을 잊지 마세요.


이 글은 Spark 3.5 기준으로 작성되었습니다. 시계열 피처 엔지니어링이나 point-in-time 정합성이 중요한 파이프라인 설계가 필요하시면 언제든 문의해 주세요.

— Data Dynamics 엔지니어링 팀