pyspark · spark · testing · pytest · data-quality · data-engineering

Testing PySpark Code — Protecting Data Pipelines from Regressions

How to unit test PySpark transformation logic. Practical patterns covering SparkSession fixtures, DataFrame equality comparison (chispa/built-in assert), transformation function design, schema and edge-case testing, data quality validation, and CI integration.

Data Dynamics · June 5, 2026 · 6 min read

Conventional wisdom says "data pipelines are hard to test," so a lot of PySpark code ships to production without any tests. Then someone changes a single column, bad data silently flows downstream, and weeks later you discover the dashboards are off. PySpark is perfectly testable; the key is designing your transformation logic to be testable in the first place.

This post walks through the practical patterns of PySpark testing: SparkSession fixtures, DataFrame equality comparison, transformation function design, edge-case and schema testing, and CI integration.

1. Testable Design — Isolate Transformations as Functions

Testability is decided mostly by code structure. Separate reads and writes (I/O) from transformation logic, and you can test the transformations as pure functions.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()   # the job's session

# BAD: I/O and logic are tangled together, so the logic can't run without S3
def job():
    df = spark.read.parquet("s3://...")
    result = df.filter(...).groupBy(...).agg(...)   # logic trapped in here
    result.write.parquet("s3://...")

# GOOD: extract the transformation as a pure function
def transform(df):                    # input DataFrame -> output DataFrame
    return (df.filter(F.col("amount") > 0)
              .groupBy("user").agg(F.sum("amount").alias("total")))

def job():
    df = spark.read.parquet("s3://...")
    transform(df).write.parquet("s3://...")   # keep I/O thin

A function like transform(df) that takes a DataFrame and returns a DataFrame is easy to test with small inputs.

2. SparkSession Fixture

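Create a single local SparkSession for the whole test run and share it across tests with a session-scoped pytest fixture; starting a SparkSession (and its JVM) takes seconds, so you don't want to pay that per test.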

# conftest.py
import pytest
from pyspark.sql import SparkSession
 
@pytest.fixture(scope="session")
def spark():
    spark = (SparkSession.builder
        .master("local[2]")                         # 2 local cores
        .appName("tests")
        .config("spark.sql.shuffle.partitions", "2") # keep tests small -> fast
        .config("spark.ui.enabled", "false")
        .getOrCreate())
    yield spark
    spark.stop()

Tip: lower spark.sql.shuffle.partitions to around 2 in tests. With the default of 200, every shuffle (groupBy, join) splits even tiny test data into 200 mostly empty partitions, and the per-task overhead slows the whole suite down.

3. DataFrame Equality Comparison

The heart of a test is "does the output DataFrame match the expected one?" A plain == won't do: you need a dedicated comparison that checks both the schema and the rows, ideally regardless of row order.

Built-in assert (Spark 3.5+)

from pyspark.testing import assertDataFrameEqual
 
def test_transform(spark):
    input_df = spark.createDataFrame(
        [("a", 100), ("a", 50), ("b", 30), ("b", -10)],  # -10 should be filtered out
        ["user", "amount"])
 
    expected = spark.createDataFrame(
        [("a", 150), ("b", 30)], ["user", "total"])
 
    result = transform(input_df)
    assertDataFrameEqual(result, expected)   # order-insensitive comparison

Spark 3.5+ ships pyspark.testing.assertDataFrameEqual out of the box. By default it ignores row order (checkRowOrder=False) and compares floating-point values within a tolerance (rtol, atol).

chispa (widely used third-party library)

from chispa import assert_df_equality
 
assert_df_equality(result, expected, ignore_row_order=True, ignore_nullable=True)
| Tool | Characteristics |
|---|---|
| pyspark.testing (built-in) | Spark 3.5+, no extra dependency |
| chispa | Rich options, friendly diff output |

4. Edge Cases — Where the Real Bugs Hide

Bugs tend to show up at boundary conditions, not in the happy path. Tests you should always write:

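def test_empty_input(spark):
    # an empty DataFrame must not break anything
    empty = spark.createDataFrame([], "user string, amount long")
    result = transform(empty)
    assert result.count() == 0

def test_nulls(spark):
    # NULL handling: NULL > 0 evaluates to NULL, so filter() drops the NULL row
    df = spark.createDataFrame([("a", None), ("a", 100)], "user string, amount long")
    result = transform(df)
    # explicitly verify the expected behavior
    assert [tuple(r) for r in result.collect()] == [("a", 100)]

def test_negative_and_zero(spark):
    # boundary values: 0 and negatives are excluded by amount > 0
    df = spark.createDataFrame([("a", 0), ("a", 5), ("c", -5)], "user string, amount long")
    result = transform(df)
    assert [tuple(r) for r in result.collect()] == [("a", 5)]   # user "c" disappears entirely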
| Edge case | Why |
|---|---|
| Empty input | Aggregations and joins can misbehave on empty data |
| NULL values | NULL propagation in aggregation and comparison |
| Duplicate keys | Dedup and join logic (row fan-out) |
| Boundary values (0, negatives, max) | Off-by-one and sign errors |
| Single row / single group | Window functions, aggregations |

5. Schema Tests

It's not just the data: the output schema is a contract too. Rename a column or change its type, and downstream consumers break.

def test_output_schema(spark):
    input_df = spark.createDataFrame([("a", 100)], ["user", "amount"])
    result = transform(input_df)
 
    expected_schema = ["user", "total"]
    assert result.columns == expected_schema
    assert dict(result.dtypes)["total"] == "bigint"

6. Data Quality Validation — Production Guards

Separately from unit tests, embed runtime quality checks in the pipeline itself, so bad production data fails the run instead of flowing downstream.

from pyspark.sql import functions as F

def assert_quality(df):
    # raise explicit exceptions: plain `assert` statements are skipped under `python -O`
    total = df.count()
    # critical columns must not be NULL
    null_ids = df.filter(F.col("user").isNull()).count()
    if null_ids:
        raise ValueError(f"user NULL count: {null_ids}")
    # key uniqueness
    if df.select("user").distinct().count() != total:
        raise ValueError("duplicate user")
    # value range
    if df.filter(F.col("total") < 0).count():
        raise ValueError("negative total")

As the rule set grows, manage declarative quality rules with a dedicated framework such as Great Expectations or dbt tests. (AI-based quality validation is covered in a separate post, "AI-Driven Data Pipeline Automation.")

7. CI Integration

Run the tests automatically in CI to block regressions.

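# Example: GitHub Actions sketch (job steps)
steps:
  - uses: actions/checkout@v4     # fetch the repo so pytest can find tests/
  - uses: actions/setup-python@v5
    with: { python-version: '3.11' }
  - uses: actions/setup-java@v4   # Spark needs a JVM; setup-java requires a distribution
    with: { distribution: 'temurin', java-version: '17' }
  - run: pip install "pyspark>=3.5" chispa pytest   # assertDataFrameEqual needs 3.5+
  - run: pytest tests/ -v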
| Concern | Guideline |
|---|---|
| JVM | Spark requires Java (install it in CI) |
| Speed | local[2], fewer shuffle partitions, small data |
| Session reuse | Share one session-scoped SparkSession fixture |
| Integration tests | Real I/O (sample files) for a small subset only |

8. The Test Pyramid

        ▲  Integration tests (real reads/writes, a few)
       ───
      ─────  Transformation unit tests (transform functions, many)  <- focus here
     ───────
    ─────────  Pure helper functions (no Spark needed, fastest)

Put most of your tests at the transformation-function level and keep real I/O integration tests to a minimum. Pure logic that doesn't need Spark should be extracted and tested even faster on its own.
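As an example of the bottom layer, logic such as building the list of input paths for a date range needs no Spark at all. A sketch; daily_paths, the example bucket, and the dt=YYYY-MM-DD partition layout are hypothetical.

```python
from datetime import date, timedelta
from typing import List


def daily_paths(base: str, start: date, end: date) -> List[str]:
    """Hypothetical helper: one dt=YYYY-MM-DD partition path per day, both ends inclusive."""
    if end < start:
        raise ValueError("end must not be before start")
    n_days = (end - start).days + 1
    return [f"{base}/dt={(start + timedelta(days=i)).isoformat()}" for i in range(n_days)]


def test_daily_paths_crosses_month_end():
    # plain pytest test: no SparkSession needed, so it runs in milliseconds
    assert daily_paths("s3://example-bucket/events", date(2026, 2, 27), date(2026, 3, 1)) == [
        "s3://example-bucket/events/dt=2026-02-27",
        "s3://example-bucket/events/dt=2026-02-28",
        "s3://example-bucket/events/dt=2026-03-01",
    ]
```

The job would then pass the resulting list to spark.read.parquet(*paths), so only that one thin call needs Spark.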

9. Summary

| Area | Key point |
|---|---|
| Design | Separate I/O from transformation -> pure transform(df) functions |
| Fixture | Local SparkSession, fewer shuffle partitions |
| Comparison | assertDataFrameEqual / chispa |
| Edge cases | Empty input, NULLs, duplicates, boundary values |
| Schema | Validate columns and types as a contract |
| Quality | Runtime guards + Great Expectations |
| CI | Install a JVM, keep it small and fast |

The core insight of PySpark testing is that extracting transformation logic into pure DataFrame-to-DataFrame functions makes testing easy. Pull the logic out of I/O, compare equality on small inputs, and explicitly verify edge cases like NULLs and empty input, and your data pipelines can be protected from regressions just like any other software. Remember: a pipeline without tests breeds the most expensive bug of all, silently wrong data.


This post is based on Spark 3.5. If you need help building a testing practice or automated quality validation for your data pipelines, feel free to reach out.

— The Data Dynamics Engineering Team