Skip to content

PySpark Select、Filter 和 withColumn:核心 DataFrame 配方

Updated on

凌乱的 DataFrame 会拖慢整个管道:错误的列选择、不安全的过滤条件、以及脆弱的类型转换,都会制造隐蔽的错误。团队需要一套可重复的模式,用于“选列、过滤、派生列”,从而让 Spark 作业可预测、可维护。

PySpark 的 selectfilter/wherewithColumn API 可以通过显式、类型安全、可测试的转换方式解决这些问题。本文将展示关键用法模式,以及如何避免常见陷阱。

想在不写代码的情况下,从 Python Pandas DataFrame 快速创建数据可视化?

PyGWalker 是一个用于可视化探索性数据分析的 Python 库。PyGWalker (opens in a new tab) 可以简化你在 Jupyter Notebook 中的数据分析和数据可视化工作流,把 pandas dataframe(以及 polars dataframe)变成类似 tableau 的交互式可视化界面。

PyGWalker for Data visualization (opens in a new tab)

阅读导航

任务API适用场景
选择 / 重命名列selectalias只保留需要的列;简单场景尽量避免 selectExpr
行过滤filter / where两者完全相同;用 & 和 `
派生 / 条件列withColumnwhen/otherwise通过逻辑新增或替换列
类 SQL 表达式selectExprexpr便捷进行算术或 SQL 函数调用,无需大量导入
安全类型转换casttry_cast(Spark 3.5+)强制类型而不在坏值上直接抛错

环境准备与示例数据

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
 
spark = SparkSession.builder.appName("select-filter-withcolumn").getOrCreate()
 
df = spark.createDataFrame(
    [
        (1, "Alice", "2025-11-01", "premium", "42"),
        (2, "Bob",   "2025-11-02", "basic",   "x"),
        (3, "Cara",  "2025-11-02", None,      "7"),
    ],
    "id INT, name STRING, signup_date STRING, tier STRING, score STRING",
)

只选你需要的列

slim = df.select(
    "id",
    F.col("name").alias("customer_name"),
    F.to_date("signup_date").alias("signup_dt"),
)
  • 使用 select 明确指定投影,避免对宽表进行不必要的扫描。
  • 使用 alias 为列起更易读的名字。

Filter / where:完全等价的 API

active = slim.where(
    (F.col("signup_dt") >= "2025-11-01")
    & (F.col("customer_name") != "Bob")
)
  • filterwhere 完全等价,只是可读性选择问题。
  • &| 组合条件时,一定给每个条件加括号。
  • 空值判断使用 isNull() / isNotNull(),避免三值逻辑带来的意外。

使用 withColumn 创建派生列和条件列

scored = active.withColumn(
    "score_int",
    F.when(F.col("score").rlike("^[0-9]+$"), F.col("score").cast("int"))
     .otherwise(None),
)
  • withColumn 既可以新增列也可以覆盖原列;保持列名唯一,避免无意间覆盖。
  • when/otherwise 让分支逻辑清晰可读。

使用 selectExpr 快速编写表达式

expr_df = df.selectExpr(
    "id",
    "upper(name) AS name_upper",
    "to_date(signup_date) AS signup_dt",
    "CASE WHEN tier = 'premium' THEN 1 ELSE 0 END AS is_premium",
)
  • 适用于类 SQL 的计算场景,无需频繁从 functions 导入函数。
  • 复杂逻辑更推荐用常规 withColumn,以便维护和测试。

安全类型转换模式

typed = (
    df
    .withColumn("score_int", F.col("score").cast("int"))
    .withColumn("signup_ts", F.to_timestamp("signup_date"))
)
  • 优先使用显式转换;不要依赖 CSV 等数据源的自动推断 schema。
  • 在 Spark 3.5+ 中,try_cast 会在坏值上返回 null,而不是直接失败。

快速进行数据质量检查

from pyspark.sql import functions as F
 
bad_counts = df.select(
    F.sum(F.col("score").rlike("^[0-9]+$").cast("int")).alias("valid_scores"),
    F.sum(F.col("score").rlike("^[^0-9]").cast("int")).alias("invalid_scores"),
)
  • 在写出数据前做校验;小规模聚合足以在早期暴露问题。
  • 使用 count 配合 isNull 来评估关键字段的缺失比例。

常见坑与解决方式

  • 布尔条件未加括号:结合 & / | 时,一定给每个条件加括号。
  • 不小心覆盖列:用 df.columns 检查,或在 withColumn 中使用新列名。
  • 字符串日期未解析就比较:比较前先用 to_date / to_timestamp 转换。
  • 空值敏感比较:用 isNull / isNotNull 替代直接的 = null 比较。

简洁的管道示例

clean = (
    df
    .select("id", "name", "signup_date", "tier", "score")
    .where(F.col("tier").isNotNull())
    .withColumn("signup_dt", F.to_date("signup_date"))
    .withColumn(
        "score_int",
        F.when(F.col("score").rlike("^[0-9]+$"), F.col("score").cast("int")),
    )
)

这个示例在写出数据前完成了:精简列选择、显式过滤条件、以及带类型的派生列计算,为下游任务打好基础。