Blog
pysparksparkpandas-udfarrowperformancedata-engineering

PySpark UDF가 느린 이유와 Pandas UDF — 10배 빠르게 만들기

Python UDF 하나가 Spark 잡 전체를 느리게 만드는 진짜 이유(직렬화·행 단위 처리)를 파헤치고, Arrow 기반 벡터화된 Pandas UDF / applyInPandas / mapInPandas 로 성능을 끌어올리는 법, 그리고 UDF 자체를 피하는 전략을 정리합니다.

Data Dynamics2026年6月5日9 min read
This post is not yet translated. The original Korean version is shown below.

PySpark 잡을 프로파일링하면, 전체 시간의 80%를 UDF 하나가 잡아먹는 경우가 흔합니다. "파이썬 함수 하나 적용했을 뿐인데" 잡이 10배 느려집니다. 원인은 명확합니다 — 일반 Python UDF 는 행 단위로, JVM 과 Python 사이를 직렬화하며 돕니다.

이 글은 Python UDF 가 왜 느린지 구조적으로 설명하고, Arrow 기반 벡터화된 Pandas UDF 로 성능을 끌어올리는 법, 그리고 가장 좋은 해법인 "UDF 자체를 피하기"를 정리합니다.

1. 왜 Python UDF 는 느린가

Spark 는 JVM 위에서 돕니다. Python UDF 를 만나면 데이터가 JVM → Python → JVM 으로 오가야 합니다.

[일반 Python UDF]  행 하나마다:
  JVM 의 행  ──(직렬화/피클)──>  Python 워커  ──함수 적용──  ──(직렬화)──>  JVM
  → 1억 행이면 1억 번의 직렬화 왕복 + 행 단위 파이썬 인터프리터 호출

세 가지 비용이 겹칩니다.

비용설명
직렬화행마다 JVM↔Python 피클 직렬화/역직렬화
행 단위 호출파이썬 인터프리터를 행마다 호출(벡터화 없음)
옵티마이저 블랙박스Catalyst 가 UDF 내부를 못 봄 → pushdown·최적화 불가

내장 함수(F.col, F.when, …)는 JVM 안에서 컬럼 단위로 실행되어 이 비용이 전혀 없습니다. 그래서 첫 번째 원칙은 "내장 함수로 되면 UDF 쓰지 말 것" 입니다.

2. 1순위 해법 — 내장 함수로 대체

많은 UDF 는 사실 내장 함수 조합으로 표현됩니다.

from pyspark.sql import functions as F
 
# BAD: Python UDF
@F.udf("string")
def grade(score):
    if score >= 90: return "A"
    elif score >= 80: return "B"
    else: return "C"
df = df.withColumn("g", grade("score"))
 
# GOOD: 내장 when/otherwise (JVM 내 실행, 수배~수십배 빠름)
df = df.withColumn("g",
    F.when(F.col("score") >= 90, "A")
     .when(F.col("score") >= 80, "B")
     .otherwise("C"))

문자열·날짜·정규식·JSON 가공은 거의 다 내장 함수가 있습니다(regexp_replace, split, from_json, date_format, transform, filter 등). UDF 를 쓰기 전에 내장 함수를 먼저 찾으세요.

3. 2순위 해법 — Pandas UDF (벡터화)

내장으로 안 되는 진짜 커스텀 로직(복잡한 파싱, ML 추론, 특수 계산)은 Pandas UDF 를 씁니다. Apache Arrow 로 데이터를 배치(컬럼) 단위로 한 번에 넘겨, 직렬화·인터프리터 비용을 없앱니다.

[Pandas UDF]  배치 단위:
  JVM 컬럼  ──(Arrow, 제로카피에 가까움)──>  Python: pandas.Series 로 벡터 연산  ──>  JVM
  → 행 단위 왕복이 사라지고, pandas/numpy 의 벡터화 속도로 처리

먼저 Arrow 를 켭니다.

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

Series → Series (스칼라 Pandas UDF)

가장 흔한 형태. 입력 컬럼을 pd.Series 로 받아 벡터 연산.

import pandas as pd
from pyspark.sql.functions import pandas_udf
 
@pandas_udf("double")
def celsius_to_f(c: pd.Series) -> pd.Series:
    return c * 9 / 5 + 32          # pandas 벡터 연산, 루프 없음
 
df = df.withColumn("temp_f", celsius_to_f("temp_c"))

여러 컬럼 입력

@pandas_udf("double")
def weighted(a: pd.Series, b: pd.Series, w: pd.Series) -> pd.Series:
    return (a * w + b * (1 - w))
 
df = df.withColumn("score", weighted("x", "y", "w"))
일반 UDFPandas UDF
데이터 전달행 단위 피클배치 단위 Arrow
연산파이썬 루프pandas/numpy 벡터
전형적 속도기준수배~수십배 빠름

4. applyInPandas — 그룹별 pandas 처리

"그룹마다 pandas DataFrame 으로 받아 자유롭게 처리"가 필요할 때(그룹별 정규화, 시계열 보간, 그룹별 모델 적용) 강력합니다.

from pyspark.sql.types import StructType, StructField, DoubleType, StringType
 
schema = StructType([
    StructField("user_id", StringType()),
    StructField("z_score", DoubleType()),
])
 
def normalize(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf 는 한 그룹 전체 (pandas DataFrame)
    pdf["z_score"] = (pdf["amount"] - pdf["amount"].mean()) / pdf["amount"].std()
    return pdf[["user_id", "z_score"]]
 
result = df.groupBy("user_id").applyInPandas(normalize, schema=schema)

주의: applyInPandas그룹 하나가 한 익스큐터 메모리에 다 들어가야 합니다. 그룹이 거대하면(스큐) OOM 이 납니다. 그룹 크기를 확인하고, 큰 그룹은 사전 분할하세요.

5. mapInPandas — 파티션 단위 스트리밍 처리

그룹이 아니라 파티션을 pandas DataFrame 배치들의 이터레이터로 받아 처리합니다. 전체를 메모리에 안 올려도 되어, 큰 데이터에 ML 추론을 태울 때 적합합니다.

from typing import Iterator
 
def predict(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    model = load_model()                 # 파티션당 1회 로드 (행마다 아님!)
    for pdf in batches:
        pdf["pred"] = model.predict(pdf[features])
        yield pdf
 
result = df.mapInPandas(predict, schema="... pred double")

mapInPandas 의 큰 장점은 모델·커넥션 같은 무거운 객체를 배치 루프 밖에서 한 번만 초기화할 수 있다는 것입니다. 일반 UDF 로 추론하면 행마다 모델을 들고 다녀 비효율적입니다. (대규모 배치 추론 패턴은 별도 글 "Spark + LLM 연동 가이드"에서도 다룹니다.)

6. 어떤 걸 언제 쓰나

내장 함수로 표현 가능?
  ├─ 예  → 내장 함수 (항상 1순위)
  └─ 아니오 →
       컬럼 → 컬럼 변환?         → pandas_udf (Series→Series)
       그룹별 DataFrame 처리?    → applyInPandas
       파티션 스트리밍/무거운 초기화? → mapInPandas
       정말 행 단위 단순 로직?   → (최후) 일반 UDF
방식입력 단위적합
내장 함수컬럼대부분의 변환
pandas_udfSeries(배치)커스텀 컬럼 변환
applyInPandas그룹 DataFrame그룹별 복잡 로직
mapInPandas파티션 이터레이터추론·무거운 초기화
일반 UDF최후의 수단

7. UDF 사용 시 추가 주의

  • WHERE 절에 UDF 금지: UDF 로 필터하면 Catalyst 가 못 들여다봐 pushdown·파티션 프루닝이 깨집니다. 필터는 내장 표현식으로, UDF 는 select(투영) 단계에서.
  • Arrow 활성화 확인: spark.sql.execution.arrow.pyspark.enabled=true 가 꺼져 있으면 Pandas UDF 도 느려집니다.
  • 타입 일치: UDF 반환 타입과 선언 스키마가 어긋나면 조용히 NULL 이 되거나 에러. 명시적으로.
  • Python 메모리: Pandas UDF 는 Python 워커 메모리를 씁니다. 배치가 크면 overhead OOM 위험(별도 글 "PySpark Executor OOM 정복" 참고).
  • null 처리: pandas 연산에서 NaN/None 처리를 명시하지 않으면 결과가 틀어질 수 있습니다.

8. 정리

원칙내용
1순위내장 함수로 대체 (UDF 회피)
2순위Pandas UDF (Arrow 벡터화)
그룹 로직applyInPandas (그룹 OOM 주의)
추론/무거운 초기화mapInPandas
금기WHERE 절 UDF, Arrow 비활성

PySpark 성능 문제의 단골 원인은 "당연하게 쓴 Python UDF" 입니다. 핵심 통찰은 두 가지 — Spark 는 JVM 엔진이므로 행 단위 Python 왕복이 가장 비싸고, 이를 없애는 길은 내장 함수(JVM 내) 아니면 Arrow 벡터화(배치 단위) 라는 것입니다. "내장으로 되나? 안 되면 Pandas UDF" 한 줄만 습관이 되어도, 잡의 절반이 빨라지는 경험을 하게 됩니다.


이 글은 Spark 3.5 + PyArrow 기준으로 작성되었습니다. PySpark 파이프라인의 UDF 성능 개선이나 대규모 추론 최적화가 필요하시면 언제든 문의해 주세요.

— Data Dynamics 엔지니어링 팀