Taming Deeply Nested Semi-Structured Data in PySpark — 5-Level JSON and Schema Evolution
Five levels of nested JSON, arrays inside arrays, and a different schema from every source. This post walks through battle-tested PySpark patterns for flattening complex nested/semi-structured data, unrolling it with explode, and safely handling schema evolution and corrupt records.
API responses, event logs, and IoT payloads don't arrive as tidy tables. You get JSON nested five levels deep, objects buried inside arrays, and fields that come and go from one source version to the next. Taming this semi-structured data into an analyzable form is one of the most routine — yet trickiest — tasks in data engineering.
This post distills the core PySpark tools for working with deeply nested data — struct access, explode, from_json, flattening, and schema evolution — into practical patterns.
1. Understanding the Structure of Nested Data
Spark has three complex types. Telling them apart is what lets you pick the right tool.
| Type | Meaning | Access / unroll |
|---|---|---|
| StructType | Object (group of fields) | col.field (dot notation) |
| ArrayType | Array (list) | Turn into rows with explode |
| MapType | Key-value map | explode or col[key] |
```python
df.printSchema()
# root
#  |-- user: struct
#  |    |-- id: long
#  |    |-- profile: struct
#  |    |    |-- name: string
#  |    |    |-- tags: array<string>
#  |-- events: array<struct<type:string, ts:long>>
```
2. Struct Access — Dot Notation and Flattening
Nested struct fields are accessed with a dot (.).
```python
from pyspark.sql import functions as F

# Dot notation reaches into nested structs; alias gives a flat column name
df.select(
    F.col("user.id").alias("user_id"),
    F.col("user.profile.name").alias("name"),
)
```
Flattening an entire struct
To flatten a deep struct wholesale, unroll its fields recursively.
```python
from pyspark.sql import functions as F, types as T

def flatten(df):
    """Flatten all struct fields into top-level columns (arrays and maps are left as-is)."""
    flat_cols = []
    nested = []
    for field in df.schema.fields:
        if isinstance(field.dataType, T.StructType):
            nested.append(field.name)
        else:
            flat_cols.append(F.col(field.name))
    # Promote each child of a struct to a top-level column named parent_child
    for n in nested:
        for child in df.schema[n].dataType.fields:
            flat_cols.append(F.col(f"{n}.{child.name}").alias(f"{n}_{child.name}"))
    flat = df.select(flat_cols)
    # Recurse if any structs remain
    if any(isinstance(f.dataType, T.StructType) for f in flat.schema.fields):
        return flatten(flat)
    return flat
```
Flattening is analytics/BI friendly, but fully unrolling a deep structure can leave you with hundreds of columns. Selecting only the fields you need is usually better; save full flattening for the exploration phase.
3. Explode — Arrays into Rows
Arrays are unrolled into rows with explode. N elements in one row become N rows.
```python
# Explode the events array into rows
exploded = df.select(
    "user.id",                           # comes out as a column named "id"
    F.explode("events").alias("event"),  # array -> rows
)
# Access struct fields after exploding
result = exploded.select(
    "id",
    F.col("event.type").alias("event_type"),
    F.col("event.ts").alias("ts"),
)
```
| Function | Difference |
|---|---|
| explode | Drops rows whose array is empty or NULL |
| explode_outer | Keeps rows with empty or NULL arrays, emitting a NULL element |
| posexplode | Like explode, plus the element's index (position) |
```python
# If you must preserve empty arrays (acts like a left join)
df.select("user.id", F.explode_outer("events").alias("event"))
# If you need the position
df.select("user.id", F.posexplode("events").alias("pos", "event"))
```
A common bug: with explode, rows with empty arrays or NULLs silently disappear. If every source row must survive the unroll, use explode_outer.
Arrays inside arrays (nested explode)
```python
# orders[].items[]: explode twice, outer array first
(df.select("order_id", F.explode("orders").alias("order"))
   .select("order_id", F.explode("order.items").alias("item"))
   .select("order_id", "item.sku", "item.qty"))
```
4. Parsing JSON Strings — from_json
Data often arrives as a JSON string (a Kafka value, a log column). Structure it with from_json.
```python
from pyspark.sql import types as T

# Explicit schema (recommended: stable and fast)
schema = T.StructType([
    T.StructField("id", T.LongType()),
    T.StructField("tags", T.ArrayType(T.StringType())),
    T.StructField("meta", T.StructType([
        T.StructField("lang", T.StringType()),
    ])),
])
parsed = df.withColumn("data", F.from_json("json_str", schema))
result = parsed.select("data.id", "data.tags", "data.meta.lang")
```
| Function | Use case |
|---|---|
| from_json | JSON string → struct (with explicit schema) |
| to_json | struct → JSON string |
| get_json_object | Extract a single value by path (no schema needed) |
| json_tuple | Multiple top-level keys at once |
| schema_of_json | Infer a schema from a sample |
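Relying on inference (schema_of_json) instead of an explicit schema is slow and unstable: the inferred schema depends on whichever sample it sees, so a field that is absent from the sample is absent from the result. Specify the schema whenever you can. Use inference only during exploration when many fields are unknown; in production, use a fixed schema.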
5. Handling Corrupt Records — Safely
Real-world JSON comes with corrupt records mixed in. Read it unguarded and either the whole job fails (FAILFAST) or malformed rows quietly turn into rows of NULLs (the default PERMISSIVE mode), with nothing to tell you what went wrong.
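```python
# Read modes: PERMISSIVE (default), DROPMALFORMED, FAILFAST
# With an explicit schema, the corrupt-record column must be declared in it;
# otherwise the raw text of corrupt rows is dropped during parsing.
read_schema = T.StructType(
    schema.fields + [T.StructField("_corrupt", T.StringType(), True)]
)
df = (spark.read
      .option("mode", "PERMISSIVE")  # raw text of corrupt rows goes to _corrupt
      .option("columnNameOfCorruptRecord", "_corrupt")
      .schema(read_schema)
      .json("path")
      .cache())  # Spark rejects queries that reference only the corrupt column on an uncached file read
# Separate corrupt records (don't discard; quarantine for later analysis)
good = df.where("_corrupt IS NULL").drop("_corrupt")
bad = df.where("_corrupt IS NOT NULL")
bad.write.mode("append").parquet("quarantine/")
```
| Mode | Behavior |
|---|---|
| PERMISSIVE | Raw text of a corrupt row goes into the corrupt-record column; fields that could not be parsed are NULL |
| DROPMALFORMED | Drops corrupt rows |
| FAILFAST | Fails immediately on a corrupt row |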
Recommendation: don't discard corrupt records — quarantine them. Silent drops mean data loss you never notice. Accumulating them in a quarantine table lets you trace problems back to the source.
6. Schema Evolution
Fields get added and removed with each source version. When files have differing schemas:
```python
# Merge schemas at read time (Parquet)
df = spark.read.option("mergeSchema", "true").parquet("path")
# Allow missing columns when unioning multiple DataFrames
combined = df_v1.unionByName(df_v2, allowMissingColumns=True)
```
Lakehouse formats support schema evolution at the table level.
```python
# Iceberg/Delta: automatically merge new columns on write
(df.writeTo("analytics.events")
   .option("mergeSchema", "true")
   .append())
```
- Adding new columns is safe (existing data reads as NULL).
- Type changes and column drops are risky, so establish a policy. (Iceberg's schema evolution rules are covered in a separate post, "Apache Iceberg Spec Version Comparison.")
7. Performance Watch-Outs
| Item | Watch out |
|---|---|
| explode blow-up | Exploding large arrays multiplies row counts → memory and shuffle pressure |
| Unnecessary flattening | Select only required fields (pruning benefits) |
| Schema inference | Full-scan cost — be explicit in production |
| Nested explode | Multiplicative row explosion (orders×items) |
| Ignoring corrupt records | Silent data loss — always quarantine |
Since explode multiplies rows, it's best to apply filters before exploding to shrink the data you're about to unroll.
8. Summary
| Task | Tool |
|---|---|
| struct access | Dot notation col.field |
| array → rows | explode / explode_outer / posexplode |
| JSON strings | from_json (explicit schema) / get_json_object |
| Corrupt records | Read mode + quarantine |
| Schema evolution | mergeSchema, unionByName(allowMissingColumns) |
The key to complex nested data is "understand the structure first (printSchema), then unroll only as much as you need." Handle structs with dot notation, arrays with explode, and JSON strings with explicit-schema from_json — and watch out for the two traps: losing empty arrays (explode vs explode_outer) and failing to quarantine corrupt records. Rather than flattening everything indiscriminately, the discipline of unrolling only the paths your analysis needs protects both performance and readability.
This post is based on Spark 3.5. If you need help designing semi-structured data ingestion and normalization pipelines or a schema evolution strategy, feel free to reach out.
— The Data Dynamics Engineering Team