Blog
pysparksparkdata-qualitydeequvalidationdata-engineering

PySpark 대규모 데이터 품질 검증 — Deequ로 "조용히 틀린 데이터" 잡기

수십억 행 파이프라인에서 데이터 품질을 자동 검증하는 법. 제약(Constraint) 기반 검증, 이상치·분포 변화 탐지, 통계 프로파일링, anomaly detection, 그리고 품질 게이트로 나쁜 데이터의 하류 전파를 막는 패턴을 정리합니다.

Data Dynamics2026년 6월 5일9 min read

데이터 파이프라인에서 가장 비싼 버그는 잡을 죽이는 에러가 아닙니다. 조용히 틀린 데이터입니다. 잡은 성공했는데 NULL 비율이 갑자기 30%로 튀거나, 매출 컬럼에 음수가 섞이거나, 어제 100만이던 행이 오늘 1만으로 줄어듭니다. 아무도 모르게 잘못된 데이터가 대시보드와 모델로 흘러가, 몇 주 뒤에야 발견됩니다.

이 글은 PySpark 대규모 데이터의 품질을 자동 검증하는 법 — 제약 기반 검증, 이상치·분포 변화 탐지, 품질 게이트 — 를 정리합니다.

1. 품질 검증의 두 층위

① 단위 테스트   : 변환 "로직"이 맞는가 (작은 합성 데이터)  ← 별도 글 "PySpark 코드 테스트"
② 데이터 검증   : 실제 "데이터"가 기대 범위인가 (프로덕션 데이터)  ← 이 글

둘은 다릅니다. 로직이 완벽해도 소스 데이터가 망가지면 결과가 틀립니다. 데이터 검증은 런타임에 실제 데이터를 점검합니다.

2. 직접 구현 — 핵심 제약 검증

전용 도구 없이도 핵심 검증은 PySpark 로 바로 구현합니다.

from pyspark.sql import functions as F
 
def validate(df):
    total = df.count()
    checks = {}
 
    # 행 수 (급감/급증 감지)
    checks["row_count"] = total
 
    # NULL 비율 (핵심 컬럼)
    checks["user_id_null_pct"] = df.filter(F.col("user_id").isNull()).count() / total
 
    # 유일성 (PK 중복)
    checks["user_id_unique"] = df.select("user_id").distinct().count() == total
 
    # 값 범위 (음수 매출)
    checks["negative_amount"] = df.filter(F.col("amount") < 0).count()
 
    # 허용 값 집합 (enum)
    valid_status = ["active", "inactive", "pending"]
    checks["invalid_status"] = df.filter(~F.col("status").isin(valid_status)).count()
 
    return checks
검증 종류예시
완전성(completeness)NULL 비율
유일성(uniqueness)PK 중복
유효성(validity)값 범위, enum, 정규식
일관성(consistency)컬럼 간 관계(a <= b)
적시성(timeliness)최신 데이터 존재 여부

한 번의 count() 가 전체 스캔이므로, 검증을 여러 번 분리해 호출하면 그만큼 스캔이 반복됩니다. 가능하면 한 번의 집계로 여러 메트릭을 동시에 계산하세요(아래).

3. 효율적 검증 — 한 번의 스캔으로

여러 메트릭을 하나의 agg 로 모아 단일 스캔으로 처리합니다.

metrics = df.agg(
    F.count("*").alias("rows"),
    F.sum(F.col("user_id").isNull().cast("int")).alias("uid_nulls"),
    F.sum((F.col("amount") < 0).cast("int")).alias("neg_amount"),
    F.countDistinct("user_id").alias("uid_distinct"),
    F.min("amount").alias("amount_min"),
    F.max("amount").alias("amount_max"),
).collect()[0]

count() 를 메트릭마다 부르는 대신 하나의 집계로 묶으면, 큰 데이터에서 스캔 비용이 N분의 1이 됩니다.

4. Deequ / PyDeequ — 선언적 품질 프레임워크

규모가 커지면 AWS 의 Deequ(PySpark 바인딩 PyDeequ)로 제약을 선언적으로 관리합니다. "데이터에 대한 단위 테스트"라는 컨셉입니다.

from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite
 
check = (Check(spark, CheckLevel.Error, "data quality")
    .hasSize(lambda s: s > 1_000_000)            # 행 수 하한
    .isComplete("user_id")                        # NULL 없음
    .isUnique("user_id")                          # 유일
    .isNonNegative("amount")                      # 음수 없음
    .isContainedIn("status", ["active", "inactive", "pending"])
    .hasCompleteness("email", lambda c: c > 0.95))  # 95% 이상 채워짐
 
result = (VerificationSuite(spark)
    .onData(df)
    .addCheck(check)
    .run())

Deequ 의 강점:

  • 선언적 제약: 규칙을 코드 흐름이 아니라 명세로 관리.
  • 메트릭 저장소: 품질 메트릭을 시계열로 저장해 추세 분석.
  • 자동 제안: 데이터를 프로파일링해 제약을 자동 제안.

5. 이상치·분포 변화 탐지 (Anomaly Detection)

"음수가 있나" 같은 정적 규칙을 넘어, 어제와 비교해 분포가 변했는가를 보는 것이 진짜 어려운 부분입니다. 갑작스러운 NULL 비율 증가, 행 수 급변은 보통 상류 장애의 신호입니다.

# 과거 메트릭 이력과 비교 (메트릭을 테이블에 적재해두고)
history = spark.table("quality.metrics_history")
 
today_rows = metrics["rows"]
baseline = (history
    .filter("metric = 'rows'")
    .agg(F.avg("value").alias("avg"), F.stddev("value").alias("std"))
    .collect()[0])
 
# 평균에서 3σ 이상 벗어나면 경보
if abs(today_rows - baseline["avg"]) > 3 * baseline["std"]:
    raise ValueError(f"행 수 이상: {today_rows} (기대 {baseline['avg']:.0f}±{baseline['std']:.0f})")

Deequ 는 AnomalyDetection 으로 이런 시계열 기반 이상 탐지(상대 변화율, 표준편차 기반)를 내장 지원합니다.

탐지방법
행 수 급변이력 대비 3σ / 상대 변화율
NULL 비율 상승완전성 추세
분포 이동평균·분위수 변화, 카디널리티 변화
신규 카테고리enum 집합 변화

6. 품질 게이트 — 나쁜 데이터의 전파 차단

검증의 목적은 "발견"이 아니라 "전파 차단" 입니다. 품질 검증을 파이프라인의 게이트로 넣어, 통과해야만 하류로 내보냅니다.

def quality_gate(df):
    result = run_checks(df)
    if result.has_errors():
        # 나쁜 데이터를 격리하고 하류 적재를 막음
        df.write.mode("overwrite").save("quarantine/...")
        alert(result)                       # 알림
        raise QualityGateError(result)      # 파이프라인 중단
 
    df.writeTo("analytics.events").append()  # 통과 시에만 적재
[변환] → [품질 게이트] ──통과──> [프로덕션 테이블]

              └──실패──> [격리 + 알림 + 중단]

핵심 철학: 의심스러운 데이터를 조용히 흘려보내는 것보다, 멈추고 알리는 것이 낫습니다. 잘못된 데이터가 대시보드·모델에 들어가면 복구 비용이 훨씬 큽니다.

7. 검증을 어디에 둘까

위치검증
수집 직후(raw)소스 스키마·완전성·인코딩
변환 후(curated)비즈니스 규칙·일관성
적재 전(품질 게이트)최종 제약·이상 탐지
적재 후(모니터링)추세·분포 변화

레이어마다 검증을 두되, 적재 전 게이트를 가장 엄격하게 합니다.

8. 정리

영역핵심
직접 검증한 번의 agg 로 여러 메트릭
선언적 프레임워크Deequ/PyDeequ 제약
이상 탐지이력 대비 분포 변화(3σ)
품질 게이트통과해야 적재, 실패는 격리+중단
배치레이어별 검증, 적재 전 가장 엄격

데이터 품질 검증의 핵심 통찰은 "잡이 성공해도 데이터는 틀릴 수 있다"는 것입니다. 정적 제약(NULL·범위·유일성)으로 명백한 오류를 잡고, 이력 대비 이상 탐지로 은밀한 분포 변화를 포착하며, 품질 게이트로 나쁜 데이터의 하류 전파를 차단하세요. "조용히 틀린 데이터"라는 가장 비싼 버그는, 검증을 파이프라인의 일부로 만들 때 비로소 막을 수 있습니다.


이 글은 Spark 3.5 + PyDeequ 기준으로 작성되었습니다. 대규모 데이터 품질 검증·모니터링 체계 구축이 필요하시면 언제든 문의해 주세요.

— Data Dynamics 엔지니어링 팀