PySpark ML Feature Engineering — Large-Scale Pipelines and Preventing Data Leakage
The hard parts of building ML features on hundreds of millions of rows. We cover VectorAssembler and Pipeline composition, categorical encoding, preventing train/test data leakage, and patterns for keeping training and serving consistent — with PySpark MLlib code.
The performance of an ML model is decided less by the model itself than by the quality of its features. And building features on large-scale data is not a simple transformation problem: it is a tangle of memory constraints, leakage, and training-serving consistency. Data leakage in particular is among the most expensive and insidious bugs, because it makes your validation look perfect and then quietly destroys the model in production.
This post walks through building large-scale feature pipelines with PySpark MLlib, categorical encoding, and — above all — patterns that prevent leakage structurally.
1. The Most Dangerous Enemy — Data Leakage
Data leakage means information that would not be available at prediction time ends up in your features or preprocessing during training. There are two kinds.
(1) Train/Test leakage: test-set statistics bleed into training preprocessing
e.g., normalizing with a mean computed over the full dataset -> test information leaks into training
(2) Temporal leakage (future information): data from after the prediction time is included in features
e.g., predicting a past label with the "current" customer tier -> using future values

| Leakage | Symptom | Defense |
|---|---|---|
| Train/Test | Unrealistically high validation scores | fit on train only |
| Temporal (future) | Great backtest, collapses in production | as-of join, point-in-time |
Core principle: learn all statistics (means, variances, encoding mappings) from the train data only, and merely apply that learned transformation to test and serving. This is exactly why MLlib separates fit and transform.
2. fit / transform Separation — The Basic Structure for Preventing Leakage
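MLlib's Estimator/Transformer model builds leakage prevention into the API: fit learns statistics (on train only), and transform applies them (anywhere). The API cannot stop you from calling fit on the wrong data, so the ordering below is still your responsibility.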
from pyspark.ml.feature import StandardScaler
# Split into train/test first (before any preprocessing!)
train, test = df.randomSplit([0.8, 0.2], seed=42)
scaler = StandardScaler(inputCol="features", outputCol="scaled")
model = scaler.fit(train) # ✅ learn statistics (mean, stddev) from train only
train_s = model.transform(train) # apply
test_s = model.transform(test) # apply the same model (no test statistics used)

# ❌ Leakage: fit on the full dataset, then split -> test information bleeds into the scaler
model = scaler.fit(df) # uses full-dataset statistics -> leakage
train, test = df.randomSplit(...)

The order is everything: split first, fit on train only.
3. Pipeline — Bundling Transformations Into One Unit
Bundling multiple preprocessing stages into a Pipeline applies fit/transform consistently across the whole chain, blocking leakage structurally and guaranteeing training-serving consistency.
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
# Categorical encoding -> vector assembly -> scaling, all in one pipeline
indexer = StringIndexer(inputCols=["city", "device"],
                        outputCols=["city_idx", "device_idx"],
                        handleInvalid="keep")  # handle unseen categories
encoder = OneHotEncoder(inputCols=["city_idx", "device_idx"],
                        outputCols=["city_oh", "device_oh"])
assembler = VectorAssembler(
    inputCols=["age", "amount", "city_oh", "device_oh"],
    outputCol="features",
    handleInvalid="skip")  # drops rows with null/NaN inputs
scaler = StandardScaler(inputCol="features", outputCol="scaled")
pipeline = Pipeline(stages=[indexer, encoder, assembler, scaler])
model = pipeline.fit(train) # fit the entire pipeline on train
train_f = model.transform(train)
test_f = model.transform(test)
# Save the pipeline model -> guarantees identical transformations at serving time
model.write().overwrite().save("/models/feature_pipeline")

Saving the Pipeline model lets you reproduce exactly the same transformations at serving time as during training. This is the key defense against training-serving skew, the all-too-common failure mode of preprocessing differently at serving time and watching performance degrade.
4. Categorical Encoding — The High-Cardinality Trap
| Encoder | Purpose | Pitfall |
|---|---|---|
| StringIndexer | string -> index | unseen categories (handleInvalid) |
| OneHotEncoder | index -> sparse vector | dimension explosion at high cardinality |
| FeatureHasher | hash-based encoding | suited to high cardinality (tolerates collisions) |
One-hot encoding a high-cardinality categorical (hundreds of thousands of product IDs) yields hundreds of thousands of dimensions. In that case, use FeatureHasher (the hashing trick) or target/frequency encoding.
from pyspark.ml.feature import FeatureHasher
hasher = FeatureHasher(inputCols=["product_id", "city"],
                       outputCol="hashed", numFeatures=1 << 18)

Always set handleInvalid="keep" on StringIndexer. Its default is "error", so when serving encounters a category that was not present at training time, the transform fails. Unseen categories will inevitably show up in production.
5. Preventing Temporal Leakage — Point-in-Time Features
For time-series and event data, features must use only values that were valid at prediction time. Naively joining a "current values" table leaks future information in.
# ❌ Leakage: joining the current tier, which may post-date the label time
features = labels.join(dim_users_current, "user_id")
# ✅ As-of: only the value that was valid at the label time
features = labels.join(dim_users_history,
    (labels.user_id == dim_users_history.user_id) &
    (labels.label_time >= dim_users_history.valid_from) &
    (labels.label_time < dim_users_history.valid_to))

We covered this point-in-time join in detail in a separate post, "PySpark As-of Join". Guaranteeing this point-in-time correctness is the core reason feature stores exist.
6. Aggregation Features and Leakage
Windowed aggregation features like "each user's average purchase amount over the last 30 days" also carry high leakage risk. If the aggregation window extends past the label time, you are looking at the future.
from pyspark.sql.window import Window
from pyspark.sql import functions as F
# Aggregate only over data "before" the label time (rangeBetween blocks the future)
w = (Window.partitionBy("user_id")
     .orderBy(F.col("event_time").cast("long"))
     .rangeBetween(-30*86400, -1))  # previous 30 days up to just before now (future excluded)
features = df.withColumn("avg_30d", F.avg("amount").over(w))

The key is setting the upper bound of rangeBetween to -1 (rather than currentRow, which would include the current row) to explicitly exclude the present and the future. The bounds are in seconds because event_time is cast to long.
7. Performance at Scale
| Item | Watch out for |
|---|---|
| One-hot sparse vectors | memory at high dimensionality — FeatureHasher |
| Windowed aggregation features | OOM on large partitions (see the separate time-series post) |
| Join features | broadcast dimension tables, check for skew |
| Pipeline stages | avoid unnecessary intermediate caching |
| Feature storage | reuse via Iceberg/Delta feature tables |
It is most efficient to compute features once, store them as a feature table (Lakehouse), and reuse them across multiple models — the basic idea behind a feature store.
8. Summary
| Area | Key point |
|---|---|
| Leakage prevention | split first, fit on train only |
| Structure | bundle into a Pipeline for consistency and reproducibility |
| Categoricals | handleInvalid; use hashing for high cardinality |
| Temporal leakage | as-of/validity-period joins, exclude the future from windows |
| Training-serving | save and reuse the Pipeline model |
The core insight of large-scale ML feature engineering is that "leakage is a correctness problem, not a performance problem". If your validation scores look unrealistically good, leakage should almost always be your first suspect. Make MLlib's fit/transform separation and Pipelines your discipline: learn statistics from train only, and block the future from temporal features with point-in-time joins. Do that, and the performance you saw in validation is far more likely to hold up in production, which is what makes a model trustworthy.
This post is based on Spark 3.5 + MLlib. If you need help designing large-scale ML feature pipelines or a feature store, feel free to reach out.
— The Data Dynamics Engineering Team