PySpark As-of Join — A Point-in-Time Join That Attaches "the Value as of That Moment"
How to implement an as-of (point-in-time) join in PySpark that attaches the most recent exchange rate, price, or state preceding each transaction timestamp. Covers why plain joins fail, the range join explosion problem, the union+window pattern, and preventing data leakage (future information).
"Attach the exchange rate that was in effect at each trade time." "Match every sensor event with the calibration value immediately preceding it." "What was the customer's tier at feature-generation time?" — these are all as-of joins (point-in-time joins). Instead of matching keys exactly, you're doing a time-based join that finds "the closest value at or before a reference timestamp."
The problem is that PySpark (Spark SQL) has no first-class as-of join syntax. Implement it incorrectly and a range join blows up your data, or future information leaks in and ruins your ML pipeline. This post walks through the correct implementation patterns and the pitfalls.
1. What Is an As-of Join
trades

| time | symbol |
|---|---|
| 10:00:05 | KRW |
| 10:00:25 | KRW |

rates (changes over time)

| time | rate |
|---|---|
| 10:00:00 | 1330 |
| 10:00:10 | 1331 |
| 10:00:20 | 1332 |

- The 10:00:05 trade gets 1330 from 10:00:00 (the latest rate at or before that moment).
- The 10:00:25 trade gets 1332 from 10:00:20.

A plain equi-join only matches when time is exactly equal, but trade timestamps and rate-update timestamps virtually never line up exactly. You need to find "the maximum among timestamps ≤ the reference time"; that is the essence of an as-of join.
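The definition can be pinned down with a single-machine sketch in plain Python (not Spark), using the standard library's `bisect` module on the example values above; `asof_lookup` is a hypothetical helper name. `bisect_right` counts how many timestamps are ≤ the reference time, so the entry just before that position is the as-of value, and a rate stamped exactly at the reference time is included.

```python
from bisect import bisect_right
from datetime import time

# Rate history from the example above, sorted by timestamp
rate_times = [time(10, 0, 0), time(10, 0, 10), time(10, 0, 20)]
rate_values = [1330, 1331, 1332]

def asof_lookup(ts):
    """Return the rate with the greatest timestamp <= ts, or None if there is none."""
    i = bisect_right(rate_times, ts)  # number of rate timestamps <= ts
    return rate_values[i - 1] if i > 0 else None

print(asof_lookup(time(10, 0, 5)))   # 1330
print(asof_lookup(time(10, 0, 25)))  # 1332
print(asof_lookup(time(9, 59, 59)))  # None (no rate published yet)
```

This only illustrates the semantics; it assumes everything fits in one process, which is exactly what the Spark patterns below avoid.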
2. Why Plain Joins Don't Work
Attempt 1: equi-join (fails to match)

```python
trades.join(rates, ["symbol", "time"])  # only matches exactly equal timestamps → most rows are lost
```

Attempt 2: range join (explodes)

```python
# Join each trade with ALL prior rates of the same symbol → many rows per trade
joined = trades.join(
    rates,
    (trades.symbol == rates.symbol) & (rates.time <= trades.time))
# Then keep only the most recent row per trade → the intermediate result becomes huge
```

Because of the <= condition, a range join matches a single trade with every earlier rate of its symbol. If you have tens of thousands of rate updates per day, each trade produces tens of thousands of intermediate rows, and shuffle and memory usage explode. Open-source Spark has no dedicated range-join operator: with symbol as the only equality key, it compares every trade against every rate of the same symbol and only then applies the <= filter, so in practice the work degenerates into a cross join within each symbol.
3. The Right Pattern — Union + Window
The proven PySpark as-of pattern is to union the two tables and use a window function to carry the most recent value forward. It needs only one shuffle (by symbol) and one sort (by time within each symbol), with no range-join explosion.
```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 1) Union both datasets into a common schema (source marker + value column).
#    The NULL placeholders must match the other side's types (rate: double, trade_id: long).
t = trades.select("symbol", "time",
                  F.lit("trade").alias("src"),
                  F.col("trade_id"),
                  F.lit(None).cast("double").alias("rate"))
r = rates.select("symbol", "time",
                 F.lit("rate").alias("src"),
                 F.lit(None).cast("long").alias("trade_id"),
                 F.col("rate"))
unioned = t.unionByName(r)

# 2) Sort by time within each symbol and carry the last seen rate forward
#    (rows sharing a timestamp need a tiebreaker, see Section 4)
w = (Window.partitionBy("symbol")
     .orderBy("time")
     .rowsBetween(Window.unboundedPreceding, Window.currentRow))
filled = unioned.withColumn(
    "asof_rate",
    F.last("rate", ignorenulls=True).over(w))  # key step: fill with the latest preceding rate

# 3) Keep only the trade rows; each trade now carries its as-of rate
result = filled.where("src = 'trade'").select("trade_id", "symbol", "time", "asof_rate")
```

The key is F.last("rate", ignorenulls=True). As the window scans each symbol's rows in time order, it fills each row with the last non-NULL rate seen so far. Trade rows start with a NULL rate, so they pick up the value from the most recent preceding rate row; a trade with no earlier rate keeps NULL, as in a left join.
| Comparison | range join | union+window |
|---|---|---|
| Intermediate result | trades × all prior rates (explosion) | union (simple concatenation) |
| Operations | join + re-filter | one sort + window |
| Scalability | poor | good |
4. Handling Identical Timestamps — Tiebreakers
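When a trade and a rate share the same timestamp, you have to decide which one comes first. If the semantics are "at or before the trade time" (<=), rates must sort ahead of trades; for "strictly before" (<), trades must sort first.

```python
# Assign an ordering to src: at the same time, rate (0) sorts before trade (1)
w = (Window.partitionBy("symbol")
     .orderBy("time", F.when(F.col("src") == "rate", 0).otherwise(1))
     .rowsBetween(Window.unboundedPreceding, Window.currentRow))
```

Miss this detail and you get a subtle, nondeterministic bug: with only time in the ordering, Spark may place the trade before the rate that shares its timestamp, so "the rate at the exact same timestamp" sometimes fails to attach to the trade.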
5. The Most Dangerous Pitfall — Data Leakage (Future Information)
In ML feature generation, preventing data leakage is the main reason to insist on an as-of join. If information from after the feature timestamp enters training, the model looks great in validation but collapses in production.
- Wrong join: attach the customer's "current" tier at label time → future-information leakage
- Correct as-of: attach only the tier that was "in effect" at feature time → no leakage

Defensive rules:
- The join condition must be rate.time <= trade.time, so that nothing stamped after the reference time can match. Pin down < vs <= according to your business definition.
- Verify the same-timestamp handling in the window ordering (see Section 4).
- Never join a "current values" table (SCD2's is_current rows) directly: relative to historical events, those are future values. Always join on validity periods.
Performance is only half the reason to use an as-of join; the other half is correctness (blocking future information). In feature stores and backtesting in particular, this leakage is fatal.
6. As-of Joins Against SCD2 Dimensions
If your historical dimension (SCD2) carries validity periods, the as-of join is expressed as a validity-range join.
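```python
# dim_users: SCD2 table with valid_from (inclusive) and valid_to (exclusive).
# The current row may have valid_to = NULL; a far-future sentinel also works.
result = (events.alias("e")
          .join(dim_users.alias("d"),
                (F.col("e.user_id") == F.col("d.user_id")) &
                (F.col("e.event_time") >= F.col("d.valid_from")) &
                (F.col("d.valid_to").isNull() |
                 (F.col("e.event_time") < F.col("d.valid_to"))))
          )
```

As long as validity periods are kept non-overlapping (see our separate post "Large-Scale Deduplication and SCD Type 2 in PySpark"), this join matches at most one row per event, and exactly one when the periods are also contiguous; because it is an inner join, events that fall outside every period are dropped. But it's still a range join, so at scale either design for partition pruning on valid_from or consider switching to union+window.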
7. Performance Tips
| Item | Recommendation |
|---|---|
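| Sort cost | The window shuffles by symbol and sorts each symbol by time; many distinct symbols spread the work across tasks, so watch for skew |
| Memory | All rows of one window partition (one symbol) are buffered and processed by a single task, so choose partition keys carefully |
| Data reduction | Filter to the required time range before joining (drop irrelevant history), but keep the last rate before the range start on the rate side |
| AQE | Keep it enabled (on by default since Spark 3.2) |
| One-directional fill | Only trade rows need filling, and only forward; a backward fill would pull in future values |

If symbol (the partition key) has low cardinality or a few symbols dominate, one window partition becomes huge: a single task has to sort and buffer it, which is slow, spills to disk, and can still end in an out-of-memory error. Check your key distribution (see our separate post on skew).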
8. Summary
| Item | Key point |
|---|---|
| Definition | Match "the closest value at or before the reference time" |
| Anti-patterns | equi-join (missed matches), range join (explosion) |
| Canonical pattern | union + last(ignorenulls) over window |
| Same timestamps | Specify an explicit ordering tiebreaker |
| Biggest risk | Future-information leakage: enforce rate.time <= reference time, join SCD2 on validity periods |
The as-of join is a classic case of "implementing a construct Spark doesn't have." Attack it head-on with a range join and it explodes, but the union + window last(ignorenulls) pattern solves it cleanly with a single sort. And remember: just as important as performance is blocking future information — that's the real reason as-of joins matter for features and backtesting.
This article is based on Spark 3.5. If you need help with time-series feature engineering or designing pipelines where point-in-time consistency matters, feel free to reach out.
— The Data Dynamics Engineering Team