Blog
pysparksparkhierarchical-datarecursiongraphdata-engineering

PySpark 재귀·계층 데이터 — BOM·조직도를 재귀 CTE 없이 풀기

부모-자식 관계로 무한 깊이를 갖는 BOM, 조직도, 카테고리 트리를 PySpark 로 처리하는 법. Spark 에 재귀 CTE가 약한 이유와, 반복 조인(iterative join)으로 전개·경로 누적·순환 탐지를 구현하는 실전 패턴을 정리합니다.

Data Dynamics2026년 6월 5일10 min read

BOM(자재 명세서), 조직도, 카테고리 트리, 댓글 스레드 — 모두 부모-자식 관계로 무한 깊이를 갖는 계층 데이터입니다. "이 제품을 만드는 데 필요한 모든 하위 부품을 펼쳐라", "이 팀장 아래 모든 직원을 구하라" 같은 요구는 본질적으로 재귀입니다.

전통적 RDBMS 는 WITH RECURSIVE(재귀 CTE)로 이를 풀지만, Spark SQL 은 오랫동안 재귀 CTE 를 제대로 지원하지 않았습니다. 이 글은 Spark 에서 계층 데이터를 반복 조인(iterative join) 으로 전개하는 패턴과, 경로 누적·순환 탐지 같은 실전 디테일을 정리합니다.

1. 문제 — 깊이를 모르는 전개

parent_child 테이블:
  parent | child
  A      | B
  A      | C
  B      | D
  D      | E      ← A 아래 4단계 깊이
 
질문: "A 의 모든 후손은?"  → {B, C, D, E}  (깊이를 미리 모름)

깊이가 고정이면 join 을 그만큼 반복하면 되지만, 실무 계층은 깊이가 가변적입니다. 그래서 "더 이상 새 후손이 안 나올 때까지 반복" 하는 구조가 필요합니다.

2. 왜 Spark 는 재귀가 어려운가

RDBMSSpark
재귀 CTEWITH RECURSIVE 네이티브제한적/버전 의존, 분산에 부적합
실행 모델단일 노드 반복분산 — 반복마다 셔플·계보 누적
함정거의 없음lineage 폭증, 셔플 반복, 순환 무한루프

Spark 의 분산 실행은 "반복"과 궁합이 나쁩니다. 매 반복이 변환을 쌓아 lineage(계보)가 길어지고, 셔플이 누적됩니다. 그래서 명시적 반복 + 체크포인트로 직접 제어하는 편이 안정적입니다.

3. 핵심 패턴 — 반복 조인(Iterative Join)

"현재까지 찾은 후손" 집합에 한 단계씩 자식을 더해가며, 새 후손이 안 나올 때까지 반복합니다.

from pyspark.sql import functions as F
 
edges = spark.table("parent_child")            # parent, child
spark.sparkContext.setCheckpointDir("/tmp/hier-ckpt")
 
def descendants(root):
    # 시작: root 의 직접 자식
    frontier = edges.where(F.col("parent") == root).select("child")
    result = frontier
    iteration = 0
 
    while True:
        # 현재 frontier 의 자식들을 한 단계 확장
        next_level = (frontier.alias("f")
            .join(edges.alias("e"), F.col("f.child") == F.col("e.parent"))
            .select(F.col("e.child").alias("child"))
            .distinct())
 
        # 이미 찾은 것 제외 (새로 발견된 후손만)
        new_nodes = next_level.join(result, "child", "left_anti")
 
        if new_nodes.isEmpty():     # 더 이상 새 후손 없음 → 종료
            break
 
        result = result.unionByName(new_nodes).distinct()
        frontier = new_nodes
        iteration += 1
 
        # 반복 lineage 절단 (필수)
        if iteration % 3 == 0:
            result = result.checkpoint()
 
    return result

핵심 요소:

  • frontier: 이번 단계에 새로 확장할 노드만(전체 재계산 방지).
  • left_anti join: 이미 찾은 노드를 빼서 중복·재방문 방지(= 순환 방어).
  • 종료 조건: 새 노드가 없으면 멈춤.
  • checkpoint: 반복 lineage 를 끊어 OOM·플래너 지연 방지(별도 글 "GraphFrames 엔티티 해소"와 같은 원리).

4. 경로(Path)와 깊이(Level) 누적

단순 후손 집합이 아니라 "A→B→D→E 경로"나 "각 노드의 깊이"가 필요할 때가 많습니다(예: 조직 레벨, BOM 전개 수량).

def expand_with_path(root):
    # 시작: 깊이 1, 경로 = [root, child]
    frontier = (edges.where(F.col("parent") == root)
        .select(
            F.col("child"),
            F.lit(1).alias("level"),
            F.array(F.lit(root), F.col("child")).alias("path")))
    result = frontier
 
    while True:
        nxt = (frontier.alias("f")
            .join(edges.alias("e"), F.col("f.child") == F.col("e.parent"))
            .select(
                F.col("e.child").alias("child"),
                (F.col("f.level") + 1).alias("level"),
                F.concat(F.col("f.path"), F.array(F.col("e.child"))).alias("path")))
 
        # 순환 방어: 경로 안에 이미 등장한 노드면 제외
        nxt = nxt.where(~F.array_contains(
            F.expr("slice(path, 1, size(path)-1)"), F.col("child")))
 
        new_nodes = nxt.join(result.select("child"), "child", "left_anti")
        if new_nodes.isEmpty():
            break
        result = result.unionByName(new_nodes)
        frontier = new_nodes
 
    return result   # child, level, path

path 배열에 경로를 누적하고, 경로 안에 이미 있는 노드가 다시 나오면 순환이므로 제외합니다. 이것이 무한 루프를 막는 핵심입니다.

5. 순환(Cycle) 탐지 — 무한 루프의 함정

계층 데이터의 가장 위험한 함정은 순환입니다. A→B→A 같은 잘못된 데이터가 있으면 반복이 영원히 끝나지 않습니다.

방어방법
방문 노드 제외left_anti 로 이미 찾은 노드 빼기
경로 기반 탐지경로 배열에 중복 노드 있으면 순환
최대 반복 제한안전장치로 반복 횟수 상한
MAX_DEPTH = 50      # 안전 상한 — 정상 계층이 이보다 깊을 리 없다면
if iteration > MAX_DEPTH:
    raise RuntimeError("순환 의심: 최대 깊이 초과")

실무 철칙: 소스 데이터에 순환이 없다고 가정하지 마세요. 방문 노드 제외 + 최대 깊이 상한을 항상 함께 두어, 잘못된 데이터가 잡을 무한 실행시키는 것을 막으세요.

6. BOM 수량 전개 (가중 누적)

BOM 은 단순 트리가 아니라 수량이 곱해집니다. "완제품 1개 = 모듈 2개, 모듈 1개 = 나사 4개 → 완제품 1개에 나사 8개". 반복 조인에 수량 곱을 누적합니다.

# edges: parent, child, qty
frontier = (edges.where(F.col("parent") == product)
    .select("child", F.col("qty").alias("total_qty")))
 
# 확장 시 수량을 곱해서 누적
nxt = (frontier.alias("f")
    .join(edges.alias("e"), F.col("f.child") == F.col("e.parent"))
    .select(
        F.col("e.child").alias("child"),
        (F.col("f.total_qty") * F.col("e.qty")).alias("total_qty")))
 
# 같은 부품이 여러 경로로 쓰이면 최종 합산
result.groupBy("child").agg(F.sum("total_qty").alias("required_qty"))

7. 대안 — GraphFrames

계층/재귀가 본질적으로 그래프이므로, GraphFrames 의 BFS 나 Connected Components 를 쓸 수도 있습니다.

from graphframes import GraphFrame
g = GraphFrame(vertices, edges)
 
# root 에서 도달 가능한 노드 탐색 (BFS)
paths = g.bfs(fromExpr=f"id = '{root}'", toExpr="id IS NOT NULL", maxPathLength=10)
방법적합
반복 조인수량 전개, 경로 누적 등 커스텀 로직
GraphFrames BFS단순 도달성·최단경로

복잡한 비즈니스 로직(수량·조건부)이 있으면 반복 조인이, 순수 그래프 탐색이면 GraphFrames 가 깔끔합니다.

8. 성능 주의

항목주의
lineage 폭증주기적 checkpoint 필수
셔플 반복매 반복이 조인(셔플) — edges 작으면 broadcast
frontier 비대넓은 트리는 단계마다 노드 급증
순환무한 루프 — 방문 제외 + 상한
작은 edgesbroadcast(edges) 로 셔플 제거

edges 테이블이 충분히 작으면 매 반복의 조인을 broadcast join 으로 만들어 셔플을 없앨 수 있습니다(별도 글 "PySpark 데이터 스큐 완전 정복"의 broadcast 참고).

9. 정리

요소핵심
전개반복 조인 — 새 노드 없을 때까지
frontier새 노드만 확장(전체 재계산 금지)
경로·깊이array 에 누적
순환 방어방문 제외 + 경로 중복 검사 + 최대 깊이
lineage주기적 checkpoint
BOM 수량곱 누적 후 합산

Spark 에서 계층 데이터는 "재귀 CTE 가 없으니 못 한다"가 아니라, 반복 조인으로 직접 제어하는 문제입니다. frontier 만 확장해 비용을 줄이고, checkpoint 로 lineage 를 끊고, 순환을 항상 방어하는 — 이 세 가지만 지키면 BOM 전개부터 조직도까지 안정적으로 풀 수 있습니다. 분산 환경의 반복은 단일 노드와 다르다는 점을 의식하는 것이 핵심입니다.


이 글은 Spark 3.5 기준으로 작성되었습니다. BOM 전개·조직 계층·카테고리 트리 같은 계층 데이터 파이프라인 설계가 필요하시면 언제든 문의해 주세요.

— Data Dynamics 엔지니어링 팀