PySpark UDF-Tutorial (anfängerfreundlich): Was, warum und wie du pyspark udf verwendest
PySpark ist großartig darin, Big Data schnell zu verarbeiten – bis du Logik schreibst, die Spark nicht versteht. Genau dafür gibt es eine UDF (User Defined Function): Sie ermöglicht dir, eigenen Python-Code auf Spark DataFrames auszuführen.
Der Haken: UDFs können Spark auch langsamer machen, wenn man sie unbedacht einsetzt. Dieses Tutorial zeigt, wann UDFs sinnvoll sind, wie du sie korrekt schreibst und wie du schnellere Alternativen auswählst (z. B. Built-in Functions oder Pandas UDFs).
Das Problem, das UDFs lösen
Du hast eine DataFrame-Spalte und möchtest eigene Logik anwenden:
- domänenspezifisches Parsing (komische IDs, eigene Regeln)
- eigene Scoring-Funktionen
- Textnormalisierung, die von den eingebauten Spark SQL-Funktionen nicht abgedeckt ist
- Werte über eine komplexe Regel mappen
Sparks Built-in Functions sind in der JVM optimiert. Reine Python-Logik ist aber nicht automatisch „Spark-native“.
Die Lösungsoptionen (schnellste → langsamste)
In der Praxis solltest du diese Reihenfolge bevorzugen:
- Built-in Spark functions (
pyspark.sql.functions) ✅ am schnellsten - SQL expressions (
expr,when,regexp_extract) ✅ schnell - Pandas UDFs (vektorisiert mit Apache Arrow) ✅ oft schnell
- Regular Python UDFs (Zeile-für-Zeile in Python) ⚠️ oft am langsamsten
Dieser Beitrag konzentriert sich auf (3) und (4) – mit klarer Empfehlung, wann du sie besser vermeiden solltest.
Setup: ein kleiner DataFrame zum Ausprobieren
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, DoubleType
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[
(" Alice ", "US", 10),
("bob", "UK", 3),
(None, "US", 7),
],
["name", "country", "visits"]
)
df.show()Beispiel 1: Eine einfache Python UDF (String-Bereinigung)
Ziel
Leerzeichen trimmen, in Kleinbuchstaben umwandeln und Nulls sicher behandeln.
from pyspark.sql.functions import udf
def clean_name(x: str) -> str:
if x is None:
return None
return x.strip().lower()
clean_name_udf = udf(clean_name, StringType())
df2 = df.withColumn("name_clean", clean_name_udf(F.col("name")))
df2.show()Hinweise
- Du musst einen Rückgabetyp angeben (
StringType()). - Eine Python UDF läuft Zeile für Zeile in Python-Workern (kann bei großen Datenmengen langsam sein).
Beispiel 2: Wenn möglich Built-in Functions bevorzugen
Die vorherige UDF kann durch Built-ins ersetzt werden (schneller):
df_fast = df.withColumn(
"name_clean",
F.lower(F.trim(F.col("name")))
)
df_fast.show()Faustregel: Wenn du es mit F.* ausdrücken kannst, mach es so.
Beispiel 3: UDF mit mehreren Inputs (Custom Scoring)
Erstellen wir einen Score basierend auf country + visits.
def score(country: str, visits: int) -> float:
if country is None or visits is None:
return 0.0
bonus = 1.5 if country == "US" else 1.0
return float(visits) * bonus
score_udf = udf(score, DoubleType())
df3 = df.withColumn("score", score_udf(F.col("country"), F.col("visits")))
df3.show()Beispiel 4: Eine UDF für Spark SQL registrieren
Wenn du sie in SQL verwenden möchtest:
spark.udf.register("score_udf_sql", score, DoubleType())
df.createOrReplaceTempView("t")
spark.sql("""
SELECT name, country, visits, score_udf_sql(country, visits) AS score
FROM t
""").show()Häufige Fehler (und wie du sie vermeidest)
1) Den falschen Typ zurückgeben
Wenn Spark DoubleType() erwartet und du manchmal einen String zurückgibst, bekommst du Laufzeitfehler oder null-Werte.
2) UDFs in joins/filters unnötig verwenden
UDFs können Spark-Optimierungen blockieren. Versuche stattdessen zuerst eine abgeleitete Spalte zu berechnen oder Built-ins zu nutzen.
3) Null-Handling vergessen
Gehe immer davon aus, dass Inputs None sein können.
Schnellere Alternative: Pandas UDF (vektorisiert)
Eine Pandas UDF verarbeitet Daten stapelweise (vektorisiert) und ist oft deutlich schneller als eine normale UDF.
Beispiel 5: Pandas UDF für Name-Cleaning
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("string")
def clean_name_pandas(s: pd.Series) -> pd.Series:
return s.str.strip().str.lower()
df4 = df.withColumn("name_clean", clean_name_pandas(F.col("name")))
df4.show()Wann Pandas UDFs helfen
- Logik ist in pandas einfacher
- du brauchst vektorisierte Operationen
- Performance ist wichtig und Built-ins reichen nicht aus
Beispiel 6: Pandas UDF für elementweises Scoring
@pandas_udf("double")
def score_pandas(country: pd.Series, visits: pd.Series) -> pd.Series:
bonus = country.eq("US").astype("float64") * 0.5 + 1.0
visits_filled = visits.fillna(0).astype("float64")
return visits_filled * bonus
df5 = df.withColumn("score", score_pandas(F.col("country"), F.col("visits")))
df5.show()Praktische Best-Practices-Checkliste
- ✅ Versuche zuerst Built-in Functions (
F.lower,F.when,F.regexp_extract, etc.) - ✅ Wenn Custom Logic nötig ist, ziehe Pandas UDF vor einer normalen UDF in Betracht
- ✅ Immer die korrekten Rückgabetypen angeben
- ✅ Nulls (
None) explizit behandeln - ✅ UDF-Logik sauber und deterministisch halten (gleicher Input → gleicher Output)
- ⚠️ UDFs in Filters/Joins möglichst vermeiden (sie können Optimierungen reduzieren)
- ⚠️ Messen: Laufzeit mit/ohne UDF auf einem Sample-Dataset vergleichen
Wann du eine normale Python UDF verwenden solltest
Nutze eine normale UDF, wenn:
- die Datenmenge nicht riesig ist (oder Performance nicht kritisch ist)
- die Logik schwer mit Built-ins auszudrücken ist
- pandas/Arrow nicht verfügbar ist oder nicht zur Transformation passt
Wenn Performance wichtig ist, greife zuerst zu Built-ins oder Pandas UDFs.