Python Multiprocessing : Guide du Traitement Parallèle pour Plus de Vitesse
Updated on
Le modèle d'exécution mono-thread de Python atteint ses limites lors du traitement de grands ensembles de données ou de calculs intensifs en CPU. Un script qui prend 10 minutes pour traiter des données pourrait théoriquement s'exécuter en 2 minutes sur une machine 5 cœurs, mais le Global Interpreter Lock (GIL) de Python empêche les threads standards d'atteindre un véritable parallélisme. Le résultat est des cœurs CPU gaspillés et des développeurs frustrés regardant leurs processeurs multi-cœurs rester inactifs pendant que Python traite les tâches une par une.
Ce goulot d'étranglement coûte du temps et de l'argent réels. Les data scientists attendent des heures pour un entraînement de modèle qui pourrait se terminer en minutes. Les web scrapers explorent à une fraction de leur vitesse potentielle. Les pipelines de traitement d'images qui devraient exploiter tous les cœurs disponibles avancent péniblement en utilisant un seul cœur.
Le module multiprocessing résout cela en créant des processus Python séparés, chacun avec son propre interpréteur et espace mémoire. Contrairement aux threads, les processus contournent entièrement le GIL, permettant une véritable exécution parallèle sur les cœurs CPU. Ce guide vous montre comment exploiter le multiprocessing pour des améliorations de performances spectaculaires, de l'exécution parallèle de base aux modèles avancés comme les pools de processus et la mémoire partagée.
Comprendre le Problème du GIL
Le Global Interpreter Lock (GIL) est un mutex qui protège l'accès aux objets Python, empêchant plusieurs threads d'exécuter du bytecode Python simultanément. Même sur une machine 16 cœurs, les threads Python s'exécutent un à la fois pour les tâches liées au CPU.
import threading
import time
def cpu_bound_task(n):
count = 0
for i in range(n):
count += i * i
return count
# Threading ne parallélise PAS le travail lié au CPU
start = time.time()
threads = [threading.Thread(target=cpu_bound_task, args=(10_000_000,)) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Threading: {time.time() - start:.2f}s") # ~même temps que mono-threadLe GIL ne se libère que pendant les opérations d'I/O (lectures de fichiers, requêtes réseau), rendant le threading utile pour les tâches liées à l'I/O mais inefficace pour le travail lié au CPU. Le multiprocessing contourne le GIL en exécutant des interpréteurs Python séparés en parallèle.
Multiprocessing de Base avec Process
La classe Process crée un nouveau processus Python qui s'exécute indépendamment. Chaque processus a son propre espace mémoire et interpréteur Python.
from multiprocessing import Process
import os
def worker(name):
print(f"Worker {name} s'exécute dans le processus {os.getpid()}")
result = sum(i*i for i in range(5_000_000))
print(f"Worker {name} terminé: {result}")
if __name__ == '__main__':
processes = []
# Créer 4 processus
for i in range(4):
p = Process(target=worker, args=(f"#{i}",))
processes.append(p)
p.start()
# Attendre que tous se terminent
for p in processes:
p.join()
print("Tous les processus terminés")Exigence critique: Utilisez toujours la garde if __name__ == '__main__' sur Windows et macOS. Sans elle, les processus enfants généreront récursivement plus de processus, causant une bombe fork.
Process Pool : Exécution Parallèle Simplifiée
Pool gère un pool de processus workers, distribuant les tâches automatiquement. C'est le modèle de multiprocessing le plus courant.
from multiprocessing import Pool
import time
def process_item(x):
"""Simule un travail intensif en CPU"""
time.sleep(0.1)
return x * x
if __name__ == '__main__':
data = range(100)
# Traitement séquentiel
start = time.time()
results_seq = [process_item(x) for x in data]
seq_time = time.time() - start
# Traitement parallèle avec 4 workers
start = time.time()
with Pool(processes=4) as pool:
results_par = pool.map(process_item, data)
par_time = time.time() - start
print(f"Séquentiel: {seq_time:.2f}s")
print(f"Parallèle (4 cœurs): {par_time:.2f}s")
print(f"Accélération: {seq_time/par_time:.2f}x")Comparaison des Méthodes de Pool
Différentes méthodes de Pool conviennent à différents cas d'usage:
| Méthode | Cas d'Usage | Bloque | Retourne | Plusieurs Args |
|---|---|---|---|---|
map() | Parallélisation simple | Oui | Liste ordonnée | Non (arg unique) |
map_async() | Map non bloquant | Non | AsyncResult | Non |
starmap() | Plusieurs arguments | Oui | Liste ordonnée | Oui (déballage de tuple) |
starmap_async() | Starmap non bloquant | Non | AsyncResult | Oui |
apply() | Appel de fonction unique | Oui | Résultat unique | Oui |
apply_async() | Apply non bloquant | Non | AsyncResult | Oui |
imap() | Itérateur lazy | Oui | Itérateur | Non |
imap_unordered() | Lazy, non ordonné | Oui | Itérateur | Non |
from multiprocessing import Pool
def add(x, y):
return x + y
def power(x, exp):
return x ** exp
if __name__ == '__main__':
with Pool(4) as pool:
# map: argument unique
squares = pool.map(lambda x: x**2, [1, 2, 3, 4])
# starmap: plusieurs arguments (déballe les tuples)
results = pool.starmap(add, [(1, 2), (3, 4), (5, 6)])
# apply_async: appel unique non bloquant
async_result = pool.apply_async(power, (2, 10))
result = async_result.get() # bloque jusqu'à ce qu'il soit prêt
# imap: évaluation lazy pour grands ensembles de données
for result in pool.imap(lambda x: x**2, range(1000)):
pass # traite un à la fois au fur et à mesure que les résultats arriventCommunication Inter-Processus
Les processus ne partagent pas de mémoire par défaut. Utilisez Queue ou Pipe pour la communication.
Queue: Passage de Messages Thread-Safe
from multiprocessing import Process, Queue
def producer(queue, items):
for item in items:
queue.put(item)
print(f"Produit: {item}")
queue.put(None) # valeur sentinelle
def consumer(queue):
while True:
item = queue.get()
if item is None:
break
print(f"Consommé: {item}")
# Traiter l'élément...
if __name__ == '__main__':
q = Queue()
items = [1, 2, 3, 4, 5]
prod = Process(target=producer, args=(q, items))
cons = Process(target=consumer, args=(q,))
prod.start()
cons.start()
prod.join()
cons.join()Pipe: Communication Bidirectionnelle
from multiprocessing import Process, Pipe
def worker(conn):
conn.send("Bonjour du worker")
msg = conn.recv()
print(f"Worker a reçu: {msg}")
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=worker, args=(child_conn,))
p.start()
msg = parent_conn.recv()
print(f"Parent a reçu: {msg}")
parent_conn.send("Bonjour du parent")
p.join()Mémoire Partagée et État
Bien que les processus aient une mémoire séparée, multiprocessing fournit des primitives de mémoire partagée.
Value et Array: Primitives Partagées
from multiprocessing import Process, Value, Array
import time
def increment_counter(counter, lock):
for _ in range(100_000):
with lock:
counter.value += 1
def fill_array(arr, start, end):
for i in range(start, end):
arr[i] = i * i
if __name__ == '__main__':
# Valeur partagée avec lock
counter = Value('i', 0)
lock = counter.get_lock()
processes = [Process(target=increment_counter, args=(counter, lock)) for _ in range(4)]
for p in processes: p.start()
for p in processes: p.join()
print(f"Counter: {counter.value}") # Devrait être 400,000
# Tableau partagé
shared_arr = Array('i', 1000)
p1 = Process(target=fill_array, args=(shared_arr, 0, 500))
p2 = Process(target=fill_array, args=(shared_arr, 500, 1000))
p1.start(); p2.start()
p1.join(); p2.join()
print(f"Array[100]: {shared_arr[100]}") # 10,000Manager: Objets Partagés Complexes
from multiprocessing import Process, Manager
def update_dict(shared_dict, key, value):
shared_dict[key] = value
if __name__ == '__main__':
with Manager() as manager:
# Dict, list, namespace partagés
shared_dict = manager.dict()
shared_list = manager.list()
processes = [
Process(target=update_dict, args=(shared_dict, f"key{i}", i*10))
for i in range(5)
]
for p in processes: p.start()
for p in processes: p.join()
print(dict(shared_dict)) # {'key0': 0, 'key1': 10, ...}Comparaison: Multiprocessing vs Threading vs Asyncio
| Fonctionnalité | Multiprocessing | Threading | Asyncio | concurrent.futures |
|---|---|---|---|---|
| Contournement GIL | Oui | Non | Non | Dépend de l'executor |
| Tâches liées CPU | Excellent | Faible | Faible | Excellent (ProcessPoolExecutor) |
| Tâches liées I/O | Bon | Excellent | Excellent | Excellent (ThreadPoolExecutor) |
| Overhead mémoire | Élevé (processus séparés) | Faible (mémoire partagée) | Faible | Varie |
| Coût de démarrage | Élevé | Faible | Très faible | Varie |
| Communication | Queue, Pipe, mémoire partagée | Direct (état partagé) | Natif async/await | Futures |
| Meilleur pour | Tâches parallèles intensives CPU | Tâches liées I/O, concurrence simple | Async I/O, nombreuses tâches concurrentes | API unifiée pour les deux |
# Utiliser multiprocessing pour lié au CPU
from multiprocessing import Pool
def cpu_bound(n):
return sum(i*i for i in range(n))
with Pool(4) as pool:
results = pool.map(cpu_bound, [10_000_000] * 4)
# Utiliser threading pour lié à l'I/O
import threading
import requests
def fetch_url(url):
return requests.get(url).text
threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for t in threads: t.start()
for t in threads: t.join()
# Utiliser asyncio pour I/O asynchrone
import asyncio
import aiohttp
async def fetch_async(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
asyncio.run(asyncio.gather(*[fetch_async(url) for url in urls]))Avancé: ProcessPoolExecutor
concurrent.futures.ProcessPoolExecutor fournit une interface de niveau supérieur avec la même API que ThreadPoolExecutor.
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
def process_task(x):
time.sleep(0.1)
return x * x
if __name__ == '__main__':
# Le context manager assure le nettoyage
with ProcessPoolExecutor(max_workers=4) as executor:
# Soumettre des tâches individuelles
futures = [executor.submit(process_task, i) for i in range(20)]
# Traiter au fur et à mesure qu'elles se terminent
for future in as_completed(futures):
result = future.result()
print(f"Résultat: {result}")
# Ou utiliser map (comme Pool.map)
results = executor.map(process_task, range(20))
print(list(results))Avantages par rapport à Pool:
- Même API pour
ThreadPoolExecutoretProcessPoolExecutor - Interface Futures pour plus de contrôle
- Meilleure gestion des erreurs
- Plus facile de mélanger code sync et async
Modèles Courants
Tâches Embarrassingly Parallel
Les tâches sans dépendances sont idéales pour le multiprocessing:
from multiprocessing import Pool
import pandas as pd
def process_chunk(chunk):
"""Traite un morceau de données indépendamment"""
chunk['new_col'] = chunk['value'] * 2
return chunk.groupby('category').sum()
if __name__ == '__main__':
df = pd.DataFrame({'category': ['A', 'B'] * 5000, 'value': range(10000)})
# Diviser en morceaux
chunks = [df.iloc[i:i+2500] for i in range(0, len(df), 2500)]
with Pool(4) as pool:
results = pool.map(process_chunk, chunks)
# Combiner les résultats
final = pd.concat(results).groupby('category').sum()Modèle Map-Reduce
from multiprocessing import Pool
from functools import reduce
def mapper(text):
"""Map: extraire les mots et compter"""
words = text.lower().split()
return {word: 1 for word in words}
def reducer(dict1, dict2):
"""Reduce: fusionner les comptages de mots"""
for word, count in dict2.items():
dict1[word] = dict1.get(word, 0) + count
return dict1
if __name__ == '__main__':
documents = ["bonjour monde", "monde de python", "bonjour python"] * 1000
with Pool(4) as pool:
# Phase Map: parallèle
word_dicts = pool.map(mapper, documents)
# Phase Reduce: séquentielle (ou utiliser réduction en arbre)
word_counts = reduce(reducer, word_dicts)
print(word_counts)Producer-Consumer avec Plusieurs Producers
from multiprocessing import Process, Queue, cpu_count
def producer(queue, producer_id, items):
for item in items:
queue.put((producer_id, item))
print(f"Producer {producer_id} terminé")
def consumer(queue, num_producers):
finished_producers = 0
while finished_producers < num_producers:
if not queue.empty():
item = queue.get()
if item is None:
finished_producers += 1
else:
producer_id, data = item
print(f"Consommé du producer {producer_id}: {data}")
if __name__ == '__main__':
q = Queue()
num_producers = 3
# Démarrer les producers
producers = [
Process(target=producer, args=(q, i, range(i*10, (i+1)*10)))
for i in range(num_producers)
]
for p in producers: p.start()
# Démarrer le consumer
cons = Process(target=consumer, args=(q, num_producers))
cons.start()
# Nettoyage
for p in producers: p.join()
for _ in range(num_producers):
q.put(None) # Signaler au consumer
cons.join()Considérations de Performance
Quand le Multiprocessing Aide
- Tâches liées au CPU: Traitement de données, calculs mathématiques, traitement d'images
- Grands ensembles de données: Quand le temps de traitement par élément justifie l'overhead du processus
- Tâches indépendantes: Pas d'état partagé ou communication minimale
Quand le Multiprocessing Nuit
L'overhead de création de processus peut dépasser les bénéfices pour:
from multiprocessing import Pool
import time
def tiny_task(x):
return x + 1
if __name__ == '__main__':
data = range(100)
# Séquentiel est plus rapide pour les petites tâches
start = time.time()
results = [tiny_task(x) for x in data]
print(f"Séquentiel: {time.time() - start:.4f}s") # ~0.0001s
start = time.time()
with Pool(4) as pool:
results = pool.map(tiny_task, data)
print(f"Parallèle: {time.time() - start:.4f}s") # ~0.05s (500x plus lent!)Règles générales:
- Durée minimale de tâche: ~0,1 seconde par élément
- Taille des données: Si la sérialisation prend plus de temps que le traitement, utiliser la mémoire partagée
- Nombre de workers: Commencer avec
cpu_count(), ajuster selon les caractéristiques de la tâche
Exigences de Pickling
Seuls les objets sérialisables peuvent être passés entre processus:
from multiprocessing import Pool
# ❌ Les fonctions lambda ne sont pas sérialisables
# pool.map(lambda x: x*2, range(10)) # Échoue
# ✅ Utiliser des fonctions nommées
def double(x):
return x * 2
with Pool(4) as pool:
pool.map(double, range(10))
# ❌ Les fonctions locales dans les notebooks échouent
# def process():
# def inner(x): return x*2
# pool.map(inner, range(10)) # Échoue
# ✅ Définir au niveau module ou utiliser functools.partial
from functools import partial
def multiply(x, factor):
return x * factor
with Pool(4) as pool:
pool.map(partial(multiply, factor=3), range(10))Déboguer du Code Parallèle avec RunCell
Le débogage de code multiprocessing est notoirement difficile. Les instructions print disparaissent, les breakpoints ne fonctionnent pas, et les stack traces sont cryptiques. Quand les processus plantent silencieusement ou produisent des résultats incorrects, les outils de débogage traditionnels échouent.
RunCell (www.runcell.dev (opens in a new tab)) est un AI Agent conçu pour Jupyter qui excelle dans le débogage de code parallèle. Contrairement aux débogueurs standards qui ne peuvent pas suivre l'exécution à travers les processus, RunCell analyse vos modèles de multiprocessing, identifie les conditions de course, détecte les erreurs de pickling avant l'exécution, et explique pourquoi les processus se bloquent.
Quand un worker de Pool plante sans traceback, RunCell peut inspecter la file d'erreurs et vous montrer exactement quel appel de fonction a échoué et pourquoi. Quand l'état partagé produit de mauvais résultats, RunCell trace les modèles d'accès mémoire pour trouver le lock manquant. Pour les data scientists déboguant des pipelines de données parallèles complexes, RunCell transforme des heures de débogage par print en minutes de corrections guidées par IA.
Meilleures Pratiques
1. Utilisez Toujours la Garde if name
# ✅ Correct
if __name__ == '__main__':
with Pool(4) as pool:
pool.map(func, data)
# ❌ Incorrect - cause une bombe fork sur Windows
with Pool(4) as pool:
pool.map(func, data)2. Fermez les Pools Explicitement
# ✅ Context manager (recommandé)
with Pool(4) as pool:
results = pool.map(func, data)
# ✅ Fermeture et join explicites
pool = Pool(4)
results = pool.map(func, data)
pool.close()
pool.join()
# ❌ Fuite de ressources
pool = Pool(4)
results = pool.map(func, data)3. Gérez les Exceptions
from multiprocessing import Pool
def risky_task(x):
if x == 5:
raise ValueError("Mauvaise valeur")
return x * 2
if __name__ == '__main__':
with Pool(4) as pool:
try:
results = pool.map(risky_task, range(10))
except ValueError as e:
print(f"Tâche échouée: {e}")
# Ou gérer individuellement avec apply_async
async_results = [pool.apply_async(risky_task, (i,)) for i in range(10)]
for i, ar in enumerate(async_results):
try:
result = ar.get()
print(f"Résultat {i}: {result}")
except ValueError:
print(f"Tâche {i} échouée")4. Évitez l'État Partagé Quand Possible
# ❌ L'état partagé nécessite la synchronisation
from multiprocessing import Process, Value
counter = Value('i', 0)
def increment():
for _ in range(100000):
counter.value += 1 # Condition de course!
# ✅ Utiliser des locks ou éviter le partage
from multiprocessing import Lock
lock = Lock()
def increment_safe():
for _ in range(100000):
with lock:
counter.value += 1
# ✅ Encore mieux: éviter l'état partagé
def count_locally(n):
return n # Retourner le résultat à la place
with Pool(4) as pool:
results = pool.map(count_locally, [100000] * 4)
total = sum(results)5. Choisissez le Bon Nombre de Workers
from multiprocessing import cpu_count, Pool
# Lié au CPU: utiliser tous les cœurs
num_workers = cpu_count()
# Lié à l'I/O: peut utiliser plus de workers
num_workers = cpu_count() * 2
# Charge de travail mixte: ajuster empiriquement
with Pool(processes=num_workers) as pool:
results = pool.map(func, data)Erreurs Courantes
1. Oublier la Garde if name
Conduit à une génération infinie de processus sur Windows/macOS.
2. Essayer de Sérialiser des Objets Non Sérialisables
# ❌ Méthodes de classe, lambdas, fonctions locales échouent
class DataProcessor:
def process(self, x):
return x * 2
dp = DataProcessor()
# pool.map(dp.process, data) # Échoue
# ✅ Utiliser des fonctions de niveau supérieur
def process(x):
return x * 2
with Pool(4) as pool:
pool.map(process, data)3. Ne Pas Gérer la Terminaison des Processus
# ❌ Ne nettoie pas correctement
pool = Pool(4)
results = pool.map(func, data)
# pool toujours en cours d'exécution
# ✅ Toujours fermer et join
pool = Pool(4)
try:
results = pool.map(func, data)
finally:
pool.close()
pool.join()4. Transfert Excessif de Données
# ❌ Sérialiser d'énormes objets est lent
large_data = [np.random.rand(1000, 1000) for _ in range(100)]
with Pool(4) as pool:
pool.map(process_array, large_data) # Sérialisation lente
# ✅ Utiliser la mémoire partagée ou les fichiers mappés en mémoire
import numpy as np
from multiprocessing import shared_memory
# Créer de la mémoire partagée
shm = shared_memory.SharedMemory(create=True, size=1000*1000*8)
arr = np.ndarray((1000, 1000), dtype=np.float64, buffer=shm.buf)
# Passer seulement le nom et la forme
def process_shared(name, shape):
existing_shm = shared_memory.SharedMemory(name=name)
arr = np.ndarray(shape, dtype=np.float64, buffer=existing_shm.buf)
# Traiter arr...
existing_shm.close()
with Pool(4) as pool:
pool.starmap(process_shared, [(shm.name, (1000, 1000))] * 4)
shm.close()
shm.unlink()FAQ
Comment le multiprocessing contourne-t-il le GIL?
Le GIL (Global Interpreter Lock) est un mutex dans chaque interpréteur Python qui empêche plusieurs threads d'exécuter du bytecode Python simultanément. Le multiprocessing contourne cela en créant des processus Python séparés, chacun avec son propre interpréteur et GIL. Puisque les processus ne partagent pas de mémoire, ils s'exécutent véritablement en parallèle sur les cœurs CPU sans contention du GIL.
Quand dois-je utiliser multiprocessing vs threading?
Utilisez multiprocessing pour les tâches liées au CPU (traitement de données, calculs, manipulation d'images) où le GIL limite les performances. Utilisez threading pour les tâches liées à l'I/O (requêtes réseau, opérations sur fichiers) où le GIL se libère pendant l'I/O, permettant aux threads de travailler concurremment. Threading a un overhead plus faible mais ne peut pas paralléliser le travail CPU à cause du GIL.
Pourquoi ai-je besoin de la garde if name == 'main'?
Sur Windows et macOS, les processus enfants importent le module principal pour accéder aux fonctions. Sans la garde, importer le module exécute à nouveau le code de création de Pool, générant des processus infinis (bombe fork). Linux utilise fork() qui ne nécessite pas d'imports, mais la garde reste une bonne pratique pour du code multiplateforme.
Combien de processus workers dois-je utiliser?
Pour les tâches liées au CPU, commencez avec cpu_count() (nombre de cœurs CPU). Plus de workers que de cœurs cause un overhead de changement de contexte. Pour les tâches liées à l'I/O, vous pouvez utiliser plus de workers (2-4x cœurs) car les processus attendent sur l'I/O. Toujours faire un benchmark avec votre charge de travail spécifique, car l'overhead mémoire et de transfert de données peut limiter le nombre optimal de workers.
Quels objets puis-je passer aux fonctions de multiprocessing?
Les objets doivent être sérialisables (sérialisables avec pickle). Cela inclut les types intégrés (int, str, list, dict), les tableaux NumPy, les DataFrames pandas, et la plupart des classes définies par l'utilisateur. Les lambdas, fonctions locales, méthodes de classe, handles de fichiers, connexions de base de données et locks de threads ne peuvent pas être sérialisés. Définissez les fonctions au niveau module ou utilisez functools.partial pour l'application partielle.
Conclusion
Python multiprocessing transforme les goulots d'étranglement liés au CPU en opérations parallèles qui évoluent avec les cœurs disponibles. En contournant le GIL via des processus séparés, vous atteignez un véritable parallélisme impossible avec threading. L'interface Pool simplifie les modèles courants, tandis que Queue, Pipe et mémoire partagée permettent des workflows inter-processus complexes.
Commencez avec Pool.map() pour les tâches embarrassingly parallel, mesurez l'accélération et optimisez à partir de là. Rappelez-vous la garde if __name__ == '__main__', gardez les tâches à grain grossier pour amortir l'overhead du processus, et minimisez le transfert de données entre processus. Quand le débogage devient complexe, des outils comme RunCell peuvent aider à tracer l'exécution à travers les frontières de processus.
Le multiprocessing n'est pas toujours la réponse. Pour le travail lié à l'I/O, threading ou asyncio peuvent être plus simples et plus rapides. Mais quand vous traitez de grands ensembles de données, entraînez des modèles ou effectuez de lourds calculs, multiprocessing offre la performance pour laquelle votre machine multi-cœur a été construite.