Python asyncio : Guide complet de la programmation asynchrone
Updated on
Votre application Python effectue 100 appels API, chacun prenant 2 secondes. Avec du code séquentiel traditionnel, cela représente 200 secondes d'attente. Vos utilisateurs fixent des écrans de chargement. Vos serveurs restent inactifs, consommant des ressources en attendant des réponses. Ce comportement bloquant est le problème qui détruit les performances des applications et l'expérience utilisateur.
La douleur s'intensifie lorsque vous montez en charge. Les requêtes de base de données s'accumulent. Les opérations de fichiers s'enfilent les unes derrière les autres. Les scrapers web rampent à une vitesse d'escargot. Chaque opération d'I/O devient un goulot d'étranglement qui se propage dans tout votre système, transformant ce qui devrait être une application rapide et réactive en un monstre lent et gaspilleur de ressources.
Python asyncio résout cela en permettant l'exécution concurrente de tâches I/O-bound. Au lieu d'attendre que chaque opération se termine avant de commencer la suivante, asyncio permet à votre code d'initier plusieurs opérations et de basculer entre elles pendant l'attente. Ces 100 appels API ? Avec asyncio, ils se terminent en environ 2 secondes au lieu de 200. Ce guide vous montre exactement comment implémenter la programmation asynchrone en Python, avec des exemples pratiques qui transforment un code lent et bloquant en applications rapides et concurrentes.
Qu'est-ce que la programmation asynchrone et pourquoi c'est important
La programmation asynchrone permet à un programme d'initier des tâches potentiellement longues et de passer à d'autres travaux avant que ces tâches ne se terminent, plutôt que d'attendre que chaque tâche se termine avant de commencer la suivante.
Dans du code synchrone traditionnel, lorsque vous faites une requête API, votre programme s'arrête et attend la réponse. Pendant cette période d'attente, votre CPU reste inactif, ne faisant rien de productif. C'est acceptable pour des opérations uniques, mais catastrophique pour les applications qui doivent gérer plusieurs opérations d'I/O.
Asyncio fournit un moyen d'écrire du code concurrent en utilisant la syntaxe async/await. Il est particulièrement efficace pour les opérations I/O-bound comme :
- Faire des requêtes HTTP vers des APIs
- Lire et écrire des fichiers
- Requêtes de base de données
- Communication réseau
- Connexions WebSocket
- Traitement de files de messages
L'amélioration des performances est spectaculaire. Considérez la récupération de données depuis 50 URLs différentes :
Approche synchrone : 50 requêtes × 2 secondes chacune = 100 secondes au total Approche asynchrone : Les 50 requêtes s'exécutent simultanément ≈ 2 secondes au total
Cette amélioration de performance 50x provient d'une meilleure utilisation des ressources. Au lieu de bloquer sur les opérations d'I/O, asyncio permet à votre programme de continuer à exécuter d'autres tâches pendant l'attente des opérations d'I/O.
Concurrence vs Parallélisme vs Async
Comprendre la distinction entre ces concepts est essentiel pour utiliser asyncio efficacement.
La concurrence signifie gérer plusieurs tâches à la fois. Les tâches se relaient pour progresser, mais seule une s'exécute à un moment donné. Pensez à un chef qui prépare plusieurs plats, passant d'une tâche à l'autre pendant que chacune attend de cuire.
Le parallélisme signifie exécuter plusieurs tâches simultanément sur différents cœurs de CPU. Cela nécessite du matériel de traitement parallèle réel et est idéal pour les tâches CPU-bound comme les calculs mathématiques ou le traitement d'images.
La programmation asynchrone est une forme spécifique de concurrence conçue pour les tâches I/O-bound. Elle utilise un seul thread et bascule entre les tâches lorsqu'elles attendent des opérations d'I/O.
| Caractéristique | asyncio | Threading | Multiprocessing |
|---|---|---|---|
| Modèle d'exécution | Single thread, multitâche coopératif | Multiples threads, multitâche préemptif | Processus multiples |
| Idéal pour | Tâches I/O-bound | Tâches I/O-bound avec bibliothèques bloquantes | Tâches CPU-bound |
| Surcharge mémoire | Minimale | Modérée | Élevée |
| Coût de changement de contexte | Très faible | Faible à modéré | Élevé |
| Complexité | Modérée (syntaxe async/await) | Élevée (conditions de course, verrous) | Élevée (IPC, sérialisation) |
| Limitation GIL | Non affectée (single thread) | Limitée par GIL | Non limitée (processus séparés) |
| Accélération typique pour I/O | 10-100x | 5-10x | N/A |
Le Global Interpreter Lock (GIL) de Python empêche l'exécution parallèle réelle du bytecode Python dans les threads, rendant le threading moins efficace pour les tâches CPU-bound. Asyncio contourne cette limitation en utilisant un seul thread avec du multitâche coopératif, tandis que multiprocessing la contourne entièrement avec des processus séparés.
Les mots-clés async def et await
La fondation d'asyncio repose sur deux mots-clés : async et await.
Le mot-clé async def définit une fonction coroutine. Lorsque vous appelez une fonction coroutine, elle ne s'exécute pas immédiatement. À la place, elle retourne un objet coroutine qui peut être attendu.
import asyncio
async def fetch_data():
print("Fetching data...")
await asyncio.sleep(2) # Simulate I/O operation
print("Data fetched!")
return {"status": "success"}
# Calling the function returns a coroutine object
coroutine = fetch_data()
print(type(coroutine)) # <class 'coroutine'>
# To actually run it, you need to await it or use asyncio.run()
# asyncio.run(coroutine) # This would execute the coroutineLe mot-clé await met en pause l'exécution d'une coroutine jusqu'à ce que l'opération attendue se termine. Pendant cette pause, l'event loop peut exécuter d'autres coroutines. Vous ne pouvez utiliser await qu'à l'intérieur d'une fonction async def.
async def process_user(user_id):
# Await an I/O operation
user_data = await fetch_user_from_database(user_id)
# Await another I/O operation
user_profile = await fetch_user_profile(user_id)
# Regular synchronous code runs normally
processed_data = transform_data(user_data, user_profile)
return processed_dataRègles clés pour async/await :
- Vous ne pouvez
awaitque des coroutines, tasks ou futures - Vous ne pouvez utiliser
awaitqu'à l'intérieur d'une fonctionasync def - Les fonctions régulières ne peuvent pas utiliser
await - Appeler une fonction async sans l'attendre crée un objet coroutine mais n'exécute pas le code
Erreur courante :
async def wrong_example():
# This creates a coroutine but doesn't execute it!
fetch_data() # Missing await
async def correct_example():
# This actually executes the coroutine
result = await fetch_data()Point d'entrée asyncio.run()
La fonction asyncio.run() est la manière standard de démarrer l'event loop asyncio et d'exécuter votre coroutine principale. Elle a été introduite dans Python 3.7 et simplifie l'exécution de code async depuis des contextes synchrones.
import asyncio
async def main():
print("Starting async operations")
await asyncio.sleep(1)
print("Finished")
# Run the main coroutine
asyncio.run(main())Ce que asyncio.run() fait en arrière-plan :
- Crée un nouvel event loop
- Exécute la coroutine fournie jusqu'à son terme
- Ferme l'event loop
- Retourne le résultat de la coroutine
import asyncio
async def main():
result = await compute_value()
return result
# The return value is accessible
final_result = asyncio.run(main())
print(final_result)Caractéristiques importantes de asyncio.run() :
- Ne peut pas être appelé depuis un event loop en cours d'exécution : Si vous êtes déjà dans une fonction async, utilisez
awaità la place - Crée un event loop frais à chaque fois : N'appelez pas
asyncio.run()plusieurs fois dans le même programme à moins de vouloir des instances d'event loop séparées - Ferme toujours la boucle : L'event loop est nettoyé après l'exécution
Pour les notebooks Jupyter ou les environnements où un event loop est déjà en cours d'exécution, utilisez await directement ou asyncio.create_task(). Des outils comme RunCell (opens in a new tab) fournissent un support async amélioré dans les environnements Jupyter, facilitant l'expérimentation interactive avec les patterns asyncio sans conflits d'event loop.
Avant Python 3.7, vous deviez gérer manuellement l'event loop :
# Old way (pre-Python 3.7)
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
# Modern way (Python 3.7+)
asyncio.run(main())Coroutines, Tasks et Futures
Comprendre ces trois concepts fondamentaux est essentiel pour maîtriser asyncio.
Coroutines
Une coroutine est une fonction définie avec async def. C'est une fonction spéciale qui peut être mise en pause et reprise, permettant à d'autres codes de s'exécuter pendant la pause.
import asyncio
async def my_coroutine():
await asyncio.sleep(1)
return "completed"
# This creates a coroutine object
coro = my_coroutine()
# To run it
result = asyncio.run(coro)Tasks
Une task est un wrapper autour d'une coroutine qui la planifie pour l'exécution sur l'event loop. Les tasks permettent aux coroutines de s'exécuter simultanément.
import asyncio
async def say_after(delay, message):
await asyncio.sleep(delay)
print(message)
return message
async def main():
# Create tasks to run concurrently
task1 = asyncio.create_task(say_after(1, "First"))
task2 = asyncio.create_task(say_after(2, "Second"))
# Wait for both tasks to complete
result1 = await task1
result2 = await task2
print(f"Results: {result1}, {result2}")
asyncio.run(main())Créer une task planifie immédiatement la coroutine pour l'exécution. L'event loop commence à l'exécuter dès que possible, même avant que vous n'attendiez la task.
async def background_work():
print("Starting background work")
await asyncio.sleep(2)
print("Background work completed")
async def main():
# The coroutine starts running immediately
task = asyncio.create_task(background_work())
print("Task created, doing other work")
await asyncio.sleep(1)
print("Other work done")
# Wait for the background task to finish
await task
asyncio.run(main())Output :
Task created, doing other work
Starting background work
Other work done
Background work completedFutures
Une future est un objet awaitable bas niveau qui représente un résultat éventuel d'une opération asynchrone. Vous créez rarement des futures directement ; elles sont généralement créées par asyncio ou les bibliothèques.
async def set_future_result(future):
await asyncio.sleep(1)
future.set_result("Future completed")
async def main():
loop = asyncio.get_event_loop()
future = loop.create_future()
# Schedule a coroutine to set the future's result
asyncio.create_task(set_future_result(future))
# Wait for the future to get a result
result = await future
print(result)
asyncio.run(main())Relation entre coroutines, tasks et futures :
- Les coroutines sont les fonctions que vous écrivez
- Les tasks enveloppent les coroutines et les planifient pour l'exécution
- Les futures représentent des résultats qui seront disponibles dans le futur
- Les tasks sont une sous-classe de Future
asyncio.create_task() pour l'exécution concurrente
La fonction asyncio.create_task() est votre outil principal pour atteindre la vraie concurrence avec asyncio. Elle planifie une coroutine pour s'exécuter sur l'event loop sans bloquer la coroutine actuelle.
import asyncio
import time
async def download_file(file_id):
print(f"Starting download {file_id}")
await asyncio.sleep(2) # Simulate download time
print(f"Completed download {file_id}")
return f"file_{file_id}.dat"
async def sequential_downloads():
"""Downloads files one at a time"""
start = time.time()
file1 = await download_file(1)
file2 = await download_file(2)
file3 = await download_file(3)
elapsed = time.time() - start
print(f"Sequential: {elapsed:.2f} seconds")
# Output: ~6 seconds (2 + 2 + 2)
async def concurrent_downloads():
"""Downloads files concurrently"""
start = time.time()
# Create tasks for concurrent execution
task1 = asyncio.create_task(download_file(1))
task2 = asyncio.create_task(download_file(2))
task3 = asyncio.create_task(download_file(3))
# Wait for all tasks to complete
file1 = await task1
file2 = await task2
file3 = await task3
elapsed = time.time() - start
print(f"Concurrent: {elapsed:.2f} seconds")
# Output: ~2 seconds (all run at the same time)
asyncio.run(concurrent_downloads())La task est planifiée immédiatement lorsque create_task() est appelée. Vous n'avez pas besoin de l'attendre immédiatement.
async def process_data():
# Start background tasks
task1 = asyncio.create_task(fetch_from_api_1())
task2 = asyncio.create_task(fetch_from_api_2())
# Do some work while tasks run in background
local_data = prepare_local_data()
# Now wait for the background tasks
api_data_1 = await task1
api_data_2 = await task2
# Combine all data
return combine_data(local_data, api_data_1, api_data_2)Vous pouvez également nommer les tasks pour un meilleur débogage :
async def main():
task = asyncio.create_task(
long_running_operation(),
name="long-operation-task"
)
# Task name is accessible
print(f"Task name: {task.get_name()}")
await taskasyncio.gather() pour exécuter plusieurs coroutines
La fonction asyncio.gather() exécute plusieurs coroutines simultanément et attend qu'elles se terminent toutes. C'est plus propre que de créer des tasks individuelles lorsque vous devez exécuter de nombreuses coroutines.
import asyncio
async def fetch_user(user_id):
await asyncio.sleep(1)
return {"id": user_id, "name": f"User {user_id}"}
async def main():
# Run multiple coroutines concurrently
results = await asyncio.gather(
fetch_user(1),
fetch_user(2),
fetch_user(3),
fetch_user(4),
fetch_user(5)
)
print(results)
# Output: [{'id': 1, 'name': 'User 1'}, {'id': 2, 'name': 'User 2'}, ...]
asyncio.run(main())gather() retourne les résultats dans le même ordre que les coroutines d'entrée, quelle que soit celle qui se termine en premier.
async def task_with_delay(delay, value):
await asyncio.sleep(delay)
return value
async def main():
results = await asyncio.gather(
task_with_delay(3, "slow"),
task_with_delay(1, "fast"),
task_with_delay(2, "medium")
)
print(results)
# Output: ['slow', 'fast', 'medium'] (order preserved)
asyncio.run(main())Gestion des erreurs avec gather()
Par défaut, si une coroutine lève une exception, gather() lève immédiatement cette exception et annule les tasks restantes.
async def failing_task():
await asyncio.sleep(1)
raise ValueError("Task failed!")
async def successful_task():
await asyncio.sleep(2)
return "success"
async def main():
try:
results = await asyncio.gather(
failing_task(),
successful_task()
)
except ValueError as e:
print(f"Error caught: {e}")
# The successful_task is cancelled
asyncio.run(main())Utilisez return_exceptions=True pour collecter les exceptions avec les résultats réussis :
async def main():
results = await asyncio.gather(
failing_task(),
successful_task(),
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
print(f"Task {i} succeeded: {result}")
asyncio.run(main())gather() dynamique avec compréhension de liste
gather() fonctionne parfaitement avec des coroutines générées dynamiquement :
async def process_item(item):
await asyncio.sleep(1)
return item * 2
async def main():
items = [1, 2, 3, 4, 5]
# Process all items concurrently
results = await asyncio.gather(
*[process_item(item) for item in items]
)
print(results) # [2, 4, 6, 8, 10]
asyncio.run(main())asyncio.wait() et asyncio.as_completed()
Alors que gather() attend que toutes les coroutines se terminent, wait() et as_completed() fournissent un contrôle plus granulaire sur la gestion des tasks terminées.
asyncio.wait()
wait() vous permet d'attendre des tasks avec différentes conditions de fin : toutes les tasks, la première task, ou la première exception.
import asyncio
async def task(delay, name):
await asyncio.sleep(delay)
return f"{name} completed"
async def main():
tasks = [
asyncio.create_task(task(1, "Task 1")),
asyncio.create_task(task(2, "Task 2")),
asyncio.create_task(task(3, "Task 3"))
]
# Wait for all tasks to complete
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.ALL_COMPLETED
)
for task in done:
print(task.result())
asyncio.run(main())Différentes conditions de fin :
async def main():
tasks = [
asyncio.create_task(task(1, "Fast")),
asyncio.create_task(task(3, "Slow")),
asyncio.create_task(task(2, "Medium"))
]
# Return when the first task completes
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
print(f"First completed: {done.pop().result()}")
print(f"Still pending: {len(pending)} tasks")
# Cancel remaining tasks
for task in pending:
task.cancel()
asyncio.run(main())asyncio.as_completed()
as_completed() retourne un itérateur qui produit les tasks au fur et à mesure qu'elles se terminent, vous permettant de traiter les résultats dès qu'ils sont disponibles.
import asyncio
async def fetch_data(url_id, delay):
await asyncio.sleep(delay)
return f"Data from URL {url_id}"
async def main():
tasks = [
fetch_data(1, 3),
fetch_data(2, 1),
fetch_data(3, 2)
]
# Process results as they complete
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"Received: {result}")
asyncio.run(main())L'output montre les résultats dans l'ordre de fin, pas l'ordre de soumission :
Received: Data from URL 2
Received: Data from URL 3
Received: Data from URL 1C'est particulièrement utile lorsque vous voulez afficher des résultats aux utilisateurs aussi vite que possible :
async def search_engine(query, engine_name, delay):
await asyncio.sleep(delay)
return f"{engine_name}: Results for '{query}'"
async def main():
query = "python asyncio"
searches = [
search_engine(query, "Google", 1.5),
search_engine(query, "Bing", 2.0),
search_engine(query, "DuckDuckGo", 1.0)
]
print("Searching...")
for search in asyncio.as_completed(searches):
result = await search
print(result) # Display each result as soon as it arrives
asyncio.run(main())asyncio.sleep() vs time.sleep()
Cette distinction est cruciale pour la correction du code async.
time.sleep() est une opération bloquante qui met en pause l'intégralité du thread, y compris l'event loop. Cela empêche toutes les tasks async de s'exécuter.
asyncio.sleep() est une coroutine non bloquante qui met en pause uniquement la task actuelle, permettant aux autres tasks de s'exécuter.
import asyncio
import time
async def blocking_example():
"""Bad: Uses time.sleep() - blocks the entire event loop"""
print("Starting blocking sleep")
time.sleep(2) # WRONG: Blocks everything!
print("Finished blocking sleep")
async def non_blocking_example():
"""Good: Uses asyncio.sleep() - allows other tasks to run"""
print("Starting non-blocking sleep")
await asyncio.sleep(2) # CORRECT: Only pauses this task
print("Finished non-blocking sleep")
async def concurrent_task():
for i in range(3):
print(f"Concurrent task running: {i}")
await asyncio.sleep(0.5)
async def demo_blocking():
print("\n=== Blocking example (BAD) ===")
await asyncio.gather(
blocking_example(),
concurrent_task()
)
async def demo_non_blocking():
print("\n=== Non-blocking example (GOOD) ===")
await asyncio.gather(
non_blocking_example(),
concurrent_task()
)
asyncio.run(demo_blocking())
asyncio.run(demo_non_blocking())Dans l'exemple bloquant, la task concurrente ne s'exécute pas tant que time.sleep() n'est pas terminé. Dans l'exemple non bloquant, les deux tasks s'exécutent simultanément.
Règle de base : N'utilisez jamais time.sleep() dans du code async. Utilisez toujours await asyncio.sleep().
Pour les opérations CPU-bound que vous ne pouvez éviter, utilisez loop.run_in_executor() pour les exécuter dans un thread ou processus séparé :
import asyncio
import time
def cpu_intensive_task():
"""Some blocking CPU work"""
time.sleep(2) # Simulate heavy computation
return "CPU task completed"
async def main():
loop = asyncio.get_event_loop()
# Run blocking task in a thread pool
result = await loop.run_in_executor(None, cpu_intensive_task)
print(result)
asyncio.run(main())async for et async with
Python fournit des versions async des boucles for et des gestionnaires de contexte pour travailler avec des itérables et ressources asynchrones.
Itérateurs asynchrones (async for)
Un itérateur async est un objet qui implémente les méthodes __aiter__() et __anext__(), vous permettant d'itérer sur des éléments qui nécessitent des opérations async pour être récupérés.
import asyncio
class AsyncRange:
"""An async iterator that yields numbers with delays"""
def __init__(self, start, end):
self.current = start
self.end = end
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.end:
raise StopAsyncIteration
# Simulate async operation to get next value
await asyncio.sleep(0.5)
value = self.current
self.current += 1
return value
async def main():
async for number in AsyncRange(1, 5):
print(f"Got number: {number}")
asyncio.run(main())Exemple concret avec un curseur de base de données async :
class AsyncDatabaseCursor:
"""Simulated async database cursor"""
def __init__(self, query):
self.query = query
self.results = []
self.index = 0
async def execute(self):
# Simulate database query
await asyncio.sleep(1)
self.results = [
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"},
{"id": 3, "name": "Charlie"}
]
def __aiter__(self):
return self
async def __anext__(self):
if self.index >= len(self.results):
raise StopAsyncIteration
# Simulate fetching next row
await asyncio.sleep(0.1)
row = self.results[self.index]
self.index += 1
return row
async def fetch_users():
cursor = AsyncDatabaseCursor("SELECT * FROM users")
await cursor.execute()
async for row in cursor:
print(f"User: {row['name']}")
asyncio.run(fetch_users())Gestionnaires de contexte asynchrones (async with)
Un gestionnaire de contexte async implémente les méthodes __aenter__() et __aexit__() pour gérer des ressources qui nécessitent une configuration et un nettoyage async.
import asyncio
class AsyncDatabaseConnection:
"""An async context manager for database connections"""
async def __aenter__(self):
print("Opening database connection...")
await asyncio.sleep(1) # Simulate connection time
print("Database connection opened")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Closing database connection...")
await asyncio.sleep(0.5) # Simulate cleanup
print("Database connection closed")
async def query(self, sql):
await asyncio.sleep(0.5)
return f"Results for: {sql}"
async def main():
async with AsyncDatabaseConnection() as db:
result = await db.query("SELECT * FROM users")
print(result)
# Connection automatically closed after the with block
asyncio.run(main())Combiner les gestionnaires de contexte async avec les itérateurs async :
class AsyncFileReader:
"""Async context manager and iterator for reading files"""
def __init__(self, filename):
self.filename = filename
self.lines = []
self.index = 0
async def __aenter__(self):
# Simulate async file opening
await asyncio.sleep(0.5)
self.lines = ["Line 1", "Line 2", "Line 3"]
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await asyncio.sleep(0.2)
self.lines = []
def __aiter__(self):
return self
async def __anext__(self):
if self.index >= len(self.lines):
raise StopAsyncIteration
await asyncio.sleep(0.1)
line = self.lines[self.index]
self.index += 1
return line
async def read_file():
async with AsyncFileReader("data.txt") as reader:
async for line in reader:
print(line)
asyncio.run(read_file())asyncio.Queue pour les patterns Producteur-Consommateur
asyncio.Queue est une file thread-safe, consciente de l'async, parfaite pour coordonner le travail entre des coroutines producteur et consommateur.
import asyncio
import random
async def producer(queue, producer_id):
"""Produces items and puts them in the queue"""
for i in range(5):
await asyncio.sleep(random.uniform(0.1, 0.5))
item = f"Item-{producer_id}-{i}"
await queue.put(item)
print(f"Producer {producer_id} produced: {item}")
async def consumer(queue, consumer_id):
"""Consumes items from the queue"""
while True:
item = await queue.get()
print(f"Consumer {consumer_id} consuming: {item}")
# Simulate processing time
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=10)
# Create producers and consumers
producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
# Wait for all producers to finish
await asyncio.gather(*producers)
# Wait for the queue to be fully processed
await queue.join()
# Cancel consumers (they run forever)
for consumer_task in consumers:
consumer_task.cancel()
asyncio.run(main())Exemple concret : Scraper web avec file d'URLs :
import asyncio
from typing import Set
async def fetch_url(session, url):
"""Simulate fetching a URL"""
await asyncio.sleep(1)
return f"Content from {url}"
async def producer(queue: asyncio.Queue, start_urls: list):
"""Add URLs to the queue"""
for url in start_urls:
await queue.put(url)
async def consumer(queue: asyncio.Queue, visited: Set[str]):
"""Fetch URLs from the queue"""
while True:
url = await queue.get()
if url in visited:
queue.task_done()
continue
visited.add(url)
print(f"Scraping: {url}")
# Simulate fetching
content = await fetch_url(None, url)
# Simulate finding new URLs in the content
# In real scraper, you'd parse HTML and extract links
new_urls = [] # Would extract from content
for new_url in new_urls:
if new_url not in visited:
await queue.put(new_url)
queue.task_done()
async def main():
queue = asyncio.Queue()
visited = set()
start_urls = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3"
]
# Add initial URLs
await producer(queue, start_urls)
# Create consumers
consumers = [
asyncio.create_task(consumer(queue, visited))
for _ in range(3)
]
# Wait for all URLs to be processed
await queue.join()
# Cancel consumers
for consumer_task in consumers:
consumer_task.cancel()
print(f"Scraped {len(visited)} unique URLs")
asyncio.run(main())Sémaphores pour la limitation de débit
asyncio.Semaphore contrôle le nombre de coroutines qui peuvent accéder à une ressource simultanément. C'est essentiel pour limiter le débit des appels API ou limiter les connexions de base de données concurrentes.
import asyncio
import time
async def call_api(semaphore, api_id):
"""Make an API call with rate limiting"""
async with semaphore:
print(f"API call {api_id} started")
await asyncio.sleep(1) # Simulate API call
print(f"API call {api_id} completed")
return f"Result {api_id}"
async def main():
# Allow only 3 concurrent API calls
semaphore = asyncio.Semaphore(3)
start = time.time()
# Create 10 API calls
tasks = [call_api(semaphore, i) for i in range(10)]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
print(f"\nCompleted {len(results)} API calls in {elapsed:.2f} seconds")
# With semaphore(3): ~4 seconds (10 calls in batches of 3)
# Without semaphore: ~1 second (all concurrent)
asyncio.run(main())Limitation de débit pour la conformité des quotas API :
import asyncio
from datetime import datetime
class RateLimiter:
"""Rate limiter that allows N requests per time period"""
def __init__(self, max_requests, time_period):
self.max_requests = max_requests
self.time_period = time_period
self.semaphore = asyncio.Semaphore(max_requests)
self.request_times = []
async def __aenter__(self):
await self.semaphore.acquire()
# Wait if we've hit the rate limit
while len(self.request_times) >= self.max_requests:
oldest = self.request_times[0]
elapsed = datetime.now().timestamp() - oldest
if elapsed < self.time_period:
sleep_time = self.time_period - elapsed
await asyncio.sleep(sleep_time)
self.request_times.pop(0)
self.request_times.append(datetime.now().timestamp())
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.semaphore.release()
async def fetch_with_rate_limit(rate_limiter, item_id):
async with rate_limiter:
print(f"Fetching item {item_id} at {datetime.now()}")
await asyncio.sleep(0.5)
return f"Item {item_id}"
async def main():
# Allow 5 requests per 2 seconds
rate_limiter = RateLimiter(max_requests=5, time_period=2)
# Make 15 requests
tasks = [fetch_with_rate_limit(rate_limiter, i) for i in range(15)]
results = await asyncio.gather(*tasks)
print(f"Completed {len(results)} requests")
asyncio.run(main())aiohttp pour les requêtes HTTP asynchrones
La bibliothèque aiohttp fournit des fonctionnalités client et serveur HTTP async. C'est le choix standard pour faire des requêtes HTTP dans du code async.
import asyncio
import aiohttp
async def fetch_url(session, url):
"""Fetch a single URL"""
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
html = await fetch_url(session, "https://api.github.com")
print(f"Fetched {len(html)} characters")
asyncio.run(main())Récupération de plusieurs URLs simultanément :
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
"""Fetch URL and return status code and content length"""
async with session.get(url) as response:
content = await response.text()
return {
"url": url,
"status": response.status,
"length": len(content)
}
async def fetch_all_urls(urls):
"""Fetch multiple URLs concurrently"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
async def main():
urls = [
"https://api.github.com",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/status/200",
"https://httpbin.org/status/404"
]
start = time.time()
results = await fetch_all_urls(urls)
elapsed = time.time() - start
for result in results:
if isinstance(result, Exception):
print(f"Error: {result}")
else:
print(f"{result['url']}: {result['status']} ({result['length']} bytes)")
print(f"\nFetched {len(urls)} URLs in {elapsed:.2f} seconds")
asyncio.run(main())Exemple pratique avec gestion des erreurs et retries :
import asyncio
import aiohttp
from typing import Optional
async def fetch_with_retry(
session: aiohttp.ClientSession,
url: str,
max_retries: int = 3
) -> Optional[dict]:
"""Fetch URL with retry logic"""
for attempt in range(max_retries):
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
if response.status == 200:
data = await response.json()
return data
elif response.status == 404:
print(f"URL not found: {url}")
return None
else:
print(f"Unexpected status {response.status} for {url}")
except asyncio.TimeoutError:
print(f"Timeout on attempt {attempt + 1} for {url}")
except aiohttp.ClientError as e:
print(f"Client error on attempt {attempt + 1} for {url}: {e}")
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt) # Exponential backoff
print(f"Failed to fetch {url} after {max_retries} attempts")
return None
async def main():
urls = [
"https://api.github.com/users/github",
"https://api.github.com/users/nonexistent",
"https://httpbin.org/delay/5"
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_retry(session, url) for url in urls]
results = await asyncio.gather(*tasks)
successful = [r for r in results if r is not None]
print(f"\nSuccessfully fetched {len(successful)}/{len(urls)} URLs")
asyncio.run(main())aiofiles pour les opérations de fichier asynchrones
La bibliothèque aiofiles fournit des opérations de fichier async, empêchant le blocage pendant les lectures et écritures de fichiers.
import asyncio
import aiofiles
async def read_file(filename):
"""Read file asynchronously"""
async with aiofiles.open(filename, mode='r') as f:
contents = await f.read()
return contents
async def write_file(filename, content):
"""Write file asynchronously"""
async with aiofiles.open(filename, mode='w') as f:
await f.write(content)
async def main():
# Write data
await write_file('test.txt', 'Hello, async file I/O!')
# Read data
content = await read_file('test.txt')
print(f"Read: {content}")
asyncio.run(main())Traitement de plusieurs fichiers simultanément :
import asyncio
import aiofiles
async def process_file(input_file, output_file):
"""Read, process, and write a file"""
async with aiofiles.open(input_file, mode='r') as f:
content = await f.read()
# Process content (convert to uppercase)
processed = content.upper()
async with aiofiles.open(output_file, mode='w') as f:
await f.write(processed)
return f"Processed {input_file} -> {output_file}"
async def main():
files = [
('input1.txt', 'output1.txt'),
('input2.txt', 'output2.txt'),
('input3.txt', 'output3.txt')
]
tasks = [process_file(inp, out) for inp, out in files]
results = await asyncio.gather(*tasks)
for result in results:
print(result)
asyncio.run(main())Lecture de gros fichiers ligne par ligne :
import asyncio
import aiofiles
async def process_large_file(filename):
"""Process a large file line by line without loading it all into memory"""
line_count = 0
async with aiofiles.open(filename, mode='r') as f:
async for line in f:
# Process each line
line_count += 1
if line.strip():
# Do something with the line
pass
return line_count
async def main():
count = await process_large_file('large_data.txt')
print(f"Processed {count} lines")
asyncio.run(main())Gestion des erreurs dans le code asynchrone
La gestion des erreurs dans le code async nécessite une attention particulière pour s'assurer que les exceptions sont correctement capturées et que les ressources sont nettoyées.
import asyncio
async def risky_operation(item_id):
"""An operation that might fail"""
await asyncio.sleep(1)
if item_id % 2 == 0:
raise ValueError(f"Item {item_id} caused an error")
return f"Item {item_id} processed"
async def handle_with_try_except():
"""Handle errors with try/except"""
try:
result = await risky_operation(2)
print(result)
except ValueError as e:
print(f"Error caught: {e}")
asyncio.run(handle_with_try_except())Gestion des erreurs dans des tasks concurrentes :
async def safe_operation(item_id):
"""Wrapper that catches errors from risky_operation"""
try:
result = await risky_operation(item_id)
return {"success": True, "result": result}
except Exception as e:
return {"success": False, "error": str(e), "item_id": item_id}
async def main():
tasks = [safe_operation(i) for i in range(5)]
results = await asyncio.gather(*tasks)
for result in results:
if result["success"]:
print(f"Success: {result['result']}")
else:
print(f"Failed: Item {result['item_id']} - {result['error']}")
asyncio.run(main())Utilisation de gather() avec return_exceptions=True :
async def main():
tasks = [risky_operation(i) for i in range(5)]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
print(f"Task {i} succeeded: {result}")
asyncio.run(main())Nettoyage avec des gestionnaires de contexte async :
class AsyncResource:
"""A resource that needs cleanup even if errors occur"""
async def __aenter__(self):
print("Acquiring resource")
await asyncio.sleep(0.5)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Cleaning up resource")
await asyncio.sleep(0.5)
if exc_type is not None:
print(f"Exception during resource use: {exc_val}")
# Return False to propagate exception, True to suppress
return False
async def do_work(self):
await asyncio.sleep(1)
raise ValueError("Something went wrong")
async def main():
try:
async with AsyncResource() as resource:
await resource.do_work()
except ValueError as e:
print(f"Caught error: {e}")
asyncio.run(main())Benchmarks de performance : Synchrone vs Asynchrone pour les tâches I/O-bound
Comparons les approches synchrone et asynchrone pour les opérations I/O-bound avec des benchmarks réels.
import asyncio
import time
import requests
import aiohttp
# Synchronous approach
def fetch_sync(url):
response = requests.get(url)
return len(response.text)
def benchmark_sync(urls):
start = time.time()
results = [fetch_sync(url) for url in urls]
elapsed = time.time() - start
return elapsed, results
# Asynchronous approach
async def fetch_async(session, url):
async with session.get(url) as response:
text = await response.text()
return len(text)
async def benchmark_async(urls):
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [fetch_async(session, url) for url in urls]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
return elapsed, results
# Run benchmarks
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1"
]
print("Benchmarking synchronous approach...")
sync_time, sync_results = benchmark_sync(urls)
print(f"Synchronous: {sync_time:.2f} seconds")
print("\nBenchmarking asynchronous approach...")
async_time, async_results = asyncio.run(benchmark_async(urls))
print(f"Asynchronous: {async_time:.2f} seconds")
print(f"\nSpeedup: {sync_time / async_time:.2f}x")Résultats typiques pour 5 URLs avec 1 seconde de délai chacune :
- Synchrone : ~5 secondes (exécution séquentielle)
- Asynchrone : ~1 seconde (exécution concurrente)
- Accélération : ~5x
| Nombre d'URLs | Temps Sync | Temps Async | Accélération |
|---|---|---|---|
| 5 | 5.2s | 1.1s | 4.7x |
| 10 | 10.4s | 1.2s | 8.7x |
| 20 | 20.8s | 1.4s | 14.9x |
| 50 | 52.1s | 2.1s | 24.8x |
| 100 | 104.5s | 3.8s | 27.5x |
L'accélération augmente avec le nombre d'opérations concurrentes jusqu'à ce que vous atteigniez les contraintes de bande passante ou de limitation de débit.
Pièges courants et comment les éviter
Oublier d'await
# WRONG: Coroutine is created but not executed
async def wrong():
result = fetch_data() # Missing await!
print(result) # Prints coroutine object, not the result
# CORRECT: Await the coroutine
async def correct():
result = await fetch_data()
print(result) # Prints actual resultBloquer l'event loop
# WRONG: Blocks the entire event loop
async def blocking_code():
time.sleep(5) # Blocks everything!
result = compute_something() # Blocks if CPU-intensive
return result
# CORRECT: Use asyncio.sleep and run_in_executor
async def non_blocking_code():
await asyncio.sleep(5) # Only pauses this task
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, compute_something)
return resultNe pas gérer l'annulation des tasks
# WRONG: Doesn't handle cancellation
async def incomplete_cleanup():
resource = await acquire_resource()
await long_operation()
await release_resource(resource) # May not execute if cancelled
# CORRECT: Use try/finally or async with
async def proper_cleanup():
resource = await acquire_resource()
try:
await long_operation()
finally:
await release_resource(resource) # Always executesCréer des conflits d'event loop
# WRONG: Calling asyncio.run() in an async function
async def wrong_nesting():
result = asyncio.run(some_coroutine()) # Error!
return result
# CORRECT: Just await the coroutine
async def correct_nesting():
result = await some_coroutine()
return resultNe pas limiter la concurrence
# WRONG: Launches too many tasks simultaneously
async def unlimited_concurrency(items):
tasks = [process_item(item) for item in items] # 10,000 tasks!
return await asyncio.gather(*tasks)
# CORRECT: Use semaphore to limit concurrency
async def limited_concurrency(items):
semaphore = asyncio.Semaphore(10) # Max 10 concurrent
async def process_with_limit(item):
async with semaphore:
return await process_item(item)
tasks = [process_with_limit(item) for item in items]
return await asyncio.gather(*tasks)Exemples concrets
Web Scraping avec limitation de débit
import asyncio
import aiohttp
from bs4 import BeautifulSoup
class AsyncWebScraper:
def __init__(self, max_concurrent=5):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.session.close()
async def fetch_page(self, url):
async with self.semaphore:
try:
async with self.session.get(url) as response:
return await response.text()
except Exception as e:
print(f"Error fetching {url}: {e}")
return None
async def scrape_urls(self, urls):
tasks = [self.fetch_page(url) for url in urls]
return await asyncio.gather(*tasks)
async def main():
urls = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3"
]
async with AsyncWebScraper(max_concurrent=3) as scraper:
pages = await scraper.scrape_urls(urls)
for url, html in zip(urls, pages):
if html:
print(f"Scraped {url}: {len(html)} bytes")
asyncio.run(main())Pipeline de données API asynchrone
import asyncio
import aiohttp
async def fetch_user_ids(session):
"""Fetch list of user IDs from API"""
async with session.get("https://api.example.com/users") as response:
data = await response.json()
return [user["id"] for user in data["users"]]
async def fetch_user_details(session, user_id):
"""Fetch detailed info for a user"""
async with session.get(f"https://api.example.com/users/{user_id}") as response:
return await response.json()
async def fetch_user_posts(session, user_id):
"""Fetch posts for a user"""
async with session.get(f"https://api.example.com/users/{user_id}/posts") as response:
return await response.json()
async def process_user(session, user_id):
"""Fetch all data for a user concurrently"""
details, posts = await asyncio.gather(
fetch_user_details(session, user_id),
fetch_user_posts(session, user_id)
)
return {
"user": details,
"posts": posts,
"post_count": len(posts)
}
async def main():
async with aiohttp.ClientSession() as session:
# Fetch user IDs
user_ids = await fetch_user_ids(session)
# Process all users concurrently
user_data = await asyncio.gather(
*[process_user(session, uid) for uid in user_ids[:10]]
)
for data in user_data:
print(f"User {data['user']['name']}: {data['post_count']} posts")
asyncio.run(main())Serveur de chat asynchrone
import asyncio
class ChatServer:
def __init__(self):
self.clients = set()
async def handle_client(self, reader, writer):
"""Handle a single client connection"""
addr = writer.get_extra_info('peername')
print(f"Client connected: {addr}")
self.clients.add(writer)
try:
while True:
data = await reader.read(100)
if not data:
break
message = data.decode()
print(f"Received from {addr}: {message}")
# Broadcast to all clients
await self.broadcast(f"{addr}: {message}", writer)
except asyncio.CancelledError:
pass
finally:
print(f"Client disconnected: {addr}")
self.clients.remove(writer)
writer.close()
await writer.wait_closed()
async def broadcast(self, message, sender):
"""Send message to all clients except sender"""
for client in self.clients:
if client != sender:
try:
client.write(message.encode())
await client.drain()
except Exception as e:
print(f"Error broadcasting: {e}")
async def start(self, host='127.0.0.1', port=8888):
"""Start the chat server"""
server = await asyncio.start_server(
self.handle_client,
host,
port
)
addr = server.sockets[0].getsockname()
print(f"Chat server running on {addr}")
async with server:
await server.serve_forever()
async def main():
chat_server = ChatServer()
await chat_server.start()
asyncio.run(main())Expérimenter avec Asyncio dans Jupyter
Lorsque vous travaillez avec asyncio dans des notebooks Jupyter, vous pouvez rencontrer des conflits d'event loop. Jupyter exécute déjà un event loop, ce qui peut interférer avec asyncio.run().
Pour une expérimentation async transparente dans les environnements Jupyter, envisagez d'utiliser RunCell (opens in a new tab), un agent IA conçu spécifiquement pour les notebooks Jupyter. RunCell gère automatiquement la gestion de l'event loop et fournit des capacités de débogage async améliorées, vous permettant de tester les patterns asyncio interactivement sans conflits.
Dans Jupyter standard, vous pouvez utiliser await au niveau supérieur :
# In Jupyter, this works directly
async def fetch_data():
await asyncio.sleep(1)
return "data"
# Just await directly, no asyncio.run() needed
result = await fetch_data()
print(result)Ou utilisez le package nest_asyncio pour permettre des event loops imbriqués :
import nest_asyncio
nest_asyncio.apply()
# Now asyncio.run() works in Jupyter
asyncio.run(main())FAQ
Quelle est la différence entre asyncio et threading en Python ?
Asyncio utilise du multitâche coopératif sur un seul thread, où les tasks cèdent volontairement le contrôle en utilisant await. Threading utilise du multitâche préemptif avec plusieurs threads OS, où le OS décide quand basculer entre les threads. Asyncio est plus efficace pour les tâches I/O-bound avec une surcharge mémoire plus faible et aucun risque de conditions de course, tandis que threading peut gérer des bibliothèques bloquantes qui ne supportent pas l'async. Les deux sont limités par le GIL de Python pour le travail CPU-bound, mais asyncio évite la surcharge du changement de contexte de thread.
Quand dois-je utiliser asyncio plutôt que multiprocessing ?
Utilisez asyncio pour les tâches I/O-bound comme les appels API, les requêtes de base de données, les opérations de fichiers et la communication réseau. Ces tâches passent la plupart de leur temps à attendre des ressources externes. Utilisez multiprocessing pour les tâches CPU-bound comme le traitement de données, les calculs mathématiques, la manipulation d'images et l'entraînement de modèles de machine learning. Multiprocessing crée des processus séparés qui contournent le GIL de Python, permettant une véritable exécution parallèle sur plusieurs cœurs de CPU. Asyncio excelle dans la gestion de milliers d'opérations d'I/O concurrentes avec une surcharge minimale, tandis que multiprocessing est limité par le nombre de cœurs CPU mais fournit un calcul parallèle réel.
Puis-je mélanger du code sync et async dans la même application ?
Oui, mais avec une planification soignée. Vous pouvez appeler des fonctions async depuis du code sync en utilisant asyncio.run(), bien que vous ne puissiez pas l'appeler depuis un event loop déjà en cours d'exécution. Pour appeler des fonctions bloquantes sync depuis du code async, utilisez loop.run_in_executor() pour les exécuter dans un pool de threads, empêchant ainsi le blocage de l'event loop. N'utilisez jamais d'opérations bloquantes comme time.sleep(), requests.get(), ou des I/O de fichiers synchrones directement dans des fonctions async, car elles bloquent l'intégralité de l'event loop. Utilisez plutôt des équivalents async comme asyncio.sleep(), aiohttp et aiofiles.
Comment déboguer efficacement du code asyncio ?
Activez le mode debug d'asyncio avec asyncio.run(main(), debug=True) ou en définissant la variable d'environnement PYTHONASYNCIODEBUG=1. Cela détecte les erreurs courantes comme l'oubli d'await, les callbacks prenant trop de temps, et les coroutines qui n'ont jamais été attendues. Utilisez extensivement le logging pour tracer le flux d'exécution, car les débogueurs traditionnels peuvent être déroutants avec du code async. Ajoutez des noms aux tasks avec asyncio.create_task(coro(), name="task-name") pour des messages d'erreur plus clairs. Utilisez asyncio.gather(..., return_exceptions=True) pour éviter qu'une task en échec ne cache les erreurs des autres. Surveillez votre event loop avec asyncio.all_tasks() pour vérifier les tasks qui ne se terminent pas.
Quelles sont les limites de performance d'asyncio ?
Asyncio peut gérer des dizaines de milliers d'opérations d'I/O concurrentes sur un seul thread, dépassant largement les limites du threading. La contrainte principale est qu'asyncio n'apporte aucun bénéfice pour les opérations CPU-bound puisqu'il utilise un seul thread. Les performances se dégradent si vous bloquez l'event loop avec des I/O synchrones ou des calculs lourds. La bande passante réseau et les limites de débit des API deviennent le goulot d'étranglement avant les limites de concurrence d'asyncio. L'utilisation mémoire évolue avec le nombre de tasks concurrentes, mais chaque task a une surcharge minimale comparée aux threads. Pour des performances maximales, combinez asyncio pour la concurrence d'I/O avec multiprocessing pour le travail CPU-bound, et utilisez toujours des sémaphores pour limiter les opérations concurrentes à des niveaux raisonnables.
Conclusion
Python asyncio transforme les applications I/O-bound d'opérations lentes et bloquantes en systèmes rapides et concurrents. En maîtrisant la syntaxe async/await, en comprenant l'event loop, et en exploitant des outils comme gather(), create_task(), et les sémaphores, vous pouvez construire des applications qui gèrent efficacement des milliers d'opérations concurrentes.
La clé du succès avec asyncio est de reconnaître quand c'est le bon outil. Utilisez-le pour les requêtes réseau, les requêtes de base de données, les opérations de fichiers, et toute tâche qui passe du temps à attendre des ressources externes. Évitez de bloquer l'event loop avec des opérations synchrones, utilisez toujours await avec les coroutines, et limitez la concurrence avec des sémaphores quand nécessaire.
Commencez par convertir de petites sections de votre codebase vers l'async, mesurez l'amélioration des performances, et développez progressivement. Les accélérations spectaculaires dans les applications lourdes en I/O valent l'investissement d'apprentissage.