Skip to content

Pythonスレッディング:マルチスレッディング完全ガイドと実例

Updated on

あなたのPythonプログラムが50回のAPIコールを順番に実行しているとします。各コールは200ミリ秒の待機時間がかかります。計算は厳しいですね:10秒もの時間が、ネットワークレスポンスを待つだけで無駄になっています。CPUはほぼゼロの利用率でアイドル状態のまま、I/Oバウンドな処理を順番に処理しているだけです。

この問題は急速に深刻化します。数千のページを順番に取得するWebスクレイパー。一度に1つのファイルしか読み書きしないファイル処理スクリプト。結果を待つ間にアプリケーション全体をブロックするデータベースクエリ。アイドル状態で待つ毎秒は、本来なら有益な作業ができるはずの時間です。

Pythonのthreadingモジュールは、単一のプロセス内で複数の操作を同時に実行することでこの問題を解決します。スレッドはメモリを共有し、素早く起動し、プログラムが大部分の時間を待機に費やすI/Oバウンドなワークロードに最適です。このガイドでは、基本的なスレッド作成から高度な同期パターンまで、すぐに使える実戦的なコード例とともにすべてを解説します。

📚

Pythonにおけるスレッディングとは?

スレッディングにより、プログラムは同じプロセス内で複数の操作を同時に実行できます。各スレッドは同じメモリ空間を共有するため、スレッド間の通信は高速かつ簡単です。

Pythonのthreadingモジュールは、スレッドの作成と管理のための高レベルインターフェースを提供します。ただし、重要な注意点があります:グローバルインタプリタロック(GIL)です。

グローバルインタプリタロック(GIL)

GILはCPythonのミューテックスで、一度に1つのスレッドのみがPythonバイトコードを実行できるようにします。これは、CPUバウンドな操作についてはスレッドが真の並列処理を実現できないことを意味します。しかし、GILはI/O操作(ネットワークコール、ファイル読み込み、データベースクエリ)中に解放されるため、1つのスレッドがI/Oを待っている間に他のスレッドを実行できます。

import threading
import time
 
def cpu_bound(n):
    """CPUバウンド: GILにより並列実行が妨げられる"""
    total = 0
    for i in range(n):
        total += i * i
    return total
 
def io_bound(url):
    """I/Oバウンド: ネットワーク待機中にGILが解放される"""
    import urllib.request
    return urllib.request.urlopen(url).read()
 
# CPUバウンド: 4つのスレッドが順番に実行される(スピードアップなし)
start = time.time()
threads = [threading.Thread(target=cpu_bound, args=(10_000_000,)) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"スレッドを使ったCPUバウンド処理: {time.time() - start:.2f}秒")
 
# I/Oバウンド: 4つのスレッドが待機時間を重ねて実行される(大幅なスピードアップ)

これは、スレッディングがI/Oバウンドなタスクには最適だが、CPU負荷の高い計算には適していないことを意味します。CPUバウンドな作業には、代わりにmultiprocessingモジュールを使用してください。

スレッディング、マルチプロセッシング、Asyncioの使い分け

機能threadingmultiprocessingasyncio
最適な用途I/OバウンドなタスクCPUバウンドなタスク高並列I/O
並列性並行処理(GIL制限あり)真の並列処理並行処理(シングルスレッド)
メモリ共有(軽量)プロセスごとに分離共有(軽量)
起動コスト低(約1ms)高(約50-100ms)非常に低い
通信直接メモリアクセスパイプ、キュー、共有メモリAwaitableなコルーチン
スケーラビリティ数十〜数百のスレッドCPUコア数に制限数千のコルーチン
複雑さ中(ロックが必要)中(シリアライゼーション)高(async/await構文)
ユースケースWebスクレイピング、ファイルI/O、APIコールデータ処理、MLトレーニングWebサーバー、チャットアプリ

経験則:プログラムがネットワークやディスクを待つ場合はスレッディングを使用します。数値計算を行う場合はマルチプロセッシングを使用します。数千の同時接続が必要な場合はasyncioを使用します。

スレッドの基本:スレッドの作成と実行

threading.Threadクラス

スレッドを作成する最も簡単な方法は、ターゲット関数をthreading.Threadに渡すことです:

import threading
import time
 
def download_file(filename):
    print(f"[{threading.current_thread().name}] {filename}をダウンロード中...")
    time.sleep(2)  # ダウンロードをシミュレート
    print(f"[{threading.current_thread().name}] {filename}が完了")
 
# スレッドを作成
t1 = threading.Thread(target=download_file, args=("data.csv",))
t2 = threading.Thread(target=download_file, args=("report.pdf",))
 
# スレッドを開始
t1.start()
t2.start()
 
# 両方の終了を待つ
t1.join()
t2.join()
 
print("すべてのダウンロードが完了")

両方のダウンロードが並行して実行され、4秒ではなく約2秒で完了します。

start()とjoin()

  • start()はスレッドの実行を開始します。スレッドは一度しか開始できません。
  • join(timeout=None)は、対象のスレッドが終了するまで呼び出し元のスレッドをブロックします。永遠に待たないように、秒単位でtimeoutを渡すことができます。
import threading
import time
 
def slow_task():
    time.sleep(10)
 
t = threading.Thread(target=slow_task)
t.start()
 
# 最大3秒待つ
t.join(timeout=3)
 
if t.is_alive():
    print("3秒後もスレッドは実行中")
else:
    print("スレッドが完了")

スレッドへの命名

名前付きスレッドはデバッグを容易にします:

import threading
 
def worker():
    name = threading.current_thread().name
    print(f"実行中のスレッド: {name}")
 
t = threading.Thread(target=worker, name="DataProcessor")
t.start()
t.join()

デーモンスレッド

デーモンスレッドは、メインプログラムが終了すると自動的に終了するバックグラウンドスレッドです。非デーモンスレッドは、終了するまでプログラムを維持します。

import threading
import time
 
def background_monitor():
    while True:
        print("システム健全性を監視中...")
        time.sleep(5)
 
# デーモンスレッド: メインプログラム終了時に終了する
monitor = threading.Thread(target=background_monitor, daemon=True)
monitor.start()
 
# メインプログラムが作業を行う
time.sleep(12)
print("メインプログラムを終了")
# monitorスレッドは自動的に終了される

デーモンスレッドは、プログラム終了を妨げるべきではないバックグラウンドのロギング、監視、またはクリーンアップタスクに使用します。

Threadのサブクラス化

より複雑なスレッド動作のためには、threading.Threadをサブクラス化します:

import threading
import time
 
class FileProcessor(threading.Thread):
    def __init__(self, filepath):
        super().__init__()
        self.filepath = filepath
        self.result = None
 
    def run(self):
        """run()をスレッドロジックでオーバーライド"""
        print(f"{self.filepath}を処理中")
        time.sleep(1)  # 作業をシミュレート
        self.result = f"処理済み: {self.filepath}"
 
# 作成して実行
processor = FileProcessor("/data/report.csv")
processor.start()
processor.join()
print(processor.result)

スレッドへの引数の渡し方

argsとkwargsの使用

位置引数はargs(タプル)で、キーワード引数はkwargs(辞書)で渡します:

import threading
 
def fetch_data(url, timeout, retries=3, verbose=False):
    print(f"{url}を取得中 (タイムアウト={timeout}秒, 再試行={retries}, 詳細={verbose})")
 
# 位置引数をタプルとして
t1 = threading.Thread(target=fetch_data, args=("https://api.example.com", 30))
 
# キーワード引数を辞書として
t2 = threading.Thread(
    target=fetch_data,
    args=("https://api.example.com",),
    kwargs={"timeout": 30, "retries": 5, "verbose": True}
)
 
t1.start()
t2.start()
t1.join()
t2.join()

よくある間違い:1要素のタプルで末尾のカンマを忘れること。args=("hello",)はタプルですが、args=("hello")は括弧付きの文字列に過ぎません。

スレッドからの結果の収集

スレッドは直接値を返しません。共有データ構造やリストを使用して結果を収集します:

import threading
 
results = {}
lock = threading.Lock()
 
def compute(task_id, value):
    result = value ** 2
    with lock:
        results[task_id] = result
 
threads = []
for i in range(5):
    t = threading.Thread(target=compute, args=(i, i * 10))
    threads.append(t)
    t.start()
 
for t in threads:
    t.join()
 
print(results)  # {0: 0, 1: 100, 2: 400, 3: 900, 4: 1600}

よりクリーンなアプローチは、結果収集を自動的に処理するThreadPoolExecutor(次に解説)を使用することです。

ThreadPoolExecutor:モダンなアプローチ

concurrent.futuresモジュールは、ワーカースレッドのプールを管理するThreadPoolExecutorを提供します。これはスレッドの作成、結果収集、例外伝播を自動的に処理します。

submit()による基本的な使用方法

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
 
def fetch_url(url):
    time.sleep(1)  # ネットワークリクエストをシミュレート
    return f"{url}からのコンテンツ"
 
urls = [
    "https://example.com/page1",
    "https://example.com/page2",
    "https://example.com/page3",
    "https://example.com/page4",
    "https://example.com/page5",
]
 
with ThreadPoolExecutor(max_workers=3) as executor:
    # タスクを送信してFutureオブジェクトを取得
    future_to_url = {executor.submit(fetch_url, url): url for url in urls}
 
    # 完了順に結果を処理
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            print(f"{url}: {data}")
        except Exception as e:
            print(f"{url}で例外が発生: {e}")

順序付き結果のためのmap()

executor.map()は組み込みのmap()と同様に、入力と同じ順序で結果を返します:

from concurrent.futures import ThreadPoolExecutor
 
def process_item(item):
    return item.upper()
 
items = ["apple", "banana", "cherry", "date"]
 
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_item, items))
 
print(results)  # ['APPLE', 'BANANA', 'CHERRY', 'DATE']

submit() vs map()

submit()map()
返り値Futureオブジェクト結果のイテレータ
結果の順序完了順(as_completed使用時)入力順
エラーハンドリングfuture.result()でタスクごと最初の失敗で発生
引数単一の関数コール各アイテムに関数を適用
最適な用途異種タスク、早期結果同種のバッチ処理

Futureを使用した例外処理

from concurrent.futures import ThreadPoolExecutor, as_completed
 
def risky_task(n):
    if n == 3:
        raise ValueError(f"不正な入力: {n}")
    return n * 10
 
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(risky_task, i): i for i in range(5)}
 
    for future in as_completed(futures):
        task_id = futures[future]
        try:
            result = future.result(timeout=5)
            print(f"タスク {task_id}: {result}")
        except ValueError as e:
            print(f"タスク {task_id}が失敗: {e}")
        except TimeoutError:
            print(f"タスク {task_id}がタイムアウト")

タスクのキャンセル

from concurrent.futures import ThreadPoolExecutor
import time
 
def long_task(n):
    time.sleep(5)
    return n
 
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(long_task, i) for i in range(10)]
 
    # 保留中のタスクをキャンセル(実行中のタスクはキャンセルできない)
    for f in futures[4:]:
        cancelled = f.cancel()
        print(f"キャンセル済み: {cancelled}")

スレッド同期プリミティブ

複数のスレッドが共有データにアクセスする場合、競合状態を防ぐために同期が必要です。

Lock

Lockは、一度に1つのスレッドのみがクリティカルセクションに入ることを保証します:

import threading
 
class BankAccount:
    def __init__(self, balance):
        self.balance = balance
        self.lock = threading.Lock()
 
    def withdraw(self, amount):
        with self.lock:  # 一度に1つのスレッドのみ
            if self.balance >= amount:
                self.balance -= amount
                return True
            return False
 
    def deposit(self, amount):
        with self.lock:
            self.balance += amount
 
account = BankAccount(1000)
 
def make_transactions():
    for _ in range(100):
        account.deposit(10)
        account.withdraw(10)
 
threads = [threading.Thread(target=make_transactions) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
 
print(f"最終残高: {account.balance}")  # 常に1000

ロックがない場合、同時読み書きによって不正確な結果が生じます(競合状態)。

RLock(再入可能ロック)

RLockは、同じスレッドによって複数回獲得できます。これにより、ロックを保持している関数が同じロックを必要とする別の関数を呼び出す際のデッドロックを防ぎます:

import threading
 
class SafeCache:
    def __init__(self):
        self._data = {}
        self._lock = threading.RLock()
 
    def get(self, key):
        with self._lock:
            return self._data.get(key)
 
    def set(self, key, value):
        with self._lock:
            self._data[key] = value
 
    def get_or_set(self, key, default):
        with self._lock:
            # これはget()を呼び出し、_lockも獲得する
            # RLockはこれを許可する; 通常のLockはデッドロックする
            existing = self.get(key)
            if existing is None:
                self.set(key, default)
                return default
            return existing

Semaphore

Semaphoreは、固定数のスレッドが同時にリソースにアクセスできるようにします:

import threading
import time
 
# 最大3つの同時データベース接続を許可
db_semaphore = threading.Semaphore(3)
 
def query_database(query_id):
    with db_semaphore:
        print(f"クエリ {query_id}: 接続済み (アクティブ接続: {3 - db_semaphore._value})")
        time.sleep(2)  # クエリをシミュレート
        print(f"クエリ {query_id}: 完了")
 
threads = [threading.Thread(target=query_database, args=(i,)) for i in range(10)]
for t in threads: t.start()
for t in threads: t.join()

Event

Eventにより、1つのスレッドが他の待機中のスレッドにシグナルを送ることができます:

import threading
import time
 
data_ready = threading.Event()
shared_data = []
 
def producer():
    print("Producer: データを準備中...")
    time.sleep(3)
    shared_data.extend([1, 2, 3, 4, 5])
    print("Producer: データ準備完了、コンシューマーにシグナル送信")
    data_ready.set()
 
def consumer(name):
    print(f"Consumer {name}: データを待機中...")
    data_ready.wait()  # イベントがセットされるまでブロック
    print(f"Consumer {name}: データを取得 = {shared_data}")
 
threads = [
    threading.Thread(target=producer),
    threading.Thread(target=consumer, args=("A",)),
    threading.Thread(target=consumer, args=("B",)),
]
for t in threads: t.start()
for t in threads: t.join()

Condition

Conditionは、通知を待つ能力とロックを組み合わせます。これはプロデューサー・コンシューマーパターンの基盤です:

import threading
import time
import random
 
buffer = []
MAX_SIZE = 5
condition = threading.Condition()
 
def producer():
    for i in range(20):
        with condition:
            while len(buffer) >= MAX_SIZE:
                condition.wait()  # 空きができるまで待機
            item = random.randint(1, 100)
            buffer.append(item)
            print(f"生産: {item} (バッファサイズ: {len(buffer)})")
            condition.notify_all()
        time.sleep(0.1)
 
def consumer(name):
    for _ in range(10):
        with condition:
            while len(buffer) == 0:
                condition.wait()  # アイテムができるまで待機
            item = buffer.pop(0)
            print(f"Consumer {name}が消費: {item} (バッファサイズ: {len(buffer)})")
            condition.notify_all()
        time.sleep(0.15)
 
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=("X",))
t3 = threading.Thread(target=consumer, args=("Y",))
t1.start(); t2.start(); t3.start()
t1.join(); t2.join(); t3.join()

同期プリミティブのまとめ

プリミティブ目的使用時期
Lock相互排他共有された可変状態を保護
RLock再入可能なミューテックス同じスレッドでのネストされたロック
Semaphore並行性の制限レート制限、コネクションプール
Event1回限りのシグナル初期化完了、シャットダウンシグナル
Condition待機/通知パターンプロデューサー・コンシューマー、状態変化
BarrierNスレッドの同期すべてのスレッドが継続する前にポイントに到達する必要がある場合

スレッドセーフなデータ構造

queue.Queue

queue.Queueは、スレッドセーフなデータ構造の第一選択です。すべてのロックを内部で処理します:

import threading
import queue
import time
 
task_queue = queue.Queue()
results = queue.Queue()
 
def worker():
    while True:
        item = task_queue.get()  # アイテムが利用可能になるまでブロック
        if item is None:
            break
        result = item ** 2
        results.put(result)
        task_queue.task_done()
 
# 4つのワーカーを開始
workers = []
for _ in range(4):
    t = threading.Thread(target=worker, daemon=True)
    t.start()
    workers.append(t)
 
# タスクを送信
for i in range(20):
    task_queue.put(i)
 
# すべてのタスクが完了するのを待つ
task_queue.join()
 
# ワーカーを停止
for _ in range(4):
    task_queue.put(None)
for w in workers:
    w.join()
 
# 結果を収集
all_results = []
while not results.empty():
    all_results.append(results.get())
print(f"結果: {sorted(all_results)}")

queue.Queueはまた以下もサポートしています:

  • Queue(maxsize=10): 満杯のときにput()をブロック
  • PriorityQueue(): 優先度でソートされたアイテム
  • LifoQueue(): 後入れ先出し(スタック動作)

collections.deque

collections.dequeappend()popleft()操作(CPythonではCレベルでアトミック)に対してスレッドセーフなため、単純なプロデューサー・コンシューマーパターンの高速な代替となります:

from collections import deque
import threading
import time
 
buffer = deque(maxlen=1000)
 
def producer():
    for i in range(100):
        buffer.append(i)
        time.sleep(0.01)
 
def consumer():
    consumed = 0
    while consumed < 100:
        if buffer:
            item = buffer.popleft()
            consumed += 1
        else:
            time.sleep(0.01)
    print(f"{consumed}個のアイテムを消費")
 
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()

注意:個別のappendpopleft操作はスレッドセーフですが、len(buffer)を確認してからpopすることはアトミックではありません。完全なスレッドセーフティが必要な場合は、queue.Queueを使用してください。

一般的なスレッディングパターン

プロデューサー・コンシューマーパターン

データの生成とデータの処理を分離するための古典的なパターン:

import threading
import queue
import time
import random
 
def producer(q, name, num_items):
    for i in range(num_items):
        item = f"{name}-item-{i}"
        q.put(item)
        print(f"Producer {name}: {item}を作成")
        time.sleep(random.uniform(0.05, 0.15))
    print(f"Producer {name}: 完了")
 
def consumer(q, name, stop_event):
    while not stop_event.is_set() or not q.empty():
        try:
            item = q.get(timeout=0.5)
            print(f"Consumer {name}: {item}を処理中")
            time.sleep(random.uniform(0.1, 0.2))
            q.task_done()
        except queue.Empty:
            continue
    print(f"Consumer {name}: シャットダウン中")
 
task_queue = queue.Queue(maxsize=10)
stop_event = threading.Event()
 
producers = [
    threading.Thread(target=producer, args=(task_queue, "P1", 10)),
    threading.Thread(target=producer, args=(task_queue, "P2", 10)),
]
consumers = [
    threading.Thread(target=consumer, args=(task_queue, "C1", stop_event)),
    threading.Thread(target=consumer, args=(task_queue, "C2", stop_event)),
    threading.Thread(target=consumer, args=(task_queue, "C3", stop_event)),
]
 
for c in consumers: c.start()
for p in producers: p.start()
for p in producers: p.join()
 
task_queue.join()  # すべてのアイテムが処理されるのを待つ
stop_event.set()   # コンシューマーに停止をシグナル
for c in consumers: c.join()

ワーカースレッドプール(手動)

ThreadPoolExecutorよりも細かい制御が必要な場合:

import threading
import queue
 
class WorkerPool:
    def __init__(self, num_workers):
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.workers = []
 
        for _ in range(num_workers):
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()
            self.workers.append(t)
 
    def _worker(self):
        while True:
            func, args, kwargs, future_id = self.task_queue.get()
            if func is None:
                break
            try:
                result = func(*args, **kwargs)
                self.result_queue.put((future_id, result, None))
            except Exception as e:
                self.result_queue.put((future_id, None, e))
            finally:
                self.task_queue.task_done()
 
    def submit(self, func, *args, **kwargs):
        future_id = id(func)  # 単純なID
        self.task_queue.put((func, args, kwargs, future_id))
        return future_id
 
    def shutdown(self):
        for _ in self.workers:
            self.task_queue.put((None, None, None, None))
        for w in self.workers:
            w.join()
 
# 使用法
pool = WorkerPool(4)
for i in range(10):
    pool.submit(lambda x: x * x, i)
pool.task_queue.join()
pool.shutdown()

レート制限付きスレッドプール

スレッドが外部リクエストを行う速度を制御する:

import threading
import time
from concurrent.futures import ThreadPoolExecutor
 
class RateLimiter:
    def __init__(self, max_per_second):
        self.interval = 1.0 / max_per_second
        self.lock = threading.Lock()
        self.last_call = 0
 
    def wait(self):
        with self.lock:
            elapsed = time.time() - self.last_call
            wait_time = self.interval - elapsed
            if wait_time > 0:
                time.sleep(wait_time)
            self.last_call = time.time()
 
limiter = RateLimiter(max_per_second=5)
 
def rate_limited_fetch(url):
    limiter.wait()
    print(f"{time.time():.2f}{url}を取得中")
    time.sleep(0.5)  # リクエストをシミュレート
    return f"{url}からのデータ"
 
urls = [f"https://api.example.com/item/{i}" for i in range(20)]
 
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(rate_limited_fetch, urls))

スレッドセーフティの落とし穴と回避方法

競合状態

競合状態は、スレッド実行のタイミングに結果が依存する場合に発生します:

import threading
 
# 悪い例: 競合状態
counter = 0
 
def increment_unsafe():
    global counter
    for _ in range(100_000):
        counter += 1  # 読み込み、インクリメント、書き込み: アトミックではない
 
threads = [threading.Thread(target=increment_unsafe) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(f"期待値: 500000, 取得値: {counter}")  # 500000未満になることが多い
 
# 良い例: ロックで保護
counter = 0
lock = threading.Lock()
 
def increment_safe():
    global counter
    for _ in range(100_000):
        with lock:
            counter += 1
 
threads = [threading.Thread(target=increment_safe) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(f"期待値: 500000, 取得値: {counter}")  # 常に500000

デッドロック

デッドロックは、2つのスレッドがそれぞれ相手が必要とするロックを保持している場合に発生します:

import threading
 
lock_a = threading.Lock()
lock_b = threading.Lock()
 
def thread_1():
    with lock_a:
        print("Thread 1: lock_aを獲得")
        with lock_b:  # thread_2がlock_bを保持している場合、永遠に待つ
            print("Thread 1: lock_bを獲得")
 
def thread_2():
    with lock_b:
        print("Thread 2: lock_bを獲得")
        with lock_a:  # thread_1がlock_aを保持している場合、永遠に待つ
            print("Thread 2: lock_aを獲得")
 
# これはデッドロックする
# t1 = threading.Thread(target=thread_1)
# t2 = threading.Thread(target=thread_2)
# t1.start(); t2.start()

デッドロックの防止方法

  1. 常に同じ順序でロックを獲得する
def thread_1_fixed():
    with lock_a:    # 常にlock_aを最初に
        with lock_b:
            print("Thread 1: 両方のロックを獲得")
 
def thread_2_fixed():
    with lock_a:    # 常にlock_aを最初に(同じ順序)
        with lock_b:
            print("Thread 2: 両方のロックを獲得")
  1. タイムアウトを使用する
def safe_acquire():
    acquired_a = lock_a.acquire(timeout=2)
    if not acquired_a:
        print("lock_aを獲得できないため、バックオフ")
        return
    try:
        acquired_b = lock_b.acquire(timeout=2)
        if not acquired_b:
            print("lock_bを獲得できないため、lock_aを解放")
            return
        try:
            print("安全に両方のロックを獲得")
        finally:
            lock_b.release()
    finally:
        lock_a.release()
  1. ロックの範囲を最小化する:可能な限り短時間ロックを保持する。

スレッドセーフティチェックリスト

  • すべての共有可変状態をロックで保護する
  • 可能な場合は共有リストや辞書の代わりにqueue.Queueを使用する
  • グローバル可変状態を避け、データを関数引数を通じて渡す
  • 手動のスレッド管理よりThreadPoolExecutorを使用する
  • スレッド間の操作順序を決して仮定しない
  • スレッドリークを検出するためにthreading.active_count()とロギングを使用してテストする

実世界の例

並列Webスクレイピング

from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import time
 
def fetch_page(url):
    """Webページを取得してコンテンツ長を返す"""
    try:
        with urllib.request.urlopen(url, timeout=10) as response:
            content = response.read()
            return url, len(content), None
    except Exception as e:
        return url, 0, str(e)
 
urls = [
    "https://python.org",
    "https://docs.python.org",
    "https://pypi.org",
    "https://realpython.com",
    "https://github.com",
    "https://stackoverflow.com",
    "https://news.ycombinator.com",
    "https://httpbin.org",
]
 
# 順次実行
start = time.time()
for url in urls:
    fetch_page(url)
sequential_time = time.time() - start
 
# スレッドを使用した並列実行
start = time.time()
with ThreadPoolExecutor(max_workers=8) as executor:
    futures = {executor.submit(fetch_page, url): url for url in urls}
    for future in as_completed(futures):
        url, size, error = future.result()
        if error:
            print(f"  FAIL {url}: {error}")
        else:
            print(f"  OK   {url}: {size:,} bytes")
threaded_time = time.time() - start
 
print(f"\n順次実行: {sequential_time:.2f}秒")
print(f"スレッド使用:   {threaded_time:.2f}秒")
print(f"スピードアップ:    {sequential_time / threaded_time:.1f}倍")

並列ファイルI/O

from concurrent.futures import ThreadPoolExecutor
import os
import hashlib
 
def process_file(filepath):
    """ファイルを読み込んでSHA-256ハッシュを計算"""
    with open(filepath, 'rb') as f:
        content = f.read()
    file_hash = hashlib.sha256(content).hexdigest()
    size = os.path.getsize(filepath)
    return filepath, file_hash, size
 
def hash_all_files(directory, pattern="*.py"):
    """スレッドを使用してディレクトリ内の一致するすべてのファイルのハッシュを計算"""
    import glob
    files = glob.glob(os.path.join(directory, "**", pattern), recursive=True)
 
    results = {}
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = {executor.submit(process_file, f): f for f in files}
        for future in futures:
            try:
                path, hash_val, size = future.result()
                results[path] = {"hash": hash_val, "size": size}
            except Exception as e:
                print(f"{futures[future]}の処理中にエラー: {e}")
 
    return results
 
# 使用法
# file_hashes = hash_all_files("/path/to/project")

再試行ロジック付き並列APIコール

from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import json
import time
 
def fetch_api(endpoint, max_retries=3, backoff=1.0):
    """指数バックオフ再試行でAPIエンドポイントを取得"""
    for attempt in range(max_retries):
        try:
            url = f"https://jsonplaceholder.typicode.com{endpoint}"
            req = urllib.request.Request(url)
            with urllib.request.urlopen(req, timeout=10) as response:
                data = json.loads(response.read())
                return {"endpoint": endpoint, "data": data, "error": None}
        except Exception as e:
            if attempt < max_retries - 1:
                wait = backoff * (2 ** attempt)
                time.sleep(wait)
            else:
                return {"endpoint": endpoint, "data": None, "error": str(e)}
 
endpoints = [f"/posts/{i}" for i in range(1, 21)]
 
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(fetch_api, ep) for ep in endpoints]
    results = [f.result() for f in futures]
 
elapsed = time.time() - start
success = sum(1 for r in results if r["error"] is None)
print(f"{elapsed:.2f}秒で{len(endpoints)}エンドポイント中{success}個を取得")

定期的なバックグラウンドタスク

import threading
import time
 
class PeriodicTask:
    """バックグラウンドスレッドで固定間隔で関数を実行"""
    def __init__(self, interval, func, *args, **kwargs):
        self.interval = interval
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self._stop_event = threading.Event()
        self._thread = None
 
    def start(self):
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()
 
    def _run(self):
        while not self._stop_event.is_set():
            self.func(*self.args, **self.kwargs)
            self._stop_event.wait(self.interval)
 
    def stop(self):
        self._stop_event.set()
        if self._thread:
            self._thread.join()
 
# 使用法
def check_health():
    print(f"{time.strftime('%H:%M:%S')}にヘルスチェック")
 
task = PeriodicTask(2.0, check_health)
task.start()
time.sleep(7)
task.stop()
print("停止")

パフォーマンス:スレッディング vs マルチプロセッシング vs Asyncio

適切な並列処理ツールはワークロードに依存します。一般的なタスクのウォールクロック時間を比較します:

タスク順次スレッディング (4)マルチプロセッシング (4)Asyncio
100 HTTPリクエスト(各200ms)20.0秒5.1秒5.8秒4.9秒
100ファイル読み込み(各10ms)1.0秒0.28秒0.35秒0.26秒
100 CPUタスク(各100ms)10.0秒10.2秒2.7秒10.0秒
50 DBクエリ(各50ms)2.5秒0.68秒0.85秒0.62秒
I/O + CPUの混在15.0秒8.2秒4.1秒9.5秒

重要なポイント

  • スレッディングは、最小限のコード変更でI/Oバウンドなワークロードで3-5倍のスピードアップを提供します
  • マルチプロセッシングは真のCPU並列処理の唯一の選択肢ですが、プロセスオーバーヘッドが追加されます
  • Asyncioは高並列I/Oでスレッディングをわずかに上回りますが、async/awaitでコードを書き換える必要があります
  • 混在ワークロードの場合、I/Oにはスレッディング、CPUタスクにはマルチプロセッシングの組み合わせを検討してください
import time
import threading
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
 
def io_task():
    time.sleep(0.2)
 
def cpu_task(n=2_000_000):
    return sum(i * i for i in range(n))
 
# スレッディング vs マルチプロセッシングのベンチマーク
NUM_TASKS = 20
 
# スレッディング - I/Oバウンド
start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(lambda _: io_task(), range(NUM_TASKS)))
print(f"スレッディング (I/O): {time.time() - start:.2f}秒")
 
# スレッディング - CPUバウンド
start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(lambda _: cpu_task(), range(NUM_TASKS)))
print(f"スレッディング (CPU): {time.time() - start:.2f}秒")

RunCellでのスレッディング実験

スレッド化されたコードのデバッグとプロファイリングは困難な場合があります。スレッド同期のテスト、タイミングオーバーラップの可視化、または競合状態を対話的に診断する必要がある場合、RunCellwww.runcell.dev)はこのワークフロー用に設計されたAI搭載Jupyter環境を提供します。 (opens in a new tab)

RunCellのAIエージェントは、スレッディングコードを分析し、デッドロックが発生する前に潜在的なデッドロックを特定し、ワークロードに基づいて最適なワーカー数を提案し、スレッドが予期せぬ動作をする理由を理解するのに役立ちます。スレッドプールが断続的に不正確な結果を生成する場合、RunCellは共有状態が破損した正確な時刻を追跡して実行タイムラインを追跡します。

異なるスレッディング構成のパフォーマンス特性を可視化したい場合、PyGWalker(github.com/Kanaries/pygwalker)はベンチマークDataFrameをインタラクティブなチャートに変換できます。スレッディングベンチマークを実行し、タイミングデータをpandas DataFrameに収集し、ドラッグ&ドロップ可視化を使用してワークロードに最適なスレッド数を見つけてください。

FAQ

Pythonにおけるスレッディングとマルチプロセッシングの違いは何ですか?

スレッディングは単一のプロセス内で複数のスレッドを実行し、メモリを共有します。グローバルインタプリタロック(GIL)により、スレッドはPythonバイトコードを並列に実行できないため、スレッディングはネットワークリクエストやファイル操作などのI/Oバウンドなタスクにのみ有効です。マルチプロセッシングは、それぞれが独自のPythonインタプリタとメモリ空間を持つ独立したプロセスを作成し、CPUバウンドなタスクの真の並列実行を可能にします。スレッディングはオーバーヘッドが低く(起動が速く、メモリ使用量が少ない)ですが、マルチプロセッシングは真の並列性のためにGILを迂回します。

Pythonのスレッディングは真の並列ですか?

いいえ、PythonのスレッディングはGILのため、CPUバウンドなコードについては並行処理ですが並列処理ではありません。一度に1つのスレッドのみがPythonバイトコードを実行できます。しかし、GILはI/O操作(ネットワーク、ディスク、データベース)中に解放されるため、I/Oを待っている間に複数のスレッドが効果的に並列で実行されます。CPUバウンドな並列性が必要な場合は、マルチプロセッシングモジュールまたはGILを解放するC拡張(NumPyなど)を使用してください。

Pythonで使用すべきスレッド数はいくつですか?

I/Oバウンドなタスクの場合、外部サービスのレート制限とネットワーク帯域に応じて5-20スレッドから始めてください。単一のサーバーに対して多すぎるスレッドは接続拒否やスロットリングを引き起こす可能性があります。混在ワークロードの場合、CPUコア数からコア数の4倍の間でスレッド数を実験してください。ThreadPoolExecutorを使用し、特定のワークロードに最適な数を見つけるために異なるmax_workers値でベンチマークしてください。ThreadPoolExecutorのデフォルトはmin(32, os.cpu_count() + 4)です。

Pythonのスレッドから値を返すにはどうすればよいですか?

スレッドはターゲット関数から直接値を返しません。主なアプローチは3つあります:(1) 値を取得するためにfuture.result()を呼び出すFutureオブジェクトを返すThreadPoolExecutor.submit()を使用する。(2) 可変コンテナ(辞書やリストなど)を引数として渡し、スレッドがLockで保護してそこに結果を書き込むようにする。(3) スレッドが結果をキューに入れ、メインスレッドがそこから読み取るqueue.Queueを使用する。ほとんどのユースケースではThreadPoolExecutorが最もクリーンなアプローチです。

Pythonのスレッドで例外が発生した場合はどうなりますか?

生のthreading.Threadでは、未処理の例外はそのスレッドを暗黙的に終了させ、例外は失われます。メインスレッドや他のスレッドは何の通知もなく実行を継続します。ThreadPoolExecutorでは、例外がキャプチャされ、future.result()を呼び出したときに再発生するため、エラーハンドリングがはるかに信頼性が高くなります。スレッドターゲット関数内で常にtry/exceptブロックを使用するか、例外が適切にキャッチされ処理されるようにThreadPoolExecutorを使用してください。

まとめ

Pythonスレッディングは、I/Oバウンドなプログラムを高速化する強力なツールです。ネットワークリクエスト、ファイル操作、データベースクエリを同時に実行することで、20秒かかる順次スクリプトを、わずかなコード変更で5秒で終わるものに変換できます。

覚えておくべき重要なポイント:

  • I/Oバウンドな作業にスレッディングを使用する。GILはCPU並列処理を妨げますが、スレッドはI/O待機時間を効果的に重ね合わせます。
  • ほとんどのスレッディングニーズにThreadPoolExecutorを使用する。これはスレッドを管理し、結果を収集し、例外をクリーンに伝播します。
  • ロックで共有状態を保護する。競合状態は最も一般的なスレッディングバグであり、queue.Queueはほとんどのロックの懸念を排除します。
  • 一貫した順序でロックを獲得し、タイムアウトを使用してデッドロックを回避する
  • 適切なツールを選択する:I/Oにはスレッディング、CPUにはマルチプロセッシング、数千の同時接続にはasyncio。

ThreadPoolExecutorと単純なexecutor.map()コールから始めてください。スピードアップを測定してください。共有可変状態が必要な場所でのみ同期を追加してください。スレッディングはコードを完全に書き換える必要はありません。concurrent.futuresの数行で、待機時間を費やすあらゆるプログラムで劇的なパフォーマンス向上をもたらすことができます。

📚