PySpark Broadcast Variables and Large Lookups — Enriching Data Without a Shuffle
Data enrichment is everyday work — attaching product details to transactions, geo information to IPs, names to codes. Joining a large fact table (hundreds of millions of rows) with small reference data (tens to hundreds of thousands of rows) gets slow if done naively, because both sides get shuffled. If you instead replicate the small data to every worker, the job finishes with zero shuffle.
This post covers the difference between broadcast joins and broadcast variables, how auto-broadcast works, the dangers of broadcasting something too large, and patterns for external lookups.
1. The Problem — Attaching Small Data Triggers a Shuffle
Large fact (500M rows) ──(shuffled by join key)──┐
                                                 ├─ SortMergeJoin
Small dimension (100K rows) ──(shuffled)─────────┘
→ 500M rows redistributed over the network (just to attach small data!)

Even though the dimension is tiny, a SortMergeJoin shuffles all 500 million fact rows. That's pure waste. Replicating the small dimension eliminates this shuffle entirely.
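To get a feel for the size of the gap, here is a rough back-of-envelope sketch. The row counts come from the diagram above; the 200 bytes per fact row, 100 bytes per dimension row, and 50 executors are illustrative assumptions, not measurements, and the result is an upper-bound estimate rather than what Spark will report.

```python
# Back-of-envelope comparison of data movement for the two join strategies.
# Row widths and executor count are assumed values for illustration only.

def sort_merge_bytes(fact_rows, fact_row_bytes, dim_rows, dim_row_bytes):
    """Both sides are repartitioned by join key, so both sides are moved."""
    return fact_rows * fact_row_bytes + dim_rows * dim_row_bytes


def broadcast_bytes(dim_rows, dim_row_bytes, num_executors):
    """Only the small side moves: roughly one copy per executor."""
    return dim_rows * dim_row_bytes * num_executors


smj = sort_merge_bytes(500_000_000, 200, 100_000, 100)
bhj = broadcast_bytes(100_000, 100, 50)
print(f"sort-merge: ~{smj / 1e9:.0f} GB, broadcast: ~{bhj / 1e6:.0f} MB")
# prints: sort-merge: ~100 GB, broadcast: ~500 MB
```

Under these assumptions the broadcast moves about two hundred times less data, and the gap widens as the fact table grows.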
2. Broadcast Join — The Most Common Solution
Replicate the small side to every executor and join without any shuffle.
from pyspark.sql.functions import broadcast
enriched = fact.join(broadcast(dim_product), "product_id")
# → dim_product is replicated to every worker; fact is joined in place (zero shuffle)

# Auto-broadcast threshold (default 10MB): tables smaller than this (per statistics) are broadcast automatically
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50MB")

| | SortMergeJoin | Broadcast Join |
|---|---|---|
| Large side | Shuffled | Processed in place |
| Small side | Shuffled | Replicated to every worker |
| Best for | Both sides large | One side small |
Spark automatically broadcasts tables that statistics say are small. If auto-broadcast doesn't kick in (e.g., missing statistics), force it with the broadcast() hint. If you see BroadcastHashJoin in EXPLAIN, it worked (see our separate post "Debugging Slow PySpark Jobs").
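The EXPLAIN check can also be automated, for example in a test that guards against a plan regression. The sketch below is one possible approach: it relies on PySpark's `df.explain()` printing the plan to standard output, and the helper names `physical_plan_text` and `uses_broadcast_hash_join` are hypothetical, not part of any Spark API.

```python
import contextlib
import io


def physical_plan_text(df):
    """Capture what df.explain() prints and return it as a string."""
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        df.explain()
    return buf.getvalue()


def uses_broadcast_hash_join(plan_text):
    """True if the physical plan contains a BroadcastHashJoin operator."""
    return "BroadcastHashJoin" in plan_text
```

With the `enriched` DataFrame from the snippet above, `uses_broadcast_hash_join(physical_plan_text(enriched))` should be true when the hint took effect. A plain substring match is deliberately crude; it only tells you that at least one join in the plan was broadcast.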
3. The Biggest Pitfall — Broadcasting Something Too Large
The broadcast target is collected on the driver and then replicated to every executor. If it's too large:
Broadcast a large table → driver collects it whole → driver OOM
Or → each executor holds a giant copy in memory → executor OOM

| Warning sign | Consequence |
|---|---|
| Broadcast target is hundreds of MB to GB | Driver/executor OOM |
| Blindly raising the threshold | Large tables get auto-broadcast and blow up |
| Inaccurate statistics | A large table is misjudged as small |
Rule of thumb: broadcast only truly small data (tens of MB at most). When a dimension grows, consider bucketing (see our separate post "PySpark Bucketing") or a regular join instead of broadcasting. Make it a habit to check whether "the dimension you think is small" is actually small.
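One way to turn that habit into code is to check the dimension's actual row count before forcing a broadcast. This is a minimal sketch: `join_with_size_guard` and the `max_rows` budget are hypothetical, row count is only a proxy for size (wide rows need a lower budget), and `dim.hint("broadcast")` is used as the DataFrame-method form of the broadcast() hint.

```python
def join_with_size_guard(fact, dim, key, max_rows=1_000_000):
    """Force a broadcast join only when `dim` is verifiably small.

    `fact` and `dim` are expected to be PySpark DataFrames.
    dim.count() runs a Spark job, so call this once per dimension,
    not inside a loop.
    """
    dim_rows = dim.count()
    if dim_rows <= max_rows:
        # Small enough: ask the optimizer for a broadcast join.
        return fact.join(dim.hint("broadcast"), key)
    # Too large to force: leave the strategy to the optimizer.
    return fact.join(dim, key)
```

Note that the fallback branch does not forbid broadcasting. If statistics are wrong, Spark may still auto-broadcast the table, so a misjudged dimension also needs accurate statistics or a lower threshold.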
4. Broadcast Variables — Lookups, Not Joins
When you need reference data inside a UDF or map operation rather than a DataFrame join, use a broadcast variable. A small dict/set is shipped to each executor once and cached there, instead of being serialized along with every task that uses it.
from pyspark.sql.functions import udf

# Build a dict from small reference data on the driver, then broadcast it
code_map = {row["code"]: row["name"] for row in dim_small.collect()}
bc = spark.sparkContext.broadcast(code_map)

@udf("string")
def lookup_name(code):
    return bc.value.get(code)  # look up in the replicated dict; None if the code is missing

df = df.withColumn("name", lookup_name("code"))

| | broadcast join | broadcast variable |
|---|---|---|
| Target | DataFrame | Python object (dict/set/model) |
| Usage | Join | Reference inside UDF/map |
| Advantage | Leverages the optimizer | Reference from arbitrary logic |
For most enrichment, a broadcast join is better (the optimizer handles it and there's no UDF overhead). Reserve broadcast variables for "references that can't be expressed as a join" — complex lookups, small ML models, rule tables.
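A range lookup is a typical case that an equality join cannot express, such as mapping an integer-encoded IP to the range it falls into. The sketch below assumes contiguous ranges described only by their sorted start values; the names `find_range_label`, `add_region`, and the columns `ip_int` and `region` are hypothetical.

```python
import bisect


def find_range_label(starts, labels, value):
    """Return the label of the last range whose start is <= value, else None.

    `starts` must be sorted ascending and aligned index-by-index with `labels`.
    """
    idx = bisect.bisect_right(starts, value) - 1
    return labels[idx] if idx >= 0 else None


def add_region(spark, df, starts, labels):
    """Attach a `region` column to `df` using a broadcast range table."""
    # Local import so the pure lookup above can be tested without Spark.
    from pyspark.sql.functions import udf

    bc = spark.sparkContext.broadcast((starts, labels))

    @udf("string")
    def region_of(ip_int):
        if ip_int is None:
            return None
        range_starts, range_labels = bc.value
        return find_range_label(range_starts, range_labels, ip_int)

    return df.withColumn("region", region_of("ip_int"))
```

Keeping the lookup logic in a plain function makes it easy to unit-test on the driver, and the UDF stays a thin wrapper around the broadcast value.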
5. Heavy Models and Connections — Use mapInPandas
When enriching with heavy objects like ML models or DB connections, mapInPandas fits better than a broadcast variable — because it initializes once per partition.
def enrich(batches):
    model = load_model()  # once per partition (not per row)
    for pdf in batches:
        pdf["score"] = model.predict(pdf[features])
        yield pdf

df.mapInPandas(enrich, schema="... score double")

(This pattern is covered in detail in our separate post "Why PySpark UDFs Are Slow and Pandas UDFs".)
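The once-per-partition behavior is easy to verify locally, because the function passed to mapInPandas is an ordinary generator over pandas DataFrames. The sketch below uses a stand-in model and a call counter; `SumModel`, `FEATURES`, and the counter are hypothetical and exist only to make the initialization visible.

```python
import pandas as pd

FEATURES = ["x1", "x2"]  # hypothetical feature columns


class SumModel:
    """Stand-in for a real model: the 'prediction' is the row-wise feature sum."""

    def predict(self, frame):
        return frame.sum(axis=1).astype("float64")


def load_model():
    load_model.calls += 1  # counter used only to observe initialization
    return SumModel()


load_model.calls = 0


def enrich(batches):
    model = load_model()  # runs once per call, i.e. once per partition
    for pdf in batches:
        yield pdf.assign(score=model.predict(pdf[FEATURES]))


# Simulate one partition delivered as two batches.
partition = iter([
    pd.DataFrame({"x1": [1, 2], "x2": [3, 4]}),
    pd.DataFrame({"x1": [5], "x2": [6]}),
])
scored = list(enrich(partition))
print(load_model.calls)  # prints: 1
```

On a cluster the same function would be passed as `df.mapInPandas(enrich, schema="x1 long, x2 long, score double")`, where the schema string is an assumption matching this toy data.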
6. Streaming Enrichment
Broadcast works for attaching reference data to a stream as well. But if the reference data changes, you need to handle that.
# Broadcast join a static dimension to the stream (when the dimension rarely changes)
stream.join(broadcast(dim_static), "key")

# If the dimension changes periodically: re-read the latest dimension inside foreachBatch and join
def process(batch_df, batch_id):
    dim = spark.read.table("analytics.dim")  # latest dimension every batch
    batch_df.join(broadcast(dim), "key").write...

For frequently changing dimensions, a common pattern is to read the latest version inside foreachBatch on every micro-batch and broadcast it.
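The reload-per-batch pattern can be packaged so that the dimension reader and the sink are injected, which also makes it testable without a running stream. This is a sketch under assumptions: `make_batch_processor`, `read_dim`, and `write_out` are hypothetical names, and the sink table in the wiring comment is invented for illustration.

```python
def make_batch_processor(read_dim, write_out, key="key"):
    """Build a foreachBatch callback that reloads the dimension every micro-batch.

    read_dim:  zero-argument callable returning the current dimension DataFrame
    write_out: callable taking (enriched_df, batch_id) and persisting the result
    """
    def process(batch_df, batch_id):
        dim = read_dim()  # fresh read, so dimension updates are picked up
        enriched = batch_df.join(dim.hint("broadcast"), key)
        write_out(enriched, batch_id)

    return process


# Possible wiring (table names are assumptions):
# process = make_batch_processor(
#     read_dim=lambda: spark.read.table("analytics.dim"),
#     write_out=lambda df, _id: df.write.mode("append").saveAsTable("analytics.enriched"),
# )
# stream.writeStream.foreachBatch(process).start()
```

Reading the dimension on every micro-batch costs one extra scan per batch, which is usually acceptable for a small table; for a dimension that changes rarely, the static broadcast join above is cheaper.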
7. Choosing an Enrichment Pattern
| Situation | Recommendation |
|---|---|
| Joining a small dimension | broadcast() join |
| Very small dict lookup (UDF required) | broadcast variable |
| Heavy model/connection | mapInPandas |
| Large dimension, repeated joins | Bucketing / regular join |
| Stream + static dimension | broadcast join |
| Stream + changing dimension | Reload inside foreachBatch |
8. Summary
| Tool | Purpose | Caveat |
|---|---|---|
| broadcast join | Joining small dimensions | Never broadcast anything too large (OOM) |
| broadcast variable | Small lookups inside UDFs | dict/set/small models |
| mapInPandas | Heavy objects | Initializes per partition |
| Bucketing | Repeated joins on large dimensions | Pre-shuffles up front |
The core principle of data enrichment: "don't shuffle small reference data — replicate it". Most enrichment finishes shuffle-free with a single line of broadcast() join; use broadcast variables only for references that can't be expressed as a join, and mapInPandas for heavy models. Avoid the one pitfall — broadcasting something too large causes OOM — and enrichment becomes the fastest, simplest operation you run. Always verify the actual size of "the thing you think is small".
This post is based on Spark 3.5. If you need help designing large-scale data enrichment and join optimization, feel free to reach out.
— The Data Dynamics Engineering Team