Tags: pyspark, spark, asof-join, time-series, point-in-time, data-engineering

PySpark As-of Join — A Point-in-Time Join That Attaches "the Value as of That Moment"

How to implement an as-of (point-in-time) join in PySpark that attaches the most recent exchange rate, price, or state preceding each transaction timestamp. Covers why plain joins fail, the range join explosion problem, the union+window pattern, and preventing data leakage (future information).

Data Dynamics · June 5, 2026 · 6 min read

"Attach the exchange rate that was in effect at each trade time." "Match every sensor event with the calibration value immediately preceding it." "What was the customer's tier at feature-generation time?" — these are all as-of joins (point-in-time joins). Instead of matching keys exactly, you're doing a time-based join that finds "the closest value at or before a reference timestamp."

The problem is that PySpark (Spark SQL) has no first-class as-of join syntax. Implement it incorrectly and a range join blows up your data, or future information leaks in and ruins your ML pipeline. This post walks through the correct implementation patterns and the pitfalls.

1. What Is an As-of Join

trades                      rates (changes over time)
time      symbol            time      rate
10:00:05  KRW               10:00:00  1330
10:00:25  KRW               10:00:10  1331
                            10:00:20  1332
 
→ the 10:00:05 trade gets 1330 from 10:00:00 (latest at or before that moment)
→ the 10:00:25 trade gets 1332 from 10:00:20

A plain equi-join only matches when time is exactly equal. But trade timestamps and rate-update timestamps virtually never line up exactly. You need to find "the maximum among timestamps ≤ the reference time" — that's the essence of as-of.
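The rule "latest rate timestamp at or before the trade time" can be pinned down in a few lines of plain Python before bringing Spark into it. The sketch below is only a single-machine reference for the semantics, using the sample data above. The helper name asof_lookup is made up for illustration, and timestamps are fixed-width HH:MM:SS strings so that they compare correctly as text. It can double as an oracle when unit-testing a Spark implementation on small samples.

```python
from bisect import bisect_right

def asof_lookup(rate_times, rate_values, t):
    """Return the value whose timestamp is the latest one <= t, or None."""
    # bisect_right gives the number of timestamps <= t (rate_times must be sorted)
    i = bisect_right(rate_times, t)
    return rate_values[i - 1] if i > 0 else None

rate_times = ["10:00:00", "10:00:10", "10:00:20"]
rate_values = [1330, 1331, 1332]

matched = {t: asof_lookup(rate_times, rate_values, t)
           for t in ["10:00:05", "10:00:25", "09:59:59"]}
print(matched)
```

The third lookup (09:59:59, a hypothetical trade earlier than any rate) returns None: an as-of join has nothing to attach when no earlier value exists.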

2. Why Plain Joins Don't Work

Attempt 1: equi-join — fails to match

trades.join(rates, ["symbol", "time"])   # matches only identical timestamps → most trades find no rate

Attempt 2: range join — explosion

# Join each trade with ALL prior rates → many rows per trade
joined = trades.join(rates,
    (trades.symbol == rates.symbol) & (rates.time <= trades.time))
# Then keep only the most recent row per trade → the intermediate result becomes huge

Because of the <= condition, the range join pairs each trade with every earlier rate for its symbol. With tens of thousands of rate updates per day, a single trade can fan out into tens of thousands of intermediate rows, and shuffle volume and memory usage grow accordingly. Spark can use the symbol equality as the join key, but the time inequality is only evaluated as a filter on each candidate pair, so within a symbol the work is close to a cross join.
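A rough count makes the blow-up concrete. The sketch below is plain Python arithmetic on made-up volumes (10,000 rate updates followed by 1,000 trades for one symbol; both numbers are illustrative assumptions, not measurements). It compares the rows a range join would materialize with the rows a simple union of the two inputs would hold.

```python
from bisect import bisect_right

def range_join_rows(trade_times, rate_times):
    """Rows produced by pairing each trade with every rate where rate.time <= trade.time."""
    rate_times = sorted(rate_times)
    return sum(bisect_right(rate_times, t) for t in trade_times)

# Hypothetical single-symbol data: timestamps as integer seconds
rate_times = list(range(10_000))             # 10,000 rate updates
trade_times = list(range(10_000, 11_000))    # 1,000 trades, all after the rates

join_rows = range_join_rows(trade_times, rate_times)
union_rows = len(trade_times) + len(rate_times)
print(join_rows, union_rows)  # 10000000 11000
```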

3. The Right Pattern — Union + Window

The proven PySpark as-of pattern is to union the two tables and use a window function to carry the most recent value forward. It solves the problem with a single sort, without the range-join explosion.

from pyspark.sql import functions as F
from pyspark.sql.window import Window
 
# 1) Union both datasets into a common schema (source marker + value column)
t = trades.select("symbol", "time",
                  F.lit("trade").alias("src"),
                  F.col("trade_id"),
                  F.lit(None).cast("double").alias("rate"))
 
r = rates.select("symbol", "time",
                 F.lit("rate").alias("src"),
                 # placeholder column; the cast must match the type of trades.trade_id
                 F.lit(None).cast("long").alias("trade_id"),
                 F.col("rate"))
 
unioned = t.unionByName(r)
 
# 2) Sort by time within each symbol and carry the last seen rate forward
w = (Window.partitionBy("symbol")
            .orderBy("time")
            .rowsBetween(Window.unboundedPreceding, Window.currentRow))
 
filled = unioned.withColumn(
    "asof_rate",
    F.last("rate", ignorenulls=True).over(w))   # key step: fill with the latest preceding rate
 
# 3) Keep only the trade rows — each trade now carries its as-of rate
result = filled.where("src = 'trade'").select("trade_id", "symbol", "time", "asof_rate")

The key is F.last("rate", ignorenulls=True). As the window scans in time order, it fills each row with the last non-NULL rate seen so far. Trade rows start with a NULL rate, so they pick up the value from the most recent preceding rate row.
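To see what the window is doing row by row, the scan can be imitated in plain Python. This is a hand-rolled stand-in for last(..., ignorenulls=True) over one symbol's partition, not Spark code. The rows are the sample data from Section 1 plus one hypothetical trade (id 0) that arrives before any rate.

```python
# (time, src, trade_id, rate) rows of the unioned data for one symbol
unioned = [
    ("09:59:59", "trade", 0, None),
    ("10:00:00", "rate", None, 1330.0),
    ("10:00:05", "trade", 1, None),
    ("10:00:10", "rate", None, 1331.0),
    ("10:00:20", "rate", None, 1332.0),
    ("10:00:25", "trade", 2, None),
]

def carry_forward(rows):
    """Yield (trade_id, time, asof_rate) for trade rows, scanning in time order."""
    last_rate = None                          # last non-null rate seen so far
    for time, src, trade_id, rate in sorted(rows, key=lambda r: r[0]):
        if rate is not None:
            last_rate = rate
        if src == "trade":
            yield trade_id, time, last_rate

result = list(carry_forward(unioned))
print(result)
```

Trade 0 comes out with None because there is nothing to carry forward yet. The Spark version leaves asof_rate NULL in the same situation, so decide up front whether such trades should be dropped or given a default.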

Comparison           range join                             union+window
Intermediate result  trades × all prior rates (explosion)   union (simple concatenation)
Operations           join + re-filter                       one sort + window
Scalability          poor                                   good

4. Handling Identical Timestamps — Tiebreakers

When a trade and a rate share the same timestamp, you have to decide which one comes first. If the semantics are "at or before the trade time," rates must sort ahead of trades.

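# Assign an ordering to src: at the same time, rate (0) sorts before trade (1)
# Same frame as in Section 3, so the carry-forward step can reuse this window unchanged
w = (Window.partitionBy("symbol")
            .orderBy("time", F.when(F.col("src") == "rate", 0).otherwise(1))
            .rowsBetween(Window.unboundedPreceding, Window.currentRow))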

Miss this detail and you get a subtle bug where "the rate at the exact same timestamp" never attaches to the trade.
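The effect of the tiebreaker is easy to reproduce on two rows that share a timestamp. The sketch below is a plain-Python imitation of the ordered scan (function and variable names are illustrative). It runs the same carry-forward twice, once with rates sorted first and once with trades sorted first.

```python
rows = [
    ("10:00:00", "rate", 1330.0),
    ("10:00:10", "trade", None),   # trade at exactly the same time as the next rate
    ("10:00:10", "rate", 1331.0),
]

def asof_for_trades(rows, src_order):
    """Carry the last rate forward, ordering same-time rows by src_order[src]."""
    last_rate, out = None, []
    for time, src, rate in sorted(rows, key=lambda r: (r[0], src_order[r[1]])):
        if src == "rate":
            last_rate = rate
        else:
            out.append((time, last_rate))
    return out

rate_first = asof_for_trades(rows, {"rate": 0, "trade": 1})    # "at or before"
trade_first = asof_for_trades(rows, {"rate": 1, "trade": 0})   # "strictly before"
print(rate_first, trade_first)
```

With rates first the trade picks up 1331.0, the rate stamped at its own timestamp. With trades first it falls back to 1330.0. Flipping the priority is therefore also a way to get "strictly before" semantics if that is your business definition.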

5. The Most Dangerous Pitfall — Data Leakage (Future Information)

In ML feature generation, preventing data leakage is the main reason to use an as-of join. If information from after the feature timestamp enters training, the model looks great in validation but collapses in production.

Wrong join: attach the customer's "current" tier at label time → future-information leakage
Correct as-of: only the tier that was "in effect" at feature time → no leakage

Defensive rules:

  • The join condition must only admit rate.time <= trade.time, so that no rate from after the trade can attach. Decide between < and <= according to your business definition and apply it consistently.
  • Verify the same-timestamp handling in the window ordering (see Section 4).
  • Never join a "current values table (SCD2's is_current)" directly — those are future values. Always join on validity periods.

Performance is only half of getting an as-of join right; the other half is correctness (blocking future information). In feature stores and backtesting in particular, this leakage is fatal.
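One cheap safeguard is to carry the matched rate's own timestamp along with its value and assert that it never exceeds the trade time. The check below is a plain-Python sketch over collected rows. The field names trade_time and rate_time and the helper find_leaks are assumptions for illustration. In a Spark job the same comparison could run as a filter whose row count must be zero.

```python
def find_leaks(rows):
    """Return rows whose matched rate is timestamped after the trade (future info)."""
    return [r for r in rows
            if r["rate_time"] is not None and r["rate_time"] > r["trade_time"]]

joined = [
    {"trade_id": 1, "trade_time": "10:00:05", "rate_time": "10:00:00"},
    {"trade_id": 2, "trade_time": "10:00:25", "rate_time": "10:00:20"},
    {"trade_id": 3, "trade_time": "10:00:30", "rate_time": None},        # no prior rate
    {"trade_id": 4, "trade_time": "10:00:07", "rate_time": "10:00:10"},  # leaked
]

leaks = find_leaks(joined)
print([r["trade_id"] for r in leaks])  # [4]
```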

6. As-of Joins Against SCD2 Dimensions

If your historical dimension (SCD2) carries validity periods, the as-of join is expressed as a validity-range join.

# dim_users: valid_from, valid_to (SCD2)
result = (events.alias("e")
    .join(dim.alias("d"),
        (F.col("e.user_id") == F.col("d.user_id")) &
        (F.col("e.event_time") >= F.col("d.valid_from")) &
        (F.col("e.event_time") <  F.col("d.valid_to")))
)

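As long as validity periods are kept non-overlapping (see our separate post "Large-Scale Deduplication and SCD Type 2 in PySpark"), this join matches at most one dimension row per event; because it is an inner join, events that fall outside every validity period are dropped. If the open-ended current row stores valid_to as NULL, the < comparison evaluates to NULL and that row never matches, so replace the NULL with a far-future timestamp first. It's still a range join, though, so at scale either design for partition pruning on valid_from or consider switching to union+window.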
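The one-row-per-event guarantee depends on the validity periods for a key never overlapping. A minimal plain-Python sketch of that invariant follows. It treats each period as half-open, [valid_from, valid_to), to match the >= and < in the join above. The helper name and sample rows are hypothetical.

```python
def overlapping_periods(periods):
    """Return adjacent (valid_from, valid_to) pairs that overlap, after sorting by start."""
    ordered = sorted(periods)
    # If any two periods overlap, some adjacent pair in start order overlaps too,
    # so an empty result means the whole set is overlap-free.
    return [(a, b) for a, b in zip(ordered, ordered[1:])
            if b[0] < a[1]]          # next period starts before the previous one ends

clean = [("2024-01-01", "2024-03-01"), ("2024-03-01", "2024-06-01")]
broken = [("2024-01-01", "2024-03-01"), ("2024-02-15", "2024-06-01")]

print(overlapping_periods(clean))   # []
print(overlapping_periods(broken))
```

Run per user_id, a check like this catches the case where one event would match two dimension rows and silently duplicate.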

7. Performance Tips

Item                  Recommendation
Sort cost             Spread partitions by symbol cardinality, watch for skew
Memory                The window holds large partitions in memory; choose partition keys carefully
Data reduction        Filter to the required time range before joining (drop irrelevant history)
AQE                   Enable it
One-directional fill  Only one side needs filling; never do a bidirectional fill

If symbol (the partition key) has low cardinality, a single partition becomes huge and the window stage can spill heavily or run out of memory. Check your key distribution (see our separate post on skew).
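One caveat on the "filter before joining" tip: cutting rates at the start of the trade window would also drop the rate that was in effect when the window opened. Below is a hedged plain-Python sketch of a safer trim. It keeps the last rate at or before the window start plus everything inside the window. Names and data are illustrative.

```python
from bisect import bisect_right

def trim_rates(rate_rows, window_start, window_end):
    """Keep the latest rate at or before window_start and all rates up to window_end."""
    rate_rows = sorted(rate_rows)                            # (time, rate) tuples
    times = [t for t, _ in rate_rows]
    first = max(bisect_right(times, window_start) - 1, 0)    # seed row, if one exists
    last = bisect_right(times, window_end)
    return rate_rows[first:last]

rates = [("09:00:00", 1325), ("09:30:00", 1328),
         ("10:00:10", 1331), ("11:00:00", 1340)]
kept = trim_rates(rates, "10:00:00", "10:30:00")
print(kept)
```

The 09:30:00 row survives even though it lies outside the window, because a trade at 10:00:05 still needs it.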

8. Summary

Item               Key point
Definition         Match "the closest value at or before the reference time"
Anti-patterns      equi-join (missed matches), range join (explosion)
Canonical pattern  union + last(ignorenulls) over window
Same timestamps    Specify an explicit ordering tiebreaker
Biggest risk       Future-information leakage: admit only rate.time <= trade.time, join SCD2 on validity periods

The as-of join is a classic case of "implementing a construct Spark doesn't have." Attack it head-on with a range join and it explodes, but the union + window last(ignorenulls) pattern solves it cleanly with a single sort. And remember: just as important as performance is blocking future information — that's the real reason as-of joins matter for features and backtesting.


This article is based on Spark 3.5. If you need help with time-series feature engineering or designing pipelines where point-in-time consistency matters, feel free to reach out.

— The Data Dynamics Engineering Team