Skip to content

PySpark Select, Filter, and withColumn: Core DataFrame Recipes

Updated on

雑多な DataFrame はパイプライン全体を遅くします。不要な列、危険なフィルタ条件、もろいキャストは、気づきにくいエラーを生みます。Spark ジョブを予測可能に保つには、列の選択・フィルタリング・派生列作成のための再利用しやすいパターンが必要です。

PySpark の selectfilter/wherewithColumn API を使うと、変換を明示的かつ型安全にし、テストしやすくできます。本ガイドでは主要なパターンを示し、よくある落とし穴の避け方を解説します。

Python Pandas DataFrame からノーコードで素早く Data Visualization を作成したいですか?

PyGWalker は可視化付き Exploratory Data Analysis のための Python ライブラリです。PyGWalker (opens in a new tab) を使うと、pandas dataframe(および polars dataframe)を、可視的に探索できる tableau 代替のユーザーインターフェースへと変換し、Jupyter Notebook におけるデータ分析・データ可視化のワークフローを簡素化できます。

PyGWalker for Data visualization (opens in a new tab)

読み方ガイド(Reader’s map)

タスクAPI使う場面
列の選択・リネームselect, alias必要な列だけを残したいとき。シンプルな場合は selectExpr を避ける
行のフィルタリングfilter / whereどちらも同じ挙動。条件は & や `
派生列・条件付き列withColumn, when/otherwiseロジック付きで列を追加・置き換えたいとき
SQL ライクな式selectExpr, expr多数の import なしに算術や SQL 関数を書きたいとき
安全なキャストcast, try_cast (Spark 3.5+)不正値で落とさずに型を強制したいとき

セットアップとサンプルデータ

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
 
spark = SparkSession.builder.appName("select-filter-withcolumn").getOrCreate()
 
df = spark.createDataFrame(
    [
        (1, "Alice", "2025-11-01", "premium", "42"),
        (2, "Bob",   "2025-11-02", "basic",   "x"),
        (3, "Cara",  "2025-11-02", None,      "7"),
    ],
    "id INT, name STRING, signup_date STRING, tier STRING, score STRING",
)

必要な列だけを選択する

slim = df.select(
    "id",
    F.col("name").alias("customer_name"),
    F.to_date("signup_date").alias("signup_dt"),
)
  • select で投影を明示し、不要に幅広いスキャンを避ける。
  • 分かりやすい列名には alias を使う。

filter / where: 同一 API

active = slim.where(
    (F.col("signup_dt") >= "2025-11-01")
    & (F.col("customer_name") != "Bob")
)
  • filterwhere は同じ動作。読みやすさでどちらかに統一するとよい。
  • 条件は &| で組み合わせる。各条件はかならず括弧で囲む。
  • null チェックには isNull() / isNotNull() を使い、思わぬ挙動を避ける。

withColumn で派生列・条件付き値を作る

scored = active.withColumn(
    "score_int",
    F.when(F.col("score").rlike("^[0-9]+$"), F.col("score").cast("int"))
     .otherwise(None),
)
  • withColumn は列の追加・置き換えの両方に使われる。意図せぬ上書きを防ぐには列名の一意性を意識する。
  • when/otherwise で分岐ロジックを明確に表現できる。

selectExpr で手早く式を書く

expr_df = df.selectExpr(
    "id",
    "upper(name) AS name_upper",
    "to_date(signup_date) AS signup_dt",
    "CASE WHEN tier = 'premium' THEN 1 ELSE 0 END AS is_premium",
)
  • 多数の functions import を書かずに、SQL スタイルの計算を手早く書きたいときに便利。
  • 複雑なロジックは、可読性とテスト容易性のために通常の withColumn で書く方がよい。

安全なキャストパターン

typed = (
    df
    .withColumn("score_int", F.col("score").cast("int"))
    .withColumn("signup_ts", F.to_timestamp("signup_date"))
)
  • キャストは明示的に行う。CSV のスキーマ推論に頼るのは避ける。
  • Spark 3.5+ では、不正値で失敗させたくない場合に try_cast を使うと null を返してくれる。

データ品質をざっくりチェックする

from pyspark.sql import functions as F
 
bad_counts = df.select(
    F.sum(F.col("score").rlike("^[0-9]+$").cast("int")).alias("valid_scores"),
    F.sum(F.col("score").rlike("^[^0-9]").cast("int")).alias("invalid_scores"),
)
  • 書き込み前にバリデーションを行う。軽量な集計で早めに問題を炙り出す。
  • 重要な列については、countisNull を組み合わせて欠損率を把握する。

ありがちな落とし穴と対処法

  • 条件式の括弧漏れ: & / | を使うときは、各ブール条件を必ず括弧で囲む。
  • 意図しない列の上書き: df.columns を確認するか、withColumn で新しい列名を使う。
  • 文字列日付をパースせず比較: 比較の前に to_date / to_timestamp で日付型に変換する。
  • null に敏感な比較: 三値論理による予期せぬ挙動を避けるため、isNull / isNotNull を使う。

最小構成のパイプライン例

clean = (
    df
    .select("id", "name", "signup_date", "tier", "score")
    .where(F.col("tier").isNotNull())
    .withColumn("signup_dt", F.to_date("signup_date"))
    .withColumn(
        "score_int",
        F.when(F.col("score").rlike("^[0-9]+$"), F.col("score").cast("int")),
    )
)

このように、列の選択を絞り、フィルタ条件を明示し、派生列を適切な型で作成してから後続処理へ渡すことで、堅牢なパイプラインを構築できます。