PySpark 코드 테스트 — 데이터 파이프라인을 회귀로부터 지키기
PySpark 변환 로직을 어떻게 단위 테스트하는가. SparkSession 픽스처, DataFrame 동등성 비교(chispa/내장 assert), 변환 함수 설계, 스키마·엣지 케이스 테스트, 데이터 품질 검증, 그리고 CI 통합까지 실전 패턴을 정리합니다.
어느 날 동료가 컬럼 이름을 하나 바꿉니다. 아무도 모르게. 그로부터 3주 뒤, 경영진이 대시보드 숫자가 이상하다고 합니다. 원인을 추적해 보니 그 컬럼 변경이 파이프라인 끝까지 조용히 흘러들어간 것이었죠. "데이터 파이프라인은 테스트하기 어렵다"는 통념이 이런 사태를 만듭니다. 하지만 PySpark 도 충분히 테스트할 수 있습니다. 핵심은 변환 로직을 테스트 가능하게 설계하는 것입니다.
이 글에서 배우는 것
- I/O 와 변환 로직을 분리해 테스트 가능한 함수로 만드는 법
- 로컬 SparkSession 픽스처를 pytest 에 연결하는 방법
- DataFrame 동등성 비교 도구(
assertDataFrameEqual,chispa) 사용법- NULL·빈 입력·경계값 등 진짜 버그가 숨는 엣지 케이스 테스트
- 스키마 계약 검증과 CI 자동화까지 실전 패턴
1. 테스트 가능한 설계 — 변환을 함수로 분리
테스트의 90%는 코드 구조에서 결정됩니다. 읽기·쓰기(I/O)와 변환 로직을 분리하면, 변환을 순수 함수로 테스트할 수 있습니다.
# BAD: I/O 와 로직이 뒤섞여 테스트 불가
def job():
df = spark.read.parquet("s3://...")
result = df.filter(...).groupBy(...).agg(...) # 로직이 여기 묶임
result.write.parquet("s3://...")
# GOOD: 변환을 순수 함수로 분리
def transform(df): # 입력 DataFrame → 출력 DataFrame
return (df.filter(F.col("amount") > 0)
.groupBy("user").agg(F.sum("amount").alias("total")))
def job():
df = spark.read.parquet("s3://...")
transform(df).write.parquet("s3://...") # I/O 는 얇게transform(df) 처럼 DataFrame 을 받아 DataFrame 을 반환하는 함수는 작은 입력으로 쉽게 테스트됩니다.
한 문장으로: 테스트하기 어려운 코드는 설계가 잘못된 것입니다.
2. SparkSession 픽스처
테스트마다 SparkSession 을 새로 띄우면 시간이 너무 많이 걸립니다. 한 번 만들어 세션 전체에서 재사용하는 pytest 픽스처를 conftest.py 에 두면 됩니다.
# conftest.py
import pytest
from pyspark.sql import SparkSession
@pytest.fixture(scope="session")
def spark():
spark = (SparkSession.builder
.master("local[2]") # 로컬 2코어
.appName("tests")
.config("spark.sql.shuffle.partitions", "2") # 테스트는 작게 → 빠르게
.config("spark.ui.enabled", "false")
.getOrCreate())
yield spark
spark.stop()팁: 테스트에서
spark.sql.shuffle.partitions를 2 정도로 낮추세요. 기본 200 이면 작은 테스트 데이터에도 200개 빈 파티션을 만들어 느려집니다.
3. DataFrame 동등성 비교
테스트의 핵심은 "출력 DataFrame 이 기대값과 같은가"입니다. 단순 == 로는 안 되고, 전용 비교가 필요합니다.
내장 assert (Spark 3.5+)
Spark 3.5+ 를 사용하고 있다면 별도 라이브러리 없이 내장 함수로 바로 비교할 수 있습니다.
from pyspark.testing import assertDataFrameEqual
def test_transform(spark):
input_df = spark.createDataFrame(
[("a", 100), ("a", 50), ("b", 30), ("b", -10)], # -10 은 필터돼야
["user", "amount"])
expected = spark.createDataFrame(
[("a", 150), ("b", 30)], ["user", "total"])
result = transform(input_df)
assertDataFrameEqual(result, expected) # 순서 무관 비교Spark 3.5+ 는 pyspark.testing.assertDataFrameEqual 을 내장합니다. 행 순서·부동소수 허용오차까지 다룹니다.
chispa (널리 쓰이는 서드파티)
diff 출력이 더 친절하고 옵션이 풍부한 chispa 도 많이 쓰입니다. 실패 시 어느 행이 다른지 바로 보여줘서 디버깅이 편합니다.
from chispa import assert_df_equality
assert_df_equality(result, expected, ignore_row_order=True, ignore_nullable=True)| 도구 | 특징 |
|---|---|
pyspark.testing (내장) | 3.5+, 의존성 없음 |
chispa | 풍부한 옵션, 친절한 diff 출력 |
4. 엣지 케이스 — 진짜 버그가 숨는 곳
정상 케이스보다 경계 조건에서 버그가 나옵니다. 반드시 테스트할 것:
def test_empty_input(spark):
# 빈 DataFrame 도 깨지지 않아야
empty = spark.createDataFrame([], "user string, amount long")
result = transform(empty)
assert result.count() == 0
def test_nulls(spark):
# NULL 처리 — 집계·필터에서 NULL 이 어떻게 되는가
df = spark.createDataFrame([("a", None), ("a", 100)], "user string, amount long")
result = transform(df)
# 기대 동작을 명시적으로 검증
def test_negative_and_zero(spark):
# 경계값: 0, 음수
...| 엣지 케이스 | 왜 |
|---|---|
| 빈 입력 | 빈 데이터에 집계/조인 시 깨짐 |
| NULL 값 | 집계·비교에서 NULL 전파 |
| 중복 키 | dedup/조인 로직 |
| 경계값(0, 음수, 최대) | off-by-one, 부호 |
| 단일 행 / 단일 그룹 | window·집계 |
직관: "정상 데이터로만 테스트하면, 비정상 데이터가 프로덕션에서 테스터 역할을 합니다."
5. 스키마 테스트
여러분의 변환 함수는 특정 컬럼 이름과 타입을 출력한다는 계약을 맺고 있습니다. 데이터뿐 아니라 출력 스키마도 계약입니다. 컬럼 이름·타입이 바뀌면 하류가 깨집니다.
def test_output_schema(spark):
input_df = spark.createDataFrame([("a", 100)], ["user", "amount"])
result = transform(input_df)
expected_schema = ["user", "total"]
assert result.columns == expected_schema
assert dict(result.dtypes)["total"] == "bigint"6. 데이터 품질 검증 — 프로덕션 가드
단위 테스트는 로직의 정확성을 보장하지만, 실제 운영 데이터는 테스트 데이터와 다를 수 있습니다. 단위 테스트와 별개로, 프로덕션 데이터에 대한 런타임 품질 검증을 파이프라인에 넣습니다.
def assert_quality(df):
total = df.count()
# 핵심 컬럼 NULL 없어야
null_ids = df.filter(F.col("user").isNull()).count()
assert null_ids == 0, f"user NULL {null_ids}건"
# 키 유일성
assert df.select("user").distinct().count() == total, "user 중복"
# 값 범위
assert df.filter(F.col("total") < 0).count() == 0, "음수 total"규모가 커지면 Great Expectations 나 dbt 테스트 같은 전용 프레임워크로 선언적 품질 규칙을 관리합니다. (AI 기반 품질 검증은 별도 글 "AI 기반 데이터 파이프라인 자동화"에서도 다룹니다.)
7. CI 통합
테스트가 로컬에만 존재한다면 "커밋하기 전에 돌려야지"라는 다짐은 오래가지 않습니다. 테스트를 CI 에서 자동 실행해 회귀를 막아 두면 됩니다.
# 예: GitHub Actions 개념
steps:
- uses: actions/setup-java@v4 # Spark 는 JVM 필요
with: { java-version: '17' }
- run: pip install pyspark chispa pytest
- run: pytest tests/ -v| 고려 | 지침 |
|---|---|
| JVM | Spark 는 Java 필요(CI 에 설치) |
| 속도 | local[2], shuffle 파티션↓, 작은 데이터 |
| 격리 | SparkSession 픽스처 재사용 |
| 통합 테스트 | 일부만 실제 I/O(샘플 파일)로 |
8. 테스트 피라미드
피라미드의 어느 층에 테스트를 집중해야 할지 헷갈린다면 아래를 참고하세요. 대부분의 테스트는 중간 층, 즉 변환 함수 단위에 둡니다.
대부분의 테스트는 변환 함수 단위에 두고, 실제 I/O 통합 테스트는 최소한으로. Spark 가 필요 없는 순수 로직은 분리해 더 빠르게 테스트합니다.
9. 정리
| 영역 | 핵심 |
|---|---|
| 설계 | I/O 와 변환 분리 → transform(df) 순수 함수 |
| 픽스처 | 로컬 SparkSession, shuffle 파티션↓ |
| 비교 | assertDataFrameEqual / chispa |
| 엣지 | 빈 입력·NULL·중복·경계값 |
| 스키마 | 컬럼·타입도 계약으로 검증 |
| 품질 | 런타임 가드 + Great Expectations |
| CI | JVM 설치, 작게 빠르게 |
PySpark 테스트의 핵심 통찰은 "변환 로직을 DataFrame→DataFrame 순수 함수로 분리하면 테스트가 쉬워진다"는 것입니다. I/O 에서 로직을 떼어내고, 작은 입력으로 동등성을 비교하며, NULL·빈 입력 같은 엣지 케이스를 명시적으로 검증하면 — 데이터 파이프라인도 일반 소프트웨어처럼 회귀로부터 보호할 수 있습니다. 테스트 없는 파이프라인은 "조용히 틀린 데이터"라는 가장 비싼 버그를 키운다는 점을 기억하세요.
마치며 — 핵심 요약
- 설계가 먼저입니다. I/O 와 변환 로직을 분리하는 것만으로 테스트 가능성이 크게 올라갑니다.
- SparkSession 픽스처 하나로 모든 테스트가 공유하면, 시작 비용을 한 번만 치릅니다.
- DataFrame 비교는
assertDataFrameEqual또는chispa를 쓰세요. 행 순서, 부동소수, nullable 차이까지 알아서 처리해 줍니다. - 엣지 케이스(빈 입력, NULL, 중복, 경계값)가 프로덕션 장애의 대부분을 만듭니다. 명시적으로 테스트하세요.
- 스키마도 계약입니다. 컬럼 이름·타입 변경이 조용히 하류를 깨는 일을 막으세요.
- CI 에 연결하면 테스트는 자동으로 여러분의 코드를 지켜줍니다.
잘 설계된 테스트 스위트는 팀 전체가 더 빠르고 대담하게 변경할 수 있게 해 줍니다 — 파이프라인이 복잡해질수록 그 가치는 더 커집니다.
이 글은 Spark 3.5 기준으로 작성되었습니다. 데이터 파이프라인의 테스트 체계나 품질 검증 자동화 구축이 필요하시면 언제든 문의해 주세요.
— Data Dynamics 엔지니어링 팀