Blog
pysparksparkdata-qualitydeequvalidationdata-engineering

PySpark 대규모 데이터 품질 검증 — Deequ로 "조용히 틀린 데이터" 잡기

수십억 행 파이프라인에서 데이터 품질을 자동 검증하는 법. 제약(Constraint) 기반 검증, 이상치·분포 변화 탐지, 통계 프로파일링, anomaly detection, 그리고 품질 게이트로 나쁜 데이터의 하류 전파를 막는 패턴을 정리합니다.

Data Dynamics2026년 6월 5일13 min read

여러분의 파이프라인 잡이 "성공"으로 표시됐다고 안심하셨나요? 데이터 파이프라인에서 가장 비싼 버그는 잡을 죽이는 에러가 아닙니다. 조용히 틀린 데이터입니다. 잡은 성공했는데 NULL 비율이 갑자기 30%로 튀거나, 매출 컬럼에 음수가 섞이거나, 어제 100만이던 행이 오늘 1만으로 줄어듭니다. 아무도 모르게 잘못된 데이터가 대시보드와 모델로 흘러가, 몇 주 뒤에야 발견됩니다.

이 글은 PySpark 대규모 데이터의 품질을 자동 검증하는 법 — 제약 기반 검증, 이상치·분포 변화 탐지, 품질 게이트 — 를 정리합니다.

이 글에서 배우는 것

  • 데이터 품질 검증이 단위 테스트와 어떻게 다른지
  • PySpark 만으로 핵심 제약을 효율적으로 검증하는 방법
  • Deequ/PyDeequ 를 활용한 선언적 품질 프레임워크 구성
  • 행 수 급변·NULL 비율 상승 같은 이상치를 이력 기반으로 탐지하는 법
  • 나쁜 데이터가 하류로 흘러가지 못하도록 막는 품질 게이트 패턴

1. 품질 검증의 두 층위

① 단위 테스트   : 변환 "로직"이 맞는가 (작은 합성 데이터)  ← 별도 글 "PySpark 코드 테스트"
② 데이터 검증   : 실제 "데이터"가 기대 범위인가 (프로덕션 데이터)  ← 이 글

둘은 엄연히 다릅니다. 코드 로직이 완벽해도 소스 데이터가 망가지면 결과가 틀립니다. 데이터 검증은 런타임에 실제 데이터를 점검하는 작업이거든요.

한 문장으로: "로직 테스트"는 코드를, "데이터 검증"은 프로덕션 데이터를 대상으로 합니다.

2. 직접 구현 — 핵심 제약 검증

전용 도구 없이도 핵심 검증은 PySpark 로 바로 구현할 수 있습니다. 먼저 가장 기본적인 패턴을 살펴봅시다.

from pyspark.sql import functions as F
 
def validate(df):
    total = df.count()
    checks = {}
 
    # 행 수 (급감/급증 감지)
    checks["row_count"] = total
 
    # NULL 비율 (핵심 컬럼)
    checks["user_id_null_pct"] = df.filter(F.col("user_id").isNull()).count() / total
 
    # 유일성 (PK 중복)
    checks["user_id_unique"] = df.select("user_id").distinct().count() == total
 
    # 값 범위 (음수 매출)
    checks["negative_amount"] = df.filter(F.col("amount") < 0).count()
 
    # 허용 값 집합 (enum)
    valid_status = ["active", "inactive", "pending"]
    checks["invalid_status"] = df.filter(~F.col("status").isin(valid_status)).count()
 
    return checks

데이터 품질은 크게 다섯 가지 관점으로 나눌 수 있습니다.

검증 종류예시
완전성(completeness)NULL 비율
유일성(uniqueness)PK 중복
유효성(validity)값 범위, enum, 정규식
일관성(consistency)컬럼 간 관계(a <= b)
적시성(timeliness)최신 데이터 존재 여부

⚠️ 한 번의 count() 가 전체 스캔이므로, 검증을 여러 번 분리해 호출하면 그만큼 스캔이 반복됩니다. 가능하면 한 번의 집계로 여러 메트릭을 동시에 계산하세요(아래).

3. 효율적 검증 — 한 번의 스캔으로

검증 함수를 메트릭마다 따로 호출하면 Spark 가 데이터를 그만큼 반복해서 읽습니다. 마치 같은 책을 페이지마다 처음부터 다시 펼치는 것과 같죠. 여러 메트릭을 하나의 agg 로 모아 단일 스캔으로 처리하면 됩니다.

metrics = df.agg(
    F.count("*").alias("rows"),
    F.sum(F.col("user_id").isNull().cast("int")).alias("uid_nulls"),
    F.sum((F.col("amount") < 0).cast("int")).alias("neg_amount"),
    F.countDistinct("user_id").alias("uid_distinct"),
    F.min("amount").alias("amount_min"),
    F.max("amount").alias("amount_max"),
).collect()[0]

count() 를 메트릭마다 부르는 대신 하나의 집계로 묶으면, 큰 데이터에서 스캔 비용이 N분의 1이 됩니다.

4. Deequ / PyDeequ — 선언적 품질 프레임워크

규모가 커지면 검증 규칙이 수십 개로 늘어나고, 코드는 점점 복잡해집니다. 이때 AWS 의 Deequ(PySpark 바인딩 PyDeequ)를 사용하면 제약을 코드 흐름이 아닌 명세로 관리할 수 있습니다. "데이터에 대한 단위 테스트"라는 컨셉이죠.

from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite
 
check = (Check(spark, CheckLevel.Error, "data quality")
    .hasSize(lambda s: s > 1_000_000)            # 행 수 하한
    .isComplete("user_id")                        # NULL 없음
    .isUnique("user_id")                          # 유일
    .isNonNegative("amount")                      # 음수 없음
    .isContainedIn("status", ["active", "inactive", "pending"])
    .hasCompleteness("email", lambda c: c > 0.95))  # 95% 이상 채워짐
 
result = (VerificationSuite(spark)
    .onData(df)
    .addCheck(check)
    .run())

Deequ 의 강점:

  • 선언적 제약: 규칙을 코드 흐름이 아니라 명세로 관리.
  • 메트릭 저장소: 품질 메트릭을 시계열로 저장해 추세 분석.
  • 자동 제안: 데이터를 프로파일링해 제약을 자동 제안.

5. 이상치·분포 변화 탐지 (Anomaly Detection)

"음수가 있나" 같은 정적 규칙을 넘어, 어제와 비교해 분포가 변했는가를 보는 것이 진짜 어려운 부분입니다. 갑작스러운 NULL 비율 증가나 행 수 급변은 보통 상류 시스템 장애의 신호거든요. 과거 이력과 비교해 통계적으로 크게 벗어나는 시점을 포착해 봅시다.

# 과거 메트릭 이력과 비교 (메트릭을 테이블에 적재해두고)
history = spark.table("quality.metrics_history")
 
today_rows = metrics["rows"]
baseline = (history
    .filter("metric = 'rows'")
    .agg(F.avg("value").alias("avg"), F.stddev("value").alias("std"))
    .collect()[0])
 
# 평균에서 3σ 이상 벗어나면 경보
if abs(today_rows - baseline["avg"]) > 3 * baseline["std"]:
    raise ValueError(f"행 수 이상: {today_rows} (기대 {baseline['avg']:.0f}±{baseline['std']:.0f})")

Deequ 는 AnomalyDetection 으로 이런 시계열 기반 이상 탐지(상대 변화율, 표준편차 기반)를 내장 지원합니다.

탐지방법
행 수 급변이력 대비 3σ / 상대 변화율
NULL 비율 상승완전성 추세
분포 이동평균·분위수 변화, 카디널리티 변화
신규 카테고리enum 집합 변화

6. 품질 게이트 — 나쁜 데이터의 전파 차단

검증의 목적은 단순한 "발견"이 아닙니다. "전파 차단" 이 핵심입니다. 문제를 발견하고도 데이터를 그냥 흘려보내면 의미가 없으니까요. 품질 검증을 파이프라인의 게이트로 넣어, 통과해야만 하류로 내보내도록 구성하면 됩니다.

def quality_gate(df):
    result = run_checks(df)
    if result.has_errors():
        # 나쁜 데이터를 격리하고 하류 적재를 막음
        df.write.mode("overwrite").save("quarantine/...")
        alert(result)                       # 알림
        raise QualityGateError(result)      # 파이프라인 중단
 
    df.writeTo("analytics.events").append()  # 통과 시에만 적재
Loading diagram…

팁: 의심스러운 데이터를 조용히 흘려보내는 것보다, 멈추고 알리는 것이 낫습니다. 잘못된 데이터가 대시보드·모델에 들어가면 복구 비용이 훨씬 큽니다.

7. 검증을 어디에 둘까

검증은 파이프라인의 한 지점에만 두는 게 아닙니다. 레이어마다 성격이 다른 검증을 배치하는 것이 효과적입니다.

위치검증
수집 직후(raw)소스 스키마·완전성·인코딩
변환 후(curated)비즈니스 규칙·일관성
적재 전(품질 게이트)최종 제약·이상 탐지
적재 후(모니터링)추세·분포 변화

레이어마다 검증을 두되, 적재 전 게이트를 가장 엄격하게 합니다.

8. 정리

영역핵심
직접 검증한 번의 agg 로 여러 메트릭
선언적 프레임워크Deequ/PyDeequ 제약
이상 탐지이력 대비 분포 변화(3σ)
품질 게이트통과해야 적재, 실패는 격리+중단
배치레이어별 검증, 적재 전 가장 엄격

데이터 품질 검증의 핵심 통찰은 "잡이 성공해도 데이터는 틀릴 수 있다"는 것입니다. 정적 제약(NULL·범위·유일성)으로 명백한 오류를 잡고, 이력 대비 이상 탐지로 은밀한 분포 변화를 포착하며, 품질 게이트로 나쁜 데이터의 하류 전파를 차단하세요. "조용히 틀린 데이터"라는 가장 비싼 버그는, 검증을 파이프라인의 일부로 만들 때 비로소 막을 수 있습니다.

마치며 — 핵심 요약

  • 잡 성공 ≠ 데이터 정확: 파이프라인이 돌아가도 데이터는 조용히 틀릴 수 있습니다.
  • 단일 agg 로 묶기: 메트릭마다 count() 를 따로 부르면 스캔이 반복됩니다. 하나의 집계로 묶으세요.
  • Deequ 는 규모가 커질 때: 검증 규칙이 많아지면 선언적 프레임워크가 관리 부담을 크게 줄여 줍니다.
  • 이상 탐지는 이력이 핵심: 정적 규칙만으론 부족합니다. 과거 분포와 비교해야 은밀한 변화를 잡을 수 있습니다.
  • 게이트가 없으면 검증도 없다: 발견만 하고 차단하지 않으면 나쁜 데이터는 계속 흘러갑니다.
  • 레이어마다 다른 검증: 수집 직후는 스키마·완전성, 적재 직전은 비즈니스 규칙·이상 탐지로 역할을 나누세요.

데이터를 믿을 수 있어야 분석도 모델도 신뢰할 수 있습니다. 오늘 파이프라인에 품질 게이트 하나 추가하는 것부터 시작해 봅시다.


이 글은 Spark 3.5 + PyDeequ 기준으로 작성되었습니다. 대규모 데이터 품질 검증·모니터링 체계 구축이 필요하시면 언제든 문의해 주세요.

— Data Dynamics 엔지니어링 팀