Skip to content

Python Threading: 예제를 통한 멀티스레딩 완벽 가이드

Updated on

당신의 Python 프로그램이 50개의 API 호출을 하나씩 순차적으로 실행합니다. 각 호출은 200밀리초의 대기 시간이 소요됩니다. 계산은 냉정합니다: 10초라는 시간이 네트워크 응답을 기다리며 낭비되는 것입니다. 당신의 CPU는 거의 0%에 가까운 사용률로 유휴 상태로 앉아 있고, 동시에 실행될 수 있는 I/O-bound 작업들을 당신의 스크립트가 순차적으로 처리하며 기어가고 있습니다.

이 문제는 금방 심각해집니다. 수천 개의 페이지를 순차적으로 가져오는 웹 스크래퍼. 한 번에 하나씩 파일을 읽고 쓰는 파일 처리 스크립트. 결과를 기다리는 동안 전체 애플리케이션을 블로킹하는 데이터베이스 쿼리. 유휴 대기의 매 초는 당신의 프로그램이 유용한 작업을 할 수 있는 시간입니다.

Python의 threading 모듈은 단일 프로세스 내에서 여러 작업을 동시에 실행하여 이 문제를 해결합니다. 스레드는 메모리를 공유하고, 빠르게 시작되며, 프로그램이 대부분의 시간을 기다리는 데 소비하는 I/O-bound 작업에 탁월합니다. 이 가이드는 기본적인 스레드 생성부터 고급 동기화 패턴까지 모든 것을 다루며, 즉시 사용할 수 있는 프로덕션 준비된 코드 예제를 제공합니다.

📚

Python에서 Threading이란?

Threading은 프로그램이 동일한 프로세스 내에서 여러 작업을 동시에 실행할 수 있게 합니다. 각 스레드는 동일한 메모리 공간을 공유하므로 스레드 간 통신이 빠르고 간단합니다.

Python의 threading 모듈은 스레드를 생성하고 관리하기 위한 고수준 인터페이스를 제공합니다. 하지만 중요한 주의사항이 있습니다: Global Interpreter Lock(GIL)입니다.

Global Interpreter Lock (GIL)

GIL은 CPython에서 한 번에 하나의 스레드만 Python 바이트코드를 실행할 수 있도록 허용하는 뮤텍스입니다. 이는 CPU-bound 작업에 대해 스레드가 진정한 병렬 처리를 달성할 수 없음을 의미합니다. 그러나 GIL은 I/O 작업(네트워크 호출, 파일 읽기, 데이터베이스 쿼리) 중에 해제되어 한 스레드가 I/O를 기다리는 동안 다른 스레드가 실행될 수 있게 합니다.

import threading
import time
 
def cpu_bound(n):
    """CPU-bound: GIL이 병렬 실행을 방지"""
    total = 0
    for i in range(n):
        total += i * i
    return total
 
def io_bound(url):
    """I/O-bound: GIL이 네트워크 대기 중 해제"""
    import urllib.request
    return urllib.request.urlopen(url).read()
 
# CPU-bound: 4개의 스레드가 한 번에 하나씩 실행 (속도 향상 없음)
start = time.time()
threads = [threading.Thread(target=cpu_bound, args=(10_000_000,)) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"CPU-bound with threads: {time.time() - start:.2f}s")
 
# I/O-bound: 4개의 스레드가 대기 시간을 겹쳐서 실행 (큰 속도 향상)

이는 threading이 I/O-bound 작업에는 이상적이지만 CPU 집약적 계산에는 적합하지 않음을 의미합니다. CPU-bound 작업의 경우 대신 multiprocessing 모듈을 사용하세요.

Threading vs Multiprocessing vs Asyncio 사용 시기

기능threadingmultiprocessingasyncio
최적의 용도I/O-bound 작업CPU-bound 작업고동시성 I/O
병렬성동시 실행 (GIL 제한)진정한 병렬동시 실행 (단일 스레드)
메모리공유 (가벼움)프로세스별 분리공유 (가벼움)
시작 비용낮음 (~1ms)높음 (~50-100ms)매우 낮음
통신직접 메모리 접근파이프, 큐, 공유 메모리Awaitable 코루틴
확장성수십~수백 개의 스레드CPU 코어 수 제한수천 개의 코루틴
복잡성중간 (락 필요)중간 (직렬화)높음 (async/await 구문)
사용 사례웹 스크래핑, 파일 I/O, API 호출데이터 처리, ML 트레이닝웹 서버, 채팅 앱

경험 법칙: 프로그램이 네트워크나 디스크를 기다린다면 threading을 사용하세요. 숫자를 계산한다면 multiprocessing을 사용하세요. 수천 개의 동시 연결이 필요하다면 asyncio를 사용하세요.

Thread 기초: 스레드 생성 및 실행

threading.Thread 클래스

스레드를 생성하는 가장 간단한 방법은 threading.Thread에 target 함수를 전달하는 것입니다:

import threading
import time
 
def download_file(filename):
    print(f"[{threading.current_thread().name}] Downloading {filename}...")
    time.sleep(2)  # 다운로드 시뮬레이션
    print(f"[{threading.current_thread().name}] Finished {filename}")
 
# 스레드 생성
t1 = threading.Thread(target=download_file, args=("data.csv",))
t2 = threading.Thread(target=download_file, args=("report.pdf",))
 
# 스레드 시작
t1.start()
t2.start()
 
# 둘 다 끝날 때까지 대기
t1.join()
t2.join()
 
print("All downloads complete")

두 다운로드가 동시에 실행되어 4초 대신 대략 2초 만에 완료됩니다.

start()와 join()

  • start()는 스레드 실행을 시작합니다. 스레드는 한 번만 시작할 수 있습니다.
  • join(timeout=None)은 대상 스레드가 끝날 때까지 호출한 스레드를 블로킹합니다. 영원히 기다리지 않으려면 timeout에 초 단위로 전달하세요.
import threading
import time
 
def slow_task():
    time.sleep(10)
 
t = threading.Thread(target=slow_task)
t.start()
 
# 최대 3초 대기
t.join(timeout=3)
 
if t.is_alive():
    print("Thread still running after 3 seconds")
else:
    print("Thread finished")

스레드 이름 지정

명명된 스레드는 디버깅을 더 쉽게 만듭니다:

import threading
 
def worker():
    name = threading.current_thread().name
    print(f"Running in thread: {name}")
 
t = threading.Thread(target=worker, name="DataProcessor")
t.start()
t.join()

Daemon 스레드

Daemon 스레드는 메인 프로그램이 종료하면 자동으로 종료되는 백그라운드 스레드입니다. Non-daemon 스레드는 끝날 때까지 프로그램을 계속 실행 상태로 유지합니다.

import threading
import time
 
def background_monitor():
    while True:
        print("Monitoring system health...")
        time.sleep(5)
 
# Daemon 스레드: 메인 프로그램 종료 시 종료됨
monitor = threading.Thread(target=background_monitor, daemon=True)
monitor.start()
 
# 메인 프로그램이 작업 수행
time.sleep(12)
print("Main program exiting")
# monitor 스레드는 자동으로 종료됨

Daemon 스레드는 프로그램 종료를 막지 않아야 하는 백그라운드 로깅, 모니터링, 또는 정리 작업에 사용하세요.

Thread 서브클래싱

더 복잡한 스레드 동작을 위해 threading.Thread를 서브클래싱하세요:

import threading
import time
 
class FileProcessor(threading.Thread):
    def __init__(self, filepath):
        super().__init__()
        self.filepath = filepath
        self.result = None
 
    def run(self):
        """run()을 스레드 로직으로 오버라이드"""
        print(f"Processing {self.filepath}")
        time.sleep(1)  # 작업 시뮬레이션
        self.result = f"Processed: {self.filepath}"
 
# 생성 및 실행
processor = FileProcessor("/data/report.csv")
processor.start()
processor.join()
print(processor.result)

스레드에 인자 전달하기

args와 kwargs 사용

위치 인자는 args(튜플)로, 키워드 인자는 kwargs(딕셔너리)로 전달합니다:

import threading
 
def fetch_data(url, timeout, retries=3, verbose=False):
    print(f"Fetching {url} (timeout={timeout}s, retries={retries}, verbose={verbose})")
 
# 튜플로 위치 인자 전달
t1 = threading.Thread(target=fetch_data, args=("https://api.example.com", 30))
 
# 딕셔너리로 키워드 인자 전달
t2 = threading.Thread(
    target=fetch_data,
    args=("https://api.example.com",),
    kwargs={"timeout": 30, "retries": 5, "verbose": True}
)
 
t1.start()
t2.start()
t1.join()
t2.join()

흔한 실수: 단일 요소 튜플에서 뒤에 콤마를 잊는 것. args=("hello",)는 튜플입니다; args=("hello")는 괄호 안의 문자열일 뿐입니다.

스레드에서 결과 수집하기

스레드는 직접적으로 값을 반환하지 않습니다. 공유 데이터 구조나 리스트를 사용하여 결과를 수집하세요:

import threading
 
results = {}
lock = threading.Lock()
 
def compute(task_id, value):
    result = value ** 2
    with lock:
        results[task_id] = result
 
threads = []
for i in range(5):
    t = threading.Thread(target=compute, args=(i, i * 10))
    threads.append(t)
    t.start()
 
for t in threads:
    t.join()
 
print(results)  # {0: 0, 1: 100, 2: 400, 3: 900, 4: 1600}

더 깔끔한 접근 방식은 결과 수집을 자동으로 처리하는 ThreadPoolExecutor(다음에 설명)를 사용하는 것입니다.

ThreadPoolExecutor: 현대적인 접근 방식

concurrent.futures 모듈은 ThreadPoolExecutor를 제공하며, 이는 워커 스레드 풀을 관리하는 고수준 인터페이스입니다. 스레드 생성, 결과 수집, 예외 전파를 자동으로 처리합니다.

submit()을 사용한 기본 사용법

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
 
def fetch_url(url):
    time.sleep(1)  # 네트워크 요청 시뮬레이션
    return f"Content from {url}"
 
urls = [
    "https://example.com/page1",
    "https://example.com/page2",
    "https://example.com/page3",
    "https://example.com/page4",
    "https://example.com/page5",
]
 
with ThreadPoolExecutor(max_workers=3) as executor:
    # 작업 제출하고 Future 객체 받기
    future_to_url = {executor.submit(fetch_url, url): url for url in urls}
 
    # 완료되는 대로 결과 처리
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            print(f"{url}: {data}")
        except Exception as e:
            print(f"{url} generated an exception: {e}")

순서가 있는 결과를 위한 map() 사용

executor.map()은 내장 map()과 유사하게 입력과 동일한 순서로 결과를 반환합니다:

from concurrent.futures import ThreadPoolExecutor
 
def process_item(item):
    return item.upper()
 
items = ["apple", "banana", "cherry", "date"]
 
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_item, items))
 
print(results)  # ['APPLE', 'BANANA', 'CHERRY', 'DATE']

submit() vs map()

submit()map()
반환Future 객체결과의 이터레이터
결과 순서완료 순서 (as_completed와 함께)입력 순서
에러 처리future.result()를 통한 작업별첫 번째 실패 시 예외 발생
인자단일 함수 호출각 항목에 함수 적용
최적의 용도이종 작업, 조기 결과동종 배치 처리

Future를 사용한 예외 처리

from concurrent.futures import ThreadPoolExecutor, as_completed
 
def risky_task(n):
    if n == 3:
        raise ValueError(f"Bad input: {n}")
    return n * 10
 
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(risky_task, i): i for i in range(5)}
 
    for future in as_completed(futures):
        task_id = futures[future]
        try:
            result = future.result(timeout=5)
            print(f"Task {task_id}: {result}")
        except ValueError as e:
            print(f"Task {task_id} failed: {e}")
        except TimeoutError:
            print(f"Task {task_id} timed out")

작업 취소하기

from concurrent.futures import ThreadPoolExecutor
import time
 
def long_task(n):
    time.sleep(5)
    return n
 
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(long_task, i) for i in range(10)]
 
    # 보류 중인 작업 취소 (이미 실행 중인 작업은 취소할 수 없음)
    for f in futures[4:]:
        cancelled = f.cancel()
        print(f"Cancelled: {cancelled}")

스레드 동기화 기본 요소

여러 스레드가 공유 데이터에 접근할 때 경쟁 조건을 방지하려면 동기화가 필요합니다.

Lock

Lock은 한 번에 하나의 스레드만 임계 영역에 들어갈 수 있도록 보장합니다:

import threading
 
class BankAccount:
    def __init__(self, balance):
        self.balance = balance
        self.lock = threading.Lock()
 
    def withdraw(self, amount):
        with self.lock:  # 한 번에 하나의 스레드만
            if self.balance >= amount:
                self.balance -= amount
                return True
            return False
 
    def deposit(self, amount):
        with self.lock:
            self.balance += amount
 
account = BankAccount(1000)
 
def make_transactions():
    for _ in range(100):
        account.deposit(10)
        account.withdraw(10)
 
threads = [threading.Thread(target=make_transactions) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
 
print(f"Final balance: {account.balance}")  # 항상 1000

락 없이는 동시 읽기와 쓰기가 잘못된 결과를 생성합니다(경쟁 조건).

RLock (재진입 가능한 락)

RLock은 동일한 스레드에서 여러 번 획득할 수 있습니다. 이는 락을 보유한 함수가 동일한 락이 필요한 다른 함수를 호출할 때 데드락을 방지합니다:

import threading
 
class SafeCache:
    def __init__(self):
        self._data = {}
        self._lock = threading.RLock()
 
    def get(self, key):
        with self._lock:
            return self._data.get(key)
 
    def set(self, key, value):
        with self._lock:
            self._data[key] = value
 
    def get_or_set(self, key, default):
        with self._lock:
            # 이는 get()을 호출하는데, 이것도 _lock을 획득함
            # RLock은 이를 허용함; 일반 Lock은 데드락을 일으킬 것임
            existing = self.get(key)
            if existing is None:
                self.set(key, default)
                return default
            return existing

Semaphore

Semaphore는 고정된 수의 스레드가 동시에 리소스에 접근하도록 허용합니다:

import threading
import time
 
# 최대 3개의 동시 데이터베이스 연결 허용
db_semaphore = threading.Semaphore(3)
 
def query_database(query_id):
    with db_semaphore:
        print(f"Query {query_id}: connected (active connections: {3 - db_semaphore._value})")
        time.sleep(2)  # 쿼리 시뮬레이션
        print(f"Query {query_id}: done")
 
threads = [threading.Thread(target=query_database, args=(i,)) for i in range(10)]
for t in threads: t.start()
for t in threads: t.join()

Event

Event는 한 스레드가 다른 대기 중인 스레드에게 신호를 보낼 수 있게 합니다:

import threading
import time
 
data_ready = threading.Event()
shared_data = []
 
def producer():
    print("Producer: preparing data...")
    time.sleep(3)
    shared_data.extend([1, 2, 3, 4, 5])
    print("Producer: data ready, signaling consumers")
    data_ready.set()
 
def consumer(name):
    print(f"Consumer {name}: waiting for data...")
    data_ready.wait()  # 이벤트가 설정될 때까지 블로킹
    print(f"Consumer {name}: got data = {shared_data}")
 
threads = [
    threading.Thread(target=producer),
    threading.Thread(target=consumer, args=("A",)),
    threading.Thread(target=consumer, args=("B",)),
]
for t in threads: t.start()
for t in threads: t.join()

Condition

Condition은 락과 알림 대기 능력을 결합합니다. 이는 생산자-소비자 패턴의 기초입니다:

import threading
import time
import random
 
buffer = []
MAX_SIZE = 5
condition = threading.Condition()
 
def producer():
    for i in range(20):
        with condition:
            while len(buffer) >= MAX_SIZE:
                condition.wait()  # 공간이 있을 때까지 대기
            item = random.randint(1, 100)
            buffer.append(item)
            print(f"Produced: {item} (buffer size: {len(buffer)})")
            condition.notify_all()
        time.sleep(0.1)
 
def consumer(name):
    for _ in range(10):
        with condition:
            while len(buffer) == 0:
                condition.wait()  # 항목이 있을 때까지 대기
            item = buffer.pop(0)
            print(f"Consumer {name} consumed: {item} (buffer size: {len(buffer)})")
            condition.notify_all()
        time.sleep(0.15)
 
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=("X",))
t3 = threading.Thread(target=consumer, args=("Y",))
t1.start(); t2.start(); t3.start()
t1.join(); t2.join(); t3.join()

동기화 기본 요소 요약

기본 요소목적사용 시기
Lock상호 배제공유 가변 상태 보호
RLock재진입 가능한 뮤텍스동일한 스레드의 중첩 락킹
Semaphore동시성 제한속도 제한, 연결 풀
Event일회성 신호초기화 완료, 종료 신호
Condition대기/알림 패턴생산자-소비자, 상태 변경
BarrierN개 스레드 동기화모든 스레드가 계속하기 전에 특정 지점에 도달해야 할 때

스레드 안전한 데이터 구조

queue.Queue

queue.Queue는 가장 기본적인 스레드 안전한 데이터 구조입니다. 모든 락킹을 내부적으로 처리합니다:

import threading
import queue
import time
 
task_queue = queue.Queue()
results = queue.Queue()
 
def worker():
    while True:
        item = task_queue.get()  # 항목이 있을 때까지 블로킹
        if item is None:
            break
        result = item ** 2
        results.put(result)
        task_queue.task_done()
 
# 4개의 워커 시작
workers = []
for _ in range(4):
    t = threading.Thread(target=worker, daemon=True)
    t.start()
    workers.append(t)
 
# 작업 제출
for i in range(20):
    task_queue.put(i)
 
# 모든 작업 완료 대기
task_queue.join()
 
# 워커 중지
for _ in range(4):
    task_queue.put(None)
for w in workers:
    w.join()
 
# 결과 수집
all_results = []
while not results.empty():
    all_results.append(results.get())
print(f"Results: {sorted(all_results)}")

queue.Queue는 다음도 지원합니다:

  • Queue(maxsize=10): 가득 차면 put() 블로킹
  • PriorityQueue(): 우선순위별로 정렬된 항목
  • LifoQueue(): 후입선출 (스택 동작)

collections.deque

collections.dequeappend()popleft() 작업에 대해 스레드 안전합니다(CPython에서 C 수준에서 원자적), 이는 단순한 생산자-소비자 패턴을 위한 빠른 대안이 됩니다:

from collections import deque
import threading
import time
 
buffer = deque(maxlen=1000)
 
def producer():
    for i in range(100):
        buffer.append(i)
        time.sleep(0.01)
 
def consumer():
    consumed = 0
    while consumed < 100:
        if buffer:
            item = buffer.popleft()
            consumed += 1
        else:
            time.sleep(0.01)
    print(f"Consumed {consumed} items")
 
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()

참고: 개별 appendpopleft 작업은 스레드 안전하지만, len(buffer)를 확인한 다음 팝하는 것은 원자적이지 않습니다. 완전한 스레드 안전성을 위해 queue.Queue를 사용하세요.

일반적인 Threading 패턴

생산자-소비자 패턴

데이터 생산을 데이터 처리에서 분리하는 고전적인 패턴입니다:

import threading
import queue
import time
import random
 
def producer(q, name, num_items):
    for i in range(num_items):
        item = f"{name}-item-{i}"
        q.put(item)
        print(f"Producer {name}: created {item}")
        time.sleep(random.uniform(0.05, 0.15))
    print(f"Producer {name}: done")
 
def consumer(q, name, stop_event):
    while not stop_event.is_set() or not q.empty():
        try:
            item = q.get(timeout=0.5)
            print(f"Consumer {name}: processing {item}")
            time.sleep(random.uniform(0.1, 0.2))
            q.task_done()
        except queue.Empty:
            continue
    print(f"Consumer {name}: shutting down")
 
task_queue = queue.Queue(maxsize=10)
stop_event = threading.Event()
 
producers = [
    threading.Thread(target=producer, args=(task_queue, "P1", 10)),
    threading.Thread(target=producer, args=(task_queue, "P2", 10)),
]
consumers = [
    threading.Thread(target=consumer, args=(task_queue, "C1", stop_event)),
    threading.Thread(target=consumer, args=(task_queue, "C2", stop_event)),
    threading.Thread(target=consumer, args=(task_queue, "C3", stop_event)),
]
 
for c in consumers: c.start()
for p in producers: p.start()
for p in producers: p.join()
 
task_queue.join()  # 모든 항목이 처리될 때까지 대기
stop_event.set()   # 소비자에게 중지 신호
for c in consumers: c.join()

워커 스레드 풀 (수동)

ThreadPoolExecutor보다 더 많은 제어가 필요할 때:

import threading
import queue
 
class WorkerPool:
    def __init__(self, num_workers):
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.workers = []
 
        for _ in range(num_workers):
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()
            self.workers.append(t)
 
    def _worker(self):
        while True:
            func, args, kwargs, future_id = self.task_queue.get()
            if func is None:
                break
            try:
                result = func(*args, **kwargs)
                self.result_queue.put((future_id, result, None))
            except Exception as e:
                self.result_queue.put((future_id, None, e))
            finally:
                self.task_queue.task_done()
 
    def submit(self, func, *args, **kwargs):
        future_id = id(func)  # 간단한 ID
        self.task_queue.put((func, args, kwargs, future_id))
        return future_id
 
    def shutdown(self):
        for _ in self.workers:
            self.task_queue.put((None, None, None, None))
        for w in self.workers:
            w.join()
 
# 사용법
pool = WorkerPool(4)
for i in range(10):
    pool.submit(lambda x: x * x, i)
pool.task_queue.join()
pool.shutdown()

속도 제한된 스레드 풀

스레드가 외부 요청을 얼마나 빨리 만드는지 제어합니다:

import threading
import time
from concurrent.futures import ThreadPoolExecutor
 
class RateLimiter:
    def __init__(self, max_per_second):
        self.interval = 1.0 / max_per_second
        self.lock = threading.Lock()
        self.last_call = 0
 
    def wait(self):
        with self.lock:
            elapsed = time.time() - self.last_call
            wait_time = self.interval - elapsed
            if wait_time > 0:
                time.sleep(wait_time)
            self.last_call = time.time()
 
limiter = RateLimiter(max_per_second=5)
 
def rate_limited_fetch(url):
    limiter.wait()
    print(f"Fetching {url} at {time.time():.2f}")
    time.sleep(0.5)  # 요청 시뮬레이션
    return f"Data from {url}"
 
urls = [f"https://api.example.com/item/{i}" for i in range(20)]
 
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(rate_limited_fetch, urls))

스레드 안전 함정과 피하는 방법

경쟁 조건

경쟁 조건은 결과가 스레드 실행 타이밍에 의존할 때 발생합니다:

import threading
 
# 나쁨: 경쟁 조건
counter = 0
 
def increment_unsafe():
    global counter
    for _ in range(100_000):
        counter += 1  # 읽기, 증가, 쓰기: 원자적이 아님
 
threads = [threading.Thread(target=increment_unsafe) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Expected: 500000, Got: {counter}")  # 종종 500000보다 적음
 
# 좋음: 락으로 보호
counter = 0
lock = threading.Lock()
 
def increment_safe():
    global counter
    for _ in range(100_000):
        with lock:
            counter += 1
 
threads = [threading.Thread(target=increment_safe) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Expected: 500000, Got: {counter}")  # 항상 500000

데드락

데드락은 두 스레드가 각각 다른 스레드가 필요로 하는 락을 보유할 때 발생합니다:

import threading
 
lock_a = threading.Lock()
lock_b = threading.Lock()
 
def thread_1():
    with lock_a:
        print("Thread 1: acquired lock_a")
        with lock_b:  # thread_2가 lock_b를 보유하면 영원히 대기
            print("Thread 1: acquired lock_b")
 
def thread_2():
    with lock_b:
        print("Thread 2: acquired lock_b")
        with lock_a:  # thread_1이 lock_a를 보유하면 영원히 대기
            print("Thread 2: acquired lock_a")
 
# 이것은 데드락이 발생함
# t1 = threading.Thread(target=thread_1)
# t2 = threading.Thread(target=thread_2)
# t1.start(); t2.start()

데드락을 방지하는 방법:

  1. 항상 같은 순서로 락을 획득:
def thread_1_fixed():
    with lock_a:    # 항상 lock_a 먼저
        with lock_b:
            print("Thread 1: acquired both locks")
 
def thread_2_fixed():
    with lock_a:    # 항상 lock_a 먼저 (같은 순서)
        with lock_b:
            print("Thread 2: acquired both locks")
  1. 타임아웃 사용:
def safe_acquire():
    acquired_a = lock_a.acquire(timeout=2)
    if not acquired_a:
        print("Could not acquire lock_a, backing off")
        return
    try:
        acquired_b = lock_b.acquire(timeout=2)
        if not acquired_b:
            print("Could not acquire lock_b, releasing lock_a")
            return
        try:
            print("Acquired both locks safely")
        finally:
            lock_b.release()
    finally:
        lock_a.release()
  1. 락 범위 최소화: 가능한 한 짧은 시간 동안만 락을 보유하세요.

스레드 안전 체크리스트

  • 락으로 모든 공유 가변 상태 보호
  • 가능한 경우 공유 리스트나 딕셔너리 대신 queue.Queue 사용
  • 전역 가변 상태 피하기; 함수 인자를 통해 데이터 전달
  • 수동 스레드 관리 대신 ThreadPoolExecutor 사용
  • 스레드 간 작업 순서를 절대 가정하지 않기
  • 스레드 누수를 감지하기 위해 threading.active_count()와 로깅으로 테스트

실제 예제

동시 웹 스크래핑

from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import time
 
def fetch_page(url):
    """웹 페이지를 가져와 콘텐츠 길이 반환"""
    try:
        with urllib.request.urlopen(url, timeout=10) as response:
            content = response.read()
            return url, len(content), None
    except Exception as e:
        return url, 0, str(e)
 
urls = [
    "https://python.org",
    "https://docs.python.org",
    "https://pypi.org",
    "https://realpython.com",
    "https://github.com",
    "https://stackoverflow.com",
    "https://news.ycombinator.com",
    "https://httpbin.org",
]
 
# 순차적
start = time.time()
for url in urls:
    fetch_page(url)
sequential_time = time.time() - start
 
# 스레드를 사용한 동시 실행
start = time.time()
with ThreadPoolExecutor(max_workers=8) as executor:
    futures = {executor.submit(fetch_page, url): url for url in urls}
    for future in as_completed(futures):
        url, size, error = future.result()
        if error:
            print(f"  FAIL {url}: {error}")
        else:
            print(f"  OK   {url}: {size:,} bytes")
threaded_time = time.time() - start
 
print(f"\nSequential: {sequential_time:.2f}s")
print(f"Threaded:   {threaded_time:.2f}s")
print(f"Speedup:    {sequential_time / threaded_time:.1f}x")

병렬 파일 I/O

from concurrent.futures import ThreadPoolExecutor
import os
import hashlib
 
def process_file(filepath):
    """파일을 읽고 SHA-256 해시 계산"""
    with open(filepath, 'rb') as f:
        content = f.read()
    file_hash = hashlib.sha256(content).hexdigest()
    size = os.path.getsize(filepath)
    return filepath, file_hash, size
 
def hash_all_files(directory, pattern="*.py"):
    """스레드를 사용하여 디렉토리의 모든 일치 파일 해싱"""
    import glob
    files = glob.glob(os.path.join(directory, "**", pattern), recursive=True)
 
    results = {}
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = {executor.submit(process_file, f): f for f in files}
        for future in futures:
            try:
                path, hash_val, size = future.result()
                results[path] = {"hash": hash_val, "size": size}
            except Exception as e:
                print(f"Error processing {futures[future]}: {e}")
 
    return results
 
# 사용법
# file_hashes = hash_all_files("/path/to/project")

재시도 로직이 있는 동시 API 호출

from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import json
import time
 
def fetch_api(endpoint, max_retries=3, backoff=1.0):
    """지수 백오프 재시도로 API 엔드포인트 가져오기"""
    for attempt in range(max_retries):
        try:
            url = f"https://jsonplaceholder.typicode.com{endpoint}"
            req = urllib.request.Request(url)
            with urllib.request.urlopen(req, timeout=10) as response:
                data = json.loads(response.read())
                return {"endpoint": endpoint, "data": data, "error": None}
        except Exception as e:
            if attempt < max_retries - 1:
                wait = backoff * (2 ** attempt)
                time.sleep(wait)
            else:
                return {"endpoint": endpoint, "data": None, "error": str(e)}
 
endpoints = [f"/posts/{i}" for i in range(1, 21)]
 
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(fetch_api, ep) for ep in endpoints]
    results = [f.result() for f in futures]
 
elapsed = time.time() - start
success = sum(1 for r in results if r["error"] is None)
print(f"Fetched {success}/{len(endpoints)} endpoints in {elapsed:.2f}s")

주기적 백그라운드 작업

import threading
import time
 
class PeriodicTask:
    """백그라운드 스레드에서 고정된 간격으로 함수 실행"""
    def __init__(self, interval, func, *args, **kwargs):
        self.interval = interval
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self._stop_event = threading.Event()
        self._thread = None
 
    def start(self):
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()
 
    def _run(self):
        while not self._stop_event.is_set():
            self.func(*self.args, **self.kwargs)
            self._stop_event.wait(self.interval)
 
    def stop(self):
        self._stop_event.set()
        if self._thread:
            self._thread.join()
 
# 사용법
def check_health():
    print(f"Health check at {time.strftime('%H:%M:%S')}")
 
task = PeriodicTask(2.0, check_health)
task.start()
time.sleep(7)
task.stop()
print("Stopped")

성능: Threading vs Multiprocessing vs Asyncio

올바른 동시성 도구는 작업 부하에 따라 다릅니다. 일반적인 작업에 대한 벽시계 시간 비교입니다:

작업순차적Threading (4)Multiprocessing (4)Asyncio
100개 HTTP 요청 (각 200ms)20.0s5.1s5.8s4.9s
100개 파일 읽기 (각 10ms)1.0s0.28s0.35s0.26s
100개 CPU 작업 (각 100ms)10.0s10.2s2.7s10.0s
50개 DB 쿼리 (각 50ms)2.5s0.68s0.85s0.62s
I/O + CPU 혼합15.0s8.2s4.1s9.5s

핵심 요약:

  • Threading은 최소한의 코드 변경으로 I/O-bound 작업에서 3-5배 속도 향상을 제공합니다
  • Multiprocessing은 진정한 CPU 병렬 처리를 위한 유일한 옵션이지만 프로세스 오버헤드를 추가합니다
  • Asyncio는 고동시성 I/O에서 threading보다 약간 우수하지만 async/await로 코드를 다시 작성해야 합니다
  • 혼합 작업의 경우 I/O에는 threading을, CPU 작업에는 multiprocessing을 고려하세요
import time
import threading
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
 
def io_task():
    time.sleep(0.2)
 
def cpu_task(n=2_000_000):
    return sum(i * i for i in range(n))
 
# Threading vs Multiprocessing 벤치마크
NUM_TASKS = 20
 
# Threading - I/O bound
start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(lambda _: io_task(), range(NUM_TASKS)))
print(f"Threading (I/O): {time.time() - start:.2f}s")
 
# Threading - CPU bound
start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(lambda _: cpu_task(), range(NUM_TASKS)))
print(f"Threading (CPU): {time.time() - start:.2f}s")

RunCell에서 Threading 실험하기

스레드 코드의 디버깅과 프로파일링은 어려울 수 있습니다. 스레드 동기화를 테스트하거나, 타이밍 중첩을 시각화하거나, 경쟁 조건을 대화식으로 진단해야 할 때, RunCell(www.runcell.dev)은 (opens in a new tab) 이러한 워크플로우를 위해 설계된 AI 기반 Jupyter 환경을 제공합니다.

RunCell의 AI 에이전트는 threading 코드를 분석하여 데드락이 발생하기 전에 잠재적인 데드락을 식별하고, 작업 부하에 따라 최적의 워커 수를 제안하며, 스레드가 예상치 못한 동작을 하는 이유를 이해하는 데 도움을 줍니다. 스레드 풀이 간헐적으로 잘못된 결과를 생성할 때, RunCell은 공유 상태가 손상되는 정확한 순간을 추적하기 위해 실행 타임라인을 추적합니다.

다양한 threading 구성의 성능 특성을 시각화하려면, PyGWalker(github.com/Kanaries/pygwalker)가 벤치마크 DataFrame을 인터랙티브 차트로 변환할 수 있습니다. Threading 벤치마크를 실행하고, 타이밍 데이터를 pandas DataFrame으로 수집한 다음, 워크플로우에 최적의 스레드 수를 찾기 위해 드래그 앤 드롭 시각화로 결과를 탐색하세요.

FAQ

Python에서 threading과 multiprocessing의 차이점은 무엇인가요?

Threading은 메모리를 공유하는 단일 프로세스 내에서 여러 스레드를 실행합니다. Global Interpreter Lock(GIL)은 스레드가 병렬로 Python 바이트코드를 실행하지 못하게 하여 threading을 네트워크 요청과 파일 작업 같은 I/O-bound 작업에만 효과적으로 만듭니다. Multiprocessing은 각각 자체 Python 인터프리터와 메모리 공간을 가진 별도의 프로세스를 생성하여 CPU-bound 작업을 위한 진정한 병렬 실행을 가능하게 합니다. Threading은 오버헤드가 더 낮고(시작이 더 빠르고 메모리 사용이 적음), 반면 multiprocessing은 진정한 병렬 처리를 위해 GIL을 우회합니다.

Python threading은 진정한 병렬인가요?

아니요, GIL로 인해 Python threading은 CPU-bound 코드에 대해 동시적이지만 병렬은 아닙니다. 한 번에 하나의 스레드만 Python 바이트코드를 실행할 수 있습니다. 그러나 GIL은 I/O 작업(네트워크, 디스크, 데이터베이스) 중에 해제되므로, 한 스레드가 I/O를 기다리는 동안 여러 스레드가 효과적으로 병렬로 실행됩니다. CPU-bound 병렬 처리의 경우 multiprocessing 모듈이나 NumPy 같은 GIL을 해제하는 C 확장을 사용하세요.

Python에서 몇 개의 스레드를 사용해야 하나요?

I/O-bound 작업의 경우 외부 서비스의 속도 제한과 네트워크 대역폭에 따라 5-20개의 스레드로 시작하세요. 단일 서버에 너무 많은 스레드는 연결 거부나 스로틀링을 유발할 수 있습니다. 혼합 작업 부하의 경우 CPU 코어 수에서 코어 수의 4배 사이의 스레드 수로 실험하세요. 특정 작업 부하에 최적의 수를 찾기 위해 ThreadPoolExecutor와 다른 max_workers 값으로 벤치마크하세요. ThreadPoolExecutor의 기본값은 min(32, os.cpu_count() + 4)입니다.

Python 스레드에서 값을 반환하려면 어떻게 하나요?

스레드는 target 함수에서 직접적으로 값을 반환하지 않습니다. 세 가지 주요 접근 방식이 있습니다: (1) 반환 값을 얻기 위해 future.result()를 호출하는 Future 객체를 반환하는 ThreadPoolExecutor.submit()을 사용합니다. (2) 가변 컨테이너(딕셔너리나 리스트)를 인자로 전달하고 스레드가 Lock으로 보호하여 그 안에 결과를 쓰게 합니다. (3) 스레드가 결과를 큐에 넣고 메인 스레드가 읽는 queue.Queue를 사용합니다. 대부분의 사용 사례에서 ThreadPoolExecutor가 가장 깔끔한 접근 방식입니다.

Python 스레드에서 예외가 발생하면 어떻게 되나요?

원시 threading.Thread에서 처리되지 않은 예외는 해당 스레드를 조용히 종료하고 예외는 사라집니다. 메인 스레드와 다른 스레드는 어떤 알림도 없이 계속 실행됩니다. ThreadPoolExecutor를 사용하면 예외가 캡처되어 future.result()를 호출할 때 다시 발생되므로 에러 처리가 훨씬 더 안정적입니다. 예외가 적절히 잡히고 처리되도록 항상 스레드 target 함수 내부에 try/except 블록을 사용하거나 ThreadPoolExecutor를 사용하세요.

결론

Python threading은 I/O-bound 프로그램의 속도를 높이는 강력한 도구입니다. 네트워크 요청, 파일 작업, 데이터베이스 쿼리를 동시에 실행함으로써, 20초가 걸리는 순차적 스크립트를 최소한의 코드 변경으로 5초 만에 끝나게 할 수 있습니다.

기억해야 할 핵심 포인트:

  • I/O-bound 작업에 threading 사용. GIL은 CPU 병렬 처리를 방지하지만, 스레드는 I/O 대기 시간을 효과적으로 겹쳐서 실행합니다.
  • 대부분의 threading 필요에 ThreadPoolExecutor 사용. 스레드를 관리하고, 결과를 수집하며, 예외를 깔끔하게 전파합니다.
  • 락으로 공유 상태 보호. 경쟁 조건이 가장 흔한 스레딩 버그이며, queue.Queue는 대부분의 락 우려를 제거합니다.
  • 데드락 피하기 - 일관된 순서로 락을 획득하고 타임아웃을 사용합니다.
  • 올바른 도구 선택: I/O에는 threading, CPU에는 multiprocessing, 수천 개의 동시 연결에는 asyncio.

ThreadPoolExecutor와 간단한 executor.map() 호출로 시작하세요. 속도 향상을 측정하세요. 공유 가변 상태가 요구하는 곳에만 동기화를 추가하세요. Threading은 코드를 완전히 다시 작성할 필요가 없습니다. 몇 줄의 concurrent.futures는 대기 시간을 소비하는 모든 프로그램에서 극적인 성능 향상을 제공할 수 있습니다.

📚