Skip to content

PySpark UDF vs Pandas UDF vs mapInPandas: Welche solltest du verwenden?

Wenn du in PySpark eigene Logik brauchst, greifst du typischerweise zu einem von drei Werkzeugen:

  • Normale Python UDF (udf(...))
  • Pandas UDF (@pandas_udf)
  • mapInPandas (DataFrame → Iterator von Pandas DataFrames)

Alle können „Python auf Spark ausführen“, aber sie unterscheiden sich stark in Performance, Flexibilität und darin, wie viel von Sparks Optimierungen erhalten bleibt.

Dieser Guide bietet dir einen praktischen Entscheidungsrahmen plus Beispiele zum Kopieren.


Mentales Modell: Was ändert sich zwischen den drei Optionen?

1) Normale UDF (Python zeilenweise)

  • Spark schickt Spalten an Python-Worker-Prozesse.
  • Deine Funktion läuft eine Zeile nach der anderen.
  • Oft die langsamste Option.
  • Kann Sparks Optimizer und Code-Generierung ausbremsen.

Nutze sie, wenn: die Logik simpel ist, die Datenmenge klein ist oder Geschwindigkeit egal ist.


2) Pandas UDF (vektorisierte Batches via Arrow)

  • Spark sendet Daten in spaltenorientierten Batches über Apache Arrow nach Python.
  • Deine Funktion arbeitet auf Pandas Series / DataFrames (vektorisiert).
  • In der Regel deutlich schneller als eine normale UDF.

Nutze sie, wenn: du benutzerdefinierte Spaltenlogik brauchst und bessere Performance willst.


3) mapInPandas (volle Kontrolle pro Partitions-Batch)

  • Spark ruft deine Funktion einmal pro Partition-Chunk auf und gibt dir einen Iterator von Pandas DataFrames.
  • Du kannst Multi-Column-Logik, komplexe Transformationen und sogar Zeilen-Expansion umsetzen.
  • Ideal für „Mini-ETL“-Schritte in Python, weiterhin parallelisiert durch Spark.

Nutze es, wenn: du komplexe Transformationen brauchst, die nicht in die Form „eine Spalte rein → eine Spalte raus“ passen.


Schnelle Entscheidungstabelle

Du brauchst…Beste Wahl
Einfache Custom-Transformation, kleines VolumenNormale UDF
Spaltenweise Transformation, mittlere/große DatenPandas UDF
Komplexe Logik: mehrere Spalten, mehrere Output-Zeilen, Joins in pandas, schwere Python-LibsmapInPandas
Maximale Performance, wenn möglichEingebaute Spark SQL-Funktionen (alle drei meiden)

Regel: Built-in Spark functions > Pandas UDF > mapInPandas > normale UDF (typisch, nicht absolut).


Beispieldatensatz

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, DoubleType, StructType, StructField, LongType
 
spark = SparkSession.builder.getOrCreate()
 
df = spark.createDataFrame(
    [(" Alice  ", "US", 10),
     ("bob", "UK", 3),
     (None, "US", 7)],
    ["name", "country", "visits"]
)

Beispiel für eine normale UDF (einfach, aber langsam)

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
 
def clean_name(x):
    if x is None:
        return None
    return x.strip().lower()
 
clean_name_udf = udf(clean_name, StringType())
 
df_udf = df.withColumn("name_clean", clean_name_udf("name"))
df_udf.show()

Vorteile

  • Am einfachsten zu verstehen
  • Funktioniert überall

Nachteile

  • Python-Overhead pro Zeile
  • Verhindert oft, dass Spark aggressiv optimieren kann

Pandas UDF Beispiel (vektorisiert, meist schneller)

import pandas as pd
from pyspark.sql.functions import pandas_udf
 
@pandas_udf("string")
def clean_name_vec(s: pd.Series) -> pd.Series:
    return s.str.strip().str.lower()
 
df_pandas_udf = df.withColumn("name_clean", clean_name_vec("name"))
df_pandas_udf.show()

Vorteile

  • Batch-Verarbeitung + Vektorisierung
  • Deutlich höherer Durchsatz bei großen Spalten

Nachteile

  • Erfordert Arrow-Support und eine kompatible Umgebung
  • Läuft weiterhin auf der Python-Seite und ist nicht so gut optimierbar wie Built-ins

mapInPandas Beispiel (am flexibelsten)

Use Case: mehrere abgeleitete Spalten ausgeben + Custom-Regeln

Vielleicht willst du:

  • bereinigten Namen
  • Score basierend auf country und visits
  • Bucket-Labels
import pandas as pd
 
def transform(pdf_iter):
    for pdf in pdf_iter:
        pdf["name_clean"] = pdf["name"].astype("string").str.strip().str.lower()
        pdf["visits"] = pdf["visits"].fillna(0).astype("float64")
        pdf["score"] = pdf["visits"] * pdf["country"].eq("US").astype("float64").add(1.0)  # US -> 2.0x, else 1.0x
        pdf["bucket"] = pd.cut(pdf["visits"], bins=[-1, 0, 5, 999999], labels=["none", "low", "high"])
        yield pdf
 
out_schema = "name string, country string, visits long, name_clean string, score double, bucket string"
 
df_map = df.mapInPandas(transform, schema=out_schema)
df_map.show()

Vorteile

  • Extrem flexibel
  • Sehr gut für „pandas-Pipelines pro Partition“
  • Kann Zeilen expandieren, mehrere Outputs berechnen, externe Libraries nutzen (mit Vorsicht)

Nachteile

  • Du musst das Schema korrekt definieren
  • Mehr Risiko, aus Versehen Skew / große Partitionen zu erzeugen
  • Weiterhin Python/Arrow-Overhead

Und was ist mit Performance?

Du brauchst keine perfekten Benchmarks, um die richtige Wahl zu treffen. Nutze diese praktischen Heuristiken:

Normale UDF ist meist am schlechtesten, wenn:

  • zig Millionen Zeilen
  • simple Transformationen, die Spark nativ kann
  • innerhalb von Filtern/Joins verwendet

Pandas UDF glänzt, wenn:

  • du eine oder mehrere Spalten mit vektorisierten Operationen transformierst
  • du Logik effizient mit pandas/numpy ausdrücken kannst

mapInPandas ist am besten, wenn:

  • du mehrstufige Transformationen brauchst, die in Spark SQL mühsam wären
  • du mehrere Spalten auf einmal erzeugen willst
  • du Zeilen-Expansion oder komplexe bedingte Logik brauchst

Correctness- und Schema-Fallstricke

Pandas UDF

  • Der Output muss exakt zum deklarierten Typ passen.
  • Nulls/NaNs können auftauchen; behandle sie explizit.

mapInPandas

  • Der Output-DataFrame muss zum Schema passen: Spaltennamen + dtypes + Reihenfolge.
  • Vorsicht mit Python-object-dtype; caste explizit zu string/float.

Die „vermeide das“ Liste (häufige Anti-Patterns)

  • Normale UDF für grundlegende String-Operationen (lower, trim, regex) → nutze Spark Built-ins.
  • Netzwerk-APIs innerhalb von UDFs aufrufen → langsam, anfällig und schwer sauber zu retryn.
  • Schwere Python-Loops pro Zeile in Pandas UDF/mapInPandas → zerstört Vektorisierungs-Vorteile.
  • Inkonsistente Typen zurückgeben (manchmal int, manchmal string) → Runtime-Fehler / nulls.

Empfohlener Entscheidungs-Flow

  1. Können Spark Built-ins das? → Nutze Built-ins.

  2. Ist es eine spaltenweise Transformation (gleiche Anzahl Zeilen rein/raus)? → Nutze Pandas UDF.

  3. Brauchst du Multi-Column-Logik, mehrere Output-Spalten oder Zeilen-Expansion? → Nutze mapInPandas.

  4. Ist es wenig Daten / schneller Prototyp? → Normale UDF ist akzeptabel.


Ein letzter Tipp: validiere mit Spark-Plänen

Auch ohne vollständige Benchmarks lernst du viel mit:

df_pandas_udf.explain(True)

Wenn du siehst, dass Spark nach dem Hinzufügen deiner Funktion weniger optimiert, ist das ein Hinweis, Built-ins zu probieren oder die Logik umzustrukturieren.