Python Threading: Guía Completa de Multithreading con Ejemplos
Updated on
Tu programa en Python realiza 50 llamadas a API, una tras otra. Cada llamada implica 200 milisegundos de espera. Las matemáticas son despiadadas: 10 segundos de la vida de tu programa desperdiciados esperando respuestas de red. Tu CPU permanece inactiva con una utilización cercana a cero mientras tu script se arrastra a través de operaciones I/O-bound que podrían ejecutarse simultáneamente.
Este problema se agrava rápidamente. Web scrapers que obtienen miles de páginas secuencialmente. Scripts de procesamiento de archivos que leen y escriben un archivo a la vez. Consultas a bases de datos que bloquean toda la aplicación mientras esperan resultados. Cada segundo de espera inactivo es un segundo en el que tu programa podría estar haciendo trabajo útil.
El módulo threading de Python resuelve esto ejecutando múltiples operaciones concurrentemente dentro de un único proceso. Los threads comparten memoria, se inician rápidamente y son excelentes para cargas de trabajo I/O-bound donde el programa pasa la mayor parte del tiempo esperando. Esta guía cubre todo, desde la creación básica de threads hasta patrones avanzados de sincronización, con ejemplos de código listos para producción que puedes usar inmediatamente.
¿Qué es el Threading en Python?
El threading permite que un programa ejecute múltiples operaciones concurrentemente dentro del mismo proceso. Cada thread comparte el mismo espacio de memoria, lo que hace que la comunicación entre threads sea rápida y sencilla.
El módulo threading de Python proporciona una interfaz de alto nivel para crear y gestionar threads. Pero hay una advertencia importante: el Global Interpreter Lock (GIL).
El Global Interpreter Lock (GIL)
El GIL es un mutex en CPython que permite que solo un thread ejecute bytecode de Python a la vez. Esto significa que los threads no pueden lograr verdadero paralelismo para operaciones CPU-bound. Sin embargo, el GIL se libera durante operaciones I/O (llamadas de red, lecturas de archivos, consultas a bases de datos), permitiendo que otros threads se ejecuten mientras uno espera por I/O.
import threading
import time
def cpu_bound(n):
"""CPU-bound: GIL previene la ejecución paralela"""
total = 0
for i in range(n):
total += i * i
return total
def io_bound(url):
"""I/O-bound: GIL se libera durante la espera de red"""
import urllib.request
return urllib.request.urlopen(url).read()
# CPU-bound: 4 threads se ejecutan uno a la vez (sin aceleración)
start = time.time()
threads = [threading.Thread(target=cpu_bound, args=(10_000_000,)) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"CPU-bound con threads: {time.time() - start:.2f}s")
# I/O-bound: 4 threads solapan su tiempo de espera (gran aceleración)Esto significa que el threading es ideal para tareas I/O-bound pero no para cómputo intensivo en CPU. Para trabajo CPU-bound, usa el módulo multiprocessing en su lugar.
Cuándo Usar Threading vs Multiprocessing vs Asyncio
| Característica | threading | multiprocessing | asyncio |
|---|---|---|---|
| Mejor para | Tareas I/O-bound | Tareas CPU-bound | I/O de alta concurrencia |
| Paralelismo | Concurrente (limitado por GIL) | Paralelo verdadero | Concurrente (single thread) |
| Memoria | Compartida (ligera) | Separada por proceso | Compartida (ligera) |
| Costo de inicio | Bajo (~1ms) | Alto (~50-100ms) | Muy bajo |
| Comunicación | Acceso directo a memoria | Pipes, Queues, memoria compartida | Corrutinas awaitables |
| Escalabilidad | Decenas-cientos de threads | Limitado por núcleos de CPU | Miles de corrutinas |
| Complejidad | Media (requiere locks) | Media (serialización) | Alta (sintaxis async/await) |
| Caso de uso | Web scraping, I/O de archivos, llamadas a API | Procesamiento de datos, entrenamiento de ML | Servidores web, apps de chat |
Regla general: Si tu programa espera por red o disco, usa threading. Si procesa números, usa multiprocessing. Si necesitas miles de conexiones concurrentes, usa asyncio.
Fundamentos de Threads: Crear y Ejecutar Threads
La Clase threading.Thread
La forma más simple de crear un thread es pasando una función objetivo a threading.Thread:
import threading
import time
def download_file(filename):
print(f"[{threading.current_thread().name}] Descargando {filename}...")
time.sleep(2) # Simular descarga
print(f"[{threading.current_thread().name}] Finalizado {filename}")
# Crear threads
t1 = threading.Thread(target=download_file, args=("data.csv",))
t2 = threading.Thread(target=download_file, args=("report.pdf",))
# Iniciar threads
t1.start()
t2.start()
# Esperar a que ambos terminen
t1.join()
t2.join()
print("Todas las descargas completadas")Ambas descargas se ejecutan concurrentemente, terminando en aproximadamente 2 segundos en lugar de 4.
start() y join()
start()comienza la ejecución del thread. Un thread solo puede iniciarse una vez.join(timeout=None)bloquea el thread que llama hasta que el thread objetivo termina. Pasa untimeouten segundos para evitar esperar indefinidamente.
import threading
import time
def slow_task():
time.sleep(10)
t = threading.Thread(target=slow_task)
t.start()
# Esperar como máximo 3 segundos
t.join(timeout=3)
if t.is_alive():
print("El thread sigue ejecutándose después de 3 segundos")
else:
print("El thread ha finalizado")Nombrar Threads
Los threads nombrados facilitan la depuración:
import threading
def worker():
name = threading.current_thread().name
print(f"Ejecutándose en el thread: {name}")
t = threading.Thread(target=worker, name="DataProcessor")
t.start()
t.join()Threads Daemon
Los threads daemon son threads de segundo plano que terminan automáticamente cuando el programa principal finaliza. Los threads no-daemon mantienen el programa activo hasta que terminan.
import threading
import time
def background_monitor():
while True:
print("Monitoreando salud del sistema...")
time.sleep(5)
# Thread daemon: muere cuando el programa principal termina
monitor = threading.Thread(target=background_monitor, daemon=True)
monitor.start()
# El programa principal hace su trabajo
time.sleep(12)
print("El programa principal está terminando")
# el thread monitor es eliminado automáticamenteUsa threads daemon para tareas de logging en segundo plano, monitoreo o limpieza que no deberían impedir la salida del programa.
Subclasificando Thread
Para comportamientos de thread más complejos, subclasifica threading.Thread:
import threading
import time
class FileProcessor(threading.Thread):
def __init__(self, filepath):
super().__init__()
self.filepath = filepath
self.result = None
def run(self):
"""Sobrescribir run() con la lógica del thread"""
print(f"Procesando {self.filepath}")
time.sleep(1) # Simular trabajo
self.result = f"Procesado: {self.filepath}"
# Crear y ejecutar
processor = FileProcessor("/data/report.csv")
processor.start()
processor.join()
print(processor.result)Pasando Argumentos a Threads
Usando args y kwargs
Pasa argumentos posicionales con args (una tupla) y argumentos de palabra clave con kwargs (un diccionario):
import threading
def fetch_data(url, timeout, retries=3, verbose=False):
print(f"Obteniendo {url} (timeout={timeout}s, retries={retries}, verbose={verbose})")
# Args posicionales como tupla
t1 = threading.Thread(target=fetch_data, args=("https://api.example.com", 30))
# Args de palabra clave como dict
t2 = threading.Thread(
target=fetch_data,
args=("https://api.example.com",),
kwargs={"timeout": 30, "retries": 5, "verbose": True}
)
t1.start()
t2.start()
t1.join()
t2.join()Error común: Olvidar la coma final en una tupla de un solo elemento. args=("hello",) es una tupla; args=("hello") es solo una cadena entre paréntesis.
Recolectando Resultados de Threads
Los threads no retornan valores directamente. Usa estructuras de datos compartidas o una lista para recolectar resultados:
import threading
results = {}
lock = threading.Lock()
def compute(task_id, value):
result = value ** 2
with lock:
results[task_id] = result
threads = []
for i in range(5):
t = threading.Thread(target=compute, args=(i, i * 10))
threads.append(t)
t.start()
for t in threads:
t.join()
print(results) # {0: 0, 1: 100, 2: 400, 3: 900, 4: 1600}Un enfoque más limpio usa ThreadPoolExecutor (cubierto a continuación), que maneja la recolección de resultados automáticamente.
ThreadPoolExecutor: El Enfoque Moderno
El módulo concurrent.futures proporciona ThreadPoolExecutor, una interfaz de alto nivel que gestiona un pool de threads trabajadores. Maneja la creación de threads, recolección de resultados y propagación de excepciones automáticamente.
Uso Básico con submit()
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def fetch_url(url):
time.sleep(1) # Simular petición de red
return f"Contenido de {url}"
urls = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3",
"https://example.com/page4",
"https://example.com/page5",
]
with ThreadPoolExecutor(max_workers=3) as executor:
# Enviar tareas y obtener objetos Future
future_to_url = {executor.submit(fetch_url, url): url for url in urls}
# Procesar resultados a medida que se completan
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
print(f"{url}: {data}")
except Exception as e:
print(f"{url} generó una excepción: {e}")Usando map() para Resultados Ordenados
executor.map() retorna resultados en el mismo orden que la entrada, similar al map() incorporado:
from concurrent.futures import ThreadPoolExecutor
def process_item(item):
return item.upper()
items = ["apple", "banana", "cherry", "date"]
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(process_item, items))
print(results) # ['APPLE', 'BANANA', 'CHERRY', 'DATE']submit() vs map()
submit() | map() | |
|---|---|---|
| Retorna | Objetos Future | Iterador de resultados |
| Orden de resultados | Orden de finalización (con as_completed) | Orden de entrada |
| Manejo de errores | Por tarea vía future.result() | Lanza en el primer fallo |
| Argumentos | Llamada única a función | Aplica función a cada ítem |
| Mejor para | Tareas heterogéneas, resultados tempranos | Procesamiento por lotes homogéneo |
Manejo de Excepciones con Futures
from concurrent.futures import ThreadPoolExecutor, as_completed
def risky_task(n):
if n == 3:
raise ValueError(f"Entrada inválida: {n}")
return n * 10
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(risky_task, i): i for i in range(5)}
for future in as_completed(futures):
task_id = futures[future]
try:
result = future.result(timeout=5)
print(f"Tarea {task_id}: {result}")
except ValueError as e:
print(f"Tarea {task_id} falló: {e}")
except TimeoutError:
print(f"Tarea {task_id} agotó el tiempo de espera")Cancelando Tareas
from concurrent.futures import ThreadPoolExecutor
import time
def long_task(n):
time.sleep(5)
return n
with ThreadPoolExecutor(max_workers=2) as executor:
futures = [executor.submit(long_task, i) for i in range(10)]
# Cancelar tareas pendientes (las tareas ya en ejecución no pueden cancelarse)
for f in futures[4:]:
cancelled = f.cancel()
print(f"Cancelado: {cancelled}")Primitivas de Sincronización de Threads
Cuando múltiples threads acceden a datos compartidos, necesitas sincronización para prevenir condiciones de carrera.
Lock
Un Lock asegura que solo un thread entre en una sección crítica a la vez:
import threading
class BankAccount:
def __init__(self, balance):
self.balance = balance
self.lock = threading.Lock()
def withdraw(self, amount):
with self.lock: # Solo un thread a la vez
if self.balance >= amount:
self.balance -= amount
return True
return False
def deposit(self, amount):
with self.lock:
self.balance += amount
account = BankAccount(1000)
def make_transactions():
for _ in range(100):
account.deposit(10)
account.withdraw(10)
threads = [threading.Thread(target=make_transactions) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Balance final: {account.balance}") # Siempre 1000Sin el lock, las lecturas y escrituras concurrentes producen resultados incorrectos (una condición de carrera).
RLock (Reentrant Lock)
Un RLock puede ser adquirido múltiples veces por el mismo thread. Esto previene bloqueos mutuos cuando una función que tiene un lock llama a otra función que también necesita el mismo lock:
import threading
class SafeCache:
def __init__(self):
self._data = {}
self._lock = threading.RLock()
def get(self, key):
with self._lock:
return self._data.get(key)
def set(self, key, value):
with self._lock:
self._data[key] = value
def get_or_set(self, key, default):
with self._lock:
# Esto llama a get(), que también adquiere _lock
# RLock permite esto; un Lock normal causaría deadlock
existing = self.get(key)
if existing is None:
self.set(key, default)
return default
return existingSemaphore
Un Semaphore permite que un número fijo de threads accedan a un recurso simultáneamente:
import threading
import time
# Permitir máximo 3 conexiones concurrentes a base de datos
db_semaphore = threading.Semaphore(3)
def query_database(query_id):
with db_semaphore:
print(f"Consulta {query_id}: conectado (conexiones activas: {3 - db_semaphore._value})")
time.sleep(2) # Simular consulta
print(f"Consulta {query_id}: terminado")
threads = [threading.Thread(target=query_database, args=(i,)) for i in range(10)]
for t in threads: t.start()
for t in threads: t.join()Event
Un Event permite que un thread señalice a otros threads en espera:
import threading
import time
data_ready = threading.Event()
shared_data = []
def producer():
print("Productor: preparando datos...")
time.sleep(3)
shared_data.extend([1, 2, 3, 4, 5])
print("Productor: datos listos, señalizando consumidores")
data_ready.set()
def consumer(name):
print(f"Consumidor {name}: esperando datos...")
data_ready.wait() # Bloquea hasta que el evento se establezca
print(f"Consumidor {name}: obtuvo datos = {shared_data}")
threads = [
threading.Thread(target=producer),
threading.Thread(target=consumer, args=("A",)),
threading.Thread(target=consumer, args=("B",)),
]
for t in threads: t.start()
for t in threads: t.join()Condition
Un Condition combina un lock con la capacidad de esperar una notificación. Es la base para patrones productor-consumidor:
import threading
import time
import random
buffer = []
MAX_SIZE = 5
condition = threading.Condition()
def producer():
for i in range(20):
with condition:
while len(buffer) >= MAX_SIZE:
condition.wait() # Esperar hasta que haya espacio disponible
item = random.randint(1, 100)
buffer.append(item)
print(f"Producido: {item} (tamaño del buffer: {len(buffer)})")
condition.notify_all()
time.sleep(0.1)
def consumer(name):
for _ in range(10):
with condition:
while len(buffer) == 0:
condition.wait() # Esperar hasta que haya un ítem disponible
item = buffer.pop(0)
print(f"Consumidor {name} consumió: {item} (tamaño del buffer: {len(buffer)})")
condition.notify_all()
time.sleep(0.15)
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=("X",))
t3 = threading.Thread(target=consumer, args=("Y",))
t1.start(); t2.start(); t3.start()
t1.join(); t2.join(); t3.join()Resumen de Primitivas de Sincronización
| Primitiva | Propósito | Cuándo Usar |
|---|---|---|
Lock | Exclusión mutua | Proteger estado mutable compartido |
RLock | Mutex reentrante | Bloqueo anidado en el mismo thread |
Semaphore | Limitar concurrencia | Rate limiting, pools de conexiones |
Event | Señal única | Inicialización completa, señal de apagado |
Condition | Patrón wait/notify | Productor-consumidor, cambios de estado |
Barrier | Sincronizar N threads | Todos los threads deben llegar a un punto antes de continuar |
Estructuras de Datos Thread-Safe
queue.Queue
queue.Queue es la estructura de datos thread-safe por excelencia. Maneja todo el locking internamente:
import threading
import queue
import time
task_queue = queue.Queue()
results = queue.Queue()
def worker():
while True:
item = task_queue.get() # Bloquea hasta que haya un ítem disponible
if item is None:
break
result = item ** 2
results.put(result)
task_queue.task_done()
# Iniciar 4 workers
workers = []
for _ in range(4):
t = threading.Thread(target=worker, daemon=True)
t.start()
workers.append(t)
# Enviar tareas
for i in range(20):
task_queue.put(i)
# Esperar a que todas las tareas se completen
task_queue.join()
# Detener workers
for _ in range(4):
task_queue.put(None)
for w in workers:
w.join()
# Recolectar resultados
all_results = []
while not results.empty():
all_results.append(results.get())
print(f"Resultados: {sorted(all_results)}")queue.Queue también soporta:
Queue(maxsize=10): Bloqueaput()cuando está llenaPriorityQueue(): Ítems ordenados por prioridadLifoQueue(): Último en entrar, primero en salir (comportamiento de pila)
collections.deque
collections.deque es thread-safe para operaciones append() y popleft() (atómico a nivel C en CPython), haciéndolo una alternativa rápida para patrones simples de productor-consumidor:
from collections import deque
import threading
import time
buffer = deque(maxlen=1000)
def producer():
for i in range(100):
buffer.append(i)
time.sleep(0.01)
def consumer():
consumed = 0
while consumed < 100:
if buffer:
item = buffer.popleft()
consumed += 1
else:
time.sleep(0.01)
print(f"Consumidos {consumed} ítems")
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()Nota: Aunque las operaciones individuales append y popleft son thread-safe, verificar len(buffer) y luego hacer pop no es atómico. Para thread safety completa, usa queue.Queue.
Patrones Comunes de Threading
Patrón Productor-Consumidor
El patrón clásico para desacoplar la producción de datos del procesamiento de datos:
import threading
import queue
import time
import random
def producer(q, name, num_items):
for i in range(num_items):
item = f"{name}-item-{i}"
q.put(item)
print(f"Productor {name}: creado {item}")
time.sleep(random.uniform(0.05, 0.15))
print(f"Productor {name}: terminado")
def consumer(q, name, stop_event):
while not stop_event.is_set() or not q.empty():
try:
item = q.get(timeout=0.5)
print(f"Consumidor {name}: procesando {item}")
time.sleep(random.uniform(0.1, 0.2))
q.task_done()
except queue.Empty:
continue
print(f"Consumidor {name}: apagándose")
task_queue = queue.Queue(maxsize=10)
stop_event = threading.Event()
producers = [
threading.Thread(target=producer, args=(task_queue, "P1", 10)),
threading.Thread(target=producer, args=(task_queue, "P2", 10)),
]
consumers = [
threading.Thread(target=consumer, args=(task_queue, "C1", stop_event)),
threading.Thread(target=consumer, args=(task_queue, "C2", stop_event)),
threading.Thread(target=consumer, args=(task_queue, "C3", stop_event)),
]
for c in consumers: c.start()
for p in producers: p.start()
for p in producers: p.join()
task_queue.join() # Esperar a que todos los ítems sean procesados
stop_event.set() # Señalizar a los consumidores que se detengan
for c in consumers: c.join()Pool de Threads Trabajadores (Manual)
Cuando necesitas más control que el que proporciona ThreadPoolExecutor:
import threading
import queue
class WorkerPool:
def __init__(self, num_workers):
self.task_queue = queue.Queue()
self.result_queue = queue.Queue()
self.workers = []
for _ in range(num_workers):
t = threading.Thread(target=self._worker, daemon=True)
t.start()
self.workers.append(t)
def _worker(self):
while True:
func, args, kwargs, future_id = self.task_queue.get()
if func is None:
break
try:
result = func(*args, **kwargs)
self.result_queue.put((future_id, result, None))
except Exception as e:
self.result_queue.put((future_id, None, e))
finally:
self.task_queue.task_done()
def submit(self, func, *args, **kwargs):
future_id = id(func) # ID simple
self.task_queue.put((func, args, kwargs, future_id))
return future_id
def shutdown(self):
for _ in self.workers:
self.task_queue.put((None, None, None, None))
for w in self.workers:
w.join()
# Uso
pool = WorkerPool(4)
for i in range(10):
pool.submit(lambda x: x * x, i)
pool.task_queue.join()
pool.shutdown()Thread Pool con Rate Limiting
Controla qué tan rápido los threads hacen peticiones externas:
import threading
import time
from concurrent.futures import ThreadPoolExecutor
class RateLimiter:
def __init__(self, max_per_second):
self.interval = 1.0 / max_per_second
self.lock = threading.Lock()
self.last_call = 0
def wait(self):
with self.lock:
elapsed = time.time() - self.last_call
wait_time = self.interval - elapsed
if wait_time > 0:
time.sleep(wait_time)
self.last_call = time.time()
limiter = RateLimiter(max_per_second=5)
def rate_limited_fetch(url):
limiter.wait()
print(f"Obteniendo {url} a las {time.time():.2f}")
time.sleep(0.5) # Simular petición
return f"Datos de {url}"
urls = [f"https://api.example.com/item/{i}" for i in range(20)]
with ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(rate_limited_fetch, urls))Errores Comunes de Thread Safety y Cómo Evitarlos
Condiciones de Carrera
Una condición de carrera ocurre cuando el resultado depende del tiempo de ejecución de los threads:
import threading
# MAL: Condición de carrera
counter = 0
def increment_unsafe():
global counter
for _ in range(100_000):
counter += 1 # Leer, incrementar, escribir: NO atómico
threads = [threading.Thread(target=increment_unsafe) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Esperado: 500000, Obtenido: {counter}") # A menudo menos de 500000
# BIEN: Protegido con lock
counter = 0
lock = threading.Lock()
def increment_safe():
global counter
for _ in range(100_000):
with lock:
counter += 1
threads = [threading.Thread(target=increment_safe) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Esperado: 500000, Obtenido: {counter}") # Siempre 500000Bloqueos Mutuos (Deadlocks)
Un deadlock ocurre cuando dos threads tienen cada uno un lock que el otro necesita:
import threading
lock_a = threading.Lock()
lock_b = threading.Lock()
def thread_1():
with lock_a:
print("Thread 1: adquirió lock_a")
with lock_b: # Espera para siempre si thread_2 tiene lock_b
print("Thread 1: adquirió lock_b")
def thread_2():
with lock_b:
print("Thread 2: adquirió lock_b")
with lock_a: # Espera para siempre si thread_1 tiene lock_a
print("Thread 2: adquirió lock_a")
# Esto CAUSARÁ deadlock
# t1 = threading.Thread(target=thread_1)
# t2 = threading.Thread(target=thread_2)
# t1.start(); t2.start()Cómo prevenir deadlocks:
- Siempre adquiere los locks en el mismo orden:
def thread_1_fixed():
with lock_a: # Siempre lock_a primero
with lock_b:
print("Thread 1: adquirió ambos locks")
def thread_2_fixed():
with lock_a: # Siempre lock_a primero (mismo orden)
with lock_b:
print("Thread 2: adquirió ambos locks")- Usa timeouts:
def safe_acquire():
acquired_a = lock_a.acquire(timeout=2)
if not acquired_a:
print("No se pudo adquirir lock_a, retrocediendo")
return
try:
acquired_b = lock_b.acquire(timeout=2)
if not acquired_b:
print("No se pudo adquirir lock_b, liberando lock_a")
return
try:
print("Adquiridos ambos locks de forma segura")
finally:
lock_b.release()
finally:
lock_a.release()- Minimiza el alcance de los locks: Mantén los locks por el menor tiempo posible.
Lista de Verificación de Thread Safety
- Protege todo el estado mutable compartido con locks
- Usa
queue.Queueen lugar de listas o diccionarios compartidos cuando sea posible - Evita el estado mutable global; pasa datos a través de argumentos de función
- Usa
ThreadPoolExecutoren lugar de gestión manual de threads - Nunca asumas orden de operación entre threads
- Prueba con
threading.active_count()y logging para detectar fugas de threads
Ejemplos del Mundo Real
Web Scraping Concurrente
from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import time
def fetch_page(url):
"""Obtiene una página web y retorna su longitud de contenido"""
try:
with urllib.request.urlopen(url, timeout=10) as response:
content = response.read()
return url, len(content), None
except Exception as e:
return url, 0, str(e)
urls = [
"https://python.org",
"https://docs.python.org",
"https://pypi.org",
"https://realpython.com",
"https://github.com",
"https://stackoverflow.com",
"https://news.ycombinator.com",
"https://httpbin.org",
]
# Secuencial
start = time.time()
for url in urls:
fetch_page(url)
sequential_time = time.time() - start
# Concurrente con threads
start = time.time()
with ThreadPoolExecutor(max_workers=8) as executor:
futures = {executor.submit(fetch_page, url): url for url in urls}
for future in as_completed(futures):
url, size, error = future.result()
if error:
print(f" FAIL {url}: {error}")
else:
print(f" OK {url}: {size:,} bytes")
threaded_time = time.time() - start
print(f"\nSecuencial: {sequential_time:.2f}s")
print(f"Threaded: {threaded_time:.2f}s")
print(f"Aceleración: {sequential_time / threaded_time:.1f}x")I/O de Archivos en Paralelo
from concurrent.futures import ThreadPoolExecutor
import os
import hashlib
def process_file(filepath):
"""Lee archivo y calcula su hash SHA-256"""
with open(filepath, 'rb') as f:
content = f.read()
file_hash = hashlib.sha256(content).hexdigest()
size = os.path.getsize(filepath)
return filepath, file_hash, size
def hash_all_files(directory, pattern="*.py"):
"""Calcula hash de todos los archivos coincidentes en un directorio usando threads"""
import glob
files = glob.glob(os.path.join(directory, "**", pattern), recursive=True)
results = {}
with ThreadPoolExecutor(max_workers=8) as executor:
futures = {executor.submit(process_file, f): f for f in files}
for future in futures:
try:
path, hash_val, size = future.result()
results[path] = {"hash": hash_val, "size": size}
except Exception as e:
print(f"Error procesando {futures[future]}: {e}")
return results
# Uso
# file_hashes = hash_all_files("/path/to/project")Llamadas a API Concurrentes con Lógica de Reintentos
from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import json
import time
def fetch_api(endpoint, max_retries=3, backoff=1.0):
"""Obtiene endpoint de API con reintentos de backoff exponencial"""
for attempt in range(max_retries):
try:
url = f"https://jsonplaceholder.typicode.com{endpoint}"
req = urllib.request.Request(url)
with urllib.request.urlopen(req, timeout=10) as response:
data = json.loads(response.read())
return {"endpoint": endpoint, "data": data, "error": None}
except Exception as e:
if attempt < max_retries - 1:
wait = backoff * (2 ** attempt)
time.sleep(wait)
else:
return {"endpoint": endpoint, "data": None, "error": str(e)}
endpoints = [f"/posts/{i}" for i in range(1, 21)]
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(fetch_api, ep) for ep in endpoints]
results = [f.result() for f in futures]
elapsed = time.time() - start
success = sum(1 for r in results if r["error"] is None)
print(f"Obtenidos {success}/{len(endpoints)} endpoints en {elapsed:.2f}s")Tareas Periódicas en Segundo Plano
import threading
import time
class PeriodicTask:
"""Ejecuta una función a intervalos fijos en un thread de segundo plano"""
def __init__(self, interval, func, *args, **kwargs):
self.interval = interval
self.func = func
self.args = args
self.kwargs = kwargs
self._stop_event = threading.Event()
self._thread = None
def start(self):
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def _run(self):
while not self._stop_event.is_set():
self.func(*self.args, **self.kwargs)
self._stop_event.wait(self.interval)
def stop(self):
self._stop_event.set()
if self._thread:
self._thread.join()
# Uso
def check_health():
print(f"Verificación de salud a las {time.strftime('%H:%M:%S')}")
task = PeriodicTask(2.0, check_health)
task.start()
time.sleep(7)
task.stop()
print("Detenido")Rendimiento: Threading vs Multiprocessing vs Asyncio
La herramienta de concurrencia correcta depende de la carga de trabajo. Aquí hay una comparación del tiempo de reloj para tareas comunes:
| Tarea | Secuencial | Threading (4) | Multiprocessing (4) | Asyncio |
|---|---|---|---|---|
| 100 peticiones HTTP (200ms cada una) | 20.0s | 5.1s | 5.8s | 4.9s |
| 100 lecturas de archivo (10ms cada una) | 1.0s | 0.28s | 0.35s | 0.26s |
| 100 tareas de CPU (100ms cada una) | 10.0s | 10.2s | 2.7s | 10.0s |
| 50 consultas a BD (50ms cada una) | 2.5s | 0.68s | 0.85s | 0.62s |
| I/O + CPU mixto | 15.0s | 8.2s | 4.1s | 9.5s |
Conclusiones clave:
- El Threading ofrece aceleración de 3-5x en cargas de trabajo I/O-bound con cambios mínimos de código
- El Multiprocessing es la única opción para paralelismo verdadero de CPU pero añade overhead de proceso
- El Asyncio supera al threading en I/O de alta concurrencia pero requiere reescribir código con async/await
- Para cargas mixtas, considera combinar threading para I/O y multiprocessing para tareas de CPU
import time
import threading
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def io_task():
time.sleep(0.2)
def cpu_task(n=2_000_000):
return sum(i * i for i in range(n))
# Benchmark threading vs multiprocessing
NUM_TASKS = 20
# Threading - I/O bound
start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
list(pool.map(lambda _: io_task(), range(NUM_TASKS)))
print(f"Threading (I/O): {time.time() - start:.2f}s")
# Threading - CPU bound
start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
list(pool.map(lambda _: cpu_task(), range(NUM_TASKS)))
print(f"Threading (CPU): {time.time() - start:.2f}s")Experimentando con Threading en RunCell
Depurar y perfilar código threaded puede ser desafiante. Cuando necesitas probar sincronización de threads, visualizar solapamientos de tiempo o diagnosticar condiciones de carrera de forma interactiva, RunCell (www.runcell.dev (opens in a new tab)) proporciona un entorno Jupyter potenciado por IA diseñado para este flujo de trabajo.
El agente de IA de RunCell puede analizar tu código de threading, identificar potenciales deadlocks antes de que ocurran, sugerir cantidades óptimas de workers basadas en tu carga de trabajo y ayudarte a entender por qué los threads se comportan de manera inesperada. Cuando un pool de threads produce resultados incorrectos intermitentemente, RunCell rastrea la línea de tiempo de ejecución para identificar el momento exacto en que el estado compartido se corrompe.
Si quieres visualizar las características de rendimiento de diferentes configuraciones de threading, PyGWalker (github.com/Kanaries/pygwalker) puede convertir tus DataFrames de benchmark en gráficos interactivos. Ejecuta benchmarks de threading, recolecta datos de tiempo en un DataFrame de pandas y explora los resultados con visualizaciones drag-and-drop para encontrar la cantidad óptima de threads para tu carga de trabajo.
FAQ
¿Cuál es la diferencia entre threading y multiprocessing en Python?
El threading ejecuta múltiples threads dentro de un único proceso, compartiendo memoria. El Global Interpreter Lock (GIL) previene que los threads ejecuten bytecode de Python en paralelo, haciendo que el threading sea efectivo solo para tareas I/O-bound como peticiones de red y operaciones de archivos. El multiprocessing crea procesos separados, cada uno con su propio intérprete de Python y espacio de memoria, permitiendo ejecución paralela verdadera para tareas CPU-bound. El threading tiene menor overhead (inicio más rápido, menos memoria), mientras que el multiprocessing evade el GIL para paralelismo genuino.
¿Es el threading de Python verdaderamente paralelo?
No, el threading de Python es concurrente pero no paralelo para código CPU-bound debido al GIL. Solo un thread ejecuta bytecode de Python a la vez. Sin embargo, el GIL se libera durante operaciones I/O (red, disco, base de datos), por lo que múltiples threads se ejecutan efectivamente en paralelo cuando esperan por I/O. Para paralelismo CPU-bound, usa el módulo multiprocessing o extensiones en C que liberan el GIL (como NumPy).
¿Cuántos threads debería usar en Python?
Para tareas I/O-bound, comienza con 5-20 threads dependiendo de los límites de rate del servicio externo y tu ancho de banda de red. Demasiados threads hacia un único servidor pueden causar rechazos de conexión o throttling. Para cargas mixtas, experimenta con cantidades de threads entre el número de núcleos de CPU y 4 veces esa cantidad. Usa ThreadPoolExecutor y haz benchmark con diferentes valores de max_workers para encontrar la cantidad óptima para tu carga de trabajo específica. El valor por defecto para ThreadPoolExecutor es min(32, os.cpu_count() + 4).
¿Cómo retorno un valor desde un thread en Python?
Los threads no retornan valores directamente desde su función objetivo. Los tres enfoques principales son: (1) Usa ThreadPoolExecutor.submit() que retorna un objeto Future donde llamas future.result() para obtener el valor de retorno. (2) Pasa un contenedor mutable (como un diccionario o lista) como argumento y haz que el thread escriba resultados en él, protegido por un Lock. (3) Usa queue.Queue donde el thread pone resultados en la cola y el thread principal lee de ella. ThreadPoolExecutor es el enfoque más limpio para la mayoría de los casos de uso.
¿Qué sucede si un thread de Python lanza una excepción?
En un threading.Thread crudo, una excepción no manejada termina ese thread silenciosamente y la excepción se pierde. El thread principal y otros threads continúan ejecutándose sin notificación. Con ThreadPoolExecutor, las excepciones son capturadas y relanzadas cuando llamas future.result(), haciendo que el manejo de errores sea mucho más confiable. Siempre usa bloques try/except dentro de funciones objetivo de threads o usa ThreadPoolExecutor para asegurar que las excepciones sean capturadas y manejadas adecuadamente.
Conclusión
El threading de Python es una herramienta poderosa para acelerar programas I/O-bound. Ejecutando peticiones de red, operaciones de archivos y consultas a bases de datos concurrentemente, puedes convertir un script secuencial de 20 segundos en uno que termina en 5 segundos con cambios mínimos de código.
Los puntos clave a recordar:
- Usa threading para trabajo I/O-bound. El GIL previene el paralelismo de CPU, pero los threads solapan el tiempo de espera de I/O efectivamente.
- Usa
ThreadPoolExecutorpara la mayoría de las necesidades de threading. Gestiona threads, recolecta resultados y propaga excepciones limpiamente. - Protege el estado compartido con locks. Las condiciones de carrera son el bug más común de threading, y
queue.Queueelimina la mayoría de las preocupaciones de locking. - Evita deadlocks adquiriendo locks en un orden consistente y usando timeouts.
- Elige la herramienta correcta: threading para I/O, multiprocessing para CPU, asyncio para miles de conexiones concurrentes.
Comienza con ThreadPoolExecutor y una simple llamada executor.map(). Mide la aceleración. Añade sincronización solo donde el estado mutable compartido lo demande. El threading no requiere una reescritura completa de tu código. Unas pocas líneas de concurrent.futures pueden entregar mejoras dramáticas de rendimiento para cualquier programa que pase tiempo esperando.