Blog
pysparksparkjsonnested-dataschema-evolutiondata-engineering

PySpark 깊게 중첩된 반정형 데이터 다루기 — 5단계 JSON과 스키마 진화

5단계로 중첩된 JSON, 배열 속의 배열, 그리고 소스마다 다른 스키마. PySpark 로 복잡한 nested/semi-structured 데이터를 평탄화(flatten)하고, explode 로 펼치고, 스키마 진화와 깨진 레코드를 안전하게 처리하는 실전 패턴을 정리합니다.

Data Dynamics2026년 6월 5일9 min read

API 응답, 이벤트 로그, IoT 페이로드는 깔끔한 테이블이 아닙니다. 5단계로 중첩된 JSON, 배열 안에 또 객체가 든 구조, 게다가 소스 버전마다 필드가 들쭉날쭉합니다. 이런 반정형(semi-structured) 데이터를 분석 가능한 형태로 길들이는 것은 데이터 엔지니어링의 일상적이지만 까다로운 과제입니다.

이 글은 PySpark 로 깊게 중첩된 데이터를 다루는 핵심 도구 — struct 접근, explode, from_json, 평탄화, 스키마 진화 — 를 실전 패턴으로 정리합니다.

1. 중첩 데이터의 구조 이해

Spark 의 복합 타입은 세 가지입니다. 이걸 구분해야 올바른 도구를 고릅니다.

타입의미접근/펼치기
StructType객체(필드 묶음)col.field (점 표기)
ArrayType배열(리스트)explode 로 행으로
MapType키-값 맵explode 또는 col[key]
df.printSchema()
# root
#  |-- user: struct
#  |    |-- id: long
#  |    |-- profile: struct
#  |    |    |-- name: string
#  |    |    |-- tags: array<string>
#  |-- events: array<struct<type:string, ts:long>>

2. Struct 접근 — 점 표기와 평탄화

중첩 struct 필드는 점(.)으로 접근합니다.

from pyspark.sql import functions as F
 
df.select(
    F.col("user.id").alias("user_id"),
    F.col("user.profile.name").alias("name"),
)

struct 전체 평탄화(flatten)

깊은 struct 를 통째로 평탄화하려면 필드를 재귀적으로 펼칩니다.

def flatten(df):
    """모든 struct 필드를 최상위 컬럼으로 평탄화"""
    flat_cols = []
    nested = []
    for field in df.schema.fields:
        if isinstance(field.dataType, T.StructType):
            nested.append(field.name)
        else:
            flat_cols.append(F.col(field.name))
    for n in nested:
        for child in df.schema[n].dataType.fields:
            flat_cols.append(F.col(f"{n}.{child.name}").alias(f"{n}_{child.name}"))
    flat = df.select(flat_cols)
    # 아직 struct 가 남아있으면 재귀
    if any(isinstance(f.dataType, T.StructType) for f in flat.schema.fields):
        return flatten(flat)
    return flat

평탄화는 분석/BI 친화적이지만, 깊은 구조를 다 펼치면 컬럼이 수백 개가 될 수 있습니다. 필요한 필드만 select 하는 것이 보통 더 낫습니다 — 전체 평탄화는 탐색 단계에만.

3. Explode — 배열을 행으로

배열은 explode 로 행으로 펼칩니다. 한 행의 N개 원소가 N개 행이 됩니다.

# events 배열을 행으로
exploded = df.select(
    "user.id",
    F.explode("events").alias("event"))  # 배열 → 행
 
# 펼친 뒤 struct 필드 접근
result = exploded.select(
    "id",
    F.col("event.type").alias("event_type"),
    F.col("event.ts").alias("ts"))
함수차이
explode빈 배열/NULL 이면 행이 사라짐
explode_outer빈 배열/NULL 도 NULL 행으로 보존
posexplode인덱스(위치)도 함께
# 빈 배열도 보존해야 하면 (left join 같은 효과)
F.explode_outer("events")
 
# 순번이 필요하면
df.select("id", F.posexplode("events").alias("pos", "event"))

흔한 버그: explode 를 쓰면 빈 배열·NULL 행이 조용히 사라집니다. 원본 행 수를 보존해야 한다면 explode_outer 를 쓰세요.

배열 속의 배열 (중첩 explode)

# orders[].items[] — 두 번 explode
df.select("order_id", F.explode("orders").alias("order")) \
  .select("order_id", F.explode("order.items").alias("item")) \
  .select("order_id", "item.sku", "item.qty")

4. JSON 문자열 파싱 — from_json

데이터가 JSON 문자열로 들어오는 경우(Kafka value, 로그 컬럼)가 많습니다. from_json 으로 구조화합니다.

from pyspark.sql import types as T
 
# 스키마 명시 (권장 — 안정적, 빠름)
schema = T.StructType([
    T.StructField("id", T.LongType()),
    T.StructField("tags", T.ArrayType(T.StringType())),
    T.StructField("meta", T.StructType([
        T.StructField("lang", T.StringType()),
    ])),
])
 
parsed = df.withColumn("data", F.from_json("json_str", schema))
result = parsed.select("data.id", "data.tags", "data.meta.lang")
함수용도
from_jsonJSON 문자열 → struct (스키마 지정)
to_jsonstruct → JSON 문자열
get_json_object경로로 단일 값 추출(스키마 불필요)
json_tuple여러 최상위 키 한 번에
schema_of_json샘플로 스키마 추론

스키마를 명시하지 않고 추론(schema_of_json)에 의존하면 느리고 불안정합니다. 가능하면 스키마를 명시하세요. 미지의 필드가 많은 탐색 단계에서만 추론을 쓰고, 프로덕션은 고정 스키마로.

5. 깨진 레코드 처리 — 안전하게

실전 JSON 에는 깨진 레코드가 섞입니다. 무방비로 읽으면 잡 전체가 실패하거나, 한 행 때문에 파티션이 NULL 로 오염됩니다.

# 읽기 모드: PERMISSIVE(기본), DROPMALFORMED, FAILFAST
df = (spark.read
    .option("mode", "PERMISSIVE")                       # 깨진 건 _corrupt_record 로
    .option("columnNameOfCorruptRecord", "_corrupt")
    .schema(schema)
    .json("path"))
 
# 깨진 레코드 분리 (버리지 말고 격리 → 나중에 분석)
good = df.where("_corrupt IS NULL")
bad  = df.where("_corrupt IS NOT NULL")
bad.write.mode("append").parquet("quarantine/")
모드동작
PERMISSIVE깨진 건 corrupt 컬럼에, 나머지 NULL
DROPMALFORMED깨진 행 버림
FAILFAST깨진 행 만나면 즉시 실패

권장: 깨진 레코드를 버리지 말고 격리(quarantine) 하세요. 조용히 버리면 데이터 유실을 눈치채지 못합니다. 격리 테이블에 쌓아두면 소스 문제를 추적할 수 있습니다.

6. 스키마 진화(Schema Evolution)

소스 버전마다 필드가 추가/삭제됩니다. 여러 파일의 스키마가 다를 때:

# 읽기 시 스키마 병합 (Parquet)
df = spark.read.option("mergeSchema", "true").parquet("path")
 
# 여러 DataFrame union 시 누락 컬럼 허용
combined = df_v1.unionByName(df_v2, allowMissingColumns=True)

Lakehouse 포맷은 스키마 진화를 테이블 차원에서 지원합니다.

# Iceberg/Delta: 쓰기 시 새 컬럼 자동 병합
(df.writeTo("analytics.events")
   .option("mergeSchema", "true")
   .append())
  • 새 컬럼 추가는 안전합니다(기존 데이터는 NULL).
  • 타입 변경·컬럼 삭제는 위험하니 정책을 정하세요. (Iceberg 스키마 진화 규칙은 별도 글 "Apache Iceberg 사양 버전 비교"에서 다룹니다.)

7. 성능 주의

항목주의
explode 폭발큰 배열 explode 는 행 수 급증 → 메모리·셔플
불필요 평탄화필요 필드만 select(프루닝 효과)
스키마 추론전체 스캔 비용 — 프로덕션은 명시
중첩 explode곱연산으로 행 폭발(orders×items)
corrupt 무시조용한 유실 — 반드시 격리

explode 는 행을 늘리므로, 필터를 explode 전에 적용해 펼칠 데이터를 먼저 줄이는 것이 좋습니다.

8. 정리

작업도구
struct 접근점 표기 col.field
배열 → 행explode / explode_outer / posexplode
JSON 문자열from_json(스키마 명시) / get_json_object
깨진 레코드읽기 모드 + 격리(quarantine)
스키마 진화mergeSchema, unionByName(allowMissingColumns)

복잡한 중첩 데이터의 핵심은 "구조를 먼저 이해하고(printSchema), 필요한 만큼만 펼치는 것"입니다. struct 는 점으로, 배열은 explode 로, JSON 문자열은 명시 스키마 from_json 으로 다루되, 빈 배열 유실(explode vs explode_outer)깨진 레코드 격리라는 두 함정만 조심하면 됩니다. 무작정 전체 평탄화하기보다, 분석에 필요한 경로만 골라 펼치는 절제가 성능과 가독성 모두를 지킵니다.


이 글은 Spark 3.5 기준으로 작성되었습니다. 반정형 데이터 수집·정규화 파이프라인이나 스키마 진화 전략 설계가 필요하시면 언제든 문의해 주세요.

— Data Dynamics 엔지니어링 팀