Python httpx: Async HTTP Client Guide

httpx is the modern Python HTTP client that supports both synchronous and asynchronous requests, HTTP/1.1 and HTTP/2, connection pooling, timeouts, cookies, authentication, and streaming — all with a requests-compatible API. It is the standard choice for async Python applications and FastAPI projects that need to call external services without blocking the event loop.

Sync and Async Basics

pip install httpx[http2]
import httpx

# Synchronous — drop-in requests replacement
response = httpx.get("https://httpbin.org/get", params={"page": 1})
print(response.status_code)          # 200
print(response.headers["content-type"])
print(response.json())               # parsed JSON
response.raise_for_status()          # raises HTTPStatusError on 4xx/5xx

# POST with JSON body
response = httpx.post(
    "https://httpbin.org/post",
    json={"name": "Techoral", "version": "2.0"},
    headers={"X-API-Key": "secret"},
)
data = response.json()

# Async version — same API, non-blocking
import asyncio

async def fetch_data():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://httpbin.org/get")
        response.raise_for_status()
        return response.json()

result = asyncio.run(fetch_data())
httpx vs requests: requests is synchronous and blocks the event loop. In FastAPI/asyncio code, use httpx.AsyncClient so the event loop can handle other requests while waiting for the HTTP response. For scripts and CLI tools, the sync API is fine.

AsyncClient and Connection Pooling

Always use a persistent AsyncClient instance rather than calling httpx.get() for each request. The client maintains a connection pool that reuses TCP connections, reducing latency from TCP and TLS handshakes on repeated calls to the same host.

import httpx
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI


# Application-level shared client — created once, shared across requests
_http_client: httpx.AsyncClient | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    global _http_client
    _http_client = httpx.AsyncClient(
        base_url="https://api.example.com",
        timeout=httpx.Timeout(connect=5, read=30, write=10, pool=5),
        limits=httpx.Limits(
            max_connections=100,
            max_keepalive_connections=20,
            keepalive_expiry=30,
        ),
        headers={
            "User-Agent": "TechoralAPI/2.0",
            "Accept": "application/json",
        },
        http2=True,
    )
    yield
    await _http_client.aclose()


app = FastAPI(lifespan=lifespan)


@app.get("/proxy/users/{user_id}")
async def proxy_user(user_id: int):
    response = await _http_client.get(f"/v1/users/{user_id}")
    response.raise_for_status()
    return response.json()


# Parallel requests using asyncio.gather
async def fetch_all(urls: list[str]) -> list[dict]:
    async with httpx.AsyncClient(timeout=10) as client:
        tasks = [client.get(url) for url in urls]
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        results = []
        for r in responses:
            if isinstance(r, Exception):
                results.append({"error": str(r)})
            else:
                results.append(r.json() if r.status_code == 200 else {"status": r.status_code})
        return results

Timeouts and Limits

httpx separates timeouts into four phases: connect, read, write, and pool. Setting a single timeout applies to all phases. Always set timeouts — unbounded requests will block indefinitely in network failure scenarios.

import httpx

# Single timeout applies to all phases
client = httpx.AsyncClient(timeout=30.0)

# Granular per-phase timeouts
timeout = httpx.Timeout(
    connect=5.0,     # time to establish TCP connection
    read=30.0,       # time to receive the first byte after sending request
    write=10.0,      # time to send the request body
    pool=5.0,        # time to acquire a connection from the pool
)

# No timeout (dangerous — only for internal trusted services with SLAs)
no_timeout = httpx.Timeout(None)

# Per-request timeout override
async def call_with_custom_timeout():
    async with httpx.AsyncClient(timeout=timeout) as client:
        # Override timeout for one slow endpoint
        slow_response = await client.get(
            "https://api.example.com/report/generate",
            timeout=httpx.Timeout(read=120.0),
        )
        # Use client default for fast endpoints
        fast_response = await client.get("https://api.example.com/health")

Authentication

httpx ships with built-in auth classes for Basic, Digest, and Bearer token auth. Implement the httpx.Auth interface to create custom auth flows like OAuth2 token refresh or HMAC signing.

import httpx
import time


# Bearer token auth
class BearerAuth(httpx.Auth):
    def __init__(self, token: str):
        self.token = token

    def auth_flow(self, request: httpx.Request):
        request.headers["Authorization"] = f"Bearer {self.token}"
        yield request


# OAuth2 with automatic token refresh
class OAuth2Auth(httpx.Auth):
    requires_response_body = True

    def __init__(self, client_id: str, client_secret: str, token_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self._token: str | None = None
        self._expires_at: float = 0

    def auth_flow(self, request: httpx.Request):
        if not self._token or time.time() >= self._expires_at - 60:
            # Request a new token
            token_request = self.build_token_request()
            token_response = yield token_request
            self._parse_token(token_response)

        request.headers["Authorization"] = f"Bearer {self._token}"
        yield request

    def build_token_request(self) -> httpx.Request:
        return httpx.Request(
            "POST", self.token_url,
            data={"grant_type": "client_credentials",
                  "client_id": self.client_id,
                  "client_secret": self.client_secret},
        )

    def _parse_token(self, response: httpx.Response):
        data = response.json()
        self._token = data["access_token"]
        self._expires_at = time.time() + data.get("expires_in", 3600)


# HMAC request signing
import hashlib
import hmac

class HMACAuth(httpx.Auth):
    def __init__(self, key: str, secret: str):
        self.key = key
        self.secret = secret.encode()

    def auth_flow(self, request: httpx.Request):
        timestamp = str(int(time.time()))
        message = f"{timestamp}\n{request.method}\n{request.url.path}".encode()
        signature = hmac.new(self.secret, message, hashlib.sha256).hexdigest()
        request.headers["X-API-Key"] = self.key
        request.headers["X-Timestamp"] = timestamp
        request.headers["X-Signature"] = signature
        yield request


async def call_api():
    async with httpx.AsyncClient(
        base_url="https://api.example.com",
        auth=BearerAuth("my-jwt-token"),
    ) as client:
        return await client.get("/protected/resource")

Streaming Responses

Use streaming to download large files or consume server-sent events without buffering the entire response in memory. httpx's streaming API works in both sync and async modes.

import httpx
import asyncio


async def download_large_file(url: str, dest: str) -> None:
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("GET", url) as response:
            response.raise_for_status()
            total = int(response.headers.get("content-length", 0))
            downloaded = 0
            with open(dest, "wb") as f:
                async for chunk in response.aiter_bytes(chunk_size=65536):
                    f.write(chunk)
                    downloaded += len(chunk)
                    if total:
                        pct = downloaded / total * 100
                        print(f"\rDownloading: {pct:.1f}%", end="", flush=True)
    print(f"\nSaved to {dest}")


async def consume_sse(url: str) -> None:
    """Consume server-sent events (SSE) from a streaming endpoint."""
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("GET", url, headers={"Accept": "text/event-stream"}) as response:
            async for line in response.aiter_lines():
                if line.startswith("data:"):
                    data = line[5:].strip()
                    print(f"Event: {data}")


# Sync streaming
def download_sync(url: str, dest: str) -> None:
    with httpx.stream("GET", url) as response:
        with open(dest, "wb") as f:
            for chunk in response.iter_bytes(65536):
                f.write(chunk)

HTTP/2 Support

Enable HTTP/2 to multiplex multiple requests over a single connection, reducing overhead for parallel API calls to the same host. The API is identical — httpx handles protocol negotiation via ALPN.

pip install httpx[http2]
import httpx
import asyncio

async def parallel_with_http2():
    async with httpx.AsyncClient(http2=True, base_url="https://api.github.com") as client:
        # HTTP/2 multiplexes these over one connection
        tasks = [
            client.get("/repos/python/cpython"),
            client.get("/repos/encode/httpx"),
            client.get("/repos/tiangolo/fastapi"),
        ]
        responses = await asyncio.gather(*tasks)
        for r in responses:
            data = r.json()
            print(f"{data['full_name']}: {data['stargazers_count']} stars")

asyncio.run(parallel_with_http2())

Testing with httpx

httpx provides TestClient (sync) and AsyncClient with ASGI transport for testing FastAPI apps without a running server. For mocking external calls, use httpx_mock from the pytest-httpx package.

import pytest
import httpx
from fastapi import FastAPI
from fastapi.testclient import TestClient

app = FastAPI()

@app.get("/items/{item_id}")
async def get_item(item_id: int):
    return {"item_id": item_id, "name": f"Item {item_id}"}

# Sync test client (no event loop needed)
def test_get_item_sync():
    client = TestClient(app)
    response = client.get("/items/42")
    assert response.status_code == 200
    assert response.json() == {"item_id": 42, "name": "Item 42"}


# Async test client
@pytest.mark.anyio
async def test_get_item_async():
    async with httpx.AsyncClient(
        transport=httpx.ASGITransport(app=app),
        base_url="http://test",
    ) as client:
        response = await client.get("/items/42")
        assert response.status_code == 200


# Mock external HTTP calls with pytest-httpx
# pip install pytest-httpx
from pytest_httpx import HTTPXMock

def test_external_call(httpx_mock: HTTPXMock):
    httpx_mock.add_response(
        url="https://api.example.com/users/1",
        json={"id": 1, "name": "Alice"},
        status_code=200,
    )
    response = httpx.get("https://api.example.com/users/1")
    assert response.json()["name"] == "Alice"

Frequently Asked Questions

httpx vs aiohttp vs requests — which to use?
Use httpx for new projects. It supports both sync and async with the same API, includes HTTP/2, and has first-class FastAPI/Starlette testing support. aiohttp is async-only and has a different API. requests is sync-only and blocks the event loop. httpx is the modern standard.
Why use a persistent AsyncClient instead of httpx.get()?
Every httpx.get() call creates a new client, establishes a new TCP connection, performs TLS handshake, and closes it afterwards. A persistent client reuses connections from its pool, reducing latency by 50–200ms per request to HTTPS endpoints.
How do I handle redirects?
httpx follows redirects by default (up to 20 hops). Set follow_redirects=False to inspect redirects manually, or max_redirects=5 to limit them. The response .history list shows all intermediate responses.