Python WebSockets: Real-Time Communication with FastAPI

WebSockets provide a persistent, full-duplex connection between client and server — perfect for chat applications, live dashboards, collaborative editing, and real-time notifications. FastAPI supports WebSockets natively with @app.websocket(), giving you the same async programming model as HTTP endpoints. This guide covers the fundamentals, connection management, room-based chat, JWT authentication, and horizontal scaling with Redis pub/sub.

WebSocket Basics in FastAPI

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import json

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Echo every JSON text frame back to the client with a server timestamp.

    Each received text frame is parsed as JSON and returned as
    ``{"echo": <message>, "ts": <time.time()>}``. A frame that is not valid
    JSON gets an ``{"error": "invalid JSON"}`` reply instead of terminating
    the handler. The loop ends when the client disconnects.
    """
    # Imported once per connection instead of once per received message.
    import time

    # Accept the WebSocket handshake
    await websocket.accept()
    try:
        while True:
            raw = await websocket.receive_text()
            try:
                message = json.loads(raw)
            except json.JSONDecodeError:
                # One malformed frame should not tear down the whole connection.
                await websocket.send_text(json.dumps({"error": "invalid JSON"}))
                continue
            # Echo back with a timestamp
            response = {"echo": message, "ts": time.time()}
            await websocket.send_text(json.dumps(response))
    except WebSocketDisconnect:
        print("Client disconnected")

# Send different data types
@app.websocket("/ws/binary")
async def binary_endpoint(websocket: WebSocket):
    """Accept the connection and echo each received binary frame unchanged."""
    await websocket.accept()
    incoming_frames = websocket.iter_bytes()
    async for payload in incoming_frames:
        # Placeholder for real binary processing (images, audio, etc.);
        # here the bytes are simply sent straight back.
        await websocket.send_bytes(payload)

Connection Manager

A connection manager tracks active WebSocket connections and provides broadcast capabilities. Keep one manager instance per application (or per room for multi-room apps).

from typing import Optional
import asyncio

class ConnectionManager:
    """Registry of active WebSocket connections keyed by a caller-chosen id.

    Mutations of the registry are serialised with an ``asyncio.Lock``; the
    actual network sends happen outside the lock so that one slow client does
    not block connects and disconnects.
    """

    def __init__(self):
        # Map of connection_id -> WebSocket
        self._connections: dict[str, WebSocket] = {}
        self._lock = asyncio.Lock()

    async def connect(self, connection_id: str, ws: WebSocket) -> None:
        """Accept the handshake on *ws* and register it under *connection_id*.

        An existing entry with the same id is replaced by the new socket.
        """
        await ws.accept()
        async with self._lock:
            self._connections[connection_id] = ws

    async def disconnect(self, connection_id: str, ws: Optional[WebSocket] = None) -> None:
        """Remove *connection_id* from the registry; a missing id is a no-op.

        When *ws* is given, the entry is removed only if it still refers to
        that exact socket. This keeps a failed send on a stale socket from
        evicting a newer connection registered under the same id.
        """
        async with self._lock:
            if ws is not None and self._connections.get(connection_id) is not ws:
                return
            self._connections.pop(connection_id, None)

    async def send_to(self, connection_id: str, data: dict) -> bool:
        """Send *data* as JSON to one connection.

        Returns ``True`` on success. Returns ``False`` when the id is unknown
        or the send fails; a failing socket is dropped from the registry.
        """
        ws = self._connections.get(connection_id)
        if ws is None:
            return False
        try:
            await ws.send_json(data)
        except Exception:
            # Best effort: any send failure is treated as a dead connection.
            await self.disconnect(connection_id, ws)
            return False
        return True

    async def broadcast(self, data: dict, exclude: Optional[str] = None) -> None:
        """Send *data* as JSON to every connection except the id in *exclude*.

        Connections whose send raises are removed after the pass completes.
        """
        dead = []
        # Iterate over a snapshot: the registry may change while we await sends.
        for cid, ws in list(self._connections.items()):
            if cid == exclude:
                continue
            try:
                await ws.send_json(data)
            except Exception:
                dead.append((cid, ws))
        for cid, ws in dead:
            # Pass the socket so a client that reconnected meanwhile is kept.
            await self.disconnect(cid, ws)

    @property
    def count(self) -> int:
        """Number of currently registered connections."""
        return len(self._connections)

manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(client_id: str, ws: WebSocket):
    """Register the client, relay its JSON messages to everyone, announce join/leave.

    Cleanup runs in ``finally`` so the registry entry is removed and the
    "leave" event is broadcast even when the loop ends for a reason other
    than a clean disconnect (for example a frame that is not valid JSON).

    NOTE(review): this reuses the function name of the echo handler defined
    earlier in this file, and the pattern ``/ws/{client_id}`` can also match
    single-segment paths such as ``/ws/secure``. If route matching is
    order-dependent, routes registered later may be shadowed; confirm.
    """
    await manager.connect(client_id, ws)
    try:
        await manager.broadcast({"type": "join", "user": client_id, "count": manager.count})
        while True:
            data = await ws.receive_json()
            await manager.broadcast({"type": "message", "from": client_id, "data": data})
    except WebSocketDisconnect:
        # Normal termination; cleanup happens below.
        pass
    finally:
        await manager.disconnect(client_id)
        await manager.broadcast({"type": "leave", "user": client_id, "count": manager.count})

Room-Based Chat

from collections import defaultdict

class RoomManager:
    """Tracks WebSocket connections grouped into named rooms.

    Rooms are created on first join and removed as soon as they become empty,
    whether the last member leaves or is pruned after a failed send.
    """

    def __init__(self):
        # room_id -> {connection_id -> WebSocket}
        self._rooms: dict[str, dict[str, WebSocket]] = defaultdict(dict)

    async def join(self, room_id: str, user_id: str, ws: WebSocket) -> None:
        """Accept *ws*, add it to *room_id* and notify the other members."""
        await ws.accept()
        self._rooms[room_id][user_id] = ws
        await self.broadcast_room(room_id, {"type": "join", "user": user_id}, exclude=user_id)

    async def leave(self, room_id: str, user_id: str) -> None:
        """Remove *user_id* from *room_id* and notify the remaining members.

        An unknown room is a no-op. A room left empty is deleted.
        """
        # .get() avoids the defaultdict silently creating a room on lookup.
        members = self._rooms.get(room_id)
        if members is None:
            return
        members.pop(user_id, None)
        if not members:
            self._rooms.pop(room_id, None)
        else:
            await self.broadcast_room(room_id, {"type": "leave", "user": user_id})

    async def broadcast_room(self, room_id: str, data: dict, exclude: Optional[str] = None) -> None:
        """Send *data* as JSON to every member of *room_id* except *exclude*.

        Members whose send raises are pruned afterwards; if that empties the
        room, the room itself is removed.
        """
        members = self._rooms.get(room_id)
        if not members:
            return
        dead = []
        # Iterate over a snapshot: membership may change while we await sends.
        for uid, ws in list(members.items()):
            if uid == exclude:
                continue
            try:
                await ws.send_json(data)
            except Exception:
                dead.append((uid, ws))
        if not dead:
            return
        # Re-fetch: the room may have been deleted or recreated during the awaits.
        current = self._rooms.get(room_id)
        if current is None:
            return
        for uid, ws in dead:
            # Only drop the exact socket that failed, not a fresh reconnect.
            if current.get(uid) is ws:
                del current[uid]
        if not current:
            self._rooms.pop(room_id, None)

    def room_members(self, room_id: str) -> list[str]:
        """Return the user ids currently in *room_id* (empty if unknown)."""
        return list(self._rooms.get(room_id, {}))

room_manager = RoomManager()

@app.websocket("/ws/rooms/{room_id}/{user_id}")
async def room_endpoint(room_id: str, user_id: str, ws: WebSocket):
    """Join *user_id* to *room_id* and relay its chat messages to the room.

    Each JSON object received is rebroadcast to the room as
    ``{"type": "message", "from": user_id, "text": <payload "text">}``.
    Payloads that are not JSON objects are ignored. The user is removed from
    the room in ``finally`` so errors other than a clean disconnect cannot
    leave a stale member behind.
    """
    await room_manager.join(room_id, user_id, ws)
    try:
        while True:
            data = await ws.receive_json()
            if not isinstance(data, dict):
                # Arrays, strings and numbers have no "text" field to relay.
                continue
            await room_manager.broadcast_room(
                room_id,
                {"type": "message", "from": user_id, "text": data.get("text")},
            )
    except WebSocketDisconnect:
        # Normal termination; cleanup happens below.
        pass
    finally:
        await room_manager.leave(room_id, user_id)

WebSocket Authentication

The browser WebSocket API cannot attach custom HTTP headers (such as Authorization) to the handshake request, and no HTTP headers are exchanged once the connection is established. Authenticate via a query parameter on the initial connection or via the first message after connecting.

from fastapi import Query, HTTPException
import jwt

def decode_token(token: str) -> dict:
    """Decode *token* with ``jwt.decode`` using ``SECRET_KEY`` and return the claims.

    Only the HS256 algorithm is accepted. Errors raised by ``jwt.decode``
    propagate to the caller.
    """
    accepted_algorithms = ["HS256"]
    claims = jwt.decode(token, SECRET_KEY, algorithms=accepted_algorithms)
    return claims

@app.websocket("/ws/secure")
async def secure_ws(websocket: WebSocket, token: str = Query(...)):
    """Authenticated endpoint: validate the ``token`` query parameter, then ack messages.

    The connection is refused with close code 4001 when the token fails to
    decode or carries no ``sub`` claim. Otherwise the handshake is accepted,
    a ``connected`` message is sent, and every received JSON message is
    acknowledged until the client disconnects.

    NOTE(review): closing before ``accept()`` rejects the handshake itself;
    depending on the ASGI server the client may see an HTTP error rather than
    close code 4001. Confirm if the client relies on the code.
    """
    # Validate JWT before accepting the connection
    try:
        claims = decode_token(token)
    except jwt.InvalidTokenError:
        await websocket.close(code=4001, reason="Unauthorized")
        return

    user_id = claims.get("sub")
    if user_id is None:
        # A validly signed token without a subject cannot identify a user;
        # reject it instead of failing later with a KeyError.
        await websocket.close(code=4001, reason="Unauthorized")
        return

    await websocket.accept()
    await websocket.send_json({"type": "connected", "user_id": user_id})
    try:
        while True:
            data = await websocket.receive_json()
            # Process authenticated message
            await websocket.send_json({"type": "ack", "received": data})
    except WebSocketDisconnect:
        # Client went away; nothing to clean up for this endpoint.
        pass

Scaling with Redis Pub/Sub

When running multiple FastAPI workers or instances, each process has its own in-memory connection manager. To broadcast across all instances, use Redis pub/sub: each instance subscribes to the same channel, and any message published by one instance is received by all.

import asyncio
import redis.asyncio as aioredis
import json

redis_client = aioredis.from_url("redis://localhost:6379")

async def redis_listener(channel: str, local_manager: ConnectionManager):
    """Subscribe to Redis channel and broadcast to local WebSocket connections.

    Runs until cancelled. The pub/sub object is used as an async context
    manager so its resources are released when the listener stops. Payloads
    that cannot be decoded as JSON are reported and skipped, so a single bad
    message cannot kill the listener for the whole process.
    """
    async with redis_client.pubsub() as pubsub:
        await pubsub.subscribe(channel)
        async for message in pubsub.listen():
            # Skip subscribe/unsubscribe confirmations and other control events.
            if message["type"] != "message":
                continue
            try:
                data = json.loads(message["data"])
            except (TypeError, ValueError):
                # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
                print(f"Ignoring malformed message on channel {channel!r}")
                continue
            await local_manager.broadcast(data)

# Strong references to background tasks. asyncio.create_task() results must be
# kept alive by the caller, otherwise the task may be garbage-collected
# before it finishes.
_background_tasks: set[asyncio.Task] = set()

# Start the listener as a background task on app startup
@app.on_event("startup")
async def startup():
    """Launch the Redis listener for ``chat:global`` and keep a reference to it.

    NOTE(review): newer FastAPI releases favour lifespan handlers over
    ``on_event``; confirm before upgrading.
    """
    listener = asyncio.create_task(redis_listener("chat:global", manager))
    _background_tasks.add(listener)
    # Drop the reference once the task finishes so the set does not grow.
    listener.add_done_callback(_background_tasks.discard)

@app.websocket("/ws/scalable/{client_id}")
async def scalable_ws(client_id: str, ws: WebSocket):
    """Register the client locally and publish its messages to Redis.

    Messages are not broadcast directly; they are published on the
    ``chat:global`` channel. Payloads that are not JSON objects are ignored.
    The local registry entry is removed in ``finally`` so errors other than
    a clean disconnect (for example a Redis failure) cannot leave a stale
    connection behind.
    """
    await manager.connect(client_id, ws)
    try:
        while True:
            data = await ws.receive_json()
            if not isinstance(data, dict):
                # Arrays, strings and numbers have no "text" field to relay.
                continue
            # Publish to Redis — all instances receive it
            await redis_client.publish("chat:global", json.dumps({
                "type": "message",
                "from": client_id,
                "text": data.get("text"),
            }))
    except WebSocketDisconnect:
        # Normal termination; cleanup happens below.
        pass
    finally:
        await manager.disconnect(client_id)

JavaScript Client

// Reconnecting WebSocket client
class ChatClient {
    /**
     * Open a connection immediately and reconnect with exponential backoff
     * (1s doubling up to 30s) whenever it closes abnormally.
     *
     * @param {string} url   WebSocket endpoint, e.g. "ws://host/ws/secure".
     * @param {string} token Auth token, sent as the `token` query parameter.
     */
    constructor(url, token) {
        // Encode the token so reserved characters cannot corrupt the query
        // string, and respect a query string already present in the URL.
        const separator = url.includes("?") ? "&" : "?";
        this.url = `${url}${separator}token=${encodeURIComponent(token)}`;
        this.ws = null;
        this.reconnectDelay = 1000;
        this.reconnectTimer = null;
        this.shouldReconnect = true;
        this.connect();
    }

    /** Create the socket and wire up its event handlers. */
    connect() {
        this.reconnectTimer = null;
        this.ws = new WebSocket(this.url);

        this.ws.onopen = () => {
            console.log("Connected");
            this.reconnectDelay = 1000; // reset backoff
        };

        this.ws.onmessage = (event) => {
            let data;
            try {
                data = JSON.parse(event.data);
            } catch (err) {
                // A non-JSON frame must not throw out of the event handler.
                console.warn("Ignoring non-JSON message:", event.data);
                return;
            }
            this.onMessage(data);
        };

        this.ws.onclose = (event) => {
            // Reconnect only on abnormal closes the caller did not request.
            if (this.shouldReconnect && event.code !== 1000) {
                this.reconnectTimer = setTimeout(() => this.connect(), this.reconnectDelay);
                this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000);
            }
        };
    }

    /** Send `{ text }` as JSON; silently dropped when the socket is not open. */
    send(text) {
        if (this.ws?.readyState === WebSocket.OPEN) {
            this.ws.send(JSON.stringify({ text }));
        }
    }

    /** Message hook; override or reassign to handle parsed messages. */
    onMessage(data) {
        console.log("Received:", data);
    }

    /** Close for good: cancel any pending reconnect and close the socket. */
    disconnect() {
        this.shouldReconnect = false;
        if (this.reconnectTimer !== null) {
            clearTimeout(this.reconnectTimer);
            this.reconnectTimer = null;
        }
        this.ws?.close(1000, "Client disconnect");
    }
}

// Example usage: connect to the token-protected /ws/secure endpoint.
// "your-jwt-token" is a placeholder; substitute a real token.
const client = new ChatClient("ws://localhost:8000/ws/secure", "your-jwt-token");

Error Handling and Reconnection

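WebSocket connections drop due to network issues, timeouts, and server restarts. Implement heartbeats (ping/pong) to detect stale connections, and exponential backoff reconnection on the client. In the example below, the server sends an application-level {"type": "ping"} message every PING_INTERVAL seconds and treats a {"type": "pong"} message from the client as the acknowledgement.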

import asyncio
from fastapi import WebSocket, WebSocketDisconnect

PING_INTERVAL = 30  # seconds

@app.websocket("/ws/heartbeat/{client_id}")
async def heartbeat_ws(client_id: str, ws: WebSocket):
    """Acknowledge client messages while a background task sends periodic pings.

    A ``{"type": "ping"}`` message is sent every ``PING_INTERVAL`` seconds.
    Incoming ``pong`` messages are consumed silently; every other message is
    answered with an ``ack``. The ping task is cancelled when the handler exits.
    """
    await ws.accept()

    async def ping_forever():
        # Sleep first, then ping; stop quietly on the first failed send.
        while True:
            await asyncio.sleep(PING_INTERVAL)
            try:
                await ws.send_json({"type": "ping"})
            except Exception:
                return

    pinger = asyncio.create_task(ping_forever())
    try:
        while True:
            payload = await ws.receive_json()
            # A "pong" only acknowledges the heartbeat and needs no reply.
            if payload.get("type") != "pong":
                await ws.send_json({"type": "ack", "received": payload})
    except WebSocketDisconnect:
        pass
    finally:
        pinger.cancel()

Frequently Asked Questions

How many WebSocket connections can FastAPI handle?
A single uvicorn worker handles thousands of concurrent WebSocket connections (typically 5,000–20,000) since each connection is just a coroutine in the event loop — no thread per connection. Scale horizontally for more capacity, using Redis pub/sub to synchronize state.
Should I use WebSockets or Server-Sent Events (SSE)?
Use SSE when communication is server-to-client only (notifications, live feeds) — SSE is simpler, works over plain HTTP, and reconnects automatically. Use WebSockets when you need bidirectional communication (chat, collaborative editing, games).
How do I handle WebSocket message ordering?
WebSocket messages within a single connection are always ordered. Across multiple connections or server instances, ordering is not guaranteed. Include a sequence number or timestamp in your messages and let the client sort if ordering matters.