Tags: pyspark, spark, hierarchical-data, recursion, graph, data-engineering

Recursive and Hierarchical Data in PySpark — Expanding BOMs and Org Charts Without Recursive CTEs

How to process BOMs, org charts, and category trees with unbounded parent-child depth in PySpark. Why Spark is weak at recursive CTEs, and practical patterns using iterative joins for expansion, path accumulation, and cycle detection.

Data Dynamics · June 5, 2026 · 7 min read

Bills of materials (BOMs), org charts, category trees, comment threads — these are all hierarchical data with unbounded depth defined by parent-child relationships. Requirements like "expand every subcomponent needed to build this product" or "find every employee under this manager" are inherently recursive.

Traditional RDBMSs solve this with WITH RECURSIVE (recursive CTEs), but Spark SQL went a long time without proper recursive CTE support. This post covers the pattern of expanding hierarchical data in Spark with iterative joins, along with practical details like path accumulation and cycle detection.

1. The Problem — Expansion of Unknown Depth

parent_child table:
  parent | child
  A      | B
  A      | C
  B      | D
  D      | E      ← three levels below A (A→B→D→E)
 
Question: "What are all of A's descendants?"  → {B, C, D, E}  (depth unknown in advance)

If the depth were fixed, you could simply repeat a join that many times — but real-world hierarchies have variable depth. So you need a structure that "repeats until no new descendants appear."

2. Why Recursion Is Hard in Spark

Aspect          | RDBMS                 | Spark
Recursive CTE   | Native WITH RECURSIVE | Limited/version-dependent, poor fit for distributed execution
Execution model | Single-node iteration | Distributed; each iteration accumulates shuffles and lineage
Pitfalls        | Almost none           | Lineage explosion, repeated shuffles, infinite loops on cycles

Spark's distributed execution and "iteration" don't get along. Every iteration stacks transformations, lengthening the lineage, and shuffles accumulate. That's why explicit iteration with checkpointing, under your direct control, is the more stable approach.

3. The Core Pattern — Iterative Join

Add children one level at a time to the set of "descendants found so far," repeating until no new descendants appear.

from pyspark.sql import functions as F
 
edges = spark.table("parent_child")            # parent, child
spark.sparkContext.setCheckpointDir("/tmp/hier-ckpt")
 
def descendants(root):
    # Start: root's direct children
    frontier = edges.where(F.col("parent") == root).select("child")
    result = frontier
    iteration = 0
 
    while True:
        # Expand the current frontier's children by one level
        next_level = (frontier.alias("f")
            .join(edges.alias("e"), F.col("f.child") == F.col("e.parent"))
            .select(F.col("e.child").alias("child"))
            .distinct())
 
        # Exclude what we already found (newly discovered descendants only)
        new_nodes = next_level.join(result, "child", "left_anti")
 
        if new_nodes.isEmpty():     # No new descendants left → stop
            break
 
        result = result.unionByName(new_nodes).distinct()
        frontier = new_nodes
        iteration += 1
 
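        # Truncate the iterative lineage (essential). Checkpoint frontier too:
        # its plan also grows by one join per iteration.
        if iteration % 3 == 0:
            result = result.checkpoint()
            frontier = frontier.checkpoint()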
 
    return result

Key elements:

  • frontier: only the nodes newly added in this step (avoids recomputing everything).
  • left_anti join: removes nodes already found, preventing duplicates and revisits (= cycle defense).
  • Termination condition: stop when no new nodes appear.
  • checkpoint: cuts the iterative lineage to avoid OOM and planner slowdowns (same principle as our separate post "Entity Resolution with GraphFrames").
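
For unit tests on small inputs, it can help to mirror the same frontier logic in plain Python and compare its output with what descendants() returns. The sketch below is a single-node reference model, not the Spark implementation. The name descendants_reference and the max_iterations parameter are assumptions introduced for illustration.

```python
from collections import defaultdict

def descendants_reference(edge_list, root, max_iterations=50):
    """Single-node model of the frontier expansion (illustrative helper)."""
    children = defaultdict(set)
    for parent, child in edge_list:
        children[parent].add(child)

    frontier = set(children[root])   # root's direct children
    result = set(frontier)
    for _ in range(max_iterations):
        # Expand the frontier by one level (the join against edges)
        next_level = set()
        for node in frontier:
            next_level |= children[node]
        # Keep only unseen nodes (the left_anti join against result)
        new_nodes = next_level - result
        if not new_nodes:            # no new descendants: stop
            return result
        result |= new_nodes
        frontier = new_nodes
    raise RuntimeError("Suspected cycle: maximum depth exceeded")

edge_list = [("A", "B"), ("A", "C"), ("B", "D"), ("D", "E")]
print(sorted(descendants_reference(edge_list, "A")))   # ['B', 'C', 'D', 'E']
```

On cyclic input such as A→B→A the model still terminates, because visited nodes are excluded, but the root then shows up in its own descendant set. That is a cheap signal worth asserting on in tests.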

4. Accumulating Paths and Depth (Level)

Often you need more than a plain descendant set: the "A→B→D→E path", each node's depth (e.g., org level), or a value accumulated along the way (e.g., BOM expansion quantities).

def expand_with_path(root):
    # Start: depth 1, path = [root, child]
    frontier = (edges.where(F.col("parent") == root)
        .select(
            F.col("child"),
            F.lit(1).alias("level"),
            F.array(F.lit(root), F.col("child")).alias("path")))
    result = frontier
 
    while True:
        nxt = (frontier.alias("f")
            .join(edges.alias("e"), F.col("f.child") == F.col("e.parent"))
            .select(
                F.col("e.child").alias("child"),
                (F.col("f.level") + 1).alias("level"),
                F.concat(F.col("f.path"), F.array(F.col("e.child"))).alias("path")))
 
        # Cycle defense: drop nodes that already appear in the path
        nxt = nxt.where(~F.array_contains(
            F.expr("slice(path, 1, size(path)-1)"), F.col("child")))
 
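        # Keep only unseen nodes; checkpoint to cut lineage and avoid recomputing after isEmpty()
        new_nodes = nxt.join(result.select("child"), "child", "left_anti").checkpoint()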
        if new_nodes.isEmpty():
            break
        result = result.unionByName(new_nodes)
        frontier = new_nodes
 
    return result   # child, level, path

We accumulate the route in the path array, and if a node already in the path shows up again, it's a cycle and gets excluded. This is the key to preventing infinite loops.
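
To see what the path check does on bad data, here is a plain-Python model of the same logic, run on an edge list where E points back to B. It is a sketch for reasoning and testing on small inputs. The helper name expand_with_path_reference is an assumption, and the rows are plain tuples rather than DataFrame rows.

```python
def expand_with_path_reference(edge_list, root):
    """Single-node model of path/level accumulation (illustrative helper)."""
    # Each row is (child, level, path), like the columns of the DataFrame
    frontier = [(c, 1, [root, c]) for p, c in edge_list if p == root]
    result = list(frontier)
    while frontier:
        seen = {child for child, _, _ in result}
        new_rows = []
        for node, level, path in frontier:
            for p, c in edge_list:
                if p != node:
                    continue
                if c in path:        # already on this route: a cycle, drop it
                    continue
                if c in seen:        # found earlier via another route
                    continue
                new_rows.append((c, level + 1, path + [c]))
        result.extend(new_rows)
        frontier = new_rows
    return result

# E -> B closes a cycle
edge_list = [("A", "B"), ("A", "C"), ("B", "D"), ("D", "E"), ("E", "B")]
for child, level, path in expand_with_path_reference(edge_list, "A"):
    print(child, level, "->".join(path))
# B 1 A->B
# C 1 A->C
# D 2 A->B->D
# E 3 A->B->D->E
```

The E→B edge is dropped because B is already on the route A→B→D→E, so the expansion stops after level 3 instead of looping.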

5. Cycle Detection — The Infinite Loop Trap

The most dangerous pitfall in hierarchical data is a cycle. With bad data like A→B→A, an expansion that does not track visited nodes never terminates.

Defense               | Method
Exclude visited nodes | Remove already-found nodes with left_anti
Path-based detection  | A duplicate node in the path array means a cycle
Maximum iteration cap | An upper bound on iterations as a safety net

MAX_DEPTH = 50      # safety cap, assuming no legitimate hierarchy is deeper

# Inside the while loop, right after iteration += 1
if iteration > MAX_DEPTH:
    raise RuntimeError("Suspected cycle: maximum depth exceeded")

Hard rule for production: never assume the source data has no cycles. Always combine visited-node exclusion with a maximum-depth cap so that bad data can't send a job into infinite execution.
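
The two defenses cover different failure modes, which a small plain-Python model makes visible. In this sketch (walk_levels and its exclude_visited flag are hypothetical names, used only for illustration), visited exclusion ends the walk on cyclic data, and the cap turns the unprotected variant into a fast failure instead of an endless job.

```python
MAX_DEPTH = 50  # safety cap, assuming no legitimate hierarchy is deeper

def walk_levels(edge_list, root, exclude_visited=True, max_depth=MAX_DEPTH):
    """Level-by-level walk; returns how many levels were expanded (illustrative helper)."""
    frontier = {c for p, c in edge_list if p == root}
    visited = set(frontier)
    iteration = 0
    while frontier:
        iteration += 1
        if iteration > max_depth:
            raise RuntimeError("Suspected cycle: maximum depth exceeded")
        next_level = {c for p, c in edge_list if p in frontier}
        if exclude_visited:
            next_level -= visited    # same role as the left_anti join
        visited |= next_level
        frontier = next_level
    return iteration

cyclic = [("A", "B"), ("B", "A")]
print(walk_levels(cyclic, "A"))      # 2: visited exclusion ends the walk
try:
    walk_levels(cyclic, "A", exclude_visited=False)
except RuntimeError as err:
    print(err)                       # Suspected cycle: maximum depth exceeded
```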

6. BOM Quantity Expansion (Weighted Accumulation)

A BOM isn't a plain tree — quantities multiply. "1 finished product = 2 modules, 1 module = 4 screws → 8 screws per finished product." Accumulate the quantity product within the iterative join.

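# edges: parent, child, qty; product is the id of the finished product to expand
MAX_DEPTH = 50
frontier = (edges.where(F.col("parent") == product)
    .select("child", F.col("qty").alias("total_qty")))
result = frontier
depth = 0

# Multiply quantities as you expand. Do not anti-join against result here:
# a part reached along a second path is an additional requirement, so the
# depth cap is the cycle defense.
while not frontier.isEmpty():
    depth += 1
    if depth > MAX_DEPTH:
        raise RuntimeError("Suspected cycle: maximum depth exceeded")
    frontier = (frontier.alias("f")
        .join(edges.alias("e"), F.col("f.child") == F.col("e.parent"))
        .select(
            F.col("e.child").alias("child"),
            (F.col("f.total_qty") * F.col("e.qty")).alias("total_qty"))
        .checkpoint())    # materialize each level; cuts the lineage
    result = result.unionByName(frontier)

# If the same part is used along multiple paths, sum at the end
required = result.groupBy("child").agg(F.sum("total_qty").alias("required_qty"))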
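
The arithmetic from the example above (2 modules × 4 screws = 8 screws) can be checked with a plain-Python model of the multiply-then-sum logic. This is a sketch under assumptions: bom_requirements is a hypothetical helper, and the sample BOM adds 3 screws used directly by the product to show why routes must be summed.

```python
from collections import defaultdict

def bom_requirements(bom_edges, product, max_depth=50):
    """Total quantity of each part per unit of product (illustrative helper)."""
    # Each frontier row is (part, quantity accumulated along one route)
    frontier = [(c, qty) for p, c, qty in bom_edges if p == product]
    rows = list(frontier)
    depth = 0
    while frontier:
        depth += 1
        if depth > max_depth:
            raise RuntimeError("Suspected cycle: maximum depth exceeded")
        # No visited exclusion: every route to a part adds to its requirement
        frontier = [(c, total * qty)
                    for part, total in frontier
                    for p, c, qty in bom_edges if p == part]
        rows.extend(frontier)

    required = defaultdict(int)
    for part, total in rows:
        required[part] += total      # sum over all routes
    return dict(required)

bom = [("product", "module", 2), ("module", "screw", 4), ("product", "screw", 3)]
print(bom_requirements(bom, "product"))   # {'module': 2, 'screw': 11}
```

The screw total is 2 × 4 = 8 via the module plus 3 used directly. Dropping the second route, as visited-node exclusion would, undercounts the requirement.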

7. Alternative — GraphFrames

Since hierarchy/recursion is fundamentally a graph problem, you can also use GraphFrames' BFS or Connected Components.

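from graphframes import GraphFrame

# GraphFrames needs an "id" column on vertices and "src"/"dst" columns on edges
vertices = (edges.select(F.col("parent").alias("id"))
    .union(edges.select(F.col("child").alias("id"))).distinct())
g = GraphFrame(vertices, edges.selectExpr("parent AS src", "child AS dst"))

# Shortest path from root to one node (target is a placeholder id). BFS stops at the first
# depth where toExpr matches, so toExpr="id IS NOT NULL" would match root and return it alone
paths = g.bfs(fromExpr=f"id = '{root}'", toExpr=f"id = '{target}'", maxPathLength=10)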

Method          | Best for
Iterative join  | Custom logic such as quantity expansion and path accumulation
GraphFrames BFS | Simple reachability and shortest paths

With complex business logic (quantities, conditional rules), iterative joins win; for pure graph traversal, GraphFrames is cleaner.

8. Performance Caveats

Item              | Caution
Lineage explosion | Periodic checkpoint is mandatory
Repeated shuffles | Every iteration is a join (shuffle); broadcast if edges is small
Frontier blowup   | Wide trees see node counts surge at each level
Cycles            | Infinite loops; use visited exclusion + depth cap
Small edges       | Avoid the join shuffle with broadcast(edges)

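If the edges table is small enough, turn each iteration's join into a broadcast join, for example by passing F.broadcast(edges) to the join, so the expansion join itself no longer shuffles (see the broadcast section of our separate post "Mastering Data Skew in PySpark"). The distinct() and left_anti steps still involve their own shuffles.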

9. Summary

Element        | Key point
Expansion      | Iterative join, until no new nodes appear
Frontier       | Expand only new nodes (never recompute everything)
Path/depth     | Accumulate in an array
Cycle defense  | Visited exclusion + path duplicate check + max depth
Lineage        | Periodic checkpoint
BOM quantities | Accumulate products, then sum

Hierarchical data in Spark isn't a case of "no recursive CTE, so it can't be done" — it's a problem you control directly with iterative joins. Expand only the frontier to keep costs down, cut lineage with checkpoints, and always defend against cycles — stick to these three rules and you can reliably handle everything from BOM expansion to org charts. The key is staying aware that iteration in a distributed environment behaves differently than on a single node.


This post was written against Spark 3.5. If you need help designing hierarchical data pipelines — BOM expansion, org hierarchies, category trees — feel free to reach out.

— The Data Dynamics Engineering Team