Skip to content

Python 多线程:完整指南与实战示例

Updated on

你的 Python 程序需要发起 50 次 API 调用,而且必须逐个进行。每次调用需要等待 200 毫秒。简单计算一下:你的程序要浪费整整 10 秒钟盯着网络响应发呆。此时 CPU 利用率几乎为零,脚本却在处理本可以同时进行的 I/O 密集型操作时缓慢爬行。

这个问题会迅速恶化。顺序抓取数千个页面的网络爬虫、逐个读写文件的处理脚本、阻塞整个应用的数据库查询。每一秒的闲置等待,都是程序本可以用于执行有用工作的时间。

Python 的 threading 模块通过在单个进程内并发运行多个操作来解决这个问题。线程共享内存空间,启动速度快,特别擅长处理程序大部分时间都在等待的 I/O 密集型工作负载。本指南涵盖从基础线程创建到高级同步模式的所有内容,并提供可立即投入生产的代码示例。

📚

Python 中的多线程是什么?

多线程允许程序在同一个进程内并发运行多个操作。每个线程共享相同的内存空间,使得线程间通信快速且直接。

Python 的 threading 模块提供了创建和管理线程的高级接口。但有一个重要的注意事项:全局解释器锁(GIL)。

全局解释器锁(GIL)

GIL 是 CPython 中的一个互斥锁,它确保任何时候只有一个线程在执行 Python 字节码。这意味着对于 CPU 密集型操作,线程无法实现真正的并行。然而,在 I/O 操作(网络调用、文件读取、数据库查询)期间,GIL 会释放,允许一个线程等待 I/O 时其他线程运行。

import threading
import time
 
def cpu_bound(n):
    """CPU 密集型:GIL 阻止并行执行"""
    total = 0
    for i in range(n):
        total += i * i
    return total
 
def io_bound(url):
    """I/O 密集型:网络等待期间 GIL 会释放"""
    import urllib.request
    return urllib.request.urlopen(url).read()
 
# CPU 密集型:4 个线程逐个运行(无加速效果)
start = time.time()
threads = [threading.Thread(target=cpu_bound, args=(10_000_000,)) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"CPU 密集型使用线程:{time.time() - start:.2f}s")
 
# I/O 密集型:4 个线程重叠等待时间(大幅加速)

这意味着多线程适用于 I/O 密集型任务,但不适用于 CPU 密集型计算。对于 CPU 密集型工作,应改用 multiprocessing 模块。

何时使用 Threading、Multiprocessing 还是 Asyncio

特性threadingmultiprocessingasyncio
最适合I/O 密集型任务CPU 密集型任务高并发 I/O
并行性并发(受 GIL 限制)真正的并行并发(单线程)
内存共享(轻量级)每个进程独立共享(轻量级)
启动开销低(约 1ms)高(约 50-100ms)极低
通信方式直接内存访问管道、队列、共享内存可等待的协程
可扩展性数十到数百个线程受 CPU 核心数限制数千个协程
复杂度中等(需要加锁)中等(需要序列化)高(async/await 语法)
使用场景网络爬虫、文件 I/O、API 调用数据处理、机器学习训练Web 服务器、聊天应用

经验法则:如果你的程序等待网络或磁盘,使用多线程;如果进行数值计算,使用多进程;如果需要数千个并发连接,使用 asyncio。

线程基础:创建与运行线程

threading.Thread 类

创建线程最简单的方式是将目标函数传递给 threading.Thread

import threading
import time
 
def download_file(filename):
    print(f"[{threading.current_thread().name}] 正在下载 {filename}...")
    time.sleep(2)  # 模拟下载
    print(f"[{threading.current_thread().name}] 完成 {filename}")
 
# 创建线程
t1 = threading.Thread(target=download_file, args=("data.csv",))
t2 = threading.Thread(target=download_file, args=("report.pdf",))
 
# 启动线程
t1.start()
t2.start()
 
# 等待两个线程都完成
t1.join()
t2.join()
 
print("所有下载已完成")

两个下载任务并发运行,大约 2 秒完成,而不是 4 秒。

start() 和 join()

  • start() 开始执行线程。一个线程只能被启动一次。
  • join(timeout=None) 阻塞调用线程,直到目标线程完成。传入 timeout 参数(以秒为单位)可避免无限期等待。
import threading
import time
 
def slow_task():
    time.sleep(10)
 
t = threading.Thread(target=slow_task)
t.start()
 
# 最多等待 3 秒
t.join(timeout=3)
 
if t.is_alive():
    print("3 秒后线程仍在运行")
else:
    print("线程已完成")

线程命名

为线程命名有助于调试:

import threading
 
def worker():
    name = threading.current_thread().name
    print(f"运行在线程中:{name}")
 
t = threading.Thread(target=worker, name="DataProcessor")
t.start()
t.join()

守护线程

守护线程是后台线程,当主程序退出时会自动终止。非守护线程会阻止程序退出,直到它们完成。

import threading
import time
 
def background_monitor():
    while True:
        print("监控系统健康状态...")
        time.sleep(5)
 
# 守护线程:主程序退出时自动终止
monitor = threading.Thread(target=background_monitor, daemon=True)
monitor.start()
 
# 主程序执行工作
time.sleep(12)
print("主程序退出")
# monitor 线程被自动终止

将守护线程用于后台日志记录、监控或清理任务,这些任务不应阻止程序退出。

继承 Thread 类

对于更复杂的线程行为,可以继承 threading.Thread

import threading
import time
 
class FileProcessor(threading.Thread):
    def __init__(self, filepath):
        super().__init__()
        self.filepath = filepath
        self.result = None
 
    def run(self):
        """重写 run() 方法实现线程逻辑"""
        print(f"正在处理 {self.filepath}")
        time.sleep(1)  # 模拟工作
        self.result = f"已处理:{self.filepath}"
 
# 创建并运行
processor = FileProcessor("/data/report.csv")
processor.start()
processor.join()
print(processor.result)

向线程传递参数

使用 args 和 kwargs

使用 args(元组)传递位置参数,使用 kwargs(字典)传递关键字参数:

import threading
 
def fetch_data(url, timeout, retries=3, verbose=False):
    print(f"正在获取 {url}(超时={timeout}秒,重试={retries},详细={verbose})")
 
# 位置参数作为元组
t1 = threading.Thread(target=fetch_data, args=("https://api.example.com", 30))
 
# 关键字参数作为字典
t2 = threading.Thread(
    target=fetch_data,
    args=("https://api.example.com",),
    kwargs={"timeout": 30, "retries": 5, "verbose": True}
)
 
t1.start()
t2.start()
t1.join()
t2.join()

常见错误:忘记单元素元组中的尾随逗号。args=("hello",) 是元组;args=("hello") 只是带括号的字符串。

从线程收集结果

线程不会直接返回值。使用共享数据结构或列表来收集结果:

import threading
 
results = {}
lock = threading.Lock()
 
def compute(task_id, value):
    result = value ** 2
    with lock:
        results[task_id] = result
 
threads = []
for i in range(5):
    t = threading.Thread(target=compute, args=(i, i * 10))
    threads.append(t)
    t.start()
 
for t in threads:
    t.join()
 
print(results)  # {0: 0, 1: 100, 2: 400, 3: 900, 4: 1600}

更简洁的方法是使用 ThreadPoolExecutor(接下来介绍),它可以自动处理结果收集。

ThreadPoolExecutor:现代处理方式

concurrent.futures 模块提供了 ThreadPoolExecutor,这是一个高级接口,用于管理工作者线程池。它自动处理线程创建、结果收集和异常传播。

使用 submit() 的基础用法

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
 
def fetch_url(url):
    time.sleep(1)  # 模拟网络请求
    return f"来自 {url} 的内容"
 
urls = [
    "https://example.com/page1",
    "https://example.com/page2",
    "https://example.com/page3",
    "https://example.com/page4",
    "https://example.com/page5",
]
 
with ThreadPoolExecutor(max_workers=3) as executor:
    # 提交任务并获取 Future 对象
    future_to_url = {executor.submit(fetch_url, url): url for url in urls}
 
    # 按完成顺序处理结果
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            print(f"{url}{data}")
        except Exception as e:
            print(f"{url} 生成异常:{e}")

使用 map() 获取有序结果

executor.map() 返回与输入顺序相同的结果,类似于内置的 map()

from concurrent.futures import ThreadPoolExecutor
 
def process_item(item):
    return item.upper()
 
items = ["apple", "banana", "cherry", "date"]
 
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_item, items))
 
print(results)  # ['APPLE', 'BANANA', 'CHERRY', 'DATE']

submit() 与 map() 的对比

submit()map()
返回Future 对象结果迭代器
结果顺序完成顺序(配合 as_completed输入顺序
错误处理通过 future.result() 逐个处理首次失败时抛出
参数单次函数调用对每项应用函数
最适合异构任务、需要提前获取结果同构批处理

使用 Futures 处理异常

from concurrent.futures import ThreadPoolExecutor, as_completed
 
def risky_task(n):
    if n == 3:
        raise ValueError(f"错误输入:{n}")
    return n * 10
 
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(risky_task, i): i for i in range(5)}
 
    for future in as_completed(futures):
        task_id = futures[future]
        try:
            result = future.result(timeout=5)
            print(f"任务 {task_id}{result}")
        except ValueError as e:
            print(f"任务 {task_id} 失败:{e}")
        except TimeoutError:
            print(f"任务 {task_id} 超时")

取消任务

from concurrent.futures import ThreadPoolExecutor
import time
 
def long_task(n):
    time.sleep(5)
    return n
 
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(long_task, i) for i in range(10)]
 
    # 取消待处理任务(已在运行的任务无法取消)
    for f in futures[4:]:
        cancelled = f.cancel()
        print(f"已取消:{cancelled}")

线程同步原语

当多个线程访问共享数据时,需要同步机制来防止竞态条件。

Lock

Lock 确保同一时间只有一个线程进入临界区:

import threading
 
class BankAccount:
    def __init__(self, balance):
        self.balance = balance
        self.lock = threading.Lock()
 
    def withdraw(self, amount):
        with self.lock:  # 同一时间只有一个线程
            if self.balance >= amount:
                self.balance -= amount
                return True
            return False
 
    def deposit(self, amount):
        with self.lock:
            self.balance += amount
 
account = BankAccount(1000)
 
def make_transactions():
    for _ in range(100):
        account.deposit(10)
        account.withdraw(10)
 
threads = [threading.Thread(target=make_transactions) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
 
print(f"最终余额:{account.balance}")  # 始终为 1000

没有锁的话,并发读写会产生错误结果(竞态条件)。

RLock(可重入锁)

RLock 可以被同一线程多次获取。这可以防止当一个持有锁的函数调用另一个也需要相同锁的函数时发生死锁:

import threading
 
class SafeCache:
    def __init__(self):
        self._data = {}
        self._lock = threading.RLock()
 
    def get(self, key):
        with self._lock:
            return self._data.get(key)
 
    def set(self, key, value):
        with self._lock:
            self._data[key] = value
 
    def get_or_set(self, key, default):
        with self._lock:
            # 这里调用 get(),它也会获取 _lock
            # RLock 允许这样做;普通 Lock 会导致死锁
            existing = self.get(key)
            if existing is None:
                self.set(key, default)
                return default
            return existing

Semaphore

Semaphore 允许固定数量的线程同时访问资源:

import threading
import time
 
# 最多允许 3 个并发数据库连接
db_semaphore = threading.Semaphore(3)
 
def query_database(query_id):
    with db_semaphore:
        print(f"查询 {query_id}:已连接(活动连接数:{3 - db_semaphore._value})")
        time.sleep(2)  # 模拟查询
        print(f"查询 {query_id}:完成")
 
threads = [threading.Thread(target=query_database, args=(i,)) for i in range(10)]
for t in threads: t.start()
for t in threads: t.join()

Event

Event 允许一个线程向其他等待的线程发送信号:

import threading
import time
 
data_ready = threading.Event()
shared_data = []
 
def producer():
    print("生产者:准备数据...")
    time.sleep(3)
    shared_data.extend([1, 2, 3, 4, 5])
    print("生产者:数据就绪,通知消费者")
    data_ready.set()
 
def consumer(name):
    print(f"消费者 {name}:等待数据...")
    data_ready.wait()  # 阻塞直到事件被设置
    print(f"消费者 {name}:获取数据 = {shared_data}")
 
threads = [
    threading.Thread(target=producer),
    threading.Thread(target=consumer, args=("A",)),
    threading.Thread(target=consumer, args=("B",)),
]
for t in threads: t.start()
for t in threads: t.join()

Condition

Condition 将锁与等待通知的能力结合在一起。它是生产者-消费者模式的基础:

import threading
import time
import random
 
buffer = []
MAX_SIZE = 5
condition = threading.Condition()
 
def producer():
    for i in range(20):
        with condition:
            while len(buffer) >= MAX_SIZE:
                condition.wait()  # 等待直到有空间
            item = random.randint(1, 100)
            buffer.append(item)
            print(f"生产:{item}(缓冲区大小:{len(buffer)})")
            condition.notify_all()
        time.sleep(0.1)
 
def consumer(name):
    for _ in range(10):
        with condition:
            while len(buffer) == 0:
                condition.wait()  # 等待直到有数据
            item = buffer.pop(0)
            print(f"消费者 {name} 消费:{item}(缓冲区大小:{len(buffer)})")
            condition.notify_all()
        time.sleep(0.15)
 
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=("X",))
t3 = threading.Thread(target=consumer, args=("Y",))
t1.start(); t2.start(); t3.start()
t1.join(); t2.join(); t3.join()

同步原语总结

原语用途何时使用
Lock互斥保护共享可变状态
RLock可重入互斥同一线程内的嵌套锁定
Semaphore限制并发速率限制、连接池
Event一次性信号初始化完成、关闭信号
Condition等待/通知模式生产者-消费者、状态变化
Barrier同步 N 个线程所有线程必须在某点同步后才能继续

线程安全的数据结构

queue.Queue

queue.Queue 是首选的线程安全数据结构。它内部处理了所有锁定:

import threading
import queue
import time
 
task_queue = queue.Queue()
results = queue.Queue()
 
def worker():
    while True:
        item = task_queue.get()  # 阻塞直到有数据
        if item is None:
            break
        result = item ** 2
        results.put(result)
        task_queue.task_done()
 
# 启动 4 个工作者
workers = []
for _ in range(4):
    t = threading.Thread(target=worker, daemon=True)
    t.start()
    workers.append(t)
 
# 提交任务
for i in range(20):
    task_queue.put(i)
 
# 等待所有任务完成
task_queue.join()
 
# 停止工作者
for _ in range(4):
    task_queue.put(None)
for w in workers:
    w.join()
 
# 收集结果
all_results = []
while not results.empty():
    all_results.append(results.get())
print(f"结果:{sorted(all_results)}")

queue.Queue 还支持:

  • Queue(maxsize=10)put() 在队列满时阻塞
  • PriorityQueue():按优先级排序的项目
  • LifoQueue():后进先出(栈行为)

collections.deque

collections.deque 对于 append()popleft() 操作是线程安全的(在 CPython 中 C 层面是原子的),这使其成为简单生产者-消费者模式的快速替代方案:

from collections import deque
import threading
import time
 
buffer = deque(maxlen=1000)
 
def producer():
    for i in range(100):
        buffer.append(i)
        time.sleep(0.01)
 
def consumer():
    consumed = 0
    while consumed < 100:
        if buffer:
            item = buffer.popleft()
            consumed += 1
        else:
            time.sleep(0.01)
    print(f"消费了 {consumed} 个项目")
 
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()

注意:虽然单独的 appendpopleft 操作是线程安全的,但先检查 len(buffer) 再弹出不是原子操作。为了完全的线程安全,请使用 queue.Queue

常见的多线程模式

生产者-消费者模式

解耦数据生产与数据处理的经典模式:

import threading
import queue
import time
import random
 
def producer(q, name, num_items):
    for i in range(num_items):
        item = f"{name}-item-{i}"
        q.put(item)
        print(f"生产者 {name}:创建 {item}")
        time.sleep(random.uniform(0.05, 0.15))
    print(f"生产者 {name}:完成")
 
def consumer(q, name, stop_event):
    while not stop_event.is_set() or not q.empty():
        try:
            item = q.get(timeout=0.5)
            print(f"消费者 {name}:处理 {item}")
            time.sleep(random.uniform(0.1, 0.2))
            q.task_done()
        except queue.Empty:
            continue
    print(f"消费者 {name}:关闭")
 
task_queue = queue.Queue(maxsize=10)
stop_event = threading.Event()
 
producers = [
    threading.Thread(target=producer, args=(task_queue, "P1", 10)),
    threading.Thread(target=producer, args=(task_queue, "P2", 10)),
]
consumers = [
    threading.Thread(target=consumer, args=(task_queue, "C1", stop_event)),
    threading.Thread(target=consumer, args=(task_queue, "C2", stop_event)),
    threading.Thread(target=consumer, args=(task_queue, "C3", stop_event)),
]
 
for c in consumers: c.start()
for p in producers: p.start()
for p in producers: p.join()
 
task_queue.join()  # 等待所有项目被处理
stop_event.set()   # 通知消费者停止
for c in consumers: c.join()

工作线程池(手动实现)

当你需要比 ThreadPoolExecutor 更多的控制时:

import threading
import queue
 
class WorkerPool:
    def __init__(self, num_workers):
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.workers = []
 
        for _ in range(num_workers):
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()
            self.workers.append(t)
 
    def _worker(self):
        while True:
            func, args, kwargs, future_id = self.task_queue.get()
            if func is None:
                break
            try:
                result = func(*args, **kwargs)
                self.result_queue.put((future_id, result, None))
            except Exception as e:
                self.result_queue.put((future_id, None, e))
            finally:
                self.task_queue.task_done()
 
    def submit(self, func, *args, **kwargs):
        future_id = id(func)  # 简单 ID
        self.task_queue.put((func, args, kwargs, future_id))
        return future_id
 
    def shutdown(self):
        for _ in self.workers:
            self.task_queue.put((None, None, None, None))
        for w in self.workers:
            w.join()
 
# 使用
pool = WorkerPool(4)
for i in range(10):
    pool.submit(lambda x: x * x, i)
pool.task_queue.join()
pool.shutdown()

速率限制线程池

控制线程发起外部请求的速度:

import threading
import time
from concurrent.futures import ThreadPoolExecutor
 
class RateLimiter:
    def __init__(self, max_per_second):
        self.interval = 1.0 / max_per_second
        self.lock = threading.Lock()
        self.last_call = 0
 
    def wait(self):
        with self.lock:
            elapsed = time.time() - self.last_call
            wait_time = self.interval - elapsed
            if wait_time > 0:
                time.sleep(wait_time)
            self.last_call = time.time()
 
limiter = RateLimiter(max_per_second=5)
 
def rate_limited_fetch(url):
    limiter.wait()
    print(f"正在获取 {url},时间 {time.time():.2f}")
    time.sleep(0.5)  # 模拟请求
    return f"来自 {url} 的数据"
 
urls = [f"https://api.example.com/item/{i}" for i in range(20)]
 
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(rate_limited_fetch, urls))

线程安全陷阱及避免方法

竞态条件

竞态条件发生在程序结果依赖于线程执行时序的情况下:

import threading
 
# 错误:竞态条件
counter = 0
 
def increment_unsafe():
    global counter
    for _ in range(100_000):
        counter += 1  # 读、增、写:不是原子操作
 
threads = [threading.Thread(target=increment_unsafe) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(f"预期:500000,实际:{counter}")  # 通常小于 500000
 
# 正确:使用锁保护
counter = 0
lock = threading.Lock()
 
def increment_safe():
    global counter
    for _ in range(100_000):
        with lock:
            counter += 1
 
threads = [threading.Thread(target=increment_safe) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(f"预期:500000,实际:{counter}")  # 始终为 500000

死锁

死锁发生在两个线程各自持有对方需要的锁时:

import threading
 
lock_a = threading.Lock()
lock_b = threading.Lock()
 
def thread_1():
    with lock_a:
        print("线程 1:获取 lock_a")
        with lock_b:  # 如果 thread_2 持有 lock_b 则永远等待
            print("线程 1:获取 lock_b")
 
def thread_2():
    with lock_b:
        print("线程 2:获取 lock_b")
        with lock_a:  # 如果 thread_1 持有 lock_a 则永远等待
            print("线程 2:获取 lock_a")
 
# 这将导致死锁
# t1 = threading.Thread(target=thread_1)
# t2 = threading.Thread(target=thread_2)
# t1.start(); t2.start()

预防死锁的方法

  1. 始终以相同顺序获取锁
def thread_1_fixed():
    with lock_a:    # 始终先 lock_a
        with lock_b:
            print("线程 1:获取两个锁")
 
def thread_2_fixed():
    with lock_a:    # 始终先 lock_a(相同顺序)
        with lock_b:
            print("线程 2:获取两个锁")
  1. 使用超时
def safe_acquire():
    acquired_a = lock_a.acquire(timeout=2)
    if not acquired_a:
        print("无法获取 lock_a,回退")
        return
    try:
        acquired_b = lock_b.acquire(timeout=2)
        if not acquired_b:
            print("无法获取 lock_b,释放 lock_a")
            return
        try:
            print("安全获取两个锁")
        finally:
            lock_b.release()
    finally:
        lock_a.release()
  1. 最小化锁的作用域:持有锁的时间尽可能短。

线程安全检查清单

  • 使用锁保护所有共享可变状态
  • 尽可能使用 queue.Queue 代替共享列表或字典
  • 避免全局可变状态;通过函数参数传递数据
  • 使用 ThreadPoolExecutor 代替手动线程管理
  • 永远不要假设线程间的操作顺序
  • 使用 threading.active_count() 和日志记录来检测线程泄漏

实战案例

并发网络爬虫

from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import time
 
def fetch_page(url):
    """获取网页并返回内容长度"""
    try:
        with urllib.request.urlopen(url, timeout=10) as response:
            content = response.read()
            return url, len(content), None
    except Exception as e:
        return url, 0, str(e)
 
urls = [
    "https://python.org",
    "https://docs.python.org",
    "https://pypi.org",
    "https://realpython.com",
    "https://github.com",
    "https://stackoverflow.com",
    "https://news.ycombinator.com",
    "https://httpbin.org",
]
 
# 顺序执行
start = time.time()
for url in urls:
    fetch_page(url)
sequential_time = time.time() - start
 
# 使用多线程并发
start = time.time()
with ThreadPoolExecutor(max_workers=8) as executor:
    futures = {executor.submit(fetch_page, url): url for url in urls}
    for future in as_completed(futures):
        url, size, error = future.result()
        if error:
            print(f"  失败 {url}{error}")
        else:
            print(f"  成功 {url}{size:,} 字节")
threaded_time = time.time() - start
 
print(f"\n顺序执行:{sequential_time:.2f}s")
print(f"多线程:  {threaded_time:.2f}s")
print(f"加速比:  {sequential_time / threaded_time:.1f}x")

并行文件 I/O

from concurrent.futures import ThreadPoolExecutor
import os
import hashlib
 
def process_file(filepath):
    """读取文件并计算其 SHA-256 哈希值"""
    with open(filepath, 'rb') as f:
        content = f.read()
    file_hash = hashlib.sha256(content).hexdigest()
    size = os.path.getsize(filepath)
    return filepath, file_hash, size
 
def hash_all_files(directory, pattern="*.py"):
    """使用多线程哈希目录中所有匹配的文件"""
    import glob
    files = glob.glob(os.path.join(directory, "**", pattern), recursive=True)
 
    results = {}
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = {executor.submit(process_file, f): f for f in files}
        for future in futures:
            try:
                path, hash_val, size = future.result()
                results[path] = {"hash": hash_val, "size": size}
            except Exception as e:
                print(f"处理 {futures[future]} 时出错:{e}")
 
    return results
 
# 使用
# file_hashes = hash_all_files("/path/to/project")

带重试逻辑的并发 API 调用

from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import json
import time
 
def fetch_api(endpoint, max_retries=3, backoff=1.0):
    """获取 API 端点,带指数退避重试"""
    for attempt in range(max_retries):
        try:
            url = f"https://jsonplaceholder.typicode.com{endpoint}"
            req = urllib.request.Request(url)
            with urllib.request.urlopen(req, timeout=10) as response:
                data = json.loads(response.read())
                return {"endpoint": endpoint, "data": data, "error": None}
        except Exception as e:
            if attempt < max_retries - 1:
                wait = backoff * (2 ** attempt)
                time.sleep(wait)
            else:
                return {"endpoint": endpoint, "data": None, "error": str(e)}
 
endpoints = [f"/posts/{i}" for i in range(1, 21)]
 
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(fetch_api, ep) for ep in endpoints]
    results = [f.result() for f in futures]
 
elapsed = time.time() - start
success = sum(1 for r in results if r["error"] is None)
print(f"在 {elapsed:.2f} 秒内获取了 {success}/{len(endpoints)} 个端点")

周期性后台任务

import threading
import time
 
class PeriodicTask:
    """在后台线程中以固定间隔运行函数"""
    def __init__(self, interval, func, *args, **kwargs):
        self.interval = interval
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self._stop_event = threading.Event()
        self._thread = None
 
    def start(self):
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()
 
    def _run(self):
        while not self._stop_event.is_set():
            self.func(*self.args, **self.kwargs)
            self._stop_event.wait(self.interval)
 
    def stop(self):
        self._stop_event.set()
        if self._thread:
            self._thread.join()
 
# 使用
def check_health():
    print(f"健康检查时间 {time.strftime('%H:%M:%S')}")
 
task = PeriodicTask(2.0, check_health)
task.start()
time.sleep(7)
task.stop()
print("已停止")

性能对比:Threading vs Multiprocessing vs Asyncio

合适的并发工具取决于工作负载。以下是常见任务的实际耗时对比:

任务顺序执行Threading (4)Multiprocessing (4)Asyncio
100 次 HTTP 请求(每次 200ms)20.0s5.1s5.8s4.9s
100 次文件读取(每次 10ms)1.0s0.28s0.35s0.26s
100 次 CPU 任务(每次 100ms)10.0s10.2s2.7s10.0s
50 次数据库查询(每次 50ms)2.5s0.68s0.85s0.62s
I/O + CPU 混合任务15.0s8.2s4.1s9.5s

关键结论

  • Threading 在 I/O 密集型工作负载上提供 3-5 倍加速,代码改动最小
  • Multiprocessing 是真正的 CPU 并行唯一选择,但增加了进程开销
  • Asyncio 在高并发 I/O 上略胜 Threading,但需要用 async/await 重写代码
  • 对于混合工作负载,考虑结合使用 Threading 处理 I/O 和 Multiprocessing 处理 CPU 任务
import time
import threading
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
 
def io_task():
    time.sleep(0.2)
 
def cpu_task(n=2_000_000):
    return sum(i * i for i in range(n))
 
# 对比 Threading 和 Multiprocessing 的基准测试
NUM_TASKS = 20
 
# Threading - I/O 密集型
start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(lambda _: io_task(), range(NUM_TASKS)))
print(f"Threading (I/O):{time.time() - start:.2f}s")
 
# Threading - CPU 密集型
start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(lambda _: cpu_task(), range(NUM_TASKS)))
print(f"Threading (CPU):{time.time() - start:.2f}s")

在 RunCell 中试验多线程

调试和分析多线程代码可能具有挑战性。当你需要测试线程同步、可视化时间重叠或交互式诊断竞态条件时,RunCell (www.runcell.dev (opens in a new tab)) 提供了专为这种工作流程设计的 AI 驱动 Jupyter 环境。

RunCell 的 AI 代理可以分析你的多线程代码,在死锁发生前识别潜在风险,根据你的工作负载建议最佳工作线程数,并帮助你理解线程异常行为的原因。当线程池间歇性产生错误结果时,RunCell 会追踪执行时间线,精确定位共享状态被破坏的确切时刻。

如果你想可视化不同多线程配置的性能特征,PyGWalker (github.com/Kanaries/pygwalker) 可以将你的基准测试 DataFrame 转换为交互式图表。运行多线程基准测试,将计时数据收集到 pandas DataFrame 中,然后通过拖放可视化探索结果,找到适合你工作负载的最佳线程数。

常见问题

Python 中 Threading 和 Multiprocessing 有什么区别?

Threading 在单个进程内运行多个线程,共享内存。全局解释器锁(GIL)阻止线程并行执行 Python 字节码,使得 Threading 仅对网络请求和文件操作等 I/O 密集型任务有效。Multiprocessing 创建独立进程,每个进程有自己的 Python 解释器和内存空间,支持 CPU 密集型任务的真正并行执行。Threading 开销更低(启动更快、内存更少),而 Multiprocessing 通过绕过 GIL 实现真正的并行。

Python 多线程是真正的并行吗?

不是,由于 GIL 的存在,Python 多线程对于 CPU 密集型代码是并发而非并行的。任何时刻只有一个线程在执行 Python 字节码。然而,在 I/O 操作(网络、磁盘、数据库)期间,GIL 会释放,因此多个线程在等待 I/O 时可以有效并行运行。对于 CPU 密集型并行,请使用 multiprocessing 模块或释放 GIL 的 C 扩展(如 NumPy)。

Python 中应该使用多少个线程?

对于 I/O 密集型任务,根据外部服务的速率限制和网络带宽,从 5-20 个线程开始。对单个服务器发起过多线程可能导致连接被拒绝或限速。对于混合工作负载,尝试在 CPU 核心数到 4 倍核心数之间的线程数。使用 ThreadPoolExecutor 并通过不同的 max_workers 值进行基准测试,找到适合你特定工作负载的最佳数量。ThreadPoolExecutor 的默认值是 min(32, os.cpu_count() + 4)

如何从 Python 线程返回值?

线程不会直接从目标函数返回值。三种主要方法是:(1) 使用 ThreadPoolExecutor.submit(),它返回一个 Future 对象,你可以调用 future.result() 获取返回值。(2) 传入可变容器(如字典或列表)作为参数,让线程将结果写入其中,使用 Lock 保护。(3) 使用 queue.Queue,线程将结果放入队列,主线程从队列读取。对于大多数用例,ThreadPoolExecutor 是最简洁的方法。

如果 Python 线程抛出异常会发生什么?

在原始的 threading.Thread 中,未处理的异常会静默终止该线程,异常信息会丢失。主线程和其他线程继续运行而没有任何通知。使用 ThreadPoolExecutor 时,异常会被捕获并在调用 future.result() 时重新抛出,使错误处理更加可靠。始终在线程目标函数内部使用 try/except 块,或使用 ThreadPoolExecutor 确保异常被正确捕获和处理。

总结

Python 多线程是加速 I/O 密集型程序的强大工具。通过并发运行网络请求、文件操作和数据库查询,你可以将原本需要 20 秒的顺序执行脚本优化为 5 秒完成,而且代码改动极小。

需要记住的关键点:

  • 将多线程用于 I/O 密集型工作。GIL 阻止 CPU 并行,但线程能有效重叠 I/O 等待时间。
  • 使用 ThreadPoolExecutor 满足大多数多线程需求。它管理线程、收集结果并干净地传播异常。
  • 使用锁保护共享状态。竞态条件是最常见的多线程 bug,而 queue.Queue 消除了大多数加锁顾虑。
  • 避免死锁,通过以一致的顺序获取锁并使用超时。
  • 选择合适的工具:I/O 用 Threading,CPU 用 Multiprocessing,数千个并发连接用 Asyncio。

ThreadPoolExecutor 和简单的 executor.map() 调用开始。测量加速效果。仅在共享可变状态需要的地方添加同步。多线程不需要完全重写代码。几行 concurrent.futures 的代码就能为任何花费时间等待的程序带来显著的性能提升。

📚