Testing PySpark Code — Protecting Data Pipelines from Regressions
How to unit test PySpark transformation logic. Practical patterns covering SparkSession fixtures, DataFrame equality comparison (chispa/built-in assert), transformation function design, schema and edge-case testing, data quality validation, and CI integration.
Because of the conventional wisdom that "data pipelines are hard to test," a lot of PySpark code ships to production without any tests. Then someone changes a single column, bad data flows downstream silently, and weeks later you discover the dashboards are off. PySpark is perfectly testable. The key is designing your transformation logic to be testable in the first place.
This post walks through the practical patterns of PySpark testing: SparkSession fixtures, DataFrame equality comparison, transformation function design, edge-case and schema testing, and CI integration.
1. Testable Design — Isolate Transformations as Functions
Most of testability is decided by code structure. Separate reads and writes (I/O) from transformation logic, and the transformations can be tested as pure functions.
```python
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F

# BAD: I/O and logic are tangled together, so the logic can't be tested in isolation
def job():
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("s3://...")
    result = df.filter(...).groupBy(...).agg(...)  # logic trapped in here
    result.write.parquet("s3://...")

# GOOD: extract the transformation as a pure function
def transform(df: DataFrame) -> DataFrame:  # input DataFrame -> output DataFrame
    return (df.filter(F.col("amount") > 0)
              .groupBy("user").agg(F.sum("amount").alias("total")))

def job():
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("s3://...")
    transform(df).write.parquet("s3://...")  # keep I/O thin
```

A function like `transform(df)` that takes a DataFrame and returns a DataFrame is easy to test with small inputs.
2. SparkSession Fixture
Create one local SparkSession for tests and reuse it (a pytest fixture).
```python
# conftest.py
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark():
    spark = (SparkSession.builder
             .master("local[2]")                           # 2 local cores
             .appName("tests")
             .config("spark.sql.shuffle.partitions", "2")  # keep tests small -> fast
             .config("spark.ui.enabled", "false")          # no web UI needed in tests
             .getOrCreate())
    yield spark   # shared by every test in the session
    spark.stop()
```

Tip: lower `spark.sql.shuffle.partitions` to around 2 in tests. With the default of 200, any shuffle (a `groupBy`, a join) on even tiny test data can be split into up to 200 mostly empty partitions, and scheduling those tasks slows everything down.
3. DataFrame Equality Comparison
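The heart of a test is "does the output DataFrame match the expected one?" A plain `==` won't do: PySpark's `DataFrame` does not define value equality, so `==` only checks whether both sides are the same object. You need a dedicated comparison.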
Built-in assert (Spark 3.5+)
```python
from pyspark.testing import assertDataFrameEqual
from my_pipeline import transform  # hypothetical module holding transform() from section 1

def test_transform(spark):
    input_df = spark.createDataFrame(
        [("a", 100), ("a", 50), ("b", 30), ("b", -10)],  # -10 should be filtered out
        ["user", "amount"])
    expected = spark.createDataFrame(
        [("a", 150), ("b", 30)], ["user", "total"])
    result = transform(input_df)
    assertDataFrameEqual(result, expected)  # order-insensitive comparison
```

Spark 3.5+ ships `pyspark.testing.assertDataFrameEqual` out of the box. By default it ignores row order (`checkRowOrder=False`), checks that the schemas match, and compares floating-point values within a tolerance (`rtol`, `atol`).
chispa (widely used third-party library)

```python
from chispa import assert_df_equality

# same result / expected DataFrames as in the test above
assert_df_equality(result, expected, ignore_row_order=True, ignore_nullable=True)
```

| Tool | Characteristics |
|---|---|
| `pyspark.testing` (built-in) | Spark 3.5+, no extra dependency |
| chispa | Rich options, friendly diff output |
4. Edge Cases — Where the Real Bugs Hide
Bugs tend to surface at boundary conditions rather than on the happy path. Tests you should always write:
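```python
from pyspark.testing import assertDataFrameEqual
from my_pipeline import transform  # hypothetical module holding transform()

def test_empty_input(spark):
    # an empty DataFrame must not break anything
    empty = spark.createDataFrame([], "user string, amount long")
    result = transform(empty)
    assert result.count() == 0

def test_nulls(spark):
    # NULL handling: NULL > 0 evaluates to NULL, so filter() drops that row
    df = spark.createDataFrame([("a", None), ("a", 100)], "user string, amount long")
    result = transform(df)
    # explicitly verify the expected behavior
    expected = spark.createDataFrame([("a", 100)], "user string, total long")
    assertDataFrameEqual(result, expected)

def test_negative_and_zero(spark):
    # boundary values: 0 and negatives are filtered out, so "a" and "b" vanish
    df = spark.createDataFrame([("a", 0), ("b", -5), ("c", 1)], "user string, amount long")
    expected = spark.createDataFrame([("c", 1)], "user string, total long")
    assertDataFrameEqual(transform(df), expected)
```

| Edge case | Why |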
|---|---|
| Empty input | Aggregations/joins can misbehave on empty data (a global `agg()` without `groupBy` still returns one row, e.g. `sum` = NULL) |
| NULL values | NULL propagation in aggregation and comparison |
| Duplicate keys | dedup/join logic |
| Boundary values (0, negatives, max) | off-by-one, sign errors |
| Single row / single group | window functions, aggregations |
5. Schema Tests
It's not just the data — the output schema is a contract too. Change a column name or type and downstream consumers break.
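```python
from my_pipeline import transform  # hypothetical module holding transform()

def test_output_schema(spark):
    input_df = spark.createDataFrame([("a", 100)], ["user", "amount"])
    result = transform(input_df)
    # column names and their order are part of the contract
    assert result.columns == ["user", "total"]
    # sum() over a bigint column yields bigint
    assert dict(result.dtypes)["total"] == "bigint"
```

6. Data Quality Validation — Production Guards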
Separate from unit tests, embed runtime quality checks against production data in the pipeline itself.
```python
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def assert_quality(df: DataFrame) -> None:
    # raise explicitly: plain `assert` statements are skipped under `python -O`
    total = df.count()
    # critical columns must not be NULL
    null_ids = df.filter(F.col("user").isNull()).count()
    if null_ids:
        raise ValueError(f"user NULL count: {null_ids}")
    # key uniqueness
    if df.select("user").distinct().count() != total:
        raise ValueError("duplicate user")
    # value range
    if df.filter(F.col("total") < 0).count() > 0:
        raise ValueError("negative total")
```

As things grow, manage declarative quality rules with a dedicated framework such as Great Expectations or dbt tests. (AI-based quality validation is also covered in a separate post, "AI-Driven Data Pipeline Automation.")
7. CI Integration
Run the tests automatically in CI to block regressions.
```yaml
# Example: GitHub Actions sketch
steps:
  - uses: actions/checkout@v4            # make tests/ available to the job
  - uses: actions/setup-java@v4          # Spark needs a JVM
    with: { distribution: 'temurin', java-version: '17' }
  - run: pip install pyspark chispa pytest
  - run: pytest tests/ -v
```

| Concern | Guideline |
|---|---|
| JVM | Spark requires Java (install in CI) |
| Speed | local[2], fewer shuffle partitions, small data |
| Isolation | Share one SparkSession fixture, but build each test's input data inside that test |
| Integration tests | Real I/O (sample files) for a small subset only |
8. The Test Pyramid
```text
        ▲           Integration tests (real reads/writes, a few)
       ───
      ─────         Transformation unit tests (transform functions, many)  <- focus here
     ───────
    ─────────       Pure helper functions (no Spark needed, fastest)
```

Put most of your tests at the transformation-function level and keep real I/O integration tests to a minimum. Pure logic that doesn't need Spark should be extracted and tested even faster on its own.
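As an illustration of that bottom layer, a hypothetical helper that builds a date-partition path needs no SparkSession at all, so its test runs in milliseconds without a JVM. The function name and the `dt=YYYY-MM-DD` layout are assumptions for the sketch:

```python
from datetime import date


def partition_path(base: str, day: date) -> str:
    # hypothetical helper: build a Hive-style "dt=YYYY-MM-DD" partition path
    return f"{base.rstrip('/')}/dt={day.isoformat()}"


def test_partition_path():
    # plain Python: no SparkSession, no JVM, runs in milliseconds
    assert partition_path("/data/sales/", date(2024, 1, 5)) == "/data/sales/dt=2024-01-05"
    assert partition_path("/data/sales", date(2024, 12, 31)) == "/data/sales/dt=2024-12-31"
```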
9. Summary
| Area | Key point |
|---|---|
| Design | Separate I/O from transformation -> pure transform(df) functions |
| Fixture | Local SparkSession, fewer shuffle partitions |
| Comparison | assertDataFrameEqual / chispa |
| Edge cases | Empty input, NULLs, duplicates, boundary values |
| Schema | Validate columns and types as a contract |
| Quality | Runtime guards + Great Expectations |
| CI | Install a JVM, keep it small and fast |
The core insight of PySpark testing is that "extracting transformation logic into pure DataFrame-to-DataFrame functions makes testing easy." Pull the logic out of I/O, compare equality with small inputs, and explicitly verify edge cases like NULLs and empty input — and your data pipelines can be protected from regressions just like any other software. Remember: a pipeline without tests breeds the most expensive bug of all — "silently wrong data."
This post is based on Spark 3.5. If you need help building a testing practice or automated quality validation for your data pipelines, feel free to reach out.
— The Data Dynamics Engineering Team