
PySpark Broadcast Variables and Large Lookups — Enriching Data Without a Shuffle

How to enrich hundreds of millions of rows with small reference data without shuffling. We cover the difference between broadcast joins and broadcast variables, the auto-broadcast threshold, the dangers of broadcasting something too large, and patterns for external API/model lookups.

Data Dynamics · June 5, 2026 · 6 min read

Data enrichment is everyday work — attaching product details to transactions, geo information to IPs, names to codes. Joining a large fact table (hundreds of millions of rows) with small reference data (tens to hundreds of thousands of rows) gets slow if done naively, because both sides get shuffled. If you instead replicate the small data to every worker, the job finishes with zero shuffle.

This post covers the difference between broadcast joins and broadcast variables, how auto-broadcast works, the dangers of broadcasting something too large, and patterns for external lookups.

1. The Problem — Attaching Small Data Triggers a Shuffle

Large fact (500M rows) ──(shuffled by join key)──┐
                                                 ├─ SortMergeJoin
Small dimension (100K rows) ──(shuffled)─────────┘
→ 500M rows redistributed over the network (just to attach small data!)

Even though the dimension is tiny, a SortMergeJoin shuffles all 500 million fact rows. That's pure waste. Replicating the small dimension eliminates this shuffle entirely.
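To put rough numbers on that waste, here is a back-of-the-envelope comparison in plain Python. The row counts come from the diagram above; the row widths (100 bytes per fact row, 200 bytes per dimension row) and the executor count (50) are assumptions chosen for illustration, not measurements.

```python
# Back-of-the-envelope network cost: shuffle join vs. broadcast join.
# Row widths and executor count are illustrative assumptions.
FACT_ROWS = 500_000_000
DIM_ROWS = 100_000
FACT_ROW_BYTES = 100      # assumed average serialized width of a fact row
DIM_ROW_BYTES = 200       # assumed average serialized width of a dimension row
NUM_EXECUTORS = 50        # assumed cluster size


def shuffle_join_bytes(fact_rows, dim_rows, fact_row_bytes, dim_row_bytes):
    """Approximate bytes moved when both sides are repartitioned by the join key."""
    return fact_rows * fact_row_bytes + dim_rows * dim_row_bytes


def broadcast_join_bytes(dim_rows, dim_row_bytes, num_executors):
    """Approximate bytes moved when only the dimension is copied to every executor."""
    return dim_rows * dim_row_bytes * num_executors


shuffled = shuffle_join_bytes(FACT_ROWS, DIM_ROWS, FACT_ROW_BYTES, DIM_ROW_BYTES)
broadcasted = broadcast_join_bytes(DIM_ROWS, DIM_ROW_BYTES, NUM_EXECUTORS)

print(f"shuffle join  : {shuffled / 1e9:.2f} GB over the network")
print(f"broadcast join: {broadcasted / 1e9:.2f} GB over the network")
```

Under these assumptions the shuffle join moves roughly 50 GB and the broadcast join roughly 1 GB. The gap narrows as the dimension or the executor count grows, which is why the size limits discussed later matter.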

2. Broadcast Join — The Most Common Solution

Replicate the small side to every executor and join without any shuffle.

from pyspark.sql.functions import broadcast
 
enriched = fact.join(broadcast(dim_product), "product_id")
# → dim_product is replicated to every worker; fact is joined in place (zero shuffle)
# Auto-broadcast threshold (default 10MB) — tables smaller than this (per statistics) are broadcast automatically
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50MB")
             | SortMergeJoin    | Broadcast Join
Large side   | Shuffled         | Processed in place
Small side   | Shuffled         | Replicated to every worker
Best for     | Both sides large | One side small

Spark automatically broadcasts tables that statistics say are small. If auto-broadcast doesn't kick in (e.g., missing statistics), force it with the broadcast() hint. If you see BroadcastHashJoin in EXPLAIN, it worked (see our separate post "Debugging Slow PySpark Jobs").
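Checking the plan by eye works, but the same check can be scripted. The sketch below captures what explain() prints and looks for join operator names in it. It assumes explain() writes through Python's standard output, as PySpark 3.5 does; the helper names (plan_text, join_strategies, assert_broadcast) are our own and not part of Spark.

```python
import contextlib
import io


def plan_text(df, mode="simple"):
    """Capture what df.explain() prints, so the plan can be inspected in code."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        df.explain(mode=mode)
    return buffer.getvalue()


def join_strategies(plan):
    """Return the physical join operators whose names appear in a plan string."""
    known = (
        "BroadcastHashJoin",
        "BroadcastNestedLoopJoin",
        "SortMergeJoin",
        "ShuffledHashJoin",
    )
    return [name for name in known if name in plan]


def assert_broadcast(df):
    """Fail fast if the printed plan does not contain a BroadcastHashJoin."""
    found = join_strategies(plan_text(df))
    if "BroadcastHashJoin" not in found:
        raise AssertionError(f"expected BroadcastHashJoin, plan uses: {found}")


# Hypothetical usage with the DataFrame from the example above:
#   assert_broadcast(enriched)
```

With adaptive query execution enabled, the plan printed before the job runs can still change at runtime, so treat a missing BroadcastHashJoin as a reason to look closer rather than as proof of a shuffle.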

3. The Biggest Pitfall — Broadcasting Something Too Large

The broadcast target is collected on the driver and then replicated to every executor. If it's too large:

Broadcast a large table → driver collects it whole → driver OOM
Or → each executor holds a giant copy in memory → executor OOM
Warning sign                             | Consequence
Broadcast target is hundreds of MB to GB | Driver/executor OOM
Blindly raising the threshold            | Large tables get auto-broadcast and blow up
Inaccurate statistics                    | A large table is misjudged as small

Rule of thumb: broadcast only truly small data (tens of MB at most). When a dimension grows, consider bucketing (see our separate post "PySpark Bucketing") or a regular join instead of broadcasting. Make it a habit to check whether "the dimension you think is small" is actually small.
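One way to turn that habit into code is a small guard that estimates the dimension's size before you add a broadcast() hint. This is a heuristic sketch only: the 50 MB limit, the average row width, and the function names are assumptions, and the in-memory footprint of a broadcast table can be larger than a rows-times-width estimate suggests.

```python
MB = 1024 * 1024


def estimated_bytes(row_count, avg_row_bytes):
    """Rough size estimate: rows times an assumed average row width."""
    return row_count * avg_row_bytes


def choose_join_strategy(row_count, avg_row_bytes, limit_bytes=50 * MB):
    """Return "broadcast" if the estimate fits under the limit, otherwise "shuffle"."""
    if estimated_bytes(row_count, avg_row_bytes) <= limit_bytes:
        return "broadcast"
    return "shuffle"


# Hypothetical usage with Spark (fact and dim_product as in the earlier example):
#   rows = dim_product.count()
#   if choose_join_strategy(rows, avg_row_bytes=200) == "broadcast":
#       enriched = fact.join(broadcast(dim_product), "product_id")
#   else:
#       enriched = fact.join(dim_product, "product_id")
print(choose_join_strategy(100_000, 200))        # estimate of about 20 MB
print(choose_join_strategy(50_000_000, 200))     # estimate of about 10 GB
```

Counting the dimension costs one extra pass over a small table, which is usually cheap compared with an out-of-memory failure halfway through the job.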

4. Broadcast Variables — Lookups, Not Joins

When you need reference data inside a UDF or map operation rather than a DataFrame join, use a broadcast variable. A small dict/set is shipped to each executor once and cached there, instead of being serialized into the function's closure and re-sent with every task.

from pyspark.sql.functions import udf

# Build a dict from small reference data on the driver, then broadcast it
code_map = {row["code"]: row["name"] for row in dim_small.collect()}
bc = spark.sparkContext.broadcast(code_map)

@udf("string")
def lookup_name(code):
    # Look up in the executor-local copy; unknown codes return None (null)
    return bc.value.get(code)

df = df.withColumn("name", lookup_name("code"))
          | broadcast join          | broadcast variable
Target    | DataFrame               | Python object (dict/set/model)
Usage     | Join                    | Reference inside UDF/map
Advantage | Leverages the optimizer | Reference from arbitrary logic

For most enrichment, a broadcast join is better (the optimizer handles it and there's no UDF overhead). Reserve broadcast variables for "references that can't be expressed as a join" — complex lookups, small ML models, rule tables.
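As an illustration of a lookup that does not reduce to an equality join, consider mapping an IP address to the range that contains it. The sketch below is plain Python so the lookup logic can be tested without a cluster. The range table, labels, and function names are invented for the example.

```python
import bisect
import ipaddress

# Hypothetical, non-overlapping range table: (first address, last address, label).
RANGES = [
    ("10.0.0.0", "10.0.0.255", "office-a"),
    ("10.0.1.0", "10.0.1.255", "office-b"),
    ("192.168.0.0", "192.168.255.255", "lab"),
]


def build_index(ranges):
    """Convert ranges to integer form, sorted by start, for binary search."""
    rows = sorted(
        (int(ipaddress.ip_address(lo)), int(ipaddress.ip_address(hi)), label)
        for lo, hi, label in ranges
    )
    starts = [row[0] for row in rows]
    return starts, rows


def lookup(index, ip):
    """Return the label of the range containing ip, or None if no range matches."""
    starts, rows = index
    value = int(ipaddress.ip_address(ip))
    pos = bisect.bisect_right(starts, value) - 1   # last range starting at or before ip
    if pos < 0:
        return None
    lo, hi, label = rows[pos]
    return label if lo <= value <= hi else None


INDEX = build_index(RANGES)
print(lookup(INDEX, "10.0.1.7"))
print(lookup(INDEX, "10.0.2.1"))
```

With Spark, the index would be built once on the driver and wrapped with spark.sparkContext.broadcast(INDEX); the UDF body would then call lookup(bc.value, ip), following the same shape as the dict example above.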

5. Heavy Models and Connections — Use mapInPandas

When enriching with heavy objects like ML models or DB connections, mapInPandas fits better than a broadcast variable — because it initializes once per partition.

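# load_model() and features (a list of feature column names) are placeholders defined elsewhere
def enrich(batches):
    model = load_model()                 # once per partition (not per row)
    for pdf in batches:                  # each pdf is a pandas DataFrame
        pdf["score"] = model.predict(pdf[features])
        yield pdf
 
scored = df.mapInPandas(enrich, schema="... score double")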

(This pattern is covered in detail in our separate post "Why PySpark UDFs Are Slow and Pandas UDFs".)
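The reason the generator shape matters can be shown without Spark. The following plain-Python stand-in mimics the contract of a mapInPandas function (one call per partition, receiving an iterator of batches) and counts how often the expensive setup runs. CountingLoader, the toy model, and the list-of-dicts batches are all invented for the illustration; a real mapInPandas function takes only the iterator and receives pandas DataFrames.

```python
class CountingLoader:
    """Stand-in for an expensive load_model(); counts how often it is called."""

    def __init__(self):
        self.calls = 0

    def __call__(self):
        self.calls += 1
        return lambda x: x * 2.0          # toy "model": doubles its input


def enrich_partition(batches, load_model):
    """Shaped like a mapInPandas function: set up once, then stream the batches."""
    model = load_model()                  # runs once per call, i.e. once per partition
    for batch in batches:
        yield [{**row, "score": model(row["x"])} for row in batch]


def enrich_per_row(batches, load_model):
    """Anti-pattern for comparison: the setup is repeated for every row."""
    for batch in batches:
        yield [{**row, "score": load_model()(row["x"])} for row in batch]


# One "partition" made of two batches, three rows in total.
partition = [[{"x": 1.0}, {"x": 2.0}], [{"x": 3.0}]]

once = CountingLoader()
scored = list(enrich_partition(partition, once))

every_row = CountingLoader()
list(enrich_per_row(partition, every_row))

print(once.calls, every_row.calls)
```

Both variants produce the same scores; the difference is that the per-partition version loads the model once for the three rows while the per-row version loads it three times.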

6. Streaming Enrichment

Broadcast works for attaching reference data to a stream as well. But if the reference data changes, you need to handle that.

# Broadcast join a static dimension to the stream (when the dimension rarely changes)
stream.join(broadcast(dim_static), "key")
 
# If the dimension changes periodically: re-read the latest dimension inside foreachBatch and join
def process(batch_df, batch_id):
    dim = spark.read.table("analytics.dim")   # latest dimension every batch
    batch_df.join(broadcast(dim), "key").write...

For frequently changing dimensions, a common pattern is to read the latest version inside foreachBatch on every micro-batch and broadcast it.
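The reload-per-batch idea can be isolated into a small factory so that the loading and writing steps are swappable. This is a sketch under our own naming (make_batch_handler, load_dim, join_and_write are hypothetical); the runnable part uses plain Python objects in place of DataFrames, and the commented usage shows how it might be wired to a stream with a hypothetical sink table.

```python
def make_batch_handler(load_dim, join_and_write):
    """Build a foreachBatch-style callback that reloads the dimension per micro-batch."""

    def process(batch_df, batch_id):
        dim = load_dim()                      # fresh read on every micro-batch
        join_and_write(batch_df, dim, batch_id)

    return process


# Hypothetical wiring with Spark (the sink table name is an assumption):
#   handler = make_batch_handler(
#       load_dim=lambda: spark.read.table("analytics.dim"),
#       join_and_write=lambda batch, dim, _id: (
#           batch.join(broadcast(dim), "key")
#                .write.mode("append").saveAsTable("analytics.enriched")
#       ),
#   )
#   stream.writeStream.foreachBatch(handler).start()

# Plain-Python demo: the "dimension" changes between batches,
# and each batch is enriched with the version current at that time.
dim_versions = iter([{"a": "Alpha"}, {"a": "Alpha v2"}])
results = []
handler = make_batch_handler(
    load_dim=lambda: next(dim_versions),
    join_and_write=lambda batch, dim, batch_id: results.append(
        (batch_id, [dim.get(key) for key in batch])
    ),
)
handler(["a"], 0)
handler(["a"], 1)
print(results)
```

Re-reading the dimension on every micro-batch trades some read cost for freshness, which fits a small dimension; for a larger one, the size limits from section 3 still apply to each reload.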

7. Choosing an Enrichment Pattern

Situation                             | Recommendation
Joining a small dimension             | broadcast() join
Very small dict lookup (UDF required) | broadcast variable
Heavy model/connection                | mapInPandas
Large dimension, repeated joins       | Bucketing / regular join
Stream + static dimension             | broadcast join
Stream + changing dimension           | Reload inside foreachBatch

8. Summary

Tool               | Purpose                            | Caveat
broadcast join     | Joining small dimensions           | Never broadcast anything too large (OOM)
broadcast variable | Small lookups inside UDFs          | dict/set/small models
mapInPandas        | Heavy objects                      | Initializes per partition
Bucketing          | Repeated joins on large dimensions | Pre-shuffles up front

The core principle of data enrichment: "don't shuffle small reference data — replicate it". Most enrichment finishes shuffle-free with a single line of broadcast() join; use broadcast variables only for references that can't be expressed as a join, and mapInPandas for heavy models. Avoid the one pitfall — broadcasting something too large causes OOM — and enrichment becomes the fastest, simplest operation you run. Always verify the actual size of "the thing you think is small".


This post is based on Spark 3.5. If you need help designing large-scale data enrichment and join optimization, feel free to reach out.

— The Data Dynamics Engineering Team