Python Generators and Iterators: Memory-Efficient Pipelines
Python generators and iterators are the foundation of memory-efficient data processing. Instead of loading an entire dataset into memory, generators produce values one at a time on demand. That makes it possible to process files with billions of rows, stream API responses, or build composable data pipelines whose memory footprint stays small and roughly constant regardless of dataset size. Understanding the iterator protocol and the yield keyword unlocks a completely different way of thinking about data flow in Python.
The Iterator Protocol
Python's iterator protocol is simple: any object that implements __iter__() (returning self) and __next__() (returning the next value or raising StopIteration) is an iterator. An iterable is any object with __iter__() that returns an iterator — lists, tuples, dicts, and strings are all iterables. Understanding this distinction matters when building custom data sources.
class CountUp:
    """Custom iterator that counts from start up to (but not including) stop."""

    def __init__(self, start: int, stop: int):
        self.current = start
        self.stop = stop

    def __iter__(self):
        return self  # An iterator is its own iterable

    def __next__(self):
        if self.current >= self.stop:
            raise StopIteration
        value = self.current
        self.current += 1
        return value

# Works with for loops, list(), zip(), etc.
for n in CountUp(1, 6):
    print(n)  # 1 2 3 4 5

# Manually consuming an iterator
counter = CountUp(10, 13)
print(next(counter))  # 10
print(next(counter))  # 11
print(list(counter))  # [12]; the iterator is exhausted afterwards

# Python's built-in iter() and next()
my_list = [10, 20, 30]
it = iter(my_list)
print(next(it))             # 10
print(next(it, "default"))  # 20 (the default is returned only once the iterator is exhausted)
Iterable vs. iterator: an iterable is anything that can hand out an iterator (it has __iter__). An iterator maintains state and produces the next value (it has both __iter__ and __next__). Generators are iterators: they are also iterables, and they track their own position automatically.
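The distinction is easiest to see by iterating twice. The sketch below is illustrative only: Countdown is a hypothetical class, not part of the example above. It is an iterable whose __iter__ is a generator function, so each loop gets a fresh iterator, while a single iterator obtained from it can be consumed only once.

```python
class Countdown:
    """Hypothetical iterable (not an iterator): each __iter__ call returns a fresh iterator."""

    def __init__(self, start: int):
        self.start = start

    def __iter__(self):
        # Because this method contains yield, calling it returns a new
        # generator, so every loop gets its own independent position.
        n = self.start
        while n > 0:
            yield n
            n -= 1


countdown = Countdown(3)
first_pass = list(countdown)    # [3, 2, 1]
second_pass = list(countdown)   # [3, 2, 1] again: the iterable is reusable

it = iter(countdown)            # an iterator: has __next__ and keeps its position
once = list(it)                 # [3, 2, 1]
again = list(it)                # []: the iterator is exhausted

print(first_pass, second_pass, once, again)
print(iter(it) is it)           # True: an iterator returns itself from __iter__
```

If a data source needs to be looped over more than once, making it an iterable like this is usually more convenient than making it an iterator.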
Generator Functions and yield
A generator function uses yield instead of return. Calling a generator function doesn't execute its body — it returns a generator object. Each call to next() on the generator runs the function body until the next yield, suspends execution, and returns the yielded value. The function's local variables and execution position are preserved between calls, making generators inherently stateful and lazy.
import csv
import sys

def fibonacci():
    """Infinite Fibonacci sequence; values are produced one at a time."""
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b

# Take only what you need
fib = fibonacci()
first_10 = [next(fib) for _ in range(10)]
print(first_10)  # [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]

def read_large_file(path: str, chunk_size: int = 8192):
    """Read a large file in chunks without loading it all into memory."""
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

def parse_csv_rows(path: str):
    """Parse a CSV file row by row with constant memory usage."""
    with open(path, newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            yield row

# Memory comparison (64-bit CPython; exact numbers vary by Python version)
big_list = list(range(1_000_000))
big_gen = (x for x in range(1_000_000))
print(sys.getsizeof(big_list))  # about 8 MB, counting only the list's pointer array, not the int objects
print(sys.getsizeof(big_gen))   # a couple of hundred bytes at most, independent of the range length
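To make the suspend-and-resume behaviour described at the start of this section concrete, the following sketch records when each part of a generator body actually runs. The names traced and events are made up for this illustration.

```python
events = []  # records when each part of the generator body actually runs


def traced():
    events.append("start")
    yield 1
    events.append("between")
    yield 2
    events.append("end")


gen = traced()
after_call = list(events)      # []: calling the function runs none of its body

first = next(gen)              # runs up to the first yield
after_first = list(events)     # ["start"]

second = next(gen)             # resumes after the first yield, stops at the second
after_second = list(events)    # ["start", "between"]

leftover = next(gen, None)     # body runs to completion; the default is returned
after_exhaust = list(events)   # ["start", "between", "end"]

print(after_call, after_first, after_second, after_exhaust)
```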
Generator Expressions
Generator expressions use the same syntax as list comprehensions but with parentheses instead of brackets. They produce a lazy generator object rather than building a list in memory. They compose naturally with built-in functions like sum(), max(), any(), and all(), which accept any iterable. For single-use transformations over large sequences, generator expressions are almost always preferable to list comprehensions.
import os

# List comprehension: builds the entire list in memory
squares_list = [x**2 for x in range(1_000_000)]

# Generator expression: lazy, with a small constant memory footprint
squares_gen = (x**2 for x in range(1_000_000))

# sum() accepts any iterable, so no list is needed
total = sum(x**2 for x in range(1_000_000))

# Filter + transform without an intermediate list
evens_doubled = (x * 2 for x in range(100) if x % 2 == 0)

# Composing generator expressions (pipeline); commented out because it needs a real log file
log_file = "app.log"
# lines = (line.strip() for line in open(log_file))
# errors = (line for line in lines if "ERROR" in line)
# messages = (line.split("|")[2] for line in errors)

# Real-world: sum file sizes across a directory tree
def total_size(root: str) -> int:
    # os.walk() yields bare file names, so join each one to its directory first
    return sum(
        os.path.getsize(os.path.join(dirpath, name))
        for dirpath, _, files in os.walk(root)
        for name in files
    )

# Generator expressions with any()/all()
data = [{"score": 85}, {"score": 42}, {"score": 91}]
has_failing = any(d["score"] < 50 for d in data)   # True
all_passing = all(d["score"] >= 50 for d in data)  # False
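Two properties of generator expressions are worth seeing in action: they can be consumed only once, and short-circuiting consumers such as any() pull only as many items as they need. The sketch below uses made-up names (is_negative, checked) purely for illustration.

```python
# A generator expression is single-use.
squares = (x * x for x in range(5))
total = sum(squares)          # 0 + 1 + 4 + 9 + 16 = 30
second_total = sum(squares)   # 0: the generator is already exhausted

# any() stops at the first truthy result, so later items are never evaluated.
checked = []


def is_negative(x):
    checked.append(x)         # remember which inputs were actually examined
    return x < 0


found = any(is_negative(x) for x in [3, -1, 7, 9])

print(total, second_total)    # 30 0
print(found, checked)         # True [3, -1]
```

When the same values are needed more than once, a list comprehension (or re-creating the generator) is the better fit.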
yield from and Delegation
yield from delegates to a sub-generator or any iterable, transparently forwarding values, send signals, and exceptions. It makes it easy to compose generators and flatten nested iterables without writing manual loops. This is essential when building recursive generators or chaining multiple data sources into a single stream.
from typing import Iterable, Any

# Flatten arbitrarily nested lists/tuples
def flatten(items: Any) -> Iterable:
    """Recursively flatten nested lists/tuples using yield from."""
    if isinstance(items, (list, tuple)):
        for item in items:
            yield from flatten(item)
    else:
        yield items

nested = [1, [2, 3, [4, 5]], 6, [7, [8, [9]]]]
print(list(flatten(nested)))  # [1, 2, 3, 4, 5, 6, 7, 8, 9]

# Chain multiple generators
def read_all_logs(*paths: str):
    """Yield lines from multiple log files in sequence."""
    for path in paths:
        with open(path) as f:  # close each file once its lines are consumed
            yield from f

# yield from vs. manual loop (equivalent for plain iteration)
def chain_manual(iters):
    for it in iters:
        for item in it:
            yield item

def chain_delegated(iters):
    for it in iters:
        yield from it  # cleaner, and often slightly faster in CPython; measure if it matters

# Flattening a tree structure
class TreeNode:
    def __init__(self, val, children=None):
        self.val = val
        self.children = children or []

    def __iter__(self):
        yield self.val
        for child in self.children:
            yield from child  # recursively delegate to the child's __iter__

root = TreeNode(1, [TreeNode(2, [TreeNode(4), TreeNode(5)]), TreeNode(3)])
print(list(root))  # [1, 2, 4, 5, 3]
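Beyond flattening, yield from also forwards send() calls to the sub-generator and evaluates to the sub-generator's return value. The following is a minimal sketch of that behaviour; average_of_batch and collect are hypothetical names chosen for this example.

```python
def average_of_batch():
    """Sub-generator: receives numbers via send() and returns their mean when sent None."""
    total, count = 0.0, 0
    while True:
        value = yield
        if value is None:
            break
        total += value
        count += 1
    return total / count if count else 0.0


def collect(results: list):
    """Delegating generator: yield from forwards send() and captures the return value."""
    while True:
        mean = yield from average_of_batch()
        results.append(mean)


results = []
collector = collect(results)
next(collector)              # prime: advance to the first yield inside the sub-generator

for v in (10, 20, 30):
    collector.send(v)        # forwarded straight to average_of_batch()
collector.send(None)         # ends the first batch; its mean is recorded

for v in (1, 2):
    collector.send(v)
collector.send(None)         # ends the second batch

collector.close()
print(results)               # [20.0, 1.5]
```

Without yield from, the delegating generator would need to catch StopIteration itself and read its value attribute to recover the returned mean.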
send() and throw(): Two-Way Communication
Generators support two-way communication: gen.send(value) resumes the generator and makes yield evaluate to the sent value, while gen.throw(exc) raises an exception inside the generator at the point of suspension. This enables coroutine-style programming. Python's asyncio was originally built on top of this mechanism before the async/await syntax was introduced.
def accumulator():
    """Coroutine that accumulates sent values and yields the running total."""
    total = 0
    while True:
        value = yield total  # yield sends total out, then receives the next sent value
        if value is None:
            return total  # ends the generator; the caller sees StopIteration
        total += value

acc = accumulator()
next(acc)      # Prime the coroutine (advance to the first yield); returns 0
acc.send(10)   # total = 10, returns 10
acc.send(20)   # total = 30, returns 30
acc.send(5)    # total = 35, returns 35

# Using throw() and close() to signal the generator
def resource_manager():
    print("Acquiring resource")
    status = "ready"
    try:
        while True:
            try:
                command = yield status
                print(f"Executing: {command}")
                status = "ready"
            except ValueError as e:
                # Handle the error inside the loop so the generator keeps running
                print(f"Invalid command: {e}")
                status = "error"
    finally:
        # Runs when close() raises GeneratorExit at the suspended yield
        print("Generator closed: releasing resource")

rm = resource_manager()
print(next(rm))                             # prints "Acquiring resource", then "ready"
print(rm.send("write"))                     # prints "Executing: write", then "ready"
print(rm.throw(ValueError("bad command")))  # prints "Invalid command: bad command", then "error"
rm.close()                                  # prints "Generator closed: releasing resource"
Before calling send(value) with a non-None value, you must call next(gen) or gen.send(None) once to advance the generator to the first yield. In reusable code, a small hand-written priming decorator (often named @coroutine and built with functools.wraps) can automate this step.
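A priming decorator of this kind might look like the sketch below. The standard library does not provide it; coroutine and running_total are names chosen here for illustration.

```python
import functools


def coroutine(func):
    """Hypothetical helper: prime a generator-based coroutine as soon as it is created."""

    @functools.wraps(func)  # preserve the wrapped function's name and docstring
    def primed(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)           # advance to the first yield so send() works immediately
        return gen

    return primed


@coroutine
def running_total():
    """Yield the running total of every value sent in."""
    total = 0
    while True:
        value = yield total
        total += value


acc = running_total()       # already primed by the decorator
first = acc.send(10)        # 10
second = acc.send(5)        # 15
print(first, second)
```

Note that functools.wraps only copies metadata; the priming itself is done by the explicit next(gen) call.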
itertools: The Generator Toolkit
The itertools module provides fast, memory-efficient tools implemented in C. These functions return iterators and compose perfectly with generator functions and expressions. Together, itertools and functools form a complete functional programming toolkit for data transformation without materializing intermediate collections.
import itertools
import operator
# --- Infinite iterators ---
# count(start, step): counts up forever; here 1, 2, 3, ...
for i, letter in zip(itertools.count(1), "ABCDE"):
    print(f"{i}. {letter}")
# cycle: repeats indefinitely
statuses = itertools.cycle(["active", "idle", "sleep"])
first_9 = list(itertools.islice(statuses, 9))
# repeat: same value N times (or forever)
defaults = list(itertools.repeat(0, 5)) # [0, 0, 0, 0, 0]
# --- Combinatorics ---
print(list(itertools.combinations("ABCD", 2))) # 6 pairs
print(list(itertools.permutations("ABC", 2))) # 6 ordered pairs
print(list(itertools.product([0,1], repeat=3))) # 8 binary tuples
# --- Filtering/slicing ---
data = range(20)
# islice(iter, stop) or islice(iter, start, stop, step)
subset = list(itertools.islice(data, 5, 15, 2)) # [5, 7, 9, 11, 13]
# takewhile / dropwhile
taken = list(itertools.takewhile(lambda x: x < 5, data)) # [0,1,2,3,4]
dropped = list(itertools.dropwhile(lambda x: x < 5, data)) # [5..19]
# --- Grouping ---
inventory = [
    ("apple", "fruit"), ("banana", "fruit"),
    ("carrot", "veg"), ("broccoli", "veg"),
    ("mango", "fruit"),
]
inventory.sort(key=lambda x: x[1])  # groupby only groups adjacent items, so sort by the key first
for category, items in itertools.groupby(inventory, key=lambda x: x[1]):
    print(f"{category}: {[i[0] for i in items]}")
# --- Accumulate ---
numbers = [1, 2, 3, 4, 5]
running_sum = list(itertools.accumulate(numbers)) # [1,3,6,10,15]
running_max = list(itertools.accumulate(numbers, max)) # [1,2,3,4,5]
running_prod = list(itertools.accumulate(numbers, operator.mul))  # [1,2,6,24,120]
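These tools also compose with each other and with generator expressions. As a small sketch (the variable names and sample data are invented for this example), islice can bound an infinite source and chain.from_iterable can lazily flatten one level of nesting before another itertools function consumes the result.

```python
import itertools

# Bound an infinite generator expression with islice.
odd_squares = (n * n for n in itertools.count(1) if n % 2)
first_five = list(itertools.islice(odd_squares, 5))   # [1, 9, 25, 49, 81]

# Lazily flatten one level of nesting, e.g. pages of results.
pages = [[1, 2], [3], [], [4, 5]]
flat = list(itertools.chain.from_iterable(pages))     # [1, 2, 3, 4, 5]

# Feed one itertools stage directly into another without intermediate lists.
running = list(
    itertools.accumulate(itertools.chain.from_iterable(pages))
)                                                     # [1, 3, 6, 10, 15]

print(first_five, flat, running)
```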
Building Data Pipelines
Generators compose naturally into processing pipelines: each stage is a generator that consumes from an upstream generator and yields downstream. This pattern processes arbitrarily large datasets with bounded memory, enables lazy evaluation (only what is consumed gets computed), and makes each stage independently testable. It is the same model as Unix pipes, where each command reads its predecessor's output as a stream. Pandas method chaining looks similar on the page, but each step there is evaluated eagerly.
import csv
import itertools
import json

# Pipeline stages: each is a generator
def read_csv(path: str):
    """Source: yields rows as dicts."""
    with open(path, newline="") as f:
        yield from csv.DictReader(f)

def filter_rows(rows, predicate):
    """Filter stage: yields rows matching predicate."""
    for row in rows:
        if predicate(row):
            yield row

def transform_row(rows, fn):
    """Transform stage: applies fn to each row."""
    for row in rows:
        yield fn(row)

def batch(rows, size: int):
    """Batch stage: groups rows into lists of up to `size` items."""
    it = iter(rows)
    while True:
        chunk = list(itertools.islice(it, size))
        if not chunk:
            break
        yield chunk

def write_jsonl(batches, output_path: str):
    """Sink: writes batches to a JSONL file."""
    with open(output_path, "w") as f:
        for chunk in batches:  # named chunk so it does not shadow batch()
            for row in chunk:
                f.write(json.dumps(row) + "\n")
    print(f"Done: wrote to {output_path}")

# Wire together the pipeline
def run_etl(input_csv: str, output_jsonl: str):
    rows = read_csv(input_csv)
    active = filter_rows(rows, lambda r: r.get("status") == "active")
    enriched = transform_row(active, lambda r: {**r, "processed": True})
    batches = batch(enriched, size=500)
    write_jsonl(batches, output_jsonl)
    # Memory usage: O(batch_size) regardless of input file size
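The pull-based nature of such a pipeline can be checked with in-memory data. The sketch below uses simplified stand-ins for the stages above (source, only_active, and batched are hypothetical names) and records which rows the source has produced at each point.

```python
import itertools

pulled = []  # ids of rows the source has actually produced so far


def source(rows):
    for row in rows:
        pulled.append(row["id"])
        yield row


def only_active(rows):
    return (r for r in rows if r["status"] == "active")


def batched(rows, size):
    it = iter(rows)
    while True:
        chunk = list(itertools.islice(it, size))
        if not chunk:
            break
        yield chunk


records = [
    {"id": 1, "status": "active"},
    {"id": 2, "status": "inactive"},
    {"id": 3, "status": "active"},
    {"id": 4, "status": "active"},
    {"id": 5, "status": "active"},
]

pipeline = batched(only_active(source(records)), size=2)
before = list(pulled)            # []: wiring the stages together reads nothing

first_batch = next(pipeline)     # rows 1 and 3
after_first = list(pulled)       # [1, 2, 3]: only enough rows to fill one batch

rest = list(pipeline)            # one more batch, containing rows 4 and 5
print(before, after_first, [[r["id"] for r in b] for b in [first_batch, *rest]])
```

Nothing is read until the sink asks for a batch, and at no point does more than one batch of rows exist in memory.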
Async Generators
Async generators combine async def with yield, enabling lazy asynchronous data sources. They are consumed with async for and work naturally with asyncio. This is the right tool for streaming API responses, async database cursors, or any async producer-consumer pattern.
import asyncio
import aiohttp  # third-party package

async def paginate_api(url: str, page_size: int = 100):
    """Async generator: fetch paginated API results lazily."""
    async with aiohttp.ClientSession() as session:
        page = 1
        while True:
            params = {"page": page, "per_page": page_size}
            async with session.get(url, params=params) as resp:
                data = await resp.json()
            if not data:
                break
            for item in data:
                yield item
            page += 1

async def async_filter(aiterable, predicate):
    """Async filter stage."""
    async for item in aiterable:
        if predicate(item):
            yield item

async def process_users():
    users = paginate_api("https://api.example.com/users")
    active_users = async_filter(users, lambda u: u["active"])
    async for user in active_users:
        print(f"Processing {user['name']}")
        await asyncio.sleep(0)  # yield control back to the event loop

# Async generator with cleanup in finally (runs on exhaustion or when aclose() is called)
async def database_cursor(query: str):
    """Simulate an async database cursor."""
    conn = None
    try:
        # conn = await db.connect()
        for i in range(10):  # simulate rows
            await asyncio.sleep(0.01)
            yield {"id": i, "query": query}
    finally:
        if conn:
            await conn.close()  # always clean up

if __name__ == "__main__":
    asyncio.run(process_users())  # needs a real paginated endpoint at the URL above
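Cleanup with aclose() can be demonstrated without any network access. The sketch below uses only the standard library; ticker, log, and main are names invented for this example, and asyncio.sleep(0) stands in for real I/O.

```python
import asyncio

log = []  # records when the generator's cleanup code runs


async def ticker(limit: int):
    """Async generator yielding 0..limit-1, with cleanup in a finally block."""
    try:
        for i in range(limit):
            await asyncio.sleep(0)  # stand-in for real asynchronous I/O
            yield i
    finally:
        log.append("closed")


async def main():
    gen = ticker(100)
    seen = []
    async for value in gen:
        seen.append(value)
        if value == 2:
            break                   # the generator is left suspended, not finished
    await gen.aclose()              # run the finally block deterministically
    return seen


seen = asyncio.run(main())
print(seen, log)                    # [0, 1, 2] ['closed']
```

Calling aclose() explicitly (or wrapping the generator in contextlib.aclosing on Python 3.10+) avoids relying on the event loop to finalize abandoned async generators later.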