
PySpark Bucketing — Eliminating Shuffles from Repeated Large Joins

If you keep shuffling the same large tables for every join, bucketing is the answer. We cover how bucketing eliminates shuffles, how to create and join bucketed tables, how to size the bucket count, and when to use — or avoid — bucketing in the Iceberg/Delta era.

Data Dynamics · June 5, 2026 · 6 min read

When you join two large tables, both sides get shuffled by the join key. But what if you join the same two tables dozens of times a day? You are repeating the same shuffle every single time. Bucketing is a technique that performs this shuffle once, up front, at write time, so that subsequent joins need no shuffle at all.

This post covers how bucketing eliminates shuffles, how to create and join bucketed tables, how to design the bucket count, and when bucketing makes sense in the Lakehouse era.

1. The Problem — Repeated Shuffles

On every join:
  big_A ──(shuffle by join key)──┐
                                 ├─ SortMergeJoin
  big_B ──(shuffle by join key)──┘
→ Both tables are redistributed over the network every time (expensive)
→ Repeating the same join means repeating the same shuffle

If both tables are too large to broadcast, Spark plans a SortMergeJoin and shuffles both sides on every run, even though the join key never changes. Accumulated over many runs, this cost becomes enormous.

2. The Idea Behind Bucketing — Shuffle in Advance

Bucketing pre-splits a table into N buckets by the hash of the join key at write time. The same key always lands in the same bucket number. If two tables are bucketed by the same key with the same bucket count, a join only needs to match "bucket against bucket" — so no shuffle is required.

[Bucketed tables]
  big_A: bucket_0, bucket_1, ... bucket_31  (distributed by hash of user_id)
  big_B: bucket_0, bucket_1, ... bucket_31  (identical)
 
Join: bucket_0 ↔ bucket_0, bucket_1 ↔ bucket_1 ...
→ Local joins between matching buckets, no shuffle
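To make the "bucket against bucket" idea concrete, here is a small pure-Python model (no Spark involved). It assigns rows to buckets by hash(user_id) % N and then joins bucket i only with bucket i. Assumptions: CRC32 stands in for Spark's own bucket hash, the bucket count is 4 for readability, and the per-bucket step is a simple hash join rather than the sort-merge join Spark would run. All function names are hypothetical.

```python
import zlib
from collections import defaultdict

NUM_BUCKETS = 4  # kept small for readability; the tables above use 32


def bucket_of(key: int, num_buckets: int = NUM_BUCKETS) -> int:
    # Stand-in hash: CRC32 of the key's decimal text. Spark's bucketBy uses a
    # different hash, but any deterministic hash shows the idea.
    return zlib.crc32(str(key).encode()) % num_buckets


def bucketize(rows):
    """Group (user_id, payload) rows by bucket number, as a bucketed write would."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[bucket_of(row[0])].append(row)
    return buckets


def bucketed_join(left, right):
    """Inner-join on user_id, pairing bucket i of `left` only with bucket i of `right`."""
    left_buckets, right_buckets = bucketize(left), bucketize(right)
    result = []
    for b in range(NUM_BUCKETS):
        # Build a lookup for this bucket of the right side only
        lookup = defaultdict(list)
        for key, payload in right_buckets.get(b, []):
            lookup[key].append(payload)
        # Probe with the matching bucket of the left side; no row crosses buckets
        for key, payload in left_buckets.get(b, []):
            for other in lookup.get(key, []):
                result.append((key, payload, other))
    return result


events = [(uid, f"event-{i}") for i, uid in enumerate([1, 2, 3, 2, 5, 8, 13])]
users = [(uid, f"user-{uid}") for uid in range(10)]
print(sorted(bucketed_join(events, users)))
```

Because equal keys always hash to the same bucket number on both sides, the bucket-by-bucket result equals a full join, which is why matching bucket layouts make the shuffle unnecessary.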

| | Without bucketing | With bucketing |
|---|---|---|
| Shuffle at join time | Both sides, every time | None (done in advance) |
| Write cost | Low | High (shuffle at write time) |
| Best for | One-off joins | Repeated joins |

The key insight: bucketing moves the shuffle cost to a single write-time event. Bucket once, and every subsequent join runs shuffle-free.
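A back-of-the-envelope model of that trade-off, as a sketch: assume each shuffle moves the full size of its input, the tables are rewritten once per refresh cycle (say, daily), and every join reads both full tables. The function name and the numbers are illustrative, not measurements.

```python
def shuffled_per_refresh(size_a: float, size_b: float, joins_per_refresh: int, bucketed: bool) -> float:
    """Data moved through shuffles between two rewrites of the tables (toy model).

    Without bucketing, every join shuffles both sides.
    With bucketing, only the bucketed write shuffles; the joins themselves do not.
    """
    if bucketed:
        return size_a + size_b
    return joins_per_refresh * (size_a + size_b)


# Hypothetical: two 500 GB tables, rewritten daily, joined 24 times a day
print(shuffled_per_refresh(500, 500, 24, bucketed=False))  # GB shuffled per day without bucketing
print(shuffled_per_refresh(500, 500, 24, bucketed=True))   # GB shuffled per day with bucketing
```

With one join per refresh the two numbers are equal, which is the one-off-join case where bucketing buys nothing.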

3. Creating Bucketed Tables

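Bucketed tables must be saved as metastore tables with saveAsTable, because the bucket spec (bucket columns, bucket count, sort columns) lives in the table's catalog metadata. A path-based write such as write.parquet(path) cannot record it, and Spark rejects bucketBy on a path-based save with an error.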

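from pyspark.sql import SparkSession

# Metastore-backed session so table metadata (including the bucket spec) persists
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sql("CREATE DATABASE IF NOT EXISTS analytics")

# big_A / big_B: existing large DataFrames that both have a user_id column
# 32 buckets by user_id, sorted within each bucket
(big_A.write
    .bucketBy(32, "user_id")
    .sortBy("user_id")              # sorts rows inside each bucket file; Spark 3.x may still plan a Sort before the SMJ
    .mode("overwrite")
    .saveAsTable("analytics.events_bucketed"))
 
(big_B.write
    .bucketBy(32, "user_id")        # same key, same bucket count!
    .sortBy("user_id")
    .mode("overwrite")
    .saveAsTable("analytics.users_bucketed"))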
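One write-side detail worth checking: each write task emits a separate file for every bucket it holds rows for, so a bucketed write can produce up to (write tasks × buckets) files. The pure-Python model below counts files for two layouts; CRC32 stands in for Spark's bucket hash, and the 200 tasks and 10,000 keys are made-up numbers. A commonly used mitigation is to call repartition(32, "user_id") before .write.bucketBy(32, "user_id"): to our understanding, Spark derives bucket ids from the same hash partitioning that repartition uses, so each task then holds exactly one bucket. Verify the resulting file count on your own tables before relying on it.

```python
import zlib


def bucket_of(key: int, num_buckets: int) -> int:
    # Stand-in hash (CRC32); Spark's bucketBy uses its own hash function
    return zlib.crc32(str(key).encode()) % num_buckets


def count_output_files(task_rows, num_buckets: int) -> int:
    """One file per (write task, bucket) pair that actually receives rows."""
    return sum(len({bucket_of(k, num_buckets) for k in rows}) for rows in task_rows)


keys = list(range(10_000))
num_buckets, num_tasks = 32, 200

# Layout 1: rows spread over 200 tasks without regard to user_id (round-robin)
round_robin = [keys[t::num_tasks] for t in range(num_tasks)]

# Layout 2: rows first repartitioned by the same hash into num_buckets tasks
by_bucket = [[k for k in keys if bucket_of(k, num_buckets) == b] for b in range(num_buckets)]

print("round-robin files:", count_output_files(round_robin, num_buckets))    # at most 200 * 32
print("pre-partitioned files:", count_output_files(by_bucket, num_buckets))  # at most 32
```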

4. Bucketed Joins — Verifying the Shuffle Is Gone

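# On small test data Spark may broadcast one side instead; disable that to see the SMJ plan
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

a = spark.table("analytics.events_bucketed")
b = spark.table("analytics.users_bucketed")
 
joined = a.join(b, "user_id")
joined.explain()
# → Success if no Exchange (shuffle) node appears under the SortMergeJoin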

If there is no Exchange on either side of the join in EXPLAIN, bucketing is working (see the EXPLAIN-reading section of our separate post "Debugging Slow PySpark Jobs"). If the buckets do not match (e.g., mismatched bucket counts), the shuffle reappears.
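Checking EXPLAIN by eye works, but in a CI check or a notebook it can help to count the shuffles programmatically. A minimal sketch, assuming PySpark's DataFrame.explain() prints through Python's print (so contextlib.redirect_stdout can capture it) and that shuffles appear as operators named exactly Exchange. Both helper names are hypothetical.

```python
import contextlib
import io
import re

# A shuffle appears as an operator named exactly "Exchange", e.g. "Exchange hashpartitioning(...)".
# The word boundaries keep "BroadcastExchange" and "ReusedExchange" from matching.
SHUFFLE_EXCHANGE = re.compile(r"\bExchange\b")


def explain_text(df) -> str:
    """Capture what df.explain() prints (assumes it prints via Python's print)."""
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        df.explain()
    return buf.getvalue()


def count_shuffle_exchanges(plan_text: str) -> int:
    """Number of shuffle Exchange operators mentioned in a plan string."""
    return len(SHUFFLE_EXCHANGE.findall(plan_text))
```

For example, assert count_shuffle_exchanges(explain_text(joined)) == 0 after building the bucketed join above. Compare against zero rather than an exact count: once an adaptive (AQE) query has run, explain() can show both an initial and a final plan, so the same exchange may be printed twice.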

5. Designing the Bucket Count

Choose the bucket count carefully — changing it later requires a full rewrite.

| Consideration | Guideline |
|---|---|
| Size per bucket | Target 128 MB–1 GB, like partitions |
| Bucket count | Too few → huge buckets (skew, OOM); too many → small files |
| Parallelism | Bucket count ≥ roughly the total number of executor cores (each bucket becomes one join task) |
| Both sides match | The two joined tables must have the same bucket count to eliminate the shuffle |

# Rough estimate: total size / target bucket size
# Example: 256GB / 512MB ≈ 512 buckets (powers of two preferred)
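The rule of thumb above can be wrapped in a small helper. This is a sketch under stated assumptions: data is spread roughly evenly across keys (skew breaks the estimate), sizes are what you expect on disk, and the result is rounded up to the next power of two and never drops below the total core count. The function name and defaults are ours, not a Spark API.

```python
def estimate_bucket_count(total_bytes: int,
                          target_bucket_bytes: int = 512 * 1024**2,
                          total_cores: int = 1) -> int:
    """Total size / target size, at least one bucket per core, rounded up to a power of two."""
    raw = max(1, -(-total_bytes // target_bucket_bytes))  # ceiling division
    raw = max(raw, total_cores)                           # keep every core busy during the join
    return 1 << (raw - 1).bit_length()                    # next power of two >= raw


GB = 1024**3
print(estimate_bucket_count(256 * GB))                 # → 512 (the 256 GB / 512 MB example)
print(estimate_bucket_count(300 * GB))                 # → 1024 (600 rounded up)
print(estimate_bucket_count(10 * GB, total_cores=64))  # → 64 (core floor wins)
```

Rounding up rather than down keeps buckets at or under the target size; for the 300 GB case the buckets land around 300 MB, still inside the 128 MB–1 GB band.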

Common trap: creating one table with 32 buckets and the other with 64 will not eliminate the shuffle. Standardize the bucket count across tables that will be joined together.
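Why does 32 vs 64 break the pairing? The pure-Python sketch below takes stand-in non-negative hash values (any hash behaves the same way here) and compares the bucket each one lands in under both counts. Roughly half the rows end up in differently numbered buckets, so bucket i can no longer be matched with bucket i. Because 32 divides 64, though, bucket i of the 32-bucket table maps onto exactly buckets i and i + 32 of the other, one reason power-of-two counts are easier to reconcile. Spark 3.1+ also has an opt-in setting that exploits this (spark.sql.bucketing.coalesceBucketsInJoin.enabled, off by default); verify its behavior on your version rather than assuming it.

```python
def bucket_pairs(hashes, small_n: int = 32, large_n: int = 64):
    """(bucket under small_n, bucket under large_n) for each non-negative hash value."""
    return [(h % small_n, h % large_n) for h in hashes]


hashes = range(0, 100_000, 7)  # stand-in hash values, not real Spark hashes
pairs = bucket_pairs(hashes)

# Rows whose bucket number differs between the two tables: the 1:1 pairing is broken
misaligned = sum(1 for b32, b64 in pairs if b32 != b64)
print(f"{misaligned / len(pairs):.0%} of rows land in differently numbered buckets")

# Because 32 divides 64, each 32-bucket maps onto exactly two 64-buckets: i and i + 32
targets = {b32: sorted({b64 for s, b64 in pairs if s == b32}) for b32 in range(32)}
print(targets[5])  # → [5, 37]
```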

6. Limitations and Pitfalls of Bucketing

| Pitfall | Result |
|---|---|
| Mismatched bucket counts | The shuffle comes back |
| Skewed bucket key | A few buckets become huge → skewed tasks |
| Bucketing a small table | Unnecessary; a broadcast join is better |
| Bucketing for one-off joins | You pay the write-time shuffle with no later joins to amortize it |
| Changing the bucket count | Requires a full rewrite |
| Saving via a path (write.parquet) | bucketBy is rejected → a metastore table is required |
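The "skewed bucket key" row is easy to check before committing to a bucket key. A sketch with made-up data: count rows per bucket and flag buckets far above the median. CRC32 stands in for Spark's bucket hash, the 4× threshold is an arbitrary choice, and the function names are hypothetical.

```python
import statistics
import zlib
from collections import Counter


def bucket_of(key, num_buckets: int) -> int:
    # Stand-in hash (CRC32 of the key's text); Spark's bucketBy hash is different
    return zlib.crc32(str(key).encode()) % num_buckets


def bucket_sizes(keys, num_buckets: int) -> list:
    """Rows per bucket, including empty buckets."""
    counts = Counter(bucket_of(k, num_buckets) for k in keys)
    return [counts.get(b, 0) for b in range(num_buckets)]


def skewed_buckets(sizes, factor: float = 4.0) -> dict:
    """Buckets holding more than `factor` times the median bucket size."""
    median = statistics.median(sizes)
    return {b: n for b, n in enumerate(sizes) if n > factor * max(median, 1)}


# Made-up data: 1,000 ordinary user_ids plus one "hot" user_id with 5,000 rows
keys = list(range(1_000)) + [42] * 5_000
print(skewed_buckets(bucket_sizes(keys, 32)))
```

On a real table, per-bucket counts could be computed with something like df.groupBy(F.expr("pmod(hash(user_id), 32)")).count() (with pyspark.sql.functions imported as F); our understanding is that this matches the bucket id bucketBy assigns, but confirm it on your Spark version. A single hot key cannot be fixed by raising the bucket count: all of its rows always share one bucket.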

Bucketing only pays off for repeated joins. For a join that runs once, you simply add a shuffle cost at write time for nothing.

7. The Lakehouse Era — Iceberg/Delta and Bucketing

This part matters for readers of this blog. Iceberg provides bucket partitioning at the table-format level, which is more flexible than Hive-style bucketing. Delta Lake has no bucket partition transform, so this section focuses on Iceberg.

Iceberg bucket transform

-- Iceberg: hidden partitioning via the bucket transform
CREATE TABLE analytics.events (
  user_id BIGINT, event_time TIMESTAMP, ...
) USING iceberg
PARTITIONED BY (days(event_time), bucket(32, user_id));

Iceberg's bucket(N, col) is part of the partition strategy: it can be changed later through partition evolution and works together with metadata-based pruning. (We covered Iceberg partition transforms in a separate post, "How Trino + Iceberg Solves the Partitioning Problem.")
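For intuition about what bucket(32, user_id) computes, here is a pure-Python sketch of the Iceberg spec's bucket transform for a BIGINT column, as we read the spec: a 32-bit Murmur3 hash (x86 variant, seed 0) over the value's 8-byte little-endian encoding, masked to a non-negative int, modulo N. The spec hashes INT values the same way after widening them to long. Function names are ours. Spark's bucketBy uses a different hash setup, so bucket numbers from the two schemes are not interchangeable.

```python
MASK32 = 0xFFFFFFFF


def _rotl32(x: int, r: int) -> int:
    return ((x << r) | (x >> (32 - r))) & MASK32


def murmur3_x86_32(data: bytes, seed: int = 0) -> int:
    """Standard MurmurHash3 x86 32-bit; returns an unsigned 32-bit value."""
    c1, c2 = 0xCC9E2D51, 0x1B873593
    h = seed & MASK32
    n_blocks = len(data) // 4
    for i in range(n_blocks):
        k = int.from_bytes(data[4 * i:4 * i + 4], "little")
        k = _rotl32((k * c1) & MASK32, 15)
        k = (k * c2) & MASK32
        h = _rotl32(h ^ k, 13)
        h = (h * 5 + 0xE6546B64) & MASK32
    # Remaining 1-3 bytes
    tail = data[4 * n_blocks:]
    k = 0
    if len(tail) == 3:
        k ^= tail[2] << 16
    if len(tail) >= 2:
        k ^= tail[1] << 8
    if len(tail) >= 1:
        k ^= tail[0]
        k = _rotl32((k * c1) & MASK32, 15)
        k = (k * c2) & MASK32
        h ^= k
    # Finalization mix
    h ^= len(data)
    h ^= h >> 16
    h = (h * 0x85EBCA6B) & MASK32
    h ^= h >> 13
    h = (h * 0xC2B2AE35) & MASK32
    h ^= h >> 16
    return h


def iceberg_hash_long(value: int) -> int:
    """Hash of a BIGINT: Murmur3 over 8 little-endian bytes, returned as a Java-style signed int."""
    h = murmur3_x86_32(value.to_bytes(8, "little", signed=True))
    return h - (1 << 32) if h & 0x80000000 else h


def iceberg_bucket_long(value: int, num_buckets: int) -> int:
    """bucket(N, col) for a BIGINT column: (hash & Integer.MAX_VALUE) % N."""
    return (iceberg_hash_long(value) & 0x7FFFFFFF) % num_buckets


print(iceberg_hash_long(34), iceberg_bucket_long(34, 32))  # → 2017239379 19
```

As a check, the spec lists 34 → 2017239379 as its test value for long hashing, which puts user_id 34 in bucket 19 of 32.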

| | Hive bucketing | Iceberg bucket |
|---|---|---|
| Changing the bucket count | Full rewrite | Partition evolution |
| Pruning | Limited | Manifest-based |
| Engine compatibility | Spark-centric | Shared across Spark and Trino |

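If your Lakehouse runs on Iceberg, consider its bucket partitioning before pure Hive-style bucketing. It is more flexible and behaves consistently with other engines such as Trino. One caveat: in Spark, joins over Iceberg bucket partitions avoid the shuffle through storage-partitioned joins (Spark 3.3 and later), which typically need to be enabled in the session configuration.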

8. Bucketing vs Other Shuffle-Reduction Techniques

| Technique | When |
|---|---|
| Broadcast join | When one side is small (try first) |
| Bucketing | Repeated joins between large tables |
| Pre-aggregation | When data can be reduced before the join |
| AQE partition coalescing | Cleaning up small partitions after a shuffle |
| Iceberg bucket partitioning | Lakehouse + repeated joins |

The order of preference: broadcast if possible → if not, and the join is repeated, bucketing or Iceberg bucket partitioning.
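The preference order can be written down as a tiny decision function. This is only a toy encoding of the guidance in this section: the 10 MB default mirrors Spark's default spark.sql.autoBroadcastJoinThreshold, "repeated" is taken to mean more than one join per table rewrite, pre-aggregation is left out, and the function name is hypothetical.

```python
def choose_join_strategy(smaller_side_bytes: int,
                         joins_per_rewrite: int,
                         on_iceberg: bool,
                         broadcast_threshold: int = 10 * 1024**2) -> str:
    """Toy version of: broadcast if possible; otherwise bucket only when the join repeats."""
    if smaller_side_bytes <= broadcast_threshold:
        return "broadcast join"
    if joins_per_rewrite > 1:
        return "Iceberg bucket partitioning" if on_iceberg else "bucketing (bucketBy)"
    return "plain sort-merge join (accept the shuffle)"


GB = 1024**3
print(choose_join_strategy(5 * 1024**2, joins_per_rewrite=30, on_iceberg=False))
print(choose_join_strategy(200 * GB, joins_per_rewrite=30, on_iceberg=True))
print(choose_join_strategy(200 * GB, joins_per_rewrite=1, on_iceberg=False))
```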

9. Summary

| Item | Key point |
|---|---|
| Principle | Shuffle in advance at write time → zero shuffle at join time |
| Creation | bucketBy(N, key).sortBy(key).saveAsTable |
| Requirements | Same key and same bucket count on both tables; metastore table |
| Good fit | Repeated joins between large tables |
| Bad fit | One-off joins, small tables (→ broadcast) |
| Lakehouse | Prefer Iceberg bucket partitioning |

Bucketing is an investment that "collapses the shuffle cost of all future joins into one." It is powerful for workloads that repeatedly join the same large tables, but a net loss for one-off joins or small tables. In a Lakehouse built on Iceberg, it is wiser to consider Iceberg's more flexible bucket partitioning first: in Spark it can reach the same goal of shuffle-free joins (through storage-partitioned joins), while adding partition evolution and multi-engine compatibility on top.


This post is based on Spark 3.5 + Iceberg/Delta. If you need help optimizing repeated joins or designing Lakehouse partitioning, feel free to reach out.

— Data Dynamics Engineering Team