Skip to content

Python asyncio: 비동기 프로그래밍 완벽 가이드

Updated on

귀하의 Python 애플리케이션이 100개의 API 호출을 수행하는데, 각각 2초가 소요됩니다. 기존의 순차적 코드를 사용하면 200초를 기다려야 합니다. 사용자들은 로딩 화면을 바라보고 있습니다. 서버들은 응답을 기다리며 유휴 상태로 리소스를 낭비하고 있습니다. 이러한 블로킹 동작은 애플리케이션 성능과 사용자 경험을 해치는 문제입니다.

규모가 커지면 고통이 심해집니다. 데이터베이스 쿼리가 쌓입니다. 파일 작업이 서로 뒤로 밀립니다. 웹 스크레이퍼가 달팽이 속도로 기어갑니다. 모든 I/O 작업이 병목 현상이 되어 전체 시스템에 연쇄적으로 영향을 미치며, 빠르고 반응성 좋은 애플리케이션이 되어야 할 것이 느리고 비효율적인 괴물로 변합니다.

Python asyncio는 I/O 바운드 태스크의 동시 실행을 가능하게 함으로써 이 문제를 해결합니다. 각 작업이 완료될 때까지 기다렸다가 다음을 시작하는 대신, asyncio는 코드가 여러 작업을 시작하고 대기하는 동안 그 사이를 전환할 수 있게 합니다. 그 100개의 API 호출이요? asyncio를 사용하면 200초가 아닌 대략 2초 안에 완료됩니다. 이 가이드는 Python에서 비동기 프로그래밍을 구현하는 방법을 정확히 보여주며, 느리고 블로킹되는 코드를 빠르고 동시적인 애플리케이션으로 변환하는 실용적인 예제를 제공합니다.

📚

비동기 프로그래밍이란 무엇이며 왜 중요한가

비동기 프로그래밍은 프로그램이 잠재적으로 오래 실행되는 태스크를 시작하고 해당 태스크가 완료되기 전에 다른 작업을 진행할 수 있게 합니다. 각 태스크가 완료될 때까지 기다렸다가 다음을 시작하는 것이 아닙니다.

기존의 동기식 코드에서는 API 요청을 하면 프로그램이 멈추고 응답을 기다립니다. 이 대기 기간 동안 CPU는 유휴 상태로 아무것도 생산적으로 하지 않습니다. 단일 작업에는 괜찮지만, 여러 I/O 작업을 처리해야 하는 애플리케이션에는 치명적입니다.

Asyncio는 async/await 구문을 사용하여 동시성 코드를 작성하는 방법을 제공합니다. 특히 다음과 같은 I/O 바운드 작업에 효과적입니다:

  • API에 HTTP 요청하기
  • 파일 읽고 쓰기
  • 데이터베이스 쿼리
  • 네트워크 통신
  • WebSocket 연결
  • 메시지 큐 처리

성능 향상은 극적입니다. 50개의 다른 URL에서 데이터를 가져오는 것을 고려해보세요:

동기식 접근: 50개 요청 × 각 2초 = 총 100초 비동기식 접근: 50개 요청 동시 실행 ≈ 총 2초

이 50배의 성능 향상은 더 나은 리소스 활용에서 비롯됩니다. I/O 작업에서 블로킹하는 대신, asyncio는 프로그램이 I/O가 완료되기를 기다리는 동안 다른 태스크를 계속 실행할 수 있게 합니다.

동시성(Concurrency) vs 병렬성(Parallelism) vs 비동기

이러한 개념들의 차이점을 이해하는 것은 asyncio를 효과적으로 사용하는 데 필수적입니다.

**동시성(Concurrency)**은 한 번에 여러 태스크를 관리하는 것을 의미합니다. 태스크들이 번갈아가며 진행되지만, 주어진 순간에는 하나만 실행됩니다. 여러 요리를 준비하면서 각 요리가 익기를 기다리는 동안 다른 태스크로 전환하는 요리사를 생각해보세요.

**병렬성(Parallelism)**은 여러 CPU 코어에서 여러 태스크를 동시에 실행하는 것을 의미합니다. 실제 병렬 처리 하드웨어가 필요하며, 수학 계산이나 이미지 처리와 같은 CPU 바운드 태스크에 이상적입니다.

비동기 프로그래밍은 I/O 바운드 태스크를 위해 설계된 동시성의 특정 형태입니다. 단일 스레드를 사용하고 I/O 작업을 기다릴 때 태스크 간에 전환합니다.

특징asyncioThreadingMultiprocessing
실행 모델단일 스레드, 협력적 멀티태스킹다중 스레드, 선점적 멀티태스킹다중 프로세스
가장 적합한 작업I/O 바운드 태스크블로킹 라이브러리가 있는 I/O 바운드 태스크CPU 바운드 태스크
메모리 오버헤드최소중간높음
컨텍스트 스위칭 비용매우 낮음낮음~중간높음
복잡성중간 (async/await 구문)높음 (레이스 컨디션, 락)높음 (IPC, 직렬화)
GIL 제한영향 없음 (단일 스레드)GIL에 의해 제한됨제한 없음 (별도 프로세스)
I/O에 대한 일반적인 속도 향상10-100배5-10배해당 없음

Python GIL(Global Interpreter Lock)은 스레드에서 Python 바이트코드의 진정한 병렬 실행을 방지하여 CPU 바운드 태스크에서 스레딩의 효율성을 떨어뜨립니다. Asyncio는 협력적 멀티태스킹을 사용하는 단일 스레드를 활용하여 이 제한을 회피하고, multiprocessing은 별도의 프로세스를 사용하여 완전히 우회합니다.

async def와 await 키워드

asyncio의 기초는 두 개의 키워드인 asyncawait에 있습니다.

async def 키워드는 코루틴 함수를 정의합니다. 코루틴 함수를 호출하면 즉시 실행되지 않습니다. 대신 await할 수 있는 코루틴 객체를 반환합니다.

import asyncio
 
async def fetch_data():
    print("Fetching data...")
    await asyncio.sleep(2)  # Simulate I/O operation
    print("Data fetched!")
    return {"status": "success"}
 
# Calling the function returns a coroutine object
coroutine = fetch_data()
print(type(coroutine))  # <class 'coroutine'>
 
# To actually run it, you need to await it or use asyncio.run()
# asyncio.run(coroutine)  # This would execute the coroutine

await 키워드는 await된 작업이 완료될 때까지 코루틴의 실행을 일시 중지합니다. 이 일시 중지 동안 이벤트 루프는 다른 코루틴을 실행할 수 있습니다. awaitasync def 함수 내부에서만 사용할 수 있습니다.

async def process_user(user_id):
    # Await an I/O operation
    user_data = await fetch_user_from_database(user_id)
 
    # Await another I/O operation
    user_profile = await fetch_user_profile(user_id)
 
    # Regular synchronous code runs normally
    processed_data = transform_data(user_data, user_profile)
 
    return processed_data

async/await의 핵심 규칙:

  1. 코루틴, 태스크, 또는 퓨처만 await할 수 있습니다
  2. awaitasync def 함수 내부에서만 사용할 수 있습니다
  3. 일반 함수는 await를 사용할 수 없습니다
  4. await 없이 async 함수를 호출하면 코루틴 객체를 만들지만 코드는 실행하지 않습니다

흔한 실수:

async def wrong_example():
    # This creates a coroutine but doesn't execute it!
    fetch_data()  # Missing await
 
async def correct_example():
    # This actually executes the coroutine
    result = await fetch_data()

asyncio.run() 진입점

asyncio.run() 함수는 asyncio 이벤트 루프를 시작하고 메인 코루틴을 실행하는 표준 방법입니다. Python 3.7에서 도입되었으며 동기 컨텍스트에서 비동기 코드를 실행하는 것을 단순화합니다.

import asyncio
 
async def main():
    print("Starting async operations")
    await asyncio.sleep(1)
    print("Finished")
 
# Run the main coroutine
asyncio.run(main())

asyncio.run()이 내부적으로 수행하는 작업:

  1. 새 이벤트 루프 생성
  2. 제공된 코루틴을 완료까지 실행
  3. 이벤트 루프 종료
  4. 코루틴의 결과 반환
import asyncio
 
async def main():
    result = await compute_value()
    return result
 
# The return value is accessible
final_result = asyncio.run(main())
print(final_result)

asyncio.run()의 중요한 특성:

  • 실행 중인 이벤트 루프 내부에서 호출할 수 없음: 이미 async 함수 내부에 있는 경우 대신 await를 사용하세요
  • 매번 새로운 이벤트 루프 생성: 별도의 이벤트 루프 인스턴스를 원하지 않는 한 같은 프로그램에서 asyncio.run()을 여러 번 호출하지 마세요
  • 항상 루프를 종료함: 실행 후 이벤트 루프가 정리됩니다

Jupyter 노트북이나 이미 이벤트 루프가 실행 중인 환경의 경우, await를 직접 사용하거나 asyncio.create_task()를 사용하세요. RunCell (opens in a new tab)과 같은 도구는 Jupyter 환경에서 이벤트 루프 충돌 없이 asyncio 패턴을 대화식으로 실험하기 쉽게 만들어 향상된 비동기 지원을 제공합니다.

Python 3.7 이전에는 이벤트 루프를 수동으로 관리해야 했습니다:

# Old way (pre-Python 3.7)
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()
 
# Modern way (Python 3.7+)
asyncio.run(main())

코루틴, 태스크, 그리고 퓨처

이 세 가지 핵심 개념을 이해하는 것은 asyncio를 마스터하는 데 필수적입니다.

코루틴(Coroutines)

코루틴은 async def로 정의된 함수입니다. 일시 중지되고 재개될 수 있는 특별한 함수로, 일시 중지 동안 다른 코드가 실행될 수 있게 합니다.

import asyncio
 
async def my_coroutine():
    await asyncio.sleep(1)
    return "completed"
 
# This creates a coroutine object
coro = my_coroutine()
 
# To run it
result = asyncio.run(coro)

태스크(Tasks)

태스크는 이벤트 루프에서 실행을 위해 코루틴을 스케줄링하는 래퍼입니다. 태스크를 통해 코루틴을 동시에 실행할 수 있습니다.

import asyncio
 
async def say_after(delay, message):
    await asyncio.sleep(delay)
    print(message)
    return message
 
async def main():
    # Create tasks to run concurrently
    task1 = asyncio.create_task(say_after(1, "First"))
    task2 = asyncio.create_task(say_after(2, "Second"))
 
    # Wait for both tasks to complete
    result1 = await task1
    result2 = await task2
 
    print(f"Results: {result1}, {result2}")
 
asyncio.run(main())

태스크를 생성하면 코루틴이 즉시 실행을 위해 스케줄링됩니다. 이벤트 루프는 가능한 한 빨리, 태스크를 await하기 전에도 실행을 시작합니다.

async def background_work():
    print("Starting background work")
    await asyncio.sleep(2)
    print("Background work completed")
 
async def main():
    # The coroutine starts running immediately
    task = asyncio.create_task(background_work())
 
    print("Task created, doing other work")
    await asyncio.sleep(1)
    print("Other work done")
 
    # Wait for the background task to finish
    await task
 
asyncio.run(main())

출력:

Task created, doing other work
Starting background work
Other work done
Background work completed

퓨처(Futures)

퓨처는 비동기 작업의 최종 결과를 나타내는 저수준의 awaitable 객체입니다. 퓨처를 직접 생성하는 경우는 드물며, 일반적으로 asyncio 내부 또는 라이브러리에 의해 생성됩니다.

async def set_future_result(future):
    await asyncio.sleep(1)
    future.set_result("Future completed")
 
async def main():
    loop = asyncio.get_event_loop()
    future = loop.create_future()
 
    # Schedule a coroutine to set the future's result
    asyncio.create_task(set_future_result(future))
 
    # Wait for the future to get a result
    result = await future
    print(result)
 
asyncio.run(main())

코루틴, 태스크, 퓨처의 관계:

  • 코루틴은 여러분이 작성하는 함수입니다
  • 태스크는 코루틴을 감싸고 실행을 위해 스케줄링합니다
  • 퓨처는 미래에 사용할 수 있을 결과를 나타냅니다
  • 태스크는 퓨처의 서브클래스입니다

asyncio.create_task()로 동시 실행하기

asyncio.create_task() 함수는 asyncio로 진정한 동시성을 달성하는 주요 도구입니다. 현재 코루틴을 블로킹하지 않고 이벤트 루프에서 코루틴이 실행되도록 스케줄링합니다.

import asyncio
import time
 
async def download_file(file_id):
    print(f"Starting download {file_id}")
    await asyncio.sleep(2)  # Simulate download time
    print(f"Completed download {file_id}")
    return f"file_{file_id}.dat"
 
async def sequential_downloads():
    """Downloads files one at a time"""
    start = time.time()
 
    file1 = await download_file(1)
    file2 = await download_file(2)
    file3 = await download_file(3)
 
    elapsed = time.time() - start
    print(f"Sequential: {elapsed:.2f} seconds")
    # Output: ~6 seconds (2 + 2 + 2)
 
async def concurrent_downloads():
    """Downloads files concurrently"""
    start = time.time()
 
    # Create tasks for concurrent execution
    task1 = asyncio.create_task(download_file(1))
    task2 = asyncio.create_task(download_file(2))
    task3 = asyncio.create_task(download_file(3))
 
    # Wait for all tasks to complete
    file1 = await task1
    file2 = await task2
    file3 = await task3
 
    elapsed = time.time() - start
    print(f"Concurrent: {elapsed:.2f} seconds")
    # Output: ~2 seconds (all run at the same time)
 
asyncio.run(concurrent_downloads())

create_task()가 호출되면 태스크가 즉시 스케줄링됩니다. 바로 await할 필요가 없습니다.

async def process_data():
    # Start background tasks
    task1 = asyncio.create_task(fetch_from_api_1())
    task2 = asyncio.create_task(fetch_from_api_2())
 
    # Do some work while tasks run in background
    local_data = prepare_local_data()
 
    # Now wait for the background tasks
    api_data_1 = await task1
    api_data_2 = await task2
 
    # Combine all data
    return combine_data(local_data, api_data_1, api_data_2)

디버깅을 위해 태스크에 이름을 지정할 수도 있습니다:

async def main():
    task = asyncio.create_task(
        long_running_operation(),
        name="long-operation-task"
    )
 
    # Task name is accessible
    print(f"Task name: {task.get_name()}")
 
    await task

asyncio.gather()로 여러 코루틴 실행하기

asyncio.gather() 함수는 여러 코루틴을 동시에 실행하고 모두 완료될 때까지 기다립니다. 많은 코루틴을 실행해야 할 때 개별 태스크를 생성하는 것보다 깔끔합니다.

import asyncio
 
async def fetch_user(user_id):
    await asyncio.sleep(1)
    return {"id": user_id, "name": f"User {user_id}"}
 
async def main():
    # Run multiple coroutines concurrently
    results = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
        fetch_user(4),
        fetch_user(5)
    )
 
    print(results)
    # Output: [{'id': 1, 'name': 'User 1'}, {'id': 2, 'name': 'User 2'}, ...]
 
asyncio.run(main())

gather()는 입력 코루틴과 동일한 순서로 결과를 반환하며, 어떤 것이 먼저 완료되는지와 관계없습니다.

async def task_with_delay(delay, value):
    await asyncio.sleep(delay)
    return value
 
async def main():
    results = await asyncio.gather(
        task_with_delay(3, "slow"),
        task_with_delay(1, "fast"),
        task_with_delay(2, "medium")
    )
 
    print(results)
    # Output: ['slow', 'fast', 'medium'] (order preserved)
 
asyncio.run(main())

gather()로 에러 처리하기

기본적으로 코루틴 중 하나라도 예외를 발생시키면 gather()는 즉시 해당 예외를 발생시키고 남은 태스크를 취소합니다.

async def failing_task():
    await asyncio.sleep(1)
    raise ValueError("Task failed!")
 
async def successful_task():
    await asyncio.sleep(2)
    return "success"
 
async def main():
    try:
        results = await asyncio.gather(
            failing_task(),
            successful_task()
        )
    except ValueError as e:
        print(f"Error caught: {e}")
        # The successful_task is cancelled
 
asyncio.run(main())

성공적인 결과와 함께 예외를 수집하려면 return_exceptions=True를 사용하세요:

async def main():
    results = await asyncio.gather(
        failing_task(),
        successful_task(),
        return_exceptions=True
    )
 
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")
 
asyncio.run(main())

리스트 컴프리헨션으로 동적 gather

gather()는 동적으로 생성된 코루틴과 완벽하게 작동합니다:

async def process_item(item):
    await asyncio.sleep(1)
    return item * 2
 
async def main():
    items = [1, 2, 3, 4, 5]
 
    # Process all items concurrently
    results = await asyncio.gather(
        *[process_item(item) for item in items]
    )
 
    print(results)  # [2, 4, 6, 8, 10]
 
asyncio.run(main())

asyncio.wait()와 asyncio.as_completed()

gather()가 모든 코루틴이 완료될 때까지 기다리는 동안, wait()as_completed()는 완료된 태스크를 처리하는 방법에 대해 더 세밀한 제어를 제공합니다.

asyncio.wait()

wait()는 모든 태스크, 첫 번째 태스크, 또는 첫 번째 예외와 같은 다양한 완료 조건으로 태스크를 기다리게 합니다.

import asyncio
 
async def task(delay, name):
    await asyncio.sleep(delay)
    return f"{name} completed"
 
async def main():
    tasks = [
        asyncio.create_task(task(1, "Task 1")),
        asyncio.create_task(task(2, "Task 2")),
        asyncio.create_task(task(3, "Task 3"))
    ]
 
    # Wait for all tasks to complete
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.ALL_COMPLETED
    )
 
    for task in done:
        print(task.result())
 
asyncio.run(main())

다양한 완료 조건:

async def main():
    tasks = [
        asyncio.create_task(task(1, "Fast")),
        asyncio.create_task(task(3, "Slow")),
        asyncio.create_task(task(2, "Medium"))
    ]
 
    # Return when the first task completes
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )
 
    print(f"First completed: {done.pop().result()}")
    print(f"Still pending: {len(pending)} tasks")
 
    # Cancel remaining tasks
    for task in pending:
        task.cancel()
 
asyncio.run(main())

asyncio.as_completed()

as_completed()는 태스크가 완료되는 대로 반환하는 이터레이터를 제공하여, 결과를 사용할 수 있게 되는 즉시 처리할 수 있게 합니다.

import asyncio
 
async def fetch_data(url_id, delay):
    await asyncio.sleep(delay)
    return f"Data from URL {url_id}"
 
async def main():
    tasks = [
        fetch_data(1, 3),
        fetch_data(2, 1),
        fetch_data(3, 2)
    ]
 
    # Process results as they complete
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"Received: {result}")
 
asyncio.run(main())

출력은 완료 순서대로 결과를 보여주며, 제출 순서가 아닙니다:

Received: Data from URL 2
Received: Data from URL 3
Received: Data from URL 1

이는 사용자에게 결과를 가능한 한 빨리 표시하고 싶을 때 특히 유용합니다:

async def search_engine(query, engine_name, delay):
    await asyncio.sleep(delay)
    return f"{engine_name}: Results for '{query}'"
 
async def main():
    query = "python asyncio"
 
    searches = [
        search_engine(query, "Google", 1.5),
        search_engine(query, "Bing", 2.0),
        search_engine(query, "DuckDuckGo", 1.0)
    ]
 
    print("Searching...")
    for search in asyncio.as_completed(searches):
        result = await search
        print(result)  # Display each result as soon as it arrives
 
asyncio.run(main())

asyncio.sleep() vs time.sleep()

이 구분은 비동기 코드의 정확성을 위해 중요합니다.

time.sleep()은 이벤트 루프를 포함한 전체 스레드를 일시 중지하는 블로킹 작업입니다. 이렇게 하면 모든 비동기 태스크가 실행되지 못합니다.

asyncio.sleep()은 현재 태스크만 일시 중지하는 논블로킹 코루틴으로, 다른 태스크가 실행될 수 있게 합니다.

import asyncio
import time
 
async def blocking_example():
    """Bad: Uses time.sleep() - blocks the entire event loop"""
    print("Starting blocking sleep")
    time.sleep(2)  # WRONG: Blocks everything!
    print("Finished blocking sleep")
 
async def non_blocking_example():
    """Good: Uses asyncio.sleep() - allows other tasks to run"""
    print("Starting non-blocking sleep")
    await asyncio.sleep(2)  # CORRECT: Only pauses this task
    print("Finished non-blocking sleep")
 
async def concurrent_task():
    for i in range(3):
        print(f"Concurrent task running: {i}")
        await asyncio.sleep(0.5)
 
async def demo_blocking():
    print("\n=== Blocking example (BAD) ===")
    await asyncio.gather(
        blocking_example(),
        concurrent_task()
    )
 
async def demo_non_blocking():
    print("\n=== Non-blocking example (GOOD) ===")
    await asyncio.gather(
        non_blocking_example(),
        concurrent_task()
    )
 
asyncio.run(demo_blocking())
asyncio.run(demo_non_blocking())

블로킹 예제에서, time.sleep()이 완료될 때까지 동시 태스크가 실행되지 않습니다. 논블로킹 예제에서는 두 태스크가 동시에 실행됩니다.

경험 법칙: 비동기 코드에서 절대 time.sleep()을 사용하지 마세요. 항상 await asyncio.sleep()를 사용하세요.

피할 수 없는 CPU 바운드 작업의 경우, loop.run_in_executor()를 사용하여 별도의 스레드나 프로세스에서 실행하세요:

import asyncio
import time
 
def cpu_intensive_task():
    """Some blocking CPU work"""
    time.sleep(2)  # Simulate heavy computation
    return "CPU task completed"
 
async def main():
    loop = asyncio.get_event_loop()
 
    # Run blocking task in a thread pool
    result = await loop.run_in_executor(None, cpu_intensive_task)
    print(result)
 
asyncio.run(main())

async for와 async with

Python은 비동기 이터러블과 리소스 작업을 위해 for 루프와 컨텍스트 매니저의 비동기 버전을 제공합니다.

비동기 이터레이터 (async for)

비동기 이터레이터는 __aiter__()__anext__() 메서드를 구현하여 가져오는 데 비동기 작업이 필요한 항목들을 순회할 수 있는 객체입니다.

import asyncio
 
class AsyncRange:
    """An async iterator that yields numbers with delays"""
    def __init__(self, start, end):
        self.current = start
        self.end = end
 
    def __aiter__(self):
        return self
 
    async def __anext__(self):
        if self.current >= self.end:
            raise StopAsyncIteration
 
        # Simulate async operation to get next value
        await asyncio.sleep(0.5)
        value = self.current
        self.current += 1
        return value
 
async def main():
    async for number in AsyncRange(1, 5):
        print(f"Got number: {number}")
 
asyncio.run(main())

비동기 데이터베이스 커서를 사용한 실제 예제:

class AsyncDatabaseCursor:
    """Simulated async database cursor"""
    def __init__(self, query):
        self.query = query
        self.results = []
        self.index = 0
 
    async def execute(self):
        # Simulate database query
        await asyncio.sleep(1)
        self.results = [
            {"id": 1, "name": "Alice"},
            {"id": 2, "name": "Bob"},
            {"id": 3, "name": "Charlie"}
        ]
 
    def __aiter__(self):
        return self
 
    async def __anext__(self):
        if self.index >= len(self.results):
            raise StopAsyncIteration
 
        # Simulate fetching next row
        await asyncio.sleep(0.1)
        row = self.results[self.index]
        self.index += 1
        return row
 
async def fetch_users():
    cursor = AsyncDatabaseCursor("SELECT * FROM users")
    await cursor.execute()
 
    async for row in cursor:
        print(f"User: {row['name']}")
 
asyncio.run(fetch_users())

비동기 컨텍스트 매니저 (async with)

비동기 컨텍스트 매니저는 비동기 설정과 정리가 필요한 리소스를 관리하기 위해 __aenter__()__aexit__() 메서드를 구현합니다.

import asyncio
 
class AsyncDatabaseConnection:
    """An async context manager for database connections"""
 
    async def __aenter__(self):
        print("Opening database connection...")
        await asyncio.sleep(1)  # Simulate connection time
        print("Database connection opened")
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing database connection...")
        await asyncio.sleep(0.5)  # Simulate cleanup
        print("Database connection closed")
 
    async def query(self, sql):
        await asyncio.sleep(0.5)
        return f"Results for: {sql}"
 
async def main():
    async with AsyncDatabaseConnection() as db:
        result = await db.query("SELECT * FROM users")
        print(result)
    # Connection automatically closed after the with block
 
asyncio.run(main())

비동기 컨텍스트 매니저와 비동기 이터레이터 결합:

class AsyncFileReader:
    """Async context manager and iterator for reading files"""
 
    def __init__(self, filename):
        self.filename = filename
        self.lines = []
        self.index = 0
 
    async def __aenter__(self):
        # Simulate async file opening
        await asyncio.sleep(0.5)
        self.lines = ["Line 1", "Line 2", "Line 3"]
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await asyncio.sleep(0.2)
        self.lines = []
 
    def __aiter__(self):
        return self
 
    async def __anext__(self):
        if self.index >= len(self.lines):
            raise StopAsyncIteration
 
        await asyncio.sleep(0.1)
        line = self.lines[self.index]
        self.index += 1
        return line
 
async def read_file():
    async with AsyncFileReader("data.txt") as reader:
        async for line in reader:
            print(line)
 
asyncio.run(read_file())

asyncio.Queue로 프로듀서-컨슈머 패턴 구현하기

asyncio.Queue는 프로듀서와 컨슈머 코루틴 간 작업 조정을 위해 완벽한 스레드 안전하고 비동기 인식 큐입니다.

import asyncio
import random
 
async def producer(queue, producer_id):
    """Produces items and puts them in the queue"""
    for i in range(5):
        await asyncio.sleep(random.uniform(0.1, 0.5))
        item = f"Item-{producer_id}-{i}"
        await queue.put(item)
        print(f"Producer {producer_id} produced: {item}")
 
async def consumer(queue, consumer_id):
    """Consumes items from the queue"""
    while True:
        item = await queue.get()
        print(f"Consumer {consumer_id} consuming: {item}")
 
        # Simulate processing time
        await asyncio.sleep(random.uniform(0.2, 0.8))
 
        queue.task_done()
 
async def main():
    queue = asyncio.Queue(maxsize=10)
 
    # Create producers and consumers
    producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
 
    # Wait for all producers to finish
    await asyncio.gather(*producers)
 
    # Wait for the queue to be fully processed
    await queue.join()
 
    # Cancel consumers (they run forever)
    for consumer_task in consumers:
        consumer_task.cancel()
 
asyncio.run(main())

실제 예제: URL 큐를 사용한 웹 스크레이퍼:

import asyncio
from typing import Set
 
async def fetch_url(session, url):
    """Simulate fetching a URL"""
    await asyncio.sleep(1)
    return f"Content from {url}"
 
async def producer(queue: asyncio.Queue, start_urls: list):
    """Add URLs to the queue"""
    for url in start_urls:
        await queue.put(url)
 
async def consumer(queue: asyncio.Queue, visited: Set[str]):
    """Fetch URLs from the queue"""
    while True:
        url = await queue.get()
 
        if url in visited:
            queue.task_done()
            continue
 
        visited.add(url)
        print(f"Scraping: {url}")
 
        # Simulate fetching
        content = await fetch_url(None, url)
 
        # Simulate finding new URLs in the content
        # In real scraper, you'd parse HTML and extract links
        new_urls = []  # Would extract from content
 
        for new_url in new_urls:
            if new_url not in visited:
                await queue.put(new_url)
 
        queue.task_done()
 
async def main():
    queue = asyncio.Queue()
    visited = set()
 
    start_urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3"
    ]
 
    # Add initial URLs
    await producer(queue, start_urls)
 
    # Create consumers
    consumers = [
        asyncio.create_task(consumer(queue, visited))
        for _ in range(3)
    ]
 
    # Wait for all URLs to be processed
    await queue.join()
 
    # Cancel consumers
    for consumer_task in consumers:
        consumer_task.cancel()
 
    print(f"Scraped {len(visited)} unique URLs")
 
asyncio.run(main())

레이트 리미팅을 위한 세마포어(Semaphores)

asyncio.Semaphore는 동시에 리소스에 접근할 수 있는 코루틴의 수를 제어합니다. 이는 API 호출의 레이트 리미팅이나 동시 데이터베이스 연결 수 제한에 필수적입니다.

import asyncio
import time
 
async def call_api(semaphore, api_id):
    """Make an API call with rate limiting"""
    async with semaphore:
        print(f"API call {api_id} started")
        await asyncio.sleep(1)  # Simulate API call
        print(f"API call {api_id} completed")
        return f"Result {api_id}"
 
async def main():
    # Allow only 3 concurrent API calls
    semaphore = asyncio.Semaphore(3)
 
    start = time.time()
 
    # Create 10 API calls
    tasks = [call_api(semaphore, i) for i in range(10)]
    results = await asyncio.gather(*tasks)
 
    elapsed = time.time() - start
    print(f"\nCompleted {len(results)} API calls in {elapsed:.2f} seconds")
    # With semaphore(3): ~4 seconds (10 calls in batches of 3)
    # Without semaphore: ~1 second (all concurrent)
 
asyncio.run(main())

API 쿼터 준수를 위한 레이트 리미팅:

import asyncio
from datetime import datetime
 
class RateLimiter:
    """Rate limiter that allows N requests per time period"""
 
    def __init__(self, max_requests, time_period):
        self.max_requests = max_requests
        self.time_period = time_period
        self.semaphore = asyncio.Semaphore(max_requests)
        self.request_times = []
 
    async def __aenter__(self):
        await self.semaphore.acquire()
 
        # Wait if we've hit the rate limit
        while len(self.request_times) >= self.max_requests:
            oldest = self.request_times[0]
            elapsed = datetime.now().timestamp() - oldest
 
            if elapsed < self.time_period:
                sleep_time = self.time_period - elapsed
                await asyncio.sleep(sleep_time)
 
            self.request_times.pop(0)
 
        self.request_times.append(datetime.now().timestamp())
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.semaphore.release()
 
async def fetch_with_rate_limit(rate_limiter, item_id):
    async with rate_limiter:
        print(f"Fetching item {item_id} at {datetime.now()}")
        await asyncio.sleep(0.5)
        return f"Item {item_id}"
 
async def main():
    # Allow 5 requests per 2 seconds
    rate_limiter = RateLimiter(max_requests=5, time_period=2)
 
    # Make 15 requests
    tasks = [fetch_with_rate_limit(rate_limiter, i) for i in range(15)]
    results = await asyncio.gather(*tasks)
 
    print(f"Completed {len(results)} requests")
 
asyncio.run(main())

비동기 HTTP 요청을 위한 aiohttp

aiohttp 라이브러리는 비동기 HTTP 클라이언트와 서버 기능을 제공합니다. 비동기 코드에서 HTTP 요청을 할 때 표준 선택입니다.

import asyncio
import aiohttp
 
async def fetch_url(session, url):
    """Fetch a single URL"""
    async with session.get(url) as response:
        return await response.text()
 
async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch_url(session, "https://api.github.com")
        print(f"Fetched {len(html)} characters")
 
asyncio.run(main())

여러 URL을 동시에 가져오기:

import asyncio
import aiohttp
import time
 
async def fetch_url(session, url):
    """Fetch URL and return status code and content length"""
    async with session.get(url) as response:
        content = await response.text()
        return {
            "url": url,
            "status": response.status,
            "length": len(content)
        }
 
async def fetch_all_urls(urls):
    """Fetch multiple URLs concurrently"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
 
async def main():
    urls = [
        "https://api.github.com",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/404"
    ]
 
    start = time.time()
    results = await fetch_all_urls(urls)
    elapsed = time.time() - start
 
    for result in results:
        if isinstance(result, Exception):
            print(f"Error: {result}")
        else:
            print(f"{result['url']}: {result['status']} ({result['length']} bytes)")
 
    print(f"\nFetched {len(urls)} URLs in {elapsed:.2f} seconds")
 
asyncio.run(main())

에러 처리와 재시도가 포함된 실제 예제:

import asyncio
import aiohttp
from typing import Optional
 
async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    max_retries: int = 3
) -> Optional[dict]:
    """Fetch URL with retry logic"""
 
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                if response.status == 200:
                    data = await response.json()
                    return data
                elif response.status == 404:
                    print(f"URL not found: {url}")
                    return None
                else:
                    print(f"Unexpected status {response.status} for {url}")
 
        except asyncio.TimeoutError:
            print(f"Timeout on attempt {attempt + 1} for {url}")
 
        except aiohttp.ClientError as e:
            print(f"Client error on attempt {attempt + 1} for {url}: {e}")
 
        if attempt < max_retries - 1:
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
 
    print(f"Failed to fetch {url} after {max_retries} attempts")
    return None
 
async def main():
    urls = [
        "https://api.github.com/users/github",
        "https://api.github.com/users/nonexistent",
        "https://httpbin.org/delay/5"
    ]
 
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_retry(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
 
        successful = [r for r in results if r is not None]
        print(f"\nSuccessfully fetched {len(successful)}/{len(urls)} URLs")
 
asyncio.run(main())

비동기 파일 I/O를 위한 aiofiles

aiofiles 라이브러리는 파일 읽기와 쓰기 중 블로킹을 방지하는 비동기 파일 작업을 제공합니다.

import asyncio
import aiofiles
 
async def read_file(filename):
    """Read file asynchronously"""
    async with aiofiles.open(filename, mode='r') as f:
        contents = await f.read()
        return contents
 
async def write_file(filename, content):
    """Write file asynchronously"""
    async with aiofiles.open(filename, mode='w') as f:
        await f.write(content)
 
async def main():
    # Write data
    await write_file('test.txt', 'Hello, async file I/O!')
 
    # Read data
    content = await read_file('test.txt')
    print(f"Read: {content}")
 
asyncio.run(main())

여러 파일을 동시에 처리하기:

import asyncio
import aiofiles
 
async def process_file(input_file, output_file):
    """Read, process, and write a file"""
    async with aiofiles.open(input_file, mode='r') as f:
        content = await f.read()
 
    # Process content (convert to uppercase)
    processed = content.upper()
 
    async with aiofiles.open(output_file, mode='w') as f:
        await f.write(processed)
 
    return f"Processed {input_file} -> {output_file}"
 
async def main():
    files = [
        ('input1.txt', 'output1.txt'),
        ('input2.txt', 'output2.txt'),
        ('input3.txt', 'output3.txt')
    ]
 
    tasks = [process_file(inp, out) for inp, out in files]
    results = await asyncio.gather(*tasks)
 
    for result in results:
        print(result)
 
asyncio.run(main())

대용량 파일을 한 줄씩 읽기:

import asyncio
import aiofiles
 
async def process_large_file(filename):
    """Process a large file line by line without loading it all into memory"""
    line_count = 0
 
    async with aiofiles.open(filename, mode='r') as f:
        async for line in f:
            # Process each line
            line_count += 1
            if line.strip():
                # Do something with the line
                pass
 
    return line_count
 
async def main():
    count = await process_large_file('large_data.txt')
    print(f"Processed {count} lines")
 
asyncio.run(main())

비동기 코드에서의 에러 처리

비동기 코드에서의 에러 처리는 예외가 적절히 잡히고 리소스가 정리되도록 특별한 주의가 필요합니다.

import asyncio
 
async def risky_operation(item_id):
    """An operation that might fail"""
    await asyncio.sleep(1)
 
    if item_id % 2 == 0:
        raise ValueError(f"Item {item_id} caused an error")
 
    return f"Item {item_id} processed"
 
async def handle_with_try_except():
    """Handle errors with try/except"""
    try:
        result = await risky_operation(2)
        print(result)
    except ValueError as e:
        print(f"Error caught: {e}")
 
asyncio.run(handle_with_try_except())

동시 태스크에서 에러 처리하기:

async def safe_operation(item_id):
    """Wrapper that catches errors from risky_operation"""
    try:
        result = await risky_operation(item_id)
        return {"success": True, "result": result}
    except Exception as e:
        return {"success": False, "error": str(e), "item_id": item_id}
 
async def main():
    tasks = [safe_operation(i) for i in range(5)]
    results = await asyncio.gather(*tasks)
 
    for result in results:
        if result["success"]:
            print(f"Success: {result['result']}")
        else:
            print(f"Failed: Item {result['item_id']} - {result['error']}")
 
asyncio.run(main())

return_exceptions=True를 사용한 gather():

async def main():
    tasks = [risky_operation(i) for i in range(5)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
 
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")
 
asyncio.run(main())

비동기 컨텍스트 매니저를 사용한 정리:

class AsyncResource:
    """A resource that needs cleanup even if errors occur"""
 
    async def __aenter__(self):
        print("Acquiring resource")
        await asyncio.sleep(0.5)
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Cleaning up resource")
        await asyncio.sleep(0.5)
 
        if exc_type is not None:
            print(f"Exception during resource use: {exc_val}")
 
        # Return False to propagate exception, True to suppress
        return False
 
    async def do_work(self):
        await asyncio.sleep(1)
        raise ValueError("Something went wrong")
 
async def main():
    try:
        async with AsyncResource() as resource:
            await resource.do_work()
    except ValueError as e:
        print(f"Caught error: {e}")
 
asyncio.run(main())

성능 벤치마크: I/O 바운드 태스크를 위한 동기 vs 비동기

실제 벤치마크와 함께 I/O 바운드 작업에 대한 동기식과 비동기식 접근 방식을 비교해 봅시다.

import asyncio
import time
import requests
import aiohttp
 
# Synchronous approach
def fetch_sync(url):
    response = requests.get(url)
    return len(response.text)
 
def benchmark_sync(urls):
    start = time.time()
    results = [fetch_sync(url) for url in urls]
    elapsed = time.time() - start
    return elapsed, results
 
# Asynchronous approach
async def fetch_async(session, url):
    async with session.get(url) as response:
        text = await response.text()
        return len(text)
 
async def benchmark_async(urls):
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_async(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    elapsed = time.time() - start
    return elapsed, results
 
# Run benchmarks
urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1"
]
 
print("Benchmarking synchronous approach...")
sync_time, sync_results = benchmark_sync(urls)
print(f"Synchronous: {sync_time:.2f} seconds")
 
print("\nBenchmarking asynchronous approach...")
async_time, async_results = asyncio.run(benchmark_async(urls))
print(f"Asynchronous: {async_time:.2f} seconds")
 
print(f"\nSpeedup: {sync_time / async_time:.2f}x")

각각 1초 지연되는 5개 URL에 대한 일반적인 결과:

  • 동기식: ~5초 (순차 실행)
  • 비동기식: ~1초 (동시 실행)
  • 속도 향상: ~5배
URL 수동기식 시간비동기식 시간속도 향상
55.2초1.1초4.7배
1010.4초1.2초8.7배
2020.8초1.4초14.9배
5052.1초2.1초24.8배
100104.5초3.8초27.5배

대역폭 또는 레이트 리미팅 제약에 도달할 때까지 동시 작업 수가 증가함에 따라 속도 향상이 증가합니다.

흔한 함정과 피하는 방법

await를 잊는 것

# WRONG: Coroutine is created but not executed
async def wrong():
    result = fetch_data()  # Missing await!
    print(result)  # Prints coroutine object, not the result
 
# CORRECT: Await the coroutine
async def correct():
    result = await fetch_data()
    print(result)  # Prints actual result

이벤트 루프 블로킹하기

# WRONG: Blocks the entire event loop
async def blocking_code():
    time.sleep(5)  # Blocks everything!
    result = compute_something()  # Blocks if CPU-intensive
    return result
 
# CORRECT: Use asyncio.sleep and run_in_executor
async def non_blocking_code():
    await asyncio.sleep(5)  # Only pauses this task
 
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, compute_something)
    return result

태스크 취소 처리하지 않기

# WRONG: Doesn't handle cancellation
async def incomplete_cleanup():
    resource = await acquire_resource()
    await long_operation()
    await release_resource(resource)  # May not execute if cancelled
 
# CORRECT: Use try/finally or async with
async def proper_cleanup():
    resource = await acquire_resource()
    try:
        await long_operation()
    finally:
        await release_resource(resource)  # Always executes

이벤트 루프 충돌 생성하기

# WRONG: Calling asyncio.run() in an async function
async def wrong_nesting():
    result = asyncio.run(some_coroutine())  # Error!
    return result
 
# CORRECT: Just await the coroutine
async def correct_nesting():
    result = await some_coroutine()
    return result

동시성 제한하지 않기

# WRONG: Launches too many tasks simultaneously
async def unlimited_concurrency(items):
    tasks = [process_item(item) for item in items]  # 10,000 tasks!
    return await asyncio.gather(*tasks)
 
# CORRECT: Use semaphore to limit concurrency
async def limited_concurrency(items):
    semaphore = asyncio.Semaphore(10)  # Max 10 concurrent
 
    async def process_with_limit(item):
        async with semaphore:
            return await process_item(item)
 
    tasks = [process_with_limit(item) for item in items]
    return await asyncio.gather(*tasks)

실제 예제들

레이트 리미팅이 포함된 웹 스크레이핑

import asyncio
import aiohttp
from bs4 import BeautifulSoup
 
class AsyncWebScraper:
    def __init__(self, max_concurrent=5):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None
 
    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()
 
    async def fetch_page(self, url):
        async with self.semaphore:
            try:
                async with self.session.get(url) as response:
                    return await response.text()
            except Exception as e:
                print(f"Error fetching {url}: {e}")
                return None
 
    async def scrape_urls(self, urls):
        tasks = [self.fetch_page(url) for url in urls]
        return await asyncio.gather(*tasks)
 
async def main():
    urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3"
    ]
 
    async with AsyncWebScraper(max_concurrent=3) as scraper:
        pages = await scraper.scrape_urls(urls)
 
        for url, html in zip(urls, pages):
            if html:
                print(f"Scraped {url}: {len(html)} bytes")
 
asyncio.run(main())

비동기 API 데이터 파이프라인

import asyncio
import aiohttp
 
async def fetch_user_ids(session):
    """Fetch list of user IDs from API"""
    async with session.get("https://api.example.com/users") as response:
        data = await response.json()
        return [user["id"] for user in data["users"]]
 
async def fetch_user_details(session, user_id):
    """Fetch detailed info for a user"""
    async with session.get(f"https://api.example.com/users/{user_id}") as response:
        return await response.json()
 
async def fetch_user_posts(session, user_id):
    """Fetch posts for a user"""
    async with session.get(f"https://api.example.com/users/{user_id}/posts") as response:
        return await response.json()
 
async def process_user(session, user_id):
    """Fetch all data for a user concurrently"""
    details, posts = await asyncio.gather(
        fetch_user_details(session, user_id),
        fetch_user_posts(session, user_id)
    )
 
    return {
        "user": details,
        "posts": posts,
        "post_count": len(posts)
    }
 
async def main():
    async with aiohttp.ClientSession() as session:
        # Fetch user IDs
        user_ids = await fetch_user_ids(session)
 
        # Process all users concurrently
        user_data = await asyncio.gather(
            *[process_user(session, uid) for uid in user_ids[:10]]
        )
 
        for data in user_data:
            print(f"User {data['user']['name']}: {data['post_count']} posts")
 
asyncio.run(main())

비동기 채팅 서버

import asyncio
 
class ChatServer:
    def __init__(self):
        self.clients = set()
 
    async def handle_client(self, reader, writer):
        """Handle a single client connection"""
        addr = writer.get_extra_info('peername')
        print(f"Client connected: {addr}")
 
        self.clients.add(writer)
 
        try:
            while True:
                data = await reader.read(100)
                if not data:
                    break
 
                message = data.decode()
                print(f"Received from {addr}: {message}")
 
                # Broadcast to all clients
                await self.broadcast(f"{addr}: {message}", writer)
 
        except asyncio.CancelledError:
            pass
 
        finally:
            print(f"Client disconnected: {addr}")
            self.clients.remove(writer)
            writer.close()
            await writer.wait_closed()
 
    async def broadcast(self, message, sender):
        """Send message to all clients except sender"""
        for client in self.clients:
            if client != sender:
                try:
                    client.write(message.encode())
                    await client.drain()
                except Exception as e:
                    print(f"Error broadcasting: {e}")
 
    async def start(self, host='127.0.0.1', port=8888):
        """Start the chat server"""
        server = await asyncio.start_server(
            self.handle_client,
            host,
            port
        )
 
        addr = server.sockets[0].getsockname()
        print(f"Chat server running on {addr}")
 
        async with server:
            await server.serve_forever()
 
async def main():
    chat_server = ChatServer()
    await chat_server.start()
 
asyncio.run(main())

Jupyter에서 Asyncio 실험하기

Jupyter 노트북에서 asyncio를 사용할 때 이벤트 루프 충돌이 발생할 수 있습니다. Jupyter는 이미 이벤트 루프를 실행 중이므로 asyncio.run()과 간섭할 수 있습니다.

Jupyter 환경에서 원활한 비동기 실험을 위해 Jupyter 노트북을 위해 특별히 설계된 AI 에이전트인 RunCell (opens in a new tab)을 고려해 보세요. RunCell은 이벤트 루프 관리를 자동으로 처리하고 향상된 비동기 디버깅 기능을 제공하여 충돌 없이 대화식으로 asyncio 패턴을 테스트할 수 있게 합니다.

표준 Jupyter에서는 최상위 await를 사용할 수 있습니다:

# In Jupyter, this works directly
async def fetch_data():
    await asyncio.sleep(1)
    return "data"
 
# Just await directly, no asyncio.run() needed
result = await fetch_data()
print(result)

또는 중첩된 이벤트 루프를 허용하는 nest_asyncio 패키지를 사용하세요:

import nest_asyncio
nest_asyncio.apply()
 
# Now asyncio.run() works in Jupyter
asyncio.run(main())

FAQ

Python에서 asyncio와 threading의 차이점은 무엇인가요?

Asyncio는 await를 사용하여 태스크가 자발적으로 제어권을 양보하는 단일 스레드에서 협력적 멀티태스킹을 사용합니다. Threading은 OS가 스레드 간 전환 시기를 결정하는 선점적 멀티태스킹을 사용하는 여러 OS 스레드를 사용합니다. Asyncio는 메모리 오버헤드가 더 낮고 레이스 컨디션 위험이 없어 I/O 바운드 태스크에 더 효율적이며, threading은 비동기를 지원하지 않는 블로킹 라이브러리를 처리할 수 있습니다. 둘 다 CPU 바운드 작업에서는 Python의 GIL에 의해 제한되지만, asyncio는 스레드 컨텍스트 스위칭 오버헤드를 피합니다.

언제 asyncio 대신 multiprocessing을 사용해야 하나요?

API 호출, 데이터베이스 쿼리, 파일 작업, 네트워크 통신과 같은 I/O 바운드 태스크에는 asyncio를 사용하세요. 데이터 처리, 수학 계산, 이미지 조작, 머신 러닝 모델 학습과 같은 CPU 바운드 태스크에는 multiprocessing을 사용하세요. Asyncio는 최소한의 리소스 오버헤드로 수천 개의 동시 I/O 작업을 처리하는 데 탁월하며, multiprocessing은 여러 CPU 코어에서 실제 병렬 계산을 제공합니다.

같은 애플리케이션에서 비동기와 동기 코드를 혼합할 수 있나요?

네, 하지만 신중한 계획이 필요합니다. 이미 실행 중인 이벤트 루프 내부가 아닌 한 asyncio.run()을 사용하여 동기 코드에서 비동기 함수를 호출할 수 있습니다. 비동기 코드에서 동기 블로킹 함수를 호출하려면 loop.run_in_executor()를 사용하여 스레드 풀에서 실행하고 이벤트 루프가 블로킹되지 않도록 하세요. 절대 time.sleep(), requests.get() 또는 동기 파일 I/O와 같은 블로킹 작업을 비동기 함수에서 직접 사용하지 마세요. 대신 asyncio.sleep(), aiohttp, aiofiles와 같은 비동기 대안을 사용하세요.

비동기 코드를 효과적으로 디버깅하려면 어떻게 해야 하나요?

asyncio.run(main(), debug=True)를 사용하거나 환경 변수 PYTHONASYNCIODEBUG=1을 설정하여 asyncio 디버그 모드를 활성화하세요. 이렇게 하면 await를 잊거나 코루틴이 await되지 않은 것과 같은 일반적인 실수를 감지할 수 있습니다. 전통적인 디버거가 비동기 코드에서 혼란스러울 수 있으므로 실행 흐름을 추적하기 위해 로깅을 광범위하게 사용하세요. 더 명확한 에러 메시지를 위해 asyncio.create_task(coro(), name="task-name")으로 태스크 이름을 추가하세요. 하나의 실패한 태스크가 다른 태스크의 에러를 숨기지 않도록 asyncio.gather(..., return_exceptions=True)를 사용하세요. 완료되지 않는 태스크를 확인하기 위해 asyncio.all_tasks()로 이벤트 루프를 모니터링하세요.

asyncio의 성능 한계는 무엇인가요?

Asyncio는 단일 스레드에서 수만 개의 동시 I/O 작업을 처리할 수 있어 threading 한계를 훨씬 능가합니다. 주요 제약은 단일 스레드를 사용하므로 CPU 바운드 작업에는 이점이 없다는 것입니다. 동기 I/O나 무거운 계산으로 이벤트 루프를 블로킹하면 성능이 저하됩니다. Asyncio의 동시성 한계에 도달하기 전에 네트워크 대역폭과 API 레이트 리밋이 병목 현상이 됩니다. 메모리 사용량은 동시 태스크 수에 따라 확장되지만, 각 태스크는 스레드에 비해 최소한의 오버헤드를 가집니다. 최대 성능을 위해 I/O 동시성에는 asyncio를, CPU 바운드 작업에는 multiprocessing을 결합하고, 항상 세마포어를 사용하여 동시 작업을 적절한 수준으로 제한하세요.

결론

Python asyncio는 느리고 블로킹되는 작업에서 빠르고 동시적인 시스템으로 I/O 바운드 애플리케이션을 변환합니다. async/await 구문을 마스터하고, 이벤트 루프를 이해하고, gather(), create_task(), 세마포어와 같은 도구를 활용함으로써 수천 개의 동시 작업을 효율적으로 처리하는 애플리케이션을 구축할 수 있습니다.

asyncio로 성공하는 핵심은 적합한 도구인지 인식하는 것입니다. 네트워크 요청, 데이터베이스 쿼리, 파일 작업 및 외부 리소스를 기다리는 데 시간을 보내는 모든 태스크에 사용하세요. 동기 작업으로 이벤트 루프를 블로킹하지 않고, 코루틴에는 항상 await를 사용하고, 필요할 때 세마포어로 동시성을 제한하세요.

코드베이스의 작은 섹션을 비동기로 변환하는 것부터 시작하여 성능 향상을 측정하고 점진적으로 확장하세요. I/O 집약적인 애플리케이션에서 극적인 속도 향상은 학습 투자가 가치 있게 만듭니다.

📚