Blog
pysparksparkjsonnested-dataschema-evolutiondata-engineering

PySpark 깊게 중첩된 반정형 데이터 다루기 — 5단계 JSON과 스키마 진화

5단계로 중첩된 JSON, 배열 속의 배열, 그리고 소스마다 다른 스키마. PySpark 로 복잡한 nested/semi-structured 데이터를 평탄화(flatten)하고, explode 로 펼치고, 스키마 진화와 깨진 레코드를 안전하게 처리하는 실전 패턴을 정리합니다.

Data Dynamics2026年6月5日9 min read
This post is not yet translated. The original Korean version is shown below.

API 응답, 이벤트 로그, IoT 페이로드는 깔끔한 테이블이 아닙니다. 5단계로 중첩된 JSON, 배열 안에 또 객체가 든 구조, 게다가 소스 버전마다 필드가 들쭉날쭉합니다. 이런 반정형(semi-structured) 데이터를 분석 가능한 형태로 길들이는 것은 데이터 엔지니어링의 일상적이지만 까다로운 과제입니다.

이 글은 PySpark 로 깊게 중첩된 데이터를 다루는 핵심 도구 — struct 접근, explode, from_json, 평탄화, 스키마 진화 — 를 실전 패턴으로 정리합니다.

1. 중첩 데이터의 구조 이해

Spark 의 복합 타입은 세 가지입니다. 이걸 구분해야 올바른 도구를 고릅니다.

타입의미접근/펼치기
StructType객체(필드 묶음)col.field (점 표기)
ArrayType배열(리스트)explode 로 행으로
MapType키-값 맵explode 또는 col[key]
df.printSchema()
# root
#  |-- user: struct
#  |    |-- id: long
#  |    |-- profile: struct
#  |    |    |-- name: string
#  |    |    |-- tags: array<string>
#  |-- events: array<struct<type:string, ts:long>>

2. Struct 접근 — 점 표기와 평탄화

중첩 struct 필드는 점(.)으로 접근합니다.

from pyspark.sql import functions as F
 
df.select(
    F.col("user.id").alias("user_id"),
    F.col("user.profile.name").alias("name"),
)

struct 전체 평탄화(flatten)

깊은 struct 를 통째로 평탄화하려면 필드를 재귀적으로 펼칩니다.

def flatten(df):
    """모든 struct 필드를 최상위 컬럼으로 평탄화"""
    flat_cols = []
    nested = []
    for field in df.schema.fields:
        if isinstance(field.dataType, T.StructType):
            nested.append(field.name)
        else:
            flat_cols.append(F.col(field.name))
    for n in nested:
        for child in df.schema[n].dataType.fields:
            flat_cols.append(F.col(f"{n}.{child.name}").alias(f"{n}_{child.name}"))
    flat = df.select(flat_cols)
    # 아직 struct 가 남아있으면 재귀
    if any(isinstance(f.dataType, T.StructType) for f in flat.schema.fields):
        return flatten(flat)
    return flat

평탄화는 분석/BI 친화적이지만, 깊은 구조를 다 펼치면 컬럼이 수백 개가 될 수 있습니다. 필요한 필드만 select 하는 것이 보통 더 낫습니다 — 전체 평탄화는 탐색 단계에만.

3. Explode — 배열을 행으로

배열은 explode 로 행으로 펼칩니다. 한 행의 N개 원소가 N개 행이 됩니다.

# events 배열을 행으로
exploded = df.select(
    "user.id",
    F.explode("events").alias("event"))  # 배열 → 행
 
# 펼친 뒤 struct 필드 접근
result = exploded.select(
    "id",
    F.col("event.type").alias("event_type"),
    F.col("event.ts").alias("ts"))
함수차이
explode빈 배열/NULL 이면 행이 사라짐
explode_outer빈 배열/NULL 도 NULL 행으로 보존
posexplode인덱스(위치)도 함께
# 빈 배열도 보존해야 하면 (left join 같은 효과)
F.explode_outer("events")
 
# 순번이 필요하면
df.select("id", F.posexplode("events").alias("pos", "event"))

흔한 버그: explode 를 쓰면 빈 배열·NULL 행이 조용히 사라집니다. 원본 행 수를 보존해야 한다면 explode_outer 를 쓰세요.

배열 속의 배열 (중첩 explode)

# orders[].items[] — 두 번 explode
df.select("order_id", F.explode("orders").alias("order")) \
  .select("order_id", F.explode("order.items").alias("item")) \
  .select("order_id", "item.sku", "item.qty")

4. JSON 문자열 파싱 — from_json

데이터가 JSON 문자열로 들어오는 경우(Kafka value, 로그 컬럼)가 많습니다. from_json 으로 구조화합니다.

from pyspark.sql import types as T
 
# 스키마 명시 (권장 — 안정적, 빠름)
schema = T.StructType([
    T.StructField("id", T.LongType()),
    T.StructField("tags", T.ArrayType(T.StringType())),
    T.StructField("meta", T.StructType([
        T.StructField("lang", T.StringType()),
    ])),
])
 
parsed = df.withColumn("data", F.from_json("json_str", schema))
result = parsed.select("data.id", "data.tags", "data.meta.lang")
함수용도
from_jsonJSON 문자열 → struct (스키마 지정)
to_jsonstruct → JSON 문자열
get_json_object경로로 단일 값 추출(스키마 불필요)
json_tuple여러 최상위 키 한 번에
schema_of_json샘플로 스키마 추론

스키마를 명시하지 않고 추론(schema_of_json)에 의존하면 느리고 불안정합니다. 가능하면 스키마를 명시하세요. 미지의 필드가 많은 탐색 단계에서만 추론을 쓰고, 프로덕션은 고정 스키마로.

5. 깨진 레코드 처리 — 안전하게

실전 JSON 에는 깨진 레코드가 섞입니다. 무방비로 읽으면 잡 전체가 실패하거나, 한 행 때문에 파티션이 NULL 로 오염됩니다.

# 읽기 모드: PERMISSIVE(기본), DROPMALFORMED, FAILFAST
df = (spark.read
    .option("mode", "PERMISSIVE")                       # 깨진 건 _corrupt_record 로
    .option("columnNameOfCorruptRecord", "_corrupt")
    .schema(schema)
    .json("path"))
 
# 깨진 레코드 분리 (버리지 말고 격리 → 나중에 분석)
good = df.where("_corrupt IS NULL")
bad  = df.where("_corrupt IS NOT NULL")
bad.write.mode("append").parquet("quarantine/")
모드동작
PERMISSIVE깨진 건 corrupt 컬럼에, 나머지 NULL
DROPMALFORMED깨진 행 버림
FAILFAST깨진 행 만나면 즉시 실패

권장: 깨진 레코드를 버리지 말고 격리(quarantine) 하세요. 조용히 버리면 데이터 유실을 눈치채지 못합니다. 격리 테이블에 쌓아두면 소스 문제를 추적할 수 있습니다.

6. 스키마 진화(Schema Evolution)

소스 버전마다 필드가 추가/삭제됩니다. 여러 파일의 스키마가 다를 때:

# 읽기 시 스키마 병합 (Parquet)
df = spark.read.option("mergeSchema", "true").parquet("path")
 
# 여러 DataFrame union 시 누락 컬럼 허용
combined = df_v1.unionByName(df_v2, allowMissingColumns=True)

Lakehouse 포맷은 스키마 진화를 테이블 차원에서 지원합니다.

# Iceberg/Delta: 쓰기 시 새 컬럼 자동 병합
(df.writeTo("analytics.events")
   .option("mergeSchema", "true")
   .append())
  • 새 컬럼 추가는 안전합니다(기존 데이터는 NULL).
  • 타입 변경·컬럼 삭제는 위험하니 정책을 정하세요. (Iceberg 스키마 진화 규칙은 별도 글 "Apache Iceberg 사양 버전 비교"에서 다룹니다.)

7. 성능 주의

항목주의
explode 폭발큰 배열 explode 는 행 수 급증 → 메모리·셔플
불필요 평탄화필요 필드만 select(프루닝 효과)
스키마 추론전체 스캔 비용 — 프로덕션은 명시
중첩 explode곱연산으로 행 폭발(orders×items)
corrupt 무시조용한 유실 — 반드시 격리

explode 는 행을 늘리므로, 필터를 explode 전에 적용해 펼칠 데이터를 먼저 줄이는 것이 좋습니다.

8. 정리

작업도구
struct 접근점 표기 col.field
배열 → 행explode / explode_outer / posexplode
JSON 문자열from_json(스키마 명시) / get_json_object
깨진 레코드읽기 모드 + 격리(quarantine)
스키마 진화mergeSchema, unionByName(allowMissingColumns)

복잡한 중첩 데이터의 핵심은 "구조를 먼저 이해하고(printSchema), 필요한 만큼만 펼치는 것"입니다. struct 는 점으로, 배열은 explode 로, JSON 문자열은 명시 스키마 from_json 으로 다루되, 빈 배열 유실(explode vs explode_outer)깨진 레코드 격리라는 두 함정만 조심하면 됩니다. 무작정 전체 평탄화하기보다, 분석에 필요한 경로만 골라 펼치는 절제가 성능과 가독성 모두를 지킵니다.


이 글은 Spark 3.5 기준으로 작성되었습니다. 반정형 데이터 수집·정규화 파이프라인이나 스키마 진화 전략 설계가 필요하시면 언제든 문의해 주세요.

— Data Dynamics 엔지니어링 팀