PySpark 깊게 중첩된 반정형 데이터 다루기 — 5단계 JSON과 스키마 진화
5단계로 중첩된 JSON, 배열 속의 배열, 그리고 소스마다 다른 스키마. PySpark 로 복잡한 nested/semi-structured 데이터를 평탄화(flatten)하고, explode 로 펼치고, 스키마 진화와 깨진 레코드를 안전하게 처리하는 실전 패턴을 정리합니다.
API 응답, 이벤트 로그, IoT 페이로드는 깔끔한 테이블이 아닙니다. 5단계로 중첩된 JSON, 배열 안에 또 객체가 든 구조, 게다가 소스 버전마다 필드가 들쭉날쭉합니다. 이런 반정형(semi-structured) 데이터를 분석 가능한 형태로 길들이는 것은 데이터 엔지니어링의 일상적이지만 까다로운 과제입니다.
이 글은 PySpark 로 깊게 중첩된 데이터를 다루는 핵심 도구 — struct 접근, explode, from_json, 평탄화, 스키마 진화 — 를 실전 패턴으로 정리합니다.
1. 중첩 데이터의 구조 이해
Spark 의 복합 타입은 세 가지입니다. 이걸 구분해야 올바른 도구를 고릅니다.
| 타입 | 의미 | 접근/펼치기 |
|---|---|---|
StructType | 객체(필드 묶음) | col.field (점 표기) |
ArrayType | 배열(리스트) | explode 로 행으로 |
MapType | 키-값 맵 | explode 또는 col[key] |
df.printSchema()
# root
# |-- user: struct
# | |-- id: long
# | |-- profile: struct
# | | |-- name: string
# | | |-- tags: array<string>
# |-- events: array<struct<type:string, ts:long>>2. Struct 접근 — 점 표기와 평탄화
중첩 struct 필드는 점(.)으로 접근합니다.
from pyspark.sql import functions as F
df.select(
F.col("user.id").alias("user_id"),
F.col("user.profile.name").alias("name"),
)struct 전체 평탄화(flatten)
깊은 struct 를 통째로 평탄화하려면 필드를 재귀적으로 펼칩니다.
def flatten(df):
"""모든 struct 필드를 최상위 컬럼으로 평탄화"""
flat_cols = []
nested = []
for field in df.schema.fields:
if isinstance(field.dataType, T.StructType):
nested.append(field.name)
else:
flat_cols.append(F.col(field.name))
for n in nested:
for child in df.schema[n].dataType.fields:
flat_cols.append(F.col(f"{n}.{child.name}").alias(f"{n}_{child.name}"))
flat = df.select(flat_cols)
# 아직 struct 가 남아있으면 재귀
if any(isinstance(f.dataType, T.StructType) for f in flat.schema.fields):
return flatten(flat)
return flat평탄화는 분석/BI 친화적이지만, 깊은 구조를 다 펼치면 컬럼이 수백 개가 될 수 있습니다. 필요한 필드만 select 하는 것이 보통 더 낫습니다 — 전체 평탄화는 탐색 단계에만.
3. Explode — 배열을 행으로
배열은 explode 로 행으로 펼칩니다. 한 행의 N개 원소가 N개 행이 됩니다.
# events 배열을 행으로
exploded = df.select(
"user.id",
F.explode("events").alias("event")) # 배열 → 행
# 펼친 뒤 struct 필드 접근
result = exploded.select(
"id",
F.col("event.type").alias("event_type"),
F.col("event.ts").alias("ts"))| 함수 | 차이 |
|---|---|
explode | 빈 배열/NULL 이면 행이 사라짐 |
explode_outer | 빈 배열/NULL 도 NULL 행으로 보존 |
posexplode | 인덱스(위치)도 함께 |
# 빈 배열도 보존해야 하면 (left join 같은 효과)
F.explode_outer("events")
# 순번이 필요하면
df.select("id", F.posexplode("events").alias("pos", "event"))흔한 버그:
explode를 쓰면 빈 배열·NULL 행이 조용히 사라집니다. 원본 행 수를 보존해야 한다면explode_outer를 쓰세요.
배열 속의 배열 (중첩 explode)
# orders[].items[] — 두 번 explode
df.select("order_id", F.explode("orders").alias("order")) \
.select("order_id", F.explode("order.items").alias("item")) \
.select("order_id", "item.sku", "item.qty")4. JSON 문자열 파싱 — from_json
데이터가 JSON 문자열로 들어오는 경우(Kafka value, 로그 컬럼)가 많습니다. from_json 으로 구조화합니다.
from pyspark.sql import types as T
# 스키마 명시 (권장 — 안정적, 빠름)
schema = T.StructType([
T.StructField("id", T.LongType()),
T.StructField("tags", T.ArrayType(T.StringType())),
T.StructField("meta", T.StructType([
T.StructField("lang", T.StringType()),
])),
])
parsed = df.withColumn("data", F.from_json("json_str", schema))
result = parsed.select("data.id", "data.tags", "data.meta.lang")| 함수 | 용도 |
|---|---|
from_json | JSON 문자열 → struct (스키마 지정) |
to_json | struct → JSON 문자열 |
get_json_object | 경로로 단일 값 추출(스키마 불필요) |
json_tuple | 여러 최상위 키 한 번에 |
schema_of_json | 샘플로 스키마 추론 |
스키마를 명시하지 않고 추론(
schema_of_json)에 의존하면 느리고 불안정합니다. 가능하면 스키마를 명시하세요. 미지의 필드가 많은 탐색 단계에서만 추론을 쓰고, 프로덕션은 고정 스키마로.
5. 깨진 레코드 처리 — 안전하게
실전 JSON 에는 깨진 레코드가 섞입니다. 무방비로 읽으면 잡 전체가 실패하거나, 한 행 때문에 파티션이 NULL 로 오염됩니다.
# 읽기 모드: PERMISSIVE(기본), DROPMALFORMED, FAILFAST
df = (spark.read
.option("mode", "PERMISSIVE") # 깨진 건 _corrupt_record 로
.option("columnNameOfCorruptRecord", "_corrupt")
.schema(schema)
.json("path"))
# 깨진 레코드 분리 (버리지 말고 격리 → 나중에 분석)
good = df.where("_corrupt IS NULL")
bad = df.where("_corrupt IS NOT NULL")
bad.write.mode("append").parquet("quarantine/")| 모드 | 동작 |
|---|---|
PERMISSIVE | 깨진 건 corrupt 컬럼에, 나머지 NULL |
DROPMALFORMED | 깨진 행 버림 |
FAILFAST | 깨진 행 만나면 즉시 실패 |
권장: 깨진 레코드를 버리지 말고 격리(quarantine) 하세요. 조용히 버리면 데이터 유실을 눈치채지 못합니다. 격리 테이블에 쌓아두면 소스 문제를 추적할 수 있습니다.
6. 스키마 진화(Schema Evolution)
소스 버전마다 필드가 추가/삭제됩니다. 여러 파일의 스키마가 다를 때:
# 읽기 시 스키마 병합 (Parquet)
df = spark.read.option("mergeSchema", "true").parquet("path")
# 여러 DataFrame union 시 누락 컬럼 허용
combined = df_v1.unionByName(df_v2, allowMissingColumns=True)Lakehouse 포맷은 스키마 진화를 테이블 차원에서 지원합니다.
# Iceberg/Delta: 쓰기 시 새 컬럼 자동 병합
(df.writeTo("analytics.events")
.option("mergeSchema", "true")
.append())- 새 컬럼 추가는 안전합니다(기존 데이터는 NULL).
- 타입 변경·컬럼 삭제는 위험하니 정책을 정하세요. (Iceberg 스키마 진화 규칙은 별도 글 "Apache Iceberg 사양 버전 비교"에서 다룹니다.)
7. 성능 주의
| 항목 | 주의 |
|---|---|
| explode 폭발 | 큰 배열 explode 는 행 수 급증 → 메모리·셔플 |
| 불필요 평탄화 | 필요 필드만 select(프루닝 효과) |
| 스키마 추론 | 전체 스캔 비용 — 프로덕션은 명시 |
| 중첩 explode | 곱연산으로 행 폭발(orders×items) |
| corrupt 무시 | 조용한 유실 — 반드시 격리 |
explode 는 행을 늘리므로, 필터를 explode 전에 적용해 펼칠 데이터를 먼저 줄이는 것이 좋습니다.
8. 정리
| 작업 | 도구 |
|---|---|
| struct 접근 | 점 표기 col.field |
| 배열 → 행 | explode / explode_outer / posexplode |
| JSON 문자열 | from_json(스키마 명시) / get_json_object |
| 깨진 레코드 | 읽기 모드 + 격리(quarantine) |
| 스키마 진화 | mergeSchema, unionByName(allowMissingColumns) |
복잡한 중첩 데이터의 핵심은 "구조를 먼저 이해하고(printSchema), 필요한 만큼만 펼치는 것"입니다. struct 는 점으로, 배열은 explode 로, JSON 문자열은 명시 스키마 from_json 으로 다루되, 빈 배열 유실(explode vs explode_outer) 과 깨진 레코드 격리라는 두 함정만 조심하면 됩니다. 무작정 전체 평탄화하기보다, 분석에 필요한 경로만 골라 펼치는 절제가 성능과 가독성 모두를 지킵니다.
이 글은 Spark 3.5 기준으로 작성되었습니다. 반정형 데이터 수집·정규화 파이프라인이나 스키마 진화 전략 설계가 필요하시면 언제든 문의해 주세요.
— Data Dynamics 엔지니어링 팀