PySpark 코드 테스트 — 데이터 파이프라인을 회귀로부터 지키기
PySpark 변환 로직을 어떻게 단위 테스트하는가. SparkSession 픽스처, DataFrame 동등성 비교(chispa/내장 assert), 변환 함수 설계, 스키마·엣지 케이스 테스트, 데이터 품질 검증, 그리고 CI 통합까지 실전 패턴을 정리합니다.
"데이터 파이프라인은 테스트하기 어렵다"는 통념 때문에, 많은 PySpark 코드가 테스트 없이 프로덕션에 올라갑니다. 그러다 누군가 컬럼 하나를 바꾸면 조용히 잘못된 데이터가 흘러가고, 몇 주 뒤 대시보드가 틀어진 걸 발견합니다. PySpark 도 충분히 테스트할 수 있습니다. 핵심은 변환 로직을 테스트 가능하게 설계하는 것입니다.
이 글은 SparkSession 픽스처, DataFrame 동등성 비교, 변환 함수 설계, 엣지 케이스·스키마 테스트, 그리고 CI 통합까지 PySpark 테스트의 실전 패턴을 정리합니다.
1. 테스트 가능한 설계 — 변환을 함수로 분리
테스트의 90%는 코드 구조에서 결정됩니다. 읽기·쓰기(I/O)와 변환 로직을 분리하면, 변환을 순수 함수로 테스트할 수 있습니다.
# BAD: I/O 와 로직이 뒤섞여 테스트 불가
def job():
df = spark.read.parquet("s3://...")
result = df.filter(...).groupBy(...).agg(...) # 로직이 여기 묶임
result.write.parquet("s3://...")
# GOOD: 변환을 순수 함수로 분리
def transform(df): # 입력 DataFrame → 출력 DataFrame
return (df.filter(F.col("amount") > 0)
.groupBy("user").agg(F.sum("amount").alias("total")))
def job():
df = spark.read.parquet("s3://...")
transform(df).write.parquet("s3://...") # I/O 는 얇게transform(df) 처럼 DataFrame 을 받아 DataFrame 을 반환하는 함수는 작은 입력으로 쉽게 테스트됩니다.
2. SparkSession 픽스처
테스트용 로컬 SparkSession 을 한 번 만들어 재사용합니다(pytest 픽스처).
# conftest.py
import pytest
from pyspark.sql import SparkSession
@pytest.fixture(scope="session")
def spark():
spark = (SparkSession.builder
.master("local[2]") # 로컬 2코어
.appName("tests")
.config("spark.sql.shuffle.partitions", "2") # 테스트는 작게 → 빠르게
.config("spark.ui.enabled", "false")
.getOrCreate())
yield spark
spark.stop()팁: 테스트에서
spark.sql.shuffle.partitions를 2 정도로 낮추세요. 기본 200 이면 작은 테스트 데이터에도 200개 빈 파티션을 만들어 느려집니다.
3. DataFrame 동등성 비교
테스트의 핵심은 "출력 DataFrame 이 기대값과 같은가"입니다. 단순 == 로는 안 되고, 전용 비교가 필요합니다.
내장 assert (Spark 3.5+)
from pyspark.testing import assertDataFrameEqual
def test_transform(spark):
input_df = spark.createDataFrame(
[("a", 100), ("a", 50), ("b", 30), ("b", -10)], # -10 은 필터돼야
["user", "amount"])
expected = spark.createDataFrame(
[("a", 150), ("b", 30)], ["user", "total"])
result = transform(input_df)
assertDataFrameEqual(result, expected) # 순서 무관 비교Spark 3.5+ 는 pyspark.testing.assertDataFrameEqual 을 내장합니다. 행 순서·부동소수 허용오차까지 다룹니다.
chispa (널리 쓰이는 서드파티)
from chispa import assert_df_equality
assert_df_equality(result, expected, ignore_row_order=True, ignore_nullable=True)| 도구 | 특징 |
|---|---|
pyspark.testing (내장) | 3.5+, 의존성 없음 |
chispa | 풍부한 옵션, 친절한 diff 출력 |
4. 엣지 케이스 — 진짜 버그가 숨는 곳
정상 케이스보다 경계 조건에서 버그가 나옵니다. 반드시 테스트할 것:
def test_empty_input(spark):
# 빈 DataFrame 도 깨지지 않아야
empty = spark.createDataFrame([], "user string, amount long")
result = transform(empty)
assert result.count() == 0
def test_nulls(spark):
# NULL 처리 — 집계·필터에서 NULL 이 어떻게 되는가
df = spark.createDataFrame([("a", None), ("a", 100)], "user string, amount long")
result = transform(df)
# 기대 동작을 명시적으로 검증
def test_negative_and_zero(spark):
# 경계값: 0, 음수
...| 엣지 케이스 | 왜 |
|---|---|
| 빈 입력 | 빈 데이터에 집계/조인 시 깨짐 |
| NULL 값 | 집계·비교에서 NULL 전파 |
| 중복 키 | dedup/조인 로직 |
| 경계값(0, 음수, 최대) | off-by-one, 부호 |
| 단일 행 / 단일 그룹 | window·집계 |
5. 스키마 테스트
데이터뿐 아니라 출력 스키마도 계약입니다. 컬럼 이름·타입이 바뀌면 하류가 깨집니다.
def test_output_schema(spark):
input_df = spark.createDataFrame([("a", 100)], ["user", "amount"])
result = transform(input_df)
expected_schema = ["user", "total"]
assert result.columns == expected_schema
assert dict(result.dtypes)["total"] == "bigint"6. 데이터 품질 검증 — 프로덕션 가드
단위 테스트와 별개로, 프로덕션 데이터에 대한 런타임 품질 검증을 파이프라인에 넣습니다.
def assert_quality(df):
total = df.count()
# 핵심 컬럼 NULL 없어야
null_ids = df.filter(F.col("user").isNull()).count()
assert null_ids == 0, f"user NULL {null_ids}건"
# 키 유일성
assert df.select("user").distinct().count() == total, "user 중복"
# 값 범위
assert df.filter(F.col("total") < 0).count() == 0, "음수 total"규모가 커지면 Great Expectations 나 dbt 테스트 같은 전용 프레임워크로 선언적 품질 규칙을 관리합니다. (AI 기반 품질 검증은 별도 글 "AI 기반 데이터 파이프라인 자동화"에서도 다룹니다.)
7. CI 통합
테스트를 CI 에서 자동 실행해 회귀를 막습니다.
# 예: GitHub Actions 개념
steps:
- uses: actions/setup-java@v4 # Spark 는 JVM 필요
with: { java-version: '17' }
- run: pip install pyspark chispa pytest
- run: pytest tests/ -v| 고려 | 지침 |
|---|---|
| JVM | Spark 는 Java 필요(CI 에 설치) |
| 속도 | local[2], shuffle 파티션↓, 작은 데이터 |
| 격리 | SparkSession 픽스처 재사용 |
| 통합 테스트 | 일부만 실제 I/O(샘플 파일)로 |
8. 테스트 피라미드
▲ 통합 테스트 (실제 읽기/쓰기, 소수)
───
───── 변환 단위 테스트 (transform 함수, 다수) ← 여기에 집중
───────
───────── 순수 헬퍼 함수 (Spark 불필요, 가장 빠름)대부분의 테스트는 변환 함수 단위에 두고, 실제 I/O 통합 테스트는 최소한으로. Spark 가 필요 없는 순수 로직은 분리해 더 빠르게 테스트합니다.
9. 정리
| 영역 | 핵심 |
|---|---|
| 설계 | I/O 와 변환 분리 → transform(df) 순수 함수 |
| 픽스처 | 로컬 SparkSession, shuffle 파티션↓ |
| 비교 | assertDataFrameEqual / chispa |
| 엣지 | 빈 입력·NULL·중복·경계값 |
| 스키마 | 컬럼·타입도 계약으로 검증 |
| 품질 | 런타임 가드 + Great Expectations |
| CI | JVM 설치, 작게 빠르게 |
PySpark 테스트의 핵심 통찰은 "변환 로직을 DataFrame→DataFrame 순수 함수로 분리하면 테스트가 쉬워진다"는 것입니다. I/O 에서 로직을 떼어내고, 작은 입력으로 동등성을 비교하며, NULL·빈 입력 같은 엣지 케이스를 명시적으로 검증하면 — 데이터 파이프라인도 일반 소프트웨어처럼 회귀로부터 보호할 수 있습니다. 테스트 없는 파이프라인은 "조용히 틀린 데이터"라는 가장 비싼 버그를 키운다는 점을 기억하세요.
이 글은 Spark 3.5 기준으로 작성되었습니다. 데이터 파이프라인의 테스트 체계나 품질 검증 자동화 구축이 필요하시면 언제든 문의해 주세요.
— Data Dynamics 엔지니어링 팀