Python Multiprocessing:高速化のための並列処理ガイド
Updated on
Pythonのシングルスレッド実行モデルは、大規模なデータセットの処理やCPU集約的な計算を行う際に限界に達します。10分かかるデータ処理スクリプトは、理論的には5コアマシンで2分で実行できるはずですが、PythonのGlobal Interpreter Lock(GIL)により、標準スレッドが真の並列処理を実現できません。その結果、CPUコアが無駄になり、マルチコアプロセッサが遊休状態のまま、Pythonがタスクを1つずつ処理するのを開発者は苛立ちながら見守ることになります。
このボトルネックは実際の時間とコストを消費します。データサイエンティストは数分で終わるはずのモデルトレーニングに何時間も待ちます。Webスクレイパーは潜在的な速度の一部でクロールします。すべての利用可能なコアを活用すべき画像処理パイプラインが、代わりに1つのコアだけを使って遅々として進みます。
multiprocessingモジュールは、それぞれ独自のインタープリタとメモリ空間を持つ別々のPythonプロセスを作成することでこれを解決します。スレッドとは異なり、プロセスはGILを完全にバイパスし、CPUコア全体で真の並列実行を可能にします。このガイドでは、基本的な並列実行から、プロセスプールや共有メモリなどの高度なパターンまで、劇的なパフォーマンス向上のためにmultiprocessingを活用する方法を示します。
GIL問題を理解する
Global Interpreter Lock(GIL)は、Pythonオブジェクトへのアクセスを保護するmutexで、複数のスレッドが同時にPythonバイトコードを実行することを防ぎます。16コアマシンでも、CPU束縛タスクの場合、Pythonスレッドは一度に1つずつ実行されます。
import threading
import time
def cpu_bound_task(n):
count = 0
for i in range(n):
count += i * i
return count
# ThreadingはCPU束縛作業を並列化しません
start = time.time()
threads = [threading.Thread(target=cpu_bound_task, args=(10_000_000,)) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Threading: {time.time() - start:.2f}s") # シングルスレッドとほぼ同じ時間GILはI/O操作(ファイル読み取り、ネットワークリクエスト)中にのみ解放されるため、threadingはI/O束縛タスクには有用ですが、CPU束縛作業には効果がありません。Multiprocessingは、別々のPythonインタープリタを並列実行することでGILをバイパスします。
Processを使った基本的なMultiprocessing
Processクラスは、独立して実行される新しいPythonプロセスを作成します。各プロセスには独自のメモリ空間とPythonインタープリタがあります。
from multiprocessing import Process
import os
def worker(name):
print(f"Worker {name} がプロセス {os.getpid()} で実行中")
result = sum(i*i for i in range(5_000_000))
print(f"Worker {name} 完了: {result}")
if __name__ == '__main__':
processes = []
# 4つのプロセスを作成
for i in range(4):
p = Process(target=worker, args=(f"#{i}",))
processes.append(p)
p.start()
# すべての完了を待つ
for p in processes:
p.join()
print("すべてのプロセスが完了しました")重要な要件:WindowsとmacOSでは常にif __name__ == '__main__'ガードを使用してください。これがないと、子プロセスが再帰的にさらにプロセスを生成し、フォーク爆弾を引き起こします。
Process Pool:簡素化された並列実行
Poolはワーカープロセスのプールを管理し、タスクを自動的に分散します。これは最も一般的なmultiprocessingパターンです。
from multiprocessing import Pool
import time
def process_item(x):
"""CPU集約的な作業をシミュレート"""
time.sleep(0.1)
return x * x
if __name__ == '__main__':
data = range(100)
# 逐次処理
start = time.time()
results_seq = [process_item(x) for x in data]
seq_time = time.time() - start
# 4ワーカーでの並列処理
start = time.time()
with Pool(processes=4) as pool:
results_par = pool.map(process_item, data)
par_time = time.time() - start
print(f"逐次: {seq_time:.2f}s")
print(f"並列 (4コア): {par_time:.2f}s")
print(f"高速化: {seq_time/par_time:.2f}x")Poolメソッドの比較
異なるPoolメソッドは異なるユースケースに適しています:
| メソッド | ユースケース | ブロック | 戻り値 | 複数引数 |
|---|---|---|---|---|
map() | 単純な並列化 | はい | 順序付きリスト | なし(単一引数) |
map_async() | 非ブロッキングmap | いいえ | AsyncResult | なし |
starmap() | 複数引数 | はい | 順序付きリスト | あり(タプル展開) |
starmap_async() | 非ブロッキングstarmap | いいえ | AsyncResult | あり |
apply() | 単一関数呼び出し | はい | 単一結果 | あり |
apply_async() | 非ブロッキングapply | いいえ | AsyncResult | あり |
imap() | 遅延イテレータ | はい | イテレータ | なし |
imap_unordered() | 遅延、順序なし | はい | イテレータ | なし |
from multiprocessing import Pool
def add(x, y):
return x + y
def power(x, exp):
return x ** exp
if __name__ == '__main__':
with Pool(4) as pool:
# map: 単一引数
squares = pool.map(lambda x: x**2, [1, 2, 3, 4])
# starmap: 複数引数(タプルを展開)
results = pool.starmap(add, [(1, 2), (3, 4), (5, 6)])
# apply_async: 非ブロッキング単一呼び出し
async_result = pool.apply_async(power, (2, 10))
result = async_result.get() # 準備完了までブロック
# imap: 大規模データセット用の遅延評価
for result in pool.imap(lambda x: x**2, range(1000)):
pass # 結果が到着するたびに1つずつ処理プロセス間通信
プロセスはデフォルトでメモリを共有しません。通信にはQueueまたはPipeを使用します。
Queue:スレッドセーフなメッセージ受け渡し
from multiprocessing import Process, Queue
def producer(queue, items):
for item in items:
queue.put(item)
print(f"生成: {item}")
queue.put(None) # センチネル値
def consumer(queue):
while True:
item = queue.get()
if item is None:
break
print(f"消費: {item}")
# アイテムを処理...
if __name__ == '__main__':
q = Queue()
items = [1, 2, 3, 4, 5]
prod = Process(target=producer, args=(q, items))
cons = Process(target=consumer, args=(q,))
prod.start()
cons.start()
prod.join()
cons.join()Pipe:双方向通信
from multiprocessing import Process, Pipe
def worker(conn):
conn.send("ワーカーからこんにちは")
msg = conn.recv()
print(f"ワーカーが受信: {msg}")
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=worker, args=(child_conn,))
p.start()
msg = parent_conn.recv()
print(f"親が受信: {msg}")
parent_conn.send("親からこんにちは")
p.join()共有メモリと状態
プロセスは別々のメモリを持ちますが、multiprocessingは共有メモリプリミティブを提供します。
ValueとArray:共有プリミティブ
from multiprocessing import Process, Value, Array
import time
def increment_counter(counter, lock):
for _ in range(100_000):
with lock:
counter.value += 1
def fill_array(arr, start, end):
for i in range(start, end):
arr[i] = i * i
if __name__ == '__main__':
# ロック付き共有値
counter = Value('i', 0)
lock = counter.get_lock()
processes = [Process(target=increment_counter, args=(counter, lock)) for _ in range(4)]
for p in processes: p.start()
for p in processes: p.join()
print(f"カウンター: {counter.value}") # 400,000であるべき
# 共有配列
shared_arr = Array('i', 1000)
p1 = Process(target=fill_array, args=(shared_arr, 0, 500))
p2 = Process(target=fill_array, args=(shared_arr, 500, 1000))
p1.start(); p2.start()
p1.join(); p2.join()
print(f"Array[100]: {shared_arr[100]}") # 10,000Manager:複雑な共有オブジェクト
from multiprocessing import Process, Manager
def update_dict(shared_dict, key, value):
shared_dict[key] = value
if __name__ == '__main__':
with Manager() as manager:
# 共有dict、list、namespace
shared_dict = manager.dict()
shared_list = manager.list()
processes = [
Process(target=update_dict, args=(shared_dict, f"key{i}", i*10))
for i in range(5)
]
for p in processes: p.start()
for p in processes: p.join()
print(dict(shared_dict)) # {'key0': 0, 'key1': 10, ...}比較:Multiprocessing vs Threading vs Asyncio
| 機能 | Multiprocessing | Threading | Asyncio | concurrent.futures |
|---|---|---|---|---|
| GILバイパス | はい | いいえ | いいえ | エグゼキュータ次第 |
| CPU束縛タスク | 優秀 | 劣る | 劣る | 優秀(ProcessPoolExecutor) |
| I/O束縛タスク | 良い | 優秀 | 優秀 | 優秀(ThreadPoolExecutor) |
| メモリオーバーヘッド | 高い(別プロセス) | 低い(共有メモリ) | 低い | 変動 |
| 起動コスト | 高い | 低い | 非常に低い | 変動 |
| 通信 | Queue、Pipe、共有メモリ | 直接(共有状態) | ネイティブasync/await | Futures |
| 最適な用途 | CPU集約的並列タスク | I/O束縛タスク、単純な並行性 | 非同期I/O、多数の並行タスク | 両方の統一API |
# CPU束縛にmultiprocessingを使用
from multiprocessing import Pool
def cpu_bound(n):
return sum(i*i for i in range(n))
with Pool(4) as pool:
results = pool.map(cpu_bound, [10_000_000] * 4)
# I/O束縛にthreadingを使用
import threading
import requests
def fetch_url(url):
return requests.get(url).text
threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for t in threads: t.start()
for t in threads: t.join()
# 非同期I/Oにasyncioを使用
import asyncio
import aiohttp
async def fetch_async(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
asyncio.run(asyncio.gather(*[fetch_async(url) for url in urls]))高度:ProcessPoolExecutor
concurrent.futures.ProcessPoolExecutorは、ThreadPoolExecutorと同じAPIを持つ高レベルインターフェースを提供します。
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
def process_task(x):
time.sleep(0.1)
return x * x
if __name__ == '__main__':
# コンテキストマネージャがクリーンアップを保証
with ProcessPoolExecutor(max_workers=4) as executor:
# 個別のタスクを送信
futures = [executor.submit(process_task, i) for i in range(20)]
# 完了次第処理
for future in as_completed(futures):
result = future.result()
print(f"結果: {result}")
# またはmapを使用(Pool.mapのように)
results = executor.map(process_task, range(20))
print(list(results))Poolに対する利点:
ThreadPoolExecutorとProcessPoolExecutorで同じAPI- より細かい制御のためのFuturesインターフェース
- より良いエラーハンドリング
- 同期と非同期コードの混在が容易
一般的なパターン
恥ずかしいほど並列なタスク
依存関係のないタスクはmultiprocessingに最適です:
from multiprocessing import Pool
import pandas as pd
def process_chunk(chunk):
"""データチャンクを独立して処理"""
chunk['new_col'] = chunk['value'] * 2
return chunk.groupby('category').sum()
if __name__ == '__main__':
df = pd.DataFrame({'category': ['A', 'B'] * 5000, 'value': range(10000)})
# チャンクに分割
chunks = [df.iloc[i:i+2500] for i in range(0, len(df), 2500)]
with Pool(4) as pool:
results = pool.map(process_chunk, chunks)
# 結果を結合
final = pd.concat(results).groupby('category').sum()Map-Reduceパターン
from multiprocessing import Pool
from functools import reduce
def mapper(text):
"""Map: 単語を抽出してカウント"""
words = text.lower().split()
return {word: 1 for word in words}
def reducer(dict1, dict2):
"""Reduce: 単語カウントをマージ"""
for word, count in dict2.items():
dict1[word] = dict1.get(word, 0) + count
return dict1
if __name__ == '__main__':
documents = ["こんにちは 世界", "pythonの世界", "こんにちは python"] * 1000
with Pool(4) as pool:
# Mapフェーズ: 並列
word_dicts = pool.map(mapper, documents)
# Reduceフェーズ: 逐次(またはツリー削減を使用)
word_counts = reduce(reducer, word_dicts)
print(word_counts)複数プロデューサーによるProducer-Consumer
from multiprocessing import Process, Queue, cpu_count
def producer(queue, producer_id, items):
for item in items:
queue.put((producer_id, item))
print(f"プロデューサー {producer_id} 完了")
def consumer(queue, num_producers):
finished_producers = 0
while finished_producers < num_producers:
if not queue.empty():
item = queue.get()
if item is None:
finished_producers += 1
else:
producer_id, data = item
print(f"プロデューサー {producer_id} から消費: {data}")
if __name__ == '__main__':
q = Queue()
num_producers = 3
# プロデューサーを開始
producers = [
Process(target=producer, args=(q, i, range(i*10, (i+1)*10)))
for i in range(num_producers)
]
for p in producers: p.start()
# コンシューマーを開始
cons = Process(target=consumer, args=(q, num_producers))
cons.start()
# クリーンアップ
for p in producers: p.join()
for _ in range(num_producers):
q.put(None) # コンシューマーに通知
cons.join()パフォーマンスに関する考慮事項
Multiprocessingが役立つ場合
- CPU束縛タスク:データ処理、数学計算、画像処理
- 大規模データセット:アイテムごとの処理時間がプロセスオーバーヘッドを正当化する場合
- 独立したタスク:共有状態がないか、最小限の通信
Multiprocessingが不利な場合
プロセス作成のオーバーヘッドが利益を上回る場合:
from multiprocessing import Pool
import time
def tiny_task(x):
return x + 1
if __name__ == '__main__':
data = range(100)
# 小さなタスクには逐次の方が速い
start = time.time()
results = [tiny_task(x) for x in data]
print(f"逐次: {time.time() - start:.4f}s") # ~0.0001s
start = time.time()
with Pool(4) as pool:
results = pool.map(tiny_task, data)
print(f"並列: {time.time() - start:.4f}s") # ~0.05s(500倍遅い!)経験則:
- 最小タスク期間:アイテムごとに約0.1秒
- データサイズ:データのpickle化が処理よりも時間がかかる場合は共有メモリを使用
- ワーカー数:
cpu_count()から始め、タスク特性に基づいて調整
Pickle化の要件
プロセス間で渡せるのはpickle可能なオブジェクトのみです:
from multiprocessing import Pool
# ❌ ラムダ関数はpickle不可
# pool.map(lambda x: x*2, range(10)) # 失敗
# ✅ 名前付き関数を使用
def double(x):
return x * 2
with Pool(4) as pool:
pool.map(double, range(10))
# ❌ ノートブック内のローカル関数は失敗
# def process():
# def inner(x): return x*2
# pool.map(inner, range(10)) # 失敗
# ✅ モジュールレベルで定義するか、functools.partialを使用
from functools import partial
def multiply(x, factor):
return x * factor
with Pool(4) as pool:
pool.map(partial(multiply, factor=3), range(10))RunCellで並列コードをデバッグ
multiprocessingコードのデバッグは非常に困難です。print文は消え、ブレークポイントは機能せず、スタックトレースは暗号的です。プロセスが静かにクラッシュしたり、誤った結果を生成したりすると、従来のデバッグツールは失敗します。
Poolワーカーがトレースバックなしでクラッシュした場合、RunCellはエラーキューを検査し、どの関数呼び出しが失敗したのか、その理由を正確に示すことができます。共有状態が誤った結果を生成する場合、RunCellはメモリアクセスパターンをトレースして欠落しているロックを見つけます。複雑な並列データパイプラインをデバッグするデータサイエンティストにとって、RunCellは何時間ものprint文デバッグを、AIガイドによる修正の数分に変えます。
ベストプラクティス
1. 常にif __name__ガードを使用
# ✅ 正しい
if __name__ == '__main__':
with Pool(4) as pool:
pool.map(func, data)
# ❌ 間違い - Windowsでフォーク爆弾を引き起こす
with Pool(4) as pool:
pool.map(func, data)2. Poolを明示的に閉じる
# ✅ コンテキストマネージャ(推奨)
with Pool(4) as pool:
results = pool.map(func, data)
# ✅ 明示的なクローズとjoin
pool = Pool(4)
results = pool.map(func, data)
pool.close()
pool.join()
# ❌ リソースリーク
pool = Pool(4)
results = pool.map(func, data)3. 例外を処理
from multiprocessing import Pool
def risky_task(x):
if x == 5:
raise ValueError("不正な値")
return x * 2
if __name__ == '__main__':
with Pool(4) as pool:
try:
results = pool.map(risky_task, range(10))
except ValueError as e:
print(f"タスク失敗: {e}")
# またはapply_asyncで個別に処理
async_results = [pool.apply_async(risky_task, (i,)) for i in range(10)]
for i, ar in enumerate(async_results):
try:
result = ar.get()
print(f"結果 {i}: {result}")
except ValueError:
print(f"タスク {i} 失敗")4. 可能な限り共有状態を避ける
# ❌ 共有状態には同期が必要
from multiprocessing import Process, Value
counter = Value('i', 0)
def increment():
for _ in range(100000):
counter.value += 1 # 競合状態!
# ✅ ロックを使用するか共有を避ける
from multiprocessing import Lock
lock = Lock()
def increment_safe():
for _ in range(100000):
with lock:
counter.value += 1
# ✅ さらに良い:共有状態を避ける
def count_locally(n):
return n # 代わりに結果を返す
with Pool(4) as pool:
results = pool.map(count_locally, [100000] * 4)
total = sum(results)5. 適切なワーカー数を選択
from multiprocessing import cpu_count, Pool
# CPU束縛:すべてのコアを使用
num_workers = cpu_count()
# I/O束縛:より多くのワーカーを使用可能
num_workers = cpu_count() * 2
# 混合ワークロード:経験的に調整
with Pool(processes=num_workers) as pool:
results = pool.map(func, data)よくある間違い
1. if __name__ガードを忘れる
Windows/macOSで無限プロセス生成につながります。
2. pickle化できないオブジェクトをpickle化しようとする
# ❌ クラスメソッド、ラムダ、ローカル関数は失敗
class DataProcessor:
def process(self, x):
return x * 2
dp = DataProcessor()
# pool.map(dp.process, data) # 失敗
# ✅ トップレベル関数を使用
def process(x):
return x * 2
with Pool(4) as pool:
pool.map(process, data)3. プロセス終了を処理しない
# ❌ 適切にクリーンアップしない
pool = Pool(4)
results = pool.map(func, data)
# プールがまだ実行中
# ✅ 常にクローズとjoin
pool = Pool(4)
try:
results = pool.map(func, data)
finally:
pool.close()
pool.join()4. 過度なデータ転送
# ❌ 巨大なオブジェクトのpickle化は遅い
large_data = [np.random.rand(1000, 1000) for _ in range(100)]
with Pool(4) as pool:
pool.map(process_array, large_data) # 遅いシリアライゼーション
# ✅ 共有メモリまたはメモリマップファイルを使用
import numpy as np
from multiprocessing import shared_memory
# 共有メモリを作成
shm = shared_memory.SharedMemory(create=True, size=1000*1000*8)
arr = np.ndarray((1000, 1000), dtype=np.float64, buffer=shm.buf)
# 名前と形状のみを渡す
def process_shared(name, shape):
existing_shm = shared_memory.SharedMemory(name=name)
arr = np.ndarray(shape, dtype=np.float64, buffer=existing_shm.buf)
# arrを処理...
existing_shm.close()
with Pool(4) as pool:
pool.starmap(process_shared, [(shm.name, (1000, 1000))] * 4)
shm.close()
shm.unlink()FAQ
multiprocessingはどのようにしてGILをバイパスしますか?
GIL(Global Interpreter Lock)は各Pythonインタープリタ内のmutexで、複数のスレッドが同時にPythonバイトコードを実行することを防ぎます。Multiprocessingは、それぞれ独自のインタープリタとGILを持つ別々のPythonプロセスを作成することでこれをバイパスします。プロセスはメモリを共有しないため、GILの競合なしにCPUコア全体で真に並列に実行されます。
multiprocessingとthreadingはいつ使い分けるべきですか?
GILがパフォーマンスを制限するCPU束縛タスク(データ処理、計算、画像操作)にはmultiprocessingを使用します。I/O中にGILが解放されるI/O束縛タスク(ネットワークリクエスト、ファイル操作)にはthreadingを使用し、スレッドが並行して動作できるようにします。Threadingはオーバーヘッドが低いですが、GILのためCPU作業を並列化できません。
なぜif name == 'main'ガードが必要ですか?
WindowsとmacOSでは、子プロセスが関数にアクセスするためにメインモジュールをインポートします。ガードがないと、モジュールのインポートがPool作成コードを再度実行し、無限プロセス(フォーク爆弾)を生成します。Linuxはインポートを必要としないfork()を使用しますが、クロスプラットフォームコードのベストプラクティスとしてガードは必要です。
何個のワーカープロセスを使用すべきですか?
CPU束縛タスクの場合、cpu_count()(CPUコア数)から始めます。コア数より多いワーカーはコンテキストスイッチングオーバーヘッドを引き起こします。I/O束縛タスクの場合、プロセスがI/Oを待機するため、より多くのワーカー(2-4倍のコア)を使用できます。メモリとデータ転送オーバーヘッドが最適なワーカー数を制限する可能性があるため、常に特定のワークロードでベンチマークしてください。
multiprocessing関数に渡せるオブジェクトは何ですか?
オブジェクトはpickle可能(pickleでシリアライズ可能)である必要があります。これには組み込み型(int、str、list、dict)、NumPy配列、pandas DataFrame、ほとんどのユーザー定義クラスが含まれます。ラムダ、ローカル関数、クラスメソッド、ファイルハンドル、データベース接続、スレッドロックはpickle化できません。モジュールレベルで関数を定義するか、部分適用にはfunctools.partialを使用してください。
結論
Python multiprocessingは、CPU束縛のボトルネックを、利用可能なコアに応じてスケールする並列操作に変換します。別々のプロセスを通じてGILをバイパスすることで、threadingでは不可能な真の並列処理を実現します。Poolインターフェースは一般的なパターンを簡素化し、Queue、Pipe、共有メモリは複雑なプロセス間ワークフローを可能にします。
恥ずかしいほど並列なタスクにはPool.map()から始め、高速化を測定し、そこから最適化してください。if __name__ == '__main__'ガードを忘れず、プロセスオーバーヘッドを償却するためにタスクを粗粒度に保ち、プロセス間のデータ転送を最小限に抑えてください。デバッグが複雑になった場合、RunCellのようなツールがプロセス境界を越えた実行のトレースに役立ちます。
Multiprocessingが常に答えというわけではありません。I/O束縛作業の場合、threadingやasyncioの方がシンプルで高速かもしれません。しかし、大規模なデータセットを処理したり、モデルをトレーニングしたり、重い計算を実行したりする場合、multiprocessingはマルチコアマシンが構築された目的のパフォーマンスを提供します。