Blog
pysparksparktime-seriesgap-fillwindowdata-engineering

PySpark Time-Series Processing — Gap Fill, Resampling, and Large Partition Windows

How to fill missing intervals (gap fill), resample irregular data, and avoid OOM from large partition windows in sensor, log, and financial time series. Forward fill, time-bucket aggregation, and the dangers of unbounded windows — with PySpark code.

Data DynamicsJune 5, 20266 min read

Time-series data is full of holes — and that's normal. Sensors occasionally fail to report, trades only happen at certain moments, and logs arrive at irregular intervals. Yet analytics and ML usually assume a regular time series. Filling missing intervals (gap fill), adjusting the sampling interval (resampling), and doing all of this without letting a large partition blow up memory — that is the central challenge of time-series processing in PySpark.

This post walks through gap fill, resampling, forward fill, and the memory traps of window functions as practical patterns.

1. The Three Time-Series Tasks

1. Gap Fill       : create rows for missing timestamps (generate rows that don't exist)
2. Resampling     : irregular/fine-grained intervals -> regular intervals (1 second -> 1 minute)
3. Forward Fill   : fill missing values with the most recent value (LOCF)

Each task uses a different tool.

TaskKey tool
Gap fillBuild a time grid with sequence + explode, then left join
ResamplingTime-bucket aggregation with window/date_trunc
Forward filllast(ignorenulls) over window

2. Resampling — Time-Bucket Aggregation

The most common task: aggregating fine-grained data into fixed intervals (1 minute, 1 hour).

from pyspark.sql import functions as F
 
# Resample to 1-minute buckets (tumbling window)
resampled = (df
    .groupBy(
        "sensor_id",
        F.window("event_time", "1 minute"))
    .agg(
        F.avg("value").alias("avg_value"),
        F.max("value").alias("max_value"),
        F.count("*").alias("n")))
 
# Flatten the window struct
resampled = resampled.select(
    "sensor_id",
    F.col("window.start").alias("ts"),
    "avg_value", "max_value", "n")

You can also use date_trunc (simple non-overlapping buckets).

df.withColumn("minute", F.date_trunc("minute", "event_time")) \
  .groupBy("sensor_id", "minute").agg(F.avg("value"))

Resampling reduces data, so it's safe. The problem is the next step — after reducing, you end up with empty buckets in the middle: gaps.

3. Gap Fill — Creating Rows That Don't Exist

After resampling, "the minute when a sensor sent nothing" simply has no row. For analysis, you build a grid of all timestamps and left join against it to fill in the missing rows.

# 1) Generate the full time grid: start to end at 1-minute intervals
bounds = df.agg(
    F.min("event_time").alias("t0"),
    F.max("event_time").alias("t1")).collect()[0]
 
grid = (spark.sql(f"""
    SELECT explode(sequence(
        timestamp '{bounds.t0}',
        timestamp '{bounds.t1}',
        interval 1 minute)) AS ts
"""))
 
# 2) Cross sensors x time grid (so every sensor has every timestamp)
sensors = df.select("sensor_id").distinct()
full_grid = sensors.crossJoin(grid)        # number of sensors x number of timestamps
 
# 3) Left join the actual data onto the grid -> missing timestamps become NULL
filled = full_grid.join(resampled, ["sensor_id", "ts"], "left")

sequence(start, stop, interval) + explode is the core trick for building the time grid. The crossJoin grows with the number of sensors, so if sensors have different time ranges, narrow the grid per sensor.

4. Forward Fill (LOCF) — Filling NULLs with the Previous Value

Gap fill created the rows, but the values are NULL. In time series you usually fill with the most recent observation (Last Observation Carried Forward).

from pyspark.sql.window import Window
 
w = (Window.partitionBy("sensor_id")
            .orderBy("ts")
            .rowsBetween(Window.unboundedPreceding, Window.currentRow))
 
# Fill with the most recent non-NULL value
filled = filled.withColumn(
    "value_ffill",
    F.last("value", ignorenulls=True).over(w))

F.last(..., ignorenulls=True) is the heart of forward fill. (This pattern works on the same principle as an as-of join — see the separate post "PySpark As-of Join".)

Backward Fill / Interpolation

# backward fill: sort in reverse and use last, or first(ignorenulls) with a following frame
w_back = Window.partitionBy("sensor_id").orderBy("ts") \
    .rowsBetween(Window.currentRow, Window.unboundedFollowing)
filled = filled.withColumn("value_bfill", F.first("value", ignorenulls=True).over(w_back))
 
# For linear interpolation, compute directly from the previous/next values and the time distance

5. The Most Dangerous Trap — Large Partition Window OOM

The fatal trap of time-series windows: an entire partition of the partitionBy key must fit in memory. With few sensors and long histories, one sensor's entire time series gets pulled into a single executor and triggers an OOM.

partitionBy("sensor_id")  +  10 sensors x 100M points each
-> one partition = 100M rows in a single executor's memory -> OOM 💥

Windows with orderBy + an unbounded frame are especially dangerous, because the sort requires holding the whole partition.

Countermeasures:

SituationCountermeasure
Low cardinality partition keyAdd time (day/month) to the partition key to split it finer
Unbounded accumulation requiredProcess the range in time chunks
Unlimited forward-fill distanceLimit the frame to a reasonable maximum fill distance
# Split partitions finer: sensor_id + date (note: fill cannot cross date boundaries)
w = Window.partitionBy("sensor_id", F.to_date("ts")).orderBy("ts")...
 
# Or limit the forward-fill distance (only fill within the last 60 minutes)
w = (Window.partitionBy("sensor_id").orderBy(F.col("ts").cast("long"))
     .rangeBetween(-3600, 0))   # only within the previous hour

The key trade-off: splitting partitions finer avoids OOM, but the fill cannot cross chunk boundaries (when the date changes, the previous value is unreachable). If you need fills that cross boundaries, you need an extra step that propagates each chunk's last value into the next chunk.

6. Sessionization (a Time-Series Pattern)

Session boundaries like "start a new session after 30+ minutes of inactivity" are another application of time-series windows.

w = Window.partitionBy("user_id").orderBy("ts")
 
sessionized = (df
    .withColumn("prev_ts", F.lag("ts").over(w))
    .withColumn("gap_min",
        (F.col("ts").cast("long") - F.col("prev_ts").cast("long")) / 60)
    .withColumn("is_new",
        F.when((F.col("gap_min") > 30) | F.col("prev_ts").isNull(), 1).otherwise(0))
    .withColumn("session_id",
        F.sum("is_new").over(w.rowsBetween(Window.unboundedPreceding, Window.currentRow))))

7. Performance Checklist

  • Reduce data with resampling first (before windowing)
  • Check the cardinality of the window partitionBy key (low cardinality means OOM)
  • Don't overuse unbounded frames — limit them to the range you need
  • Limit forward-fill distance (rangeBetween over time)
  • Minimize the range of the time-grid crossJoin
  • Enable AQE, check for skew

8. Summary

TaskToolTrap
Resamplingwindow/date_trunc aggregation(safe)
Gap fillsequence+explode+left joincrossJoin explosion
Forward filllast(ignorenulls) over windowlarge partition OOM
Sessionizationlag+conditional cumulative sumpartition skew

The essence of time-series processing: "reduce with resampling, fill with a grid, patch with last — and always stay aware of partition size." Window functions are powerful, but if you forget that a partitionBy partition is loaded into memory in its entirety, you'll hit OOM even on data with just a handful of sensors. Once you understand the trade-off between fill distance and chunk boundaries, you can reliably regularize time series with billions of points.


This article is based on Spark 3.5. If you need help designing large-scale time-series and IoT data pipelines, feel free to reach out.

— The Data Dynamics Engineering Team