Tags: pyspark, spark, data-skew, performance, aqe, data-engineering

Conquering PySpark Data Skew — Rescuing Jobs Stuck at 99%

A diagnosis-to-resolution guide for data skew, where a single task runs forever. Covers the causes of skew, how to identify it in the Spark UI, and practical patterns with code: AQE Skew Join, salting, broadcast joins, and pre-aggregation.

Data Dynamics | June 5, 2026 | 7 min read

Every data engineer has watched a Spark job that "looks stuck at 99%." 199 out of 200 tasks finished in seconds, but the last one has been running for 30 minutes. Almost always, the culprit is the same — data skew.

This post explains why skew happens, how to identify it in the Spark UI, and walks through practical remediation patterns — from AQE Skew Join to salting — with code.

1. What Is Skew — Partition Imbalance

Spark splits data into partitions and assigns each partition to a task. When a shuffle occurs (join, groupBy), all rows with the same key land in the same partition. If the data is concentrated on one key, the partition that owns that key becomes enormous.

Normal: [50MB][52MB][48MB][51MB] ...  → all tasks take similar time
Skewed: [50MB][48MB][9GB!!][51MB] ...  → one task runs 30 min, the rest are done

Symptoms:

  • One or two tasks take dramatically longer than the rest (stragglers)
  • Massive spill or OOM in those tasks
  • Total job time is bound by "the single slowest task"
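
To make the imbalance concrete, here is a minimal plain-Python simulation of hash partitioning. It is only a sketch: Spark uses its own hash function rather than Python's `hash()`, and the workload (1,000 user ids plus an anonymous `user_id = 0`) and the `partition_sizes` helper are made up for illustration.

```python
from collections import Counter


def partition_sizes(keys, num_partitions):
    """Count rows per partition when each row goes to hash(key) % num_partitions."""
    sizes = Counter(hash(k) % num_partitions for k in keys)
    return [sizes.get(p, 0) for p in range(num_partitions)]


# Hypothetical workload: user ids 1..1000 with 10 rows each,
# plus 50,000 rows that all carry the anonymous user id 0.
even_keys = [uid for uid in range(1, 1001) for _ in range(10)]
skewed_keys = even_keys + [0] * 50_000

print(partition_sizes(even_keys, 4))    # [2500, 2500, 2500, 2500]
print(partition_sizes(skewed_keys, 4))  # [52500, 2500, 2500, 2500]
```

Every row for key 0 hashes to the same partition, so adding more partitions would not help: the hot key still lands in exactly one of them.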

2. Where Skew Comes From

| Cause | Example |
| --- | --- |
| Data concentrated on a specific key | user_id = 0 (anonymous), null, guest accounts |
| Hot keys (popular products or users) | 10% of events land on a single seller |
| NULL join keys | All NULLs pile into one partition |
| Low-cardinality groupBy | group by country → only the 'KR' partition is huge |
| Uneven source partitions | Kafka partition or file size variance |

The most common offender is a NULL or default-value key (0, '', 'unknown'). Hundreds of millions of meaningless rows pile up and funnel into a single partition.

3. Diagnosis — Confirming Skew in the Spark UI

Don't guess — look at the task distribution in the Stage detail page of the Spark UI.

  • In Summary Metrics, if the Max of Duration, Shuffle Read Size, or Spill is tens to hundreds of times the Median, you have skew.
  • A large gap between the 75th percentile and Max signals that a handful of tasks are dragging down the whole stage.
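
The same comparison can be done offline if you export per-task numbers (for example from the Spark UI or event logs). The sketch below is plain Python; `skew_report` is a hypothetical helper and the durations are invented to mirror the "199 fast tasks, one straggler" case.

```python
from statistics import median, quantiles


def skew_report(task_values):
    """Summarise per-task values (duration, shuffle read, spill) like the Stage page."""
    med = median(task_values)
    # quantiles(..., n=4) returns the 25th, 50th and 75th percentiles.
    p75 = quantiles(task_values, n=4, method="inclusive")[2]
    worst = max(task_values)
    return {"median": med, "p75": p75, "max": worst, "max_over_median": worst / med}


# Hypothetical durations in seconds: 199 quick tasks and one 30-minute straggler.
durations = [4.0] * 199 + [1800.0]
report = skew_report(durations)
print(report["max_over_median"])  # 450.0
```

A ratio in the hundreds with a 75th percentile that equals the median is the typical signature of one or two skewed tasks rather than a uniformly slow stage.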

from pyspark.sql import functions as F

# Check directly which keys are concentrated
(df.groupBy("join_key")
   .count()
   .orderBy(F.desc("count"))
   .show(20, truncate=False))

If the top few keys account for a significant share of the total, hot-key skew is confirmed.

4. Fix 1 — AQE Skew Join (Try This First)

Adaptive Query Execution (AQE), available in Spark 3.0+ and enabled by default since Spark 3.2, detects skewed partitions at runtime and automatically splits them into smaller sub-partitions. It takes the least effort, so try it first.

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
 
# Skew detection thresholds (defaults — tune if needed)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")

Detection rule (roughly): a partition is considered skewed, and is split, if it is larger than skewedPartitionFactor (default 5) times the median partition size and also larger than skewedPartitionThresholdInBytes (default 256MB). Both conditions must hold.
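
The rule is easy to check by hand. The plain-Python sketch below only approximates the condition described above; `is_skewed` is a hypothetical helper, not a Spark API, and the partition sizes reuse the example from section 1.

```python
from statistics import median

MB = 1024 * 1024


def is_skewed(partition_bytes, median_bytes, factor=5.0, threshold_bytes=256 * MB):
    """Approximate AQE check: above factor x median AND above the byte threshold."""
    return partition_bytes > factor * median_bytes and partition_bytes > threshold_bytes


sizes = [50 * MB, 48 * MB, 9 * 1024 * MB, 51 * MB]
med = median(sizes)  # 50.5 MB
flags = [is_skewed(s, med) for s in sizes]
print(flags)  # [False, False, True, False]
```

Because both conditions must hold, a 300MB partition next to a 100MB median is not flagged (it is under 5x the median), and a 200MB partition next to a 10MB median is not flagged either (it is under 256MB). Lowering one of the two settings is the usual way to catch such cases.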

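| Pros | Limitations |
| --- | --- |
| Almost no code changes | Applies only to shuffle-based joins (sort-merge, and shuffled hash join since Spark 3.2); it does not split a skewed aggregation |
| Adapts automatically at runtime | Splitting alone may not be enough for extreme skew |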

If AQE alone solves it, you're done. If not, layer on the techniques below.

5. Fix 2 — Salting

For extreme hot keys, spread the key artificially. Attach a random suffix (salt) to the hot key to scatter it across multiple partitions, and replicate the smaller table once per salt value.

from pyspark.sql import functions as F
 
N = 16  # number of salts
 
# Big (skewed) table: assign a random salt to the join key
big_salted = big.withColumn("salt", (F.rand() * N).cast("int"))
 
# Small table: replicate (explode) with 0..N-1 so it matches every salt
small_salted = (small
    .withColumn("salt", F.explode(F.array([F.lit(i) for i in range(N)]))))
 
# Include the salt in the join key
joined = big_salted.join(small_salted, ["join_key", "salt"]).drop("salt")

The core idea: one hot key is split into 16 slices, key#0 through key#15, which up to 16 tasks can process in parallel. Replicating the small table 16x has a cost, but it is far cheaper than a job that stalls on skew.
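
A quick plain-Python simulation shows the effect on group sizes. It is a sketch under made-up numbers (one hot key with 50,000 rows, 1,000 ordinary keys), and `largest_group` is a hypothetical helper that only counts rows per (key, salt) pair.

```python
import random
from collections import Counter


def largest_group(keys, num_salts, seed=0):
    """Size of the biggest (key, salt) group after giving each row a random salt."""
    rng = random.Random(seed)
    groups = Counter((k, rng.randrange(num_salts)) for k in keys)
    return max(groups.values())


keys = [0] * 50_000 + list(range(1, 1001))

print(largest_group(keys, num_salts=1))   # 50000: the hot key stays in one group
print(largest_group(keys, num_salts=16))  # roughly 50000 / 16
```

The largest unit of work shrinks by about a factor of N, which is why N should be sized to the hot key rather than picked arbitrarily.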

Salting Only the Hot Keys (Optimization)

Salting everything increases shuffle volume. It's more efficient to apply salt only to the hot keys and join the rest normally.

hot_keys = [0]  # hot keys found during diagnosis
# NULL is left out on purpose: isin() never matches NULL, and NULL keys
# cannot join anyway (see Fix 4)
is_hot = F.col("join_key").isin(hot_keys)
 
# Salt only the hot-key rows, leave the rest as-is → union, then join

6. Fix 3 — Broadcast Join

If the other side of the join is small enough (a few hundred MB or less), replicate the small side to every executor and eliminate the shuffle entirely. No shuffle means no skew.

from pyspark.sql.functions import broadcast
 
joined = big.join(broadcast(small_dim), "join_key")
 
# Adjust the auto-broadcast threshold (default 10MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100MB")

For star-schema joins where one side is small, such as dimension tables, this is the fundamental cure for skew. Just verify the size first: the broadcast table is collected on the driver and then copied to every executor, so a table that is too large can cause OOM on either side.

7. Fix 4 — Isolating NULL/Default-Value Keys

NULLs never match each other in a join, yet they all collect in one partition and create skew. Split them off before the join.

# Inner join: NULL keys can't match anyway, so exclude them upfront
non_null = big.filter(F.col("join_key").isNotNull())
matched = non_null.join(small, "join_key")
 
# Left outer join: join only the non-NULL rows, then append the NULL-key rows.
# allowMissingColumns (Spark 3.1+) fills the columns from `small` with NULL.
null_rows = big.filter(F.col("join_key").isNull())
result = (non_null.join(small, "join_key", "left")
          .unionByName(null_rows, allowMissingColumns=True))

The same principle applies to default values (0, 'unknown'): if the value is meaningless, isolate it or handle it separately.

8. Fix 5 — Pre-Aggregation to Shrink the Join Input

When the join feeds a groupBy on the same key, aggregate before the join. Aggregating the large fact table first leaves one row per key, so the join input shrinks and the hot key no longer overloads a single partition.

# Aggregate the fact table first → join the smaller result with the dimension
agg = fact.groupBy("seller_id").agg(F.sum("amount").alias("total"))
result = agg.join(dim_seller, "seller_id")

9. Choosing a Technique

| Situation | First-choice fix |
| --- | --- |
| General skew | Enable AQE Skew Join |
| One side is small | Broadcast Join |
| Extreme hot keys (a few) | Hot-key salting |
| NULL/default-value keys | Isolate the keys, handle separately |
| groupBy skew | Pre-aggregation / two-stage aggregation |
| The same join repeated often | Pre-shuffle with bucketing |

In practice the usual order is: enable AQE first, then add broadcast or salting if skew remains.

10. Common Pitfalls

| Pitfall | Result |
| --- | --- |
| Trying to fix it with repartition(n) | Rows are spread evenly, but the join reshuffles by key, so key skew remains |
| Salt count too small | Insufficient spreading |
| Salt count too large | Small-table replication cost explodes |
| Broadcast target is actually large | Driver or executor OOM |
| AQE left disabled | No automatic skew handling |

repartition(n) spreads rows evenly across n partitions, but the join or groupBy that follows shuffles again by key hash, so every row of a hot key still ends up in one partition. For key skew, salting or a broadcast join is the right answer.

11. Summary

| Fix | Core idea | Best for |
| --- | --- | --- |
| AQE Skew Join | Automatic runtime splitting | Almost every case; try first |
| Broadcast | Eliminates the shuffle | When one side is small |
| Salting | Artificially spreads hot keys | Extreme hot keys |
| Key isolation | Quarantines NULL/default values | Meaningless-key concentration |
| Pre-aggregation | Shrinks the join input | groupBy skew |

The starting point for fixing data skew is "don't guess — look at the task distribution in the Spark UI." If Max is tens of times the Median, suspect skew, identify which keys are concentrated, and respond in order: AQE → broadcast → salting. The moment a job that used to stall at 99% finishes evenly, you'll know understanding skew was worth it.


This article is based on Spark 3.5. If you need help diagnosing skew and performance issues in large-scale Spark jobs, feel free to reach out.

— Data Dynamics Engineering Team