Skip to content

Python Threading: Guía Completa de Multithreading con Ejemplos

Updated on

Tu programa en Python realiza 50 llamadas a API, una tras otra. Cada llamada implica 200 milisegundos de espera. Las matemáticas son despiadadas: 10 segundos de la vida de tu programa desperdiciados esperando respuestas de red. Tu CPU permanece inactiva con una utilización cercana a cero mientras tu script se arrastra a través de operaciones I/O-bound que podrían ejecutarse simultáneamente.

Este problema se agrava rápidamente. Web scrapers que obtienen miles de páginas secuencialmente. Scripts de procesamiento de archivos que leen y escriben un archivo a la vez. Consultas a bases de datos que bloquean toda la aplicación mientras esperan resultados. Cada segundo de espera inactivo es un segundo en el que tu programa podría estar haciendo trabajo útil.

El módulo threading de Python resuelve esto ejecutando múltiples operaciones concurrentemente dentro de un único proceso. Los threads comparten memoria, se inician rápidamente y son excelentes para cargas de trabajo I/O-bound donde el programa pasa la mayor parte del tiempo esperando. Esta guía cubre todo, desde la creación básica de threads hasta patrones avanzados de sincronización, con ejemplos de código listos para producción que puedes usar inmediatamente.

📚

¿Qué es el Threading en Python?

El threading permite que un programa ejecute múltiples operaciones concurrentemente dentro del mismo proceso. Cada thread comparte el mismo espacio de memoria, lo que hace que la comunicación entre threads sea rápida y sencilla.

El módulo threading de Python proporciona una interfaz de alto nivel para crear y gestionar threads. Pero hay una advertencia importante: el Global Interpreter Lock (GIL).

El Global Interpreter Lock (GIL)

El GIL es un mutex en CPython que permite que solo un thread ejecute bytecode de Python a la vez. Esto significa que los threads no pueden lograr verdadero paralelismo para operaciones CPU-bound. Sin embargo, el GIL se libera durante operaciones I/O (llamadas de red, lecturas de archivos, consultas a bases de datos), permitiendo que otros threads se ejecuten mientras uno espera por I/O.

import threading
import time
 
def cpu_bound(n):
    """CPU-bound: GIL previene la ejecución paralela"""
    total = 0
    for i in range(n):
        total += i * i
    return total
 
def io_bound(url):
    """I/O-bound: GIL se libera durante la espera de red"""
    import urllib.request
    return urllib.request.urlopen(url).read()
 
# CPU-bound: 4 threads se ejecutan uno a la vez (sin aceleración)
start = time.time()
threads = [threading.Thread(target=cpu_bound, args=(10_000_000,)) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"CPU-bound con threads: {time.time() - start:.2f}s")
 
# I/O-bound: 4 threads solapan su tiempo de espera (gran aceleración)

Esto significa que el threading es ideal para tareas I/O-bound pero no para cómputo intensivo en CPU. Para trabajo CPU-bound, usa el módulo multiprocessing en su lugar.

Cuándo Usar Threading vs Multiprocessing vs Asyncio

Característicathreadingmultiprocessingasyncio
Mejor paraTareas I/O-boundTareas CPU-boundI/O de alta concurrencia
ParalelismoConcurrente (limitado por GIL)Paralelo verdaderoConcurrente (single thread)
MemoriaCompartida (ligera)Separada por procesoCompartida (ligera)
Costo de inicioBajo (~1ms)Alto (~50-100ms)Muy bajo
ComunicaciónAcceso directo a memoriaPipes, Queues, memoria compartidaCorrutinas awaitables
EscalabilidadDecenas-cientos de threadsLimitado por núcleos de CPUMiles de corrutinas
ComplejidadMedia (requiere locks)Media (serialización)Alta (sintaxis async/await)
Caso de usoWeb scraping, I/O de archivos, llamadas a APIProcesamiento de datos, entrenamiento de MLServidores web, apps de chat

Regla general: Si tu programa espera por red o disco, usa threading. Si procesa números, usa multiprocessing. Si necesitas miles de conexiones concurrentes, usa asyncio.

Fundamentos de Threads: Crear y Ejecutar Threads

La Clase threading.Thread

La forma más simple de crear un thread es pasando una función objetivo a threading.Thread:

import threading
import time
 
def download_file(filename):
    print(f"[{threading.current_thread().name}] Descargando {filename}...")
    time.sleep(2)  # Simular descarga
    print(f"[{threading.current_thread().name}] Finalizado {filename}")
 
# Crear threads
t1 = threading.Thread(target=download_file, args=("data.csv",))
t2 = threading.Thread(target=download_file, args=("report.pdf",))
 
# Iniciar threads
t1.start()
t2.start()
 
# Esperar a que ambos terminen
t1.join()
t2.join()
 
print("Todas las descargas completadas")

Ambas descargas se ejecutan concurrentemente, terminando en aproximadamente 2 segundos en lugar de 4.

start() y join()

  • start() comienza la ejecución del thread. Un thread solo puede iniciarse una vez.
  • join(timeout=None) bloquea el thread que llama hasta que el thread objetivo termina. Pasa un timeout en segundos para evitar esperar indefinidamente.
import threading
import time
 
def slow_task():
    time.sleep(10)
 
t = threading.Thread(target=slow_task)
t.start()
 
# Esperar como máximo 3 segundos
t.join(timeout=3)
 
if t.is_alive():
    print("El thread sigue ejecutándose después de 3 segundos")
else:
    print("El thread ha finalizado")

Nombrar Threads

Los threads nombrados facilitan la depuración:

import threading
 
def worker():
    name = threading.current_thread().name
    print(f"Ejecutándose en el thread: {name}")
 
t = threading.Thread(target=worker, name="DataProcessor")
t.start()
t.join()

Threads Daemon

Los threads daemon son threads de segundo plano que terminan automáticamente cuando el programa principal finaliza. Los threads no-daemon mantienen el programa activo hasta que terminan.

import threading
import time
 
def background_monitor():
    while True:
        print("Monitoreando salud del sistema...")
        time.sleep(5)
 
# Thread daemon: muere cuando el programa principal termina
monitor = threading.Thread(target=background_monitor, daemon=True)
monitor.start()
 
# El programa principal hace su trabajo
time.sleep(12)
print("El programa principal está terminando")
# el thread monitor es eliminado automáticamente

Usa threads daemon para tareas de logging en segundo plano, monitoreo o limpieza que no deberían impedir la salida del programa.

Subclasificando Thread

Para comportamientos de thread más complejos, subclasifica threading.Thread:

import threading
import time
 
class FileProcessor(threading.Thread):
    def __init__(self, filepath):
        super().__init__()
        self.filepath = filepath
        self.result = None
 
    def run(self):
        """Sobrescribir run() con la lógica del thread"""
        print(f"Procesando {self.filepath}")
        time.sleep(1)  # Simular trabajo
        self.result = f"Procesado: {self.filepath}"
 
# Crear y ejecutar
processor = FileProcessor("/data/report.csv")
processor.start()
processor.join()
print(processor.result)

Pasando Argumentos a Threads

Usando args y kwargs

Pasa argumentos posicionales con args (una tupla) y argumentos de palabra clave con kwargs (un diccionario):

import threading
 
def fetch_data(url, timeout, retries=3, verbose=False):
    print(f"Obteniendo {url} (timeout={timeout}s, retries={retries}, verbose={verbose})")
 
# Args posicionales como tupla
t1 = threading.Thread(target=fetch_data, args=("https://api.example.com", 30))
 
# Args de palabra clave como dict
t2 = threading.Thread(
    target=fetch_data,
    args=("https://api.example.com",),
    kwargs={"timeout": 30, "retries": 5, "verbose": True}
)
 
t1.start()
t2.start()
t1.join()
t2.join()

Error común: Olvidar la coma final en una tupla de un solo elemento. args=("hello",) es una tupla; args=("hello") es solo una cadena entre paréntesis.

Recolectando Resultados de Threads

Los threads no retornan valores directamente. Usa estructuras de datos compartidas o una lista para recolectar resultados:

import threading
 
results = {}
lock = threading.Lock()
 
def compute(task_id, value):
    result = value ** 2
    with lock:
        results[task_id] = result
 
threads = []
for i in range(5):
    t = threading.Thread(target=compute, args=(i, i * 10))
    threads.append(t)
    t.start()
 
for t in threads:
    t.join()
 
print(results)  # {0: 0, 1: 100, 2: 400, 3: 900, 4: 1600}

Un enfoque más limpio usa ThreadPoolExecutor (cubierto a continuación), que maneja la recolección de resultados automáticamente.

ThreadPoolExecutor: El Enfoque Moderno

El módulo concurrent.futures proporciona ThreadPoolExecutor, una interfaz de alto nivel que gestiona un pool de threads trabajadores. Maneja la creación de threads, recolección de resultados y propagación de excepciones automáticamente.

Uso Básico con submit()

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
 
def fetch_url(url):
    time.sleep(1)  # Simular petición de red
    return f"Contenido de {url}"
 
urls = [
    "https://example.com/page1",
    "https://example.com/page2",
    "https://example.com/page3",
    "https://example.com/page4",
    "https://example.com/page5",
]
 
with ThreadPoolExecutor(max_workers=3) as executor:
    # Enviar tareas y obtener objetos Future
    future_to_url = {executor.submit(fetch_url, url): url for url in urls}
 
    # Procesar resultados a medida que se completan
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            print(f"{url}: {data}")
        except Exception as e:
            print(f"{url} generó una excepción: {e}")

Usando map() para Resultados Ordenados

executor.map() retorna resultados en el mismo orden que la entrada, similar al map() incorporado:

from concurrent.futures import ThreadPoolExecutor
 
def process_item(item):
    return item.upper()
 
items = ["apple", "banana", "cherry", "date"]
 
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_item, items))
 
print(results)  # ['APPLE', 'BANANA', 'CHERRY', 'DATE']

submit() vs map()

submit()map()
RetornaObjetos FutureIterador de resultados
Orden de resultadosOrden de finalización (con as_completed)Orden de entrada
Manejo de erroresPor tarea vía future.result()Lanza en el primer fallo
ArgumentosLlamada única a funciónAplica función a cada ítem
Mejor paraTareas heterogéneas, resultados tempranosProcesamiento por lotes homogéneo

Manejo de Excepciones con Futures

from concurrent.futures import ThreadPoolExecutor, as_completed
 
def risky_task(n):
    if n == 3:
        raise ValueError(f"Entrada inválida: {n}")
    return n * 10
 
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(risky_task, i): i for i in range(5)}
 
    for future in as_completed(futures):
        task_id = futures[future]
        try:
            result = future.result(timeout=5)
            print(f"Tarea {task_id}: {result}")
        except ValueError as e:
            print(f"Tarea {task_id} falló: {e}")
        except TimeoutError:
            print(f"Tarea {task_id} agotó el tiempo de espera")

Cancelando Tareas

from concurrent.futures import ThreadPoolExecutor
import time
 
def long_task(n):
    time.sleep(5)
    return n
 
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(long_task, i) for i in range(10)]
 
    # Cancelar tareas pendientes (las tareas ya en ejecución no pueden cancelarse)
    for f in futures[4:]:
        cancelled = f.cancel()
        print(f"Cancelado: {cancelled}")

Primitivas de Sincronización de Threads

Cuando múltiples threads acceden a datos compartidos, necesitas sincronización para prevenir condiciones de carrera.

Lock

Un Lock asegura que solo un thread entre en una sección crítica a la vez:

import threading
 
class BankAccount:
    def __init__(self, balance):
        self.balance = balance
        self.lock = threading.Lock()
 
    def withdraw(self, amount):
        with self.lock:  # Solo un thread a la vez
            if self.balance >= amount:
                self.balance -= amount
                return True
            return False
 
    def deposit(self, amount):
        with self.lock:
            self.balance += amount
 
account = BankAccount(1000)
 
def make_transactions():
    for _ in range(100):
        account.deposit(10)
        account.withdraw(10)
 
threads = [threading.Thread(target=make_transactions) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
 
print(f"Balance final: {account.balance}")  # Siempre 1000

Sin el lock, las lecturas y escrituras concurrentes producen resultados incorrectos (una condición de carrera).

RLock (Reentrant Lock)

Un RLock puede ser adquirido múltiples veces por el mismo thread. Esto previene bloqueos mutuos cuando una función que tiene un lock llama a otra función que también necesita el mismo lock:

import threading
 
class SafeCache:
    def __init__(self):
        self._data = {}
        self._lock = threading.RLock()
 
    def get(self, key):
        with self._lock:
            return self._data.get(key)
 
    def set(self, key, value):
        with self._lock:
            self._data[key] = value
 
    def get_or_set(self, key, default):
        with self._lock:
            # Esto llama a get(), que también adquiere _lock
            # RLock permite esto; un Lock normal causaría deadlock
            existing = self.get(key)
            if existing is None:
                self.set(key, default)
                return default
            return existing

Semaphore

Un Semaphore permite que un número fijo de threads accedan a un recurso simultáneamente:

import threading
import time
 
# Permitir máximo 3 conexiones concurrentes a base de datos
db_semaphore = threading.Semaphore(3)
 
def query_database(query_id):
    with db_semaphore:
        print(f"Consulta {query_id}: conectado (conexiones activas: {3 - db_semaphore._value})")
        time.sleep(2)  # Simular consulta
        print(f"Consulta {query_id}: terminado")
 
threads = [threading.Thread(target=query_database, args=(i,)) for i in range(10)]
for t in threads: t.start()
for t in threads: t.join()

Event

Un Event permite que un thread señalice a otros threads en espera:

import threading
import time
 
data_ready = threading.Event()
shared_data = []
 
def producer():
    print("Productor: preparando datos...")
    time.sleep(3)
    shared_data.extend([1, 2, 3, 4, 5])
    print("Productor: datos listos, señalizando consumidores")
    data_ready.set()
 
def consumer(name):
    print(f"Consumidor {name}: esperando datos...")
    data_ready.wait()  # Bloquea hasta que el evento se establezca
    print(f"Consumidor {name}: obtuvo datos = {shared_data}")
 
threads = [
    threading.Thread(target=producer),
    threading.Thread(target=consumer, args=("A",)),
    threading.Thread(target=consumer, args=("B",)),
]
for t in threads: t.start()
for t in threads: t.join()

Condition

Un Condition combina un lock con la capacidad de esperar una notificación. Es la base para patrones productor-consumidor:

import threading
import time
import random
 
buffer = []
MAX_SIZE = 5
condition = threading.Condition()
 
def producer():
    for i in range(20):
        with condition:
            while len(buffer) >= MAX_SIZE:
                condition.wait()  # Esperar hasta que haya espacio disponible
            item = random.randint(1, 100)
            buffer.append(item)
            print(f"Producido: {item} (tamaño del buffer: {len(buffer)})")
            condition.notify_all()
        time.sleep(0.1)
 
def consumer(name):
    for _ in range(10):
        with condition:
            while len(buffer) == 0:
                condition.wait()  # Esperar hasta que haya un ítem disponible
            item = buffer.pop(0)
            print(f"Consumidor {name} consumió: {item} (tamaño del buffer: {len(buffer)})")
            condition.notify_all()
        time.sleep(0.15)
 
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=("X",))
t3 = threading.Thread(target=consumer, args=("Y",))
t1.start(); t2.start(); t3.start()
t1.join(); t2.join(); t3.join()

Resumen de Primitivas de Sincronización

PrimitivaPropósitoCuándo Usar
LockExclusión mutuaProteger estado mutable compartido
RLockMutex reentranteBloqueo anidado en el mismo thread
SemaphoreLimitar concurrenciaRate limiting, pools de conexiones
EventSeñal únicaInicialización completa, señal de apagado
ConditionPatrón wait/notifyProductor-consumidor, cambios de estado
BarrierSincronizar N threadsTodos los threads deben llegar a un punto antes de continuar

Estructuras de Datos Thread-Safe

queue.Queue

queue.Queue es la estructura de datos thread-safe por excelencia. Maneja todo el locking internamente:

import threading
import queue
import time
 
task_queue = queue.Queue()
results = queue.Queue()
 
def worker():
    while True:
        item = task_queue.get()  # Bloquea hasta que haya un ítem disponible
        if item is None:
            break
        result = item ** 2
        results.put(result)
        task_queue.task_done()
 
# Iniciar 4 workers
workers = []
for _ in range(4):
    t = threading.Thread(target=worker, daemon=True)
    t.start()
    workers.append(t)
 
# Enviar tareas
for i in range(20):
    task_queue.put(i)
 
# Esperar a que todas las tareas se completen
task_queue.join()
 
# Detener workers
for _ in range(4):
    task_queue.put(None)
for w in workers:
    w.join()
 
# Recolectar resultados
all_results = []
while not results.empty():
    all_results.append(results.get())
print(f"Resultados: {sorted(all_results)}")

queue.Queue también soporta:

  • Queue(maxsize=10): Bloquea put() cuando está llena
  • PriorityQueue(): Ítems ordenados por prioridad
  • LifoQueue(): Último en entrar, primero en salir (comportamiento de pila)

collections.deque

collections.deque es thread-safe para operaciones append() y popleft() (atómico a nivel C en CPython), haciéndolo una alternativa rápida para patrones simples de productor-consumidor:

from collections import deque
import threading
import time
 
buffer = deque(maxlen=1000)
 
def producer():
    for i in range(100):
        buffer.append(i)
        time.sleep(0.01)
 
def consumer():
    consumed = 0
    while consumed < 100:
        if buffer:
            item = buffer.popleft()
            consumed += 1
        else:
            time.sleep(0.01)
    print(f"Consumidos {consumed} ítems")
 
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()

Nota: Aunque las operaciones individuales append y popleft son thread-safe, verificar len(buffer) y luego hacer pop no es atómico. Para thread safety completa, usa queue.Queue.

Patrones Comunes de Threading

Patrón Productor-Consumidor

El patrón clásico para desacoplar la producción de datos del procesamiento de datos:

import threading
import queue
import time
import random
 
def producer(q, name, num_items):
    for i in range(num_items):
        item = f"{name}-item-{i}"
        q.put(item)
        print(f"Productor {name}: creado {item}")
        time.sleep(random.uniform(0.05, 0.15))
    print(f"Productor {name}: terminado")
 
def consumer(q, name, stop_event):
    while not stop_event.is_set() or not q.empty():
        try:
            item = q.get(timeout=0.5)
            print(f"Consumidor {name}: procesando {item}")
            time.sleep(random.uniform(0.1, 0.2))
            q.task_done()
        except queue.Empty:
            continue
    print(f"Consumidor {name}: apagándose")
 
task_queue = queue.Queue(maxsize=10)
stop_event = threading.Event()
 
producers = [
    threading.Thread(target=producer, args=(task_queue, "P1", 10)),
    threading.Thread(target=producer, args=(task_queue, "P2", 10)),
]
consumers = [
    threading.Thread(target=consumer, args=(task_queue, "C1", stop_event)),
    threading.Thread(target=consumer, args=(task_queue, "C2", stop_event)),
    threading.Thread(target=consumer, args=(task_queue, "C3", stop_event)),
]
 
for c in consumers: c.start()
for p in producers: p.start()
for p in producers: p.join()
 
task_queue.join()  # Esperar a que todos los ítems sean procesados
stop_event.set()   # Señalizar a los consumidores que se detengan
for c in consumers: c.join()

Pool de Threads Trabajadores (Manual)

Cuando necesitas más control que el que proporciona ThreadPoolExecutor:

import threading
import queue
 
class WorkerPool:
    def __init__(self, num_workers):
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.workers = []
 
        for _ in range(num_workers):
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()
            self.workers.append(t)
 
    def _worker(self):
        while True:
            func, args, kwargs, future_id = self.task_queue.get()
            if func is None:
                break
            try:
                result = func(*args, **kwargs)
                self.result_queue.put((future_id, result, None))
            except Exception as e:
                self.result_queue.put((future_id, None, e))
            finally:
                self.task_queue.task_done()
 
    def submit(self, func, *args, **kwargs):
        future_id = id(func)  # ID simple
        self.task_queue.put((func, args, kwargs, future_id))
        return future_id
 
    def shutdown(self):
        for _ in self.workers:
            self.task_queue.put((None, None, None, None))
        for w in self.workers:
            w.join()
 
# Uso
pool = WorkerPool(4)
for i in range(10):
    pool.submit(lambda x: x * x, i)
pool.task_queue.join()
pool.shutdown()

Thread Pool con Rate Limiting

Controla qué tan rápido los threads hacen peticiones externas:

import threading
import time
from concurrent.futures import ThreadPoolExecutor
 
class RateLimiter:
    def __init__(self, max_per_second):
        self.interval = 1.0 / max_per_second
        self.lock = threading.Lock()
        self.last_call = 0
 
    def wait(self):
        with self.lock:
            elapsed = time.time() - self.last_call
            wait_time = self.interval - elapsed
            if wait_time > 0:
                time.sleep(wait_time)
            self.last_call = time.time()
 
limiter = RateLimiter(max_per_second=5)
 
def rate_limited_fetch(url):
    limiter.wait()
    print(f"Obteniendo {url} a las {time.time():.2f}")
    time.sleep(0.5)  # Simular petición
    return f"Datos de {url}"
 
urls = [f"https://api.example.com/item/{i}" for i in range(20)]
 
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(rate_limited_fetch, urls))

Errores Comunes de Thread Safety y Cómo Evitarlos

Condiciones de Carrera

Una condición de carrera ocurre cuando el resultado depende del tiempo de ejecución de los threads:

import threading
 
# MAL: Condición de carrera
counter = 0
 
def increment_unsafe():
    global counter
    for _ in range(100_000):
        counter += 1  # Leer, incrementar, escribir: NO atómico
 
threads = [threading.Thread(target=increment_unsafe) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Esperado: 500000, Obtenido: {counter}")  # A menudo menos de 500000
 
# BIEN: Protegido con lock
counter = 0
lock = threading.Lock()
 
def increment_safe():
    global counter
    for _ in range(100_000):
        with lock:
            counter += 1
 
threads = [threading.Thread(target=increment_safe) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Esperado: 500000, Obtenido: {counter}")  # Siempre 500000

Bloqueos Mutuos (Deadlocks)

Un deadlock ocurre cuando dos threads tienen cada uno un lock que el otro necesita:

import threading
 
lock_a = threading.Lock()
lock_b = threading.Lock()
 
def thread_1():
    with lock_a:
        print("Thread 1: adquirió lock_a")
        with lock_b:  # Espera para siempre si thread_2 tiene lock_b
            print("Thread 1: adquirió lock_b")
 
def thread_2():
    with lock_b:
        print("Thread 2: adquirió lock_b")
        with lock_a:  # Espera para siempre si thread_1 tiene lock_a
            print("Thread 2: adquirió lock_a")
 
# Esto CAUSARÁ deadlock
# t1 = threading.Thread(target=thread_1)
# t2 = threading.Thread(target=thread_2)
# t1.start(); t2.start()

Cómo prevenir deadlocks:

  1. Siempre adquiere los locks en el mismo orden:
def thread_1_fixed():
    with lock_a:    # Siempre lock_a primero
        with lock_b:
            print("Thread 1: adquirió ambos locks")
 
def thread_2_fixed():
    with lock_a:    # Siempre lock_a primero (mismo orden)
        with lock_b:
            print("Thread 2: adquirió ambos locks")
  1. Usa timeouts:
def safe_acquire():
    acquired_a = lock_a.acquire(timeout=2)
    if not acquired_a:
        print("No se pudo adquirir lock_a, retrocediendo")
        return
    try:
        acquired_b = lock_b.acquire(timeout=2)
        if not acquired_b:
            print("No se pudo adquirir lock_b, liberando lock_a")
            return
        try:
            print("Adquiridos ambos locks de forma segura")
        finally:
            lock_b.release()
    finally:
        lock_a.release()
  1. Minimiza el alcance de los locks: Mantén los locks por el menor tiempo posible.

Lista de Verificación de Thread Safety

  • Protege todo el estado mutable compartido con locks
  • Usa queue.Queue en lugar de listas o diccionarios compartidos cuando sea posible
  • Evita el estado mutable global; pasa datos a través de argumentos de función
  • Usa ThreadPoolExecutor en lugar de gestión manual de threads
  • Nunca asumas orden de operación entre threads
  • Prueba con threading.active_count() y logging para detectar fugas de threads

Ejemplos del Mundo Real

Web Scraping Concurrente

from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import time
 
def fetch_page(url):
    """Obtiene una página web y retorna su longitud de contenido"""
    try:
        with urllib.request.urlopen(url, timeout=10) as response:
            content = response.read()
            return url, len(content), None
    except Exception as e:
        return url, 0, str(e)
 
urls = [
    "https://python.org",
    "https://docs.python.org",
    "https://pypi.org",
    "https://realpython.com",
    "https://github.com",
    "https://stackoverflow.com",
    "https://news.ycombinator.com",
    "https://httpbin.org",
]
 
# Secuencial
start = time.time()
for url in urls:
    fetch_page(url)
sequential_time = time.time() - start
 
# Concurrente con threads
start = time.time()
with ThreadPoolExecutor(max_workers=8) as executor:
    futures = {executor.submit(fetch_page, url): url for url in urls}
    for future in as_completed(futures):
        url, size, error = future.result()
        if error:
            print(f"  FAIL {url}: {error}")
        else:
            print(f"  OK   {url}: {size:,} bytes")
threaded_time = time.time() - start
 
print(f"\nSecuencial: {sequential_time:.2f}s")
print(f"Threaded:   {threaded_time:.2f}s")
print(f"Aceleración:    {sequential_time / threaded_time:.1f}x")

I/O de Archivos en Paralelo

from concurrent.futures import ThreadPoolExecutor
import os
import hashlib
 
def process_file(filepath):
    """Lee archivo y calcula su hash SHA-256"""
    with open(filepath, 'rb') as f:
        content = f.read()
    file_hash = hashlib.sha256(content).hexdigest()
    size = os.path.getsize(filepath)
    return filepath, file_hash, size
 
def hash_all_files(directory, pattern="*.py"):
    """Calcula hash de todos los archivos coincidentes en un directorio usando threads"""
    import glob
    files = glob.glob(os.path.join(directory, "**", pattern), recursive=True)
 
    results = {}
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = {executor.submit(process_file, f): f for f in files}
        for future in futures:
            try:
                path, hash_val, size = future.result()
                results[path] = {"hash": hash_val, "size": size}
            except Exception as e:
                print(f"Error procesando {futures[future]}: {e}")
 
    return results
 
# Uso
# file_hashes = hash_all_files("/path/to/project")

Llamadas a API Concurrentes con Lógica de Reintentos

from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import json
import time
 
def fetch_api(endpoint, max_retries=3, backoff=1.0):
    """Obtiene endpoint de API con reintentos de backoff exponencial"""
    for attempt in range(max_retries):
        try:
            url = f"https://jsonplaceholder.typicode.com{endpoint}"
            req = urllib.request.Request(url)
            with urllib.request.urlopen(req, timeout=10) as response:
                data = json.loads(response.read())
                return {"endpoint": endpoint, "data": data, "error": None}
        except Exception as e:
            if attempt < max_retries - 1:
                wait = backoff * (2 ** attempt)
                time.sleep(wait)
            else:
                return {"endpoint": endpoint, "data": None, "error": str(e)}
 
endpoints = [f"/posts/{i}" for i in range(1, 21)]
 
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(fetch_api, ep) for ep in endpoints]
    results = [f.result() for f in futures]
 
elapsed = time.time() - start
success = sum(1 for r in results if r["error"] is None)
print(f"Obtenidos {success}/{len(endpoints)} endpoints en {elapsed:.2f}s")

Tareas Periódicas en Segundo Plano

import threading
import time
 
class PeriodicTask:
    """Ejecuta una función a intervalos fijos en un thread de segundo plano"""
    def __init__(self, interval, func, *args, **kwargs):
        self.interval = interval
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self._stop_event = threading.Event()
        self._thread = None
 
    def start(self):
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()
 
    def _run(self):
        while not self._stop_event.is_set():
            self.func(*self.args, **self.kwargs)
            self._stop_event.wait(self.interval)
 
    def stop(self):
        self._stop_event.set()
        if self._thread:
            self._thread.join()
 
# Uso
def check_health():
    print(f"Verificación de salud a las {time.strftime('%H:%M:%S')}")
 
task = PeriodicTask(2.0, check_health)
task.start()
time.sleep(7)
task.stop()
print("Detenido")

Rendimiento: Threading vs Multiprocessing vs Asyncio

La herramienta de concurrencia correcta depende de la carga de trabajo. Aquí hay una comparación del tiempo de reloj para tareas comunes:

TareaSecuencialThreading (4)Multiprocessing (4)Asyncio
100 peticiones HTTP (200ms cada una)20.0s5.1s5.8s4.9s
100 lecturas de archivo (10ms cada una)1.0s0.28s0.35s0.26s
100 tareas de CPU (100ms cada una)10.0s10.2s2.7s10.0s
50 consultas a BD (50ms cada una)2.5s0.68s0.85s0.62s
I/O + CPU mixto15.0s8.2s4.1s9.5s

Conclusiones clave:

  • El Threading ofrece aceleración de 3-5x en cargas de trabajo I/O-bound con cambios mínimos de código
  • El Multiprocessing es la única opción para paralelismo verdadero de CPU pero añade overhead de proceso
  • El Asyncio supera al threading en I/O de alta concurrencia pero requiere reescribir código con async/await
  • Para cargas mixtas, considera combinar threading para I/O y multiprocessing para tareas de CPU
import time
import threading
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
 
def io_task():
    time.sleep(0.2)
 
def cpu_task(n=2_000_000):
    return sum(i * i for i in range(n))
 
# Benchmark threading vs multiprocessing
NUM_TASKS = 20
 
# Threading - I/O bound
start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(lambda _: io_task(), range(NUM_TASKS)))
print(f"Threading (I/O): {time.time() - start:.2f}s")
 
# Threading - CPU bound
start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(lambda _: cpu_task(), range(NUM_TASKS)))
print(f"Threading (CPU): {time.time() - start:.2f}s")

Experimentando con Threading en RunCell

Depurar y perfilar código threaded puede ser desafiante. Cuando necesitas probar sincronización de threads, visualizar solapamientos de tiempo o diagnosticar condiciones de carrera de forma interactiva, RunCell (www.runcell.dev (opens in a new tab)) proporciona un entorno Jupyter potenciado por IA diseñado para este flujo de trabajo.

El agente de IA de RunCell puede analizar tu código de threading, identificar potenciales deadlocks antes de que ocurran, sugerir cantidades óptimas de workers basadas en tu carga de trabajo y ayudarte a entender por qué los threads se comportan de manera inesperada. Cuando un pool de threads produce resultados incorrectos intermitentemente, RunCell rastrea la línea de tiempo de ejecución para identificar el momento exacto en que el estado compartido se corrompe.

Si quieres visualizar las características de rendimiento de diferentes configuraciones de threading, PyGWalker (github.com/Kanaries/pygwalker) puede convertir tus DataFrames de benchmark en gráficos interactivos. Ejecuta benchmarks de threading, recolecta datos de tiempo en un DataFrame de pandas y explora los resultados con visualizaciones drag-and-drop para encontrar la cantidad óptima de threads para tu carga de trabajo.

FAQ

¿Cuál es la diferencia entre threading y multiprocessing en Python?

El threading ejecuta múltiples threads dentro de un único proceso, compartiendo memoria. El Global Interpreter Lock (GIL) previene que los threads ejecuten bytecode de Python en paralelo, haciendo que el threading sea efectivo solo para tareas I/O-bound como peticiones de red y operaciones de archivos. El multiprocessing crea procesos separados, cada uno con su propio intérprete de Python y espacio de memoria, permitiendo ejecución paralela verdadera para tareas CPU-bound. El threading tiene menor overhead (inicio más rápido, menos memoria), mientras que el multiprocessing evade el GIL para paralelismo genuino.

¿Es el threading de Python verdaderamente paralelo?

No, el threading de Python es concurrente pero no paralelo para código CPU-bound debido al GIL. Solo un thread ejecuta bytecode de Python a la vez. Sin embargo, el GIL se libera durante operaciones I/O (red, disco, base de datos), por lo que múltiples threads se ejecutan efectivamente en paralelo cuando esperan por I/O. Para paralelismo CPU-bound, usa el módulo multiprocessing o extensiones en C que liberan el GIL (como NumPy).

¿Cuántos threads debería usar en Python?

Para tareas I/O-bound, comienza con 5-20 threads dependiendo de los límites de rate del servicio externo y tu ancho de banda de red. Demasiados threads hacia un único servidor pueden causar rechazos de conexión o throttling. Para cargas mixtas, experimenta con cantidades de threads entre el número de núcleos de CPU y 4 veces esa cantidad. Usa ThreadPoolExecutor y haz benchmark con diferentes valores de max_workers para encontrar la cantidad óptima para tu carga de trabajo específica. El valor por defecto para ThreadPoolExecutor es min(32, os.cpu_count() + 4).

¿Cómo retorno un valor desde un thread en Python?

Los threads no retornan valores directamente desde su función objetivo. Los tres enfoques principales son: (1) Usa ThreadPoolExecutor.submit() que retorna un objeto Future donde llamas future.result() para obtener el valor de retorno. (2) Pasa un contenedor mutable (como un diccionario o lista) como argumento y haz que el thread escriba resultados en él, protegido por un Lock. (3) Usa queue.Queue donde el thread pone resultados en la cola y el thread principal lee de ella. ThreadPoolExecutor es el enfoque más limpio para la mayoría de los casos de uso.

¿Qué sucede si un thread de Python lanza una excepción?

En un threading.Thread crudo, una excepción no manejada termina ese thread silenciosamente y la excepción se pierde. El thread principal y otros threads continúan ejecutándose sin notificación. Con ThreadPoolExecutor, las excepciones son capturadas y relanzadas cuando llamas future.result(), haciendo que el manejo de errores sea mucho más confiable. Siempre usa bloques try/except dentro de funciones objetivo de threads o usa ThreadPoolExecutor para asegurar que las excepciones sean capturadas y manejadas adecuadamente.

Conclusión

El threading de Python es una herramienta poderosa para acelerar programas I/O-bound. Ejecutando peticiones de red, operaciones de archivos y consultas a bases de datos concurrentemente, puedes convertir un script secuencial de 20 segundos en uno que termina en 5 segundos con cambios mínimos de código.

Los puntos clave a recordar:

  • Usa threading para trabajo I/O-bound. El GIL previene el paralelismo de CPU, pero los threads solapan el tiempo de espera de I/O efectivamente.
  • Usa ThreadPoolExecutor para la mayoría de las necesidades de threading. Gestiona threads, recolecta resultados y propaga excepciones limpiamente.
  • Protege el estado compartido con locks. Las condiciones de carrera son el bug más común de threading, y queue.Queue elimina la mayoría de las preocupaciones de locking.
  • Evita deadlocks adquiriendo locks en un orden consistente y usando timeouts.
  • Elige la herramienta correcta: threading para I/O, multiprocessing para CPU, asyncio para miles de conexiones concurrentes.

Comienza con ThreadPoolExecutor y una simple llamada executor.map(). Mide la aceleración. Añade sincronización solo donde el estado mutable compartido lo demande. El threading no requiere una reescritura completa de tu código. Unas pocas líneas de concurrent.futures pueden entregar mejoras dramáticas de rendimiento para cualquier programa que pase tiempo esperando.

📚