Skip to content

PySpark Select, Filter y withColumn: Recetas básicas de DataFrame

Updated on

Los DataFrames desordenados ralentizan cualquier pipeline: columnas incorrectas, filtros inseguros y casteos frágiles generan errores silenciosos. Los equipos necesitan patrones repetibles para seleccionar, filtrar y derivar columnas que mantengan los jobs de Spark predecibles.

Las APIs select, filter/where y withColumn de PySpark resuelven esto al mantener las transformaciones explícitas, seguras en tipos y comprobables mediante tests. Esta guía muestra los patrones clave y cómo evitar los errores más comunes.

¿Quieres crear visualizaciones de datos rápidamente desde un DataFrame de Python Pandas sin escribir código?

PyGWalker es una librería de Python para Exploratory Data Analysis con visualización. PyGWalker (opens in a new tab) puede simplificar tu flujo de análisis y visualización de datos en Jupyter Notebook, convirtiendo tu pandas dataframe (y polars dataframe) en una interfaz de usuario tipo tableau para exploración visual.

PyGWalker for Data visualization (opens in a new tab)

Mapa para la lectura

TareaAPICuándo usarla
Elegir/renombrar columnasselect, aliasMantener solo las columnas necesarias; evita selectExpr cuando el caso es simple
Filtrado de filasfilter / whereAmbas son idénticas; encadena condiciones con & y |
Columnas derivadas/condicionaleswithColumn, when/otherwiseAñadir o reemplazar columnas con lógica
Expresiones tipo SQLselectExpr, exprAritmética rápida o funciones SQL sin muchos imports
Casteo segurocast, try_cast (Spark 3.5+)Forzar tipos sin lanzar errores en valores inválidos

Configuración y datos de ejemplo

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
 
spark = SparkSession.builder.appName("select-filter-withcolumn").getOrCreate()
 
df = spark.createDataFrame(
    [
        (1, "Alice", "2025-11-01", "premium", "42"),
        (2, "Bob",   "2025-11-02", "basic",   "x"),
        (3, "Cara",  "2025-11-02", None,      "7"),
    ],
    "id INT, name STRING, signup_date STRING, tier STRING, score STRING",
)

Selecciona solo lo que necesitas

slim = df.select(
    "id",
    F.col("name").alias("customer_name"),
    F.to_date("signup_date").alias("signup_dt"),
)
  • select mantiene las proyecciones explícitas y evita lecturas innecesariamente anchas.
  • Usa alias para nombres más legibles.

Filter / where: APIs idénticas

active = slim.where(
    (F.col("signup_dt") >= "2025-11-01")
    & (F.col("customer_name") != "Bob")
)
  • filter y where son lo mismo; elige una por legibilidad.
  • Combina condiciones con & y |; encierra cada condición entre paréntesis.
  • Para nulos: isNull() / isNotNull() evitan comportamientos inesperados.

withColumn para valores derivados y condicionales

scored = active.withColumn(
    "score_int",
    F.when(F.col("score").rlike("^[0-9]+$"), F.col("score").cast("int"))
     .otherwise(None),
)
  • withColumn agrega o reemplaza columnas; mantén nombres únicos para evitar sobrescrituras accidentales.
  • when/otherwise define una lógica de ramas clara.

selectExpr para expresiones rápidas

expr_df = df.selectExpr(
    "id",
    "upper(name) AS name_upper",
    "to_date(signup_date) AS signup_dt",
    "CASE WHEN tier = 'premium' THEN 1 ELSE 0 END AS is_premium",
)
  • Muy útil para cálculos estilo SQL sin muchos functions imports.
  • Mantén la lógica compleja en withColumn normal para mejor legibilidad y testing.

Patrones de casteo seguro

typed = (
    df
    .withColumn("score_int", F.col("score").cast("int"))
    .withColumn("signup_ts", F.to_timestamp("signup_date"))
)
  • Prefiere casteos explícitos; evita depender de la inferencia de esquema en fuentes CSV.
  • En Spark 3.5+, try_cast devuelve null en vez de fallar ante valores inválidos.

Comprobaciones rápidas de calidad de datos

from pyspark.sql import functions as F
 
bad_counts = df.select(
    F.sum(F.col("score").rlike("^[0-9]+$").cast("int")).alias("valid_scores"),
    F.sum(F.col("score").rlike("^[^0-9]").cast("int")).alias("invalid_scores"),
)
  • Valida antes de escribir; pequeñas agregaciones sacan a la luz problemas pronto.
  • Usa count + isNull para medir tasas de datos faltantes en columnas clave.

Errores comunes y cómo corregirlos

  • Condiciones sin paréntesis: encapsula siempre cada cláusula booleana cuando uses & / |.
  • Sobrescritura accidental de columnas: revisa df.columns o usa nombres nuevos con withColumn.
  • Fechas en string sin parsear: convierte con to_date / to_timestamp antes de comparar.
  • Comparaciones sensibles a null: apóyate en isNull / isNotNull para evitar sorpresas con la lógica ternaria de nulls.

Ejemplo de pipeline mínimo

clean = (
    df
    .select("id", "name", "signup_date", "tier", "score")
    .where(F.col("tier").isNotNull())
    .withColumn("signup_dt", F.to_date("signup_date"))
    .withColumn(
        "score_int",
        F.when(F.col("score").rlike("^[0-9]+$"), F.col("score").cast("int")),
    )
)

Esto mantiene las selecciones ajustadas, los filtros explícitos y las columnas derivadas tipadas antes de las escrituras posteriores.