Skip to content
Temas
Python
Python Multiprocessing: Parallel Processing Guide for Speed

Python Multiprocessing: Guía de Procesamiento Paralelo para Mayor Velocidad

Updated on

El modelo de ejecución de un solo hilo de Python alcanza un límite al procesar grandes conjuntos de datos o realizar cálculos intensivos de CPU. Un script que tarda 10 minutos en procesar datos podría teóricamente ejecutarse en 2 minutos en una máquina de 5 núcleos, pero el Global Interpreter Lock (GIL) de Python impide que los hilos estándar logren un verdadero paralelismo. El resultado son núcleos de CPU desperdiciados y desarrolladores frustrados viendo cómo sus procesadores multinúcleo permanecen inactivos mientras Python procesa tareas de una en una.

Este cuello de botella cuesta tiempo y dinero real. Los científicos de datos esperan horas para entrenar modelos que podrían finalizar en minutos. Los web scrapers rastrean a una fracción de su velocidad potencial. Los pipelines de procesamiento de imágenes que deberían aprovechar todos los núcleos disponibles en su lugar avanzan lentamente usando solo uno.

El módulo multiprocessing resuelve esto creando procesos de Python separados, cada uno con su propio intérprete y espacio de memoria. A diferencia de los hilos, los procesos evitan completamente el GIL, permitiendo una verdadera ejecución paralela en los núcleos de CPU. Esta guía te muestra cómo aprovechar multiprocessing para mejoras dramáticas de rendimiento, desde ejecución paralela básica hasta patrones avanzados como pools de procesos y memoria compartida.

📚

Entendiendo el Problema del GIL

El Global Interpreter Lock (GIL) es un mutex que protege el acceso a los objetos de Python, evitando que múltiples hilos ejecuten bytecode de Python simultáneamente. Incluso en una máquina de 16 núcleos, los hilos de Python se ejecutan uno a la vez para tareas vinculadas a CPU.

import threading
import time
 
def cpu_bound_task(n):
    count = 0
    for i in range(n):
        count += i * i
    return count
 
# Threading NO paraleliza trabajo vinculado a CPU
start = time.time()
threads = [threading.Thread(target=cpu_bound_task, args=(10_000_000,)) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Threading: {time.time() - start:.2f}s")  # ~mismo tiempo que single-threaded

El GIL solo se libera durante operaciones de I/O (lecturas de archivos, peticiones de red), haciendo que threading sea útil para tareas vinculadas a I/O pero inefectivo para trabajo vinculado a CPU. Multiprocessing evita el GIL ejecutando intérpretes de Python separados en paralelo.

Multiprocessing Básico con Process

La clase Process crea un nuevo proceso de Python que se ejecuta independientemente. Cada proceso tiene su propio espacio de memoria e intérprete de Python.

from multiprocessing import Process
import os
 
def worker(name):
    print(f"Worker {name} ejecutándose en proceso {os.getpid()}")
    result = sum(i*i for i in range(5_000_000))
    print(f"Worker {name} finalizado: {result}")
 
if __name__ == '__main__':
    processes = []
 
    # Crear 4 procesos
    for i in range(4):
        p = Process(target=worker, args=(f"#{i}",))
        processes.append(p)
        p.start()
 
    # Esperar a que todos se completen
    for p in processes:
        p.join()
 
    print("Todos los procesos completados")

Requisito crítico: Siempre usa la protección if __name__ == '__main__' en Windows y macOS. Sin ella, los procesos hijos generarán recursivamente más procesos, causando una bomba de fork.

Process Pool: Ejecución Paralela Simplificada

Pool gestiona un pool de procesos worker, distribuyendo tareas automáticamente. Este es el patrón de multiprocessing más común.

from multiprocessing import Pool
import time
 
def process_item(x):
    """Simula trabajo intensivo de CPU"""
    time.sleep(0.1)
    return x * x
 
if __name__ == '__main__':
    data = range(100)
 
    # Procesamiento secuencial
    start = time.time()
    results_seq = [process_item(x) for x in data]
    seq_time = time.time() - start
 
    # Procesamiento paralelo con 4 workers
    start = time.time()
    with Pool(processes=4) as pool:
        results_par = pool.map(process_item, data)
    par_time = time.time() - start
 
    print(f"Secuencial: {seq_time:.2f}s")
    print(f"Paralelo (4 núcleos): {par_time:.2f}s")
    print(f"Aceleración: {seq_time/par_time:.2f}x")

Comparación de Métodos de Pool

Diferentes métodos de Pool se adaptan a diferentes casos de uso:

MétodoCaso de UsoBloqueaRetornaMúltiples Args
map()Paralelización simpleLista ordenadaNo (arg único)
map_async()Map no bloqueanteNoAsyncResultNo
starmap()Múltiples argumentosLista ordenadaSí (desempaquetado de tupla)
starmap_async()Starmap no bloqueanteNoAsyncResult
apply()Llamada de función únicaResultado único
apply_async()Apply no bloqueanteNoAsyncResult
imap()Iterador lazyIteradorNo
imap_unordered()Lazy, desordenadoIteradorNo
from multiprocessing import Pool
 
def add(x, y):
    return x + y
 
def power(x, exp):
    return x ** exp
 
if __name__ == '__main__':
    with Pool(4) as pool:
        # map: argumento único
        squares = pool.map(lambda x: x**2, [1, 2, 3, 4])
 
        # starmap: múltiples argumentos (desempaqueta tuplas)
        results = pool.starmap(add, [(1, 2), (3, 4), (5, 6)])
 
        # apply_async: llamada única no bloqueante
        async_result = pool.apply_async(power, (2, 10))
        result = async_result.get()  # bloquea hasta estar listo
 
        # imap: evaluación lazy para datasets grandes
        for result in pool.imap(lambda x: x**2, range(1000)):
            pass  # procesa uno a la vez conforme llegan resultados

Comunicación Inter-Proceso

Los procesos no comparten memoria por defecto. Usa Queue o Pipe para comunicación.

Queue: Paso de Mensajes Thread-Safe

from multiprocessing import Process, Queue
 
def producer(queue, items):
    for item in items:
        queue.put(item)
        print(f"Producido: {item}")
    queue.put(None)  # valor centinela
 
def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Consumido: {item}")
        # Procesar item...
 
if __name__ == '__main__':
    q = Queue()
    items = [1, 2, 3, 4, 5]
 
    prod = Process(target=producer, args=(q, items))
    cons = Process(target=consumer, args=(q,))
 
    prod.start()
    cons.start()
 
    prod.join()
    cons.join()

Pipe: Comunicación Bidireccional

from multiprocessing import Process, Pipe
 
def worker(conn):
    conn.send("Hola desde el worker")
    msg = conn.recv()
    print(f"Worker recibió: {msg}")
    conn.close()
 
if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=worker, args=(child_conn,))
    p.start()
 
    msg = parent_conn.recv()
    print(f"Parent recibió: {msg}")
    parent_conn.send("Hola desde el parent")
 
    p.join()

Memoria Compartida y Estado

Aunque los procesos tienen memoria separada, multiprocessing proporciona primitivas de memoria compartida.

Value y Array: Primitivas Compartidas

from multiprocessing import Process, Value, Array
import time
 
def increment_counter(counter, lock):
    for _ in range(100_000):
        with lock:
            counter.value += 1
 
def fill_array(arr, start, end):
    for i in range(start, end):
        arr[i] = i * i
 
if __name__ == '__main__':
    # Valor compartido con lock
    counter = Value('i', 0)
    lock = counter.get_lock()
 
    processes = [Process(target=increment_counter, args=(counter, lock)) for _ in range(4)]
    for p in processes: p.start()
    for p in processes: p.join()
 
    print(f"Counter: {counter.value}")  # Debería ser 400,000
 
    # Array compartido
    shared_arr = Array('i', 1000)
    p1 = Process(target=fill_array, args=(shared_arr, 0, 500))
    p2 = Process(target=fill_array, args=(shared_arr, 500, 1000))
    p1.start(); p2.start()
    p1.join(); p2.join()
 
    print(f"Array[100]: {shared_arr[100]}")  # 10,000

Manager: Objetos Compartidos Complejos

from multiprocessing import Process, Manager
 
def update_dict(shared_dict, key, value):
    shared_dict[key] = value
 
if __name__ == '__main__':
    with Manager() as manager:
        # Dict, list, namespace compartidos
        shared_dict = manager.dict()
        shared_list = manager.list()
 
        processes = [
            Process(target=update_dict, args=(shared_dict, f"key{i}", i*10))
            for i in range(5)
        ]
 
        for p in processes: p.start()
        for p in processes: p.join()
 
        print(dict(shared_dict))  # {'key0': 0, 'key1': 10, ...}

Comparación: Multiprocessing vs Threading vs Asyncio

CaracterísticaMultiprocessingThreadingAsyncioconcurrent.futures
Evita GILNoNoDepende del executor
Tareas vinculadas a CPUExcelentePobrePobreExcelente (ProcessPoolExecutor)
Tareas vinculadas a I/OBuenoExcelenteExcelenteExcelente (ThreadPoolExecutor)
Overhead de memoriaAlto (procesos separados)Bajo (memoria compartida)BajoVaría
Costo de inicioAltoBajoMuy bajoVaría
ComunicaciónQueue, Pipe, memoria compartidaDirecto (estado compartido)Nativo async/awaitFutures
Mejor paraTareas paralelas intensivas de CPUTareas vinculadas a I/O, concurrencia simpleAsync I/O, muchas tareas concurrentesAPI unificada para ambos
# Usar multiprocessing para vinculado a CPU
from multiprocessing import Pool
 
def cpu_bound(n):
    return sum(i*i for i in range(n))
 
with Pool(4) as pool:
    results = pool.map(cpu_bound, [10_000_000] * 4)
 
# Usar threading para vinculado a I/O
import threading
import requests
 
def fetch_url(url):
    return requests.get(url).text
 
threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for t in threads: t.start()
for t in threads: t.join()
 
# Usar asyncio para I/O asíncrono
import asyncio
import aiohttp
 
async def fetch_async(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()
 
asyncio.run(asyncio.gather(*[fetch_async(url) for url in urls]))

Avanzado: ProcessPoolExecutor

concurrent.futures.ProcessPoolExecutor proporciona una interfaz de nivel superior con la misma API que ThreadPoolExecutor.

from concurrent.futures import ProcessPoolExecutor, as_completed
import time
 
def process_task(x):
    time.sleep(0.1)
    return x * x
 
if __name__ == '__main__':
    # El context manager asegura limpieza
    with ProcessPoolExecutor(max_workers=4) as executor:
        # Enviar tareas individuales
        futures = [executor.submit(process_task, i) for i in range(20)]
 
        # Procesar a medida que se completan
        for future in as_completed(futures):
            result = future.result()
            print(f"Resultado: {result}")
 
        # O usar map (como Pool.map)
        results = executor.map(process_task, range(20))
        print(list(results))

Ventajas sobre Pool:

  • Misma API para ThreadPoolExecutor y ProcessPoolExecutor
  • Interfaz de Futures para mayor control
  • Mejor manejo de errores
  • Más fácil mezclar código sync y async

Patrones Comunes

Tareas Embarazosamente Paralelas

Las tareas sin dependencias son ideales para multiprocessing:

from multiprocessing import Pool
import pandas as pd
 
def process_chunk(chunk):
    """Procesa un fragmento de datos independientemente"""
    chunk['new_col'] = chunk['value'] * 2
    return chunk.groupby('category').sum()
 
if __name__ == '__main__':
    df = pd.DataFrame({'category': ['A', 'B'] * 5000, 'value': range(10000)})
 
    # Dividir en fragmentos
    chunks = [df.iloc[i:i+2500] for i in range(0, len(df), 2500)]
 
    with Pool(4) as pool:
        results = pool.map(process_chunk, chunks)
 
    # Combinar resultados
    final = pd.concat(results).groupby('category').sum()

Patrón Map-Reduce

from multiprocessing import Pool
from functools import reduce
 
def mapper(text):
    """Map: extraer palabras y contar"""
    words = text.lower().split()
    return {word: 1 for word in words}
 
def reducer(dict1, dict2):
    """Reduce: fusionar conteos de palabras"""
    for word, count in dict2.items():
        dict1[word] = dict1.get(word, 0) + count
    return dict1
 
if __name__ == '__main__':
    documents = ["hola mundo", "mundo de python", "hola python"] * 1000
 
    with Pool(4) as pool:
        # Fase Map: paralela
        word_dicts = pool.map(mapper, documents)
 
    # Fase Reduce: secuencial (o usar reducción en árbol)
    word_counts = reduce(reducer, word_dicts)
    print(word_counts)

Producer-Consumer con Múltiples Producers

from multiprocessing import Process, Queue, cpu_count
 
def producer(queue, producer_id, items):
    for item in items:
        queue.put((producer_id, item))
    print(f"Producer {producer_id} finalizado")
 
def consumer(queue, num_producers):
    finished_producers = 0
    while finished_producers < num_producers:
        if not queue.empty():
            item = queue.get()
            if item is None:
                finished_producers += 1
            else:
                producer_id, data = item
                print(f"Consumido del producer {producer_id}: {data}")
 
if __name__ == '__main__':
    q = Queue()
    num_producers = 3
 
    # Iniciar producers
    producers = [
        Process(target=producer, args=(q, i, range(i*10, (i+1)*10)))
        for i in range(num_producers)
    ]
    for p in producers: p.start()
 
    # Iniciar consumer
    cons = Process(target=consumer, args=(q, num_producers))
    cons.start()
 
    # Limpieza
    for p in producers: p.join()
    for _ in range(num_producers):
        q.put(None)  # Señalar al consumer
    cons.join()

Consideraciones de Rendimiento

Cuándo Ayuda Multiprocessing

  • Tareas vinculadas a CPU: Procesamiento de datos, cálculos matemáticos, procesamiento de imágenes
  • Datasets grandes: Cuando el tiempo de procesamiento por elemento justifica el overhead del proceso
  • Tareas independientes: Sin estado compartido o comunicación mínima

Cuándo Multiprocessing Perjudica

El overhead de creación de procesos puede exceder los beneficios para:

from multiprocessing import Pool
import time
 
def tiny_task(x):
    return x + 1
 
if __name__ == '__main__':
    data = range(100)
 
    # Secuencial es más rápido para tareas pequeñas
    start = time.time()
    results = [tiny_task(x) for x in data]
    print(f"Secuencial: {time.time() - start:.4f}s")  # ~0.0001s
 
    start = time.time()
    with Pool(4) as pool:
        results = pool.map(tiny_task, data)
    print(f"Paralelo: {time.time() - start:.4f}s")  # ~0.05s (¡500x más lento!)

Reglas generales:

  • Duración mínima de tarea: ~0.1 segundos por elemento
  • Tamaño de datos: Si serializar datos toma más que procesarlos, usar memoria compartida
  • Número de workers: Comenzar con cpu_count(), ajustar según características de la tarea

Requisitos de Pickling

Solo objetos serializables pueden pasarse entre procesos:

from multiprocessing import Pool
 
# ❌ Las funciones lambda no son serializables
# pool.map(lambda x: x*2, range(10))  # Falla
 
# ✅ Usar funciones nombradas
def double(x):
    return x * 2
 
with Pool(4) as pool:
    pool.map(double, range(10))
 
# ❌ Las funciones locales en notebooks fallan
# def process():
#     def inner(x): return x*2
#     pool.map(inner, range(10))  # Falla
 
# ✅ Definir a nivel de módulo o usar functools.partial
from functools import partial
 
def multiply(x, factor):
    return x * factor
 
with Pool(4) as pool:
    pool.map(partial(multiply, factor=3), range(10))

Depurar Código Paralelo con RunCell

Depurar código de multiprocessing es notoriamente difícil. Las declaraciones print desaparecen, los breakpoints no funcionan, y los stack traces son crípticos. Cuando los procesos fallan silenciosamente o producen resultados incorrectos, las herramientas de depuración tradicionales fallan.

RunCell (www.runcell.dev (opens in a new tab)) es un AI Agent construido para Jupyter que sobresale en depurar código paralelo. A diferencia de los depuradores estándar que no pueden seguir la ejecución a través de procesos, RunCell analiza tus patrones de multiprocessing, identifica condiciones de carrera, detecta errores de pickling antes de tiempo de ejecución y explica por qué los procesos se bloquean.

Cuando un worker de Pool falla sin traceback, RunCell puede inspeccionar la cola de errores y mostrarte exactamente qué llamada de función falló y por qué. Cuando el estado compartido produce resultados incorrectos, RunCell rastrea patrones de acceso a memoria para encontrar el lock faltante. Para científicos de datos depurando pipelines de datos paralelos complejos, RunCell convierte horas de depuración con declaraciones print en minutos de correcciones guiadas por IA.

Mejores Prácticas

1. Siempre Usa el Guard if name

# ✅ Correcto
if __name__ == '__main__':
    with Pool(4) as pool:
        pool.map(func, data)
 
# ❌ Incorrecto - causa bomba de fork en Windows
with Pool(4) as pool:
    pool.map(func, data)

2. Cierra Pools Explícitamente

# ✅ Context manager (recomendado)
with Pool(4) as pool:
    results = pool.map(func, data)
 
# ✅ Cierre y join explícitos
pool = Pool(4)
results = pool.map(func, data)
pool.close()
pool.join()
 
# ❌ Fuga de recursos
pool = Pool(4)
results = pool.map(func, data)

3. Maneja Excepciones

from multiprocessing import Pool
 
def risky_task(x):
    if x == 5:
        raise ValueError("Valor incorrecto")
    return x * 2
 
if __name__ == '__main__':
    with Pool(4) as pool:
        try:
            results = pool.map(risky_task, range(10))
        except ValueError as e:
            print(f"Tarea falló: {e}")
 
        # O manejar individualmente con apply_async
        async_results = [pool.apply_async(risky_task, (i,)) for i in range(10)]
 
        for i, ar in enumerate(async_results):
            try:
                result = ar.get()
                print(f"Resultado {i}: {result}")
            except ValueError:
                print(f"Tarea {i} falló")

4. Evita Estado Compartido Cuando Sea Posible

# ❌ El estado compartido requiere sincronización
from multiprocessing import Process, Value
 
counter = Value('i', 0)
 
def increment():
    for _ in range(100000):
        counter.value += 1  # ¡Condición de carrera!
 
# ✅ Usar locks o evitar compartir
from multiprocessing import Lock
 
lock = Lock()
 
def increment_safe():
    for _ in range(100000):
        with lock:
            counter.value += 1
 
# ✅ Aún mejor: evitar estado compartido
def count_locally(n):
    return n  # Retornar resultado en su lugar
 
with Pool(4) as pool:
    results = pool.map(count_locally, [100000] * 4)
    total = sum(results)

5. Elige el Número Correcto de Workers

from multiprocessing import cpu_count, Pool
 
# Vinculado a CPU: usar todos los núcleos
num_workers = cpu_count()
 
# Vinculado a I/O: puede usar más workers
num_workers = cpu_count() * 2
 
# Carga de trabajo mixta: ajustar empíricamente
with Pool(processes=num_workers) as pool:
    results = pool.map(func, data)

Errores Comunes

1. Olvidar el Guard if name

Lleva a generación infinita de procesos en Windows/macOS.

2. Intentar Serializar Objetos No Serializables

# ❌ Métodos de clase, lambdas, funciones locales fallan
class DataProcessor:
    def process(self, x):
        return x * 2
 
dp = DataProcessor()
# pool.map(dp.process, data)  # Falla
 
# ✅ Usar funciones de nivel superior
def process(x):
    return x * 2
 
with Pool(4) as pool:
    pool.map(process, data)

3. No Manejar Terminación de Procesos

# ❌ No limpia apropiadamente
pool = Pool(4)
results = pool.map(func, data)
# pool todavía ejecutándose
 
# ✅ Siempre cerrar y join
pool = Pool(4)
try:
    results = pool.map(func, data)
finally:
    pool.close()
    pool.join()

4. Transferencia Excesiva de Datos

# ❌ Serializar objetos enormes es lento
large_data = [np.random.rand(1000, 1000) for _ in range(100)]
with Pool(4) as pool:
    pool.map(process_array, large_data)  # Serialización lenta
 
# ✅ Usar memoria compartida o archivos mapeados en memoria
import numpy as np
from multiprocessing import shared_memory
 
# Crear memoria compartida
shm = shared_memory.SharedMemory(create=True, size=1000*1000*8)
arr = np.ndarray((1000, 1000), dtype=np.float64, buffer=shm.buf)
 
# Pasar solo el nombre y forma
def process_shared(name, shape):
    existing_shm = shared_memory.SharedMemory(name=name)
    arr = np.ndarray(shape, dtype=np.float64, buffer=existing_shm.buf)
    # Procesar arr...
    existing_shm.close()
 
with Pool(4) as pool:
    pool.starmap(process_shared, [(shm.name, (1000, 1000))] * 4)
 
shm.close()
shm.unlink()

FAQ

¿Cómo evita multiprocessing el GIL?

El GIL (Global Interpreter Lock) es un mutex en cada intérprete de Python que evita que múltiples hilos ejecuten bytecode de Python simultáneamente. Multiprocessing evita esto creando procesos de Python separados, cada uno con su propio intérprete y GIL. Dado que los procesos no comparten memoria, se ejecutan verdaderamente en paralelo a través de núcleos de CPU sin contención del GIL.

¿Cuándo debo usar multiprocessing vs threading?

Usa multiprocessing para tareas vinculadas a CPU (procesamiento de datos, cálculos, manipulación de imágenes) donde el GIL limita el rendimiento. Usa threading para tareas vinculadas a I/O (peticiones de red, operaciones de archivo) donde el GIL se libera durante I/O, permitiendo que los hilos trabajen concurrentemente. Threading tiene menor overhead pero no puede paralelizar trabajo de CPU debido al GIL.

¿Por qué necesito el guard if name == 'main'?

En Windows y macOS, los procesos hijos importan el módulo principal para acceder a funciones. Sin el guard, importar el módulo ejecuta el código de creación de Pool nuevamente, generando procesos infinitos (bomba de fork). Linux usa fork() que no requiere imports, pero el guard es aún una mejor práctica para código multiplataforma.

¿Cuántos procesos worker debo usar?

Para tareas vinculadas a CPU, comienza con cpu_count() (número de núcleos de CPU). Más workers que núcleos causa overhead de cambio de contexto. Para tareas vinculadas a I/O, puedes usar más workers (2-4x núcleos) ya que los procesos esperan en I/O. Siempre haz benchmark con tu carga de trabajo específica, ya que el overhead de memoria y transferencia de datos puede limitar el número óptimo de workers.

¿Qué objetos puedo pasar a funciones de multiprocessing?

Los objetos deben ser serializables (serializables con pickle). Esto incluye tipos integrados (int, str, list, dict), arrays NumPy, DataFrames de pandas, y la mayoría de clases definidas por el usuario. Lambdas, funciones locales, métodos de clase, manejadores de archivo, conexiones de base de datos y locks de hilos no pueden ser serializados. Define funciones a nivel de módulo o usa functools.partial para aplicación parcial.

Conclusión

Python multiprocessing transforma cuellos de botella vinculados a CPU en operaciones paralelas que escalan con los núcleos disponibles. Al evitar el GIL a través de procesos separados, logras verdadero paralelismo imposible con threading. La interfaz Pool simplifica patrones comunes, mientras que Queue, Pipe y memoria compartida permiten flujos de trabajo inter-proceso complejos.

Comienza con Pool.map() para tareas embarazosamente paralelas, mide la aceleración y optimiza desde ahí. Recuerda el guard if __name__ == '__main__', mantén las tareas de grano grueso para amortizar el overhead del proceso, y minimiza la transferencia de datos entre procesos. Cuando la depuración se vuelve compleja, herramientas como RunCell pueden ayudar a rastrear la ejecución a través de límites de proceso.

Multiprocessing no siempre es la respuesta. Para trabajo vinculado a I/O, threading o asyncio pueden ser más simples y rápidos. Pero cuando estás procesando datasets grandes, entrenando modelos o realizando cálculos pesados, multiprocessing ofrece el rendimiento para el que fue construida tu máquina multinúcleo.

📚