PySpark Time-Series Processing — Gap Fill, Resampling, and Large Partition Windows
How to fill missing intervals (gap fill), resample irregular data, and avoid OOM from large partition windows in sensor, log, and financial time series. Forward fill, time-bucket aggregation, and the dangers of unbounded windows — with PySpark code.
Time-series data is full of holes — and that's normal. Sensors occasionally fail to report, trades only happen at certain moments, and logs arrive at irregular intervals. Yet analytics and ML usually assume a regular time series. Filling missing intervals (gap fill), adjusting the sampling interval (resampling), and doing all of this without letting a large partition blow up memory — that is the central challenge of time-series processing in PySpark.
This post walks through gap fill, resampling, forward fill, and the memory traps of window functions as practical patterns.
1. The Three Time-Series Tasks
1. Gap Fill : create rows for missing timestamps (generate rows that don't exist)
2. Resampling : irregular/fine-grained intervals -> regular intervals (1 second -> 1 minute)
3. Forward Fill : fill missing values with the most recent value (LOCF)
Each task uses a different tool.
| Task | Key tool |
|---|---|
| Gap fill | Build a time grid with sequence + explode, then left join |
| Resampling | Time-bucket aggregation with window/date_trunc |
| Forward fill | last(ignorenulls) over window |
2. Resampling — Time-Bucket Aggregation
The most common task: aggregating fine-grained data into fixed intervals (1 minute, 1 hour).
from pyspark.sql import functions as F
# Resample to 1-minute buckets (tumbling window)
resampled = (df
    .groupBy(
        "sensor_id",
        F.window("event_time", "1 minute"))
    .agg(
        F.avg("value").alias("avg_value"),
        F.max("value").alias("max_value"),
        F.count("*").alias("n")))
# Flatten the window struct
resampled = resampled.select(
    "sensor_id",
    F.col("window.start").alias("ts"),
    "avg_value", "max_value", "n")
You can also use date_trunc (simple non-overlapping buckets).
df.withColumn("minute", F.date_trunc("minute", "event_time")) \
    .groupBy("sensor_id", "minute").agg(F.avg("value"))
Resampling reduces data, so it's safe. The problem is the next step: after reducing, you end up with empty buckets in the middle, that is, gaps.
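To make the bucket assignment and the resulting gaps concrete, here is a small plain-Python model (no Spark involved). It assumes tumbling buckets aligned to the Unix epoch, which is how a 1-minute window with the default start offset behaves; the names `bucket_start` and `resample_avg` and the sample events are made up for illustration.

```python
from collections import defaultdict
from datetime import datetime, timezone


def bucket_start(ts, width_s=60):
    """Floor a timezone-aware datetime to the start of its tumbling bucket."""
    epoch = int(ts.timestamp())
    return datetime.fromtimestamp(epoch - epoch % width_s, tz=timezone.utc)


def resample_avg(events, width_s=60):
    """events: iterable of (datetime, value). Returns {bucket_start: average}."""
    groups = defaultdict(list)
    for ts, value in events:
        groups[bucket_start(ts, width_s)].append(value)
    return {b: sum(v) / len(v) for b, v in sorted(groups.items())}


utc = timezone.utc
events = [
    (datetime(2024, 1, 1, 12, 0, 5, tzinfo=utc), 10.0),
    (datetime(2024, 1, 1, 12, 0, 50, tzinfo=utc), 20.0),
    (datetime(2024, 1, 1, 12, 2, 30, tzinfo=utc), 30.0),  # nothing arrives during 12:01
]
buckets = resample_avg(events)
```

The 12:01 bucket is simply absent from `buckets`: an aggregation only emits groups that received at least one row, which is why a separate gap-fill step is needed.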
3. Gap Fill — Creating Rows That Don't Exist
After resampling, "the minute when a sensor sent nothing" simply has no row. For analysis, you build a grid of all timestamps and left join against it to fill in the missing rows.
# 1) Generate the full time grid: start to end at 1-minute intervals
# t0 is truncated to the minute so grid timestamps line up with window.start in `resampled`
bounds = df.agg(
    F.date_trunc("minute", F.min("event_time")).alias("t0"),
    F.max("event_time").alias("t1")).collect()[0]
grid = (spark.sql(f"""
    SELECT explode(sequence(
        timestamp '{bounds.t0}',
        timestamp '{bounds.t1}',
        interval 1 minute)) AS ts
"""))
# 2) Cross sensors x time grid (so every sensor has every timestamp)
sensors = df.select("sensor_id").distinct()
full_grid = sensors.crossJoin(grid)  # number of sensors x number of timestamps
# 3) Left join the actual data onto the grid -> missing timestamps become NULL
filled = full_grid.join(resampled, ["sensor_id", "ts"], "left")
sequence(start, stop, interval) + explode is the core trick for building the time grid. The crossJoin produces (number of sensors) x (number of timestamps) rows, so if sensors have different time ranges, narrow the grid per sensor.
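Before building the grid, it can help to estimate how many rows the crossJoin will produce. The following is a back-of-the-envelope sketch in plain Python; the function name `grid_rows` and the example numbers are illustrative assumptions, not part of the pipeline above.

```python
from datetime import datetime, timedelta


def grid_rows(n_sensors, t0, t1, step):
    """Rows produced by crossing n_sensors with a grid from t0 to t1.

    Mirrors sequence(start, stop, step): the start is always included,
    and the stop is included when it falls exactly on a step.
    """
    if t1 < t0:
        return 0
    steps = (t1 - t0) // step + 1
    return n_sensors * steps


# Hypothetical workload: 1,000 sensors, 30 days at 1-minute resolution
rows = grid_rows(
    n_sensors=1_000,
    t0=datetime(2024, 1, 1, 0, 0),
    t1=datetime(2024, 1, 30, 23, 59),
    step=timedelta(minutes=1),
)
```

For these numbers the grid alone comes to 43,200,000 rows, regardless of how sparse the real data is. That is usually the figure to check before deciding whether a per-sensor time range is worth the extra join.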
4. Forward Fill (LOCF) — Filling NULLs with the Previous Value
Gap fill created the rows, but the values are NULL. In time series you usually fill with the most recent observation (Last Observation Carried Forward).
from pyspark.sql.window import Window
w = (Window.partitionBy("sensor_id")
    .orderBy("ts")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow))
# Fill with the most recent non-NULL value
# (`filled` carries the resampled columns, so the column to fill is avg_value)
filled = filled.withColumn(
    "value_ffill",
    F.last("avg_value", ignorenulls=True).over(w))
F.last(..., ignorenulls=True) is the heart of forward fill. (This pattern works on the same principle as an as-of join; see the separate post "PySpark As-of Join".)
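As a reference for what this window computes per sensor, here is a plain-Python model of LOCF over a list that is already sorted by time. It is only a sketch for reasoning about (or unit-testing) the semantics on small samples; `forward_fill` is a hypothetical helper name.

```python
def forward_fill(values):
    """Carry the last non-None value forward; leading Nones stay None."""
    out, last = [], None
    for v in values:
        if v is not None:
            last = v
        out.append(last)
    return out


series = [None, 1.0, None, None, 4.0, None]
filled_series = forward_fill(series)
```

Note that the leading gap stays empty: there is no earlier observation to carry forward, which matches a frame that contains no non-NULL value yet.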
Backward Fill / Interpolation
# backward fill: sort in reverse and use last, or first(ignorenulls) with a following frame
w_back = Window.partitionBy("sensor_id").orderBy("ts") \
    .rowsBetween(Window.currentRow, Window.unboundedFollowing)
# as with forward fill, the column to fill is avg_value
filled = filled.withColumn("value_bfill", F.first("avg_value", ignorenulls=True).over(w_back))
# For linear interpolation, compute directly from the previous/next values and the time distance
5. The Most Dangerous Trap — Large Partition Window OOM
The fatal trap of time-series windows: all rows that share a partitionBy key are processed by a single task, which has to sort and buffer that whole partition. With few sensors and long histories, one sensor's entire time series gets pulled into a single executor and can trigger an OOM.
partitionBy("sensor_id") + 10 sensors x 100M points each
-> one partition = 100M rows in a single executor's memory -> OOM 💥
Windows with orderBy + an unbounded frame are especially dangerous, because the sort has to work through the whole partition.
Countermeasures:
| Situation | Countermeasure |
|---|---|
| Low cardinality partition key | Add time (day/month) to the partition key to split it finer |
| Unbounded accumulation required | Process the range in time chunks |
| Unlimited forward-fill distance | Limit the frame to a reasonable maximum fill distance |
# Split partitions finer: sensor_id + date (note: fill cannot cross date boundaries)
w = Window.partitionBy("sensor_id", F.to_date("ts")).orderBy("ts")...
# Or limit the forward-fill distance (only fill within the last 60 minutes)
w = (Window.partitionBy("sensor_id").orderBy(F.col("ts").cast("long"))
    .rangeBetween(-3600, 0))  # only within the previous hour
The key trade-off: splitting partitions finer avoids OOM, but the fill cannot cross chunk boundaries (when the date changes, the previous value is unreachable). If you need fills that cross boundaries, you need an extra step that propagates each chunk's last value into the next chunk.
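One way to picture that extra propagation step is a three-pass scheme, sketched here in plain Python on lists standing in for per-date chunks of one sensor. This is an illustrative model under stated assumptions, not the only way to do it: in Spark, pass 1 would roughly correspond to an aggregation per (sensor_id, date), pass 2 to a forward fill over that much smaller summary table, and pass 3 to the chunked window with the carried value joined in. The function names are hypothetical.

```python
def last_non_null(chunk):
    """Last non-None value in a chunk, or None if the chunk is empty of data."""
    for v in reversed(chunk):
        if v is not None:
            return v
    return None


def chunked_forward_fill(chunks):
    """Forward fill across chunk boundaries while filling each chunk independently."""
    # Pass 1: one summary value per chunk (tiny compared with the raw data)
    tails = [last_non_null(c) for c in chunks]

    # Pass 2: carry-in for chunk i = most recent non-None tail among chunks 0..i-1
    carries, carry = [], None
    for tail in tails:
        carries.append(carry)
        if tail is not None:
            carry = tail

    # Pass 3: fill every chunk on its own, seeded with its carry-in value
    filled_chunks = []
    for chunk, seed in zip(chunks, carries):
        last, row = seed, []
        for v in chunk:
            if v is not None:
                last = v
            row.append(last)
        filled_chunks.append(row)
    return filled_chunks


days = [[1.0, None], [None, None], [None, 5.0, None]]
result = chunked_forward_fill(days)
```

The second chunk contains no observations at all, yet it is still filled from the first chunk's last value. Only pass 2 looks across chunks, and it touches one value per chunk, so the heavy per-row work stays inside small partitions.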
6. Sessionization (a Time-Series Pattern)
Session boundaries like "start a new session after more than 30 minutes of inactivity" are another application of time-series windows.
w = Window.partitionBy("user_id").orderBy("ts")
sessionized = (df
    .withColumn("prev_ts", F.lag("ts").over(w))
    .withColumn("gap_min",
        (F.col("ts").cast("long") - F.col("prev_ts").cast("long")) / 60)
    .withColumn("is_new",
        F.when((F.col("gap_min") > 30) | F.col("prev_ts").isNull(), 1).otherwise(0))
    .withColumn("session_id",
        F.sum("is_new").over(w.rowsBetween(Window.unboundedPreceding, Window.currentRow))))
7. Performance Checklist
- Reduce data with resampling first (before windowing)
- Check the cardinality of the window partitionBy key (low cardinality means OOM)
- Don't overuse unbounded frames; limit them to the range you need
- Limit forward-fill distance (rangeBetween over time)
- Minimize the range of the time-grid crossJoin
- Enable AQE, check for skew
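The cardinality and skew checks can be turned into a small report. The sketch below is plain Python and assumes you have already collected row counts per window key into a dict (for example from a groupBy on the key followed by a count); the function name `partition_report` and the `max_rows` threshold are hypothetical and should be tuned to your executor memory.

```python
from statistics import median


def partition_report(rows_per_key, max_rows=50_000_000):
    """Summarize how rows are spread over window partition keys."""
    counts = sorted(rows_per_key.values())
    largest = counts[-1]
    return {
        "keys": len(counts),                      # cardinality of the partition key
        "largest": largest,                       # rows one task has to handle
        "skew_ratio": largest / median(counts),   # 1.0 means perfectly even
        "too_large": [k for k, n in rows_per_key.items() if n > max_rows],
    }


# Hypothetical counts for three sensors
report = partition_report({"a": 100, "b": 100, "c": 1000}, max_rows=500)
```

Any key listed under `too_large` is a candidate for a finer partition key (for example, adding the date) or for chunked processing.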
8. Summary
| Task | Tool | Trap |
|---|---|---|
| Resampling | window/date_trunc aggregation | (safe) |
| Gap fill | sequence+explode+left join | crossJoin explosion |
| Forward fill | last(ignorenulls) over window | large partition OOM |
| Sessionization | lag+conditional cumulative sum | partition skew |
The essence of time-series processing: "reduce with resampling, fill with a grid, patch with last — and always stay aware of partition size." Window functions are powerful, but if you forget that a partitionBy partition is loaded into memory in its entirety, you'll hit OOM even on data with just a handful of sensors. Once you understand the trade-off between fill distance and chunk boundaries, you can reliably regularize time series with billions of points.
This article is based on Spark 3.5. If you need help designing large-scale time-series and IoT data pipelines, feel free to reach out.
— The Data Dynamics Engineering Team