Blog
pysparksparkpandas-udfarrowperformancedata-engineering

PySpark UDF가 느린 이유와 Pandas UDF — 10배 빠르게 만들기

Python UDF 하나가 Spark 잡 전체를 느리게 만드는 진짜 이유(직렬화·행 단위 처리)를 파헤치고, Arrow 기반 벡터화된 Pandas UDF / applyInPandas / mapInPandas 로 성능을 끌어올리는 법, 그리고 UDF 자체를 피하는 전략을 정리합니다.

Data Dynamics2026년 6월 5일9 min read

PySpark 잡을 프로파일링하면, 전체 시간의 80%를 UDF 하나가 잡아먹는 경우가 흔합니다. "파이썬 함수 하나 적용했을 뿐인데" 잡이 10배 느려집니다. 원인은 명확합니다 — 일반 Python UDF 는 행 단위로, JVM 과 Python 사이를 직렬화하며 돕니다.

이 글은 Python UDF 가 왜 느린지 구조적으로 설명하고, Arrow 기반 벡터화된 Pandas UDF 로 성능을 끌어올리는 법, 그리고 가장 좋은 해법인 "UDF 자체를 피하기"를 정리합니다.

1. 왜 Python UDF 는 느린가

Spark 는 JVM 위에서 돕니다. Python UDF 를 만나면 데이터가 JVM → Python → JVM 으로 오가야 합니다.

[일반 Python UDF]  행 하나마다:
  JVM 의 행  ──(직렬화/피클)──>  Python 워커  ──함수 적용──  ──(직렬화)──>  JVM
  → 1억 행이면 1억 번의 직렬화 왕복 + 행 단위 파이썬 인터프리터 호출

세 가지 비용이 겹칩니다.

비용설명
직렬화행마다 JVM↔Python 피클 직렬화/역직렬화
행 단위 호출파이썬 인터프리터를 행마다 호출(벡터화 없음)
옵티마이저 블랙박스Catalyst 가 UDF 내부를 못 봄 → pushdown·최적화 불가

내장 함수(F.col, F.when, …)는 JVM 안에서 컬럼 단위로 실행되어 이 비용이 전혀 없습니다. 그래서 첫 번째 원칙은 "내장 함수로 되면 UDF 쓰지 말 것" 입니다.

2. 1순위 해법 — 내장 함수로 대체

많은 UDF 는 사실 내장 함수 조합으로 표현됩니다.

from pyspark.sql import functions as F
 
# BAD: Python UDF
@F.udf("string")
def grade(score):
    if score >= 90: return "A"
    elif score >= 80: return "B"
    else: return "C"
df = df.withColumn("g", grade("score"))
 
# GOOD: 내장 when/otherwise (JVM 내 실행, 수배~수십배 빠름)
df = df.withColumn("g",
    F.when(F.col("score") >= 90, "A")
     .when(F.col("score") >= 80, "B")
     .otherwise("C"))

문자열·날짜·정규식·JSON 가공은 거의 다 내장 함수가 있습니다(regexp_replace, split, from_json, date_format, transform, filter 등). UDF 를 쓰기 전에 내장 함수를 먼저 찾으세요.

3. 2순위 해법 — Pandas UDF (벡터화)

내장으로 안 되는 진짜 커스텀 로직(복잡한 파싱, ML 추론, 특수 계산)은 Pandas UDF 를 씁니다. Apache Arrow 로 데이터를 배치(컬럼) 단위로 한 번에 넘겨, 직렬화·인터프리터 비용을 없앱니다.

[Pandas UDF]  배치 단위:
  JVM 컬럼  ──(Arrow, 제로카피에 가까움)──>  Python: pandas.Series 로 벡터 연산  ──>  JVM
  → 행 단위 왕복이 사라지고, pandas/numpy 의 벡터화 속도로 처리

먼저 Arrow 를 켭니다.

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

Series → Series (스칼라 Pandas UDF)

가장 흔한 형태. 입력 컬럼을 pd.Series 로 받아 벡터 연산.

import pandas as pd
from pyspark.sql.functions import pandas_udf
 
@pandas_udf("double")
def celsius_to_f(c: pd.Series) -> pd.Series:
    return c * 9 / 5 + 32          # pandas 벡터 연산, 루프 없음
 
df = df.withColumn("temp_f", celsius_to_f("temp_c"))

여러 컬럼 입력

@pandas_udf("double")
def weighted(a: pd.Series, b: pd.Series, w: pd.Series) -> pd.Series:
    return (a * w + b * (1 - w))
 
df = df.withColumn("score", weighted("x", "y", "w"))
일반 UDFPandas UDF
데이터 전달행 단위 피클배치 단위 Arrow
연산파이썬 루프pandas/numpy 벡터
전형적 속도기준수배~수십배 빠름

4. applyInPandas — 그룹별 pandas 처리

"그룹마다 pandas DataFrame 으로 받아 자유롭게 처리"가 필요할 때(그룹별 정규화, 시계열 보간, 그룹별 모델 적용) 강력합니다.

from pyspark.sql.types import StructType, StructField, DoubleType, StringType
 
schema = StructType([
    StructField("user_id", StringType()),
    StructField("z_score", DoubleType()),
])
 
def normalize(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf 는 한 그룹 전체 (pandas DataFrame)
    pdf["z_score"] = (pdf["amount"] - pdf["amount"].mean()) / pdf["amount"].std()
    return pdf[["user_id", "z_score"]]
 
result = df.groupBy("user_id").applyInPandas(normalize, schema=schema)

주의: applyInPandas그룹 하나가 한 익스큐터 메모리에 다 들어가야 합니다. 그룹이 거대하면(스큐) OOM 이 납니다. 그룹 크기를 확인하고, 큰 그룹은 사전 분할하세요.

5. mapInPandas — 파티션 단위 스트리밍 처리

그룹이 아니라 파티션을 pandas DataFrame 배치들의 이터레이터로 받아 처리합니다. 전체를 메모리에 안 올려도 되어, 큰 데이터에 ML 추론을 태울 때 적합합니다.

from typing import Iterator
 
def predict(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    model = load_model()                 # 파티션당 1회 로드 (행마다 아님!)
    for pdf in batches:
        pdf["pred"] = model.predict(pdf[features])
        yield pdf
 
result = df.mapInPandas(predict, schema="... pred double")

mapInPandas 의 큰 장점은 모델·커넥션 같은 무거운 객체를 배치 루프 밖에서 한 번만 초기화할 수 있다는 것입니다. 일반 UDF 로 추론하면 행마다 모델을 들고 다녀 비효율적입니다. (대규모 배치 추론 패턴은 별도 글 "Spark + LLM 연동 가이드"에서도 다룹니다.)

6. 어떤 걸 언제 쓰나

내장 함수로 표현 가능?
  ├─ 예  → 내장 함수 (항상 1순위)
  └─ 아니오 →
       컬럼 → 컬럼 변환?         → pandas_udf (Series→Series)
       그룹별 DataFrame 처리?    → applyInPandas
       파티션 스트리밍/무거운 초기화? → mapInPandas
       정말 행 단위 단순 로직?   → (최후) 일반 UDF
방식입력 단위적합
내장 함수컬럼대부분의 변환
pandas_udfSeries(배치)커스텀 컬럼 변환
applyInPandas그룹 DataFrame그룹별 복잡 로직
mapInPandas파티션 이터레이터추론·무거운 초기화
일반 UDF최후의 수단

7. UDF 사용 시 추가 주의

  • WHERE 절에 UDF 금지: UDF 로 필터하면 Catalyst 가 못 들여다봐 pushdown·파티션 프루닝이 깨집니다. 필터는 내장 표현식으로, UDF 는 select(투영) 단계에서.
  • Arrow 활성화 확인: spark.sql.execution.arrow.pyspark.enabled=true 가 꺼져 있으면 Pandas UDF 도 느려집니다.
  • 타입 일치: UDF 반환 타입과 선언 스키마가 어긋나면 조용히 NULL 이 되거나 에러. 명시적으로.
  • Python 메모리: Pandas UDF 는 Python 워커 메모리를 씁니다. 배치가 크면 overhead OOM 위험(별도 글 "PySpark Executor OOM 정복" 참고).
  • null 처리: pandas 연산에서 NaN/None 처리를 명시하지 않으면 결과가 틀어질 수 있습니다.

8. 정리

원칙내용
1순위내장 함수로 대체 (UDF 회피)
2순위Pandas UDF (Arrow 벡터화)
그룹 로직applyInPandas (그룹 OOM 주의)
추론/무거운 초기화mapInPandas
금기WHERE 절 UDF, Arrow 비활성

PySpark 성능 문제의 단골 원인은 "당연하게 쓴 Python UDF" 입니다. 핵심 통찰은 두 가지 — Spark 는 JVM 엔진이므로 행 단위 Python 왕복이 가장 비싸고, 이를 없애는 길은 내장 함수(JVM 내) 아니면 Arrow 벡터화(배치 단위) 라는 것입니다. "내장으로 되나? 안 되면 Pandas UDF" 한 줄만 습관이 되어도, 잡의 절반이 빨라지는 경험을 하게 됩니다.


이 글은 Spark 3.5 + PyArrow 기준으로 작성되었습니다. PySpark 파이프라인의 UDF 성능 개선이나 대규모 추론 최적화가 필요하시면 언제든 문의해 주세요.

— Data Dynamics 엔지니어링 팀