Tags: pyspark, spark, pandas-udf, arrow, performance, data-engineering

Why PySpark UDFs Are Slow, and How Pandas UDFs Make Them 10x Faster

We dig into the real reasons a single Python UDF can slow down an entire Spark job (serialization and row-by-row processing), show how to recover performance with Arrow-based vectorized Pandas UDFs, applyInPandas, and mapInPandas, and cover strategies for avoiding UDFs altogether.

Data Dynamics · June 5, 2026 · 6 min read

When you profile a PySpark job, it's common to find a single UDF eating 80% of the total runtime. "All I did was apply one Python function" — and the job is 10x slower. The cause is clear: regular Python UDFs process data row by row, serializing back and forth between the JVM and Python.

This post explains structurally why Python UDFs are slow, how to recover performance with Arrow-based vectorized Pandas UDFs, and the best solution of all: avoiding UDFs in the first place.

1. Why Python UDFs Are Slow

Spark runs on the JVM, but a Python UDF executes in a separate Python worker process on each executor, so the data has to travel JVM → Python → JVM.

[Regular Python UDF]  for every single row:
  row in JVM  ──(pickle)──>  Python worker: apply function  ──(pickle)──>  JVM
  → 100 million rows means 100 million rows pickled and unpickled in each direction, plus 100 million Python function calls

Three costs stack on top of each other.

| Cost | Description |
| --- | --- |
| Serialization | JVM↔Python pickle serialization/deserialization for every row |
| Per-row invocation | The Python function is invoked once per row (no vectorization) |
| Optimizer black box | Catalyst cannot see inside the UDF → no pushdown or optimization |

Built-in functions (F.col, F.when, …) are evaluated inside the JVM, where Catalyst can optimize them and whole-stage code generation can compile them, so they pay none of these costs. Hence the first principle: "If a built-in function can do it, don't write a UDF."

2. First Choice — Replace with Built-in Functions

Many UDFs can in fact be expressed as combinations of built-in functions.

from pyspark.sql import functions as F
 
# BAD: Python UDF
@F.udf("string")
def grade(score):
    if score >= 90: return "A"
    elif score >= 80: return "B"
    else: return "C"
df = df.withColumn("g", grade("score"))
 
# GOOD: built-in when/otherwise (runs inside the JVM, several to dozens of times faster)
df = df.withColumn("g",
    F.when(F.col("score") >= 90, "A")
     .when(F.col("score") >= 80, "B")
     .otherwise("C"))

For string, date, regex, and JSON manipulation there is almost always a built-in function (regexp_replace, split, from_json, date_format, transform, filter, and so on). Look for a built-in before reaching for a UDF.

3. Second Choice — Pandas UDF (Vectorized)

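For genuinely custom logic that built-ins can't express (complex parsing, ML inference, specialized computation), use a Pandas UDF. It moves data between the JVM and Python in column-wise batches via Apache Arrow and calls your function once per batch instead of once per row, which removes most of the serialization and interpreter overhead. Catalyst still cannot see inside it, though.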

[Pandas UDF]  per batch:
  JVM column  ──(Arrow, near zero-copy)──>  Python: vectorized ops on pandas.Series  ──>  JVM
  → row-by-row round trips disappear, processing runs at pandas/numpy vectorized speed
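To put rough numbers on "row-by-row round trips disappear," the sketch below counts how many times the Python function body runs. It assumes a hypothetical job of 100 million rows spread evenly over 200 partitions, and models Pandas UDF batches as holding up to spark.sql.execution.arrow.maxRecordsPerBatch rows (10,000 by default), ignoring any other batching limits. python_calls is an illustrative helper, not a Spark API.

```python
import math
from typing import List, Optional


def python_calls(partition_rows: List[int], batch_size: Optional[int] = None) -> int:
    """Count invocations of the Python function body (illustrative model only).

    batch_size=None models a regular UDF: one call per row.
    An integer models a Pandas UDF: one call per Arrow batch of up to
    batch_size rows. Batches are formed within each partition (task),
    so the rounding up happens per partition.
    """
    if batch_size is None:
        return sum(partition_rows)
    return sum(math.ceil(rows / batch_size) for rows in partition_rows)


# Hypothetical job: 100 million rows spread evenly over 200 partitions
partitions = [500_000] * 200

regular = python_calls(partitions)                         # regular UDF
vectorized = python_calls(partitions, batch_size=10_000)   # default maxRecordsPerBatch
print(f"regular UDF: {regular:,} calls, Pandas UDF: {vectorized:,} calls")
```

The per-value work does not vanish: the pandas code still touches every value, but it does so inside numpy's compiled loops rather than through millions of Python-level calls.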

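Pandas UDFs always move data through Arrow, so PyArrow must be installed on the driver and on every executor. The setting below is often described as "enabling Arrow," but it only controls Arrow-based conversion in toPandas() and createDataFrame(pandas_df); it is worth turning on, though Pandas UDFs use Arrow regardless of it.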

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

Series → Series (Scalar Pandas UDF)

This is the most common form: the function receives the input column as a pd.Series and must return a pd.Series of the same length, computed with vectorized operations.

import pandas as pd
from pyspark.sql.functions import pandas_udf
 
@pandas_udf("double")
def celsius_to_f(c: pd.Series) -> pd.Series:
    return c * 9 / 5 + 32          # pandas vectorized op, no loop
 
df = df.withColumn("temp_f", celsius_to_f("temp_c"))
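Because the body of a Series → Series Pandas UDF is ordinary pandas code, it can be checked without a cluster before it is wrapped. A minimal sketch, assuming pandas is installed locally: celsius_to_f_impl is a hypothetical undecorated copy of the function above, and the None in the batch mimics how a null in a double column reaches pandas, as NaN.

```python
import pandas as pd


def celsius_to_f_impl(c: pd.Series) -> pd.Series:
    # Same vectorized body as celsius_to_f, left undecorated so it runs without Spark
    return c * 9 / 5 + 32


# One simulated batch; the None becomes NaN in a float64 Series
batch = pd.Series([0.0, 100.0, -40.0, None])
result = celsius_to_f_impl(batch)

# Arithmetic propagates NaN, and the output has one value per input row
print(result.tolist())
```

Once it behaves, wrapping it is a single line: celsius_to_f = pandas_udf("double")(celsius_to_f_impl).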

Multiple Input Columns

@pandas_udf("double")
def weighted(a: pd.Series, b: pd.Series, w: pd.Series) -> pd.Series:
    return (a * w + b * (1 - w))
 
df = df.withColumn("score", weighted("x", "y", "w"))

| | Regular UDF | Pandas UDF |
| --- | --- | --- |
| Data transfer | Per-row pickle | Per-batch Arrow |
| Computation | Python loop | pandas/numpy vectorized |
| Typical speed | Baseline | Several to dozens of times faster |

4. applyInPandas — Per-Group pandas Processing

Use this when you need to receive each group as a whole pandas DataFrame and process it freely: per-group normalization, time-series interpolation, or applying a model per group.

from pyspark.sql.types import StructType, StructField, DoubleType, StringType
 
schema = StructType([
    StructField("user_id", StringType()),
    StructField("z_score", DoubleType()),
])
 
def normalize(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf is one entire group (a pandas DataFrame)
    pdf["z_score"] = (pdf["amount"] - pdf["amount"].mean()) / pdf["amount"].std()
    return pdf[["user_id", "z_score"]]
 
result = df.groupBy("user_id").applyInPandas(normalize, schema=schema)
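The function passed to applyInPandas can be exercised the same way: split a small pandas DataFrame by the grouping key and call the function once per group, which is what Spark does on the executors. A sketch, assuming pandas locally; the sample data is made up, and run_per_group is a hypothetical helper that only mimics Spark's execution. This version of normalize uses assign() instead of mutating its input, but computes the same z-score. The run also surfaces an edge case of the formula: a one-row group has an undefined sample standard deviation, so its z_score comes back NaN.

```python
import pandas as pd


def normalize(pdf: pd.DataFrame) -> pd.DataFrame:
    # Same z-score as above; assign() returns a copy instead of mutating pdf
    amount = pdf["amount"]
    z = (amount - amount.mean()) / amount.std()
    return pdf.assign(z_score=z)[["user_id", "z_score"]]


def run_per_group(df: pd.DataFrame, key: str, func) -> pd.DataFrame:
    """Call func once per group and stack the results (local imitation only)."""
    parts = [func(group) for _, group in df.groupby(key, sort=True)]
    return pd.concat(parts, ignore_index=True)


sample = pd.DataFrame({
    "user_id": ["a", "a", "a", "b"],
    "amount": [10.0, 20.0, 30.0, 5.0],
})
result = run_per_group(sample, "user_id", normalize)
print(result)
```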

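Caution: applyInPandas loads every row of a group into the Python worker's memory at once (spark.sql.execution.arrow.maxRecordsPerBatch does not split groups), so a single group must fit in one executor's memory. If a group is huge (skew), you get an OOM. Check group sizes, and pre-split oversized groups.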
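One rough way to check group sizes before running applyInPandas is to combine per-group row counts with a bytes-per-row estimate taken from a small pandas sample. In Spark the counts could come from df.groupBy("user_id").count().toPandas() and the sample from something like df.limit(1000).toPandas(); the sketch below stubs both with made-up data, and the 512 MiB budget is a hypothetical threshold you would tune to your executors. Treat the estimate as a lower bound, since Arrow buffers and intermediate pandas copies add to it.

```python
import pandas as pd


def oversized_groups(group_counts: pd.Series, sample: pd.DataFrame,
                     budget_bytes: int) -> pd.Series:
    """Return estimated pandas memory (bytes) for groups above budget_bytes.

    group_counts: rows per group, indexed by group key.
    sample: a small pandas sample of the input rows, used for bytes per row.
    """
    bytes_per_row = sample.memory_usage(deep=True, index=False).sum() / len(sample)
    estimated = group_counts * bytes_per_row
    return estimated[estimated > budget_bytes].sort_values(ascending=False)


# Stand-ins for counts and a sample pulled from Spark (made-up values)
counts = pd.Series({"u1": 1_200, "u2": 950, "whale": 40_000_000})
sample = pd.DataFrame({"user_id": ["u1", "u2", "u1"], "amount": [12.5, 3.0, 8.25]})

budget = 512 * 1024 * 1024  # hypothetical per-group budget in bytes
too_big = oversized_groups(counts, sample, budget)
print(too_big)
```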

5. mapInPandas — Partition-Level Streaming Processing

Instead of groups, this receives a partition as an iterator of pandas DataFrame batches. Since the whole partition never has to sit in memory at once, it's a great fit for running ML inference over large datasets.

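from typing import Iterator
import pandas as pd
 
def predict(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    model = load_model()                 # your model loader (placeholder); runs once per partition, not per row
    for pdf in batches:
        pdf["pred"] = model.predict(pdf[features])   # features: your list of feature columns (placeholder)
        yield pdf
 
# schema must describe every column yielded: the input columns (elided here) plus pred
result = df.mapInPandas(predict, schema="... pred double")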

The big win with mapInPandas is that heavy objects like models and connections can be initialized once, outside the batch loop. With a regular UDF, predict runs once per row, so the model never sees a batch and any setup placed inside the function repeats on every call, which is wildly inefficient. (Large-scale batch inference patterns are also covered in a separate post, "Spark + LLM Integration Guide".)
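The once-per-partition initialization can be verified locally by driving the iterator function with a list of pandas batches, which is roughly how mapInPandas feeds it one partition. In this sketch DummyModel, load_model, and FEATURES are hypothetical stand-ins for a real model loader and feature columns; a counter confirms the model is loaded once no matter how many batches arrive.

```python
from typing import Iterator, List

import pandas as pd

FEATURES: List[str] = ["f1", "f2"]   # hypothetical feature columns
load_count = 0                       # how many times the "model" was loaded


class DummyModel:
    """Hypothetical stand-in for a real model: predicts f1 + f2."""

    def predict(self, features: pd.DataFrame) -> pd.Series:
        return features.sum(axis=1)


def load_model() -> DummyModel:
    global load_count
    load_count += 1                  # expensive in real life; here we only count it
    return DummyModel()


def predict(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    model = load_model()             # once per call, i.e. once per partition in Spark
    for pdf in batches:
        yield pdf.assign(pred=model.predict(pdf[FEATURES]).astype("float64"))


# Two batches standing in for one partition
partition = [
    pd.DataFrame({"f1": [1.0, 2.0], "f2": [0.5, 0.5]}),
    pd.DataFrame({"f1": [3.0], "f2": [1.0]}),
]
out = pd.concat(list(predict(iter(partition))), ignore_index=True)
print(out)
print("model loads:", load_count)
```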

6. Which One to Use When

Can it be expressed with built-in functions?
  ├─ Yes → built-in functions (always first choice)
  └─ No →
       Column → column transformation?        → pandas_udf (Series→Series)
       Per-group DataFrame processing?        → applyInPandas
       Partition streaming / heavy init?      → mapInPandas
       Truly simple row-level logic?          → (last resort) regular UDF

| Approach | Input unit | Best for |
| --- | --- | --- |
| Built-in functions | Column | Most transformations |
| pandas_udf | Series (batch) | Custom column transformations |
| applyInPandas | Group DataFrame | Complex per-group logic |
| mapInPandas | Partition iterator | Inference / heavy initialization |
| Regular UDF | Row | Last resort |

7. Additional Cautions When Using UDFs

  • No UDFs in WHERE clauses: filtering through a UDF blinds Catalyst, breaking pushdown and partition pruning. Filter with built-in expressions; keep UDFs in the select (projection) stage.
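  • Verify the Arrow setup: Pandas UDFs always exchange data through Arrow and need PyArrow on the driver and every executor. spark.sql.execution.arrow.pyspark.enabled=true is still worth setting, but it only speeds up toPandas() and createDataFrame(pandas_df); leaving it off does not make Pandas UDFs slower.
  • Type consistency: if the UDF's return values don't match the declared return type, you silently get NULLs or an error. Declare the type explicitly and cast results to match it.
  • Python memory: Pandas UDFs run in the Python worker, whose memory lives outside the JVM heap (in the executor's memory overhead, or spark.executor.pyspark.memory if set). Large batches risk an overhead OOM; lowering spark.sql.execution.arrow.maxRecordsPerBatch (default 10,000) makes them smaller (see the separate post "Conquering PySpark Executor OOM").
  • Null handling: nulls arrive as NaN in numeric columns and as None in object columns, and comparisons with NaN evaluate to False, so unhandled nulls can silently land in the wrong branch.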
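The null-handling pitfall is easy to reproduce with the grading logic from section 2, written as the kind of vectorized pandas body a Series → Series Pandas UDF would wrap. A minimal sketch, assuming a nullable double score column (so nulls arrive as NaN); grade_naive and grade_null_safe are hypothetical names. Because NaN >= 90 and NaN >= 80 are both False, the naive version quietly grades a missing score as "C", while the safe version maps nulls back to None.

```python
import numpy as np
import pandas as pd


def grade_naive(score: pd.Series) -> pd.Series:
    # Comparisons with NaN are False, so a null score falls through to "C"
    labels = np.select([score >= 90, score >= 80], ["A", "B"], default="C")
    return pd.Series(labels, index=score.index, dtype=object)


def grade_null_safe(score: pd.Series) -> pd.Series:
    labels = np.select([score >= 90, score >= 80], ["A", "B"], default="C")
    labels = np.where(score.isna(), None, labels)   # keep nulls as nulls
    return pd.Series(labels, index=score.index, dtype=object)


scores = pd.Series([95.0, 85.0, 50.0, None])   # None becomes NaN in a float64 Series
print(grade_naive(scores).tolist())
print(grade_null_safe(scores).tolist())
```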

8. Summary

| Principle | Detail |
| --- | --- |
| First choice | Replace with built-in functions (avoid UDFs) |
| Second choice | Pandas UDF (Arrow vectorization) |
| Group logic | applyInPandas (watch out for per-group OOM) |
| Inference / heavy init | mapInPandas |
| Forbidden | UDFs in WHERE clauses; Arrow off for pandas conversions |

The usual suspect behind PySpark performance problems is "the Python UDF you wrote without thinking twice." The key insight has two parts: Spark is a JVM engine, so row-by-row Python round trips are the most expensive thing you can do, and the way to eliminate them is either built-in functions (inside the JVM) or Arrow vectorization (per batch). Make "Can a built-in do it? If not, Pandas UDF" a one-line habit, and you'll watch half your jobs get faster.


This article is based on Spark 3.5 + PyArrow. If you need help improving UDF performance in your PySpark pipelines or optimizing large-scale inference, feel free to reach out.

— Data Dynamics Engineering Team