Why PySpark UDFs Are Slow, and How Pandas UDFs Make Them 10x Faster
We dig into the real reasons a single Python UDF can slow down an entire Spark job (serialization and row-by-row processing), show how to boost performance with Arrow-based vectorized Pandas UDFs / applyInPandas / mapInPandas, and cover strategies for avoiding UDFs altogether.
When you profile a PySpark job, it's common to find a single UDF eating 80% of the total runtime. "All I did was apply one Python function" — and the job is 10x slower. The cause is clear: regular Python UDFs process data row by row, serializing back and forth between the JVM and Python.
This post explains structurally why Python UDFs are slow, how to recover performance with Arrow-based vectorized Pandas UDFs, and the best solution of all: avoiding UDFs in the first place.
1. Why Python UDFs Are Slow
Spark runs on the JVM. When it encounters a Python UDF, the data has to travel JVM → Python → JVM.
[Regular Python UDF] for every single row:
row in JVM ──(serialize/pickle)──> Python worker ──apply function── ──(serialize)──> JVM
→ 100 million rows means 100 million serialization round trips + a Python interpreter call per row
Three costs stack on top of each other.
| Cost | Description |
|---|---|
| Serialization | JVM↔Python pickle serialization/deserialization for every row |
| Per-row invocation | The Python interpreter is invoked per row (no vectorization) |
| Optimizer black box | Catalyst cannot see inside the UDF → no pushdown or optimization |
Built-in functions (F.col, F.when, …) execute column-wise inside the JVM and pay none of these costs. Hence the first principle: "If a built-in function can do it, don't write a UDF."
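The serialization cost in the table can be made concrete with a toy model. The sketch below is not Spark's actual transport: `pickle` merely stands in for the JVM↔Python hop, and the function names and the call counter are invented for illustration. It only counts how many serialization calls each style makes for the same work.

```python
import pickle

# Counts serialization calls per style (illustrative bookkeeping only).
calls = {"per_row": 0, "per_batch": 0}

def dumps_counted(obj, key):
    calls[key] += 1
    return pickle.dumps(obj)

def per_row(rows, fn):
    """Toy model of a regular UDF: every row crosses the boundary alone."""
    out = []
    for row in rows:
        arg = pickle.loads(dumps_counted(row, "per_row"))            # row in
        out.append(pickle.loads(dumps_counted(fn(arg), "per_row")))  # result out
    return out

def per_batch(rows, fn):
    """Toy model of a batched transfer: one crossing in, one crossing out."""
    batch = pickle.loads(dumps_counted(rows, "per_batch"))
    return pickle.loads(dumps_counted([fn(v) for v in batch], "per_batch"))

rows = list(range(1000))
double = lambda v: v * 2
assert per_row(rows, double) == per_batch(rows, double)
print(calls)  # {'per_row': 2000, 'per_batch': 2}
```

The batched variant still loops in Python here; the toy isolates the serialization count only, not the vectorization benefit.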
2. First Choice — Replace with Built-in Functions
Many UDFs can in fact be expressed as combinations of built-in functions.
from pyspark.sql import functions as F
# BAD: Python UDF
@F.udf("string")
def grade(score):
    if score >= 90: return "A"
    elif score >= 80: return "B"
    else: return "C"
df = df.withColumn("g", grade("score"))
# GOOD: built-in when/otherwise (runs inside the JVM, several to dozens of times faster)
df = df.withColumn("g",
    F.when(F.col("score") >= 90, "A")
     .when(F.col("score") >= 80, "B")
     .otherwise("C"))
For string, date, regex, and JSON manipulation there is almost always a built-in function (regexp_replace, split, from_json, date_format, transform, filter, and so on). Look for a built-in before reaching for a UDF.
3. Second Choice — Pandas UDF (Vectorized)
For genuinely custom logic that built-ins can't express (complex parsing, ML inference, specialized computation), use a Pandas UDF. It transfers data in column-wise batches via Apache Arrow, which replaces per-row pickling with one transfer per batch and per-row interpreter calls with one vectorized call per batch.
[Pandas UDF] per batch:
JVM column ──(Arrow, near zero-copy)──> Python: vectorized ops on pandas.Series ──> JVM
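→ row-by-row round trips disappear, processing runs at pandas/numpy vectorized speed
Pandas UDFs always move data through Arrow, so they need PyArrow installed on the cluster but no extra flag. The setting below separately turns on Arrow for toPandas() and createDataFrame(pandas_df) conversions, which tend to appear in the same pipelines.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
Series → Series (Scalar Pandas UDF)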
The most common form. Receive the input column as a pd.Series and apply vectorized operations.
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("double")
def celsius_to_f(c: pd.Series) -> pd.Series:
    return c * 9 / 5 + 32  # pandas vectorized op, no loop
df = df.withColumn("temp_f", celsius_to_f("temp_c"))
Multiple Input Columns
@pandas_udf("double")
def weighted(a: pd.Series, b: pd.Series, w: pd.Series) -> pd.Series:
    return a * w + b * (1 - w)
df = df.withColumn("score", weighted("x", "y", "w"))
| | Regular UDF | Pandas UDF |
|---|---|---|
| Data transfer | Per-row pickle | Per-batch Arrow |
| Computation | Python loop | pandas/numpy vectorized |
| Typical speed | Baseline | Several to dozens of times faster |
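As one sketch of custom logic that is awkward to express with built-ins, the function below applies a piecewise-linear calibration curve to a whole batch with `np.interp`. The calibration table, the column names, and the function name are hypothetical. Because the body is plain pandas/numpy, it can be checked on a local Series without a Spark session; the PySpark wiring is shown only in comments.

```python
import numpy as np
import pandas as pd

# Hypothetical calibration table: raw sensor reading -> corrected value.
RAW_POINTS = np.array([0.0, 10.0, 20.0])
CORRECTED = np.array([0.0, 12.0, 20.0])

def calibrate(raw: pd.Series) -> pd.Series:
    """Piecewise-linear correction applied to a whole batch at once."""
    values = np.interp(raw.to_numpy(dtype="float64"), RAW_POINTS, CORRECTED)
    return pd.Series(values, index=raw.index)

# Local check on a plain pandas Series, no Spark session needed.
print(calibrate(pd.Series([0.0, 5.0, 15.0, 25.0])).tolist())

# In PySpark the same function would be wrapped, for example:
#   from pyspark.sql.functions import pandas_udf
#   calibrate_udf = pandas_udf(calibrate, returnType="double")
#   df = df.withColumn("corrected", calibrate_udf("raw"))
```

Readings outside the table are clamped to the end points, which is `np.interp`'s default behavior.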
4. applyInPandas — Per-Group pandas Processing
Powerful when you need to "receive each group as a pandas DataFrame and process it freely" (per-group normalization, time-series interpolation, applying a model per group).
from pyspark.sql.types import StructType, StructField, DoubleType, StringType
schema = StructType([
    StructField("user_id", StringType()),
    StructField("z_score", DoubleType()),
])
def normalize(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf is one entire group (a pandas DataFrame)
    pdf["z_score"] = (pdf["amount"] - pdf["amount"].mean()) / pdf["amount"].std()
    return pdf[["user_id", "z_score"]]
result = df.groupBy("user_id").applyInPandas(normalize, schema=schema)
Caution: with applyInPandas, a single group must fit entirely in one executor's memory. If a group is huge (skew), you get an OOM. Check group sizes, and pre-split oversized groups.
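Because the per-group function is plain pandas, it can be exercised locally on a small frame before it meets production-sized groups. The sketch below is a variant of the normalize function above with one added guard: groups with a single row or zero spread would otherwise produce NaN or a division by zero. Returning 0.0 for those groups is an assumption made for illustration; returning null may suit other pipelines better.

```python
import pandas as pd

def normalize(pdf: pd.DataFrame) -> pd.DataFrame:
    """Per-group z-score; groups with no spread get 0.0 (illustrative choice)."""
    amount = pdf["amount"]
    std = amount.std()  # sample std (ddof=1); NaN for a single-row group
    if pd.isna(std) or std == 0:
        z = pd.Series(0.0, index=pdf.index)
    else:
        z = (amount - amount.mean()) / std
    return pd.DataFrame({"user_id": pdf["user_id"], "z_score": z})

sample = pd.DataFrame({
    "user_id": ["a", "a", "a", "b"],
    "amount": [10.0, 20.0, 30.0, 5.0],
})

# Emulate what applyInPandas does: call the function once per group.
result = pd.concat([normalize(g) for _, g in sample.groupby("user_id")])
print(result)
```

This version also builds a new DataFrame instead of mutating the input, which keeps the function free of side effects on the batch it receives.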
5. mapInPandas — Partition-Level Streaming Processing
Instead of groups, this receives a partition as an iterator of pandas DataFrame batches. Since the whole partition never has to sit in memory at once, it's a great fit for running ML inference over large datasets.
from typing import Iterator
def predict(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    model = load_model()  # placeholder loader; runs once per partition (not per row!)
    for pdf in batches:
        # features: list of input column names, defined elsewhere
        pdf["pred"] = model.predict(pdf[features])
        yield pdf
result = df.mapInPandas(predict, schema="... pred double")
The big win with mapInPandas is that heavy objects like models and connections can be initialized once, outside the batch loop. A regular UDF has no per-partition setup hook: a model loaded inside the function body is reloaded on every call, which is wildly inefficient. (Large-scale batch inference patterns are also covered in a separate post, "Spark + LLM Integration Guide".)
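The "initialize once, outside the batch loop" behavior can be verified locally by feeding the function an ordinary Python iterator. In the sketch below, `SumModel`, `load_model`, `FEATURES`, and the load counter are all hypothetical stand-ins; a real model and loader would take their place.

```python
from typing import Iterator

import pandas as pd

FEATURES = ["x1", "x2"]  # hypothetical feature columns
load_count = 0           # counts how often the "model" is built

class SumModel:
    """Stand-in for a real model: predicts the row-wise feature sum."""
    def predict(self, features: pd.DataFrame) -> pd.Series:
        return features.sum(axis=1)

def load_model() -> SumModel:
    global load_count
    load_count += 1
    return SumModel()

def predict(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    model = load_model()  # once per call, i.e. once per partition
    for pdf in batches:
        yield pdf.assign(pred=model.predict(pdf[FEATURES]))

# Emulate one partition arriving as three batches.
partition = [pd.DataFrame({"x1": [i, i + 1], "x2": [1.0, 2.0]}) for i in range(3)]
out = pd.concat(list(predict(iter(partition))), ignore_index=True)
print(load_count, out["pred"].tolist())  # 1 [1.0, 3.0, 2.0, 4.0, 3.0, 5.0]
```

Using `assign` rather than writing into `pdf` returns a new frame per batch, so the incoming batch is left untouched.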
6. Which One to Use When
Can it be expressed with built-in functions?
├─ Yes → built-in functions (always first choice)
└─ No →
     Column → column transformation? → pandas_udf (Series→Series)
     Per-group DataFrame processing? → applyInPandas
     Partition streaming / heavy init? → mapInPandas
     Truly simple row-level logic? → (last resort) regular UDF
| Approach | Input unit | Best for |
|---|---|---|
| Built-in functions | Column | Most transformations |
| pandas_udf | Series (batch) | Custom column transformations |
| applyInPandas | Group DataFrame | Complex per-group logic |
| mapInPandas | Partition iterator | Inference / heavy initialization |
| Regular UDF | Row | Last resort |
7. Additional Cautions When Using UDFs
- No UDFs in WHERE clauses: filtering through a UDF blinds Catalyst, breaking pushdown and partition pruning. Filter with built-in expressions; keep UDFs in the select (projection) stage.
- Verify the Arrow setup: Pandas UDFs need PyArrow installed on every worker. The spark.sql.execution.arrow.pyspark.enabled=true flag governs toPandas() and createDataFrame(pandas_df) conversions; with it off, those conversions fall back to a slow non-Arrow path.
- Type consistency: if the UDF's return type doesn't match the declared schema, you silently get NULLs or an error. Be explicit.
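- Python memory: Pandas UDFs use Python worker memory, which sits outside the JVM heap. Large batches can exhaust it and cause an OOM; spark.sql.execution.arrow.maxRecordsPerBatch (default 10,000 rows) caps the batch size (see the separate post "Conquering PySpark Executor OOM").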
- Null handling: if you don't explicitly handle NaN/None in pandas operations, results can come out wrong.
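The null-handling caution is easy to reproduce with the grading logic from section 2. In the sketch below (function names are illustrative), a vectorized comparison treats a missing score as False, so the naive version silently assigns a grade where the input had none; the null-safe version keeps the same vectorized logic and then blanks out rows whose input was missing.

```python
import numpy as np
import pandas as pd

def grade_naive(score: pd.Series) -> pd.Series:
    # NaN >= 90 evaluates to False, so a missing score falls through to "C".
    labels = np.select([score >= 90, score >= 80], ["A", "B"], default="C")
    return pd.Series(labels, index=score.index, dtype=object)

def grade_null_safe(score: pd.Series) -> pd.Series:
    # Same vectorized logic, then restore missing values where the input was missing.
    return grade_naive(score).where(score.notna())

scores = pd.Series([95.0, None, 70.0])
print(grade_naive(scores).tolist())             # the missing score is graded "C"
print(grade_null_safe(scores).isna().tolist())  # [False, True, False]
```

Either function could serve as the body of a Series → Series Pandas UDF declared with a string return type; only the second preserves nulls.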
8. Summary
| Principle | Detail |
|---|---|
| First choice | Replace with built-in functions (avoid UDFs) |
| Second choice | Pandas UDF (Arrow vectorization) |
| Group logic | applyInPandas (watch out for per-group OOM) |
| Inference / heavy init | mapInPandas |
| Forbidden | UDFs in WHERE clauses, Arrow disabled |
The usual suspect behind PySpark performance problems is "the Python UDF you wrote without thinking twice." The key insight is twofold — Spark is a JVM engine, so row-by-row Python round trips are the most expensive thing you can do, and the way to eliminate them is either built-in functions (inside the JVM) or Arrow vectorization (per batch). Make "Can a built-in do it? If not, Pandas UDF" a one-line habit, and you'll watch half your jobs get faster.
This article is based on Spark 3.5 + PyArrow. If you need help improving UDF performance in your PySpark pipelines or optimizing large-scale inference, feel free to reach out.
— Data Dynamics Engineering Team