Skip to content

Python Threading: Guia Completo de Multithreading com Exemplos

Updated on

Seu programa Python faz 50 chamadas de API, uma após a outra. Cada chamada leva 200 milissegundos de espera. A matemática é brutal: 10 segundos da vida do seu programa desperdiçados olhando para respostas de rede. Sua CPU fica ociosa com utilização quase zero enquanto seu script rasteja através de operações limitadas por I/O que poderiam rodar simultaneamente.

Este problema se agrava rapidamente. Web scrapers que buscam milhares de páginas sequencialmente. Scripts de processamento de arquivos que leem e escrevem um arquivo por vez. Consultas de banco de dados que bloqueiam toda a aplicação enquanto aguardam resultados. Cada segundo de espera ociosa é um segundo em que seu programa poderia estar fazendo trabalho útil.

O módulo threading do Python resolve isso executando múltiplas operações concorrentemente dentro de um único processo. Threads compartilham memória, iniciam rapidamente e são excelentes para cargas de trabalho limitadas por I/O onde o programa passa a maior parte do tempo esperando. Este guia cobre tudo, desde a criação básica de threads até padrões avançados de sincronização, com exemplos de código prontos para produção que você pode usar imediatamente.

📚

O que é Threading em Python?

Threading permite que um programa execute múltiplas operações concorrentemente dentro do mesmo processo. Cada thread compartilha o mesmo espaço de memória, tornando a comunicação entre threads rápida e direta.

O módulo threading do Python fornece uma interface de alto nível para criar e gerenciar threads. Mas há uma ressalva importante: o Global Interpreter Lock (GIL).

O Global Interpreter Lock (GIL)

O GIL é um mutex no CPython que permite apenas que uma thread execute bytecode Python por vez. Isso significa que threads não podem alcançar verdadeiro paralelismo para operações limitadas por CPU. No entanto, o GIL é liberado durante operações de I/O (chamadas de rede, leitura de arquivos, consultas de banco de dados), permitindo que outras threads rodem enquanto uma aguarda por I/O.

import threading
import time
 
def cpu_bound(n):
    """CPU-bound: GIL prevents parallel execution"""
    total = 0
    for i in range(n):
        total += i * i
    return total
 
def io_bound(url):
    """I/O-bound: GIL releases during network wait"""
    import urllib.request
    return urllib.request.urlopen(url).read()
 
# CPU-bound: 4 threads run one-at-a-time (no speedup)
start = time.time()
threads = [threading.Thread(target=cpu_bound, args=(10_000_000,)) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"CPU-bound with threads: {time.time() - start:.2f}s")
 
# I/O-bound: 4 threads overlap their waiting time (big speedup)

Isso significa que threading é ideal para tarefas limitadas por I/O, mas não para computação intensiva em CPU. Para trabalhos limitados por CPU, use o módulo multiprocessing em vez disso.

Quando Usar Threading vs Multiprocessing vs Asyncio

Recursothreadingmultiprocessingasyncio
Melhor paraTarefas limitadas por I/OTarefas limitadas por CPUI/O de alta concorrência
ParalelismoConcorrente (limitado pelo GIL)Paralelo verdadeiroConcorrente (thread única)
MemóriaCompartilhada (leve)Separada por processoCompartilhada (leve)
Custo de inicializaçãoBaixo (~1ms)Alto (~50-100ms)Muito baixo
ComunicaçãoAcesso direto à memóriaPipes, Queues, memória compartilhadaCorrotinas aguardáveis
EscalabilidadeDezenas a centenas de threadsLimitado por núcleos de CPUMilhares de corrotinas
ComplexidadeMédia (necessita locking)Média (serialização)Alta (sintaxe async/await)
Caso de usoWeb scraping, I/O de arquivos, chamadas de APIProcessamento de dados, treinamento de MLServidores web, apps de chat

Regra de ouro: Se seu programa aguarda rede ou disco, use threading. Se processa números, use multiprocessing. Se precisa de milhares de conexões concorrentes, use asyncio.

Fundamentos de Threads: Criando e Executando Threads

A Classe threading.Thread

A forma mais simples de criar uma thread é passando uma função alvo para threading.Thread:

import threading
import time
 
def download_file(filename):
    print(f"[{threading.current_thread().name}] Downloading {filename}...")
    time.sleep(2)  # Simulate download
    print(f"[{threading.current_thread().name}] Finished {filename}")
 
# Create threads
t1 = threading.Thread(target=download_file, args=("data.csv",))
t2 = threading.Thread(target=download_file, args=("report.pdf",))
 
# Start threads
t1.start()
t2.start()
 
# Wait for both to finish
t1.join()
t2.join()
 
print("All downloads complete")

Ambos os downloads rodam concorrentemente, terminando em aproximadamente 2 segundos em vez de 4.

start() e join()

  • start() inicia a execução da thread. Uma thread só pode ser iniciada uma vez.
  • join(timeout=None) bloqueia a thread chamadora até que a thread alvo termine. Passe um timeout em segundos para evitar esperar eternamente.
import threading
import time
 
def slow_task():
    time.sleep(10)
 
t = threading.Thread(target=slow_task)
t.start()
 
# Wait at most 3 seconds
t.join(timeout=3)
 
if t.is_alive():
    print("Thread still running after 3 seconds")
else:
    print("Thread finished")

Nomeando Threads

Threads nomeadas facilitam a depuração:

import threading
 
def worker():
    name = threading.current_thread().name
    print(f"Running in thread: {name}")
 
t = threading.Thread(target=worker, name="DataProcessor")
t.start()
t.join()

Threads Daemon

Threads daemon são threads de fundo que terminam automaticamente quando o programa principal sai. Threads não-daemon mantêm o programa vivo até que terminem.

import threading
import time
 
def background_monitor():
    while True:
        print("Monitoring system health...")
        time.sleep(5)
 
# Daemon thread: dies when main program exits
monitor = threading.Thread(target=background_monitor, daemon=True)
monitor.start()
 
# Main program does its work
time.sleep(12)
print("Main program exiting")
# monitor thread is killed automatically

Use threads daemon para tarefas de background como logging, monitoramento ou limpeza que não devem impedir a saída do programa.

Subclassificando Thread

Para comportamento de thread mais complexo, subclassifique threading.Thread:

import threading
import time
 
class FileProcessor(threading.Thread):
    def __init__(self, filepath):
        super().__init__()
        self.filepath = filepath
        self.result = None
 
    def run(self):
        """Override run() with thread logic"""
        print(f"Processing {self.filepath}")
        time.sleep(1)  # Simulate work
        self.result = f"Processed: {self.filepath}"
 
# Create and run
processor = FileProcessor("/data/report.csv")
processor.start()
processor.join()
print(processor.result)

Passando Argumentos para Threads

Usando args e kwargs

Passe argumentos posicionais com args (uma tupla) e argumentos nomeados com kwargs (um dict):

import threading
 
def fetch_data(url, timeout, retries=3, verbose=False):
    print(f"Fetching {url} (timeout={timeout}s, retries={retries}, verbose={verbose})")
 
# Positional args as tuple
t1 = threading.Thread(target=fetch_data, args=("https://api.example.com", 30))
 
# Keyword args as dict
t2 = threading.Thread(
    target=fetch_data,
    args=("https://api.example.com",),
    kwargs={"timeout": 30, "retries": 5, "verbose": True}
)
 
t1.start()
t2.start()
t1.join()
t2.join()

Erro comum: Esquecer a vírgula final em uma tupla de elemento único. args=("hello",) é uma tupla; args=("hello") é apenas uma string entre parênteses.

Coletando Resultados de Threads

Threads não retornam valores diretamente. Use estruturas de dados compartilhadas ou uma lista para coletar resultados:

import threading
 
results = {}
lock = threading.Lock()
 
def compute(task_id, value):
    result = value ** 2
    with lock:
        results[task_id] = result
 
threads = []
for i in range(5):
    t = threading.Thread(target=compute, args=(i, i * 10))
    threads.append(t)
    t.start()
 
for t in threads:
    t.join()
 
print(results)  # {0: 0, 1: 100, 2: 400, 3: 900, 4: 1600}

Uma abordagem mais limpa usa ThreadPoolExecutor (coberto a seguir), que lida com a coleta de resultados automaticamente.

ThreadPoolExecutor: A Abordagem Moderna

O módulo concurrent.futures fornece ThreadPoolExecutor, uma interface de alto nível que gerencia um pool de threads worker. Ele lida com a criação de threads, coleta de resultados e propagação de exceções automaticamente.

Uso Básico com submit()

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
 
def fetch_url(url):
    time.sleep(1)  # Simulate network request
    return f"Content from {url}"
 
urls = [
    "https://example.com/page1",
    "https://example.com/page2",
    "https://example.com/page3",
    "https://example.com/page4",
    "https://example.com/page5",
]
 
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks and get Future objects
    future_to_url = {executor.submit(fetch_url, url): url for url in urls}
 
    # Process results as they complete
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            print(f"{url}: {data}")
        except Exception as e:
            print(f"{url} generated an exception: {e}")

Usando map() para Resultados Ordenados

executor.map() retorna resultados na mesma ordem da entrada, similar ao map() built-in:

from concurrent.futures import ThreadPoolExecutor
 
def process_item(item):
    return item.upper()
 
items = ["apple", "banana", "cherry", "date"]
 
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_item, items))
 
print(results)  # ['APPLE', 'BANANA', 'CHERRY', 'DATE']

submit() vs map()

submit()map()
RetornaObjetos FutureIterador de resultados
Ordem dos resultadosOrdem de conclusão (com as_completed)Ordem da entrada
Tratamento de errosPor tarefa via future.result()Levanta exceção no primeiro erro
ArgumentosChamada de função únicaAplica função a cada item
Melhor paraTarefas heterogêneas, resultados antecipadosProcessamento em lote homogêneo

Tratamento de Exceções com Futures

from concurrent.futures import ThreadPoolExecutor, as_completed
 
def risky_task(n):
    if n == 3:
        raise ValueError(f"Bad input: {n}")
    return n * 10
 
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(risky_task, i): i for i in range(5)}
 
    for future in as_completed(futures):
        task_id = futures[future]
        try:
            result = future.result(timeout=5)
            print(f"Task {task_id}: {result}")
        except ValueError as e:
            print(f"Task {task_id} failed: {e}")
        except TimeoutError:
            print(f"Task {task_id} timed out")

Cancelando Tarefas

from concurrent.futures import ThreadPoolExecutor
import time
 
def long_task(n):
    time.sleep(5)
    return n
 
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(long_task, i) for i in range(10)]
 
    # Cancel pending tasks (already-running tasks cannot be cancelled)
    for f in futures[4:]:
        cancelled = f.cancel()
        print(f"Cancelled: {cancelled}")

Primitivas de Sincronização de Threads

Quando múltiplas threads acessam dados compartilhados, você precisa de sincronização para prevenir condições de corrida.

Lock

Um Lock garante que apenas uma thread entre em uma seção crítica por vez:

import threading
 
class BankAccount:
    def __init__(self, balance):
        self.balance = balance
        self.lock = threading.Lock()
 
    def withdraw(self, amount):
        with self.lock:  # Only one thread at a time
            if self.balance >= amount:
                self.balance -= amount
                return True
            return False
 
    def deposit(self, amount):
        with self.lock:
            self.balance += amount
 
account = BankAccount(1000)
 
def make_transactions():
    for _ in range(100):
        account.deposit(10)
        account.withdraw(10)
 
threads = [threading.Thread(target=make_transactions) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
 
print(f"Final balance: {account.balance}")  # Always 1000

Sem o lock, leituras e escritas concorrentes produzem resultados incorretos (uma condição de corrida).

RLock (Reentrant Lock)

Um RLock pode ser adquirido múltiplas vezes pela mesma thread. Isso previne deadlocks quando uma função que segura um lock chama outra função que também precisa do mesmo lock:

import threading
 
class SafeCache:
    def __init__(self):
        self._data = {}
        self._lock = threading.RLock()
 
    def get(self, key):
        with self._lock:
            return self._data.get(key)
 
    def set(self, key, value):
        with self._lock:
            self._data[key] = value
 
    def get_or_set(self, key, default):
        with self._lock:
            # This calls get(), which also acquires _lock
            # RLock allows this; a regular Lock would deadlock
            existing = self.get(key)
            if existing is None:
                self.set(key, default)
                return default
            return existing

Semaphore

Um Semaphore permite que um número fixo de threads acesse um recurso simultaneamente:

import threading
import time
 
# Allow max 3 concurrent database connections
db_semaphore = threading.Semaphore(3)
 
def query_database(query_id):
    with db_semaphore:
        print(f"Query {query_id}: connected (active connections: {3 - db_semaphore._value})")
        time.sleep(2)  # Simulate query
        print(f"Query {query_id}: done")
 
threads = [threading.Thread(target=query_database, args=(i,)) for i in range(10)]
for t in threads: t.start()
for t in threads: t.join()

Event

Um Event permite que uma thread sinalize outras threads em espera:

import threading
import time
 
data_ready = threading.Event()
shared_data = []
 
def producer():
    print("Producer: preparing data...")
    time.sleep(3)
    shared_data.extend([1, 2, 3, 4, 5])
    print("Producer: data ready, signaling consumers")
    data_ready.set()
 
def consumer(name):
    print(f"Consumer {name}: waiting for data...")
    data_ready.wait()  # Blocks until event is set
    print(f"Consumer {name}: got data = {shared_data}")
 
threads = [
    threading.Thread(target=producer),
    threading.Thread(target=consumer, args=("A",)),
    threading.Thread(target=consumer, args=("B",)),
]
for t in threads: t.start()
for t in threads: t.join()

Condition

Uma Condition combina um lock com a capacidade de esperar por uma notificação. É a fundação para padrões produtor-consumidor:

import threading
import time
import random
 
buffer = []
MAX_SIZE = 5
condition = threading.Condition()
 
def producer():
    for i in range(20):
        with condition:
            while len(buffer) >= MAX_SIZE:
                condition.wait()  # Wait until space available
            item = random.randint(1, 100)
            buffer.append(item)
            print(f"Produced: {item} (buffer size: {len(buffer)})")
            condition.notify_all()
        time.sleep(0.1)
 
def consumer(name):
    for _ in range(10):
        with condition:
            while len(buffer) == 0:
                condition.wait()  # Wait until item available
            item = buffer.pop(0)
            print(f"Consumer {name} consumed: {item} (buffer size: {len(buffer)})")
            condition.notify_all()
        time.sleep(0.15)
 
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=("X",))
t3 = threading.Thread(target=consumer, args=("Y",))
t1.start(); t2.start(); t3.start()
t1.join(); t2.join(); t3.join()

Resumo de Primitivas de Sincronização

PrimitivaPropósitoQuando Usar
LockExclusão mútuaProteger estado mutável compartilhado
RLockMutex reentranteLocking aninhado na mesma thread
SemaphoreLimitar concorrênciaRate limiting, pools de conexão
EventSinalização únicaInicialização completa, sinal de desligamento
ConditionPadrão wait/notifyProdutor-consumidor, mudanças de estado
BarrierSincronizar N threadsTodas as threads devem alcançar um ponto antes de continuar

Estruturas de Dados Thread-Safe

queue.Queue

queue.Queue é a estrutura de dados thread-safe padrão. Ela lida com todo o locking internamente:

import threading
import queue
import time
 
task_queue = queue.Queue()
results = queue.Queue()
 
def worker():
    while True:
        item = task_queue.get()  # Blocks until item available
        if item is None:
            break
        result = item ** 2
        results.put(result)
        task_queue.task_done()
 
# Start 4 workers
workers = []
for _ in range(4):
    t = threading.Thread(target=worker, daemon=True)
    t.start()
    workers.append(t)
 
# Submit tasks
for i in range(20):
    task_queue.put(i)
 
# Wait for all tasks to complete
task_queue.join()
 
# Stop workers
for _ in range(4):
    task_queue.put(None)
for w in workers:
    w.join()
 
# Collect results
all_results = []
while not results.empty():
    all_results.append(results.get())
print(f"Results: {sorted(all_results)}")

queue.Queue também suporta:

  • Queue(maxsize=10): Bloqueia put() quando cheia
  • PriorityQueue(): Itens ordenados por prioridade
  • LifoQueue(): Last-in, first-out (comportamento de pilha)

collections.deque

collections.deque é thread-safe para operações append() e popleft() (atômicas ao nível C no CPython), tornando-se uma alternativa rápida para padrões simples de produtor-consumidor:

from collections import deque
import threading
import time
 
buffer = deque(maxlen=1000)
 
def producer():
    for i in range(100):
        buffer.append(i)
        time.sleep(0.01)
 
def consumer():
    consumed = 0
    while consumed < 100:
        if buffer:
            item = buffer.popleft()
            consumed += 1
        else:
            time.sleep(0.01)
    print(f"Consumed {consumed} items")
 
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()

Nota: Embora operações individuais append e popleft sejam thread-safe, verificar len(buffer) e depois fazer pop não é atômico. Para thread-safety completa, use queue.Queue.

Padrões Comuns de Threading

Padrão Produtor-Consumidor

O padrão clássico para desacoplar produção de dados do processamento de dados:

import threading
import queue
import time
import random
 
def producer(q, name, num_items):
    for i in range(num_items):
        item = f"{name}-item-{i}"
        q.put(item)
        print(f"Producer {name}: created {item}")
        time.sleep(random.uniform(0.05, 0.15))
    print(f"Producer {name}: done")
 
def consumer(q, name, stop_event):
    while not stop_event.is_set() or not q.empty():
        try:
            item = q.get(timeout=0.5)
            print(f"Consumer {name}: processing {item}")
            time.sleep(random.uniform(0.1, 0.2))
            q.task_done()
        except queue.Empty:
            continue
    print(f"Consumer {name}: shutting down")
 
task_queue = queue.Queue(maxsize=10)
stop_event = threading.Event()
 
producers = [
    threading.Thread(target=producer, args=(task_queue, "P1", 10)),
    threading.Thread(target=producer, args=(task_queue, "P2", 10)),
]
consumers = [
    threading.Thread(target=consumer, args=(task_queue, "C1", stop_event)),
    threading.Thread(target=consumer, args=(task_queue, "C2", stop_event)),
    threading.Thread(target=consumer, args=(task_queue, "C3", stop_event)),
]
 
for c in consumers: c.start()
for p in producers: p.start()
for p in producers: p.join()
 
task_queue.join()  # Wait for all items to be processed
stop_event.set()   # Signal consumers to stop
for c in consumers: c.join()

Pool de Threads Worker (Manual)

Quando você precisa de mais controle do que ThreadPoolExecutor fornece:

import threading
import queue
 
class WorkerPool:
    def __init__(self, num_workers):
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.workers = []
 
        for _ in range(num_workers):
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()
            self.workers.append(t)
 
    def _worker(self):
        while True:
            func, args, kwargs, future_id = self.task_queue.get()
            if func is None:
                break
            try:
                result = func(*args, **kwargs)
                self.result_queue.put((future_id, result, None))
            except Exception as e:
                self.result_queue.put((future_id, None, e))
            finally:
                self.task_queue.task_done()
 
    def submit(self, func, *args, **kwargs):
        future_id = id(func)  # Simple ID
        self.task_queue.put((func, args, kwargs, future_id))
        return future_id
 
    def shutdown(self):
        for _ in self.workers:
            self.task_queue.put((None, None, None, None))
        for w in self.workers:
            w.join()
 
# Usage
pool = WorkerPool(4)
for i in range(10):
    pool.submit(lambda x: x * x, i)
pool.task_queue.join()
pool.shutdown()

Pool de Threads com Rate Limiting

Controle a velocidade com que as threads fazem requisições externas:

import threading
import time
from concurrent.futures import ThreadPoolExecutor
 
class RateLimiter:
    def __init__(self, max_per_second):
        self.interval = 1.0 / max_per_second
        self.lock = threading.Lock()
        self.last_call = 0
 
    def wait(self):
        with self.lock:
            elapsed = time.time() - self.last_call
            wait_time = self.interval - elapsed
            if wait_time > 0:
                time.sleep(wait_time)
            self.last_call = time.time()
 
limiter = RateLimiter(max_per_second=5)
 
def rate_limited_fetch(url):
    limiter.wait()
    print(f"Fetching {url} at {time.time():.2f}")
    time.sleep(0.5)  # Simulate request
    return f"Data from {url}"
 
urls = [f"https://api.example.com/item/{i}" for i in range(20)]
 
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(rate_limited_fetch, urls))

Armadilhas de Thread-Safety e Como Evitá-las

Condições de Corrida

Uma condição de corrida ocorre quando o resultado depende do timing da execução da thread:

import threading
 
# BAD: Race condition
counter = 0
 
def increment_unsafe():
    global counter
    for _ in range(100_000):
        counter += 1  # Read, increment, write: NOT atomic
 
threads = [threading.Thread(target=increment_unsafe) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Expected: 500000, Got: {counter}")  # Often less than 500000
 
# GOOD: Protected with lock
counter = 0
lock = threading.Lock()
 
def increment_safe():
    global counter
    for _ in range(100_000):
        with lock:
            counter += 1
 
threads = [threading.Thread(target=increment_safe) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Expected: 500000, Got: {counter}")  # Always 500000

Deadlocks

Um deadlock acontece quando duas threads seguram um lock que a outra precisa:

import threading
 
lock_a = threading.Lock()
lock_b = threading.Lock()
 
def thread_1():
    with lock_a:
        print("Thread 1: acquired lock_a")
        with lock_b:  # Waits forever if thread_2 holds lock_b
            print("Thread 1: acquired lock_b")
 
def thread_2():
    with lock_b:
        print("Thread 2: acquired lock_b")
        with lock_a:  # Waits forever if thread_1 holds lock_a
            print("Thread 2: acquired lock_a")
 
# This WILL deadlock
# t1 = threading.Thread(target=thread_1)
# t2 = threading.Thread(target=thread_2)
# t1.start(); t2.start()

Como prevenir deadlocks:

  1. Sempre adquira locks na mesma ordem:
def thread_1_fixed():
    with lock_a:    # Always lock_a first
        with lock_b:
            print("Thread 1: acquired both locks")
 
def thread_2_fixed():
    with lock_a:    # Always lock_a first (same order)
        with lock_b:
            print("Thread 2: acquired both locks")
  1. Use timeouts:
def safe_acquire():
    acquired_a = lock_a.acquire(timeout=2)
    if not acquired_a:
        print("Could not acquire lock_a, backing off")
        return
    try:
        acquired_b = lock_b.acquire(timeout=2)
        if not acquired_b:
            print("Could not acquire lock_b, releasing lock_a")
            return
        try:
            print("Acquired both locks safely")
        finally:
            lock_b.release()
    finally:
        lock_a.release()
  1. Minimize o escopo do lock: Segure locks pelo menor tempo possível.

Checklist de Thread-Safety

  • Proteja todo estado mutável compartilhado com locks
  • Use queue.Queue em vez de listas ou dicts compartilhados quando possível
  • Evite estado global mutável; passe dados através de argumentos de função
  • Use ThreadPoolExecutor em vez de gerenciamento manual de threads
  • Nunca assuma ordem de operação entre threads
  • Teste com threading.active_count() e logging para detectar vazamentos de thread

Exemplos do Mundo Real

Web Scraping Concorrente

from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import time
 
def fetch_page(url):
    """Fetch a web page and return its content length"""
    try:
        with urllib.request.urlopen(url, timeout=10) as response:
            content = response.read()
            return url, len(content), None
    except Exception as e:
        return url, 0, str(e)
 
urls = [
    "https://python.org",
    "https://docs.python.org",
    "https://pypi.org",
    "https://realpython.com",
    "https://github.com",
    "https://stackoverflow.com",
    "https://news.ycombinator.com",
    "https://httpbin.org",
]
 
# Sequential
start = time.time()
for url in urls:
    fetch_page(url)
sequential_time = time.time() - start
 
# Concurrent with threads
start = time.time()
with ThreadPoolExecutor(max_workers=8) as executor:
    futures = {executor.submit(fetch_page, url): url for url in urls}
    for future in as_completed(futures):
        url, size, error = future.result()
        if error:
            print(f"  FAIL {url}: {error}")
        else:
            print(f"  OK   {url}: {size:,} bytes")
threaded_time = time.time() - start
 
print(f"\nSequential: {sequential_time:.2f}s")
print(f"Threaded:   {threaded_time:.2f}s")
print(f"Speedup:    {sequential_time / threaded_time:.1f}x")

I/O de Arquivos Paralelo

from concurrent.futures import ThreadPoolExecutor
import os
import hashlib
 
def process_file(filepath):
    """Read file and compute its SHA-256 hash"""
    with open(filepath, 'rb') as f:
        content = f.read()
    file_hash = hashlib.sha256(content).hexdigest()
    size = os.path.getsize(filepath)
    return filepath, file_hash, size
 
def hash_all_files(directory, pattern="*.py"):
    """Hash all matching files in a directory using threads"""
    import glob
    files = glob.glob(os.path.join(directory, "**", pattern), recursive=True)
 
    results = {}
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = {executor.submit(process_file, f): f for f in files}
        for future in futures:
            try:
                path, hash_val, size = future.result()
                results[path] = {"hash": hash_val, "size": size}
            except Exception as e:
                print(f"Error processing {futures[future]}: {e}")
 
    return results
 
# Usage
# file_hashes = hash_all_files("/path/to/project")

Chamadas de API Concorrentes com Lógica de Retry

from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import json
import time
 
def fetch_api(endpoint, max_retries=3, backoff=1.0):
    """Fetch API endpoint with exponential backoff retry"""
    for attempt in range(max_retries):
        try:
            url = f"https://jsonplaceholder.typicode.com{endpoint}"
            req = urllib.request.Request(url)
            with urllib.request.urlopen(req, timeout=10) as response:
                data = json.loads(response.read())
                return {"endpoint": endpoint, "data": data, "error": None}
        except Exception as e:
            if attempt < max_retries - 1:
                wait = backoff * (2 ** attempt)
                time.sleep(wait)
            else:
                return {"endpoint": endpoint, "data": None, "error": str(e)}
 
endpoints = [f"/posts/{i}" for i in range(1, 21)]
 
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(fetch_api, ep) for ep in endpoints]
    results = [f.result() for f in futures]
 
elapsed = time.time() - start
success = sum(1 for r in results if r["error"] is None)
print(f"Fetched {success}/{len(endpoints)} endpoints in {elapsed:.2f}s")

Tarefas Periódicas em Background

import threading
import time
 
class PeriodicTask:
    """Run a function at fixed intervals in a background thread"""
    def __init__(self, interval, func, *args, **kwargs):
        self.interval = interval
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self._stop_event = threading.Event()
        self._thread = None
 
    def start(self):
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()
 
    def _run(self):
        while not self._stop_event.is_set():
            self.func(*self.args, **self.kwargs)
            self._stop_event.wait(self.interval)
 
    def stop(self):
        self._stop_event.set()
        if self._thread:
            self._thread.join()
 
# Usage
def check_health():
    print(f"Health check at {time.strftime('%H:%M:%S')}")
 
task = PeriodicTask(2.0, check_health)
task.start()
time.sleep(7)
task.stop()
print("Stopped")

Performance: Threading vs Multiprocessing vs Asyncio

A ferramenta certa de concorrência depende da carga de trabalho. Aqui está uma comparação de tempo de relógio para tarefas comuns:

TarefaSequencialThreading (4)Multiprocessing (4)Asyncio
100 requisições HTTP (200ms cada)20.0s5.1s5.8s4.9s
100 leituras de arquivo (10ms cada)1.0s0.28s0.35s0.26s
100 tarefas de CPU (100ms cada)10.0s10.2s2.7s10.0s
50 consultas de DB (50ms cada)2.5s0.68s0.85s0.62s
I/O + CPU misto15.0s8.2s4.1s9.5s

Principais conclusões:

  • Threading entrega aceleração de 3-5x em cargas de trabalho limitadas por I/O com mudanças mínimas de código
  • Multiprocessing é a única opção para paralelismo verdadeiro de CPU mas adiciona overhead de processo
  • Asyncio supera o threading em I/O de alta concorrência mas requer reescrever código com async/await
  • Para cargas mistas, considere combinar threading para I/O e multiprocessing para tarefas de CPU
import time
import threading
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
 
def io_task():
    time.sleep(0.2)
 
def cpu_task(n=2_000_000):
    return sum(i * i for i in range(n))
 
# Benchmark threading vs multiprocessing
NUM_TASKS = 20
 
# Threading - I/O bound
start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(lambda _: io_task(), range(NUM_TASKS)))
print(f"Threading (I/O): {time.time() - start:.2f}s")
 
# Threading - CPU bound
start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(lambda _: cpu_task(), range(NUM_TASKS)))
print(f"Threading (CPU): {time.time() - start:.2f}s")

Experimentando com Threading no RunCell

Depurar e analisar código threaded pode ser desafiador. Quando você precisa testar sincronização de threads, visualizar sobreposições de timing ou diagnosticar condições de corrida interativamente, o RunCell (www.runcell.dev (opens in a new tab)) fornece um ambiente Jupyter powered by AI projetado para este fluxo de trabalho.

O agente de IA do RunCell pode analisar seu código de threading, identificar potenciais deadlocks antes que aconteçam, sugerir contagens ótimas de workers baseadas na sua carga de trabalho, e ajudá-lo a entender por que threads se comportam de forma inesperada. Quando um pool de threads produz resultados incorretos intermitentemente, o RunCell rastreia a linha do tempo de execução para identificar o momento exato em que o estado compartilhado é corrompido.

Se você quer visualizar as características de performance de diferentes configurações de threading, o PyGWalker (github.com/Kanaries/pygwalker) pode transformar seus DataFrames de benchmark em gráficos interativos. Execute benchmarks de threading, colete dados de timing em um DataFrame pandas, e explore os resultados com visualizações drag-and-drop para encontrar a contagem ótima de threads para sua carga de trabalho.

FAQ

Qual é a diferença entre threading e multiprocessing em Python?

Threading executa múltiplas threads dentro de um único processo, compartilhando memória. O Global Interpreter Lock (GIL) impede que threads executem bytecode Python em paralelo, tornando o threading efetivo apenas para tarefas limitadas por I/O como requisições de rede e operações de arquivo. Multiprocessing cria processos separados, cada um com seu próprio interpretador Python e espaço de memória, habilitando execução paralela verdadeira para tarefas limitadas por CPU. Threading tem menor overhead (inicialização mais rápida, menos memória), enquanto multiprocessing contorna o GIL para paralelismo genuíno.

O threading em Python é verdadeiramente paralelo?

Não, o threading em Python é concorrente mas não paralelo para código limitado por CPU devido ao GIL. Apenas uma thread executa bytecode Python por vez. No entanto, o GIL é liberado durante operações de I/O (rede, disco, banco de dados), então múltiplas threads efetivamente rodam em paralelo quando aguardam por I/O. Para paralelismo limitado por CPU, use o módulo multiprocessing ou extensões C que liberam o GIL (como NumPy).

Quantas threads devo usar em Python?

Para tarefas limitadas por I/O, comece com 5-20 threads dependendo dos limites de rate do serviço externo e da sua largura de banda de rede. Threads demais para um único servidor podem causar recusas de conexão ou throttling. Para cargas mistas, experimente com contagens de threads entre o número de núcleos de CPU e 4x a contagem de núcleos. Use ThreadPoolExecutor e faça benchmark com diferentes valores de max_workers para encontrar a contagem ótima para sua carga de trabalho específica. O padrão para ThreadPoolExecutor é min(32, os.cpu_count() + 4).

Como retorno um valor de uma thread em Python?

Threads não retornam valores diretamente de sua função alvo. As três abordagens principais são: (1) Use ThreadPoolExecutor.submit() que retorna um objeto Future onde você chama future.result() para obter o valor de retorno. (2) Passe um container mutável (como um dicionário ou lista) como argumento e faça a thread escrever resultados nele, protegido por um Lock. (3) Use queue.Queue onde a thread coloca resultados na fila e a thread principal lê dele. ThreadPoolExecutor é a abordagem mais limpa para a maioria dos casos.

O que acontece se uma thread Python levantar uma exceção?

Em uma threading.Thread crua, uma exceção não tratada termina aquela thread silenciosamente e a exceção é perdida. A thread principal e outras threads continuam rodando sem qualquer notificação. Com ThreadPoolExecutor, exceções são capturadas e relançadas quando você chama future.result(), tornando o tratamento de erros muito mais confiável. Sempre use blocos try/except dentro de funções alvo de thread ou use ThreadPoolExecutor para garantir que exceções sejam propriamente capturadas e tratadas.

Conclusão

O threading em Python é uma ferramenta poderosa para acelerar programas limitados por I/O. Ao executar requisições de rede, operações de arquivo e consultas de banco de dados concorrentemente, você pode transformar um script sequencial de 20 segundos em um que termina em 5 segundos com mudanças mínimas de código.

Os pontos chave para lembrar:

  • Use threading para trabalhos limitados por I/O. O GIL impede paralelismo de CPU, mas threads sobrepõem efetivamente o tempo de espera de I/O.
  • Use ThreadPoolExecutor para a maioria das necessidades de threading. Ele gerencia threads, coleta resultados e propaga exceções de forma limpa.
  • Proteja estado compartilhado com locks. Condições de corrida são o bug mais comum de threading, e queue.Queue elimina a maioria das preocupações de locking.
  • Evite deadlocks adquirindo locks em uma ordem consistente e usando timeouts.
  • Escolha a ferramenta certa: threading para I/O, multiprocessing para CPU, asyncio para milhares de conexões concorrentes.

Comece com ThreadPoolExecutor e uma simples chamada executor.map(). Meça a aceleração. Adicione sincronização apenas onde estado mutável compartilhado demanda. Threading não requer uma reescrita completa do seu código. Poucas linhas de concurrent.futures podem entregar melhorias dramáticas de performance para qualquer programa que gaste tempo esperando.

📚