Skip to content

PySpark Leer y Escribir CSV y Parquet: Guía de IO Confiable

Updated on

Los proyectos de datos se detienen cuando los archivos se cargan de forma incorrecta, los esquemas cambian inesperadamente o las escrituras sobrescriben datos válidos. PySpark resuelve estos problemas con opciones de lectura/escritura consistentes entre CSV y Parquet, pero los valores por defecto pueden sorprender a equipos que manejan conjuntos de datos variados.

Las APIs DataFrameReader / DataFrameWriter de PySpark proporcionan una ingesta y exportación predecibles. Al elegir el formato y el modo de guardado adecuados, añadir control de esquema y particionar de forma inteligente, los equipos mantienen las canalizaciones estables y con buen rendimiento.

¿Quieres crear visualizaciones de datos rápidamente desde un DataFrame de Python Pandas sin escribir código?

PyGWalker es una biblioteca de Python para Análisis Exploratorio de Datos con visualización. PyGWalker (opens in a new tab) puede simplificar tu flujo de trabajo de análisis y visualización de datos en Jupyter Notebook, convirtiendo tu pandas dataframe (y polars dataframe) en una interfaz de usuario alternativa a Tableau para exploración visual.

PyGWalker for Data visualization (opens in a new tab)

CSV vs Parquet de un vistazo

TemaCSVParquet
EsquemaInferido a menos que se proporcione; deriva de tipos comúnEsquema almacenado en el archivo; tipos estables
CompresiónNinguna por defecto; archivos más grandesColumnar + compresión; más pequeño y rápido
Velocidad de lecturaBasado en filas; más lento para lecturas ampliasColumnar; selección de columnas más rápida
Mejor paraIntercambio con herramientas externasAnalítica, lecturas repetidas, data lakes particionados

Configuración rápida

from pyspark.sql import SparkSession
 
spark = SparkSession.builder.appName("io-guide").getOrCreate()

Lectura de CSV con control

df_csv = (
    spark.read
    .option("header", True)
    .option("inferSchema", False)
    .schema("id INT, name STRING, ts TIMESTAMP")
    .option("delimiter", ",")
    .option("mode", "DROPMALFORMED")
    .csv("s3://bucket/raw/users/*.csv")
)
  • header lee los nombres de columnas de la primera fila.
  • Proporciona un esquema para evitar sorpresas de tipos y acelerar las lecturas.
  • Opciones de mode: PERMISSIVE (por defecto), DROPMALFORMED, FAILFAST.

Lectura de Parquet

df_parquet = spark.read.parquet("s3://bucket/curated/users/")
  • Parquet almacena esquema y estadísticas; inferSchema no es necesario.
  • El recorte de columnas (column pruning) y el predicate pushdown mejoran el rendimiento.

Escritura de CSV de forma segura

(
    df_csv
    .repartition(1)  # reduce output shards when needed
    .write
    .option("header", True)
    .option("delimiter", ",")
    .mode("overwrite")
    .csv("s3://bucket/exports/users_csv")
)
  • Usa repartition para controlar el número de archivos; evita archivos únicos y gigantes.
  • Elige el mode cuidadosamente (ver tabla abajo).

Escritura de Parquet con particiones

(
    df_parquet
    .write
    .mode("append")
    .partitionBy("country", "ingest_date")
    .parquet("s3://bucket/warehouse/users_parquet")
)
  • partitionBy crea directorios para filtros comunes y acelera las consultas.
  • Mantén las claves de partición con baja cardinalidad para evitar muchos archivos pequeños.

Comportamientos de los modos de guardado

modeComportamiento
error / errorifexistsPor defecto; falla si la ruta existe
overwriteReemplaza los datos existentes en la ruta
appendAñade nuevos archivos a la ruta existente
ignoreOmite la escritura si la ruta existe

Consejos para evolución de esquemas

  • Prefiere Parquet cuando se espera evolución de esquema; aprovecha mergeSchema para actualizaciones compatibles.
  • Para CSV, versiona los esquemas explícitamente y valida las columnas antes de escribir.
  • Al añadir columnas, escribe Parquet con el nuevo esquema y asegúrate de que los lectores utilicen la definición de tabla más reciente.

Errores comunes y cómo solucionarlos

  • Delimitadores que no coinciden: establece delimiter explícitamente; evita confiar en los valores por defecto.
  • Tipos incorrectos: proporciona schema en lugar de inferSchema=True en entornos de producción.
  • Demasiados archivos pequeños: usa coalesce o repartition antes de escribir; considera maxRecordsPerFile.
  • Sobrescrituras accidentales: publica salidas en una ruta temporal y luego muévela de forma atómica si tu capa de almacenamiento lo permite.

Receta mínima de punta a punta

schema = "user_id BIGINT, event STRING, ts TIMESTAMP, country STRING"
 
df = (
    spark.read
    .option("header", True)
    .schema(schema)
    .csv("s3://bucket/raw/events/*.csv")
)
 
cleaned = df.dropna(subset=["user_id", "event"]).withColumnRenamed("ts", "event_ts")
 
(
    cleaned
    .write
    .mode("append")
    .partitionBy("country")
    .parquet("s3://bucket/warehouse/events_parquet")
)