PySpark Select、Filter 和 withColumn:核心 DataFrame 配方
Updated on
凌乱的 DataFrame 会拖慢整个管道:错误的列选择、不安全的过滤条件、以及脆弱的类型转换,都会制造隐蔽的错误。团队需要一套可重复的模式,用于“选列、过滤、派生列”,从而让 Spark 作业可预测、可维护。
PySpark 的 select、filter/where 和 withColumn API 可以通过显式、类型安全、可测试的转换方式解决这些问题。本文将展示关键用法模式,以及如何避免常见陷阱。
想在不写代码的情况下,从 Python Pandas DataFrame 快速创建数据可视化?
PyGWalker 是一个用于可视化探索性数据分析的 Python 库。PyGWalker (opens in a new tab) 可以简化你在 Jupyter Notebook 中的数据分析和数据可视化工作流,把 pandas dataframe(以及 polars dataframe)变成类似 tableau 的交互式可视化界面。
阅读导航
| 任务 | API | 适用场景 |
|---|---|---|
| 选择 / 重命名列 | select、alias | 只保留需要的列;简单场景尽量避免 selectExpr |
| 行过滤 | filter / where | 两者完全相同;用 & 和 ` |
| 派生 / 条件列 | withColumn、when/otherwise | 通过逻辑新增或替换列 |
| 类 SQL 表达式 | selectExpr、expr | 便捷进行算术或 SQL 函数调用,无需大量导入 |
| 安全类型转换 | cast、try_cast(Spark 3.5+) | 强制类型而不在坏值上直接抛错 |
环境准备与示例数据
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("select-filter-withcolumn").getOrCreate()
df = spark.createDataFrame(
[
(1, "Alice", "2025-11-01", "premium", "42"),
(2, "Bob", "2025-11-02", "basic", "x"),
(3, "Cara", "2025-11-02", None, "7"),
],
"id INT, name STRING, signup_date STRING, tier STRING, score STRING",
)只选你需要的列
slim = df.select(
"id",
F.col("name").alias("customer_name"),
F.to_date("signup_date").alias("signup_dt"),
)- 使用
select明确指定投影,避免对宽表进行不必要的扫描。 - 使用
alias为列起更易读的名字。
Filter / where:完全等价的 API
active = slim.where(
(F.col("signup_dt") >= "2025-11-01")
& (F.col("customer_name") != "Bob")
)filter和where完全等价,只是可读性选择问题。- 用
&和|组合条件时,一定给每个条件加括号。 - 空值判断使用
isNull()/isNotNull(),避免三值逻辑带来的意外。
使用 withColumn 创建派生列和条件列
scored = active.withColumn(
"score_int",
F.when(F.col("score").rlike("^[0-9]+$"), F.col("score").cast("int"))
.otherwise(None),
)withColumn既可以新增列也可以覆盖原列;保持列名唯一,避免无意间覆盖。when/otherwise让分支逻辑清晰可读。
使用 selectExpr 快速编写表达式
expr_df = df.selectExpr(
"id",
"upper(name) AS name_upper",
"to_date(signup_date) AS signup_dt",
"CASE WHEN tier = 'premium' THEN 1 ELSE 0 END AS is_premium",
)- 适用于类 SQL 的计算场景,无需频繁从
functions导入函数。 - 复杂逻辑更推荐用常规
withColumn,以便维护和测试。
安全类型转换模式
typed = (
df
.withColumn("score_int", F.col("score").cast("int"))
.withColumn("signup_ts", F.to_timestamp("signup_date"))
)- 优先使用显式转换;不要依赖 CSV 等数据源的自动推断 schema。
- 在 Spark 3.5+ 中,
try_cast会在坏值上返回 null,而不是直接失败。
快速进行数据质量检查
from pyspark.sql import functions as F
bad_counts = df.select(
F.sum(F.col("score").rlike("^[0-9]+$").cast("int")).alias("valid_scores"),
F.sum(F.col("score").rlike("^[^0-9]").cast("int")).alias("invalid_scores"),
)- 在写出数据前做校验;小规模聚合足以在早期暴露问题。
- 使用
count配合isNull来评估关键字段的缺失比例。
常见坑与解决方式
- 布尔条件未加括号:结合
&/|时,一定给每个条件加括号。 - 不小心覆盖列:用
df.columns检查,或在withColumn中使用新列名。 - 字符串日期未解析就比较:比较前先用
to_date/to_timestamp转换。 - 空值敏感比较:用
isNull/isNotNull替代直接的= null比较。
简洁的管道示例
clean = (
df
.select("id", "name", "signup_date", "tier", "score")
.where(F.col("tier").isNotNull())
.withColumn("signup_dt", F.to_date("signup_date"))
.withColumn(
"score_int",
F.when(F.col("score").rlike("^[0-9]+$"), F.col("score").cast("int")),
)
)这个示例在写出数据前完成了:精简列选择、显式过滤条件、以及带类型的派生列计算,为下游任务打好基础。
