Skip to content

Python Threading : Guide Complet du Multithreading avec Exemples

Updated on

Votre programme Python effectue 50 appels API, l'un après l'autre. Chaque appel prend 200 millisecondes d'attente. Le calcul est brutal : 10 secondes de la vie de votre programme perdues à attendre les réponses réseau. Votre CPU reste inactif avec une utilisation proche de zéro pendant que votre script rampe à travers des opérations I/O-bound qui pourraient s'exécuter simultanément.

Ce problème s'aggrave rapidement. Des scrapers web qui récupèrent des milliers de pages séquentiellement. Des scripts de traitement de fichiers qui lisent et écrivent un fichier à la fois. Des requêtes de base de données qui bloquent l'application entière pendant l'attente des résultats. Chaque seconde d'attente inactive est une seconde pendant laquelle votre programme pourrait effectuer un travail utile.

Le module threading de Python résout cela en exécutant plusieurs opérations simultanément au sein d'un seul processus. Les threads partagent la mémoire, démarrent rapidement et excellent dans les charges de travail I/O-bound où le programme passe la plupart de son temps à attendre. Ce guide couvre tout, de la création de threads basiques aux modèles de synchronisation avancés, avec des exemples de code prêts pour la production que vous pouvez utiliser immédiatement.

📚

Qu'est-ce que le Threading en Python ?

Le threading permet à un programme d'exécuter plusieurs opérations simultanément au sein du même processus. Chaque thread partage le même espace mémoire, rendant la communication entre threads rapide et simple.

Le module threading de Python fournit une interface de haut niveau pour créer et gérer des threads. Mais il y a une mise en garde importante : le Global Interpreter Lock (GIL).

Le Global Interpreter Lock (GIL)

Le GIL est un mutex dans CPython qui ne permet à un seul thread à la fois d'exécuter du bytecode Python. Cela signifie que les threads ne peuvent pas atteindre un véritable parallélisme pour les opérations CPU-bound. Cependant, le GIL se libère pendant les opérations I/O (appels réseau, lectures de fichiers, requêtes de base de données), permettant aux autres threads de s'exécuter pendant qu'un autre attend les E/S.

import threading
import time
 
def cpu_bound(n):
    """CPU-bound: GIL prevents parallel execution"""
    total = 0
    for i in range(n):
        total += i * i
    return total
 
def io_bound(url):
    """I/O-bound: GIL releases during network wait"""
    import urllib.request
    return urllib.request.urlopen(url).read()
 
# CPU-bound: 4 threads run one-at-a-time (no speedup)
start = time.time()
threads = [threading.Thread(target=cpu_bound, args=(10_000_000,)) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"CPU-bound with threads: {time.time() - start:.2f}s")
 
# I/O-bound: 4 threads overlap their waiting time (big speedup)

Cela signifie que le threading est idéal pour les tâches I/O-bound mais pas pour les calculs intensifs en CPU. Pour le travail CPU-bound, utilisez plutôt le module multiprocessing.

Quand utiliser Threading vs Multiprocessing vs Asyncio

Fonctionnalitéthreadingmultiprocessingasyncio
Idéal pourTâches I/O-boundTâches CPU-boundE/S haute concurrence
ParallélismeConcurrent (limité par GIL)Parallélisme réelConcurrent (thread unique)
MémoirePartagée (léger)Séparée par processusPartagée (léger)
Coût de démarrageFaible (~1ms)Élevé (~50-100ms)Très faible
CommunicationAccès mémoire directPipes, Queues, mémoire partagéeCoroutines awaitables
ScalabilitéDizaines à centaines de threadsLimité par les cœurs CPUMilliers de coroutines
ComplexitéMoyenne (locking nécessaire)Moyenne (sérialisation)Élevée (syntaxe async/await)
Cas d'usageWeb scraping, E/S fichiers, appels APITraitement de données, entraînement MLServeurs web, applications chat

Règle empirique : Si votre programme attend le réseau ou le disque, utilisez le threading. S'il effectue des calculs numériques, utilisez le multiprocessing. Si vous avez besoin de milliers de connexions simultanées, utilisez asyncio.

Bases des Threads : Création et Exécution

La classe threading.Thread

Le moyen le plus simple de créer un thread est de passer une fonction cible à threading.Thread :

import threading
import time
 
def download_file(filename):
    print(f"[{threading.current_thread().name}] Downloading {filename}...")
    time.sleep(2)  # Simulate download
    print(f"[{threading.current_thread().name}] Finished {filename}")
 
# Create threads
t1 = threading.Thread(target=download_file, args=("data.csv",))
t2 = threading.Thread(target=download_file, args=("report.pdf",))
 
# Start threads
t1.start()
t2.start()
 
# Wait for both to finish
t1.join()
t2.join()
 
print("All downloads complete")

Les deux téléchargements s'exécutent simultanément, se terminant en environ 2 secondes au lieu de 4.

start() et join()

  • start() démarre l'exécution du thread. Un thread ne peut être démarré qu'une seule fois.
  • join(timeout=None) bloque le thread appelant jusqu'à ce que le thread cible se termine. Passez un timeout en secondes pour éviter d'attendre indéfiniment.
import threading
import time
 
def slow_task():
    time.sleep(10)
 
t = threading.Thread(target=slow_task)
t.start()
 
# Wait at most 3 seconds
t.join(timeout=3)
 
if t.is_alive():
    print("Thread still running after 3 seconds")
else:
    print("Thread finished")

Nommer les Threads

Les threads nommés facilitent le débogage :

import threading
 
def worker():
    name = threading.current_thread().name
    print(f"Running in thread: {name}")
 
t = threading.Thread(target=worker, name="DataProcessor")
t.start()
t.join()

Threads Daemons

Les threads daemons sont des threads d'arrière-plan qui se terminent automatiquement quand le programme principal se termine. Les threads non-daemons maintiennent le programme en vie jusqu'à ce qu'ils finissent.

import threading
import time
 
def background_monitor():
    while True:
        print("Monitoring system health...")
        time.sleep(5)
 
# Daemon thread: dies when main program exits
monitor = threading.Thread(target=background_monitor, daemon=True)
monitor.start()
 
# Main program does its work
time.sleep(12)
print("Main program exiting")
# monitor thread is killed automatically

Utilisez les threads daemons pour les tâches de journalisation d'arrière-plan, la surveillance ou le nettoyage qui ne devraient pas empêcher la sortie du programme.

Héritage de Thread

Pour un comportement de thread plus complexe, héritez de threading.Thread :

import threading
import time
 
class FileProcessor(threading.Thread):
    def __init__(self, filepath):
        super().__init__()
        self.filepath = filepath
        self.result = None
 
    def run(self):
        """Override run() with thread logic"""
        print(f"Processing {self.filepath}")
        time.sleep(1)  # Simulate work
        self.result = f"Processed: {self.filepath}"
 
# Create and run
processor = FileProcessor("/data/report.csv")
processor.start()
processor.join()
print(processor.result)

Passage d'Arguments aux Threads

Utilisation de args et kwargs

Passez les arguments positionnels avec args (un tuple) et les arguments nommés avec kwargs (un dictionnaire) :

import threading
 
def fetch_data(url, timeout, retries=3, verbose=False):
    print(f"Fetching {url} (timeout={timeout}s, retries={retries}, verbose={verbose})")
 
# Positional args as tuple
t1 = threading.Thread(target=fetch_data, args=("https://api.example.com", 30))
 
# Keyword args as dict
t2 = threading.Thread(
    target=fetch_data,
    args=("https://api.example.com",),
    kwargs={"timeout": 30, "retries": 5, "verbose": True}
)
 
t1.start()
t2.start()
t1.join()
t2.join()

Erreur courante : Oublier la virgule finale dans un tuple à un seul élément. args=("hello",) est un tuple ; args=("hello") est juste une chaîne entre parenthèses.

Collecte des Résultats des Threads

Les threads ne retournent pas de valeurs directement. Utilisez des structures de données partagées ou une liste pour collecter les résultats :

import threading
 
results = {}
lock = threading.Lock()
 
def compute(task_id, value):
    result = value ** 2
    with lock:
        results[task_id] = result
 
threads = []
for i in range(5):
    t = threading.Thread(target=compute, args=(i, i * 10))
    threads.append(t)
    t.start()
 
for t in threads:
    t.join()
 
print(results)  # {0: 0, 1: 100, 2: 400, 3: 900, 4: 1600}

Une approche plus propre utilise ThreadPoolExecutor (couvert ensuite), qui gère la collecte des résultats automatiquement.

ThreadPoolExecutor : L'Approche Moderne

Le module concurrent.futures fournit ThreadPoolExecutor, une interface de haut niveau qui gère un pool de threads workers. Il gère la création de threads, la collecte des résultats et la propagation des exceptions automatiquement.

Utilisation Basique avec submit()

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
 
def fetch_url(url):
    time.sleep(1)  # Simulate network request
    return f"Content from {url}"
 
urls = [
    "https://example.com/page1",
    "https://example.com/page2",
    "https://example.com/page3",
    "https://example.com/page4",
    "https://example.com/page5",
]
 
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks and get Future objects
    future_to_url = {executor.submit(fetch_url, url): url for url in urls}
 
    # Process results as they complete
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            print(f"{url}: {data}")
        except Exception as e:
            print(f"{url} generated an exception: {e}")

Utilisation de map() pour des Résultats Ordonnés

executor.map() retourne les résultats dans le même ordre que l'entrée, similaire à map() intégré :

from concurrent.futures import ThreadPoolExecutor
 
def process_item(item):
    return item.upper()
 
items = ["apple", "banana", "cherry", "date"]
 
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_item, items))
 
print(results)  # ['APPLE', 'BANANA', 'CHERRY', 'DATE']

submit() vs map()

submit()map()
RetourneObjets FutureItérateur de résultats
Ordre des résultatsOrdre de complétion (avec as_completed)Ordre d'entrée
Gestion des erreursPar tâche via future.result()Lève une exception à la première défaillance
ArgumentsAppel de fonction uniqueApplique la fonction à chaque élément
Idéal pourTâches hétérogènes, résultats précocesTraitement par lots homogène

Gestion des Exceptions avec Futures

from concurrent.futures import ThreadPoolExecutor, as_completed
 
def risky_task(n):
    if n == 3:
        raise ValueError(f"Bad input: {n}")
    return n * 10
 
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(risky_task, i): i for i in range(5)}
 
    for future in as_completed(futures):
        task_id = futures[future]
        try:
            result = future.result(timeout=5)
            print(f"Task {task_id}: {result}")
        except ValueError as e:
            print(f"Task {task_id} failed: {e}")
        except TimeoutError:
            print(f"Task {task_id} timed out")

Annulation de Tâches

from concurrent.futures import ThreadPoolExecutor
import time
 
def long_task(n):
    time.sleep(5)
    return n
 
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(long_task, i) for i in range(10)]
 
    # Cancel pending tasks (already-running tasks cannot be cancelled)
    for f in futures[4:]:
        cancelled = f.cancel()
        print(f"Cancelled: {cancelled}")

Primitives de Synchronisation des Threads

Quand plusieurs threads accèdent à des données partagées, vous avez besoin de synchronisation pour prévenir les conditions de course.

Lock

Un Lock garantit qu'un seul thread entre dans une section critique à la fois :

import threading
 
class BankAccount:
    def __init__(self, balance):
        self.balance = balance
        self.lock = threading.Lock()
 
    def withdraw(self, amount):
        with self.lock:  # Only one thread at a time
            if self.balance >= amount:
                self.balance -= amount
                return True
            return False
 
    def deposit(self, amount):
        with self.lock:
            self.balance += amount
 
account = BankAccount(1000)
 
def make_transactions():
    for _ in range(100):
        account.deposit(10)
        account.withdraw(10)
 
threads = [threading.Thread(target=make_transactions) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
 
print(f"Final balance: {account.balance}")  # Always 1000

Sans le lock, les lectures et écritures concurrentes produiraient des résultats incorrects (une condition de course).

RLock (Reentrant Lock)

Un RLock peut être acquis plusieurs fois par le même thread. Cela évite les deadlocks quand une fonction détenant un lock appelle une autre fonction qui a aussi besoin du même lock :

import threading
 
class SafeCache:
    def __init__(self):
        self._data = {}
        self._lock = threading.RLock()
 
    def get(self, key):
        with self._lock:
            return self._data.get(key)
 
    def set(self, key, value):
        with self._lock:
            self._data[key] = value
 
    def get_or_set(self, key, default):
        with self._lock:
            # This calls get(), which also acquires _lock
            # RLock allows this; a regular Lock would deadlock
            existing = self.get(key)
            if existing is None:
                self.set(key, default)
                return default
            return existing

Semaphore

Un Semaphore permet à un nombre fixe de threads d'accéder simultanément à une ressource :

import threading
import time
 
# Allow max 3 concurrent database connections
db_semaphore = threading.Semaphore(3)
 
def query_database(query_id):
    with db_semaphore:
        print(f"Query {query_id}: connected (active connections: {3 - db_semaphore._value})")
        time.sleep(2)  # Simulate query
        print(f"Query {query_id}: done")
 
threads = [threading.Thread(target=query_database, args=(i,)) for i in range(10)]
for t in threads: t.start()
for t in threads: t.join()

Event

Un Event permet à un thread de signaler d'autres threads en attente :

import threading
import time
 
data_ready = threading.Event()
shared_data = []
 
def producer():
    print("Producer: preparing data...")
    time.sleep(3)
    shared_data.extend([1, 2, 3, 4, 5])
    print("Producer: data ready, signaling consumers")
    data_ready.set()
 
def consumer(name):
    print(f"Consumer {name}: waiting for data...")
    data_ready.wait()  # Blocks until event is set
    print(f"Consumer {name}: got data = {shared_data}")
 
threads = [
    threading.Thread(target=producer),
    threading.Thread(target=consumer, args=("A",)),
    threading.Thread(target=consumer, args=("B",)),
]
for t in threads: t.start()
for t in threads: t.join()

Condition

Une Condition combine un lock avec la capacité d'attendre une notification. C'est le fondement des modèles producteur-consommateur :

import threading
import time
import random
 
buffer = []
MAX_SIZE = 5
condition = threading.Condition()
 
def producer():
    for i in range(20):
        with condition:
            while len(buffer) >= MAX_SIZE:
                condition.wait()  # Wait until space available
            item = random.randint(1, 100)
            buffer.append(item)
            print(f"Produced: {item} (buffer size: {len(buffer)})")
            condition.notify_all()
        time.sleep(0.1)
 
def consumer(name):
    for _ in range(10):
        with condition:
            while len(buffer) == 0:
                condition.wait()  # Wait until item available
            item = buffer.pop(0)
            print(f"Consumer {name} consumed: {item} (buffer size: {len(buffer)})")
            condition.notify_all()
        time.sleep(0.15)
 
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=("X",))
t3 = threading.Thread(target=consumer, args=("Y",))
t1.start(); t2.start(); t3.start()
t1.join(); t2.join(); t3.join()

Résumé des Primitives de Synchronisation

PrimitiveObjectifQuand l'utiliser
LockExclusion mutuelleProtéger l'état mutable partagé
RLockMutex réentrantVerrouillage imbriqué dans le même thread
SemaphoreLimiter la concurrenceLimitation de débit, pools de connexions
EventSignal ponctuelInitialisation terminée, signal d'arrêt
ConditionModèle attendre/notifierProducteur-consommateur, changements d'état
BarrierSynchroniser N threadsTous les threads doivent atteindre un point avant de continuer

Structures de Données Thread-Safe

queue.Queue

queue.Queue est la structure de données thread-safe de référence. Elle gère tout le verrouillage en interne :

import threading
import queue
import time
 
task_queue = queue.Queue()
results = queue.Queue()
 
def worker():
    while True:
        item = task_queue.get()  # Blocks until item available
        if item is None:
            break
        result = item ** 2
        results.put(result)
        task_queue.task_done()
 
# Start 4 workers
workers = []
for _ in range(4):
    t = threading.Thread(target=worker, daemon=True)
    t.start()
    workers.append(t)
 
# Submit tasks
for i in range(20):
    task_queue.put(i)
 
# Wait for all tasks to complete
task_queue.join()
 
# Stop workers
for _ in range(4):
    task_queue.put(None)
for w in workers:
    w.join()
 
# Collect results
all_results = []
while not results.empty():
    all_results.append(results.get())
print(f"Results: {sorted(all_results)}")

queue.Queue supporte aussi :

  • Queue(maxsize=10) : Bloque put() quand plein
  • PriorityQueue() : Éléments triés par priorité
  • LifoQueue() : Dernier entré, premier sorti (comportement de pile)

collections.deque

collections.deque est thread-safe pour les opérations append() et popleft() (atomique au niveau C dans CPython), ce qui en fait une alternative rapide pour les modèles producteur-consommateur simples :

from collections import deque
import threading
import time
 
buffer = deque(maxlen=1000)
 
def producer():
    for i in range(100):
        buffer.append(i)
        time.sleep(0.01)
 
def consumer():
    consumed = 0
    while consumed < 100:
        if buffer:
            item = buffer.popleft()
            consumed += 1
        else:
            time.sleep(0.01)
    print(f"Consumed {consumed} items")
 
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()

Note : Bien que les opérations individuelles append et popleft soient thread-safe, vérifier len(buffer) puis faire un pop n'est pas atomique. Pour une sécurité complète, utilisez queue.Queue.

Modèles de Threading Courants

Modèle Producteur-Consommateur

Le modèle classique pour découpler la production de données du traitement :

import threading
import queue
import time
import random
 
def producer(q, name, num_items):
    for i in range(num_items):
        item = f"{name}-item-{i}"
        q.put(item)
        print(f"Producer {name}: created {item}")
        time.sleep(random.uniform(0.05, 0.15))
    print(f"Producer {name}: done")
 
def consumer(q, name, stop_event):
    while not stop_event.is_set() or not q.empty():
        try:
            item = q.get(timeout=0.5)
            print(f"Consumer {name}: processing {item}")
            time.sleep(random.uniform(0.1, 0.2))
            q.task_done()
        except queue.Empty:
            continue
    print(f"Consumer {name}: shutting down")
 
task_queue = queue.Queue(maxsize=10)
stop_event = threading.Event()
 
producers = [
    threading.Thread(target=producer, args=(task_queue, "P1", 10)),
    threading.Thread(target=producer, args=(task_queue, "P2", 10)),
]
consumers = [
    threading.Thread(target=consumer, args=(task_queue, "C1", stop_event)),
    threading.Thread(target=consumer, args=(task_queue, "C2", stop_event)),
    threading.Thread(target=consumer, args=(task_queue, "C3", stop_event)),
]
 
for c in consumers: c.start()
for p in producers: p.start()
for p in producers: p.join()
 
task_queue.join()  # Wait for all items to be processed
stop_event.set()   # Signal consumers to stop
for c in consumers: c.join()

Pool de Threads Workers (Manuel)

Quand vous avez besoin de plus de contrôle que ce que ThreadPoolExecutor fournit :

import threading
import queue
 
class WorkerPool:
    def __init__(self, num_workers):
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.workers = []
 
        for _ in range(num_workers):
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()
            self.workers.append(t)
 
    def _worker(self):
        while True:
            func, args, kwargs, future_id = self.task_queue.get()
            if func is None:
                break
            try:
                result = func(*args, **kwargs)
                self.result_queue.put((future_id, result, None))
            except Exception as e:
                self.result_queue.put((future_id, None, e))
            finally:
                self.task_queue.task_done()
 
    def submit(self, func, *args, **kwargs):
        future_id = id(func)  # Simple ID
        self.task_queue.put((func, args, kwargs, future_id))
        return future_id
 
    def shutdown(self):
        for _ in self.workers:
            self.task_queue.put((None, None, None, None))
        for w in self.workers:
            w.join()
 
# Usage
pool = WorkerPool(4)
for i in range(10):
    pool.submit(lambda x: x * x, i)
pool.task_queue.join()
pool.shutdown()

Pool de Threads avec Limitation de Débit

Contrôlez la vitesse à laquelle les threads font des requêtes externes :

import threading
import time
from concurrent.futures import ThreadPoolExecutor
 
class RateLimiter:
    def __init__(self, max_per_second):
        self.interval = 1.0 / max_per_second
        self.lock = threading.Lock()
        self.last_call = 0
 
    def wait(self):
        with self.lock:
            elapsed = time.time() - self.last_call
            wait_time = self.interval - elapsed
            if wait_time > 0:
                time.sleep(wait_time)
            self.last_call = time.time()
 
limiter = RateLimiter(max_per_second=5)
 
def rate_limited_fetch(url):
    limiter.wait()
    print(f"Fetching {url} at {time.time():.2f}")
    time.sleep(0.5)  # Simulate request
    return f"Data from {url}"
 
urls = [f"https://api.example.com/item/{i}" for i in range(20)]
 
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(rate_limited_fetch, urls))

Pièges de la Sécurité Thread et Comment les Éviter

Conditions de Course

Une condition de course survient quand le résultat dépend du timing de l'exécution des threads :

import threading
 
# BAD: Race condition
counter = 0
 
def increment_unsafe():
    global counter
    for _ in range(100_000):
        counter += 1  # Read, increment, write: NOT atomic
 
threads = [threading.Thread(target=increment_unsafe) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Expected: 500000, Got: {counter}")  # Often less than 500000
 
# GOOD: Protected with lock
counter = 0
lock = threading.Lock()
 
def increment_safe():
    global counter
    for _ in range(100_000):
        with lock:
            counter += 1
 
threads = [threading.Thread(target=increment_safe) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Expected: 500000, Got: {counter}")  # Always 500000

Deadlocks

Un deadlock survient quand deux threads détiennent chacun un lock dont l'autre a besoin :

import threading
 
lock_a = threading.Lock()
lock_b = threading.Lock()
 
def thread_1():
    with lock_a:
        print("Thread 1: acquired lock_a")
        with lock_b:  # Waits forever if thread_2 holds lock_b
            print("Thread 1: acquired lock_b")
 
def thread_2():
    with lock_b:
        print("Thread 2: acquired lock_b")
        with lock_a:  # Waits forever if thread_1 holds lock_a
            print("Thread 2: acquired lock_a")
 
# This WILL deadlock
# t1 = threading.Thread(target=thread_1)
# t2 = threading.Thread(target=thread_2)
# t1.start(); t2.start()

Comment prévenir les deadlocks :

  1. Acquérez toujours les locks dans le même ordre :
def thread_1_fixed():
    with lock_a:    # Always lock_a first
        with lock_b:
            print("Thread 1: acquired both locks")
 
def thread_2_fixed():
    with lock_a:    # Always lock_a first (same order)
        with lock_b:
            print("Thread 2: acquired both locks")
  1. Utilisez des timeouts :
def safe_acquire():
    acquired_a = lock_a.acquire(timeout=2)
    if not acquired_a:
        print("Could not acquire lock_a, backing off")
        return
    try:
        acquired_b = lock_b.acquire(timeout=2)
        if not acquired_b:
            print("Could not acquire lock_b, releasing lock_a")
            return
        try:
            print("Acquired both locks safely")
        finally:
            lock_b.release()
    finally:
        lock_a.release()
  1. Minimisez la portée des locks : Détenez les locks pendant le temps le plus court possible.

Checklist de Sécurité Thread

  • Protégez tout état mutable partagé avec des locks
  • Utilisez queue.Queue au lieu de listes ou dicts partagés quand possible
  • Évitez l'état mutable global ; passez les données par arguments de fonction
  • Utilisez ThreadPoolExecutor au lieu de la gestion manuelle des threads
  • Ne supposez jamais l'ordre d'exécution entre threads
  • Testez avec threading.active_count() et la journalisation pour détecter les fuites de threads

Exemples du Monde Réel

Web Scraping Concurrent

from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import time
 
def fetch_page(url):
    """Fetch a web page and return its content length"""
    try:
        with urllib.request.urlopen(url, timeout=10) as response:
            content = response.read()
            return url, len(content), None
    except Exception as e:
        return url, 0, str(e)
 
urls = [
    "https://python.org",
    "https://docs.python.org",
    "https://pypi.org",
    "https://realpython.com",
    "https://github.com",
    "https://stackoverflow.com",
    "https://news.ycombinator.com",
    "https://httpbin.org",
]
 
# Sequential
start = time.time()
for url in urls:
    fetch_page(url)
sequential_time = time.time() - start
 
# Concurrent with threads
start = time.time()
with ThreadPoolExecutor(max_workers=8) as executor:
    futures = {executor.submit(fetch_page, url): url for url in urls}
    for future in as_completed(futures):
        url, size, error = future.result()
        if error:
            print(f"  FAIL {url}: {error}")
        else:
            print(f"  OK   {url}: {size:,} bytes")
threaded_time = time.time() - start
 
print(f"\nSequential: {sequential_time:.2f}s")
print(f"Threaded:   {threaded_time:.2f}s")
print(f"Speedup:    {sequential_time / threaded_time:.1f}x")

E/S Fichiers Parallèles

from concurrent.futures import ThreadPoolExecutor
import os
import hashlib
 
def process_file(filepath):
    """Read file and compute its SHA-256 hash"""
    with open(filepath, 'rb') as f:
        content = f.read()
    file_hash = hashlib.sha256(content).hexdigest()
    size = os.path.getsize(filepath)
    return filepath, file_hash, size
 
def hash_all_files(directory, pattern="*.py"):
    """Hash all matching files in a directory using threads"""
    import glob
    files = glob.glob(os.path.join(directory, "**", pattern), recursive=True)
 
    results = {}
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = {executor.submit(process_file, f): f for f in files}
        for future in futures:
            try:
                path, hash_val, size = future.result()
                results[path] = {"hash": hash_val, "size": size}
            except Exception as e:
                print(f"Error processing {futures[future]}: {e}")
 
    return results
 
# Usage
# file_hashes = hash_all_files("/path/to/project")

Appels API Concurrents avec Logique de Retry

from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import json
import time
 
def fetch_api(endpoint, max_retries=3, backoff=1.0):
    """Fetch API endpoint with exponential backoff retry"""
    for attempt in range(max_retries):
        try:
            url = f"https://jsonplaceholder.typicode.com{endpoint}"
            req = urllib.request.Request(url)
            with urllib.request.urlopen(req, timeout=10) as response:
                data = json.loads(response.read())
                return {"endpoint": endpoint, "data": data, "error": None}
        except Exception as e:
            if attempt < max_retries - 1:
                wait = backoff * (2 ** attempt)
                time.sleep(wait)
            else:
                return {"endpoint": endpoint, "data": None, "error": str(e)}
 
endpoints = [f"/posts/{i}" for i in range(1, 21)]
 
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(fetch_api, ep) for ep in endpoints]
    results = [f.result() for f in futures]
 
elapsed = time.time() - start
success = sum(1 for r in results if r["error"] is None)
print(f"Fetched {success}/{len(endpoints)} endpoints in {elapsed:.2f}s")

Tâches d'Arrière-Plan Périodiques

import threading
import time
 
class PeriodicTask:
    """Run a function at fixed intervals in a background thread"""
    def __init__(self, interval, func, *args, **kwargs):
        self.interval = interval
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self._stop_event = threading.Event()
        self._thread = None
 
    def start(self):
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()
 
    def _run(self):
        while not self._stop_event.is_set():
            self.func(*self.args, **self.kwargs)
            self._stop_event.wait(self.interval)
 
    def stop(self):
        self._stop_event.set()
        if self._thread:
            self._thread.join()
 
# Usage
def check_health():
    print(f"Health check at {time.strftime('%H:%M:%S')}")
 
task = PeriodicTask(2.0, check_health)
task.start()
time.sleep(7)
task.stop()
print("Stopped")

Performance : Threading vs Multiprocessing vs Asyncio

Le bon outil de concurrence dépend de la charge de travail. Voici une comparaison du temps d'exécution réel pour des tâches communes :

TâcheSéquentielThreading (4)Multiprocessing (4)Asyncio
100 requêtes HTTP (200ms chacune)20.0s5.1s5.8s4.9s
100 lectures de fichiers (10ms chacune)1.0s0.28s0.35s0.26s
100 tâches CPU (100ms chacune)10.0s10.2s2.7s10.0s
50 requêtes DB (50ms chacune)2.5s0.68s0.85s0.62s
E/S mixte + CPU15.0s8.2s4.1s9.5s

Points clés :

  • Le threading offre une accélération 3-5x sur les charges de travail I/O-bound avec des changements de code minimaux
  • Le multiprocessing est la seule option pour un véritable parallélisme CPU mais ajoute une surcharge de processus
  • Asyncio devance légèrement le threading sur les E/S haute concurrence mais nécessité de réécrire le code avec async/await
  • Pour les charges mixtes, envisagez de combiner le threading pour les E/S et le multiprocessing pour les tâches CPU
import time
import threading
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
 
def io_task():
    time.sleep(0.2)
 
def cpu_task(n=2_000_000):
    return sum(i * i for i in range(n))
 
# Benchmark threading vs multiprocessing
NUM_TASKS = 20
 
# Threading - I/O bound
start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(lambda _: io_task(), range(NUM_TASKS)))
print(f"Threading (I/O): {time.time() - start:.2f}s")
 
# Threading - CPU bound
start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(lambda _: cpu_task(), range(NUM_TASKS)))
print(f"Threading (CPU): {time.time() - start:.2f}s")

Expérimenter avec le Threading dans RunCell

Déboguer et profiler du code threadé peut être difficile. Quand vous avez besoin de tester la synchronisation des threads, visualiser les chevauchements temporels, ou diagnostiquer des conditions de course de manière interactive, RunCell (www.runcell.dev (opens in a new tab)) fournit un environnement Jupyter propulsé par IA conçu pour ce workflow.

L'agent IA de RunCell peut analyser votre code de threading, identifier les deadlocks potentiels avant qu'ils ne surviennent, suggérer des nombres de workers optimaux basés sur votre charge de travail, et vous aider à comprendre pourquoi les threads se comportent de manière inattendue. Quand un pool de threads produit des résultats incorrects de manière intermittente, RunCell trace la timeline d'exécution pour identifier le moment exact où l'état partagé est corrompu.

Si vous voulez visualiser les caractéristiques de performance de différentes configurations de threading, PyGWalker (github.com/Kanaries/pygwalker) peut transformer vos DataFrames de benchmark en graphiques interactifs. Exécutez des benchmarks de threading, collectez les données de timing dans un DataFrame pandas, et explorez les résultats avec des visualisations drag-and-drop pour trouver le nombre de threads optimal pour votre charge de travail.

FAQ

Quelle est la différence entre threading et multiprocessing en Python ?

Le threading exécute plusieurs threads au sein d'un seul processus, partageant la mémoire. Le Global Interpreter Lock (GIL) empêche les threads d'exécuter du bytecode Python en parallèle, rendant le threading efficace uniquement pour les tâches I/O-bound comme les requêtes réseau et les opérations sur fichiers. Le multiprocessing crée des processus séparés, chacun avec son propre interpréteur Python et espace mémoire, permettant une exécution parallèle réelle pour les tâches CPU-bound. Le threading a une surcharge plus faible (démarrage plus rapide, moins de mémoire), tandis que le multiprocessing contourne le GIL pour un véritable parallélisme.

Le threading Python est-il vraiment parallèle ?

Non, le threading Python est concurrent mais pas parallèle pour du code CPU-bound à cause du GIL. Un seul thread exécute du bytecode Python à la fois. Cependant, le GIL se libère pendant les opérations I/O (réseau, disque, base de données), donc plusieurs threads s'exécutent effectivement en parallèle quand ils attendent les E/S. Pour du parallélisme CPU-bound, utilisez le module multiprocessing ou des extensions C qui libèrent le GIL (comme NumPy).

Combien de threads dois-je utiliser en Python ?

Pour les tâches I/O-bound, commencez avec 5-20 threads selon les limites de débit du service externe et votre bande passante réseau. Trop de threads vers un seul serveur peuvent causer des refus de connexion ou du throttling. Pour les charges mixtes, expérimentez avec des nombres de threads entre le nombre de cœurs CPU et 4x le nombre de cœurs. Utilisez ThreadPoolExecutor et faites des benchmarks avec différentes valeurs de max_workers pour trouver le nombre optimal pour votre charge de travail spécifique. La valeur par défaut pour ThreadPoolExecutor est min(32, os.cpu_count() + 4).

Comment retourner une valeur depuis un thread Python ?

Les threads ne retournent pas de valeurs directement depuis leur fonction cible. Les trois approches principales sont : (1) Utilisez ThreadPoolExecutor.submit() qui retourne un objet Future où vous appelez future.result() pour obtenir la valeur de retour. (2) Passez un conteneur mutable (comme un dictionnaire ou une liste) en argument et faites écrire les résultats dedans par le thread, protégé par un Lock. (3) Utilisez queue.Queue où le thread met les résultats dans la queue et le thread principal lit depuis celle-ci. ThreadPoolExecutor est l'approche la plus propre pour la plupart des cas d'usage.

Que se passe-t-il si un thread Python lève une exception ?

Dans un threading.Thread brut, une exception non gérée termine ce thread silencieusement et l'exception est perdue. Le thread principal et les autres threads continuent de s'exécuter sans aucune notification. Avec ThreadPoolExecutor, les exceptions sont capturées et relancées quand vous appelez future.result(), rendant la gestion des erreurs bien plus fiable. Utilisez toujours des blocs try/except à l'intérieur des fonctions cibles des threads ou utilisez ThreadPoolExecutor pour vous assurer que les exceptions sont correctement capturées et gérées.

Conclusion

Le threading Python est un outil puissant pour accélérer les programmes I/O-bound. En exécutant les requêtes réseau, opérations sur fichiers et requêtes de base de données simultanément, vous pouvez transformer un script séquentiel de 20 secondes en un script qui se termine en 5 secondes avec des changements de code minimaux.

Les points clés à retenir :

  • Utilisez le threading pour le travail I/O-bound. Le GIL empêche le parallélisme CPU, mais les threads chevauchent efficacement le temps d'attente des E/S.
  • Utilisez ThreadPoolExecutor pour la plupart des besoins de threading. Il gère les threads, collecte les résultats et propage les exceptions proprement.
  • Protégez l'état partagé avec des locks. Les conditions de course sont le bug de threading le plus courant, et queue.Queue élimine la plupart des préoccupations de verrouillage.
  • Évitez les deadlocks en acquérant les locks dans un ordre cohérent et en utilisant des timeouts.
  • Choisissez le bon outil : threading pour les E/S, multiprocessing pour le CPU, asyncio pour des milliers de connexions simultanées.

Commencez avec ThreadPoolExecutor et un simple appel executor.map(). Mesurez l'accélération. Ajoutez la synchronisation seulement là où l'état mutable partagé l'exige. Le threading ne nécessite pas une réécriture complète de votre code. Quelques lignes de concurrent.futures peuvent apporter des améliorations de performance dramatiques pour tout programme qui passe du temps à attendre.

📚