Pythonジェネレータ完全ガイド:yield、ジェネレータ式、遅延評価
Updated on
10GBのログファイルを処理したり、データベースレコードを何百万件もストリーミングしたりすると、Pythonアプリケーションは簡単に限界に達します。従来どおりデータを一括でメモリに読み込む方法は、パフォーマンスのボトルネック、メモリエラー、そして不満を抱えるユーザーにつながりがちです。そこで重要になるのがPythonのジェネレータです。ジェネレータは、すべてを事前に保存するのではなく、必要になったタイミングで値を生成することで、最小限のメモリ使用量で巨大なデータセットを処理できるようにします。
Pythonジェネレータとは?なぜ重要なのか
ジェネレータは、すべての値を一度に計算して返すのではなく、時間の経過とともに値のシーケンスを生成する特別な関数です。returnで単一の結果を返して終了する通常の関数と異なり、ジェネレータはyieldキーワードを使って値を順次生成し、各値の間で実行を一時停止し、次の値が要求されたときに再開します。
ジェネレータの根本的な利点は 遅延評価(lazy evaluation) にあります。値は必要になったときにだけ生成されます。これにより、次の2つの重要なメリットが得られます。
- メモリ効率:ジェネレータはシーケンス全体をメモリに保持しません。10個の数を生成するジェネレータも、10億個の数を生成するジェネレータも、消費するメモリ量は同程度です。
- パフォーマンス:データセット全体の準備を待たずに、最初に
yieldされた値からすぐ処理を開始できます。
以下は違いが分かる簡単な比較です。
# Traditional approach - loads entire list into memory
def get_squares_list(n):
result = []
for i in range(n):
result.append(i * i)
return result
# Generator approach - produces values one at a time
def get_squares_generator(n):
for i in range(n):
yield i * i
# Memory impact comparison
import sys
# List approach
squares_list = get_squares_list(1000000)
print(f"List memory: {sys.getsizeof(squares_list):,} bytes") # ~8,000,000 bytes
# Generator approach
squares_gen = get_squares_generator(1000000)
print(f"Generator memory: {sys.getsizeof(squares_gen):,} bytes") # ~112 bytesメモリ差は圧倒的で、この例ではジェネレータはリストより99.999%少ないメモリしか使いません。データセットが大きくなるほど、この差はさらに劇的になります。
yieldキーワード:ジェネレータ関数の中核
yieldキーワードこそが、通常の関数をジェネレータ関数へと変換する要素です。Pythonがyieldに遭遇すると、その関数をすぐ実行するのではなく、ジェネレータオブジェクトを返すべきだと判断します。
def countdown(n):
print(f"Starting countdown from {n}")
while n > 0:
yield n
n -= 1
print("Countdown complete!")
# Creating the generator doesn't execute the function
gen = countdown(3)
print(type(gen)) # <class 'generator'>
# Values are produced on-demand
print(next(gen)) # Starting countdown from 3 -> 3
print(next(gen)) # 2
print(next(gen)) # 1
# next(gen) # Countdown complete! -> Raises StopIteration理解すべき主な挙動は次のとおりです。
yieldのたびに実行が一時停止し、次回呼び出しでその地点から再開する- ローカル変数は
yieldをまたいでも状態が保持される - ジェネレータ関数が終了(値を出し尽くす)するとStopIteration例外が発生する
1つのジェネレータ内に複数のyieldを書けます。
def data_pipeline():
# Phase 1: Loading
yield "Loading data..."
# Phase 2: Processing
yield "Processing records..."
# Phase 3: Validation
yield "Validating results..."
# Phase 4: Complete
yield "Pipeline complete!"
for status in data_pipeline():
print(status)ジェネレータプロトコル:iter() と next() を理解する
ジェネレータは、次の2つの特殊メソッドを通じてイテレータプロトコルを実装しています。
__iter__():イテレータオブジェクト自身(ジェネレータ)を返す__next__():ジェネレータから次の値を返す
このためジェネレータはforループや、その他の反復処理コンテキストで扱うのに最適です。このプロトコルを理解すると、ジェネレータが内部でどう動くのかが明確になります。
def simple_gen():
yield 1
yield 2
yield 3
gen = simple_gen()
# These are equivalent
print(gen.__next__()) # 1
print(next(gen)) # 2
# for loops call __next__() automatically until StopIteration
for value in simple_gen():
print(value) # 1, 2, 3イテレータプロトコルを手動実装して、ジェネレータに似た挙動を作ることもできます。
class CountDown:
def __init__(self, start):
self.current = start
def __iter__(self):
return self
def __next__(self):
if self.current <= 0:
raise StopIteration
self.current -= 1
return self.current + 1
# Behaves like a generator
for num in CountDown(3):
print(num) # 3, 2, 1ただし、手動のイテレータクラスよりも、ジェネレータ関数のほうがはるかに簡潔で読みやすいのが一般的です。
ジェネレータ式 vs リスト内包表記
ジェネレータ式は、リスト内包表記とよく似た簡潔な構文でジェネレータを作れます。ただし、角括弧[]ではなく丸括弧()を使います。
# List comprehension - creates entire list in memory
squares_list = [x * x for x in range(10)]
print(type(squares_list)) # <class 'list'>
print(squares_list) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# Generator expression - creates generator object
squares_gen = (x * x for x in range(10))
print(type(squares_gen)) # <class 'generator'>
print(squares_gen) # <generator object at 0x...>
# Consume the generator
print(list(squares_gen)) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]構文の比較:
| 特徴 | リスト内包表記 | ジェネレータ式 |
|---|---|---|
| 構文 | [expr for item in iterable] | (expr for item in iterable) |
| 返り値 | listオブジェクト | generatorオブジェクト |
| メモリ | すべての値を保持 | 必要に応じて生成 |
| 速度 | 小規模データでは速い | 大規模データでは速い |
| 再利用 | 可(何度も反復可能) | 不可(1回で使い切り) |
メモリ差が分かる実用例:
import sys
# List comprehension for 1 million numbers
list_comp = [x for x in range(1000000)]
print(f"List comprehension: {sys.getsizeof(list_comp):,} bytes")
# Generator expression for the same range
gen_exp = (x for x in range(1000000))
print(f"Generator expression: {sys.getsizeof(gen_exp):,} bytes")
# Output:
# List comprehension: 8,000,056 bytes
# Generator expression: 112 bytesジェネレータ式は、値を1回だけ順に処理できれば十分で、メモリ使用量を最小化したいときに最適です。
yield from:サブジェネレータへの委譲
yield fromは、サブジェネレータや他のiterableへの委譲を簡単にします。手動でループして値をyieldする代わりに、yield fromがそれを自動的に行います。
# Without yield from
def get_numbers_manual():
for i in range(3):
yield i
for i in range(10, 13):
yield i
# With yield from
def get_numbers_delegated():
yield from range(3)
yield from range(10, 13)
print(list(get_numbers_manual())) # [0, 1, 2, 10, 11, 12]
print(list(get_numbers_delegated())) # [0, 1, 2, 10, 11, 12]特にネスト構造のフラット化に便利です。
def flatten(nested_list):
for item in nested_list:
if isinstance(item, list):
yield from flatten(item) # Recursive delegation
else:
yield item
nested = [1, [2, 3, [4, 5]], 6, [7, [8, 9]]]
print(list(flatten(nested))) # [1, 2, 3, 4, 5, 6, 7, 8, 9]またyield fromは、サブジェネレータの例外やreturn値も適切に扱えるため、複雑なジェネレータパイプラインでは重要な要素になります。
発展:send() と throw() メソッド
ジェネレータは値を生成するだけでなく、send()やthrow()で値を受け取ったり例外を処理したりもできます。これにより、コルーチン風の双方向通信が可能になります。
send()でジェネレータに値を送る
def running_average():
total = 0
count = 0
average = None
while True:
value = yield average # Yield current average, receive new value
total += value
count += 1
average = total / count
# Create generator
avg = running_average()
next(avg) # Prime the generator (advance to first yield)
# Send values and receive running averages
print(avg.send(10)) # 10.0
print(avg.send(20)) # 15.0
print(avg.send(30)) # 20.0
print(avg.send(40)) # 25.0send()は、ジェネレータに値を送り(それがyield式の評価結果になります)、同時に次のyieldまで実行を進めます。
throw()で例外を注入する
def error_handling_gen():
try:
while True:
value = yield
print(f"Received: {value}")
except ValueError as e:
print(f"Caught ValueError: {e}")
yield "Recovered from error"
except GeneratorExit:
print("Generator is closing")
gen = error_handling_gen()
next(gen) # Prime the generator
gen.send(10) # Received: 10
gen.send(20) # Received: 20
result = gen.throw(ValueError, "Invalid value") # Caught ValueError: Invalid value
print(result) # Recovered from error
gen.close() # Generator is closingこれらの発展機能は、状態機械、コルーチン、複雑な非同期パターンの実装に特に役立ちます。
無限ジェネレータ:終わりのないシーケンス
ジェネレータはシーケンス全体を具体化する必要がないため、無限シーケンスの生成に非常に向いています。
# Infinite counter
def count_from(start=0, step=1):
current = start
while True:
yield current
current += step
# Fibonacci sequence
def fibonacci():
a, b = 0, 1
while True:
yield a
a, b = b, a + b
# Cycling through a sequence
def cycle(iterable):
saved = []
for item in iterable:
yield item
saved.append(item)
while saved:
for item in saved:
yield item
# Usage examples
counter = count_from(10, 2)
for _ in range(5):
print(next(counter)) # 10, 12, 14, 16, 18
fib = fibonacci()
print([next(fib) for _ in range(10)]) # [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
colors = cycle(['red', 'green', 'blue'])
print([next(colors) for _ in range(8)]) # ['red', 'green', 'blue', 'red', 'green', 'blue', 'red', 'green']無限ジェネレータは、イベントストリーム、継続監視、状態を持つ反復処理などで特に有用です。
ジェネレータの連結:データ処理パイプラインを構築する
ジェネレータの非常に強力な使い方として、複数のジェネレータをつないで効率的なデータ処理パイプラインを作るパターンがあります。各ステージが遅延的に処理し、中間結果を保持せずに次へ渡します。
# Stage 1: Read lines from a file (generator)
def read_log_file(filename):
with open(filename, 'r') as f:
for line in f:
yield line.strip()
# Stage 2: Filter lines containing 'ERROR'
def filter_errors(lines):
for line in lines:
if 'ERROR' in line:
yield line
# Stage 3: Extract timestamp and message
def parse_error_lines(lines):
for line in lines:
parts = line.split(' - ')
if len(parts) >= 2:
yield {'timestamp': parts[0], 'message': parts[1]}
# Stage 4: Count errors by hour
def group_by_hour(errors):
from collections import defaultdict
hourly_counts = defaultdict(int)
for error in errors:
hour = error['timestamp'][:13] # Extract hour portion
hourly_counts[hour] += 1
return hourly_counts
# Build pipeline
log_lines = read_log_file('app.log')
error_lines = filter_errors(log_lines)
parsed_errors = parse_error_lines(error_lines)
results = group_by_hour(parsed_errors)
print(results)このパイプラインは、巨大なログファイルでもメモリ使用量を最小化して処理できます。最終集計ステージに到達するまで、基本的にメモリ上にあるのは1行だけです。
データ変換の別例:
# Pipeline: numbers -> square -> filter evens -> sum
def square_numbers(numbers):
for n in numbers:
yield n * n
def filter_even(numbers):
for n in numbers:
if n % 2 == 0:
yield n
# Chain the pipeline
numbers = range(1, 11) # 1-10
squared = square_numbers(numbers)
evens = filter_even(squared)
result = sum(evens) # Only even squares
print(result) # 220 (4 + 16 + 36 + 64 + 100)メモリ比較:ジェネレータ vs リストのベンチマーク
ジェネレータの効果を定量化するため、実運用に近いメモリとパフォーマンスのベンチマークを行いましょう。
import sys
import time
import tracemalloc
def process_with_list(n):
"""Traditional approach using lists"""
tracemalloc.start()
start_time = time.time()
# Create list of squares
squares = [x * x for x in range(n)]
# Filter even squares
even_squares = [x for x in squares if x % 2 == 0]
# Sum results
result = sum(even_squares)
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
elapsed = time.time() - start_time
return result, peak / 1024 / 1024, elapsed # Convert to MB
def process_with_generator(n):
"""Generator approach"""
tracemalloc.start()
start_time = time.time()
# Generator pipeline
squares = (x * x for x in range(n))
even_squares = (x for x in squares if x % 2 == 0)
result = sum(even_squares)
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
elapsed = time.time() - start_time
return result, peak / 1024 / 1024, elapsed
# Benchmark with 1 million numbers
n = 1000000
list_result, list_memory, list_time = process_with_list(n)
gen_result, gen_memory, gen_time = process_with_generator(n)
print(f"Results match: {list_result == gen_result}")
print(f"\nList approach:")
print(f" Memory: {list_memory:.2f} MB")
print(f" Time: {list_time:.4f} seconds")
print(f"\nGenerator approach:")
print(f" Memory: {gen_memory:.2f} MB")
print(f" Time: {gen_time:.4f} seconds")
print(f"\nMemory savings: {((list_memory - gen_memory) / list_memory * 100):.1f}%")典型的な出力:
Results match: True
List approach:
Memory: 36.21 MB
Time: 0.0892 seconds
Generator approach:
Memory: 0.12 MB
Time: 0.0624 seconds
Memory savings: 99.7%ジェネレータはメモリ使用量が99.7%少なく、速度も30%改善しています。データが大きくなるほど、この改善はさらに効いてきます。
itertoolsモジュール:ジェネレータ系ユーティリティ
Pythonのitertoolsモジュールは、効率的な反復処理のための強力なジェネレータベースのツール群を提供します。これらはCで実装されており、高度に最適化されています。
主要なitertools関数
import itertools
# chain - concatenate multiple iterables
combined = itertools.chain([1, 2], [3, 4], [5, 6])
print(list(combined)) # [1, 2, 3, 4, 5, 6]
# islice - slice an iterable (like list slicing but for generators)
numbers = itertools.count() # Infinite counter: 0, 1, 2, 3...
first_ten = itertools.islice(numbers, 10)
print(list(first_ten)) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# count - infinite counter with start and step
counter = itertools.count(start=10, step=2)
print([next(counter) for _ in range(5)]) # [10, 12, 14, 16, 18]
# cycle - infinite repetition of an iterable
colors = itertools.cycle(['red', 'green', 'blue'])
print([next(colors) for _ in range(7)]) # ['red', 'green', 'blue', 'red', 'green', 'blue', 'red']
# accumulate - cumulative sums or other operations
numbers = [1, 2, 3, 4, 5]
cumulative = itertools.accumulate(numbers)
print(list(cumulative)) # [1, 3, 6, 10, 15]
# accumulate with custom function
import operator
products = itertools.accumulate(numbers, operator.mul)
print(list(products)) # [1, 2, 6, 24, 120]
# groupby - group consecutive elements by key
data = [('A', 1), ('A', 2), ('B', 3), ('B', 4), ('C', 5)]
for key, group in itertools.groupby(data, key=lambda x: x[0]):
print(f"{key}: {list(group)}")
# A: [('A', 1), ('A', 2)]
# B: [('B', 3), ('B', 4)]
# C: [('C', 5)]実用的なitertoolsの組み合わせ
# Paginating results with islice
def paginate(iterable, page_size):
iterator = iter(iterable)
while True:
page = list(itertools.islice(iterator, page_size))
if not page:
break
yield page
# Usage
data = range(25)
for page_num, page in enumerate(paginate(data, 10), 1):
print(f"Page {page_num}: {page}")
# Page 1: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# Page 2: [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
# Page 3: [20, 21, 22, 23, 24]
# Windowed iteration (sliding window)
def window(iterable, size):
it = iter(iterable)
win = list(itertools.islice(it, size))
if len(win) == size:
yield tuple(win)
for item in it:
win = win[1:] + [item]
yield tuple(win)
print(list(window([1, 2, 3, 4, 5], 3)))
# [(1, 2, 3), (2, 3, 4), (3, 4, 5)]実世界でのユースケース
大きなファイルを1行ずつ読む
def process_large_csv(filename):
"""Process a multi-GB CSV file efficiently"""
with open(filename, 'r') as f:
# Skip header
next(f)
for line in f:
# Parse and yield record
fields = line.strip().split(',')
yield {
'user_id': fields[0],
'action': fields[1],
'timestamp': fields[2]
}
# Process millions of records with minimal memory
for record in process_large_csv('user_events.csv'):
# Process one record at a time
if record['action'] == 'purchase':
print(f"Purchase by user {record['user_id']}")ストリーミングデータ処理
def stream_api_data(url, batch_size=100):
"""Stream paginated API data without loading all results"""
offset = 0
while True:
response = requests.get(url, params={'offset': offset, 'limit': batch_size})
data = response.json()
if not data:
break
for item in data:
yield item
offset += batch_size
# Process unlimited API results
for item in stream_api_data('https://api.example.com/records'):
process_item(item)DBクエリ結果を反復処理する
def fetch_users_batch(cursor, batch_size=1000):
"""Fetch database records in batches without loading all into memory"""
while True:
results = cursor.fetchmany(batch_size)
if not results:
break
for row in results:
yield row
# Database query
cursor.execute("SELECT * FROM users WHERE active = 1")
# Process millions of users efficiently
for user in fetch_users_batch(cursor):
send_email(user['email'], generate_report(user))ETLパイプライン例
# Extract: Read from source
def extract_from_csv(filename):
with open(filename, 'r') as f:
for line in f:
yield line.strip().split(',')
# Transform: Clean and convert data
def transform_records(records):
for record in records:
yield {
'id': int(record[0]),
'name': record[1].title(),
'email': record[2].lower(),
'age': int(record[3]) if record[3] else None
}
# Load: Write to database
def load_to_database(records, db_connection):
for record in records:
db_connection.execute(
"INSERT INTO users VALUES (?, ?, ?, ?)",
(record['id'], record['name'], record['email'], record['age'])
)
yield record # Pass through for logging
# Build ETL pipeline
raw_data = extract_from_csv('users.csv')
transformed = transform_records(raw_data)
loaded = load_to_database(transformed, db_conn)
# Execute pipeline and count processed records
processed_count = sum(1 for _ in loaded)
print(f"Processed {processed_count} records")ジェネレータのベストプラクティスとよくある落とし穴
ベストプラクティス
-
単純なケースではジェネレータ式を使う
# Simple transformation - use generator expression squares = (x * x for x in range(1000)) # Complex logic - use generator function def complex_processing(data): for item in data: # Multi-step processing result = step1(item) result = step2(result) if validate(result): yield result -
パイプラインのためにジェネレータを連結する
# Each stage processes lazily data = read_source() filtered = filter_stage(data) transformed = transform_stage(filtered) results = aggregate_stage(transformed) -
委譲には
yield fromを使うdef process_all_files(directory): for filename in os.listdir(directory): yield from process_file(filename)
よくある落とし穴
-
ジェネレータは1回の反復で使い切られる
gen = (x for x in range(3)) print(list(gen)) # [0, 1, 2] print(list(gen)) # [] - exhausted! # Solution: Convert to list or recreate generator data = list(gen) # If data fits in memory # OR gen = (x for x in range(3)) # Recreate -
ジェネレータはlen()やインデックスアクセスをサポートしない
gen = (x for x in range(10)) # len(gen) # TypeError # gen[5] # TypeError # Solution: Convert to list if you need these operations items = list(gen) print(len(items)) print(items[5]) -
ジェネレータのスコープやクロージャに注意する
# Wrong - all generators will use final value of i generators = [lambda: i for i in range(3)] print([g() for g in generators]) # [2, 2, 2] # Correct - capture i in default argument generators = [lambda i=i: i for i in range(3)] print([g() for g in generators]) # [0, 1, 2] -
ジェネレータチェーンでの例外処理
def stage1(): for i in range(5): if i == 3: raise ValueError("Error in stage1") yield i def stage2(data): try: for item in data: yield item * 2 except ValueError as e: print(f"Caught: {e}") yield -1 # Error marker # Exception is caught in stage2 for result in stage2(stage1()): print(result)
比較:ジェネレータ vs リスト vs イテレータ vs map/filter
| Feature | Generators | Lists | Iterators | map/filter |
|---|---|---|---|---|
| Memory usage | Minimal (lazy) | Full dataset | Minimal (lazy) | Minimal (lazy) |
| Creation speed | Instant | Depends on size | Instant | Instant |
| Reusable | No | Yes | No | No |
| Indexable | No | Yes | No | No |
| len() support | No | Yes | No | No |
| Modification | Read-only | Mutable | Read-only | Read-only |
| Infinite sequences | Yes | No | Yes | Yes |
| Syntax | yield or () | [] | iter() | map(), filter() |
| Best for | Large datasets, pipelines | Small datasets, random access | Protocol implementation | Functional transformations |
比較例:
# All produce same results but with different characteristics
data = range(1000000)
# Generator - memory efficient, not reusable
gen = (x * 2 for x in data)
# List - memory intensive, reusable, indexable
lst = [x * 2 for x in data]
# map - memory efficient, functional style
mapped = map(lambda x: x * 2, data)
# Iterator - explicit protocol implementation
class Doubler:
def __init__(self, data):
self.data = iter(data)
def __iter__(self):
return self
def __next__(self):
return next(self.data) * 2
iterator = Doubler(data)Jupyterでジェネレータを試す
ジェネレータのパターンやパフォーマンス特性を探索する際、対話的なノートブック環境は学習を加速します。RunCell (opens in a new tab) はAI支援をJupyterノートブックに直接統合し、ジェネレータベースのデータ処理パイプラインを試すデータサイエンティストに適しています。
RunCellを使うと次のことができます。
- ジェネレータ関数を素早く試作し、メモリ特性をテストする
- 実データセットでジェネレータとリストの性能をベンチマークする
- 複雑なジェネレータパイプラインを対話的に構築・デバッグする
- ジェネレータベースETLワークフロー最適化のAI提案を得る
ノートブックでジェネレータを探索する例:
# Cell 1: Define generator pipeline
def read_data():
for i in range(1000000):
yield {'id': i, 'value': i * 2}
def filter_large(records):
for record in records:
if record['value'] > 1000:
yield record
def transform(records):
for record in records:
record['squared'] = record['value'] ** 2
yield record
# Cell 2: Execute pipeline and measure
import time
start = time.time()
pipeline = transform(filter_large(read_data()))
results = list(itertools.islice(pipeline, 100)) # Take first 100
print(f"Time: {time.time() - start:.4f}s")
print(f"Results: {len(results)}")
# Cell 3: Visualize with PyGWalker
import pygwalker as pyg
pyg.walk(results)FAQ
結論
Pythonジェネレータは、先に全部計算して保持する「即時評価」から、必要なときにだけ生成する「遅延評価」へと発想を切り替えるための重要な仕組みです。これにより、数千〜数十億レコード規模のデータセットでも、メモリ効率よく処理できるようになります。yield、ジェネレータ式、イテレータプロトコル、そしてsend()やyield fromのような発展機能を理解すれば、スケールする洗練されたデータ処理パイプラインを構築できます。
覚えておくべきポイント:
- ジェネレータは遅延評価でメモリ使用量を最小化し、リストに比べて99%+節約できることも多い
- 単純な変換にはジェネレータ式、複雑なロジックにはジェネレータ関数を使う
- ジェネレータを連結して、メモリ効率の高いデータ処理パイプラインを作る
itertoolsを活用して、強力なジェネレータベースの反復ユーティリティを使う- 大規模データや1回限りの走査にはジェネレータ、小規模データでランダムアクセスが必要ならリストを選ぶ
巨大ログの処理、APIデータのストリーミング、ETLパイプライン構築など、どの場面でもジェネレータは本番規模のデータ処理に必要な性能とメモリ効率を提供します。これらのパターンを身につければ、どんなサイズのデータセットでも、エレガントかつ効率的に扱えるPythonコードを書けるようになります。