PySpark Bucketing — Eliminating Shuffles from Repeated Large Joins
If you keep shuffling the same large tables for every join, bucketing is the answer. We cover how bucketing eliminates shuffles, how to create and join bucketed tables, how to size the bucket count, and when to use — or avoid — bucketing in the Iceberg/Delta era.
When you join two large tables, both sides get shuffled by the join key. But what if you join the same two tables dozens of times a day? You are repeating the same shuffle every single time. Bucketing is a technique that performs this shuffle once, up front, at write time, so that subsequent joins need no shuffle at all.
This post covers how bucketing eliminates shuffles, how to create and join bucketed tables, how to design the bucket count, and when bucketing makes sense in the Lakehouse era.
1. The Problem — Repeated Shuffles
On every join:
big_A ──(shuffle by join key)──┐
                               ├─ SortMergeJoin
big_B ──(shuffle by join key)──┘
→ Both tables are redistributed over the network every time (expensive)
→ Repeating the same join means repeating the same shuffle

When both tables are too large to broadcast, SortMergeJoin shuffles both sides on every run, even though the join key never changes. Accumulated over many runs, this cost becomes enormous.
2. The Idea Behind Bucketing — Shuffle in Advance
Bucketing pre-splits a table into N buckets by the hash of the join key at write time. The same key always lands in the same bucket number. If two tables are bucketed by the same key with the same bucket count, a join only needs to match "bucket against bucket" — so no shuffle is required.
[Bucketed tables]
big_A: bucket_0, bucket_1, ... bucket_31 (distributed by hash of user_id)
big_B: bucket_0, bucket_1, ... bucket_31 (identical)
Join: bucket_0 ↔ bucket_0, bucket_1 ↔ bucket_1 ...
→ Local joins between matching buckets, no shuffle

| Aspect | Without bucketing | With bucketing |
|---|---|---|
| Shuffle at join time | Both sides, every time | None (done in advance) |
| Write cost | Low | High (shuffle at write time) |
| Best for | One-off joins | Repeated joins |
The key insight: bucketing moves the shuffle cost to a single write-time event. Bucket once, and every subsequent join runs shuffle-free.
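The bucket-against-bucket idea can be sketched in plain Python, without Spark. This is only an illustration under stated assumptions: `zlib.crc32` stands in for Spark's internal hash function, and the sample rows and helper names (`bucket_id`, `bucketize`, `bucketed_join`) are hypothetical. What it shows is that once both tables are split with the same function and the same bucket count, each bucket can be joined locally against its counterpart.

```python
import zlib
from collections import defaultdict

NUM_BUCKETS = 32  # same bucket count on both tables, as in the diagram above


def bucket_id(key, num_buckets=NUM_BUCKETS):
    """Map a join key to a bucket number.

    zlib.crc32 is only a deterministic stand-in; Spark uses its own hash.
    """
    return zlib.crc32(str(key).encode("utf-8")) % num_buckets


def bucketize(rows, num_buckets=NUM_BUCKETS):
    """Group (key, value) rows by bucket number: the shuffle paid once at write time."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[bucket_id(row[0], num_buckets)].append(row)
    return buckets


def bucketed_join(left_buckets, right_buckets):
    """Inner join where bucket i on the left only ever meets bucket i on the right."""
    joined = []
    for b, left_rows in left_buckets.items():
        right_by_key = defaultdict(list)
        for right_row in right_buckets.get(b, []):
            right_by_key[right_row[0]].append(right_row)
        for left_row in left_rows:
            for right_row in right_by_key.get(left_row[0], []):
                joined.append((left_row[0], left_row[1], right_row[1]))
    return joined


# Hypothetical sample data: (user_id, payload)
events = [(1, "click"), (2, "view"), (1, "purchase"), (7, "view")]
users = [(1, "alice"), (2, "bob"), (3, "carol")]

result = sorted(bucketed_join(bucketize(events), bucketize(users)))
print(result)
```

Because the same key always maps to the same bucket number on both sides, no matching row can be missed even though no bucket ever looks at another bucket's data.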
3. Creating Bucketed Tables
Bucketed tables must be saved as metastore tables (saveAsTable). A plain path write such as write.parquet(path) cannot carry the bucket metadata, and Spark rejects bucketBy combined with a path-based save.
# 32 buckets by user_id, sorted within each bucket
(big_A.write
    .bucketBy(32, "user_id")
    .sortBy("user_id")  # pre-sorting can let SMJ skip its sort (not guaranteed; check EXPLAIN for Sort nodes)
    .mode("overwrite")
    .saveAsTable("analytics.events_bucketed"))

(big_B.write
    .bucketBy(32, "user_id")  # same key, same bucket count!
    .sortBy("user_id")
    .mode("overwrite")
    .saveAsTable("analytics.users_bucketed"))

4. Bucketed Joins — Verifying the Shuffle Is Gone
a = spark.table("analytics.events_bucketed")
b = spark.table("analytics.users_bucketed")
joined = a.join(b, "user_id")
joined.explain()
# → Success if the Exchange (shuffle) nodes disappear and only SortMergeJoin remains

If there is no Exchange on either side of the join in the EXPLAIN output, bucketing is working (see the EXPLAIN-reading section of our separate post "Debugging Slow PySpark Jobs"). If the bucketing does not match (for example, different bucket counts), the shuffle reappears.
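The check above can be automated so that a regression (for example, someone rewriting one table with a different bucket count) is caught in a test rather than in production. The following is a minimal sketch: `explain_text` and `shuffle_lines` are hypothetical helper names, and it assumes that `DataFrame.explain()` prints the plan to standard output, which is how PySpark behaves in the versions I am aware of.

```python
import contextlib
import io


def explain_text(df):
    """Capture what df.explain() prints and return it as a string."""
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        df.explain()
    return buf.getvalue()


def shuffle_lines(plan_text):
    """Return the plan lines that contain a shuffle Exchange.

    BroadcastExchange lines are skipped: a broadcast is not a shuffle.
    """
    return [
        line.strip()
        for line in plan_text.splitlines()
        if "Exchange" in line and "BroadcastExchange" not in line
    ]


# Usage with the DataFrame from the example above (requires a Spark session):
#   remaining = shuffle_lines(explain_text(joined))
#   assert not remaining, f"bucketed join still shuffles: {remaining}"
```

String matching on plan text is deliberately crude. It is enough for a smoke test, but the exact plan format differs between Spark versions, so treat a match as a prompt to read the full plan.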
5. Designing the Bucket Count
Choose the bucket count carefully — changing it later requires a full rewrite.
| Consideration | Guideline |
|---|---|
| Size per bucket | Target 128MB–1GB, like partitions |
| Bucket count | Too few → huge buckets (skew, OOM); too many → small files |
| Parallelism | Bucket count ≥ roughly the number of executor cores |
| Both sides match | The two joined tables must have the same bucket count to eliminate the shuffle |
# Rough estimate: total size / target bucket size
# Example: 256GB / 512MB = 512 buckets (powers of two preferred)

Common trap: creating one table with 32 buckets and the other with 64 does not give you a shuffle-free join by default. Standardize the bucket count across tables that will be joined together.
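The rough estimate above is easy to turn into a helper. This is a sketch, not a rule: `suggest_bucket_count` is a hypothetical name, sizes are assumed to be given in bytes, and rounding up to the next power of two simply follows the preference stated above.

```python
GIB = 1024 ** 3
MIB = 1024 ** 2


def suggest_bucket_count(total_bytes, target_bucket_bytes):
    """Total size divided by target bucket size, rounded up to a power of two."""
    if total_bytes <= 0 or target_bucket_bytes <= 0:
        raise ValueError("sizes must be positive")
    raw = -(-total_bytes // target_bucket_bytes)  # integer ceiling division
    return 1 << (raw - 1).bit_length()  # smallest power of two >= raw


print(suggest_bucket_count(256 * GIB, 512 * MIB))  # 512, the example above
print(suggest_bucket_count(100 * GIB, 512 * MIB))  # 200 raw buckets round up to 256
```

Use the size the table is expected to reach rather than its size today, since changing the bucket count later means rewriting the table.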
6. Limitations and Pitfalls of Bucketing
| Pitfall | Result |
|---|---|
| Mismatched bucket counts | Shuffle comes back |
| Skewed bucket key | A few buckets become huge → skew |
| Bucketing a small table | Unnecessary — broadcast is better |
| Bucketing for one-off joins | You only pay the write-time shuffle cost |
| Changing the bucket count | Requires a full rewrite |
| Saving via write.parquet path | Bucket info is lost → metastore table required |
Bucketing only pays off for repeated joins. For a join that runs once, you simply add a shuffle cost at write time for nothing.
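A back-of-the-envelope break-even calculation makes the trade-off concrete. The model below is a deliberate simplification: it assumes the only cost bucketing adds is one write-time shuffle and the only cost it removes is the per-join shuffle, and the numbers in the example are hypothetical, not measurements.

```python
import math


def joins_to_break_even(write_cost, shuffle_cost_per_join):
    """Smallest number of joins at which the write-time shuffle has paid for itself."""
    if shuffle_cost_per_join <= 0:
        raise ValueError("shuffle_cost_per_join must be positive")
    return math.ceil(write_cost / shuffle_cost_per_join)


def net_saving(n_joins, write_cost, shuffle_cost_per_join):
    """Cost saved after n_joins; negative means bucketing has cost more than it saved."""
    return n_joins * shuffle_cost_per_join - write_cost


# Hypothetical costs in minutes of cluster time.
WRITE_COST = 30      # one-off bucketed write
SHUFFLE_COST = 10    # shuffle avoided on each join

print(joins_to_break_even(WRITE_COST, SHUFFLE_COST))  # 3
print(net_saving(1, WRITE_COST, SHUFFLE_COST))        # -20: a one-off join loses
print(net_saving(24, WRITE_COST, SHUFFLE_COST))       # 210: a daily-repeated join wins
```

If the table is rewritten regularly, the write cost recurs with each rewrite, so count only the joins that run between two rewrites.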
7. The Lakehouse Era — Iceberg/Delta and Bucketing
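This part matters for readers of this blog. Iceberg provides bucket partitioning at the table-format level, which is more flexible than Hive-style bucketing. Delta Lake does not offer an equivalent bucket transform; it clusters data with Z-ordering or liquid clustering instead, which helps file pruning rather than removing join shuffles.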
Iceberg bucket transform
-- Iceberg: hidden partitioning via the bucket transform
CREATE TABLE analytics.events (
user_id BIGINT, event_time TIMESTAMP, ...
) USING iceberg
PARTITIONED BY (days(event_time), bucket(32, user_id));

Iceberg's bucket(N, col) is part of the partition strategy: it can be changed later through partition evolution and works together with metadata-based pruning. (We covered Iceberg partition transforms in a separate post, "How Trino + Iceberg Solves the Partitioning Problem.")
| Aspect | Hive bucketing | Iceberg bucket |
|---|---|---|
| Changing it | Full rewrite | Partition evolution |
| Pruning | Limited | Manifest-based |
| Engine compatibility | Spark-centric | Shared across Spark and Trino |
If you are on a Lakehouse, consider Iceberg bucket partitioning before pure Hive bucketing. It is more flexible and behaves consistently with other engines such as Trino.
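One practical caveat: to my understanding, Spark only skips the shuffle for Iceberg bucket partitions when storage-partitioned joins are switched on, and they are off by default. The sketch below collects the settings involved. The two setting names are what I believe Spark (3.3 and later) and Iceberg use, but they are assumptions to verify against the versions you run; `enable_storage_partitioned_joins` is a hypothetical helper.

```python
# Assumed setting names: verify them against your Spark and Iceberg versions.
SPJ_SETTINGS = {
    # Lets Spark use the partitioning reported by a DataSource V2 table.
    "spark.sql.sources.v2.bucketing.enabled": "true",
    # Asks Iceberg to keep its data grouping visible to the Spark planner.
    "spark.sql.iceberg.planning.preserve-data-grouping": "true",
}


def enable_storage_partitioned_joins(spark, settings=None):
    """Apply the settings to an existing SparkSession and return what was applied."""
    applied = dict(SPJ_SETTINGS if settings is None else settings)
    for key, value in applied.items():
        spark.conf.set(key, value)
    return applied


# Usage (requires a Spark session with an Iceberg catalog configured):
#   enable_storage_partitioned_joins(spark)
#   spark.table("analytics.events").join(other, "user_id").explain()
```

As with Hive-style bucketing, confirm the result in the EXPLAIN output rather than trusting the configuration: both tables need compatible bucket transforms on the join key for the Exchange to disappear.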
8. Bucketing vs Other Shuffle-Reduction Techniques
| Technique | When |
|---|---|
| Broadcast Join | When one side is small (try first) |
| Bucketing | Repeated joins between large tables |
| Pre-aggregation | When data can be reduced before the join |
| AQE partition coalescing | Cleaning up small partitions after a shuffle |
| Iceberg bucket partitioning | Lakehouse + repeated joins |
The order of preference: broadcast if one side is small enough; if not, and the join is repeated, use bucketing or Iceberg bucket partitioning.
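That order of preference can be written down as a small decision helper. It is a sketch of the rule of thumb above, not a planner: `choose_join_strategy` and its return labels are hypothetical, and the 10 MB threshold mirrors Spark's default for `spark.sql.autoBroadcastJoinThreshold`, which many teams raise.

```python
# Spark's default spark.sql.autoBroadcastJoinThreshold is 10 MB.
BROADCAST_THRESHOLD_BYTES = 10 * 1024 * 1024


def choose_join_strategy(smaller_side_bytes, repeated, on_iceberg=False,
                         broadcast_threshold=BROADCAST_THRESHOLD_BYTES):
    """Pick a shuffle-reduction technique following the order of preference above."""
    if smaller_side_bytes <= broadcast_threshold:
        return "broadcast"
    if repeated:
        return "iceberg-bucket" if on_iceberg else "bucketing"
    # Large and one-off: bucketing would only add a write-time shuffle.
    return "sort-merge-join"


print(choose_join_strategy(5 * 1024 * 1024, repeated=True))               # broadcast
print(choose_join_strategy(50 * 1024 ** 3, repeated=True))                # bucketing
print(choose_join_strategy(50 * 1024 ** 3, repeated=True, on_iceberg=True))  # iceberg-bucket
print(choose_join_strategy(50 * 1024 ** 3, repeated=False))               # sort-merge-join
```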
9. Summary
| Item | Key point |
|---|---|
| Principle | Shuffle in advance at write time → zero shuffle at join time |
| Creation | bucketBy(N, key).sortBy(key).saveAsTable |
| Requirements | Same key and same bucket count on both tables, metastore table |
| Good fit | Repeated joins between large tables |
| Bad fit | One-off joins, small tables (→ broadcast) |
| Lakehouse | Consider Iceberg bucket partitioning first |
Bucketing is an investment that "collapses the shuffle cost of all future joins into one." It is powerful for workloads that repeatedly join the same large tables, but a net loss for one-off joins or small tables. In a Lakehouse environment, consider Iceberg's more flexible bucket partitioning first: it targets the same goal of eliminating shuffles, provided the engine is configured to exploit the table's partitioning, while adding partition evolution and multi-engine compatibility on top.
This post is based on Spark 3.5 + Iceberg/Delta. If you need help optimizing repeated joins or designing Lakehouse partitioning, feel free to reach out.
— Data Dynamics Engineering Team