
Large-Scale Data Quality Validation in PySpark — Catching "Silently Wrong Data" with Deequ

How to automatically validate data quality in multi-billion-row pipelines. We cover constraint-based validation, statistical profiling, anomaly and distribution-shift detection, and quality-gate patterns that stop bad data from propagating downstream.

Data Dynamics · June 5, 2026 · 6 min read

The most expensive bug in a data pipeline is not the error that kills your job. It is silently wrong data. The job succeeds, but the NULL ratio suddenly jumps to 30%, negative values sneak into the revenue column, or yesterday's 1 million rows shrink to 10 thousand today. Bad data quietly flows into dashboards and models, and nobody notices until weeks later.

This post covers how to automatically validate the quality of large-scale data in PySpark — constraint-based validation, anomaly and distribution-shift detection, and quality gates.

1. Two Layers of Quality Validation

1) Unit tests       : Is the transformation "logic" correct? (small synthetic data)  <- separate post "Testing PySpark Code"
2) Data validation  : Is the actual "data" within expected bounds? (production data)  <- this post

These are different things. Even with flawless logic, broken source data produces wrong results. Data validation inspects the actual data at runtime.

2. Rolling Your Own — Core Constraint Checks

You can implement the essential checks directly in PySpark without any dedicated tooling.

from pyspark.sql import functions as F

def validate(df):
    total = df.count()
    checks = {}

    # Row count (detect sudden drops/spikes)
    checks["row_count"] = total

    # NULL ratio (key columns); guard against division by zero on an empty DataFrame
    uid_nulls = df.filter(F.col("user_id").isNull()).count()
    checks["user_id_null_pct"] = uid_nulls / total if total else 0.0

    # Uniqueness (PK duplicates)
    checks["user_id_unique"] = df.select("user_id").distinct().count() == total

    # Value range (negative revenue)
    checks["negative_amount"] = df.filter(F.col("amount") < 0).count()

    # Allowed value set (enum)
    # Note: for a NULL status the predicate evaluates to NULL, so the row is not
    # counted here; cover NULLs with a separate completeness check.
    valid_status = ["active", "inactive", "pending"]
    checks["invalid_status"] = df.filter(~F.col("status").isin(valid_status)).count()

    return checks

| Check type | Examples |
| --- | --- |
| Completeness | NULL ratio |
| Uniqueness | PK duplicates |
| Validity | Value ranges, enums, regex |
| Consistency | Cross-column relationships (a <= b) |
| Timeliness | Presence of recent data |

A single count() is a full scan, so calling each check separately repeats that scan over and over. Whenever possible, compute multiple metrics in a single aggregation (see below).

3. Efficient Validation — One Scan

Bundle multiple metrics into one agg so they are computed in a single scan.

metrics = df.agg(
    F.count("*").alias("rows"),
    F.sum(F.col("user_id").isNull().cast("int")).alias("uid_nulls"),
    F.sum((F.col("amount") < 0).cast("int")).alias("neg_amount"),
    F.countDistinct("user_id").alias("uid_distinct"),
    F.min("amount").alias("amount_min"),
    F.max("amount").alias("amount_max"),
).collect()[0]

Compared with calling count() once per metric, a single aggregation reads the data once instead of N times. For N metrics, the scan cost on large uncached data drops by roughly a factor of N.
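Once the aggregated row is back on the driver, turning it into pass/fail results is plain Python. Here is a minimal sketch, assuming the metric names from the aggregation above. The `evaluate_metrics` helper and the 1% NULL threshold are hypothetical, chosen only for illustration:

```python
def evaluate_metrics(m, max_null_ratio=0.01):
    """Turn one row of aggregated metrics into named pass/fail checks.

    `m` is a plain dict, e.g. the collected Row converted with asDict().
    The function name and the default threshold are illustrative assumptions.
    """
    rows = m["rows"]
    # On an empty DataFrame, sum() yields None; treat that as zero.
    uid_nulls = m["uid_nulls"] or 0
    neg_amount = m["neg_amount"] or 0
    null_ratio = uid_nulls / rows if rows else 0.0
    return {
        "non_empty": rows > 0,
        "user_id_null_ratio_ok": null_ratio <= max_null_ratio,
        # countDistinct ignores NULLs, so compare against the non-NULL row count.
        "user_id_unique": m["uid_distinct"] == rows - uid_nulls,
        "amount_non_negative": neg_amount == 0,
    }


sample = {"rows": 1000, "uid_nulls": 5, "neg_amount": 0,
          "uid_distinct": 995, "amount_min": 0.0, "amount_max": 310.5}
failed = [name for name, ok in evaluate_metrics(sample).items() if not ok]
print(failed)
```

Keeping the thresholds as function arguments rather than hard-coding them makes it easier to apply different limits per table.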

4. Deequ / PyDeequ — A Declarative Quality Framework

As things scale, manage constraints declaratively with AWS's Deequ (PyDeequ is the PySpark binding). The concept is "unit tests for data."

from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationResult, VerificationSuite

check = (Check(spark, CheckLevel.Error, "data quality")
    .hasSize(lambda s: s > 1_000_000)            # row count lower bound
    .isComplete("user_id")                        # no NULLs
    .isUnique("user_id")                          # unique
    .isNonNegative("amount")                      # no negatives
    .isContainedIn("status", ["active", "inactive", "pending"])
    .hasCompleteness("email", lambda c: c > 0.95))  # at least 95% populated

result = (VerificationSuite(spark)
    .onData(df)
    .addCheck(check)
    .run())

# One row per constraint, with its status and failure message
VerificationResult.checkResultsAsDataFrame(spark, result).show(truncate=False)

Deequ's strengths:

  • Declarative constraints: manage rules as specifications rather than control flow.
  • Metrics repository: store quality metrics as time series for trend analysis.
  • Automatic suggestions: profile the data and automatically suggest constraints.

5. Detecting Anomalies and Distribution Shifts

Beyond static rules like "are there any negatives," the genuinely hard part is asking whether the distribution has changed compared to yesterday. A sudden rise in the NULL ratio or an abrupt change in row count is usually a signal of an upstream failure.

# Compare against historical metrics (with metrics persisted to a table)
history = spark.table("quality.metrics_history")

today_rows = metrics["rows"]
baseline = (history
    .filter("metric = 'rows'")
    .agg(F.avg("value").alias("avg"), F.stddev("value").alias("std"))
    .collect()[0])

# With fewer than two history rows, stddev is NULL (None), so there is no baseline yet
if baseline["avg"] is None or baseline["std"] is None:
    print("Not enough history for a row-count baseline; skipping the anomaly check")
# Alert if the value deviates more than 3 sigma from the mean
elif abs(today_rows - baseline["avg"]) > 3 * baseline["std"]:
    raise ValueError(f"Row count anomaly: {today_rows} (expected {baseline['avg']:.0f}±{baseline['std']:.0f})")

Deequ ships with built-in support for this kind of time-series-based anomaly detection (relative change rate, standard-deviation based) via AnomalyDetection.
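The relative-change-rate variant can help when the history is too short for a stable standard deviation. Here is a minimal sketch in plain Python, meant to run on the driver after the metrics are collected. The `relative_change_anomaly` helper and the 50% threshold are hypothetical and are not Deequ's API:

```python
def relative_change_anomaly(today, previous, max_change=0.5):
    """Return (is_anomaly, change), where change = (today - previous) / previous.

    The helper name and the default 50% threshold are illustrative assumptions.
    """
    if previous == 0:
        # No meaningful ratio: flag only if data appeared where there was none.
        appeared = today != 0
        return appeared, (float("inf") if appeared else 0.0)
    change = (today - previous) / previous
    return abs(change) > max_change, change


# The example from the introduction: 1 million rows yesterday, 10 thousand today.
is_anomaly, change = relative_change_anomaly(10_000, 1_000_000)
print(is_anomaly, f"{change:+.1%}")
```

A relative threshold needs only the previous run's value, at the cost of being blind to slow drifts that stay under the threshold every day.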

| Detection | Method |
| --- | --- |
| Sudden row-count change | 3σ vs. history / relative change rate |
| Rising NULL ratio | Completeness trend |
| Distribution shift | Changes in mean/quantiles, cardinality changes |
| New categories | Changes in the enum set |
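The "new categories" check reduces to a set difference once the distinct values of a low-cardinality column are on the driver. Here is a sketch assuming the status values from section 2; `diff_categories` is a hypothetical helper:

```python
def diff_categories(observed, expected):
    """Compare observed distinct values against the expected enum set."""
    observed, expected = set(observed), set(expected)
    return {
        "new": sorted(observed - expected, key=str),      # values outside the enum
        "missing": sorted(expected - observed, key=str),  # expected values that vanished
    }


expected_status = {"active", "inactive", "pending"}
# In a pipeline this list could come from something like
# [r["status"] for r in df.select("status").distinct().collect()].
observed_status = ["active", "pending", "suspended"]
print(diff_categories(observed_status, expected_status))
```

Reporting vanished values alongside new ones is a design choice: a category that disappears can signal an upstream filter bug just as a new one can signal a schema change.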

6. Quality Gates — Blocking the Propagation of Bad Data

The goal of validation is not "detection" but "blocking propagation." Put quality validation into the pipeline as a gate, so data only flows downstream if it passes.

# run_checks, alert, and QualityGateError are placeholders for your own implementations
def quality_gate(df):
    result = run_checks(df)
    if result.has_errors():
        # Quarantine the bad data and block the downstream load
        df.write.mode("overwrite").save("quarantine/...")
        alert(result)                       # notify
        raise QualityGateError(result)      # halt the pipeline

    df.writeTo("analytics.events").append()  # load only on pass

[Transform] -> [Quality gate] --pass--> [Production table]
                    |
                    └--fail--> [Quarantine + alert + halt]

The core philosophy: it is better to stop and alert than to silently let suspicious data through. Once wrong data reaches dashboards and models, the cost of recovery is far higher.
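The gate's control flow can be exercised without Spark by injecting the I/O steps as functions. The following is a sketch under stated assumptions: `run_gate` and all five of its parameters are hypothetical stand-ins for a pipeline's real validation, quarantine, alerting, and load code.

```python
class QualityGateError(Exception):
    """Raised when a batch fails validation and must not be loaded."""


def run_gate(batch, checks, load, quarantine, alert):
    """Load `batch` only if every check passes; otherwise quarantine, alert, halt.

    `checks` maps a check name to a function batch -> bool. All parameters are
    hypothetical stand-ins for real validation and I/O code.
    """
    failed = [name for name, check in checks.items() if not check(batch)]
    if failed:
        quarantine(batch)   # keep the bad batch for debugging
        alert(failed)       # notify before halting
        raise QualityGateError(f"failed checks: {failed}")
    load(batch)             # reached only on pass


# Toy usage with in-memory lists standing in for tables.
loaded, quarantined, alerts = [], [], []
checks = {
    "non_empty": lambda rows: len(rows) > 0,
    "amount_non_negative": lambda rows: all(r["amount"] >= 0 for r in rows),
}
try:
    run_gate([{"amount": 10}, {"amount": -5}], checks,
             load=loaded.append, quarantine=quarantined.append, alert=alerts.append)
except QualityGateError as exc:
    print(exc)
```

Raising an exception instead of returning a flag means an orchestrator sees the task as failed, so downstream tasks do not start by accident.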

7. Where to Place Validation

| Location | Checks |
| --- | --- |
| Right after ingestion (raw) | Source schema, completeness, encoding |
| After transformation (curated) | Business rules, consistency |
| Before loading (quality gate) | Final constraints, anomaly detection |
| After loading (monitoring) | Trends, distribution shifts |

Place validation at every layer, but make the pre-load gate the strictest.

8. Summary

| Area | Key point |
| --- | --- |
| Hand-rolled checks | Multiple metrics in a single agg |
| Declarative framework | Deequ/PyDeequ constraints |
| Anomaly detection | Distribution change vs. history (3σ) |
| Quality gate | Load only on pass; on failure, quarantine + halt |
| Placement | Validate per layer, strictest before loading |

The key insight of data quality validation is that "the job can succeed while the data is still wrong." Catch the obvious errors with static constraints (NULLs, ranges, uniqueness), capture subtle distribution shifts with history-based anomaly detection, and block the downstream propagation of bad data with quality gates. The most expensive bug of all — "silently wrong data" — can only be prevented when validation becomes part of the pipeline itself.


This post is based on Spark 3.5 + PyDeequ. If you need help building large-scale data quality validation and monitoring systems, feel free to reach out.

— Data Dynamics Engineering Team