Blog
pysparksparkasof-jointime-seriespoint-in-timedata-engineering

PySpark As-of Join — "그 시점의 값"을 붙이는 Point-in-Time 조인

거래 시각마다 그 직전의 환율·가격·상태를 붙이는 as-of(point-in-time) 조인을 PySpark 로 구현하는 법. 일반 조인으로 안 되는 이유, range join 폭발 문제, union+window 패턴, 그리고 데이터 누수(미래 정보) 방지까지 정리합니다.

Data Dynamics2026年6月5日9 min read
This post is not yet translated. The original Korean version is shown below.

"각 거래 시점에 유효했던 환율을 붙여라." "센서 이벤트마다 그 직전 보정값을 매칭하라." "피처 생성 시점의 고객 등급은?" — 이런 요구는 모두 as-of join(point-in-time join) 입니다. 키가 정확히 일치하는 조인이 아니라, "기준 시각 이전의 가장 가까운 값" 을 찾는 시간 기반 조인입니다.

문제는 PySpark(Spark SQL)에 as-of join 이 1급 구문으로 없다는 것입니다. 잘못 구현하면 range join 으로 데이터가 폭발하거나, 미래 정보가 새어 들어가 ML 파이프라인을 망칩니다. 이 글은 올바른 구현 패턴과 함정을 정리합니다.

1. As-of Join 이란

거래(trades)                환율(rates, 시각별로 변함)
time      symbol            time      rate
10:00:05  KRW               10:00:00  1330
10:00:25  KRW               10:00:10  1331
                            10:00:20  1332
 
→ 10:00:05 거래에는 10:00:00 의 1330 (그 시점 이전 최신)
→ 10:00:25 거래에는 10:00:20 의 1332

일반 equi-join 은 time 이 정확히 같아야 매칭됩니다. 하지만 거래 시각과 환율 갱신 시각은 절대 정확히 안 맞습니다. "≤ 기준시각 중 최대" 를 찾아야 합니다 — 이것이 as-of 의 핵심입니다.

2. 왜 일반 조인으로 안 되나

시도 ① equi-join — 매칭 실패

trades.join(rates, "time")   # 시각이 정확히 같은 경우만 → 대부분 누락

시도 ② range join — 폭발

# 거래시각 이전의 모든 환율과 조인 → 거래당 수많은 행
joined = trades.join(rates,
    (trades.symbol == rates.symbol) & (rates.time <= trades.time))
# 그 후 거래별로 가장 최근 1건만 남기기 → 중간 결과가 거대해짐

range join 은 <= 조건이라 거래 하나가 그 이전 모든 환율과 매칭됩니다. 환율이 하루 수만 건이면 거래당 수만 행이 생겨 셔플·메모리가 폭발합니다. Spark 는 range join 을 잘 최적화하지 못해 사실상 cross join 에 가까워집니다.

3. 올바른 패턴 — Union + Window

검증된 PySpark as-of 패턴은 두 테이블을 union 한 뒤 window 로 직전 값을 끌어오는 것입니다. range join 의 폭발 없이 한 번의 정렬로 해결합니다.

from pyspark.sql import functions as F
from pyspark.sql.window import Window
 
# 1) 두 데이터셋을 공통 스키마로 union (출처 구분 + 값 컬럼)
t = trades.select("symbol", "time",
                  F.lit("trade").alias("src"),
                  F.col("trade_id"),
                  F.lit(None).cast("double").alias("rate"))
 
r = rates.select("symbol", "time",
                 F.lit("rate").alias("src"),
                 F.lit(None).cast("long").alias("trade_id"),
                 F.col("rate"))
 
unioned = t.unionByName(r)
 
# 2) symbol 별 시간순으로 정렬하고, 직전까지의 마지막 rate 를 끌어옴
w = (Window.partitionBy("symbol")
            .orderBy("time")
            .rowsBetween(Window.unboundedPreceding, Window.currentRow))
 
filled = unioned.withColumn(
    "asof_rate",
    F.last("rate", ignorenulls=True).over(w))   # 핵심: 직전 최신 rate 채우기
 
# 3) 거래 행만 남기면 각 거래에 as-of rate 가 붙어 있음
result = filled.where("src = 'trade'").select("trade_id", "symbol", "time", "asof_rate")

핵심은 F.last("rate", ignorenulls=True) 입니다. 시간순으로 보면서 마지막으로 본 NULL 이 아닌 환율을 각 행에 채웁니다. 거래 행에는 원래 rate 가 NULL 이므로, 직전 환율 행의 값이 들어옵니다.

비교range joinunion+window
중간 결과거래 × 이전 환율 전부 (폭발)union(단순 합)
연산조인 + 재필터1회 정렬 + window
확장성나쁨좋음

4. 동시각 처리 — 타이브레이커

거래와 환율이 같은 시각일 때 어느 것을 먼저 볼지 정해야 합니다. "거래 시점 이전(같은 시각 포함)"이라면 환율을 거래보다 먼저 정렬해야 합니다.

# src 에 순서를 부여: 같은 time 이면 rate(0) 가 trade(1) 보다 먼저
w = (Window.partitionBy("symbol")
            .orderBy("time", F.when(F.col("src") == "rate", 0).otherwise(1)))

이 디테일을 놓치면 "같은 시각의 환율"이 거래에 안 붙는 미묘한 버그가 생깁니다.

5. 가장 위험한 함정 — 데이터 누수(미래 정보)

ML 피처 생성에서 as-of join 의 존재 이유가 바로 데이터 누수 방지입니다. 피처 시점 이후의 정보가 학습에 들어가면, 검증에서는 좋아 보이지만 실전에서 무너집니다.

잘못된 조인: 라벨 시점의 "현재" 고객 등급을 붙임 → 미래 정보 누수
올바른 as-of: 피처 시점에 "유효했던" 등급만 → 누수 없음

방어 규칙:

  • 조건은 반드시 rate.time <= trade.time (strict 하게 미래 배제). < vs <= 를 비즈니스 정의에 맞게 명확히.
  • window 정렬에서 동시각 처리를 점검(4장).
  • "현재값 테이블(SCD2 의 is_current)"을 그냥 조인하지 말 것 — 그건 미래값입니다. 반드시 유효기간 기반으로.

as-of join 을 쓰는 진짜 이유의 절반은 성능이고 절반은 정확성(미래 정보 차단) 입니다. 특히 피처 스토어·백테스팅에서 이 누수는 치명적입니다.

6. SCD2 차원과의 as-of 조인

이력 차원(SCD2)에 유효기간이 있으면, as-of 는 유효기간 조인으로 표현됩니다.

# dim_users: valid_from, valid_to (SCD2)
result = (events.alias("e")
    .join(dim.alias("d"),
        (F.col("e.user_id") == F.col("d.user_id")) &
        (F.col("e.event_time") >= F.col("d.valid_from")) &
        (F.col("e.event_time") <  F.col("d.valid_to")))
)

유효기간이 겹치지 않게 관리되면(별도 글 "PySpark 대규모 중복 제거와 SCD Type 2" 참고) 이 조인은 이벤트당 정확히 한 행을 매칭합니다. 다만 이것도 범위 조인이라, 큰 데이터에서는 valid_from 으로 파티션 프루닝이 되도록 설계하거나 union+window 로 전환을 고려합니다.

7. 성능 팁

항목권장
정렬 비용symbol 카디널리티로 파티션 분산, 스큐 주의
메모리window 가 큰 파티션을 메모리에 — 파티션 키 적절히
데이터 축소조인 전 필요한 시간 범위로 필터(불필요 과거 제외)
AQE켜기
단방향성한쪽만 채우면 되므로 양방향 fill 금지

symbol(파티션 키)이 적으면 한 파티션이 거대해져 window 가 OOM 납니다. 키 분포를 확인하세요(스큐는 별도 글 참고).

8. 정리

항목핵심
정의"기준 시각 이전의 가장 가까운 값" 매칭
안티패턴equi-join(누락), range join(폭발)
정석union + last(ignorenulls) over window
동시각정렬 타이브레이커 명시
최대 위험미래 정보 누수 — <= 엄격, SCD2 유효기간

As-of join 은 "Spark 에 없는 구문을 직접 구현"해야 하는 대표적 난제입니다. range join 으로 정면 돌파하면 폭발하지만, union + window 의 last(ignorenulls) 패턴이면 한 번의 정렬로 깔끔히 풀립니다. 그리고 성능만큼 중요한 것이 미래 정보 차단 — 피처·백테스팅에서 as-of 를 쓰는 진짜 이유가 여기에 있다는 점을 잊지 마세요.


이 글은 Spark 3.5 기준으로 작성되었습니다. 시계열 피처 엔지니어링이나 point-in-time 정합성이 중요한 파이프라인 설계가 필요하시면 언제든 문의해 주세요.

— Data Dynamics 엔지니어링 팀