PySpark Select, Filter, and withColumn: Core DataFrame Recipes
Updated on
雑多な DataFrame はパイプライン全体を遅くします。不要な列、危険なフィルタ条件、もろいキャストは、気づきにくいエラーを生みます。Spark ジョブを予測可能に保つには、列の選択・フィルタリング・派生列作成のための再利用しやすいパターンが必要です。
PySpark の select、filter/where、withColumn API を使うと、変換を明示的かつ型安全にし、テストしやすくできます。本ガイドでは主要なパターンを示し、よくある落とし穴の避け方を解説します。
Python Pandas DataFrame からノーコードで素早く Data Visualization を作成したいですか?
PyGWalker は可視化付き Exploratory Data Analysis のための Python ライブラリです。PyGWalker (opens in a new tab) を使うと、pandas dataframe(および polars dataframe)を、可視的に探索できる tableau 代替のユーザーインターフェースへと変換し、Jupyter Notebook におけるデータ分析・データ可視化のワークフローを簡素化できます。
読み方ガイド(Reader’s map)
| タスク | API | 使う場面 |
|---|---|---|
| 列の選択・リネーム | select, alias | 必要な列だけを残したいとき。シンプルな場合は selectExpr を避ける |
| 行のフィルタリング | filter / where | どちらも同じ挙動。条件は & や ` |
| 派生列・条件付き列 | withColumn, when/otherwise | ロジック付きで列を追加・置き換えたいとき |
| SQL ライクな式 | selectExpr, expr | 多数の import なしに算術や SQL 関数を書きたいとき |
| 安全なキャスト | cast, try_cast (Spark 3.5+) | 不正値で落とさずに型を強制したいとき |
セットアップとサンプルデータ
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("select-filter-withcolumn").getOrCreate()
df = spark.createDataFrame(
[
(1, "Alice", "2025-11-01", "premium", "42"),
(2, "Bob", "2025-11-02", "basic", "x"),
(3, "Cara", "2025-11-02", None, "7"),
],
"id INT, name STRING, signup_date STRING, tier STRING, score STRING",
)必要な列だけを選択する
slim = df.select(
"id",
F.col("name").alias("customer_name"),
F.to_date("signup_date").alias("signup_dt"),
)selectで投影を明示し、不要に幅広いスキャンを避ける。- 分かりやすい列名には
aliasを使う。
filter / where: 同一 API
active = slim.where(
(F.col("signup_dt") >= "2025-11-01")
& (F.col("customer_name") != "Bob")
)filterとwhereは同じ動作。読みやすさでどちらかに統一するとよい。- 条件は
&と|で組み合わせる。各条件はかならず括弧で囲む。 - null チェックには
isNull()/isNotNull()を使い、思わぬ挙動を避ける。
withColumn で派生列・条件付き値を作る
scored = active.withColumn(
"score_int",
F.when(F.col("score").rlike("^[0-9]+$"), F.col("score").cast("int"))
.otherwise(None),
)withColumnは列の追加・置き換えの両方に使われる。意図せぬ上書きを防ぐには列名の一意性を意識する。when/otherwiseで分岐ロジックを明確に表現できる。
selectExpr で手早く式を書く
expr_df = df.selectExpr(
"id",
"upper(name) AS name_upper",
"to_date(signup_date) AS signup_dt",
"CASE WHEN tier = 'premium' THEN 1 ELSE 0 END AS is_premium",
)- 多数の
functionsimport を書かずに、SQL スタイルの計算を手早く書きたいときに便利。 - 複雑なロジックは、可読性とテスト容易性のために通常の
withColumnで書く方がよい。
安全なキャストパターン
typed = (
df
.withColumn("score_int", F.col("score").cast("int"))
.withColumn("signup_ts", F.to_timestamp("signup_date"))
)- キャストは明示的に行う。CSV のスキーマ推論に頼るのは避ける。
- Spark 3.5+ では、不正値で失敗させたくない場合に
try_castを使うと null を返してくれる。
データ品質をざっくりチェックする
from pyspark.sql import functions as F
bad_counts = df.select(
F.sum(F.col("score").rlike("^[0-9]+$").cast("int")).alias("valid_scores"),
F.sum(F.col("score").rlike("^[^0-9]").cast("int")).alias("invalid_scores"),
)- 書き込み前にバリデーションを行う。軽量な集計で早めに問題を炙り出す。
- 重要な列については、
countとisNullを組み合わせて欠損率を把握する。
ありがちな落とし穴と対処法
- 条件式の括弧漏れ:
&/|を使うときは、各ブール条件を必ず括弧で囲む。 - 意図しない列の上書き:
df.columnsを確認するか、withColumnで新しい列名を使う。 - 文字列日付をパースせず比較: 比較の前に
to_date/to_timestampで日付型に変換する。 - null に敏感な比較: 三値論理による予期せぬ挙動を避けるため、
isNull/isNotNullを使う。
最小構成のパイプライン例
clean = (
df
.select("id", "name", "signup_date", "tier", "score")
.where(F.col("tier").isNotNull())
.withColumn("signup_dt", F.to_date("signup_date"))
.withColumn(
"score_int",
F.when(F.col("score").rlike("^[0-9]+$"), F.col("score").cast("int")),
)
)このように、列の選択を絞り、フィルタ条件を明示し、派生列を適切な型で作成してから後続処理へ渡すことで、堅牢なパイプラインを構築できます。
