Python Multiprocessing: Guia de Processamento Paralelo para Velocidade
Updated on
O modelo de execução single-threaded do Python atinge um limite ao processar grandes conjuntos de dados ou realizar cálculos intensivos de CPU. Um script que leva 10 minutos para processar dados poderia teoricamente executar em 2 minutos numa máquina de 5 núcleos, mas o Global Interpreter Lock (GIL) do Python impede que threads padrão alcancem paralelismo verdadeiro. O resultado são núcleos de CPU desperdiçados e desenvolvedores frustrados vendo seus processadores multi-core ficarem ociosos enquanto Python processa tarefas uma de cada vez.
Este gargalo custa tempo e dinheiro reais. Cientistas de dados esperam horas por treinamento de modelo que poderia terminar em minutos. Web scrapers rastejam a uma fração de sua velocidade potencial. Pipelines de processamento de imagem que deveriam aproveitar todos os núcleos disponíveis em vez disso avançam lentamente usando apenas um.
O módulo multiprocessing resolve isso criando processos Python separados, cada um com seu próprio interpretador e espaço de memória. Diferente de threads, processos ignoram completamente o GIL, permitindo verdadeira execução paralela através de núcleos de CPU. Este guia mostra como aproveitar multiprocessing para melhorias dramáticas de desempenho, desde execução paralela básica até padrões avançados como pools de processos e memória compartilhada.
Entendendo o Problema do GIL
O Global Interpreter Lock (GIL) é um mutex que protege o acesso a objetos Python, impedindo que múltiplas threads executem bytecode Python simultaneamente. Mesmo numa máquina de 16 núcleos, threads Python executam uma de cada vez para tarefas vinculadas à CPU.
import threading
import time
def cpu_bound_task(n):
count = 0
for i in range(n):
count += i * i
return count
# Threading NÃO paraleliza trabalho vinculado à CPU
start = time.time()
threads = [threading.Thread(target=cpu_bound_task, args=(10_000_000,)) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Threading: {time.time() - start:.2f}s") # ~mesmo tempo que single-threadedO GIL apenas libera durante operações de I/O (leituras de arquivo, requisições de rede), tornando threading útil para tarefas vinculadas a I/O mas ineficaz para trabalho vinculado à CPU. Multiprocessing ignora o GIL executando interpretadores Python separados em paralelo.
Multiprocessing Básico com Process
A classe Process cria um novo processo Python que executa independentemente. Cada processo tem seu próprio espaço de memória e interpretador Python.
from multiprocessing import Process
import os
def worker(name):
print(f"Worker {name} executando no processo {os.getpid()}")
result = sum(i*i for i in range(5_000_000))
print(f"Worker {name} finalizado: {result}")
if __name__ == '__main__':
processes = []
# Criar 4 processos
for i in range(4):
p = Process(target=worker, args=(f"#{i}",))
processes.append(p)
p.start()
# Aguardar todos completarem
for p in processes:
p.join()
print("Todos os processos completados")Requisito crítico: Sempre use a proteção if __name__ == '__main__' no Windows e macOS. Sem ela, processos filhos irão recursivamente gerar mais processos, causando uma bomba fork.
Process Pool: Execução Paralela Simplificada
Pool gerencia um pool de processos worker, distribuindo tarefas automaticamente. Este é o padrão de multiprocessing mais comum.
from multiprocessing import Pool
import time
def process_item(x):
"""Simula trabalho intensivo de CPU"""
time.sleep(0.1)
return x * x
if __name__ == '__main__':
data = range(100)
# Processamento sequencial
start = time.time()
results_seq = [process_item(x) for x in data]
seq_time = time.time() - start
# Processamento paralelo com 4 workers
start = time.time()
with Pool(processes=4) as pool:
results_par = pool.map(process_item, data)
par_time = time.time() - start
print(f"Sequencial: {seq_time:.2f}s")
print(f"Paralelo (4 núcleos): {par_time:.2f}s")
print(f"Aceleração: {seq_time/par_time:.2f}x")Comparação de Métodos do Pool
Diferentes métodos do Pool se adequam a diferentes casos de uso:
| Método | Caso de Uso | Bloqueia | Retorna | Múltiplos Args |
|---|---|---|---|---|
map() | Paralelização simples | Sim | Lista ordenada | Não (arg único) |
map_async() | Map não bloqueante | Não | AsyncResult | Não |
starmap() | Múltiplos argumentos | Sim | Lista ordenada | Sim (desempacotamento de tupla) |
starmap_async() | Starmap não bloqueante | Não | AsyncResult | Sim |
apply() | Chamada de função única | Sim | Resultado único | Sim |
apply_async() | Apply não bloqueante | Não | AsyncResult | Sim |
imap() | Iterador lazy | Sim | Iterador | Não |
imap_unordered() | Lazy, desordenado | Sim | Iterador | Não |
from multiprocessing import Pool
def add(x, y):
return x + y
def power(x, exp):
return x ** exp
if __name__ == '__main__':
with Pool(4) as pool:
# map: argumento único
squares = pool.map(lambda x: x**2, [1, 2, 3, 4])
# starmap: múltiplos argumentos (desempacota tuplas)
results = pool.starmap(add, [(1, 2), (3, 4), (5, 6)])
# apply_async: chamada única não bloqueante
async_result = pool.apply_async(power, (2, 10))
result = async_result.get() # bloqueia até estar pronto
# imap: avaliação lazy para grandes datasets
for result in pool.imap(lambda x: x**2, range(1000)):
pass # processa um de cada vez conforme resultados chegamComunicação Inter-Processo
Processos não compartilham memória por padrão. Use Queue ou Pipe para comunicação.
Queue: Passagem de Mensagem Thread-Safe
from multiprocessing import Process, Queue
def producer(queue, items):
for item in items:
queue.put(item)
print(f"Produzido: {item}")
queue.put(None) # valor sentinela
def consumer(queue):
while True:
item = queue.get()
if item is None:
break
print(f"Consumido: {item}")
# Processar item...
if __name__ == '__main__':
q = Queue()
items = [1, 2, 3, 4, 5]
prod = Process(target=producer, args=(q, items))
cons = Process(target=consumer, args=(q,))
prod.start()
cons.start()
prod.join()
cons.join()Pipe: Comunicação Bidirecional
from multiprocessing import Process, Pipe
def worker(conn):
conn.send("Olá do worker")
msg = conn.recv()
print(f"Worker recebeu: {msg}")
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=worker, args=(child_conn,))
p.start()
msg = parent_conn.recv()
print(f"Parent recebeu: {msg}")
parent_conn.send("Olá do parent")
p.join()Memória Compartilhada e Estado
Embora processos tenham memória separada, multiprocessing fornece primitivas de memória compartilhada.
Value e Array: Primitivas Compartilhadas
from multiprocessing import Process, Value, Array
import time
def increment_counter(counter, lock):
for _ in range(100_000):
with lock:
counter.value += 1
def fill_array(arr, start, end):
for i in range(start, end):
arr[i] = i * i
if __name__ == '__main__':
# Valor compartilhado com lock
counter = Value('i', 0)
lock = counter.get_lock()
processes = [Process(target=increment_counter, args=(counter, lock)) for _ in range(4)]
for p in processes: p.start()
for p in processes: p.join()
print(f"Counter: {counter.value}") # Deveria ser 400,000
# Array compartilhado
shared_arr = Array('i', 1000)
p1 = Process(target=fill_array, args=(shared_arr, 0, 500))
p2 = Process(target=fill_array, args=(shared_arr, 500, 1000))
p1.start(); p2.start()
p1.join(); p2.join()
print(f"Array[100]: {shared_arr[100]}") # 10,000Manager: Objetos Compartilhados Complexos
from multiprocessing import Process, Manager
def update_dict(shared_dict, key, value):
shared_dict[key] = value
if __name__ == '__main__':
with Manager() as manager:
# Dict, list, namespace compartilhados
shared_dict = manager.dict()
shared_list = manager.list()
processes = [
Process(target=update_dict, args=(shared_dict, f"key{i}", i*10))
for i in range(5)
]
for p in processes: p.start()
for p in processes: p.join()
print(dict(shared_dict)) # {'key0': 0, 'key1': 10, ...}Comparação: Multiprocessing vs Threading vs Asyncio
| Característica | Multiprocessing | Threading | Asyncio | concurrent.futures |
|---|---|---|---|---|
| Ignora GIL | Sim | Não | Não | Depende do executor |
| Tarefas vinculadas à CPU | Excelente | Fraco | Fraco | Excelente (ProcessPoolExecutor) |
| Tarefas vinculadas a I/O | Bom | Excelente | Excelente | Excelente (ThreadPoolExecutor) |
| Overhead de memória | Alto (processos separados) | Baixo (memória compartilhada) | Baixo | Varia |
| Custo de inicialização | Alto | Baixo | Muito baixo | Varia |
| Comunicação | Queue, Pipe, memória compartilhada | Direto (estado compartilhado) | Nativo async/await | Futures |
| Melhor para | Tarefas paralelas intensivas de CPU | Tarefas vinculadas a I/O, concorrência simples | Async I/O, muitas tarefas concorrentes | API unificada para ambos |
# Usar multiprocessing para vinculado à CPU
from multiprocessing import Pool
def cpu_bound(n):
return sum(i*i for i in range(n))
with Pool(4) as pool:
results = pool.map(cpu_bound, [10_000_000] * 4)
# Usar threading para vinculado a I/O
import threading
import requests
def fetch_url(url):
return requests.get(url).text
threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for t in threads: t.start()
for t in threads: t.join()
# Usar asyncio para I/O assíncrono
import asyncio
import aiohttp
async def fetch_async(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
asyncio.run(asyncio.gather(*[fetch_async(url) for url in urls]))Avançado: ProcessPoolExecutor
concurrent.futures.ProcessPoolExecutor fornece uma interface de nível superior com a mesma API que ThreadPoolExecutor.
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
def process_task(x):
time.sleep(0.1)
return x * x
if __name__ == '__main__':
# Context manager garante limpeza
with ProcessPoolExecutor(max_workers=4) as executor:
# Submeter tarefas individuais
futures = [executor.submit(process_task, i) for i in range(20)]
# Processar conforme completam
for future in as_completed(futures):
result = future.result()
print(f"Resultado: {result}")
# Ou usar map (como Pool.map)
results = executor.map(process_task, range(20))
print(list(results))Vantagens sobre Pool:
- Mesma API para
ThreadPoolExecutoreProcessPoolExecutor - Interface Futures para mais controle
- Melhor tratamento de erros
- Mais fácil misturar código sync e async
Padrões Comuns
Tarefas Embaraçosamente Paralelas
Tarefas sem dependências são ideais para multiprocessing:
from multiprocessing import Pool
import pandas as pd
def process_chunk(chunk):
"""Processa um pedaço de dados independentemente"""
chunk['new_col'] = chunk['value'] * 2
return chunk.groupby('category').sum()
if __name__ == '__main__':
df = pd.DataFrame({'category': ['A', 'B'] * 5000, 'value': range(10000)})
# Dividir em pedaços
chunks = [df.iloc[i:i+2500] for i in range(0, len(df), 2500)]
with Pool(4) as pool:
results = pool.map(process_chunk, chunks)
# Combinar resultados
final = pd.concat(results).groupby('category').sum()Padrão Map-Reduce
from multiprocessing import Pool
from functools import reduce
def mapper(text):
"""Map: extrair palavras e contar"""
words = text.lower().split()
return {word: 1 for word in words}
def reducer(dict1, dict2):
"""Reduce: mesclar contagens de palavras"""
for word, count in dict2.items():
dict1[word] = dict1.get(word, 0) + count
return dict1
if __name__ == '__main__':
documents = ["olá mundo", "mundo do python", "olá python"] * 1000
with Pool(4) as pool:
# Fase Map: paralela
word_dicts = pool.map(mapper, documents)
# Fase Reduce: sequencial (ou usar redução em árvore)
word_counts = reduce(reducer, word_dicts)
print(word_counts)Producer-Consumer com Múltiplos Producers
from multiprocessing import Process, Queue, cpu_count
def producer(queue, producer_id, items):
for item in items:
queue.put((producer_id, item))
print(f"Producer {producer_id} finalizado")
def consumer(queue, num_producers):
finished_producers = 0
while finished_producers < num_producers:
if not queue.empty():
item = queue.get()
if item is None:
finished_producers += 1
else:
producer_id, data = item
print(f"Consumido do producer {producer_id}: {data}")
if __name__ == '__main__':
q = Queue()
num_producers = 3
# Iniciar producers
producers = [
Process(target=producer, args=(q, i, range(i*10, (i+1)*10)))
for i in range(num_producers)
]
for p in producers: p.start()
# Iniciar consumer
cons = Process(target=consumer, args=(q, num_producers))
cons.start()
# Limpeza
for p in producers: p.join()
for _ in range(num_producers):
q.put(None) # Sinalizar consumer
cons.join()Considerações de Desempenho
Quando Multiprocessing Ajuda
- Tarefas vinculadas à CPU: Processamento de dados, cálculos matemáticos, processamento de imagens
- Grandes datasets: Quando tempo de processamento por item justifica overhead do processo
- Tarefas independentes: Sem estado compartilhado ou comunicação mínima
Quando Multiprocessing Prejudica
Overhead de criação de processo pode exceder benefícios para:
from multiprocessing import Pool
import time
def tiny_task(x):
return x + 1
if __name__ == '__main__':
data = range(100)
# Sequencial é mais rápido para tarefas pequenas
start = time.time()
results = [tiny_task(x) for x in data]
print(f"Sequencial: {time.time() - start:.4f}s") # ~0.0001s
start = time.time()
with Pool(4) as pool:
results = pool.map(tiny_task, data)
print(f"Paralelo: {time.time() - start:.4f}s") # ~0.05s (500x mais lento!)Regras práticas:
- Duração mínima de tarefa: ~0.1 segundos por item
- Tamanho de dados: Se fazer pickle dos dados leva mais tempo que processar, use memória compartilhada
- Número de workers: Comece com
cpu_count(), ajuste baseado em características da tarefa
Requisitos de Pickling
Apenas objetos picklable podem ser passados entre processos:
from multiprocessing import Pool
# ❌ Funções lambda não são picklable
# pool.map(lambda x: x*2, range(10)) # Falha
# ✅ Usar funções nomeadas
def double(x):
return x * 2
with Pool(4) as pool:
pool.map(double, range(10))
# ❌ Funções locais em notebooks falham
# def process():
# def inner(x): return x*2
# pool.map(inner, range(10)) # Falha
# ✅ Definir em nível de módulo ou usar functools.partial
from functools import partial
def multiply(x, factor):
return x * factor
with Pool(4) as pool:
pool.map(partial(multiply, factor=3), range(10))Depurar Código Paralelo com RunCell
Depurar código de multiprocessing é notoriamente difícil. Declarações print desaparecem, breakpoints não funcionam, e stack traces são crípticos. Quando processos travam silenciosamente ou produzem resultados incorretos, ferramentas de depuração tradicionais falham.
RunCell (www.runcell.dev (opens in a new tab)) é um AI Agent construído para Jupyter que se destaca em depurar código paralelo. Diferente de depuradores padrão que não conseguem seguir execução através de processos, RunCell analisa seus padrões de multiprocessing, identifica condições de corrida, detecta erros de pickling antes do runtime, e explica por que processos ficam bloqueados.
Quando um worker do Pool trava sem traceback, RunCell pode inspecionar a fila de erros e mostrar exatamente qual chamada de função falhou e por quê. Quando estado compartilhado produz resultados errados, RunCell rastreia padrões de acesso à memória para encontrar o lock faltante. Para cientistas de dados depurando pipelines de dados paralelos complexos, RunCell transforma horas de depuração com declarações print em minutos de correções guiadas por IA.
Melhores Práticas
1. Sempre Use a Proteção if name
# ✅ Correto
if __name__ == '__main__':
with Pool(4) as pool:
pool.map(func, data)
# ❌ Errado - causa bomba fork no Windows
with Pool(4) as pool:
pool.map(func, data)2. Feche Pools Explicitamente
# ✅ Context manager (recomendado)
with Pool(4) as pool:
results = pool.map(func, data)
# ✅ Close e join explícitos
pool = Pool(4)
results = pool.map(func, data)
pool.close()
pool.join()
# ❌ Vaza recursos
pool = Pool(4)
results = pool.map(func, data)3. Trate Exceções
from multiprocessing import Pool
def risky_task(x):
if x == 5:
raise ValueError("Valor ruim")
return x * 2
if __name__ == '__main__':
with Pool(4) as pool:
try:
results = pool.map(risky_task, range(10))
except ValueError as e:
print(f"Tarefa falhou: {e}")
# Ou tratar individualmente com apply_async
async_results = [pool.apply_async(risky_task, (i,)) for i in range(10)]
for i, ar in enumerate(async_results):
try:
result = ar.get()
print(f"Resultado {i}: {result}")
except ValueError:
print(f"Tarefa {i} falhou")4. Evite Estado Compartilhado Quando Possível
# ❌ Estado compartilhado requer sincronização
from multiprocessing import Process, Value
counter = Value('i', 0)
def increment():
for _ in range(100000):
counter.value += 1 # Condição de corrida!
# ✅ Usar locks ou evitar compartilhamento
from multiprocessing import Lock
lock = Lock()
def increment_safe():
for _ in range(100000):
with lock:
counter.value += 1
# ✅ Ainda melhor: evitar estado compartilhado
def count_locally(n):
return n # Retornar resultado ao invés
with Pool(4) as pool:
results = pool.map(count_locally, [100000] * 4)
total = sum(results)5. Escolha o Número Certo de Workers
from multiprocessing import cpu_count, Pool
# Vinculado à CPU: usar todos os núcleos
num_workers = cpu_count()
# Vinculado a I/O: pode usar mais workers
num_workers = cpu_count() * 2
# Carga de trabalho mista: ajustar empiricamente
with Pool(processes=num_workers) as pool:
results = pool.map(func, data)Erros Comuns
1. Esquecer a Proteção if name
Leva a geração infinita de processos no Windows/macOS.
2. Tentar Fazer Pickle de Objetos Não-Picklable
# ❌ Métodos de classe, lambdas, funções locais falham
class DataProcessor:
def process(self, x):
return x * 2
dp = DataProcessor()
# pool.map(dp.process, data) # Falha
# ✅ Usar funções de nível superior
def process(x):
return x * 2
with Pool(4) as pool:
pool.map(process, data)3. Não Tratar Terminação de Processo
# ❌ Não limpa apropriadamente
pool = Pool(4)
results = pool.map(func, data)
# pool ainda executando
# ✅ Sempre close e join
pool = Pool(4)
try:
results = pool.map(func, data)
finally:
pool.close()
pool.join()4. Transferência Excessiva de Dados
# ❌ Fazer pickle de objetos enormes é lento
large_data = [np.random.rand(1000, 1000) for _ in range(100)]
with Pool(4) as pool:
pool.map(process_array, large_data) # Serialização lenta
# ✅ Usar memória compartilhada ou arquivos mapeados em memória
import numpy as np
from multiprocessing import shared_memory
# Criar memória compartilhada
shm = shared_memory.SharedMemory(create=True, size=1000*1000*8)
arr = np.ndarray((1000, 1000), dtype=np.float64, buffer=shm.buf)
# Passar apenas o nome e forma
def process_shared(name, shape):
existing_shm = shared_memory.SharedMemory(name=name)
arr = np.ndarray(shape, dtype=np.float64, buffer=existing_shm.buf)
# Processar arr...
existing_shm.close()
with Pool(4) as pool:
pool.starmap(process_shared, [(shm.name, (1000, 1000))] * 4)
shm.close()
shm.unlink()FAQ
Como multiprocessing ignora o GIL?
O GIL (Global Interpreter Lock) é um mutex em cada interpretador Python que impede múltiplas threads de executar bytecode Python simultaneamente. Multiprocessing ignora isso criando processos Python separados, cada um com seu próprio interpretador e GIL. Já que processos não compartilham memória, eles executam verdadeiramente em paralelo através de núcleos de CPU sem contenção do GIL.
Quando devo usar multiprocessing vs threading?
Use multiprocessing para tarefas vinculadas à CPU (processamento de dados, cálculos, manipulação de imagens) onde o GIL limita desempenho. Use threading para tarefas vinculadas a I/O (requisições de rede, operações de arquivo) onde o GIL libera durante I/O, permitindo que threads trabalhem concorrentemente. Threading tem overhead menor mas não pode paralelizar trabalho de CPU devido ao GIL.
Por que preciso da proteção if name == 'main'?
No Windows e macOS, processos filhos importam o módulo principal para acessar funções. Sem a proteção, importar o módulo executa o código de criação do Pool novamente, gerando processos infinitos (bomba fork). Linux usa fork() que não requer imports, mas a proteção ainda é melhor prática para código multiplataforma.
Quantos processos worker devo usar?
Para tarefas vinculadas à CPU, comece com cpu_count() (número de núcleos de CPU). Mais workers que núcleos causa overhead de troca de contexto. Para tarefas vinculadas a I/O, você pode usar mais workers (2-4x núcleos) já que processos esperam em I/O. Sempre faça benchmark com sua carga de trabalho específica, já que overhead de memória e transferência de dados pode limitar a contagem ótima de workers.
Quais objetos posso passar para funções de multiprocessing?
Objetos devem ser picklable (serializáveis com pickle). Isso inclui tipos integrados (int, str, list, dict), arrays NumPy, DataFrames pandas, e a maioria das classes definidas pelo usuário. Lambdas, funções locais, métodos de classe, handles de arquivo, conexões de banco de dados e locks de thread não podem ser pickled. Defina funções em nível de módulo ou use functools.partial para aplicação parcial.
Conclusão
Python multiprocessing transforma gargalos vinculados à CPU em operações paralelas que escalam com núcleos disponíveis. Ao ignorar o GIL através de processos separados, você alcança paralelismo verdadeiro impossível com threading. A interface Pool simplifica padrões comuns, enquanto Queue, Pipe e memória compartilhada permitem workflows inter-processo complexos.
Comece com Pool.map() para tarefas embaraçosamente paralelas, meça a aceleração e otimize a partir daí. Lembre-se da proteção if __name__ == '__main__', mantenha tarefas de granulação grossa para amortizar overhead de processo, e minimize transferência de dados entre processos. Quando depuração fica complexa, ferramentas como RunCell podem ajudar a rastrear execução através de limites de processo.
Multiprocessing não é sempre a resposta. Para trabalho vinculado a I/O, threading ou asyncio podem ser mais simples e rápidos. Mas quando você está processando grandes datasets, treinando modelos, ou realizando cálculos pesados, multiprocessing entrega o desempenho para o qual sua máquina multi-core foi construída.