Skip to content

PySpark Select, Filter, and withColumn : recettes essentielles pour les DataFrames

Updated on

Des DataFrames brouillons ralentissent tous les pipelines : mauvaises colonnes, filtres dangereux et casts fragiles créent des erreurs silencieuses. Les équipes ont besoin de modèles reproductibles pour sélectionner, filtrer et dériver des colonnes afin de garder les jobs Spark prévisibles.

Les APIs select, filter/where et withColumn de PySpark résolvent ce problème en gardant les transformations explicites, sûres au niveau des types et testables. Ce guide présente les principaux modèles et la manière d’éviter les pièges courants.

Vous voulez créer rapidement de la Data Visualization à partir d’un DataFrame Python Pandas sans écrire de code ?

PyGWalker est une bibliothèque Python pour l’Exploratory Data Analysis avec Visualisation. PyGWalker (opens in a new tab) peut simplifier votre workflow d’analyse et de visualisation de données dans Jupyter Notebook, en transformant votre pandas dataframe (et polars dataframe) en une interface utilisateur alternative à tableau pour l’exploration visuelle.

PyGWalker for Data visualization (opens in a new tab)

Carte de lecture

TâcheAPIQuand l’utiliser
Sélectionner / renommer des colonnesselect, aliasNe garder que les colonnes nécessaires ; éviter selectExpr pour les cas simples
Filtrage de lignesfilter / whereIdentiques ; chaîner les conditions avec & et |
Colonnes dérivées / conditionnelleswithColumn, when/otherwiseAjouter ou remplacer des colonnes avec de la logique métier
Expressions type SQLselectExpr, exprCalculs rapides ou fonctions SQL sans trop d’imports
Casts sûrscast, try_cast (Spark 3.5+)Imposer les types sans planter sur les mauvaises valeurs

Configuration et données d’exemple

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
 
spark = SparkSession.builder.appName("select-filter-withcolumn").getOrCreate()
 
df = spark.createDataFrame(
    [
        (1, "Alice", "2025-11-01", "premium", "42"),
        (2, "Bob",   "2025-11-02", "basic",   "x"),
        (3, "Cara",  "2025-11-02", None,      "7"),
    ],
    "id INT, name STRING, signup_date STRING, tier STRING, score STRING",
)

Ne sélectionner que ce dont vous avez besoin

slim = df.select(
    "id",
    F.col("name").alias("customer_name"),
    F.to_date("signup_date").alias("signup_dt"),
)
  • select rend les projections explicites et évite les scans de largeurs inutiles.
  • Utilisez alias pour des noms plus parlants.

filter / where : APIs identiques

active = slim.where(
    (F.col("signup_dt") >= "2025-11-01")
    & (F.col("customer_name") != "Bob")
)
  • filter et where sont équivalents ; choisissez selon la lisibilité.
  • Combinez les conditions avec & et | ; mettez chaque condition entre parenthèses.
  • Vérifications de null : isNull() / isNotNull() évitent les surprises.

withColumn pour les valeurs dérivées et conditionnelles

scored = active.withColumn(
    "score_int",
    F.when(F.col("score").rlike("^[0-9]+$"), F.col("score").cast("int"))
     .otherwise(None),
)
  • withColumn ajoute ou remplace ; gardez des noms uniques pour éviter les écrasements accidentels.
  • when/otherwise permet de définir clairement la logique de branchement.

selectExpr pour des expressions rapides

expr_df = df.selectExpr(
    "id",
    "upper(name) AS name_upper",
    "to_date(signup_date) AS signup_dt",
    "CASE WHEN tier = 'premium' THEN 1 ELSE 0 END AS is_premium",
)
  • Pratique pour des calculs de style SQL sans multiplier les imports functions.
  • Gardez la logique complexe dans des withColumn classiques pour la lisibilité et les tests.

Modèles de cast sûrs

typed = (
    df
    .withColumn("score_int", F.col("score").cast("int"))
    .withColumn("signup_ts", F.to_timestamp("signup_date"))
)
  • Préférez les casts explicites ; évitez de compter sur l’inférence de schéma des sources CSV.
  • Sur Spark 3.5+, try_cast renvoie null au lieu d’échouer sur les valeurs invalides.

Vérifications rapides de la qualité des données

from pyspark.sql import functions as F
 
bad_counts = df.select(
    F.sum(F.col("score").rlike("^[0-9]+$").cast("int")).alias("valid_scores"),
    F.sum(F.col("score").rlike("^[^0-9]").cast("int")).alias("invalid_scores"),
)
  • Validez avant d’écrire ; de petites agrégations font remonter les problèmes tôt.
  • Utilisez count + isNull pour mesurer les taux de valeurs manquantes sur les colonnes clés.

Pièges courants et solutions

  • Conditions sans parenthèses : encapsulez toujours chaque clause booléenne quand vous utilisez & / |.
  • Écrasement accidentel de colonne : vérifiez df.columns ou utilisez de nouveaux noms avec withColumn.
  • Dates en chaîne non parsées : convertissez avec to_date / to_timestamp avant les comparaisons.
  • Comparaisons sensibles aux null : utilisez isNull / isNotNull pour éviter les surprises liées à la logique à trois valeurs.

Exemple de pipeline minimal

clean = (
    df
    .select("id", "name", "signup_date", "tier", "score")
    .where(F.col("tier").isNotNull())
    .withColumn("signup_dt", F.to_date("signup_date"))
    .withColumn(
        "score_int",
        F.when(F.col("score").rlike("^[0-9]+$"), F.col("score").cast("int")),
    )
)

Ce pipeline garde les sélections resserrées, les filtres explicites et les colonnes dérivées typées avant les écritures en aval.