Blog
pysparksparktestingpytestdata-qualitydata-engineering

PySpark 코드 테스트 — 데이터 파이프라인을 회귀로부터 지키기

PySpark 변환 로직을 어떻게 단위 테스트하는가. SparkSession 픽스처, DataFrame 동등성 비교(chispa/내장 assert), 변환 함수 설계, 스키마·엣지 케이스 테스트, 데이터 품질 검증, 그리고 CI 통합까지 실전 패턴을 정리합니다.

Data Dynamics2026년 6월 5일9 min read

"데이터 파이프라인은 테스트하기 어렵다"는 통념 때문에, 많은 PySpark 코드가 테스트 없이 프로덕션에 올라갑니다. 그러다 누군가 컬럼 하나를 바꾸면 조용히 잘못된 데이터가 흘러가고, 몇 주 뒤 대시보드가 틀어진 걸 발견합니다. PySpark 도 충분히 테스트할 수 있습니다. 핵심은 변환 로직을 테스트 가능하게 설계하는 것입니다.

이 글은 SparkSession 픽스처, DataFrame 동등성 비교, 변환 함수 설계, 엣지 케이스·스키마 테스트, 그리고 CI 통합까지 PySpark 테스트의 실전 패턴을 정리합니다.

1. 테스트 가능한 설계 — 변환을 함수로 분리

테스트의 90%는 코드 구조에서 결정됩니다. 읽기·쓰기(I/O)와 변환 로직을 분리하면, 변환을 순수 함수로 테스트할 수 있습니다.

# BAD: I/O 와 로직이 뒤섞여 테스트 불가
def job():
    df = spark.read.parquet("s3://...")
    result = df.filter(...).groupBy(...).agg(...)   # 로직이 여기 묶임
    result.write.parquet("s3://...")
 
# GOOD: 변환을 순수 함수로 분리
def transform(df):                    # 입력 DataFrame → 출력 DataFrame
    return (df.filter(F.col("amount") > 0)
              .groupBy("user").agg(F.sum("amount").alias("total")))
 
def job():
    df = spark.read.parquet("s3://...")
    transform(df).write.parquet("s3://...")   # I/O 는 얇게

transform(df) 처럼 DataFrame 을 받아 DataFrame 을 반환하는 함수는 작은 입력으로 쉽게 테스트됩니다.

2. SparkSession 픽스처

테스트용 로컬 SparkSession 을 한 번 만들어 재사용합니다(pytest 픽스처).

# conftest.py
import pytest
from pyspark.sql import SparkSession
 
@pytest.fixture(scope="session")
def spark():
    spark = (SparkSession.builder
        .master("local[2]")                         # 로컬 2코어
        .appName("tests")
        .config("spark.sql.shuffle.partitions", "2") # 테스트는 작게 → 빠르게
        .config("spark.ui.enabled", "false")
        .getOrCreate())
    yield spark
    spark.stop()

팁: 테스트에서 spark.sql.shuffle.partitions 를 2 정도로 낮추세요. 기본 200 이면 작은 테스트 데이터에도 200개 빈 파티션을 만들어 느려집니다.

3. DataFrame 동등성 비교

테스트의 핵심은 "출력 DataFrame 이 기대값과 같은가"입니다. 단순 == 로는 안 되고, 전용 비교가 필요합니다.

내장 assert (Spark 3.5+)

from pyspark.testing import assertDataFrameEqual
 
def test_transform(spark):
    input_df = spark.createDataFrame(
        [("a", 100), ("a", 50), ("b", 30), ("b", -10)],  # -10 은 필터돼야
        ["user", "amount"])
 
    expected = spark.createDataFrame(
        [("a", 150), ("b", 30)], ["user", "total"])
 
    result = transform(input_df)
    assertDataFrameEqual(result, expected)   # 순서 무관 비교

Spark 3.5+ 는 pyspark.testing.assertDataFrameEqual 을 내장합니다. 행 순서·부동소수 허용오차까지 다룹니다.

chispa (널리 쓰이는 서드파티)

from chispa import assert_df_equality
 
assert_df_equality(result, expected, ignore_row_order=True, ignore_nullable=True)
도구특징
pyspark.testing (내장)3.5+, 의존성 없음
chispa풍부한 옵션, 친절한 diff 출력

4. 엣지 케이스 — 진짜 버그가 숨는 곳

정상 케이스보다 경계 조건에서 버그가 나옵니다. 반드시 테스트할 것:

def test_empty_input(spark):
    # 빈 DataFrame 도 깨지지 않아야
    empty = spark.createDataFrame([], "user string, amount long")
    result = transform(empty)
    assert result.count() == 0
 
def test_nulls(spark):
    # NULL 처리 — 집계·필터에서 NULL 이 어떻게 되는가
    df = spark.createDataFrame([("a", None), ("a", 100)], "user string, amount long")
    result = transform(df)
    # 기대 동작을 명시적으로 검증
 
def test_negative_and_zero(spark):
    # 경계값: 0, 음수
    ...
엣지 케이스
빈 입력빈 데이터에 집계/조인 시 깨짐
NULL 값집계·비교에서 NULL 전파
중복 키dedup/조인 로직
경계값(0, 음수, 최대)off-by-one, 부호
단일 행 / 단일 그룹window·집계

5. 스키마 테스트

데이터뿐 아니라 출력 스키마도 계약입니다. 컬럼 이름·타입이 바뀌면 하류가 깨집니다.

def test_output_schema(spark):
    input_df = spark.createDataFrame([("a", 100)], ["user", "amount"])
    result = transform(input_df)
 
    expected_schema = ["user", "total"]
    assert result.columns == expected_schema
    assert dict(result.dtypes)["total"] == "bigint"

6. 데이터 품질 검증 — 프로덕션 가드

단위 테스트와 별개로, 프로덕션 데이터에 대한 런타임 품질 검증을 파이프라인에 넣습니다.

def assert_quality(df):
    total = df.count()
    # 핵심 컬럼 NULL 없어야
    null_ids = df.filter(F.col("user").isNull()).count()
    assert null_ids == 0, f"user NULL {null_ids}건"
    # 키 유일성
    assert df.select("user").distinct().count() == total, "user 중복"
    # 값 범위
    assert df.filter(F.col("total") < 0).count() == 0, "음수 total"

규모가 커지면 Great Expectationsdbt 테스트 같은 전용 프레임워크로 선언적 품질 규칙을 관리합니다. (AI 기반 품질 검증은 별도 글 "AI 기반 데이터 파이프라인 자동화"에서도 다룹니다.)

7. CI 통합

테스트를 CI 에서 자동 실행해 회귀를 막습니다.

# 예: GitHub Actions 개념
steps:
  - uses: actions/setup-java@v4   # Spark 는 JVM 필요
    with: { java-version: '17' }
  - run: pip install pyspark chispa pytest
  - run: pytest tests/ -v
고려지침
JVMSpark 는 Java 필요(CI 에 설치)
속도local[2], shuffle 파티션↓, 작은 데이터
격리SparkSession 픽스처 재사용
통합 테스트일부만 실제 I/O(샘플 파일)로

8. 테스트 피라미드

        ▲  통합 테스트 (실제 읽기/쓰기, 소수)
       ───
      ─────  변환 단위 테스트 (transform 함수, 다수)  ← 여기에 집중
     ───────
    ─────────  순수 헬퍼 함수 (Spark 불필요, 가장 빠름)

대부분의 테스트는 변환 함수 단위에 두고, 실제 I/O 통합 테스트는 최소한으로. Spark 가 필요 없는 순수 로직은 분리해 더 빠르게 테스트합니다.

9. 정리

영역핵심
설계I/O 와 변환 분리 → transform(df) 순수 함수
픽스처로컬 SparkSession, shuffle 파티션↓
비교assertDataFrameEqual / chispa
엣지빈 입력·NULL·중복·경계값
스키마컬럼·타입도 계약으로 검증
품질런타임 가드 + Great Expectations
CIJVM 설치, 작게 빠르게

PySpark 테스트의 핵심 통찰은 "변환 로직을 DataFrame→DataFrame 순수 함수로 분리하면 테스트가 쉬워진다"는 것입니다. I/O 에서 로직을 떼어내고, 작은 입력으로 동등성을 비교하며, NULL·빈 입력 같은 엣지 케이스를 명시적으로 검증하면 — 데이터 파이프라인도 일반 소프트웨어처럼 회귀로부터 보호할 수 있습니다. 테스트 없는 파이프라인은 "조용히 틀린 데이터"라는 가장 비싼 버그를 키운다는 점을 기억하세요.


이 글은 Spark 3.5 기준으로 작성되었습니다. 데이터 파이프라인의 테스트 체계나 품질 검증 자동화 구축이 필요하시면 언제든 문의해 주세요.

— Data Dynamics 엔지니어링 팀