Skip to content

PySpark: Ler e Escrever CSV e Parquet – Guia Confiável de IO

Updated on

Projetos de dados travam quando arquivos são carregados de forma incorreta, schemas mudam silenciosamente ou gravações sobrescrevem dados válidos. PySpark resolve esses problemas com opções consistentes de leitura/escrita para CSV e Parquet, mas os padrões podem surpreender times que lidam com conjuntos de dados variados.

As APIs DataFrameReader/DataFrameWriter do PySpark oferecem ingestão e exportação previsíveis. Ao escolher o formato e o modo de gravação corretos, adicionar controle de schema e particionar de forma inteligente, as equipes mantêm pipelines estáveis e performáticos.

Quer criar rapidamente visualizações de dados a partir de um Pandas DataFrame em Python sem escrever código?

PyGWalker é uma biblioteca Python para Análise Exploratória de Dados com Visualização. O PyGWalker (opens in a new tab) pode simplificar seu fluxo de análise e visualização de dados no Jupyter Notebook, transformando seu pandas dataframe (e polars dataframe) em uma interface de usuário ao estilo tableau para exploração visual.

PyGWalker for Data visualization (opens in a new tab)

CSV vs Parquet em resumo

TópicoCSVParquet
SchemaInferido, a menos que fornecido; derivações de tipo são comunsSchema armazenado no arquivo; tipos estáveis
CompressãoNenhuma por padrão; arquivos maioresColunar + compressão; menor tamanho, mais rápido
Velocidade de leituraBaseado em linhas; mais lento para leituras amplasColunar; seleção de colunas mais rápida
Melhor usoIntercâmbio com ferramentas externasAnalytics, leituras repetidas, data lakes particionados

Configuração rápida

from pyspark.sql import SparkSession
 
spark = SparkSession.builder.appName("io-guide").getOrCreate()

Lendo CSV com controle

df_csv = (
    spark.read
    .option("header", True)
    .option("inferSchema", False)
    .schema("id INT, name STRING, ts TIMESTAMP")
    .option("delimiter", ",")
    .option("mode", "DROPMALFORMED")
    .csv("s3://bucket/raw/users/*.csv")
)
  • header lê os nomes das colunas da primeira linha.
  • Forneça um schema para evitar surpresas de tipos e acelerar leituras.
  • Opções de mode: PERMISSIVE (padrão), DROPMALFORMED, FAILFAST.

Lendo Parquet

df_parquet = spark.read.parquet("s3://bucket/curated/users/")
  • Parquet armazena schema e estatísticas; inferSchema é desnecessário.
  • Column pruning e predicate pushdown melhoram a performance.

Escrevendo CSV com segurança

(
    df_csv
    .repartition(1)  # reduce output shards when needed
    .write
    .option("header", True)
    .option("delimiter", ",")
    .mode("overwrite")
    .csv("s3://bucket/exports/users_csv")
)
  • Use repartition para controlar a quantidade de arquivos; evite um único arquivo gigantesco.
  • Escolha o mode com cuidado (veja tabela abaixo).

Escrevendo Parquet com particionamento

(
    df_parquet
    .write
    .mode("append")
    .partitionBy("country", "ingest_date")
    .parquet("s3://bucket/warehouse/users_parquet")
)
  • partitionBy cria diretórios para filtros comuns, acelerando consultas.
  • Mantenha chaves de partição com baixa cardinalidade para evitar muitos arquivos pequenos.

Comportamento dos modos de gravação

modeComportamento
error / errorifexistsPadrão; falha se o caminho já existir
overwriteSubstitui os dados existentes no caminho
appendAdiciona novos arquivos ao caminho existente
ignoreIgnora a gravação se o caminho já existir

Dicas para evolução de schema

  • Prefira Parquet quando a evolução de schema é esperada; use mergeSchema para atualizações compatíveis.
  • Para CSV, versione schemas explicitamente e valide colunas antes de gravar.
  • Ao adicionar colunas, escreva Parquet com o novo schema e garanta que os consumidores usem a definição de tabela mais recente.

Armadilhas comuns e correções

  • Delimitadores incorretos: defina delimiter explicitamente; não dependa dos padrões.
  • Tipos errados: forneça schema em vez de inferSchema=True em produção.
  • Muitos arquivos pequenos: use coalesce ou repartition antes da escrita; considere maxRecordsPerFile.
  • Acidentes com overwrite: grave primeiro em um caminho temporário e depois mova de forma atômica, se sua camada de armazenamento suportar.

Receita mínima de ponta a ponta

schema = "user_id BIGINT, event STRING, ts TIMESTAMP, country STRING"
 
df = (
    spark.read
    .option("header", True)
    .schema(schema)
    .csv("s3://bucket/raw/events/*.csv")
)
 
cleaned = df.dropna(subset=["user_id", "event"]).withColumnRenamed("ts", "event_ts")
 
(
    cleaned
    .write
    .mode("append")
    .partitionBy("country")
    .parquet("s3://bucket/warehouse/events_parquet")
)