Blog
pysparksparkhierarchical-datarecursiongraphdata-engineering

PySpark 재귀·계층 데이터 — BOM·조직도를 재귀 CTE 없이 풀기

부모-자식 관계로 무한 깊이를 갖는 BOM, 조직도, 카테고리 트리를 PySpark 로 처리하는 법. Spark 에 재귀 CTE가 약한 이유와, 반복 조인(iterative join)으로 전개·경로 누적·순환 탐지를 구현하는 실전 패턴을 정리합니다.

Data Dynamics2026年6月5日10 min read
This post is not yet translated. The original Korean version is shown below.

BOM(자재 명세서), 조직도, 카테고리 트리, 댓글 스레드 — 모두 부모-자식 관계로 무한 깊이를 갖는 계층 데이터입니다. "이 제품을 만드는 데 필요한 모든 하위 부품을 펼쳐라", "이 팀장 아래 모든 직원을 구하라" 같은 요구는 본질적으로 재귀입니다.

전통적 RDBMS 는 WITH RECURSIVE(재귀 CTE)로 이를 풀지만, Spark SQL 은 오랫동안 재귀 CTE 를 제대로 지원하지 않았습니다. 이 글은 Spark 에서 계층 데이터를 반복 조인(iterative join) 으로 전개하는 패턴과, 경로 누적·순환 탐지 같은 실전 디테일을 정리합니다.

1. 문제 — 깊이를 모르는 전개

parent_child 테이블:
  parent | child
  A      | B
  A      | C
  B      | D
  D      | E      ← A 아래 4단계 깊이
 
질문: "A 의 모든 후손은?"  → {B, C, D, E}  (깊이를 미리 모름)

깊이가 고정이면 join 을 그만큼 반복하면 되지만, 실무 계층은 깊이가 가변적입니다. 그래서 "더 이상 새 후손이 안 나올 때까지 반복" 하는 구조가 필요합니다.

2. 왜 Spark 는 재귀가 어려운가

RDBMSSpark
재귀 CTEWITH RECURSIVE 네이티브제한적/버전 의존, 분산에 부적합
실행 모델단일 노드 반복분산 — 반복마다 셔플·계보 누적
함정거의 없음lineage 폭증, 셔플 반복, 순환 무한루프

Spark 의 분산 실행은 "반복"과 궁합이 나쁩니다. 매 반복이 변환을 쌓아 lineage(계보)가 길어지고, 셔플이 누적됩니다. 그래서 명시적 반복 + 체크포인트로 직접 제어하는 편이 안정적입니다.

3. 핵심 패턴 — 반복 조인(Iterative Join)

"현재까지 찾은 후손" 집합에 한 단계씩 자식을 더해가며, 새 후손이 안 나올 때까지 반복합니다.

from pyspark.sql import functions as F
 
edges = spark.table("parent_child")            # parent, child
spark.sparkContext.setCheckpointDir("/tmp/hier-ckpt")
 
def descendants(root):
    # 시작: root 의 직접 자식
    frontier = edges.where(F.col("parent") == root).select("child")
    result = frontier
    iteration = 0
 
    while True:
        # 현재 frontier 의 자식들을 한 단계 확장
        next_level = (frontier.alias("f")
            .join(edges.alias("e"), F.col("f.child") == F.col("e.parent"))
            .select(F.col("e.child").alias("child"))
            .distinct())
 
        # 이미 찾은 것 제외 (새로 발견된 후손만)
        new_nodes = next_level.join(result, "child", "left_anti")
 
        if new_nodes.isEmpty():     # 더 이상 새 후손 없음 → 종료
            break
 
        result = result.unionByName(new_nodes).distinct()
        frontier = new_nodes
        iteration += 1
 
        # 반복 lineage 절단 (필수)
        if iteration % 3 == 0:
            result = result.checkpoint()
 
    return result

핵심 요소:

  • frontier: 이번 단계에 새로 확장할 노드만(전체 재계산 방지).
  • left_anti join: 이미 찾은 노드를 빼서 중복·재방문 방지(= 순환 방어).
  • 종료 조건: 새 노드가 없으면 멈춤.
  • checkpoint: 반복 lineage 를 끊어 OOM·플래너 지연 방지(별도 글 "GraphFrames 엔티티 해소"와 같은 원리).

4. 경로(Path)와 깊이(Level) 누적

단순 후손 집합이 아니라 "A→B→D→E 경로"나 "각 노드의 깊이"가 필요할 때가 많습니다(예: 조직 레벨, BOM 전개 수량).

def expand_with_path(root):
    # 시작: 깊이 1, 경로 = [root, child]
    frontier = (edges.where(F.col("parent") == root)
        .select(
            F.col("child"),
            F.lit(1).alias("level"),
            F.array(F.lit(root), F.col("child")).alias("path")))
    result = frontier
 
    while True:
        nxt = (frontier.alias("f")
            .join(edges.alias("e"), F.col("f.child") == F.col("e.parent"))
            .select(
                F.col("e.child").alias("child"),
                (F.col("f.level") + 1).alias("level"),
                F.concat(F.col("f.path"), F.array(F.col("e.child"))).alias("path")))
 
        # 순환 방어: 경로 안에 이미 등장한 노드면 제외
        nxt = nxt.where(~F.array_contains(
            F.expr("slice(path, 1, size(path)-1)"), F.col("child")))
 
        new_nodes = nxt.join(result.select("child"), "child", "left_anti")
        if new_nodes.isEmpty():
            break
        result = result.unionByName(new_nodes)
        frontier = new_nodes
 
    return result   # child, level, path

path 배열에 경로를 누적하고, 경로 안에 이미 있는 노드가 다시 나오면 순환이므로 제외합니다. 이것이 무한 루프를 막는 핵심입니다.

5. 순환(Cycle) 탐지 — 무한 루프의 함정

계층 데이터의 가장 위험한 함정은 순환입니다. A→B→A 같은 잘못된 데이터가 있으면 반복이 영원히 끝나지 않습니다.

방어방법
방문 노드 제외left_anti 로 이미 찾은 노드 빼기
경로 기반 탐지경로 배열에 중복 노드 있으면 순환
최대 반복 제한안전장치로 반복 횟수 상한
MAX_DEPTH = 50      # 안전 상한 — 정상 계층이 이보다 깊을 리 없다면
if iteration > MAX_DEPTH:
    raise RuntimeError("순환 의심: 최대 깊이 초과")

실무 철칙: 소스 데이터에 순환이 없다고 가정하지 마세요. 방문 노드 제외 + 최대 깊이 상한을 항상 함께 두어, 잘못된 데이터가 잡을 무한 실행시키는 것을 막으세요.

6. BOM 수량 전개 (가중 누적)

BOM 은 단순 트리가 아니라 수량이 곱해집니다. "완제품 1개 = 모듈 2개, 모듈 1개 = 나사 4개 → 완제품 1개에 나사 8개". 반복 조인에 수량 곱을 누적합니다.

# edges: parent, child, qty
frontier = (edges.where(F.col("parent") == product)
    .select("child", F.col("qty").alias("total_qty")))
 
# 확장 시 수량을 곱해서 누적
nxt = (frontier.alias("f")
    .join(edges.alias("e"), F.col("f.child") == F.col("e.parent"))
    .select(
        F.col("e.child").alias("child"),
        (F.col("f.total_qty") * F.col("e.qty")).alias("total_qty")))
 
# 같은 부품이 여러 경로로 쓰이면 최종 합산
result.groupBy("child").agg(F.sum("total_qty").alias("required_qty"))

7. 대안 — GraphFrames

계층/재귀가 본질적으로 그래프이므로, GraphFrames 의 BFS 나 Connected Components 를 쓸 수도 있습니다.

from graphframes import GraphFrame
g = GraphFrame(vertices, edges)
 
# root 에서 도달 가능한 노드 탐색 (BFS)
paths = g.bfs(fromExpr=f"id = '{root}'", toExpr="id IS NOT NULL", maxPathLength=10)
방법적합
반복 조인수량 전개, 경로 누적 등 커스텀 로직
GraphFrames BFS단순 도달성·최단경로

복잡한 비즈니스 로직(수량·조건부)이 있으면 반복 조인이, 순수 그래프 탐색이면 GraphFrames 가 깔끔합니다.

8. 성능 주의

항목주의
lineage 폭증주기적 checkpoint 필수
셔플 반복매 반복이 조인(셔플) — edges 작으면 broadcast
frontier 비대넓은 트리는 단계마다 노드 급증
순환무한 루프 — 방문 제외 + 상한
작은 edgesbroadcast(edges) 로 셔플 제거

edges 테이블이 충분히 작으면 매 반복의 조인을 broadcast join 으로 만들어 셔플을 없앨 수 있습니다(별도 글 "PySpark 데이터 스큐 완전 정복"의 broadcast 참고).

9. 정리

요소핵심
전개반복 조인 — 새 노드 없을 때까지
frontier새 노드만 확장(전체 재계산 금지)
경로·깊이array 에 누적
순환 방어방문 제외 + 경로 중복 검사 + 최대 깊이
lineage주기적 checkpoint
BOM 수량곱 누적 후 합산

Spark 에서 계층 데이터는 "재귀 CTE 가 없으니 못 한다"가 아니라, 반복 조인으로 직접 제어하는 문제입니다. frontier 만 확장해 비용을 줄이고, checkpoint 로 lineage 를 끊고, 순환을 항상 방어하는 — 이 세 가지만 지키면 BOM 전개부터 조직도까지 안정적으로 풀 수 있습니다. 분산 환경의 반복은 단일 노드와 다르다는 점을 의식하는 것이 핵심입니다.


이 글은 Spark 3.5 기준으로 작성되었습니다. BOM 전개·조직 계층·카테고리 트리 같은 계층 데이터 파이프라인 설계가 필요하시면 언제든 문의해 주세요.

— Data Dynamics 엔지니어링 팀