Tags: pyspark, spark, json, nested-data, schema-evolution, data-engineering

Taming Deeply Nested Semi-Structured Data in PySpark — 5-Level JSON and Schema Evolution

Five levels of nested JSON, arrays inside arrays, and a different schema from every source. This post walks through battle-tested PySpark patterns for flattening complex nested/semi-structured data, unrolling it with explode, and safely handling schema evolution and corrupt records.

Data Dynamics | June 5, 2026 | 6 min read

API responses, event logs, and IoT payloads don't arrive as tidy tables. You get JSON nested five levels deep, objects buried inside arrays, and fields that come and go from one source version to the next. Taming this semi-structured data into an analyzable form is one of the most routine — yet trickiest — tasks in data engineering.

This post distills the core PySpark tools for working with deeply nested data — struct access, explode, from_json, flattening, and schema evolution — into practical patterns.

1. Understanding the Structure of Nested Data

Spark has three complex types. Telling them apart is what lets you pick the right tool.

| Type | Meaning | Access / unroll |
|---|---|---|
| StructType | Object (a group of named fields) | col.field (dot notation) |
| ArrayType | Array (ordered list) | Turn into rows with explode |
| MapType | Key-value map | explode, or col[key] for a single key |

# Sample schema used throughout this post (printSchema output abbreviated)
df.printSchema()
# root
#  |-- user: struct
#  |    |-- id: long
#  |    |-- profile: struct
#  |    |    |-- name: string
#  |    |    |-- tags: array<string>
#  |-- events: array<struct<type:string, ts:long>>

2. Struct Access — Dot Notation and Flattening

Nested struct fields are accessed with a dot (.).

from pyspark.sql import functions as F
 
df.select(
    F.col("user.id").alias("user_id"),
    F.col("user.profile.name").alias("name"),
)

Flattening an entire struct

To flatten a deep struct wholesale, unroll its fields recursively.

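from pyspark.sql import functions as F
from pyspark.sql import types as T

def flatten(df):
    """Recursively flatten all struct fields into top-level columns.

    Arrays (including arrays of structs) are left as-is; explode them separately.
    """
    flat_cols = []
    nested = []
    for field in df.schema.fields:
        if isinstance(field.dataType, T.StructType):
            nested.append(field)
        else:
            # Backticks keep names containing dots or spaces intact
            flat_cols.append(F.col(f"`{field.name}`"))
    for parent in nested:
        for child in parent.dataType.fields:
            flat_cols.append(
                F.col(f"`{parent.name}`.`{child.name}`")
                .alias(f"{parent.name}_{child.name}"))
    flat = df.select(flat_cols)
    # Each pass removes one level of nesting; recurse while structs remain
    if any(isinstance(f.dataType, T.StructType) for f in flat.schema.fields):
        return flatten(flat)
    return flat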

Flattening is analytics/BI friendly, but fully unrolling a deep structure can leave you with hundreds of columns. Selecting only the fields you need is usually better — save full flattening for the exploration phase.

3. Explode — Arrays into Rows

Arrays are unrolled into rows with explode. N elements in one row become N rows.

# Explode the events array into rows
exploded = df.select(
    "user.id",
    F.explode("events").alias("event"))  # array -> rows
 
# Access struct fields after exploding
result = exploded.select(
    "id",
    F.col("event.type").alias("event_type"),
    F.col("event.ts").alias("ts"))
| Function | Behavior |
|---|---|
| explode | Drops rows whose array is empty or NULL |
| explode_outer | Keeps such rows, emitting one row with NULL for the element |
| posexplode | Like explode, but also returns each element's index (position) |

# If you must keep rows with empty/NULL arrays (behaves like a left join)
df.select("user.id", F.explode_outer("events").alias("event"))
 
# If you need each element's position
df.select("user.id", F.posexplode("events").alias("pos", "event"))

A common bug: with explode, rows with empty arrays or NULLs silently disappear. If you need to preserve the original row count, use explode_outer.

Arrays inside arrays (nested explode)

# orders[].items[] — explode twice
df.select("order_id", F.explode("orders").alias("order")) \
  .select("order_id", F.explode("order.items").alias("item")) \
  .select("order_id", "item.sku", "item.qty")

4. Parsing JSON Strings — from_json

Data often arrives as a JSON string (a Kafka value, a log column). Structure it with from_json.

from pyspark.sql import types as T
 
# Explicit schema (recommended — stable and fast)
schema = T.StructType([
    T.StructField("id", T.LongType()),
    T.StructField("tags", T.ArrayType(T.StringType())),
    T.StructField("meta", T.StructType([
        T.StructField("lang", T.StringType()),
    ])),
])
 
parsed = df.withColumn("data", F.from_json("json_str", schema))
result = parsed.select("data.id", "data.tags", "data.meta.lang")
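| Function | Use case |
|---|---|
| from_json | JSON string → struct (with an explicit schema) |
| to_json | struct → JSON string |
| get_json_object | Extract a single value by JSON path (no schema needed; returns a string) |
| json_tuple | Extract several top-level keys at once (returned as strings) |
| schema_of_json | Infer a schema (as a DDL string) from a sample JSON string |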

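Relying on inference instead of an explicit schema is slow and unstable: spark.read.json without a schema scans the input just to work out the types, and a schema inferred from a sample (schema_of_json) only knows the fields that sample happened to contain. Specify the schema whenever you can; reserve inference for exploration, when many fields are still unknown, and pin a fixed schema in production.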

5. Handling Corrupt Records — Safely

Real-world JSON comes with corrupt records mixed in. Read it without a plan and, depending on the read mode, either the whole job fails on the first bad line or malformed rows quietly turn into NULL-filled rows or disappear.

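from pyspark.sql import types as T

# Read modes: PERMISSIVE (default), DROPMALFORMED, FAILFAST.
# With an explicit schema, the corrupt-record column is only populated if the schema declares it.
schema_with_corrupt = T.StructType(
    schema.fields + [T.StructField("_corrupt", T.StringType())])

df = (spark.read
    .option("mode", "PERMISSIVE")                       # malformed rows go to _corrupt
    .option("columnNameOfCorruptRecord", "_corrupt")
    .schema(schema_with_corrupt)
    .json("path")
    .cache())  # Spark rejects raw-JSON queries that reference only the corrupt column; caching avoids that

# Separate corrupt records (don't discard; quarantine them for later analysis)
good = df.where("_corrupt IS NULL").drop("_corrupt")
bad  = df.where("_corrupt IS NOT NULL")
bad.write.mode("append").parquet("quarantine/")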
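| Mode | Behavior |
|---|---|
| PERMISSIVE (default) | Keeps the row: the raw text goes into the corrupt-record column (when the schema declares it) and fields that could not be parsed are NULL |
| DROPMALFORMED | Drops corrupt rows |
| FAILFAST | Throws an exception on the first corrupt row |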

Recommendation: don't discard corrupt records — quarantine them. Silent drops mean data loss you never notice. Accumulating them in a quarantine table lets you trace problems back to the source.

6. Schema Evolution

Fields get added and removed with each source version. When files have differing schemas:

# Merge schemas at read time (Parquet)
df = spark.read.option("mergeSchema", "true").parquet("path")
 
# Allow missing columns when unioning multiple DataFrames
combined = df_v1.unionByName(df_v2, allowMissingColumns=True)

Lakehouse formats support schema evolution at the table level.

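# Iceberg/Delta: merge new columns into the table schema on write.
# Iceberg additionally requires the table property 'write.spark.accept-any-schema'='true';
# check your table format's documentation for the exact switches.
(df.writeTo("analytics.events")
   .option("mergeSchema", "true")
   .append())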
  • Adding new columns is safe: rows written before the change read the new column as NULL.
  • Type changes and column drops are risky, so establish a policy. (Iceberg's schema evolution rules are covered in a separate post, "Apache Iceberg Spec Version Comparison.")

7. Performance Watch-Outs

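| Item | Watch out |
|---|---|
| explode blow-up | Exploding large arrays multiplies row counts, adding memory and shuffle pressure |
| Unnecessary flattening | Select only the fields you need so Spark can prune unused nested columns (notably when reading Parquet or ORC) |
| Schema inference | Without a schema, spark.read.json scans the input to infer one; be explicit in production |
| Nested explode | Row counts multiply at each level (orders × items) |
| Ignoring corrupt records | Silent data loss; always quarantine |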

Since explode multiplies rows, it's best to apply filters before exploding to shrink the data you're about to unroll.

8. Summary

| Task | Tool |
|---|---|
| struct access | Dot notation (col.field) |
| array → rows | explode / explode_outer / posexplode |
| JSON strings | from_json (explicit schema) / get_json_object |
| Corrupt records | Read mode + quarantine |
| Schema evolution | mergeSchema, unionByName(allowMissingColumns=True) |

The key to complex nested data is "understand the structure first (printSchema), then unroll only as much as you need." Handle structs with dot notation, arrays with explode, and JSON strings with explicit-schema from_json — and watch out for the two traps: losing empty arrays (explode vs explode_outer) and failing to quarantine corrupt records. Rather than flattening everything indiscriminately, the discipline of unrolling only the paths your analysis needs protects both performance and readability.


This post is based on Spark 3.5. If you need help designing semi-structured data ingestion and normalization pipelines or a schema evolution strategy, feel free to reach out.

— The Data Dynamics Engineering Team