Python Asyncio: Async/Await Complete Guide (2026)

Python's asyncio library enables concurrent I/O-bound programming on a single thread using cooperative multitasking. Instead of blocking while waiting for a network response or database query, your program yields control and does other work. This guide covers the asyncio mental model, practical patterns, aiohttp for HTTP requests, and the most common mistakes developers make when switching from synchronous Python.

Event Loop Mechanics

The asyncio event loop is a scheduler that runs coroutines and I/O callbacks on a single OS thread. When a coroutine hits an await expression on an I/O operation, the event loop suspends it and runs other ready coroutines. When the I/O completes, the coroutine is resumed.

import asyncio

async def main():
    print("start")
    await asyncio.sleep(1)   # suspends — event loop runs other tasks
    print("end")

# Python 3.11+: asyncio.run() creates a new event loop, runs main(), closes it
asyncio.run(main())

The event loop is single-threaded. This means no GIL fights and no race conditions for pure Python objects — but it also means a single blocking call freezes everything. The entire asyncio mental model rests on one rule: never block the event loop thread.

Coroutines vs Threads vs Processes

Understanding when to use each concurrency model is critical:

  • asyncio coroutines: Best for high-concurrency I/O (thousands of simultaneous network requests, database queries). Extremely low overhead per task (~1 KB stack). Single-threaded — no parallelism for CPU work.
  • threading: Useful for blocking I/O code you can't rewrite as async (legacy libraries). Limited by the GIL for CPU work. Context switching overhead at ~1000+ threads.
  • multiprocessing: True parallelism for CPU-bound work (image processing, ML inference). High overhead per process (~50 MB memory). Use concurrent.futures.ProcessPoolExecutor.
Rule of thumb: I/O-bound and high concurrency → asyncio. I/O-bound with blocking libraries → threads. CPU-bound → processes.

Async/Await Syntax

import asyncio
import time

# A coroutine — calling it returns a coroutine object, does NOT execute it
async def fetch_data(url: str) -> dict:
    await asyncio.sleep(0.5)   # simulates network I/O
    return {"url": url, "data": "..."}

# Awaiting a coroutine suspends the current coroutine until it completes
async def process():
    result = await fetch_data("https://api.example.com/data")
    print(result)

# Sequential — total time ~1.0s
async def sequential():
    start = time.perf_counter()
    await fetch_data("https://api.example.com/1")
    await fetch_data("https://api.example.com/2")
    print(f"Sequential: {time.perf_counter() - start:.2f}s")

# Concurrent — total time ~0.5s
async def concurrent():
    start = time.perf_counter()
    results = await asyncio.gather(
        fetch_data("https://api.example.com/1"),
        fetch_data("https://api.example.com/2"),
    )
    print(f"Concurrent: {time.perf_counter() - start:.2f}s")
    print(results)

asyncio.run(concurrent())

asyncio.gather vs asyncio.create_task

asyncio.gather runs coroutines concurrently and returns all results. asyncio.create_task schedules a coroutine as an independent task that runs in the background:

import asyncio

async def work(n: int) -> str:
    await asyncio.sleep(n * 0.1)
    return f"result-{n}"

# gather: waits for ALL and returns results in order
async def use_gather():
    results = await asyncio.gather(work(1), work(2), work(3))
    print(results)  # ['result-1', 'result-2', 'result-3']

# gather with error handling — return_exceptions=True prevents one failure
# from cancelling all tasks
async def use_gather_safe():
    results = await asyncio.gather(
        work(1), work(2), work(3),
        return_exceptions=True
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"Task failed: {r}")
        else:
            print(r)

# create_task: schedules task independently — you can do other work before awaiting
async def use_create_task():
    task1 = asyncio.create_task(work(1), name="task-1")
    task2 = asyncio.create_task(work(2), name="task-2")

    # Tasks are already running while we do other work here
    await asyncio.sleep(0)

    result1 = await task1
    result2 = await task2
    print(result1, result2)

# TaskGroup (Python 3.11+) — structured concurrency, cancels all on error
async def use_task_group():
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(work(1))
        t2 = tg.create_task(work(2))
    # Both tasks are done here
    print(t1.result(), t2.result())

asyncio.run(use_task_group())
Prefer TaskGroup in Python 3.11+: asyncio.TaskGroup provides structured concurrency — if any task raises an exception, all sibling tasks are automatically cancelled. This prevents task leaks that are easy to create with bare create_task.

asyncio.Queue for Producer-Consumer

import asyncio
import random

async def producer(queue: asyncio.Queue, n_items: int):
    for i in range(n_items):
        item = random.randint(1, 100)
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(0.05)
    # Signal workers to stop
    for _ in range(3):   # number of workers
        await queue.put(None)

async def worker(worker_id: int, queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        await asyncio.sleep(0.1)   # simulate processing
        print(f"Worker {worker_id} processed: {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)
    workers = [asyncio.create_task(worker(i, queue)) for i in range(3)]
    await producer(queue, 15)
    await asyncio.gather(*workers)

asyncio.run(main())

Async Context Managers

Async context managers allow you to use async with for resources that require async setup/teardown, like database connections or HTTP sessions:

import asyncio
from contextlib import asynccontextmanager

class AsyncDBConnection:
    async def __aenter__(self):
        print("Opening DB connection")
        await asyncio.sleep(0.01)  # simulate connect
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print("Closing DB connection")
        await asyncio.sleep(0.01)  # simulate disconnect

    async def query(self, sql: str):
        await asyncio.sleep(0.05)
        return [{"id": 1, "name": "Alice"}]

# Using asynccontextmanager decorator (simpler)
@asynccontextmanager
async def get_connection(url: str):
    print(f"Connecting to {url}")
    await asyncio.sleep(0.01)
    conn = {"url": url, "active": True}
    try:
        yield conn
    finally:
        conn["active"] = False
        print("Connection closed")

async def main():
    async with AsyncDBConnection() as db:
        rows = await db.query("SELECT * FROM users")
        print(rows)

    async with get_connection("postgresql://localhost/mydb") as conn:
        print(conn)

asyncio.run(main())

aiohttp for Async HTTP Requests

pip install aiohttp
import asyncio
import aiohttp
from typing import Any

async def fetch(session: aiohttp.ClientSession, url: str) -> Any:
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
        resp.raise_for_status()
        return await resp.json()

async def fetch_all(urls: list[str]) -> list[Any]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

async def main():
    urls = [
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://jsonplaceholder.typicode.com/posts/2",
        "https://jsonplaceholder.typicode.com/posts/3",
    ]
    results = await fetch_all(urls)
    for r in results:
        if isinstance(r, Exception):
            print(f"Error: {r}")
        else:
            print(r["title"])

asyncio.run(main())
Reuse ClientSession: Creating a new aiohttp.ClientSession per request is a common mistake. The session manages a connection pool — create it once and reuse it for the lifetime of your application.

asyncio.timeout and Cancellation

import asyncio

# Python 3.11+ asyncio.timeout (replaces asyncio.wait_for)
async def slow_operation():
    await asyncio.sleep(10)
    return "done"

async def with_timeout():
    try:
        async with asyncio.timeout(2.0):
            result = await slow_operation()
    except TimeoutError:
        print("Operation timed out after 2 seconds")

# Cancelling tasks manually
async def cancelable():
    task = asyncio.create_task(slow_operation())
    await asyncio.sleep(1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled cleanly")

asyncio.run(with_timeout())

Common Pitfalls

These mistakes account for the majority of asyncio bugs in production:

1. Calling blocking code in an async function:

# BAD — blocks the entire event loop for all coroutines
async def bad():
    import time
    time.sleep(5)        # blocks everything
    data = open("big.txt").read()  # also blocks

# GOOD — run blocking code in a thread pool executor
import asyncio

async def good():
    loop = asyncio.get_running_loop()
    # run_in_executor runs in a ThreadPoolExecutor by default
    data = await loop.run_in_executor(None, open("big.txt").read)

2. Forgetting to await a coroutine:

# BAD — this creates a coroutine object but never runs it
async def main():
    fetch_data("http://example.com")   # RuntimeWarning: coroutine never awaited

# GOOD
async def main():
    result = await fetch_data("http://example.com")

3. Creating tasks but not awaiting them:

# BAD — task runs but errors are silently swallowed after main() exits
async def main():
    asyncio.create_task(work())  # fire and forget — dangerous

# GOOD — keep a reference, await it, or use TaskGroup
async def main():
    task = asyncio.create_task(work())
    await task

Running Sync Code in Executors

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_bound(n: int) -> int:
    """Simulate CPU work."""
    return sum(i * i for i in range(n))

def blocking_io(path: str) -> str:
    time.sleep(0.1)  # simulate blocking I/O
    return f"read {path}"

async def main():
    loop = asyncio.get_running_loop()

    # Thread pool for blocking I/O (default executor)
    result = await loop.run_in_executor(None, blocking_io, "file.txt")
    print(result)

    # Process pool for CPU work (bypasses the GIL)
    with ProcessPoolExecutor(max_workers=4) as pool:
        result = await loop.run_in_executor(pool, cpu_bound, 1_000_000)
        print(f"Sum: {result}")

asyncio.run(main())

Frequently Asked Questions

Does asyncio use multiple CPU cores?
No. asyncio is single-threaded and runs on one core. For CPU-bound parallelism, use ProcessPoolExecutor via loop.run_in_executor() or the multiprocessing module.
What is the difference between asyncio.sleep(0) and asyncio.sleep(1)?
asyncio.sleep(0) yields control to the event loop for one iteration, allowing other ready tasks to run, then immediately resumes. It is a common pattern to force task scheduling. asyncio.sleep(1) suspends for at least 1 second.
Can I use asyncio with Django or Flask?
Django 4.1+ supports async views natively. Flask 2.0+ supports async view functions. However, if you are building an async-first application, FastAPI is a better starting point. See our FastAPI guide.
How do I debug asyncio applications?
Enable asyncio debug mode: asyncio.run(main(), debug=True) or set PYTHONASYNCIODEBUG=1. This logs slow callbacks, unawaited coroutines, and improper thread usage.
What replaced asyncio.coroutine and yield from?
The old generator-based coroutine syntax (@asyncio.coroutine and yield from) was removed in Python 3.11. Always use async def and await.