Blog
pysparksparkmllibfeature-engineeringdata-leakagemachine-learning

PySpark ML Feature Engineering — Large-Scale Pipelines and Preventing Data Leakage

The hard parts of building ML features on hundreds of millions of rows. We cover VectorAssembler and Pipeline composition, categorical encoding, preventing train/test data leakage, and patterns for keeping training and serving consistent — with PySpark MLlib code.

Data DynamicsJune 5, 20266 min read

The performance of an ML model is decided less by the model itself than by the quality of its features. And building features on large-scale data is not a simple transformation problem — it is a tangle of memory constraints, leakage, and training-serving consistency. Data leakage in particular is the most expensive and insidious bug there is: it makes your validation look perfect, then quietly destroys the model in production.

This post walks through building large-scale feature pipelines with PySpark MLlib, categorical encoding, and — above all — patterns that prevent leakage structurally.

1. The Most Dangerous Enemy — Data Leakage

Data leakage means information that is unknowable at training time ends up in your features. There are two kinds.

(1) Train/Test leakage: test-set statistics bleed into training preprocessing
   e.g., normalizing with a mean computed over the full dataset -> test information leaks into training
 
(2) Temporal leakage (future information): data from after the prediction time is included in features
   e.g., predicting a past label with the "current" customer tier -> using future values
LeakageSymptomDefense
Train/TestUnrealistically high validation scoresfit on train only
Temporal (future)Great backtest, collapses in productionas-of join, point-in-time

Core principle: learn all statistics (means, variances, encoding mappings) from the train data only, and merely apply that learned transformation to test and serving. This is exactly why MLlib separates fit and transform.

2. fit / transform Separation — The Basic Structure for Preventing Leakage

MLlib's Transformer/Estimator model enforces leakage prevention. fit learns statistics (on train only); transform applies them (anywhere).

from pyspark.ml.feature import StandardScaler
 
# Split into train/test first (before any preprocessing!)
train, test = df.randomSplit([0.8, 0.2], seed=42)
 
scaler = StandardScaler(inputCol="features", outputCol="scaled")
 
model = scaler.fit(train)        # ✅ learn statistics (mean, stddev) from train only
train_s = model.transform(train) # apply
test_s  = model.transform(test)  # apply the same model (no test statistics used)
# ❌ Leakage: fit on the full dataset, then split -> test information bleeds into the scaler
model = scaler.fit(df)           # uses full-dataset statistics -> leakage
train, test = df.randomSplit(...)

The order is everything — split first, fit on train only.

3. Pipeline — Bundling Transformations Into One Unit

Bundling multiple preprocessing stages into a Pipeline applies fit/transform consistently across the whole chain, blocking leakage structurally and guaranteeing training-serving consistency.

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
 
# Categorical encoding -> vector assembly -> scaling, all in one pipeline
indexer = StringIndexer(inputCols=["city", "device"],
                        outputCols=["city_idx", "device_idx"],
                        handleInvalid="keep")          # handle unseen categories
encoder = OneHotEncoder(inputCols=["city_idx", "device_idx"],
                        outputCols=["city_oh", "device_oh"])
assembler = VectorAssembler(
    inputCols=["age", "amount", "city_oh", "device_oh"],
    outputCol="features",
    handleInvalid="skip")
scaler = StandardScaler(inputCol="features", outputCol="scaled")
 
pipeline = Pipeline(stages=[indexer, encoder, assembler, scaler])
 
model = pipeline.fit(train)      # fit the entire pipeline on train
train_f = model.transform(train)
test_f  = model.transform(test)
 
# Save the pipeline model -> guarantees identical transformations at serving time
model.write().overwrite().save("/models/feature_pipeline")

Saving the Pipeline model lets you reproduce exactly the same transformations at serving time as during training. This is the key defense against the all-too-common failure mode of training-serving skew — preprocessing differently at serving time and watching performance degrade.

4. Categorical Encoding — The High-Cardinality Trap

EncoderPurposePitfall
StringIndexerstring -> indexunseen categories (handleInvalid)
OneHotEncoderindex -> sparse vectordimension explosion at high cardinality
FeatureHasherhash-based encodingsuited to high cardinality (tolerates collisions)

One-hot encoding a high-cardinality categorical (hundreds of thousands of product IDs) yields hundreds of thousands of dimensions. In that case, use FeatureHasher (the hashing trick) or target/frequency encoding.

from pyspark.ml.feature import FeatureHasher
 
hasher = FeatureHasher(inputCols=["product_id", "city"],
                       outputCol="hashed", numFeatures=1 << 18)

Always set handleInvalid="keep". When serving encounters a category that was not present at training time, omitting this option causes an error. Unseen categories will inevitably show up in production.

5. Preventing Temporal Leakage — Point-in-Time Features

For time-series and event data, features must use only values that were valid at prediction time. Naively joining a "current values" table leaks future information in.

# ❌ Leakage: joining the current tier, which may post-date the label time
features = labels.join(dim_users_current, "user_id")
 
# ✅ As-of: only values valid before the label time
features = labels.join(dim_users_history,
    (labels.user_id == dim_users_history.user_id) &
    (labels.label_time >= dim_users_history.valid_from) &
    (labels.label_time <  dim_users_history.valid_to))

We covered this point-in-time join in detail in a separate post, "PySpark As-of Join". Guaranteeing this point-in-time correctness is the core reason feature stores exist.

6. Aggregation Features and Leakage

Windowed aggregation features like "each user's average purchase amount over the last 30 days" also carry high leakage risk. If the aggregation window extends past the label time, you are looking at the future.

from pyspark.sql.window import Window
from pyspark.sql import functions as F
 
# Aggregate only over data "before" the label time (rangeBetween blocks the future)
w = (Window.partitionBy("user_id")
            .orderBy(F.col("event_time").cast("long"))
            .rangeBetween(-30*86400, -1))   # previous 30 days up to just before now (future excluded)
 
features = df.withColumn("avg_30d", F.avg("amount").over(w))

The key is setting the upper bound of rangeBetween to -1 (or being aware of currentRow) to explicitly exclude the present and the future.

7. Performance at Scale

ItemWatch out for
One-hot sparse vectorsmemory at high dimensionality — FeatureHasher
Windowed aggregation featuresOOM on large partitions (see the separate time-series post)
Join featuresbroadcast dimension tables, check for skew
Pipeline stagesavoid unnecessary intermediate caching
Feature storagereuse via Iceberg/Delta feature tables

It is most efficient to compute features once, store them as a feature table (Lakehouse), and reuse them across multiple models — the basic idea behind a feature store.

8. Summary

AreaKey point
Leakage preventionsplit first, fit on train only
Structurebundle into a Pipeline for consistency and reproducibility
CategoricalshandleInvalid; use hashing for high cardinality
Temporal leakageas-of/validity-period joins, exclude the future from windows
Training-servingsave and reuse the Pipeline model

The core insight of large-scale ML feature engineering is that "leakage is a correctness problem, not a performance problem". If your validation scores look unrealistically good, leakage should almost always be your first suspect. Make MLlib's fit/transform separation and Pipelines your discipline: learn statistics from train only, and block the future from temporal features with point-in-time joins. Do that, and the performance you saw in validation will reproduce in production — giving you a model you can trust.


This post is based on Spark 3.5 + MLlib. If you need help designing large-scale ML feature pipelines or a feature store, feel free to reach out.

— The Data Dynamics Engineering Team