PySpark UDF가 느린 이유와 Pandas UDF — 10배 빠르게 만들기
Python UDF 하나가 Spark 잡 전체를 느리게 만드는 진짜 이유(직렬화·행 단위 처리)를 파헤치고, Arrow 기반 벡터화된 Pandas UDF / applyInPandas / mapInPandas 로 성능을 끌어올리는 법, 그리고 UDF 자체를 피하는 전략을 정리합니다.
PySpark 잡을 프로파일링하면, 전체 시간의 80%를 UDF 하나가 잡아먹는 경우가 흔합니다. "파이썬 함수 하나 적용했을 뿐인데" 잡이 10배 느려집니다. 원인은 명확합니다 — 일반 Python UDF 는 행 단위로, JVM 과 Python 사이를 직렬화하며 돕니다.
이 글은 Python UDF 가 왜 느린지 구조적으로 설명하고, Arrow 기반 벡터화된 Pandas UDF 로 성능을 끌어올리는 법, 그리고 가장 좋은 해법인 "UDF 자체를 피하기"를 정리합니다.
1. 왜 Python UDF 는 느린가
Spark 는 JVM 위에서 돕니다. Python UDF 를 만나면 데이터가 JVM → Python → JVM 으로 오가야 합니다.
[일반 Python UDF] 행 하나마다:
JVM 의 행 ──(직렬화/피클)──> Python 워커 ──함수 적용── ──(직렬화)──> JVM
→ 1억 행이면 1억 번의 직렬화 왕복 + 행 단위 파이썬 인터프리터 호출세 가지 비용이 겹칩니다.
| 비용 | 설명 |
|---|---|
| 직렬화 | 행마다 JVM↔Python 피클 직렬화/역직렬화 |
| 행 단위 호출 | 파이썬 인터프리터를 행마다 호출(벡터화 없음) |
| 옵티마이저 블랙박스 | Catalyst 가 UDF 내부를 못 봄 → pushdown·최적화 불가 |
내장 함수(F.col, F.when, …)는 JVM 안에서 컬럼 단위로 실행되어 이 비용이 전혀 없습니다. 그래서 첫 번째 원칙은 "내장 함수로 되면 UDF 쓰지 말 것" 입니다.
2. 1순위 해법 — 내장 함수로 대체
많은 UDF 는 사실 내장 함수 조합으로 표현됩니다.
from pyspark.sql import functions as F
# BAD: Python UDF
@F.udf("string")
def grade(score):
if score >= 90: return "A"
elif score >= 80: return "B"
else: return "C"
df = df.withColumn("g", grade("score"))
# GOOD: 내장 when/otherwise (JVM 내 실행, 수배~수십배 빠름)
df = df.withColumn("g",
F.when(F.col("score") >= 90, "A")
.when(F.col("score") >= 80, "B")
.otherwise("C"))문자열·날짜·정규식·JSON 가공은 거의 다 내장 함수가 있습니다(regexp_replace, split, from_json, date_format, transform, filter 등). UDF 를 쓰기 전에 내장 함수를 먼저 찾으세요.
3. 2순위 해법 — Pandas UDF (벡터화)
내장으로 안 되는 진짜 커스텀 로직(복잡한 파싱, ML 추론, 특수 계산)은 Pandas UDF 를 씁니다. Apache Arrow 로 데이터를 배치(컬럼) 단위로 한 번에 넘겨, 직렬화·인터프리터 비용을 없앱니다.
[Pandas UDF] 배치 단위:
JVM 컬럼 ──(Arrow, 제로카피에 가까움)──> Python: pandas.Series 로 벡터 연산 ──> JVM
→ 행 단위 왕복이 사라지고, pandas/numpy 의 벡터화 속도로 처리먼저 Arrow 를 켭니다.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")Series → Series (스칼라 Pandas UDF)
가장 흔한 형태. 입력 컬럼을 pd.Series 로 받아 벡터 연산.
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("double")
def celsius_to_f(c: pd.Series) -> pd.Series:
return c * 9 / 5 + 32 # pandas 벡터 연산, 루프 없음
df = df.withColumn("temp_f", celsius_to_f("temp_c"))여러 컬럼 입력
@pandas_udf("double")
def weighted(a: pd.Series, b: pd.Series, w: pd.Series) -> pd.Series:
return (a * w + b * (1 - w))
df = df.withColumn("score", weighted("x", "y", "w"))| 일반 UDF | Pandas UDF | |
|---|---|---|
| 데이터 전달 | 행 단위 피클 | 배치 단위 Arrow |
| 연산 | 파이썬 루프 | pandas/numpy 벡터 |
| 전형적 속도 | 기준 | 수배~수십배 빠름 |
4. applyInPandas — 그룹별 pandas 처리
"그룹마다 pandas DataFrame 으로 받아 자유롭게 처리"가 필요할 때(그룹별 정규화, 시계열 보간, 그룹별 모델 적용) 강력합니다.
from pyspark.sql.types import StructType, StructField, DoubleType, StringType
schema = StructType([
StructField("user_id", StringType()),
StructField("z_score", DoubleType()),
])
def normalize(pdf: pd.DataFrame) -> pd.DataFrame:
# pdf 는 한 그룹 전체 (pandas DataFrame)
pdf["z_score"] = (pdf["amount"] - pdf["amount"].mean()) / pdf["amount"].std()
return pdf[["user_id", "z_score"]]
result = df.groupBy("user_id").applyInPandas(normalize, schema=schema)주의:
applyInPandas는 그룹 하나가 한 익스큐터 메모리에 다 들어가야 합니다. 그룹이 거대하면(스큐) OOM 이 납니다. 그룹 크기를 확인하고, 큰 그룹은 사전 분할하세요.
5. mapInPandas — 파티션 단위 스트리밍 처리
그룹이 아니라 파티션을 pandas DataFrame 배치들의 이터레이터로 받아 처리합니다. 전체를 메모리에 안 올려도 되어, 큰 데이터에 ML 추론을 태울 때 적합합니다.
from typing import Iterator
def predict(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
model = load_model() # 파티션당 1회 로드 (행마다 아님!)
for pdf in batches:
pdf["pred"] = model.predict(pdf[features])
yield pdf
result = df.mapInPandas(predict, schema="... pred double")mapInPandas 의 큰 장점은 모델·커넥션 같은 무거운 객체를 배치 루프 밖에서 한 번만 초기화할 수 있다는 것입니다. 일반 UDF 로 추론하면 행마다 모델을 들고 다녀 비효율적입니다. (대규모 배치 추론 패턴은 별도 글 "Spark + LLM 연동 가이드"에서도 다룹니다.)
6. 어떤 걸 언제 쓰나
내장 함수로 표현 가능?
├─ 예 → 내장 함수 (항상 1순위)
└─ 아니오 →
컬럼 → 컬럼 변환? → pandas_udf (Series→Series)
그룹별 DataFrame 처리? → applyInPandas
파티션 스트리밍/무거운 초기화? → mapInPandas
정말 행 단위 단순 로직? → (최후) 일반 UDF| 방식 | 입력 단위 | 적합 |
|---|---|---|
| 내장 함수 | 컬럼 | 대부분의 변환 |
pandas_udf | Series(배치) | 커스텀 컬럼 변환 |
applyInPandas | 그룹 DataFrame | 그룹별 복잡 로직 |
mapInPandas | 파티션 이터레이터 | 추론·무거운 초기화 |
| 일반 UDF | 행 | 최후의 수단 |
7. UDF 사용 시 추가 주의
- WHERE 절에 UDF 금지: UDF 로 필터하면 Catalyst 가 못 들여다봐 pushdown·파티션 프루닝이 깨집니다. 필터는 내장 표현식으로, UDF 는
select(투영) 단계에서. - Arrow 활성화 확인:
spark.sql.execution.arrow.pyspark.enabled=true가 꺼져 있으면 Pandas UDF 도 느려집니다. - 타입 일치: UDF 반환 타입과 선언 스키마가 어긋나면 조용히 NULL 이 되거나 에러. 명시적으로.
- Python 메모리: Pandas UDF 는 Python 워커 메모리를 씁니다. 배치가 크면 overhead OOM 위험(별도 글 "PySpark Executor OOM 정복" 참고).
- null 처리: pandas 연산에서 NaN/None 처리를 명시하지 않으면 결과가 틀어질 수 있습니다.
8. 정리
| 원칙 | 내용 |
|---|---|
| 1순위 | 내장 함수로 대체 (UDF 회피) |
| 2순위 | Pandas UDF (Arrow 벡터화) |
| 그룹 로직 | applyInPandas (그룹 OOM 주의) |
| 추론/무거운 초기화 | mapInPandas |
| 금기 | WHERE 절 UDF, Arrow 비활성 |
PySpark 성능 문제의 단골 원인은 "당연하게 쓴 Python UDF" 입니다. 핵심 통찰은 두 가지 — Spark 는 JVM 엔진이므로 행 단위 Python 왕복이 가장 비싸고, 이를 없애는 길은 내장 함수(JVM 내) 아니면 Arrow 벡터화(배치 단위) 라는 것입니다. "내장으로 되나? 안 되면 Pandas UDF" 한 줄만 습관이 되어도, 잡의 절반이 빨라지는 경험을 하게 됩니다.
이 글은 Spark 3.5 + PyArrow 기준으로 작성되었습니다. PySpark 파이프라인의 UDF 성능 개선이나 대규모 추론 최적화가 필요하시면 언제든 문의해 주세요.
— Data Dynamics 엔지니어링 팀