Python Generators and Iterators: Memory-Efficient Pipelines

Python generators and iterators are the foundation of memory-efficient data processing. Instead of loading an entire dataset into memory, generators produce values one at a time on demand. This makes it possible to process files with billions of rows, stream API responses, or build composable data pipelines whose memory use stays roughly constant regardless of dataset size. Understanding the iterator protocol and the yield keyword unlocks a different way of thinking about data flow in Python.

The Iterator Protocol

Python's iterator protocol is simple: any object that implements __iter__() (returning self) and __next__() (returning the next value or raising StopIteration) is an iterator. An iterable is any object with __iter__() that returns an iterator — lists, tuples, dicts, and strings are all iterables. Understanding this distinction matters when building custom data sources.

class CountUp:
    """Custom iterator that counts from start to stop."""
    def __init__(self, start: int, stop: int):
        self.current = start
        self.stop = stop

    def __iter__(self):
        return self  # Iterator is its own iterable

    def __next__(self):
        if self.current >= self.stop:
            raise StopIteration
        value = self.current
        self.current += 1
        return value

# Works with for loops, list(), zip(), etc.
for n in CountUp(1, 6):
    print(n)  # 1 2 3 4 5

# Manually consuming an iterator
counter = CountUp(10, 13)
print(next(counter))  # 10
print(next(counter))  # 11
print(list(counter))  # [12]  — exhausted after

# Python's built-in iter() and next()
my_list = [10, 20, 30]
it = iter(my_list)
print(next(it))  # 10
print(next(it, "default"))  # 20  (the default is returned only once the iterator is exhausted, instead of raising StopIteration)
Key distinction: An iterable can be iterated (has __iter__). An iterator maintains state and produces the next value (has both __iter__ and __next__). Generators are iterators — they are also iterables that maintain their own position automatically.
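
The distinction can be checked directly in the interpreter. The following is a minimal sketch using only built-ins and collections.abc; the variable names are illustrative and not part of any API.

```python
from collections.abc import Iterable, Iterator

numbers = [1, 2, 3]          # a list is an iterable, but not an iterator
it = iter(numbers)           # iter() hands back a fresh iterator over the list

# A list supports repeated passes: each pass gets its own iterator.
first_pass = list(numbers)
second_pass = list(numbers)

# An iterator is single-pass: once consumed, it stays exhausted.
consumed = list(it)
after_exhaustion = list(it)

# An iterator returns itself from __iter__, which is why it works in for loops.
is_own_iterator = iter(it) is it

print(first_pass, second_pass)       # [1, 2, 3] [1, 2, 3]
print(consumed, after_exhaustion)    # [1, 2, 3] []
print(is_own_iterator)               # True
print(isinstance(numbers, Iterable), isinstance(numbers, Iterator))  # True False
print(isinstance(it, Iterator))      # True
```

The single-pass behaviour is the usual source of bugs when a generator is passed to code that expects to loop over its input twice.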

Generator Functions and yield

A generator function uses yield instead of return. Calling a generator function doesn't execute its body — it returns a generator object. Each call to next() on the generator runs the function body until the next yield, suspends execution, and returns the yielded value. The function's local variables and execution position are preserved between calls, making generators inherently stateful and lazy.

import sys

def fibonacci():
    """Infinite Fibonacci sequence — never loads all values into memory."""
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b

# Take only what you need
fib = fibonacci()
first_10 = [next(fib) for _ in range(10)]
print(first_10)  # [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]

def read_large_file(path: str, chunk_size: int = 8192):
    """Read a large file in chunks without loading it all into memory."""
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

def parse_csv_rows(path: str):
    """Parse a CSV file row by row — constant memory usage."""
    import csv
    with open(path, newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            yield row

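# Memory comparison (exact numbers vary by Python version and platform)
big_list = list(range(1_000_000))        # ~8 MB for the list's pointer array alone
big_gen  = (x for x in range(1_000_000)) # a small, fixed-size generator object

print(sys.getsizeof(big_list))  # roughly 8_000_000 on 64-bit CPython; excludes the int objects themselves
print(sys.getsizeof(big_gen))   # on the order of 100-200 bytes, independent of the range length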
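
The suspend-and-resume behaviour described at the start of this section can be observed by recording when each part of a generator body runs. This is a small sketch; the function two_steps and the events list are hypothetical names used only for illustration.

```python
events = []  # records the order in which things happen

def two_steps():
    events.append("body started")
    yield 1
    events.append("resumed after first yield")
    yield 2
    events.append("body finished")

gen = two_steps()
events.append("generator created")   # nothing in the body has run yet

first = next(gen)              # runs the body up to the first yield
second = next(gen)             # resumes and runs up to the second yield
leftover = next(gen, "done")   # body finishes; the default replaces StopIteration

print(first, second, leftover)  # 1 2 done
for event in events:
    print(event)
```

Note that "generator created" is recorded before "body started": calling the function only builds the generator object, and the body starts on the first next().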

Generator Expressions

Generator expressions use the same syntax as list comprehensions but with parentheses instead of brackets. They produce a lazy generator object rather than building a list in memory. They compose naturally with built-in functions like sum(), max(), any(), and all(), which accept any iterable. For single-use transformations over large sequences, generator expressions are almost always preferable to list comprehensions.

import os

# List comprehension — builds entire list in memory
squares_list = [x**2 for x in range(1_000_000)]

# Generator expression: lazy, with a small fixed-size footprint
squares_gen = (x**2 for x in range(1_000_000))

# sum() accepts any iterable — no list needed
total = sum(x**2 for x in range(1_000_000))

# Filter + transform without intermediate list
evens_doubled = (x * 2 for x in range(100) if x % 2 == 0)

# Composing generator expressions (pipeline)
log_file = "app.log"
# lines = (line.strip() for line in open(log_file))
# errors = (line for line in lines if "ERROR" in line)
# messages = (line.split("|")[2] for line in errors)

# Real-world: sum file sizes across a directory tree
def total_size(root: str) -> int:
    # os.DirEntry cannot be constructed directly, so ask the filesystem for each size
    return sum(
        os.path.getsize(os.path.join(dirpath, name))
        for dirpath, _, files in os.walk(root)
        for name in files
    )

# Nested generators with any/all
data = [{"score": 85}, {"score": 42}, {"score": 91}]
has_failing = any(d["score"] < 50 for d in data)   # True
all_passing  = all(d["score"] >= 50 for d in data)  # False
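
The commented-out log pipeline above can be tried without a real file by feeding it an in-memory list. This is a sketch under an assumed log format of date|level|message; the sample lines are invented for illustration.

```python
# Hypothetical log lines in an assumed "date|level|message" format.
log_lines = [
    "2024-01-01|INFO|service started\n",
    "2024-01-01|ERROR|disk full\n",
    "2024-01-02|INFO|heartbeat\n",
    "2024-01-02|ERROR|timeout contacting db\n",
]

# Each stage is a generator expression wrapping the previous one.
lines = (line.strip() for line in log_lines)
errors = (line for line in lines if "|ERROR|" in line)
messages = (line.split("|")[2] for line in errors)

# Nothing has been processed yet; work happens only as items are requested.
first_message = next(messages)
remaining = list(messages)

print(first_message)  # disk full
print(remaining)      # ['timeout contacting db']
```

With a real file, the first stage would iterate over a file object opened in a with block so the file is closed when processing ends.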

yield from and Delegation

yield from delegates to a sub-generator or any iterable. When the target is a generator, it transparently forwards yielded values, values passed in with send(), and exceptions raised with throw(), and the expression evaluates to the sub-generator's return value. It makes it easy to compose generators and flatten nested iterables without writing manual loops. This is especially useful when building recursive generators or chaining multiple data sources into a single stream.

from typing import Iterable, Any

# Flatten arbitrarily nested iterables
def flatten(items: Any) -> Iterable:
    """Recursively flatten nested lists/tuples using yield from."""
    if isinstance(items, (list, tuple)):
        for item in items:
            yield from flatten(item)
    else:
        yield items

nested = [1, [2, 3, [4, 5]], 6, [7, [8, [9]]]]
print(list(flatten(nested)))  # [1, 2, 3, 4, 5, 6, 7, 8, 9]

# Chain multiple generators
def read_all_logs(*paths: str):
    """Yield lines from multiple log files in sequence."""
    for path in paths:
        with open(path) as f:  # close each file once its lines are consumed
            yield from f

# yield from vs manual loop (equivalent for plain iteration; yield from is shorter)
def chain_manual(iters):
    for it in iters:
        for item in it:
            yield item

def chain_delegated(iters):
    for it in iters:
        yield from it  # cleaner; any speed gain is small and depends on the interpreter version

# Flattening a tree structure
class TreeNode:
    def __init__(self, val, children=None):
        self.val = val
        self.children = children or []

    def __iter__(self):
        yield self.val
        for child in self.children:
            yield from child  # recursively delegate

root = TreeNode(1, [TreeNode(2, [TreeNode(4), TreeNode(5)]), TreeNode(3)])
print(list(root))  # [1, 2, 4, 5, 3]
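
Beyond forwarding values, yield from also captures the sub-generator's return value, which the manual loop cannot do. The sketch below illustrates this; average_of_chunk, report, and run are hypothetical helpers written for this example.

```python
def average_of_chunk(values):
    """Sub-generator: yields each value, then returns a summary via return."""
    total = 0
    count = 0
    for v in values:
        total += v
        count += 1
        yield v
    return total / count if count else 0.0

def report(chunks):
    """Delegating generator: forwards values and collects each return value."""
    averages = []
    for chunk in chunks:
        avg = yield from average_of_chunk(chunk)  # evaluates to the return value
        averages.append(avg)
    return averages

def run(gen):
    """Drive a generator to completion; return (yielded values, return value)."""
    yielded = []
    while True:
        try:
            yielded.append(next(gen))
        except StopIteration as stop:
            return yielded, stop.value

values, averages = run(report([[1, 2, 3], [10, 20]]))
print(values)    # [1, 2, 3, 10, 20]
print(averages)  # [2.0, 15.0]
```

A generator's return value travels on the StopIteration exception, which is why run() reads stop.value.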

send() and throw(): Two-Way Communication

Generators support two-way communication: gen.send(value) resumes the generator and makes yield evaluate to the sent value, while gen.throw(exc) raises an exception inside the generator at the point of suspension. This enables coroutine-style programming. Python's asyncio was originally built on top of this mechanism before the async/await syntax was introduced.

def accumulator():
    """Coroutine that accumulates sent values and yields the running total."""
    total = 0
    while True:
        value = yield total   # yield sends total out, receives next value
        if value is None:
            return total
        total += value

acc = accumulator()
next(acc)          # Prime the coroutine (advance to first yield)
acc.send(10)       # total = 10, returns 10
acc.send(20)       # total = 30, returns 30
acc.send(5)        # total = 35, returns 35

# Using throw() to inject an exception and close() to trigger cleanup
def resource_manager():
    print("Acquiring resource")
    try:
        status = "ready"
        while True:
            try:
                command = yield status
                print(f"Executing: {command}")
                status = "ready"
            except ValueError as e:
                # Handle the error inside the loop so the generator keeps running
                print(f"Invalid command: {e}")
                status = "error"
    finally:
        # Runs when close() raises GeneratorExit at the suspended yield
        print("Generator closed, releasing resource")

rm = resource_manager()
print(next(rm))          # prints "Acquiring resource", then "ready"
print(rm.send("write"))  # prints "Executing: write", then "ready"
print(rm.throw(ValueError("bad command")))  # prints "Invalid command: bad command", then "error"
rm.close()               # prints "Generator closed, releasing resource"
Priming requirement: Before calling send(value) with a non-None value, you must call next(gen) or gen.send(None) once to advance the generator to the first yield. Python has no built-in decorator for this; in reusable code, a small custom decorator (conventionally named @coroutine and written with functools.wraps) can do the priming automatically.
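
A priming decorator of the kind mentioned above might look like the following sketch. The name coroutine is a convention rather than a standard-library function, and running_total is a hypothetical example coroutine.

```python
import functools

def coroutine(func):
    """Hypothetical helper: create the generator and advance it to its first yield."""
    @functools.wraps(func)
    def primed(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)   # prime; the first yielded value is discarded
        return gen
    return primed

@coroutine
def running_total():
    total = 0
    while True:
        value = yield total
        total += value

acc = running_total()   # already primed, so send() works immediately
results = [acc.send(10), acc.send(20), acc.send(5)]
print(results)  # [10, 30, 35]
acc.close()
```

One trade-off of this design is that the value produced by the first yield is thrown away, so it suits coroutines whose first yield exists only to receive input.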

itertools: The Generator Toolkit

The itertools module provides fast, memory-efficient tools implemented in C. These functions return iterators and compose perfectly with generator functions and expressions. Together, itertools and functools form a complete functional programming toolkit for data transformation without materializing intermediate collections.

import itertools
import operator

# --- Infinite iterators ---
# count(start, step): start, start + step, ...  (count(1) gives 1, 2, 3, ...)
for i, letter in zip(itertools.count(1), "ABCDE"):
    print(f"{i}. {letter}")

# cycle: repeats indefinitely
statuses = itertools.cycle(["active", "idle", "sleep"])
first_9 = list(itertools.islice(statuses, 9))

# repeat: same value N times (or forever)
defaults = list(itertools.repeat(0, 5))  # [0, 0, 0, 0, 0]

# --- Combinatorics ---
print(list(itertools.combinations("ABCD", 2)))     # 6 pairs
print(list(itertools.permutations("ABC", 2)))       # 6 ordered pairs
print(list(itertools.product([0,1], repeat=3)))     # 8 binary tuples

# --- Filtering/slicing ---
data = range(20)
# islice(iter, stop) or islice(iter, start, stop, step)
subset = list(itertools.islice(data, 5, 15, 2))  # [5, 7, 9, 11, 13]

# takewhile / dropwhile
taken = list(itertools.takewhile(lambda x: x < 5, data))  # [0,1,2,3,4]
dropped = list(itertools.dropwhile(lambda x: x < 5, data)) # [5..19]

# --- Grouping ---
inventory = [
    ("apple", "fruit"), ("banana", "fruit"),
    ("carrot", "veg"),  ("broccoli", "veg"),
    ("mango", "fruit"),
]
inventory.sort(key=lambda x: x[1])  # must sort before groupby
for category, items in itertools.groupby(inventory, key=lambda x: x[1]):
    print(f"{category}: {[i[0] for i in items]}")

# --- Accumulate ---
numbers = [1, 2, 3, 4, 5]
running_sum = list(itertools.accumulate(numbers))          # [1,3,6,10,15]
running_max = list(itertools.accumulate(numbers, max))     # [1,2,3,4,5]
running_prod= list(itertools.accumulate(numbers, operator.mul))  # [1,2,6,24,120]
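
The "must sort before groupby" comment above deserves a closer look: groupby only merges consecutive items with equal keys. The sketch below shows the difference on invented sample data.

```python
import itertools

readings = ["ok", "ok", "fail", "ok", "fail", "fail"]

# Unsorted input: groupby starts a new group every time the key changes.
runs = [(key, len(list(group))) for key, group in itertools.groupby(readings)]

# Sorted input: equal keys are adjacent, so each key appears exactly once.
totals = [(key, len(list(group)))
          for key, group in itertools.groupby(sorted(readings))]

print(runs)    # [('ok', 2), ('fail', 1), ('ok', 1), ('fail', 2)]
print(totals)  # [('fail', 3), ('ok', 3)]
```

The unsorted behaviour is sometimes exactly what is wanted, for example when detecting runs of consecutive failures in a stream.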

Building Data Pipelines

Generators compose naturally into processing pipelines: each stage is a generator that consumes from an upstream generator and yields downstream. This pattern processes arbitrarily large datasets with bounded memory, enables lazy evaluation (only what is consumed gets computed), and makes each stage independently testable. It is the same model as Unix pipes. Pandas method chaining reads similarly, but each pandas step runs eagerly and materializes its result.

import csv
import json
from pathlib import Path

# Pipeline stages — each is a generator
def read_csv(path: str):
    """Source: yields rows as dicts."""
    with open(path, newline="") as f:
        yield from csv.DictReader(f)

def filter_rows(rows, predicate):
    """Filter stage: yields rows matching predicate."""
    for row in rows:
        if predicate(row):
            yield row

def transform_row(rows, fn):
    """Transform stage: applies fn to each row."""
    for row in rows:
        yield fn(row)

def batch(rows, size: int):
    """Batch stage: groups rows into lists of `size`."""
    import itertools
    it = iter(rows)
    while True:
        chunk = list(itertools.islice(it, size))
        if not chunk:
            break
        yield chunk

def write_jsonl(batches, output_path: str):
    """Sink: writes batches to a JSONL file."""
    with open(output_path, "w") as f:
        for chunk in batches:  # named chunk to avoid shadowing the batch() stage
            for row in chunk:
                f.write(json.dumps(row) + "\n")
    print(f"Done — wrote to {output_path}")

# Wire together the pipeline
def run_etl(input_csv: str, output_jsonl: str):
    rows     = read_csv(input_csv)
    active   = filter_rows(rows, lambda r: r.get("status") == "active")
    enriched = transform_row(active, lambda r: {**r, "processed": True})
    batches  = batch(enriched, size=500)
    write_jsonl(batches, output_jsonl)
    # Memory usage: O(batch_size) regardless of input file size
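
The lazy-evaluation claim can be verified on a toy pipeline by recording which source items are actually pulled. This is a sketch with in-memory data; source, only_active, tag, and the pulled list are hypothetical names that mirror the stages above in simplified form.

```python
import itertools

pulled = []  # records which source items were actually requested

def source(n):
    """Source stage: yields n rows, alternating active and inactive."""
    for i in range(n):
        pulled.append(i)
        yield {"id": i, "status": "active" if i % 2 == 0 else "inactive"}

def only_active(rows):
    """Filter stage."""
    for row in rows:
        if row["status"] == "active":
            yield row

def tag(rows):
    """Transform stage."""
    for row in rows:
        yield {**row, "processed": True}

pipeline = tag(only_active(source(1_000_000)))
nothing_ran_yet = len(pulled) == 0   # wiring the stages does no work

first_three = list(itertools.islice(pipeline, 3))

print(nothing_ran_yet)                  # True
print([row["id"] for row in first_three])  # [0, 2, 4]
print(pulled)                           # [0, 1, 2, 3, 4]
```

Although the source could produce a million rows, only five were generated to satisfy a request for three active ones.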

Async Generators

Async generators combine async def with yield, enabling lazy asynchronous data sources. They are consumed with async for and work naturally with asyncio. This is the right tool for streaming API responses, async database cursors, or any async producer-consumer pattern.

import asyncio
import aiohttp

async def paginate_api(url: str, page_size: int = 100):
    """Async generator: fetch paginated API results lazily."""
    async with aiohttp.ClientSession() as session:
        page = 1
        while True:
            params = {"page": page, "per_page": page_size}
            async with session.get(url, params=params) as resp:
                data = await resp.json()
                if not data:
                    break
                for item in data:
                    yield item
                page += 1

async def async_filter(aiterable, predicate):
    """Async filter stage."""
    async for item in aiterable:
        if predicate(item):
            yield item

async def process_users():
    users = paginate_api("https://api.example.com/users")
    active_users = async_filter(users, lambda u: u["active"])

    async for user in active_users:
        print(f"Processing {user['name']}")
        await asyncio.sleep(0)  # yield control back to event loop

# Async generator with cleanup: the finally block runs when iteration finishes or when aclose() is called
async def database_cursor(query: str):
    """Simulate an async database cursor."""
    conn = None
    try:
        # conn = await db.connect()
        for i in range(10):  # simulate rows
            await asyncio.sleep(0.01)
            yield {"id": i, "query": query}
    finally:
        if conn:
            await conn.close()  # always clean up

asyncio.run(process_users())
Performance tip: Async generators have slightly higher overhead per item than sync generators. For CPU-bound transformations, use sync generators inside async tasks rather than making every stage async. Only the I/O boundary (database reads, HTTP calls) needs to be async.
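
Cleanup of an async generator that is abandoned early can be made explicit with aclose(). The sketch below needs no network access; ticker is a hypothetical stand-in for a cursor or paginated API, and the log list exists only to make the ordering visible.

```python
import asyncio

log = []  # records when the source opens and closes

async def ticker(limit):
    """Hypothetical async source standing in for a cursor or paginated API."""
    log.append("open")
    try:
        for i in range(limit):
            await asyncio.sleep(0)   # stand-in for real I/O
            yield i
    finally:
        log.append("closed")         # release resources here

async def main():
    stream = ticker(100)
    taken = []
    try:
        async for value in stream:
            taken.append(value)
            if len(taken) == 3:
                break                # stop early; the generator is still suspended
    finally:
        await stream.aclose()        # run the generator's finally block now
    return taken

taken = asyncio.run(main())
print(taken)  # [0, 1, 2]
print(log)    # ['open', 'closed']
```

Without the explicit aclose(), cleanup of a partially consumed async generator is left to the event loop's shutdown hooks or garbage collection, so its timing is harder to predict.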