Python Asyncio: Async/Await Complete Guide (2026)
Python's asyncio library enables concurrent I/O-bound programming on a single thread using cooperative multitasking. Instead of blocking while waiting for a network response or database query, your program yields control and does other work. This guide covers the asyncio mental model, practical patterns, aiohttp for HTTP requests, and the most common mistakes developers make when switching from synchronous Python.
Table of Contents
- Event Loop Mechanics
- Coroutines vs Threads vs Processes
- Async/Await Syntax
- asyncio.gather vs asyncio.create_task
- asyncio.Queue for Producer-Consumer
- Async Context Managers
- aiohttp for Async HTTP Requests
- asyncio.timeout and Cancellation
- Common Pitfalls
- Running Sync Code in Executors
- Frequently Asked Questions
Event Loop Mechanics
The asyncio event loop is a scheduler that runs coroutines and I/O callbacks on a single OS thread. When a coroutine hits an await expression on an I/O operation, the event loop suspends it and runs other ready coroutines. When the I/O completes, the coroutine is resumed.
import asyncio

async def main():
    print("start")
    await asyncio.sleep(1)  # suspends main(); the event loop is free to run other tasks
    print("end")

# asyncio.run() (Python 3.7+) creates a new event loop, runs main(), then closes the loop
asyncio.run(main())
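The event loop is single-threaded. A coroutine can only be suspended at an await, so the code between two awaits runs without interruption. There is no thread contention for the GIL, and many of the data races that threaded code must guard against disappear. Shared state can still change while a coroutine is suspended at an await, however, and a single blocking call freezes everything. The entire asyncio mental model rests on one rule: never block the event loop thread.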
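To make the "never block" rule concrete, here is a minimal sketch that counts how often a background heartbeat gets to run while the main coroutine waits. The names ticker and ticks_during_wait are hypothetical helpers invented for this illustration, and the 10 ms and 100 ms intervals are arbitrary.

```python
import asyncio
import time

async def ticker(ticks: list[float]) -> None:
    # Hypothetical heartbeat: records a timestamp roughly every 10 ms.
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)

async def ticks_during_wait(blocking: bool) -> int:
    ticks: list[float] = []
    task = asyncio.create_task(ticker(ticks))
    await asyncio.sleep(0)  # give the ticker its first turn
    before = len(ticks)
    if blocking:
        time.sleep(0.1)  # holds the event loop thread; nothing else can run
    else:
        await asyncio.sleep(0.1)  # suspends; the loop keeps servicing the ticker
    during = len(ticks) - before
    # Stop the heartbeat and wait for the cancellation to finish.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return during

blocked_ticks = asyncio.run(ticks_during_wait(blocking=True))
cooperative_ticks = asyncio.run(ticks_during_wait(blocking=False))
print(f"ticks while blocked: {blocked_ticks}, ticks while awaiting: {cooperative_ticks}")
```

The blocking variant should report zero ticks, because the heartbeat never gets scheduled during time.sleep(). The awaiting variant should report several, though the exact count depends on timer resolution and machine load.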
Coroutines vs Threads vs Processes
Understanding when to use each concurrency model is critical:
- asyncio coroutines: Best for high-concurrency I/O (thousands of simultaneous network requests, database queries). Very low overhead per task (on the order of kilobytes of memory, with no dedicated OS thread stack). Single-threaded, so no parallelism for CPU work.
- threading: Useful for blocking I/O code you can't rewrite as async (legacy libraries). Limited by the GIL for CPU work. Context switching overhead at ~1000+ threads.
- multiprocessing: True parallelism for CPU-bound work (image processing, ML inference). High overhead per process (~50 MB memory). Use concurrent.futures.ProcessPoolExecutor.
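As a rough illustration of the first bullet, the sketch below starts 1,000 coroutines at once on a single thread. fake_request is a hypothetical stand-in for a network call, and the 50 ms delay is an assumption chosen only to make the timing visible.

```python
import asyncio
import time

async def fake_request(i: int) -> int:
    # Hypothetical stand-in for a network round trip.
    await asyncio.sleep(0.05)
    return i

async def run_many(n: int) -> tuple[int, float]:
    start = time.perf_counter()
    # All n coroutines wait concurrently on one thread.
    results = await asyncio.gather(*(fake_request(i) for i in range(n)))
    return len(results), time.perf_counter() - start

count, elapsed = asyncio.run(run_many(1000))
print(f"{count} simulated requests finished in {elapsed:.2f}s")
```

Run sequentially, the same 1,000 waits would take about 50 seconds. Run concurrently, they overlap and the total should stay close to the duration of a single wait plus scheduling overhead.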
Async/Await Syntax
import asyncio
import time

# A coroutine function: calling it returns a coroutine object and does NOT execute the body
async def fetch_data(url: str) -> dict:
    await asyncio.sleep(0.5)  # simulates network I/O
    return {"url": url, "data": "..."}

# Awaiting a coroutine suspends the current coroutine until it completes
async def process():
    result = await fetch_data("https://api.example.com/data")
    print(result)

# Sequential: total time ~1.0s
async def sequential():
    start = time.perf_counter()
    await fetch_data("https://api.example.com/1")
    await fetch_data("https://api.example.com/2")
    print(f"Sequential: {time.perf_counter() - start:.2f}s")

# Concurrent: total time ~0.5s
async def concurrent():
    start = time.perf_counter()
    results = await asyncio.gather(
        fetch_data("https://api.example.com/1"),
        fetch_data("https://api.example.com/2"),
    )
    print(f"Concurrent: {time.perf_counter() - start:.2f}s")
    print(results)

asyncio.run(concurrent())
asyncio.gather vs asyncio.create_task
asyncio.gather runs coroutines concurrently and returns all results. asyncio.create_task schedules a coroutine as an independent task that runs in the background:
import asyncio

async def work(n: int) -> str:
    await asyncio.sleep(n * 0.1)
    return f"result-{n}"

# gather: waits for ALL and returns results in argument order
async def use_gather():
    results = await asyncio.gather(work(1), work(2), work(3))
    print(results)  # ['result-1', 'result-2', 'result-3']

# gather with error handling: return_exceptions=True returns exceptions as
# ordinary results instead of raising the first one to the caller. Without it,
# the first exception propagates, but the other awaitables are NOT cancelled.
async def use_gather_safe():
    results = await asyncio.gather(
        work(1), work(2), work(3),
        return_exceptions=True
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"Task failed: {r}")
        else:
            print(r)

# create_task: schedules a task independently, so you can do other work before awaiting it
async def use_create_task():
    task1 = asyncio.create_task(work(1), name="task-1")
    task2 = asyncio.create_task(work(2), name="task-2")
    # Tasks are scheduled now and start running at the next await
    await asyncio.sleep(0)
    result1 = await task1
    result2 = await task2
    print(result1, result2)

# TaskGroup (Python 3.11+): structured concurrency, cancels all siblings on error
async def use_task_group():
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(work(1))
        t2 = tg.create_task(work(2))
    # Both tasks are done here
    print(t1.result(), t2.result())

asyncio.run(use_task_group())
asyncio.TaskGroup provides structured concurrency — if any task raises an exception, all sibling tasks are automatically cancelled. This prevents task leaks that are easy to create with bare create_task.
asyncio.Queue for Producer-Consumer
import asyncio
import random

async def producer(queue: asyncio.Queue, n_items: int):
    for i in range(n_items):
        item = random.randint(1, 100)
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(0.05)
    # Signal workers to stop
    for _ in range(3):  # number of workers
        await queue.put(None)

async def worker(worker_id: int, queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        await asyncio.sleep(0.1)  # simulate processing
        print(f"Worker {worker_id} processed: {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)
    workers = [asyncio.create_task(worker(i, queue)) for i in range(3)]
    await producer(queue, 15)
    await asyncio.gather(*workers)

asyncio.run(main())
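The worker above calls queue.task_done(), which only has an effect when something waits on queue.join(). As an alternative to None sentinels, here is a sketch of a join-based shutdown. The names doubling_worker and join_demo are hypothetical, and doubling each item is just placeholder processing.

```python
import asyncio

async def doubling_worker(queue: asyncio.Queue, results: list[int]) -> None:
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)  # simulate processing
            results.append(item * 2)
        finally:
            # Mark the item as handled even if processing raised.
            queue.task_done()

async def join_demo() -> list[int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=10)
    results: list[int] = []
    workers = [asyncio.create_task(doubling_worker(queue, results)) for _ in range(3)]
    for i in range(15):
        await queue.put(i)
    # join() returns once task_done() has been called for every item that was put.
    await queue.join()
    # The workers are now idle in queue.get(); cancel them to shut down.
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results

processed = asyncio.run(join_demo())
print(sorted(processed))
```

With this approach the producer does not need to know how many workers exist, at the cost of an explicit cancellation step at the end.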
Async Context Managers
Async context managers allow you to use async with for resources that require async setup/teardown, like database connections or HTTP sessions:
import asyncio
from contextlib import asynccontextmanager

class AsyncDBConnection:
    async def __aenter__(self):
        print("Opening DB connection")
        await asyncio.sleep(0.01)  # simulate connect
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print("Closing DB connection")
        await asyncio.sleep(0.01)  # simulate disconnect

    async def query(self, sql: str):
        await asyncio.sleep(0.05)
        return [{"id": 1, "name": "Alice"}]

# Using asynccontextmanager decorator (simpler)
@asynccontextmanager
async def get_connection(url: str):
    print(f"Connecting to {url}")
    await asyncio.sleep(0.01)
    conn = {"url": url, "active": True}
    try:
        yield conn
    finally:
        conn["active"] = False
        print("Connection closed")

async def main():
    async with AsyncDBConnection() as db:
        rows = await db.query("SELECT * FROM users")
        print(rows)
    async with get_connection("postgresql://localhost/mydb") as conn:
        print(conn)

asyncio.run(main())
aiohttp for Async HTTP Requests
pip install aiohttp
import asyncio
import aiohttp
from typing import Any

async def fetch(session: aiohttp.ClientSession, url: str) -> Any:
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
        resp.raise_for_status()
        return await resp.json()

async def fetch_all(urls: list[str]) -> list[Any]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

async def main():
    urls = [
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://jsonplaceholder.typicode.com/posts/2",
        "https://jsonplaceholder.typicode.com/posts/3",
    ]
    results = await fetch_all(urls)
    for r in results:
        if isinstance(r, Exception):
            print(f"Error: {r}")
        else:
            print(r["title"])

asyncio.run(main())
Creating a new aiohttp.ClientSession for every request is a common mistake. The session manages a connection pool, so create it once and reuse it for the lifetime of your application.
asyncio.timeout and Cancellation
import asyncio

# Python 3.11+: asyncio.timeout() is a context-manager alternative to
# asyncio.wait_for(), which remains available
async def slow_operation():
    await asyncio.sleep(10)
    return "done"

async def with_timeout():
    try:
        async with asyncio.timeout(2.0):
            result = await slow_operation()
    except TimeoutError:
        print("Operation timed out after 2 seconds")

# Cancelling tasks manually
async def cancelable():
    task = asyncio.create_task(slow_operation())
    await asyncio.sleep(1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled cleanly")

asyncio.run(with_timeout())
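On interpreters older than 3.11, asyncio.wait_for gives similar behavior. The sketch below also shows that a timeout cancels the inner coroutine, so its finally block still runs. The cleanup_ran list and the 50 ms timeout are assumptions added for illustration only.

```python
import asyncio

cleanup_ran: list[bool] = []

async def slow_operation() -> str:
    try:
        await asyncio.sleep(10)
        return "done"
    finally:
        # Runs when the timeout cancels this coroutine.
        cleanup_ran.append(True)

async def with_wait_for() -> str:
    try:
        return await asyncio.wait_for(slow_operation(), timeout=0.05)
    except asyncio.TimeoutError:
        # asyncio.TimeoutError is an alias of the built-in TimeoutError on 3.11+.
        return "timed out"

outcome = asyncio.run(with_wait_for())
print(outcome, cleanup_ran)
```

Because cancellation arrives as an exception at the suspended await, try/finally and async with are the places to release resources.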
Common Pitfalls
These mistakes account for the majority of asyncio bugs in production:
1. Calling blocking code in an async function:
# BAD: blocks the entire event loop for all coroutines
async def bad():
    import time
    time.sleep(5)  # blocks everything
    data = open("big.txt").read()  # also blocks

# GOOD: run blocking code in a thread pool executor
import asyncio

def read_file(path: str) -> str:
    # Open, read, and close the file entirely inside the worker thread
    with open(path) as f:
        return f.read()

async def good():
    loop = asyncio.get_running_loop()
    # run_in_executor(None, ...) uses the loop's default ThreadPoolExecutor
    data = await loop.run_in_executor(None, read_file, "big.txt")
2. Forgetting to await a coroutine:
# BAD: this creates a coroutine object but never runs it
async def main():
    fetch_data("http://example.com")  # RuntimeWarning: coroutine 'fetch_data' was never awaited

# GOOD
async def main():
    result = await fetch_data("http://example.com")
3. Creating tasks but not awaiting them:
# BAD: no reference is kept, so the task may be garbage-collected mid-run,
# is cancelled if main() returns first, and its exception is never handled
async def main():
    asyncio.create_task(work())  # fire and forget: dangerous

# GOOD: keep a reference, await it, or use TaskGroup
async def main():
    task = asyncio.create_task(work())
    await task
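When background work really does need to outlive the statement that starts it, one pattern suggested in the asyncio documentation is to hold strong references in a set and remove each task when it finishes. The sketch below follows that idea. The names spawn, on_done, background_tasks, and failures are hypothetical, and this work function is a stand-in that fails for one input.

```python
import asyncio

background_tasks: set[asyncio.Task] = set()
failures: list[str] = []

async def work(n: int) -> int:
    await asyncio.sleep(0.01)
    if n == 2:
        raise ValueError(f"task {n} failed")
    return n

def on_done(task: asyncio.Task) -> None:
    # Drop the reference and record any exception instead of losing it.
    background_tasks.discard(task)
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        failures.append(str(exc))

def spawn(coro) -> asyncio.Task:
    task = asyncio.create_task(coro)
    background_tasks.add(task)  # strong reference keeps the task alive
    task.add_done_callback(on_done)
    return task

async def main() -> None:
    for n in range(3):
        spawn(work(n))
    # Before shutting down, wait for whatever is still running.
    await asyncio.gather(*background_tasks, return_exceptions=True)

asyncio.run(main())
print(failures)
```

In a real service the done callback would more likely log the exception. Collecting it in a list here only makes the outcome easy to inspect.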
Running Sync Code in Executors
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_bound(n: int) -> int:
    """Simulate CPU work."""
    return sum(i * i for i in range(n))

def blocking_io(path: str) -> str:
    time.sleep(0.1)  # simulate blocking I/O
    return f"read {path}"

async def main():
    loop = asyncio.get_running_loop()
    # Thread pool for blocking I/O (default executor)
    result = await loop.run_in_executor(None, blocking_io, "file.txt")
    print(result)
    # Process pool for CPU work (bypasses the GIL)
    with ProcessPoolExecutor(max_workers=4) as pool:
        result = await loop.run_in_executor(pool, cpu_bound, 1_000_000)
    print(f"Sum: {result}")

# The guard is needed with ProcessPoolExecutor on platforms that spawn worker
# processes (Windows, macOS), because each worker re-imports this module.
if __name__ == "__main__":
    asyncio.run(main())
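For the thread-pool case, Python 3.9 added asyncio.to_thread as a shorter spelling that uses the default executor. A minimal sketch follows, reusing a blocking_io function like the one above. The file names are placeholders and nothing is actually read from disk.

```python
import asyncio
import time

def blocking_io(path: str) -> str:
    time.sleep(0.1)  # simulate blocking I/O
    return f"read {path}"

async def read_three() -> list[str]:
    # Each call runs in a worker thread, so the three sleeps overlap
    # instead of blocking the event loop one after another.
    return await asyncio.gather(
        asyncio.to_thread(blocking_io, "a.txt"),
        asyncio.to_thread(blocking_io, "b.txt"),
        asyncio.to_thread(blocking_io, "c.txt"),
    )

results = asyncio.run(read_three())
print(results)
```

asyncio.to_thread only helps with blocking I/O. CPU-bound functions still contend for the GIL in threads and are better served by a process pool.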
Frequently Asked Questions
- Does asyncio use multiple CPU cores?
- No. asyncio is single-threaded and runs on one core. For CPU-bound parallelism, use ProcessPoolExecutor via loop.run_in_executor() or the multiprocessing module.
- What is the difference between asyncio.sleep(0) and asyncio.sleep(1)?
- asyncio.sleep(0) yields control to the event loop for one iteration, allowing other ready tasks to run, then immediately resumes. It is a common way to give other tasks a turn. asyncio.sleep(1) suspends for at least 1 second.
- Can I use asyncio with Django or Flask?
- Django 3.1+ supports async views natively, and Django 4.1 added async class-based views and an async ORM interface. Flask 2.0+ supports async view functions. However, if you are building an async-first application, FastAPI is a better starting point. See our FastAPI guide.
- How do I debug asyncio applications?
- Enable asyncio debug mode: asyncio.run(main(), debug=True) or set PYTHONASYNCIODEBUG=1. This logs slow callbacks, unawaited coroutines, and improper thread usage.
- What replaced asyncio.coroutine and yield from?
- The @asyncio.coroutine decorator, and with it support for generator-based coroutines driven by yield from, was removed in Python 3.11. Always use async def and await.
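The asyncio.sleep(0) answer above can be illustrated with a short sketch in which two coroutines take turns. The names step and interleave_demo are hypothetical, and the log list exists only to record the order of execution.

```python
import asyncio

async def step(name: str, log: list[str]) -> None:
    for i in range(2):
        log.append(f"{name}-{i}")
        # Yield to the event loop for one iteration so other ready tasks can run.
        await asyncio.sleep(0)

async def interleave_demo() -> list[str]:
    log: list[str] = []
    await asyncio.gather(step("a", log), step("b", log))
    return log

interleaved = asyncio.run(interleave_demo())
print(interleaved)  # ['a-0', 'b-0', 'a-1', 'b-1']
```

Without the sleep(0), each coroutine would run its whole loop before the other got a turn, because nothing inside the loop would suspend.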