PySpark has over 300 built-in functions in pyspark.sql.functions. For everything those don't cover, you have User-Defined Functions. UDFs are the escape hatch when the built-ins don't fit — but they come with real performance costs. Understanding why helps you know when the tradeoff is worth it.
The Standard Python UDF
import re
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
def normalize_phone(phone_str):
    """Strip non-digits from a phone number."""
    if phone_str is None:
        return None
    # \D matches any character that is not a digit
    return re.sub(r'\D', '', str(phone_str))
# Register as a UDF
normalize_phone_udf = F.udf(normalize_phone, returnType=StringType())
# Apply to a column
df = df.withColumn("phone_clean", normalize_phone_udf(F.col("phone")))
This works. It's also significantly slower than a built-in Spark function, for a specific reason.
Why Standard Python UDFs Are Slow
Spark is a JVM-native system. Your DataFrame lives in JVM memory. When you apply a Python UDF, Spark has to:
- Serialize each row (or batch of rows) from JVM binary format into Python-serializable format (Pickle)
- Send it to a Python worker process via a socket
- Call your Python function in the Python process
- Serialize the result back
- Send it back to the JVM executor
- Deserialize into Spark's internal format
This serialization/deserialization overhead per row adds up. It also breaks Whole Stage Code Generation (the JVM-compiled execution loop), which means Spark can't fuse the UDF operation with adjacent operations.
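The round trip described above can be imitated in plain Python to make the cost visible. The sketch below is deliberately simplified and is not Spark's actual transport: it uses pickle as a stand-in for serialization in both directions, and the helper names round_trip and apply_row_at_a_time are hypothetical. It only counts how many serialize/deserialize hops a row-at-a-time call pattern incurs, using the phone-normalization logic from earlier.

```python
import pickle
import re


def normalize_phone(phone_str):
    """Strip non-digits from a phone number."""
    if phone_str is None:
        return None
    return re.sub(r"\D", "", str(phone_str))


def round_trip(value):
    """Stand-in for one hop across the JVM/Python boundary: serialize, then deserialize."""
    return pickle.loads(pickle.dumps(value))


def apply_row_at_a_time(values, func):
    """Apply func to each value, paying one hop for the argument and one for the result."""
    hops = 0
    results = []
    for value in values:
        arg = round_trip(value)                # JVM -> Python worker (simulated)
        hops += 1
        results.append(round_trip(func(arg)))  # Python worker -> JVM (simulated)
        hops += 1
    return results, hops


phones = ["(555) 123-4567", "555.987.6543", None]
cleaned, hops = apply_row_at_a_time(phones, normalize_phone)
print(cleaned)  # ['5551234567', '5559876543', None]
print(hops)     # 6: two hops per row
```

The hop count grows linearly with the number of rows, which is why the overhead barely matters on a small table and dominates on a large one.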
For the phone normalization example, F.regexp_replace(F.col("phone"), r'\D', '') does the same thing as the UDF, runs entirely in the JVM, and never hands rows to a Python worker. It is typically several times faster, roughly 5–20x depending on data volume and cluster setup.
Always Check Built-ins First
# Before writing a UDF, check if a built-in handles it
# String operations
F.regexp_replace(col, pattern, replacement)
F.regexp_extract(col, pattern, idx)
F.split(col, pattern)
F.substring(col, pos, len)
F.trim(col), F.ltrim(col), F.rtrim(col)
F.upper(col), F.lower(col)
F.concat_ws(sep, *cols)
# Date operations
F.date_trunc(format, timestamp)
F.date_add(date, days)
F.datediff(date1, date2)
F.to_date(col, format)
F.year(col), F.month(col), F.dayofweek(col)
# JSON operations
F.from_json(col, schema)
F.to_json(col)
F.get_json_object(col, path)
F.json_tuple(col, *fields)
# Array operations
F.array_contains(col, value)
F.explode(col)
F.array_distinct(col)
F.transform(col, lambda_func)  # Spark 3.1+
Pandas UDFs: When You Need Python Logic at Scale
Pandas UDFs (also called vectorized UDFs) reduce the performance problem by operating on pandas Series or DataFrames a batch at a time instead of row by row. Spark uses Apache Arrow to send each partition to the Python process in batches (10,000 rows per batch by default, set by spark.sql.execution.arrow.maxRecordsPerBatch). Each batch arrives as a pandas Series, you operate on it with vectorized pandas operations, and you return a pandas Series of the same length. The serialization overhead is paid once per batch instead of once per row.
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
@pandas_udf(returnType=StringType())
def normalize_phone_vec(phone_series: pd.Series) -> pd.Series:
    # \D matches any non-digit; fillna('') turns null phones into empty strings
    return phone_series.str.replace(r'\D', '', regex=True).fillna('')
# Apply the pandas UDF
df = df.withColumn("phone_clean", normalize_phone_vec(F.col("phone")))
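Because the body of a pandas UDF is ordinary pandas code, it can be exercised on a small Series without a Spark session. The sketch below assumes only that pandas is installed; normalize_phone_series is a hypothetical undecorated copy of the function body, and the sample values are made up.

```python
import pandas as pd


def normalize_phone_series(phone_series: pd.Series) -> pd.Series:
    """Strip non-digits from every element in one vectorized call; nulls become ''."""
    return phone_series.str.replace(r"\D", "", regex=True).fillna("")


sample = pd.Series(["(555) 123-4567", "555.987.6543", None])
result = normalize_phone_series(sample)
print(result.tolist())  # ['5551234567', '5559876543', '']
```

Note that fillna("") turns missing phone numbers into empty strings, whereas the row-at-a-time UDF returned None for them. Dropping the fillna call is one way to keep nulls as nulls, if that is the behavior you want.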
Pandas UDFs are the right choice when:
- The logic requires Python libraries (numpy, scipy, statsmodels) with no PySpark equivalent
- You're running ML model inference on each partition (predict() on a loaded model)
- The computation genuinely can't be expressed with built-in Spark functions
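To get a feel for how much batching reduces boundary crossings, it helps to count transfers. In practice Spark ships each partition to the Python worker in Arrow record batches, capped by spark.sql.execution.arrow.maxRecordsPerBatch (10,000 rows by default). The sketch below is simple arithmetic under that assumption; the function name batches_per_partition and the partition size are hypothetical.

```python
import math

# Default value of spark.sql.execution.arrow.maxRecordsPerBatch
DEFAULT_MAX_RECORDS_PER_BATCH = 10_000


def batches_per_partition(rows_in_partition, max_records_per_batch=DEFAULT_MAX_RECORDS_PER_BATCH):
    """Number of Arrow batches needed to ship one partition to the Python worker."""
    if rows_in_partition <= 0:
        return 0
    return math.ceil(rows_in_partition / max_records_per_batch)


rows = 1_250_000  # hypothetical partition size
print(batches_per_partition(rows))          # 125 batches with the default cap
print(batches_per_partition(rows, 50_000))  # 25 batches with a larger cap
```

A larger batch size means fewer transfers but more memory held in the Python worker at once, so the setting is a tradeoff rather than a free speedup.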
Scalar Iterator Pandas UDF: Model Inference at Scale
The scalar iterator variant is the most useful Pandas UDF form for ML inference. The function receives an iterator over the partition's batches, so it can load a model once per partition instead of once per batch:
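from typing import Iterator
import pandas as pd
import mlflow
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
feature_cols = ["tenure", "monthly_charges", "contract_type"]
@pandas_udf(returnType=DoubleType())
def predict_churn(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.Series]:
    # Load the model once per partition (task), not once per batch
    model = mlflow.sklearn.load_model("models:/churn_model/Production")
    for batch in iterator:
        # Each batch is a pandas DataFrame built from the fields of the struct column
        features = batch[feature_cols]
        yield pd.Series(model.predict_proba(features)[:, 1])
# Pack the feature columns into one struct column so each batch arrives as a pd.DataFrame
df_predictions = customer_df.withColumn("churn_probability", predict_churn(F.struct(*feature_cols)))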
The model loads once when the partition starts and then scores every batch in that partition, so it is never deserialized again for each batch.
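The load-once behavior comes from ordinary Python generator semantics and can be shown without Spark or a real model. In the sketch below, FakeModel is a hypothetical stand-in that counts how often it is loaded, and the two scoring functions are illustrative names: one reloads the model for every batch, the other follows the iterator pattern.

```python
from typing import Iterable, Iterator, List


class FakeModel:
    """Hypothetical stand-in for an expensive-to-load model; counts how often it is loaded."""

    load_count = 0

    @classmethod
    def load(cls) -> "FakeModel":
        cls.load_count += 1
        return cls()

    def predict(self, batch: List[float]) -> List[float]:
        # Toy scoring rule so the example has something to compute
        return [x * 0.5 for x in batch]


def score_per_batch(batches: Iterable[List[float]]) -> Iterator[List[float]]:
    """Non-iterator behavior: the model is reloaded for every batch."""
    for batch in batches:
        model = FakeModel.load()
        yield model.predict(batch)


def score_with_iterator(batches: Iterable[List[float]]) -> Iterator[List[float]]:
    """Iterator pattern: load once, then reuse the model for every batch."""
    model = FakeModel.load()
    for batch in batches:
        yield model.predict(batch)


partition = [[1.0, 2.0], [3.0], [4.0, 5.0]]  # three batches in one partition

FakeModel.load_count = 0
per_batch_scores = list(score_per_batch(partition))
per_batch_loads = FakeModel.load_count

FakeModel.load_count = 0
iterator_scores = list(score_with_iterator(partition))
iterator_loads = FakeModel.load_count

print(per_batch_loads, iterator_loads)  # 3 1
```

Both functions produce the same scores. The only difference is how many times the expensive load step runs, and that is the cost the iterator variant removes.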
The Rule
Use a built-in Spark function if it exists. Use a Pandas UDF if you need Python logic that operates well on vectors (numpy, pandas, scikit-learn). Use a standard Python UDF only for quick prototyping or for truly row-level logic that can't be vectorized and won't be called on large datasets. Profile before optimizing — the cost of a UDF on a 10,000-row reference table is negligible; on a 10-billion-row event log it's significant.