Skip to content

Python heapq:轻松实现优先队列和堆操作

Updated on

你需要按优先级处理项目——任务调度器中的任务、模拟中的事件或图中的最短路径。排序列表似乎是显而易见的选择,但在排序列表中插入的时间复杂度为O(n),当处理数百万个项目时会成为瓶颈。每次添加元素都重新排序整个列表则更糟,为O(n log n)。

成本会迅速累积。每次插入都重新排序队列的任务调度器将大部分时间浪费在管理工作上。每一步都扫描完整列表寻找最小距离的寻路算法将线性时间操作变成了二次时间操作。

Python的heapq模块提供基于堆的优先队列,插入和提取最小元素的时间复杂度都是O(log n)。它还包含优化函数,可以在不对整个集合排序的情况下找到任意可迭代对象中的N个最大或最小元素。本指南通过实际示例介绍heapq的每个函数。

📚

什么是堆?

堆是一棵二叉树,其中每个父节点都小于或等于其子节点(最小堆)。Python的heapq使用普通列表实现最小堆,最小元素始终位于索引0。

关键操作及其时间复杂度:

操作函数时间复杂度
插入元素heapq.heappush(heap, item)O(log n)
弹出最小值heapq.heappop(heap)O(log n)
先插入后弹出heapq.heappushpop(heap, item)O(log n)
先弹出后插入heapq.heapreplace(heap, item)O(log n)
从列表构建堆heapq.heapify(list)O(n)
N个最小元素heapq.nsmallest(n, iterable)O(n log k)
N个最大元素heapq.nlargest(n, iterable)O(n log k)

基本堆操作

创建和使用堆

import heapq
 
# Start with an empty heap (just a regular list)
heap = []
 
# Push items
heapq.heappush(heap, 5)
heapq.heappush(heap, 1)
heapq.heappush(heap, 3)
heapq.heappush(heap, 2)
 
print(heap)        # [1, 2, 3, 5] (heap order, not sorted)
print(heap[0])     # 1 (smallest element, always at index 0)
 
# Pop smallest item
smallest = heapq.heappop(heap)
print(smallest)    # 1
print(heap)        # [2, 5, 3]

将列表转换为堆

import heapq
 
data = [5, 3, 8, 1, 2, 7, 4, 6]
heapq.heapify(data)  # In-place, O(n)
print(data)  # [1, 2, 4, 3, 5, 7, 8, 6] (heap order)
 
# Pop elements in sorted order
sorted_data = []
while data:
    sorted_data.append(heapq.heappop(data))
print(sorted_data)  # [1, 2, 3, 4, 5, 6, 7, 8]

heappushpop和heapreplace

这些组合操作比单独的push/pop调用更高效:

import heapq
 
heap = [1, 3, 5, 7]
heapq.heapify(heap)
 
# heappushpop: push first, then pop smallest
result = heapq.heappushpop(heap, 2)
print(result)  # 1 (pushed 2, popped 1)
print(heap)    # [2, 3, 5, 7]
 
# heapreplace: pop smallest first, then push
result = heapq.heapreplace(heap, 10)
print(result)  # 2 (popped 2, pushed 10)
print(heap)    # [3, 7, 5, 10]

nlargest和nsmallest

这些函数在不排序整个集合的情况下找到N个最大或最小的元素。

import heapq
 
scores = [85, 92, 78, 95, 88, 72, 91, 83, 97, 69]
 
# Top 3 scores
top_3 = heapq.nlargest(3, scores)
print(top_3)  # [97, 95, 92]
 
# Bottom 3 scores
bottom_3 = heapq.nsmallest(3, scores)
print(bottom_3)  # [69, 72, 78]

使用键函数

两者都接受key参数用于自定义比较:

import heapq
 
students = [
    {'name': 'Alice', 'gpa': 3.9},
    {'name': 'Bob', 'gpa': 3.5},
    {'name': 'Charlie', 'gpa': 3.8},
    {'name': 'Diana', 'gpa': 3.95},
    {'name': 'Eve', 'gpa': 3.2},
]
 
top_students = heapq.nlargest(3, students, key=lambda s: s['gpa'])
for s in top_students:
    print(f"{s['name']}: {s['gpa']}")
# Diana: 3.95
# Alice: 3.9
# Charlie: 3.8

何时使用nlargest/nsmallest vs sorted()

场景最佳方法原因
N为1min() / max()O(n),最简单
N相对于总量很小nlargest / nsmallestO(n log k),内存更少
N接近总长度sorted(iterable)[:n]O(n log n),但实际上很快
需要所有项目排序sorted()完全排序是合适的

实现最大堆

Python的heapq只提供最小堆。要创建最大堆,需要取反值:

import heapq
 
# Max-heap using negation
max_heap = []
for val in [5, 1, 3, 8, 2]:
    heapq.heappush(max_heap, -val)
 
# Pop largest items
while max_heap:
    print(-heapq.heappop(max_heap), end=' ')
# 8 5 3 2 1

自定义对象的优先队列

对于无法直接比较的对象,使用元组(priority, counter, item)

import heapq
from dataclasses import dataclass
 
@dataclass
class Task:
    name: str
    description: str
 
# Use (priority, tie_breaker, task) tuples
task_queue = []
counter = 0
 
def add_task(priority, task):
    global counter
    heapq.heappush(task_queue, (priority, counter, task))
    counter += 1
 
def get_next_task():
    if task_queue:
        priority, _, task = heapq.heappop(task_queue)
        return task
    return None
 
add_task(3, Task("Low", "Clean up logs"))
add_task(1, Task("Critical", "Fix production bug"))
add_task(2, Task("Medium", "Update docs"))
add_task(1, Task("Critical", "Patch security hole"))
 
while task_queue:
    task = get_next_task()
    print(f"{task.name}: {task.description}")
 
# Critical: Fix production bug
# Critical: Patch security hole
# Medium: Update docs
# Low: Clean up logs

计数器作为平局决胜,确保相同优先级的项目按插入顺序(FIFO)返回。

实际用例

合并已排序的可迭代对象

import heapq
 
list1 = [1, 4, 7, 10]
list2 = [2, 5, 8, 11]
list3 = [3, 6, 9, 12]
 
merged = list(heapq.merge(list1, list2, list3))
print(merged)  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]

heapq.merge()内存效率高——它不会一次性将所有列表加载到内存中。

运行中位数

import heapq
 
class RunningMedian:
    """Track the median of a stream of numbers."""
    def __init__(self):
        self.low = []   # max-heap (negated values)
        self.high = []  # min-heap
 
    def add(self, num):
        heapq.heappush(self.low, -num)
        heapq.heappush(self.high, -heapq.heappop(self.low))
        if len(self.high) > len(self.low):
            heapq.heappush(self.low, -heapq.heappop(self.high))
 
    def median(self):
        if len(self.low) > len(self.high):
            return -self.low[0]
        return (-self.low[0] + self.high[0]) / 2
 
rm = RunningMedian()
for num in [2, 1, 5, 7, 2, 0, 5]:
    rm.add(num)
    print(f"Added {num}, median = {rm.median()}")

Dijkstra最短路径

import heapq
 
def dijkstra(graph, start):
    """Find shortest paths from start to all nodes."""
    distances = {node: float('inf') for node in graph}
    distances[start] = 0
    pq = [(0, start)]
 
    while pq:
        dist, node = heapq.heappop(pq)
        if dist > distances[node]:
            continue
        for neighbor, weight in graph[node]:
            new_dist = dist + weight
            if new_dist < distances[neighbor]:
                distances[neighbor] = new_dist
                heapq.heappush(pq, (new_dist, neighbor))
 
    return distances
 
graph = {
    'A': [('B', 1), ('C', 4)],
    'B': [('C', 2), ('D', 5)],
    'C': [('D', 1)],
    'D': [],
}
 
print(dijkstra(graph, 'A'))
# {'A': 0, 'B': 1, 'C': 3, 'D': 4}

在Jupyter中分析堆性能

在将基于堆的算法与替代方案进行比较或分析优先队列性能时,交互式实验可以加速这一过程。RunCell (opens in a new tab)是一个Jupyter的AI代理,可以帮助你对堆操作进行基准测试、可视化性能特征,并以交互方式优化你的数据结构选择。

FAQ

Python中的heapq是什么?

heapq是一个使用普通Python列表实现最小堆的标准库模块。它提供在O(log n)时间内插入和弹出项目的函数,同时维护堆属性(最小元素始终在索引0)。这是Python中实现优先队列的标准方式。

如何在Python中创建最大堆?

Python的heapq只支持最小堆。要模拟最大堆,在插入时取反值,弹出时再次取反:用heapq.heappush(heap, -value)插入,用-heapq.heappop(heap)获取最大值。对于复杂对象,在元组中取反优先级:(-priority, counter, item)

什么时候应该使用heapq而不是sorted()?

当只需要Top N个元素且N相对于数据总大小较小时,使用heapq.nlargest(n, data)heapq.nsmallest(n, data)。当需要所有元素按顺序排列或N接近总大小时,使用sorted()。当N为1时,使用min()/max()

heapq是线程安全的吗?

不是。单独的heappushheappop调用不是原子操作。如果多个线程访问同一个堆,需要外部同步。对于线程安全的优先队列,请使用queue.PriorityQueue,它用适当的锁包装了heapq

heapq操作的时间复杂度是多少?

heappushheappop都是O(log n)。heapify将列表转换为堆的时间为O(n)。nlargest(k, data)nsmallest(k, data)的运行时间为O(n log k),其中n是数据大小,k是请求的元素数量。

总结

Python的heapq模块是优先队列和Top-N选择的标准工具。其基于堆的方法提供O(log n)的push和pop操作,比维护排序列表快得多。使用heappush/heappop实现优先队列模式,nlargest/nsmallest用于Top-N选择,merge用于合并已排序序列,heapify用于转换现有列表。

📚