Python Threading vs Multiprocessing vs asyncio: When to Use Each
Python offers three concurrency models — threading, multiprocessing, and asyncio — and choosing the wrong one for your workload can leave performance on the table or add unnecessary complexity. The Global Interpreter Lock (GIL) means threads cannot run Python bytecode in parallel, making threading suitable for I/O-bound work but ineffective for CPU-bound computation. Multiprocessing sidesteps the GIL by using separate processes. Asyncio handles thousands of concurrent I/O operations with a single thread using cooperative scheduling. Each model excels in a different scenario.
The GIL Explained
The Global Interpreter Lock (GIL) is a mutex in CPython that prevents multiple threads from executing Python bytecode simultaneously. Only one thread holds the GIL at any time. The GIL is released during I/O operations (file reads, network calls, sleep) and during calls into C extensions that explicitly release it (NumPy, pandas, most database drivers). This means threading provides true concurrency for I/O-bound tasks but not for CPU-bound Python code.
import sys
import threading
import time

def cpu_task(n: int):
    """Pure CPU work: threads cannot run this in parallel under the GIL."""
    count = 0
    for _ in range(n):
        count += 1
    return count

def io_task(seconds: float):
    """I/O work: threads can overlap these waits."""
    time.sleep(seconds)  # GIL is released during sleep
    return f"slept {seconds}s"

# CPU-bound: threading gives NO speedup
start = time.perf_counter()
threads = [threading.Thread(target=cpu_task, args=(10_000_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"4 CPU threads: {time.perf_counter() - start:.2f}s")  # ~same as sequential

# I/O-bound: threading DOES give speedup
start = time.perf_counter()
threads = [threading.Thread(target=io_task, args=(1.0,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"4 I/O threads: {time.perf_counter() - start:.2f}s")  # ~1s, not 4s

# Check which interpreter is running
print(sys.implementation.name)  # "cpython" on CPython
print(sys.version)
# Note: PyPy and GraalPy have different GIL behavior
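Python 3.13 added an experimental free-threaded build of CPython, enabled with the --disable-gil build flag. This is a significant change: follow PEP 703 (making the GIL optional) for the roadmap toward true thread parallelism in CPython. The per-interpreter GIL is a separate effort, described in PEP 684.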
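To see which situation applies to the interpreter you are running, a small runtime check can help. The sketch below is illustrative only: the helper name gil_status is hypothetical, and it assumes that an interpreter without sys._is_gil_enabled() (anything before Python 3.13) runs with the GIL on.

```python
import sys
import sysconfig


def gil_status() -> dict:
    """Report how this interpreter was built and whether the GIL is active."""
    # Py_GIL_DISABLED is 1 on free-threaded builds, 0 or None otherwise.
    free_threaded_build = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))

    # sys._is_gil_enabled() exists only on Python 3.13+.
    # Assumption: on older interpreters the GIL is always enabled.
    is_gil_enabled = getattr(sys, "_is_gil_enabled", lambda: True)

    return {
        "implementation": sys.implementation.name,
        "free_threaded_build": free_threaded_build,
        "gil_enabled": bool(is_gil_enabled()),
    }


print(gil_status())
```

On a standard CPython build this should report that the GIL is enabled, which is the case the rest of this article assumes.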
Threading: I/O-Bound Work
Use threads when your tasks spend most of their time waiting for I/O: network requests, database queries, file reads, or sleeping. Python threads are OS threads with low creation overhead and shared memory. The threading module provides threads, locks, events, semaphores, and queues. For most production code, use ThreadPoolExecutor instead of managing threads manually.
import queue
import threading
import time

import requests
from concurrent.futures import ThreadPoolExecutor

# Thread-safe producer/consumer with Queue
def producer(q: queue.Queue, items: list):
    for item in items:
        result = requests.get(f"https://api.example.com/items/{item}", timeout=5)
        q.put(result.json())
    q.put(None)  # sentinel: tells the consumer to stop

def consumer(q: queue.Queue, results: list, lock: threading.Lock):
    while True:
        item = q.get()
        if item is None:
            break
        with lock:
            results.append(item)

# Thread synchronization primitives
class SharedCounter:
    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:  # the lock makes the read-modify-write atomic
            self._value += 1

    @property
    def value(self):
        with self._lock:
            return self._value

# Event for coordination
ready_event = threading.Event()

def worker(event: threading.Event):
    print("Worker: waiting for signal...")
    event.wait(timeout=5)
    print("Worker: signal received, processing")

def controller(event: threading.Event):
    time.sleep(1)
    event.set()  # unblocks all waiters

t1 = threading.Thread(target=worker, args=(ready_event,))
t2 = threading.Thread(target=controller, args=(ready_event,))
t1.start(); t2.start()
t1.join(); t2.join()

# Thread-local storage (per-thread state)
local_data = threading.local()

def process():
    print(f"Processing for user {local_data.user_id}")

def set_user(user_id: int):
    local_data.user_id = user_id
    process()  # can safely access local_data.user_id
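The recommendation to prefer ThreadPoolExecutor over hand-managed threads can be sketched as follows. This is a minimal illustration, not production code: fake_download is a hypothetical stand-in for a blocking network call, and the sleep duration and worker count are arbitrary.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_download(item_id: int) -> str:
    """Hypothetical stand-in for a blocking network call."""
    time.sleep(0.2)  # the GIL is released while sleeping
    return f"item-{item_id}"


start = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as executor:
    # map() returns results in input order and re-raises worker
    # exceptions when the corresponding result is consumed.
    results = list(executor.map(fake_download, range(8)))
elapsed = time.perf_counter() - start

print(results)
print(f"8 tasks in {elapsed:.2f}s")
```

Because the eight waits overlap, the total should be close to one sleep rather than eight. The with block also joins the workers on exit, which removes the manual start() and join() bookkeeping.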
Multiprocessing: CPU-Bound Work
Multiprocessing creates separate OS processes, each with its own Python interpreter and GIL. This enables true parallel execution of CPU-intensive tasks across all CPU cores. The downside is higher overhead: process creation is slow, memory is not shared by default (arguments and results are serialized with pickle when they cross process boundaries), and communication requires explicit IPC (queues, pipes, shared memory). For CPU-bound tasks that take seconds or more per unit and exchange little data, the speedup can approach linear in the number of cores.
import multiprocessing as mp
from multiprocessing import Pool, shared_memory

import numpy as np

def cpu_intensive(data: list[int]) -> int:
    """CPU-bound work: sum of squares."""
    return sum(x**2 for x in data)

def worker(x, y):
    return x ** y

def process_slice(shm_name, shape, dtype, start, end):
    # Attach to the existing block by name; the array data is not pickled
    existing = shared_memory.SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=dtype, buffer=existing.buf)
    total = float(arr[start:end].sum())  # reads without copying
    del arr  # drop the view before closing the mapping
    existing.close()
    return total

if __name__ == "__main__":  # required when processes are spawned (Windows, macOS)
    # Pool.map: simplest parallel map
    data_chunks = [list(range(i, i + 100_000)) for i in range(0, 800_000, 100_000)]
    with Pool(processes=mp.cpu_count()) as pool:
        results = pool.map(cpu_intensive, data_chunks)
    print(f"Total: {sum(results)}")

    # Pool.starmap for multiple arguments
    with Pool() as pool:
        results = pool.starmap(worker, [(2, 10), (3, 5), (4, 3)])

    # Shared memory for large NumPy arrays (zero-copy reads in the workers)
    array = np.ones((10_000, 10_000), dtype=np.float64)
    shm = shared_memory.SharedMemory(create=True, size=array.nbytes)
    shared_array = np.ndarray(array.shape, dtype=array.dtype, buffer=shm.buf)
    shared_array[:] = array
    try:
        with Pool() as pool:
            futures = [
                pool.apply_async(process_slice, (shm.name, array.shape, array.dtype, i, i + 2500))
                for i in range(0, 10000, 2500)
            ]
            totals = [f.get() for f in futures]
    finally:
        del shared_array  # release the view so close() does not fail
        shm.close()
        shm.unlink()
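Since arguments and return values cross the process boundary through pickle, it can be useful to check in advance what will serialize and how large the payload is. The sketch below is a rough illustration: can_pickle is a hypothetical helper, and the set of exceptions it catches is an assumption about the common failure modes, not an exhaustive list.

```python
import math
import pickle
import threading


def can_pickle(obj) -> bool:
    """Return True if obj can be serialized the way a process pool would."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


payload = list(range(100_000))
payload_bytes = len(pickle.dumps(payload))
print(f"payload pickles to {payload_bytes:,} bytes per transfer")

print(can_pickle(math.sqrt))         # True: importable function, pickled by name
print(can_pickle(lambda x: x * x))   # False: a lambda has no importable name
print(can_pickle(threading.Lock()))  # False: OS-level locks cannot be pickled
```

This is why pool workers are normally module-level functions, and why large inputs are better passed through shared memory or loaded inside the worker than sent as arguments.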
asyncio: High-Concurrency I/O
asyncio uses a single-threaded event loop with cooperative scheduling. When a coroutine awaits an I/O operation, control returns to the event loop which runs other coroutines. This enables handling thousands of concurrent connections with minimal overhead — no thread context switching, no GIL contention. asyncio is the right choice for web servers, chat servers, API gateways, and any service with high I/O concurrency.
import asyncio

import aiohttp

async def fetch(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
        resp.raise_for_status()
        return await resp.json()

async def fetch_all(urls: list[str]) -> list:
    """Fetch all URLs concurrently; total time is roughly that of the slowest request."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        # With return_exceptions=True, failures come back as exception objects
        return await asyncio.gather(*tasks, return_exceptions=True)

# Semaphore to limit concurrency
async def fetch_with_limit(urls: list[str], max_concurrent: int = 10):
    sem = asyncio.Semaphore(max_concurrent)

    async def limited_fetch(session, url):
        async with sem:
            return await fetch(session, url)

    async with aiohttp.ClientSession() as session:
        tasks = [limited_fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)

# asyncio.Queue for producer/consumer
async def producer(queue: asyncio.Queue, items: list):
    for item in items:
        await queue.put(item)
    await queue.put(None)  # sentinel

async def consumer(queue: asyncio.Queue, results: list):
    while True:
        item = await queue.get()
        if item is None:
            break
        results.append(item * 2)
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)
    results = []
    await asyncio.gather(
        producer(queue, list(range(20))),
        consumer(queue, results),
    )
    print(results)

asyncio.run(main())
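Cooperative scheduling means a coroutine keeps the thread until it reaches an await that actually suspends. The small sketch below makes that visible by logging the order in which two coroutines run; the names worker and demo are illustrative, and asyncio.sleep(0) is used only as an explicit yield point.

```python
import asyncio


async def worker(name: str, steps: int, log: list[str]) -> None:
    for i in range(steps):
        log.append(f"{name}{i}")
        # Yield to the event loop so other ready coroutines can run
        await asyncio.sleep(0)


async def demo() -> list[str]:
    log: list[str] = []
    await asyncio.gather(worker("a", 2, log), worker("b", 2, log))
    return log


print(asyncio.run(demo()))  # ['a0', 'b0', 'a1', 'b1']
```

The interleaving happens only at the await. If a coroutine called a blocking function such as time.sleep() instead, every other coroutine on the loop would stall until it returned.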
concurrent.futures Unified API
concurrent.futures provides a high-level unified interface for both threads and processes through ThreadPoolExecutor and ProcessPoolExecutor. The two share the same Executor API (submit, map, shutdown), so swapping between them is usually a one-line change, provided the callable and its arguments can be pickled for the process pool. It also integrates cleanly with asyncio via loop.run_in_executor(), which lets you run blocking code in a pool without blocking the event loop.
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed

import requests

urls = [f"https://httpbin.org/delay/1?n={i}" for i in range(5)]

def process_chunk(chunk):
    return sum(x**2 for x in chunk)

# run_in_executor: bridge between asyncio and blocking code
async def async_main():
    loop = asyncio.get_running_loop()
    # Run blocking I/O in thread pool (don't block the event loop)
    with ThreadPoolExecutor(max_workers=4) as executor:
        result = await loop.run_in_executor(executor, requests.get, "https://httpbin.org/get")
    # Run CPU-bound in process pool
    with ProcessPoolExecutor() as executor:
        heavy = await loop.run_in_executor(executor, process_chunk, range(1_000_000))
    return result, heavy

if __name__ == "__main__":  # guard needed because process pools re-import this module
    # ThreadPoolExecutor: I/O bound
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(requests.get, url, timeout=10): url for url in urls}
        for future in as_completed(futures):
            url = futures[future]
            try:
                result = future.result()
                print(f"{url}: {result.status_code}")
            except Exception as e:
                print(f"{url} failed: {e}")

    # ProcessPoolExecutor: CPU bound (swap in with one line change)
    chunks = [range(i, i + 1_000_000) for i in range(0, 4_000_000, 1_000_000)]
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(process_chunk, chunks))

    asyncio.run(async_main())
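When the blocking work is I/O and the default thread pool is acceptable, asyncio.to_thread() (Python 3.9+) is a shorter way to express the same bridge. The sketch below is illustrative: blocking_lookup is a hypothetical stand-in for a call into a blocking library.

```python
import asyncio
import time


def blocking_lookup(key: str) -> str:
    """Hypothetical stand-in for a call into a blocking library."""
    time.sleep(0.2)
    return key.upper()


async def lookup_all(keys: list[str]) -> list[str]:
    # Each call runs in the event loop's default thread pool,
    # so the loop stays free to run other coroutines meanwhile.
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_lookup, key) for key in keys)
    )


print(asyncio.run(lookup_all(["a", "b", "c"])))  # ['A', 'B', 'C']
```

to_thread() always uses threads, so it does not help with CPU-bound work. For that case, run_in_executor with a ProcessPoolExecutor remains the tool to reach for.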
Combining Models
Real applications often combine concurrency models: an asyncio web server handles thousands of connections while offloading CPU-heavy work to a process pool, and uses thread pool workers for legacy blocking libraries that don't support async. FastAPI and Starlette follow this pattern: async handlers run on the event loop, while plain def handlers are dispatched to a thread pool so that blocking calls such as synchronous database queries do not stall it.
import asyncio
import math
from concurrent.futures import ProcessPoolExecutor
from contextlib import asynccontextmanager

from fastapi import FastAPI

process_pool = ProcessPoolExecutor(max_workers=4)

@asynccontextmanager
async def lifespan(app: FastAPI):
    yield  # the application serves requests here
    process_pool.shutdown(wait=True)  # release worker processes on shutdown

# lifespan replaces the deprecated @app.on_event("shutdown") hook
app = FastAPI(lifespan=lifespan)

def heavy_computation(data: list) -> float:
    """CPU-bound: runs in a separate process."""
    return sum(math.sqrt(x) for x in data)

@app.post("/compute")
async def compute_endpoint(data: list[int]):
    loop = asyncio.get_running_loop()
    # Offload CPU work to process pool without blocking the event loop
    result = await loop.run_in_executor(process_pool, heavy_computation, data)
    return {"result": result}
Decision Guide
Choosing the right concurrency model depends on your task type, concurrency level, and complexity budget. Use this framework to make the decision quickly.
"""
Decision framework:

1. Is it CPU-bound (pure Python computation)?
   → Use multiprocessing / ProcessPoolExecutor
   → Consider numpy vectorization first (often faster than multiprocessing)
2. Is it I/O-bound with high concurrency (1000+ concurrent connections)?
   → Use asyncio with async libraries (aiohttp, asyncpg, motor)
3. Is it I/O-bound with moderate concurrency (<100 concurrent)?
   → Use ThreadPoolExecutor (simpler, good enough)
4. Do you need to run blocking libraries in an async app?
   → Use loop.run_in_executor(ThreadPoolExecutor, ...)
5. Do you need CPU work in an async app?
   → Use loop.run_in_executor(ProcessPoolExecutor, ...)

Workload type → Recommended tool:
├── HTTP scraping (100s of URLs)    → asyncio + aiohttp
├── DB queries (concurrent reads)   → asyncio + asyncpg/motor
├── Image/video processing          → multiprocessing + Pool
├── ML inference (per-request)      → ProcessPoolExecutor
├── Legacy blocking library         → ThreadPoolExecutor
├── File I/O (many small files)     → ThreadPoolExecutor
└── Streaming data (real-time)      → asyncio + generators
"""
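The framework above can also be written as a small function, which is handy for code review checklists. This is a sketch under stated assumptions: the function name recommend and its parameters are hypothetical, and because the guide does not say what to do between 100 and 1000 concurrent tasks, the cutoff of 1000 used here is an arbitrary choice.

```python
def recommend(
    cpu_bound: bool,
    concurrent_tasks: int = 1,
    blocking_call_in_async_app: bool = False,
) -> str:
    """Map a workload description to the concurrency tool the guide suggests."""
    if blocking_call_in_async_app:
        # Rules 4 and 5: keep the event loop free by using an executor
        pool = "ProcessPoolExecutor" if cpu_bound else "ThreadPoolExecutor"
        return f"loop.run_in_executor with a {pool}"
    if cpu_bound:
        # Rule 1: separate processes sidestep the GIL
        return "multiprocessing / ProcessPoolExecutor"
    if concurrent_tasks >= 1000:  # assumed cutoff, see lead-in
        # Rule 2: high-concurrency I/O
        return "asyncio with async libraries"
    # Rule 3: moderate-concurrency I/O
    return "ThreadPoolExecutor"


print(recommend(cpu_bound=True))
print(recommend(cpu_bound=False, concurrent_tasks=5000))
print(recommend(cpu_bound=False, concurrent_tasks=20))
print(recommend(cpu_bound=True, blocking_call_in_async_app=True))
```

Treat the output as a starting point rather than a verdict. Measuring the actual workload is the only reliable way to confirm the choice.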
For I/O-bound services with high concurrency, asyncio generally provides the best concurrency/complexity tradeoff.