Recursive and Hierarchical Data in PySpark — Expanding BOMs and Org Charts Without Recursive CTEs
How to process BOMs, org charts, and category trees with unbounded parent-child depth in PySpark. Why Spark is weak at recursive CTEs, and practical patterns using iterative joins for expansion, path accumulation, and cycle detection.
Bills of materials (BOMs), org charts, category trees, comment threads — these are all hierarchical data with unbounded depth defined by parent-child relationships. Requirements like "expand every subcomponent needed to build this product" or "find every employee under this manager" are inherently recursive.
Traditional RDBMSs solve this with WITH RECURSIVE (recursive CTEs), but Spark SQL went a long time without proper recursive CTE support. This post covers the pattern of expanding hierarchical data in Spark with iterative joins, along with practical details like path accumulation and cycle detection.
1. The Problem — Expansion of Unknown Depth
parent_child table:
| parent | child |
|---|---|
| A | B |
| A | C |
| B | D |
| D | E |
The chain A → B → D → E spans four levels: E sits three hops below A.
Question: "What are all of A's descendants?" → {B, C, D, E} (depth unknown in advance)
If the depth were fixed, you could simply repeat a join that many times, but real-world hierarchies have variable depth. So you need a structure that "repeats until no new descendants appear."
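The following is a minimal plain-Python sketch, not Spark code, that makes the fixed-depth limitation concrete. It mimics "self-join the edge table N times" on the sample rows above. The function name `descendants_fixed_depth` is a hypothetical helper for illustration. It follows exactly `depth` hops, which is what a query with `depth` hand-written joins does.

```python
from collections import defaultdict

# Sample parent -> child rows from the table above
EDGES = [("A", "B"), ("A", "C"), ("B", "D"), ("D", "E")]

def children_index(edges):
    """Build parent -> list of children (the in-memory analogue of joining on parent)."""
    idx = defaultdict(list)
    for parent, child in edges:
        idx[parent].append(child)
    return idx

def descendants_fixed_depth(edges, root, depth):
    """Follow exactly `depth` hops, like writing `depth` self-joins by hand."""
    idx = children_index(edges)
    found, level = set(), {root}
    for _ in range(depth):
        # One "join": replace the current level with all of its children
        level = {c for p in level for c in idx[p]}
        found |= level
    return found

# Two hard-coded joins stop at D and silently miss E
print(sorted(descendants_fixed_depth(EDGES, "A", 2)))
print(sorted(descendants_fixed_depth(EDGES, "A", 3)))
```

The answer is only complete when `depth` is at least the true height of the hierarchy, and that height is exactly what you don't know in advance.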
2. Why Recursion Is Hard in Spark
| | RDBMS | Spark |
|---|---|---|
| Recursive CTE | Native WITH RECURSIVE | Limited/version-dependent, poor fit for distributed execution |
| Execution model | Single-node iteration | Distributed; each iteration adds shuffles and lengthens the lineage |
| Pitfalls | Few, though cycles still need a guard (UNION dedup, a CYCLE clause, or a recursion limit) | Lineage explosion, repeated shuffles, infinite loops on cycles |
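Iteration is a poor fit for Spark's lazy, lineage-based execution. Each loop pass appends more transformations to the logical plan. As a result, the lineage keeps growing, along with the time the optimizer spends analyzing it, and every pass adds at least one more shuffle for the join. That's why an explicit loop under your direct control, with periodic checkpoints to truncate the lineage, is the more stable approach.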
3. The Core Pattern — Iterative Join
Add children one level at a time to the set of "descendants found so far," repeating until no new descendants appear.
```python
from pyspark.sql import functions as F

edges = spark.table("parent_child")  # columns: parent, child
# On a real cluster, use a reliable shared path (HDFS/S3/DBFS)
spark.sparkContext.setCheckpointDir("/tmp/hier-ckpt")

def descendants(root):
    # Start: root's direct children
    frontier = edges.where(F.col("parent") == root).select("child").distinct()
    result = frontier
    iteration = 0
    while True:
        # Expand the current frontier's children by one level
        next_level = (frontier.alias("f")
                      .join(edges.alias("e"), F.col("f.child") == F.col("e.parent"))
                      .select(F.col("e.child").alias("child"))
                      .distinct())
        # Exclude what we already found (newly discovered descendants only)
        new_nodes = next_level.join(result, "child", "left_anti")
        if new_nodes.isEmpty():  # No new descendants left → stop
            break
        # new_nodes is already disjoint from result, so no extra distinct() is needed
        result = result.unionByName(new_nodes)
        frontier = new_nodes
        iteration += 1
        # Truncate the iterative lineage (essential). frontier is derived from the
        # previous frontier, so its lineage grows too: checkpoint both.
        if iteration % 3 == 0:
            result = result.checkpoint()
            frontier = frontier.checkpoint()
    return result
```
Key elements:
- frontier: only the nodes newly added in this step (avoids recomputing everything).
- left_anti join: removes nodes already found, preventing duplicates and revisits (= cycle defense).
- Termination condition: stop when no new nodes appear.
- checkpoint: cuts the iterative lineage to avoid OOM and planner slowdowns (same principle as our separate post "Entity Resolution with GraphFrames").
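The Spark loop is awkward to unit-test directly. One option is to keep a tiny driver-side model of the same algorithm and compare results on small fixtures. The sketch below is our plain-Python illustration, not part of the Spark job, and `descendants_local` is a hypothetical name. Sets stand in for DataFrames, and set difference stands in for the `left_anti` join.

```python
from collections import defaultdict

def descendants_local(edges, root):
    """Plain-Python model of the iterative join: sets stand in for DataFrames."""
    idx = defaultdict(set)
    for parent, child in edges:
        idx[parent].add(child)

    frontier = set(idx[root])        # start: root's direct children
    result = set(frontier)
    while True:
        # One "join" step: children of every node in the current frontier
        next_level = {c for p in frontier for c in idx[p]}
        # left_anti equivalent: keep only nodes we have not found before
        new_nodes = next_level - result
        if not new_nodes:            # fixpoint: nothing new, stop
            return result
        result |= new_nodes
        frontier = new_nodes         # expand only what is new next time

print(sorted(descendants_local([("A", "B"), ("A", "C"), ("B", "D"), ("D", "E")], "A")))
print(sorted(descendants_local([("A", "B"), ("B", "A")], "A")))
```

In the second call, the cyclic data still terminates, but the root comes back as its own "descendant." The Spark loop above behaves the same way, because the root is never placed in `result`. Filter it out, or treat it as a cycle signal, if that matters.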
4. Accumulating Paths and Depth (Level)
Often you need more than a plain descendant set — you need the "A→B→D→E path" or each node's depth (e.g., org level, BOM expansion quantities).
```python
def expand_with_path(root):
    # Start: depth 1, path = [root, child]
    frontier = (edges.where(F.col("parent") == root)
                .select(
                    F.col("child"),
                    F.lit(1).alias("level"),
                    F.array(F.lit(root), F.col("child")).alias("path")))
    result, iteration = frontier, 0
    while True:
        nxt = (frontier.alias("f")
               .join(edges.alias("e"), F.col("f.child") == F.col("e.parent"))
               .select(
                   F.col("e.child").alias("child"),
                   (F.col("f.level") + 1).alias("level"),
                   F.concat(F.col("f.path"), F.array(F.col("e.child"))).alias("path")))
        # Cycle defense: drop nodes that already appear earlier in the path
        nxt = nxt.where(~F.array_contains(
            F.expr("slice(path, 1, size(path) - 1)"), F.col("child")))
        # Visited exclusion: a node found at an earlier level is not re-added,
        # so each node keeps only its shallowest path(s)
        new_nodes = nxt.join(result.select("child"), "child", "left_anti")
        if new_nodes.isEmpty():
            break
        result = result.unionByName(new_nodes)
        frontier = new_nodes
        iteration += 1
        if iteration % 3 == 0:  # truncate lineage, as in the previous section
            result, frontier = result.checkpoint(), frontier.checkpoint()
    return result  # child, level, path
```
We accumulate the route in the path array. If a node already on the path shows up again, it's a cycle, and that row is excluded. The path check also catches a cycle that leads back to the root itself, which the left_anti exclusion misses because the root is never stored in result. Together, the two checks keep the loop from running forever.
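Here is the same idea as a small plain-Python model, for checking expected paths on toy data. This is an illustrative assumption, not Spark code, and `expand_with_path_local` is a hypothetical name. Each frontier entry carries `(level, path)`, and a child is dropped if it already appears on its own route. The visited exclusion also runs, so the model keeps one shallowest path per node. The Spark version can keep several paths when they tie on level.

```python
from collections import defaultdict

def expand_with_path_local(edges, root):
    """Plain-Python model: returns {child: (level, path)}, one shallowest path per node."""
    idx = defaultdict(list)
    for parent, child in edges:
        idx[parent].append(child)

    # Level 1: root's direct children, path = [root, child]
    result = {c: (1, [root, c]) for c in idx[root]}
    frontier = list(result.items())
    while frontier:
        nxt = {}
        for node, (level, path) in frontier:
            for child in idx[node]:
                # Path check: the child is already on this route -> cycle, skip it
                if child in path:
                    continue
                # Visited exclusion (the left_anti join): first path per node wins
                if child in result or child in nxt:
                    continue
                nxt[child] = (level + 1, path + [child])
        result.update(nxt)
        frontier = list(nxt.items())
    return result

paths = expand_with_path_local([("A", "B"), ("A", "C"), ("B", "D"), ("D", "E")], "A")
print(paths["E"])
```

With the cyclic input `A→B`, `B→A`, `B→C`, the edge back to `A` is dropped by the path check. The result holds only `B` (level 1) and `C` (level 2, path `A, B, C`).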
5. Cycle Detection — The Infinite Loop Trap
The most dangerous pitfall in hierarchical data is a cycle. With bad data like A→B→A, an unguarded loop never terminates.
| Defense | Method |
|---|---|
| Exclude visited nodes | Remove already-found nodes with left_anti |
| Path-based detection | A duplicate node in the path array means a cycle |
| Maximum iteration cap | An upper bound on iterations as a safety net |
```python
MAX_DEPTH = 50  # safety cap, assuming no legitimate hierarchy is deeper

# Inside the while loop, right after iteration += 1
if iteration > MAX_DEPTH:
    raise RuntimeError("Suspected cycle: maximum depth exceeded")
```
Hard rule for production: never assume the source data has no cycles. Always combine visited-node exclusion with a maximum-depth cap so that bad data can't keep a job running forever.
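The following plain-Python sketch puts the two defenses together in one loop. The function name and the cap value are assumptions for illustration. The cap is checked before each expansion, so a hierarchy deeper than the cap fails loudly instead of silently returning a truncated answer.

```python
from collections import defaultdict

MAX_DEPTH = 50  # assumed cap; tune it to the deepest legitimate hierarchy you expect

def descendants_guarded(edges, root, max_depth=MAX_DEPTH):
    """Frontier expansion with both defenses: visited exclusion and a hard depth cap."""
    idx = defaultdict(set)
    for parent, child in edges:
        idx[parent].add(child)

    frontier, result, depth = set(idx[root]), set(idx[root]), 1
    while frontier:
        if depth > max_depth:
            raise RuntimeError("Suspected cycle: maximum depth exceeded")
        # Children of the frontier minus everything already found (left_anti)
        new_nodes = {c for p in frontier for c in idx[p]} - result
        result |= new_nodes
        frontier = new_nodes
        depth += 1
    return result

chain = [(i, i + 1) for i in range(5)]  # 0 -> 1 -> 2 -> 3 -> 4 -> 5
print(sorted(descendants_guarded(chain, 0, max_depth=5)))
```

On cyclic data such as `A→B→A`, the visited exclusion alone ends the loop. The cap matters for loops that deliberately skip visited exclusion, such as the BOM quantity expansion in section 6. It also guards against data that is deeper than you assumed.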
6. BOM Quantity Expansion (Weighted Accumulation)
A BOM isn't a plain tree — quantities multiply. "1 finished product = 2 modules, 1 module = 4 screws → 8 screws per finished product." Accumulate the quantity product within the iterative join.
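The multiplication rule is easy to check on a toy BOM before running it in Spark. The plain-Python sketch below is an illustrative model, not the Spark job. The part names, quantities, and the `bom_requirements` helper are made up. It deliberately has no visited exclusion, because a part reached along two paths must be counted on both, so the depth cap is its only cycle guard.

```python
from collections import defaultdict

def bom_requirements(edges, product, max_depth=50):
    """Multiply quantities along every path, then sum per part.

    edges: (parent, child, qty) tuples; max_depth is an assumed safety cap.
    """
    idx = defaultdict(list)
    for parent, child, qty in edges:
        idx[parent].append((child, qty))

    frontier = list(idx[product])    # (part, quantity per finished product)
    totals = defaultdict(int)
    depth = 0
    while frontier:
        depth += 1
        # No visited exclusion: a part reached along two paths counts twice,
        # so the depth cap is the only cycle guard here
        if depth > max_depth:
            raise RuntimeError("Suspected cycle: maximum BOM depth exceeded")
        for part, total_qty in frontier:
            totals[part] += total_qty
        # Expand one level: each component's qty times the quantity so far
        frontier = [(sub, total_qty * qty)
                    for part, total_qty in frontier
                    for sub, qty in idx[part]]
    return dict(totals)

print(bom_requirements([("P", "M", 2), ("M", "S", 4), ("P", "S", 3)], "P"))
```

Consider a BOM with `P -> 2 x M`, `M -> 4 x S`, and a second, direct use `P -> 3 x S`. It yields 2 modules and 11 screws: 8 through the modules plus 3 used directly.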
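```python
from pyspark.sql import functions as F
# edges: parent, child, qty (units of child per one unit of parent)
def bom_expand(product, max_depth=50):
    frontier = (edges.where(F.col("parent") == product)
                .select("child", F.col("qty").alias("total_qty")))
    result, depth = frontier, 1
    while True:
        # Multiply quantities as you expand
        nxt = (frontier.alias("f")
               .join(edges.alias("e"), F.col("f.child") == F.col("e.parent"))
               .select(
                   F.col("e.child").alias("child"),
                   (F.col("f.total_qty") * F.col("e.qty")).alias("total_qty")))
        if nxt.isEmpty():  # reached the leaves
            break
        depth += 1
        if depth > max_depth:  # no left_anti here, so the cap is the cycle guard
            raise RuntimeError("Suspected cycle: maximum BOM depth exceeded")
        result, frontier = result.unionByName(nxt), nxt
    # If the same part is used along multiple paths, sum at the end
    return result.groupBy("child").agg(F.sum("total_qty").alias("required_qty"))
```
7. Alternative — GraphFrames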
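Since hierarchy/recursion is fundamentally a graph problem, you can also use GraphFrames. Its BFS finds shortest paths between vertex sets. Its Connected Components groups each whole hierarchy together but ignores edge direction.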
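```python
from graphframes import GraphFrame
from pyspark.sql import functions as F
# GraphFrames expects vertices with an "id" column and edges with "src"/"dst"
vertices = edges.select(F.col("parent").alias("id")).union(edges.select(F.col("child").alias("id"))).distinct()
g = GraphFrame(vertices, edges.withColumnRenamed("parent", "src").withColumnRenamed("child", "dst"))
# Shortest path(s) from root to one target node (`target` is a placeholder); BFS returns only the shortest matches
paths = g.bfs(fromExpr=f"id = '{root}'", toExpr=f"id = '{target}'", maxPathLength=10)
```
| Method | Best for |
|---|---|
| Iterative join | Custom logic such as quantity expansion and path accumulation |
| GraphFrames BFS | Simple reachability and shortest paths |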
With complex business logic (quantities, conditional rules), iterative joins win; for pure graph traversal, GraphFrames is cleaner.
8. Performance Caveats
| Item | Caution |
|---|---|
| Lineage explosion | Periodic checkpoint is mandatory |
| Repeated shuffles | Every iteration is a join (shuffle); if edges is small, broadcast(edges) removes the join shuffle |
| Frontier blowup | Wide trees see node counts surge at each level |
| Cycles | Infinite loops; use visited exclusion + depth cap |
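If the edges table is small enough, broadcast it in each iteration's join to remove that join's shuffle. The distinct() and the left_anti join against the growing result still shuffle unless result is small enough to broadcast too. See the broadcast section of our separate post "Mastering Data Skew in PySpark."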
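To get a rough sense of the frontier blowup, assume, purely for illustration, a uniform tree where every node has b children. The frontier at level k then holds b^k rows, and the accumulated result after d levels is a geometric sum:

```latex
|F_k| = b^k, \qquad |R_d| = \sum_{k=1}^{d} b^k = \frac{b\,(b^d - 1)}{b - 1}
```

With b = 10 and d = 6, that is 10 + 100 + ... + 1,000,000 = 1,111,110 rows. Roughly 90% of them arrive in the final iteration, so the last few levels dominate the data volume each join has to move.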
9. Summary
| Element | Key point |
|---|---|
| Expansion | Iterative join — until no new nodes |
| frontier | Expand only new nodes (never recompute everything) |
| Path/depth | Accumulate in an array |
| Cycle defense | Visited exclusion + path duplicate check + max depth |
| lineage | Periodic checkpoint |
| BOM quantities | Accumulate products, then sum |
Hierarchical data in Spark isn't a case of "no recursive CTE, so it can't be done" — it's a problem you control directly with iterative joins. Expand only the frontier to keep costs down, cut lineage with checkpoints, and always defend against cycles — stick to these three rules and you can reliably handle everything from BOM expansion to org charts. The key is staying aware that iteration in a distributed environment behaves differently than on a single node.
This post was written against Spark 3.5. If you need help designing hierarchical data pipelines — BOM expansion, org hierarchies, category trees — feel free to reach out.
— The Data Dynamics Engineering Team