Conquering PySpark Data Skew — Rescuing Jobs Stuck at 99%
A diagnosis-to-resolution guide for data skew, where a single task runs forever. Covers the causes of skew, how to identify it in the Spark UI, and practical patterns with code: AQE Skew Join, salting, broadcast joins, and pre-aggregation.
Every data engineer has watched a Spark job that "looks stuck at 99%." 199 out of 200 tasks finished in seconds, but the last one has been running for 30 minutes. Almost always, the culprit is the same — data skew.
This post explains why skew happens, how to identify it in the Spark UI, and walks through practical remediation patterns — from AQE Skew Join to salting — with code.
1. What Is Skew — Partition Imbalance
Spark splits data into partitions and distributes them across tasks. When a shuffle occurs (join, groupBy), the same key lands in the same partition. If data is concentrated on a particular key, the partition responsible for that key becomes enormous.
Normal: [50MB][52MB][48MB][51MB] ... → all tasks take similar time
Skewed: [50MB][48MB][9GB!!][51MB] ... → one task runs 30 min, the rest are done
Symptoms:
- One or two tasks take dramatically longer than the rest (stragglers)
- Massive spill or OOM in those tasks
- Total job time is bound by "the single slowest task"
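To make the imbalance concrete, here is a minimal pure-Python sketch of key-based partitioning. It is only an illustration: `partition_sizes` is a hypothetical helper, the data is made up, and Python's built-in `hash` stands in for Spark's own partitioner, which uses a different hash function.

```python
from collections import Counter


def partition_sizes(keys, num_partitions):
    """Count rows per partition when each row goes to hash(key) % num_partitions."""
    sizes = Counter(hash(k) % num_partitions for k in keys)
    return [sizes.get(p, 0) for p in range(num_partitions)]


# Hypothetical data: 9,000 rows on key 0 (e.g. anonymous users),
# plus 1,000 rows spread evenly over keys 1..100.
keys = [0] * 9_000 + [1 + (i % 100) for i in range(1_000)]

sizes = partition_sizes(keys, num_partitions=8)
print(sizes)  # one partition holds every row of key 0; the others hold ~100 rows each
```

Adding more partitions does not help here: every row with key 0 still maps to the same partition.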
2. Where Skew Comes From
| Cause | Example |
|---|---|
| Data concentrated on a specific key | user_id = 0 (anonymous), null, guest accounts |
| Hot keys (popular products or users) | 10% of events land on a single seller |
| NULL join keys | All NULLs pile into one partition |
| Low-cardinality groupBy | group by country → only the 'KR' partition is huge |
| Uneven source partitions | Kafka partition or file size variance |
The most common offender is a NULL or default-value key (0, '', 'unknown'). Hundreds of millions of meaningless rows pile up and funnel into a single partition.
3. Diagnosis — Confirming Skew in the Spark UI
Don't guess — look at the task distribution in the Stage detail page of the Spark UI.
- In Summary Metrics, if the Max of Duration, Shuffle Read Size, or Spill is tens to hundreds of times the Median, you have skew.
- A large gap between the 75th percentile and Max signals that a handful of tasks are dragging down the whole stage.
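The Max-versus-Median comparison can be written as a small helper. This is a sketch under assumptions: `skew_ratio` is a hypothetical function, and the per-task values are made-up numbers of the kind you would read off the Stage detail page.

```python
from statistics import median


def skew_ratio(task_values):
    """Return max / median for a per-task metric (duration, shuffle read size, spill)."""
    med = median(task_values)
    if med == 0:
        return float("inf") if max(task_values) > 0 else 1.0
    return max(task_values) / med


# Hypothetical shuffle read sizes in MB: 199 ordinary tasks and one straggler.
shuffle_read_mb = [50] * 199 + [9_000]

ratio = skew_ratio(shuffle_read_mb)
print(f"max/median = {ratio:.0f}x")  # max/median = 180x
```

A ratio in the tens or hundreds matches the rule of thumb above; a ratio close to 1 means the stage is balanced and the slowness has another cause.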
# Check directly which keys are concentrated
from pyspark.sql import functions as F

(df.groupBy("join_key")
   .count()
   .orderBy(F.desc("count"))
   .show(20, truncate=False))
If the top few keys account for a significant share of the total, hot-key skew is confirmed.
4. Fix 1 — AQE Skew Join (Try This First)
Adaptive Query Execution (AQE) in Spark 3.0+ detects skewed partitions at runtime and automatically splits them into smaller sub-partitions. It is the first thing to try, with the least effort.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Skew detection thresholds (defaults — tune if needed)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
Detection rule (roughly): a partition is considered skewed and split if it is larger than skewedPartitionFactor (default 5) times the median partition size and also exceeds skewedPartitionThresholdInBytes (default 256MB).
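The rule can be restated as a few lines of plain Python. This is only an approximation of the description above, not Spark's internal code; `is_skewed` is a hypothetical helper and the sizes are invented.

```python
MB = 1024 * 1024


def is_skewed(partition_bytes, median_bytes, factor=5.0, threshold_bytes=256 * MB):
    """Both conditions must hold: above factor x median AND above the byte threshold."""
    return partition_bytes > factor * median_bytes and partition_bytes > threshold_bytes


print(is_skewed(9_000 * MB, 50 * MB))   # True: 180x the median and above 256MB
print(is_skewed(200 * MB, 20 * MB))     # False: 10x the median but under 256MB
print(is_skewed(400 * MB, 300 * MB))    # False: above 256MB but only ~1.3x the median
```

The second case is the one that most often surprises people: a partition can be many times the median and still be left alone because it is below the byte threshold, which is the reason to lower the threshold on jobs with small partitions.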
| Pros | Limitations |
|---|---|
| Almost no code changes | Applies only to sort-merge joins (not broadcast) |
| Adapts automatically at runtime | Splitting alone may not be enough for extreme skew |
If AQE alone solves it, you're done. If not, layer on the techniques below.
5. Fix 2 — Salting
For extreme hot keys, spread the key artificially. Attach a random suffix (salt) to the hot key to scatter it across multiple partitions, and replicate the smaller table once per salt value.
from pyspark.sql import functions as F
N = 16 # number of salts
# Big (skewed) table: assign a random salt to the join key
big_salted = big.withColumn("salt", (F.rand() * N).cast("int"))
# Small table: replicate (explode) with 0..N-1 so it matches every salt
small_salted = (small
.withColumn("salt", F.explode(F.array([F.lit(i) for i in range(N)]))))
# Include the salt in the join key
joined = big_salted.join(small_salted, ["join_key", "salt"]).drop("salt")
The core idea: one hot key is split into 16 slices, key#0 through key#15, which 16 tasks process in parallel. Replicating the small table 16x has a cost, but it's far better than a job that stalls on skew.
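The trade-off behind the salt count is simple arithmetic, sketched here with made-up sizes. `salting_estimate` is a hypothetical helper, and it assumes the random salt spreads the hot key's rows evenly.

```python
GB, MB = 1024 ** 3, 1024 ** 2


def salting_estimate(hot_key_bytes, small_table_bytes, num_salts):
    """Rough sizes after salting, assuming an even spread across salt values."""
    return {
        "hot_bytes_per_task": hot_key_bytes / num_salts,
        "small_table_bytes_after_explode": small_table_bytes * num_salts,
    }


# Hypothetical inputs: a 9 GB hot key joined against a 2 GB table.
for n in (4, 16, 64):
    est = salting_estimate(hot_key_bytes=9 * GB, small_table_bytes=2 * GB, num_salts=n)
    print(
        f"N={n}: {est['hot_bytes_per_task'] / MB:.0f} MB per hot task, "
        f"{est['small_table_bytes_after_explode'] / GB:.0f} GB replicated side"
    )
```

With these numbers, N=16 brings the hot key down to 576 MB per task while the replicated side grows to 32 GB. Raising N keeps shrinking the hot tasks, but the replicated side grows linearly with it.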
Salting Only the Hot Keys (Optimization)
Salting everything increases shuffle volume. It's more efficient to apply salt only to the hot keys and join the rest normally.
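# Hot keys found during diagnosis. isin() never matches NULL, so NULL keys
# need a separate isNull() check (or the isolation described in Fix 4).
hot_keys = [0]
is_hot = F.col("join_key").isin(hot_keys)
# Salt only the hot-key rows, leave the rest as-is → union, then join
6. Fix 3 — Broadcast Join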
If the other side of the join is small enough (a few hundred MB or less), replicate the small side to every executor and eliminate the shuffle entirely. No shuffle means no skew.
from pyspark.sql.functions import broadcast
joined = big.join(broadcast(small_dim), "join_key")
# Adjust the auto-broadcast threshold (default 10MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100MB")
For star-schema joins where one side is small, such as dimension tables, this is the fundamental cure for skew. Just verify the size first: broadcasting a table that's too large causes executor OOM.
7. Fix 4 — Isolating NULL/Default-Value Keys
NULLs never match each other in a join, yet they all collect in one partition and create skew. Split them off before the join.
# NULL keys can't match in an inner join anyway, so exclude them upfront
matched = big.filter(F.col("join_key").isNotNull()).join(small, "join_key")
# If NULL rows are needed in the result (outer), handle them separately and union
null_rows = big.filter(F.col("join_key").isNull())
result = matched.unionByName(null_rows, allowMissingColumns=True)
The same principle applies to default values (0, 'unknown'): if the value is meaningless, isolate it or handle it separately.
8. Fix 5 — Pre-Aggregation to Shrink the Join Input
For groupBy skew, aggregate before the join to reduce the amount of data entering the join in the first place. Aggregating the large fact table by key first makes the join input smaller and relieves the skew.
# Aggregate the fact table first → join the smaller result with the dimension
agg = fact.groupBy("seller_id").agg(F.sum("amount").alias("total"))
result = agg.join(dim_seller, "seller_id")
9. Choosing a Technique
| Situation | First-choice fix |
|---|---|
| General skew | Enable AQE Skew Join |
| One side is small | Broadcast Join |
| Extreme hot keys (a few) | Hot-key salting |
| NULL/default-value keys | Isolate the keys, handle separately |
| groupBy skew | Pre-aggregation / two-stage aggregation |
| The same join repeated often | Pre-shuffle with bucketing |
In practice, the usual order is: enable AQE first, then add a broadcast join or salting if skew remains.
10. Common Pitfalls
| Pitfall | Result |
|---|---|
| Trying to fix it with repartition(n) | Just an even redistribution; key skew remains |
| Salt count too small | Insufficient spreading |
| Salt count too large | Small-table replication cost explodes |
| Broadcast target is actually large | Executor OOM |
| AQE left disabled | No automatic skew handling |
repartition(n) only redistributes rows across n partitions; the join's shuffle then partitions the data by key again, so a hot key still piles into one partition. For key skew, salting or a broadcast join is the right answer.
11. Summary
| Fix | Core idea | Best for |
|---|---|---|
| AQE Skew Join | Automatic runtime splitting | Almost every case — try first |
| Broadcast | Eliminates the shuffle | When one side is small |
| Salting | Artificially spreads hot keys | Extreme hot keys |
| Key isolation | Quarantines NULL/default values | Meaningless-key concentration |
| Pre-aggregation | Shrinks the join input | groupBy skew |
The starting point for fixing data skew is "don't guess — look at the task distribution in the Spark UI." If Max is tens of times the Median, suspect skew, identify which keys are concentrated, and respond in order: AQE → broadcast → salting. The moment a job that used to stall at 99% finishes evenly, you'll know understanding skew was worth it.
This article is based on Spark 3.5. If you need help diagnosing skew and performance issues in large-scale Spark jobs, feel free to reach out.
— Data Dynamics Engineering Team