PySpark UDF vs Pandas UDF vs mapInPandas: Welche solltest du verwenden?
Wenn du in PySpark eigene Logik brauchst, greifst du typischerweise zu einem von drei Werkzeugen:
- Normale Python UDF (
udf(...)) - Pandas UDF (
@pandas_udf) mapInPandas(DataFrame → Iterator von Pandas DataFrames)
Alle können „Python auf Spark ausführen“, aber sie unterscheiden sich stark in Performance, Flexibilität und darin, wie viel von Sparks Optimierungen erhalten bleibt.
Dieser Guide bietet dir einen praktischen Entscheidungsrahmen plus Beispiele zum Kopieren.
Mentales Modell: Was ändert sich zwischen den drei Optionen?
1) Normale UDF (Python zeilenweise)
- Spark schickt Spalten an Python-Worker-Prozesse.
- Deine Funktion läuft eine Zeile nach der anderen.
- Oft die langsamste Option.
- Kann Sparks Optimizer und Code-Generierung ausbremsen.
Nutze sie, wenn: die Logik simpel ist, die Datenmenge klein ist oder Geschwindigkeit egal ist.
2) Pandas UDF (vektorisierte Batches via Arrow)
- Spark sendet Daten in spaltenorientierten Batches über Apache Arrow nach Python.
- Deine Funktion arbeitet auf Pandas Series / DataFrames (vektorisiert).
- In der Regel deutlich schneller als eine normale UDF.
Nutze sie, wenn: du benutzerdefinierte Spaltenlogik brauchst und bessere Performance willst.
3) mapInPandas (volle Kontrolle pro Partitions-Batch)
- Spark ruft deine Funktion einmal pro Partition-Chunk auf und gibt dir einen Iterator von Pandas DataFrames.
- Du kannst Multi-Column-Logik, komplexe Transformationen und sogar Zeilen-Expansion umsetzen.
- Ideal für „Mini-ETL“-Schritte in Python, weiterhin parallelisiert durch Spark.
Nutze es, wenn: du komplexe Transformationen brauchst, die nicht in die Form „eine Spalte rein → eine Spalte raus“ passen.
Schnelle Entscheidungstabelle
| Du brauchst… | Beste Wahl |
|---|---|
| Einfache Custom-Transformation, kleines Volumen | Normale UDF |
| Spaltenweise Transformation, mittlere/große Daten | Pandas UDF |
| Komplexe Logik: mehrere Spalten, mehrere Output-Zeilen, Joins in pandas, schwere Python-Libs | mapInPandas |
| Maximale Performance, wenn möglich | Eingebaute Spark SQL-Funktionen (alle drei meiden) |
Regel: Built-in Spark functions > Pandas UDF > mapInPandas > normale UDF (typisch, nicht absolut).
Beispieldatensatz
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, DoubleType, StructType, StructField, LongType
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[(" Alice ", "US", 10),
("bob", "UK", 3),
(None, "US", 7)],
["name", "country", "visits"]
)Beispiel für eine normale UDF (einfach, aber langsam)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def clean_name(x):
if x is None:
return None
return x.strip().lower()
clean_name_udf = udf(clean_name, StringType())
df_udf = df.withColumn("name_clean", clean_name_udf("name"))
df_udf.show()Vorteile
- Am einfachsten zu verstehen
- Funktioniert überall
Nachteile
- Python-Overhead pro Zeile
- Verhindert oft, dass Spark aggressiv optimieren kann
Pandas UDF Beispiel (vektorisiert, meist schneller)
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("string")
def clean_name_vec(s: pd.Series) -> pd.Series:
return s.str.strip().str.lower()
df_pandas_udf = df.withColumn("name_clean", clean_name_vec("name"))
df_pandas_udf.show()Vorteile
- Batch-Verarbeitung + Vektorisierung
- Deutlich höherer Durchsatz bei großen Spalten
Nachteile
- Erfordert Arrow-Support und eine kompatible Umgebung
- Läuft weiterhin auf der Python-Seite und ist nicht so gut optimierbar wie Built-ins
mapInPandas Beispiel (am flexibelsten)
Use Case: mehrere abgeleitete Spalten ausgeben + Custom-Regeln
Vielleicht willst du:
- bereinigten Namen
- Score basierend auf country und visits
- Bucket-Labels
import pandas as pd
def transform(pdf_iter):
for pdf in pdf_iter:
pdf["name_clean"] = pdf["name"].astype("string").str.strip().str.lower()
pdf["visits"] = pdf["visits"].fillna(0).astype("float64")
pdf["score"] = pdf["visits"] * pdf["country"].eq("US").astype("float64").add(1.0) # US -> 2.0x, else 1.0x
pdf["bucket"] = pd.cut(pdf["visits"], bins=[-1, 0, 5, 999999], labels=["none", "low", "high"])
yield pdf
out_schema = "name string, country string, visits long, name_clean string, score double, bucket string"
df_map = df.mapInPandas(transform, schema=out_schema)
df_map.show()Vorteile
- Extrem flexibel
- Sehr gut für „pandas-Pipelines pro Partition“
- Kann Zeilen expandieren, mehrere Outputs berechnen, externe Libraries nutzen (mit Vorsicht)
Nachteile
- Du musst das Schema korrekt definieren
- Mehr Risiko, aus Versehen Skew / große Partitionen zu erzeugen
- Weiterhin Python/Arrow-Overhead
Und was ist mit Performance?
Du brauchst keine perfekten Benchmarks, um die richtige Wahl zu treffen. Nutze diese praktischen Heuristiken:
Normale UDF ist meist am schlechtesten, wenn:
- zig Millionen Zeilen
- simple Transformationen, die Spark nativ kann
- innerhalb von Filtern/Joins verwendet
Pandas UDF glänzt, wenn:
- du eine oder mehrere Spalten mit vektorisierten Operationen transformierst
- du Logik effizient mit pandas/numpy ausdrücken kannst
mapInPandas ist am besten, wenn:
- du mehrstufige Transformationen brauchst, die in Spark SQL mühsam wären
- du mehrere Spalten auf einmal erzeugen willst
- du Zeilen-Expansion oder komplexe bedingte Logik brauchst
Correctness- und Schema-Fallstricke
Pandas UDF
- Der Output muss exakt zum deklarierten Typ passen.
- Nulls/NaNs können auftauchen; behandle sie explizit.
mapInPandas
- Der Output-DataFrame muss zum Schema passen: Spaltennamen + dtypes + Reihenfolge.
- Vorsicht mit Python-
object-dtype; caste explizit zu string/float.
Die „vermeide das“ Liste (häufige Anti-Patterns)
- Normale UDF für grundlegende String-Operationen (
lower,trim, regex) → nutze Spark Built-ins. - Netzwerk-APIs innerhalb von UDFs aufrufen → langsam, anfällig und schwer sauber zu retryn.
- Schwere Python-Loops pro Zeile in Pandas UDF/mapInPandas → zerstört Vektorisierungs-Vorteile.
- Inkonsistente Typen zurückgeben (manchmal int, manchmal string) → Runtime-Fehler / nulls.
Empfohlener Entscheidungs-Flow
-
Können Spark Built-ins das? → Nutze Built-ins.
-
Ist es eine spaltenweise Transformation (gleiche Anzahl Zeilen rein/raus)? → Nutze Pandas UDF.
-
Brauchst du Multi-Column-Logik, mehrere Output-Spalten oder Zeilen-Expansion? → Nutze
mapInPandas. -
Ist es wenig Daten / schneller Prototyp? → Normale UDF ist akzeptabel.
Ein letzter Tipp: validiere mit Spark-Plänen
Auch ohne vollständige Benchmarks lernst du viel mit:
df_pandas_udf.explain(True)Wenn du siehst, dass Spark nach dem Hinzufügen deiner Funktion weniger optimiert, ist das ein Hinweis, Built-ins zu probieren oder die Logik umzustrukturieren.