Python Rate Limiting: Token Bucket and Sliding Window

Rate limiting controls how many requests a client can make to your API within a time window, protecting services from abuse, accidental overload, and denial-of-service attacks. Python offers several approaches: in-memory limiters for single-process applications, Redis-backed distributed limiters for multi-instance deployments, and established libraries such as slowapi (for FastAPI and Starlette) and Flask-Limiter (for Flask). Understanding the underlying algorithms (token bucket, sliding window, and fixed window) helps you choose the right strategy for each endpoint.

Rate Limiting Algorithms Compared

Each algorithm offers different trade-offs between burst handling, memory usage, and accuracy. The fixed window counter is simplest but allows burst traffic at window boundaries. The sliding window log is accurate but memory-intensive. The token bucket and sliding window counter offer the best balance of accuracy, burst allowance, and efficiency.

Algorithm              | Burst handling                | Memory | Accuracy | Complexity
-----------------------|-------------------------------|--------|----------|-----------
Fixed Window           | Poor (up to 2x at boundaries) | O(1)   | Low      | Very low
Sliding Window Log     | None beyond the limit         | O(n)   | Exact    | Medium
Sliding Window Counter | Approximate                   | O(1)   | High     | Low
Token Bucket           | Controlled (up to capacity)   | O(1)   | High     | Low
Leaky Bucket           | None (output is smoothed)     | O(1)   | Exact    | Low
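
To make the "2x burst" entry for the fixed window concrete, here is a minimal sketch of a fixed window counter. The class name FixedWindowCounter and the explicit now argument are assumptions made for illustration (passing the time in keeps the example deterministic); a real limiter would read a clock and discard counters for old windows.

```python
from collections import defaultdict


class FixedWindowCounter:
    """Illustrative fixed-window limiter; the caller supplies the current time."""

    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window_seconds = window_seconds
        # Counters keyed by (client, window index). Old windows are never pruned
        # in this sketch, which a production limiter would need to do.
        self._counts: dict[tuple[str, int], int] = defaultdict(int)

    def is_allowed(self, client_id: str, now: float) -> bool:
        window_index = int(now // self.window_seconds)
        key = (client_id, window_index)
        if self._counts[key] >= self.limit:
            return False
        self._counts[key] += 1
        return True


limiter = FixedWindowCounter(limit=5, window_seconds=60)

# Five requests at the end of window 0, then five at the start of window 1.
late = [limiter.is_allowed("user-123", now=59.0 + i * 0.1) for i in range(5)]
early = [limiter.is_allowed("user-123", now=60.0 + i * 0.1) for i in range(5)]

print(sum(late) + sum(early))  # 10 accepted in under two seconds, twice the limit
```

Both batches pass because each falls into a different window, even though all ten requests arrive within a couple of seconds of each other.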

Token Bucket Implementation

The token bucket algorithm maintains a bucket that fills with tokens at a constant rate up to a maximum capacity. Each request consumes one token, and a request that finds the bucket empty is rejected. This naturally allows short bursts (up to the bucket capacity) while enforcing a long-term average rate equal to the refill rate. The algorithm is widely used in practice: AWS API Gateway documents its throttling as a token bucket, and Stripe has described using token buckets for its API rate limiters.

import time
from dataclasses import dataclass, field
from collections import defaultdict
import threading

@dataclass
class TokenBucket:
    """Thread-safe token bucket rate limiter."""
    capacity: float      # max tokens
    refill_rate: float   # tokens added per second
    _tokens: float = field(init=False)
    _last_refill: float = field(init=False)
    _lock: threading.Lock = field(default_factory=threading.Lock, init=False)

    def __post_init__(self):
        self._tokens = self.capacity
        self._last_refill = time.monotonic()

    def _refill(self):
        now = time.monotonic()
        elapsed = now - self._last_refill
        added = elapsed * self.refill_rate
        self._tokens = min(self.capacity, self._tokens + added)
        self._last_refill = now

    def consume(self, tokens: float = 1.0) -> bool:
        """Return True if request is allowed, False if rate limited."""
        with self._lock:
            self._refill()
            if self._tokens >= tokens:
                self._tokens -= tokens
                return True
            return False

    @property
    def available(self) -> float:
        with self._lock:
            self._refill()
            return self._tokens

# Per-client rate limiter using a dict of buckets.
# Note: buckets are never evicted, so a long-running service should expire idle clients.
class RateLimiter:
    def __init__(self, capacity: int = 10, rate: float = 1.0):
        self.capacity = capacity
        self.rate = rate
        self._buckets: dict[str, TokenBucket] = defaultdict(
            lambda: TokenBucket(capacity=self.capacity, refill_rate=self.rate)
        )

    def is_allowed(self, client_id: str) -> bool:
        return self._buckets[client_id].consume()

# Usage
limiter = RateLimiter(capacity=10, rate=2.0)  # burst 10, refill 2/sec

for i in range(15):
    allowed = limiter.is_allowed("user-123")
    print(f"Request {i+1}: {'OK' if allowed else 'RATE LIMITED'}")
    time.sleep(0.1)
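
Because the loop above depends on real sleep timing, it can help to replay the same schedule deterministically. The sketch below is an illustration, not part of the limiter: simulate_token_bucket and seconds_until_available are hypothetical helper names, and the arrival times assume requests land exactly 0.1 seconds apart.

```python
def simulate_token_bucket(capacity: float, refill_rate: float, arrival_times: list[float]) -> list[bool]:
    """Replay request arrival times (in seconds) against a token bucket."""
    tokens = float(capacity)
    last = arrival_times[0] if arrival_times else 0.0
    decisions = []
    for now in arrival_times:
        # Refill for the time elapsed since the previous request, capped at capacity.
        tokens = min(capacity, tokens + (now - last) * refill_rate)
        last = now
        if tokens >= 1.0:
            tokens -= 1.0
            decisions.append(True)
        else:
            decisions.append(False)
    return decisions


def seconds_until_available(current_tokens: float, refill_rate: float, needed: float = 1.0) -> float:
    """Time until `needed` tokens are in the bucket, assuming no other consumers."""
    if refill_rate <= 0:
        raise ValueError("refill_rate must be positive")
    return max(0.0, (needed - current_tokens) / refill_rate)


arrivals = [i * 0.1 for i in range(15)]
decisions = simulate_token_bucket(10, 2.0, arrivals)
print(decisions.count(True), decisions.count(False))  # 12 3
print(seconds_until_available(0.0, 2.0))  # 0.5
```

Each request costs one token while each 0.1-second gap restores only 0.2, so the initial burst of 10 stretches to 12 accepted requests before the bucket runs dry. A helper like seconds_until_available is one way to derive a Retry-After value for a rejected request.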

Sliding Window with Redis

Redis-backed sliding window rate limiting works across multiple application instances and survives process restarts. The implementation below is a sliding window log: each client has a sorted set whose members are individual requests and whose scores are request timestamps, so memory grows with the number of requests in the window. A Lua script runs the prune, count, and insert steps atomically on the Redis server, preventing the race conditions that would occur if the check and the update were separate commands.

import time
import uuid

import redis.asyncio as aioredis

redis_client = aioredis.from_url("redis://localhost:6379")

SLIDING_WINDOW_SCRIPT = """
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
local request_id = ARGV[4]

-- Remove timestamps outside the window
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)

-- Count requests in the current window
local count = redis.call('ZCARD', key)

if count < limit then
    -- Add this request and let the key expire once the window has passed
    redis.call('ZADD', key, now, request_id)
    redis.call('PEXPIRE', key, window)
    return {1, limit - count - 1, 0}  -- allowed, remaining, retry_after
else
    -- Find when the oldest request will leave the window
    local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
    local retry_after = math.ceil((tonumber(oldest[2]) + window - now) / 1000)
    return {0, 0, retry_after}  -- denied, remaining, retry_after (seconds)
end
"""

_script = None

async def sliding_window_rate_limit(
    client_id: str,
    limit: int = 100,
    window_ms: int = 60_000,  # 1 minute
) -> tuple[bool, int, int]:
    """
    Returns (allowed, remaining, retry_after_seconds).
    Uses atomic Lua script to prevent race conditions.
    """
    global _script
    if _script is None:
        _script = redis_client.register_script(SLIDING_WINDOW_SCRIPT)

    # Timestamps come from the application server, so instances need synchronized clocks.
    now_ms = int(time.time() * 1000)
    # Sorted-set members must be unique; otherwise two requests in the same
    # millisecond would overwrite each other and be counted once.
    request_id = f"{now_ms}-{uuid.uuid4().hex}"
    key = f"ratelimit:{client_id}"

    result = await _script(
        keys=[key],
        args=[now_ms, window_ms, limit, request_id]
    )
    allowed = bool(result[0])
    remaining = int(result[1])
    retry_after = int(result[2])
    return allowed, remaining, retry_after
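
For comparison, the sliding window counter from the table above avoids storing one entry per request by keeping only two counters and weighting the previous window by how much of it still overlaps the sliding window. The following is a minimal in-memory sketch for a single client; the class name SlidingWindowCounter and the explicit now argument are assumptions for illustration, and the same arithmetic could be moved into a Lua script over two Redis keys.

```python
class SlidingWindowCounter:
    """Approximate sliding window: weights the previous fixed window by its overlap."""

    def __init__(self, limit: int, window_seconds: float):
        self.limit = limit
        self.window_seconds = window_seconds
        self._window_index = 0
        self._current = 0   # requests counted in the current fixed window
        self._previous = 0  # requests counted in the window before it

    def is_allowed(self, now: float) -> bool:
        index = int(now // self.window_seconds)
        if index != self._window_index:
            # Roll forward. If more than one window has passed, nothing carries over.
            self._previous = self._current if index == self._window_index + 1 else 0
            self._current = 0
            self._window_index = index
        # Fraction of the current fixed window that has already elapsed.
        elapsed_fraction = (now % self.window_seconds) / self.window_seconds
        estimated = self._previous * (1.0 - elapsed_fraction) + self._current
        if estimated >= self.limit:
            return False
        self._current += 1
        return True


limiter = SlidingWindowCounter(limit=10, window_seconds=60)
burst = [limiter.is_allowed(now=30.0) for _ in range(10)]  # fills the first window
at_boundary = limiter.is_allowed(now=60.0)  # previous window still counts in full
half_way = limiter.is_allowed(now=90.0)     # previous window now counts for half
print(all(burst), at_boundary, half_way)  # True False True
```

A fixed window counter would have accepted the request at t=60; here it is rejected because the estimate still includes all ten requests from the previous window. The trade-off is that the estimate assumes those requests were spread evenly across that window.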

slowapi with FastAPI

slowapi is a rate limiting library for FastAPI and Starlette, inspired by Flask-Limiter. It uses the limits library under the hood, which supports multiple storage backends (in-memory, Redis, memcached). Decorating routes with @limiter.limit() is the simplest way to add rate limiting without writing middleware from scratch.

from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware

# Use IP address as the key by default
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)

# Alternative: back the limiter with Redis so limits are shared across instances.
# Passing storage_uri is enough; no storage class needs to be imported.
limiter_with_redis = Limiter(
    key_func=get_remote_address,
    storage_uri="redis://localhost:6379",
)

@app.get("/api/public")
@limiter.limit("100/minute")
async def public_endpoint(request: Request):
    return {"message": "OK"}

@app.get("/api/search")
@limiter.limit("10/minute;50/hour")  # multiple limits
async def search_endpoint(request: Request, q: str):
    return {"query": q, "results": []}

@app.post("/api/login")
@limiter.limit("5/minute")  # strict limit on auth endpoints
async def login_endpoint(request: Request):
    return {"token": "..."}

# Custom key function — rate limit per authenticated user
def get_user_id(request: Request) -> str:
    user = getattr(request.state, "user", None)
    if user:
        return f"user:{user.id}"
    return get_remote_address(request)

user_limiter = Limiter(key_func=get_user_id)

Per-User and Per-Endpoint Limits

Production APIs typically enforce different limits per user tier (free vs paid) and per endpoint sensitivity (read vs write, public vs authenticated). This requires a dynamic limit function that reads the user's plan from the database or JWT claims and returns the appropriate limit string.

from fastapi import Request

PLAN_LIMITS = {
    "free":       {"api": "60/hour",   "search": "10/minute"},
    "pro":        {"api": "1000/hour", "search": "100/minute"},
    "enterprise": {"api": "10000/hour","search": "1000/minute"},
}

def get_api_limit(request: Request) -> str:
    """Dynamic limit based on authenticated user's plan."""
    user = getattr(request.state, "user", None)
    plan = user.plan if user else "free"
    return PLAN_LIMITS.get(plan, PLAN_LIMITS["free"])["api"]

# FastAPI middleware that enforces per-user limits
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse

class DynamicRateLimitMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
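        # Fall back to the client IP; request.client can be None in some setups
        client_host = request.client.host if request.client else "unknown"
        user_id = getattr(request.state, "user_id", client_host)
        endpoint = request.url.path

        allowed, remaining, retry_after = await sliding_window_rate_limit(
            client_id=f"{user_id}:{endpoint}",
            limit=self._get_limit(request),
            window_ms=3_600_000,  # one hour, matching the per-hour "api" limits in PLAN_LIMITS
        )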

        if not allowed:
            return JSONResponse(
                status_code=429,
                content={"detail": "Rate limit exceeded"},
                headers={
                    "Retry-After": str(retry_after),
                    "X-RateLimit-Remaining": "0",
                },
            )

        response = await call_next(request)
        response.headers["X-RateLimit-Remaining"] = str(remaining)
        return response

    def _get_limit(self, request: Request) -> int:
        user = getattr(request.state, "user", None)
        plan = user.plan if user else "free"
        return {"free": 60, "pro": 1000, "enterprise": 10000}.get(plan, 60)
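
The middleware above works with an integer limit and a window in milliseconds, while PLAN_LIMITS stores strings such as "60/hour". One way to bridge the two is a small parser. The sketch below is a hypothetical helper: parse_limit is not part of slowapi or limits, and it handles only the single "N/unit" form used in PLAN_LIMITS, not the fuller notation those libraries accept.

```python
# Window lengths in milliseconds for the units used in this article.
_UNIT_MS = {
    "second": 1_000,
    "minute": 60_000,
    "hour": 3_600_000,
    "day": 86_400_000,
}


def parse_limit(spec: str) -> tuple[int, int]:
    """Turn a string such as "60/hour" into (limit, window_ms)."""
    count_text, _, unit = spec.strip().partition("/")
    unit = unit.strip().lower()
    if unit not in _UNIT_MS:
        raise ValueError(f"unsupported limit string: {spec!r}")
    # int() raises ValueError if the count is missing or not a number.
    return int(count_text), _UNIT_MS[unit]


print(parse_limit("60/hour"))    # (60, 3600000)
print(parse_limit("10/minute"))  # (10, 60000)
```

With a helper like this, the middleware could look up the plan's string in PLAN_LIMITS and pass both returned values to sliding_window_rate_limit, keeping a single source of truth for the limits.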

Rate Limit Response Headers

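Rate limit headers let clients throttle themselves before they hit the limit. The de facto convention, used in the code below, is the X-RateLimit-Limit, X-RateLimit-Remaining, and X-RateLimit-Reset family. An IETF draft (originally draft-polli-ratelimit-headers, later adopted by the HTTPAPI working group as draft-ietf-httpapi-ratelimit-headers) proposes unprefixed RateLimit fields; its field names have changed between revisions, so check the current draft before adopting them. Always include Retry-After in 429 responses so clients know how long to wait before retrying.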

from fastapi import Response
from fastapi.responses import JSONResponse
from datetime import datetime, timezone

def add_rate_limit_headers(
    response: Response,
    limit: int,
    remaining: int,
    reset_at: datetime,
    retry_after: int | None = None,
):
    """Add the conventional X-RateLimit-* headers (and Retry-After, if given) to a response."""
    response.headers["X-RateLimit-Limit"] = str(limit)
    response.headers["X-RateLimit-Remaining"] = str(max(0, remaining))
    response.headers["X-RateLimit-Reset"] = str(int(reset_at.timestamp()))
    if retry_after is not None:
        response.headers["Retry-After"] = str(retry_after)

# 429 response with proper headers
def rate_limit_response(retry_after: int) -> JSONResponse:
    return JSONResponse(
        status_code=429,
        content={
            "detail": "Too Many Requests",
            "retry_after": retry_after,
        },
        headers={
            "Retry-After": str(retry_after),
            "Content-Type": "application/json",
        },
    )
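
On the client side, Retry-After may arrive either as a number of seconds or as an HTTP date, so a well-behaved client has to handle both. The following is a minimal sketch using only the standard library; the function name retry_after_seconds and the optional now parameter (included so the behavior can be checked without a real clock) are assumptions for illustration.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_after_seconds(header_value: str, now: Optional[datetime] = None) -> float:
    """Interpret a Retry-After value as a non-negative delay in seconds."""
    value = header_value.strip()
    if value.isdigit():
        # Delay form, e.g. "Retry-After: 30"
        return float(value)
    # Date form, e.g. "Retry-After: Wed, 21 Oct 2015 07:28:00 GMT"
    retry_at = parsedate_to_datetime(value)
    if retry_at.tzinfo is None:
        retry_at = retry_at.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    # A date in the past means the client may retry immediately.
    return max(0.0, (retry_at - now).total_seconds())


fixed_now = datetime(2015, 10, 21, 7, 27, 0, tzinfo=timezone.utc)
print(retry_after_seconds("30"))  # 30.0
print(retry_after_seconds("Wed, 21 Oct 2015 07:28:00 GMT", now=fixed_now))  # 60.0
```

A client would typically sleep for the returned number of seconds, possibly with some added jitter, before repeating the request.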

Testing Rate Limiters

Rate limiting logic is critical enough to warrant dedicated unit tests. Test that the limiter allows requests under the limit, blocks requests over the limit, and resets correctly after the window expires. Use freezegun or time.monotonic patching to control time in tests without actually sleeping.

import pytest
from unittest.mock import patch
import time

@pytest.fixture
def bucket():
    return TokenBucket(capacity=5, refill_rate=1.0)

def test_allows_requests_within_limit(bucket):
    for _ in range(5):
        assert bucket.consume() is True

def test_blocks_when_empty(bucket):
    for _ in range(5):
        bucket.consume()
    assert bucket.consume() is False  # 6th request denied

def test_refills_over_time(bucket):
    for _ in range(5):
        bucket.consume()  # drain the bucket

    # Simulate 3 seconds passing
    with patch("time.monotonic", return_value=time.monotonic() + 3):
        bucket._refill()
        assert bucket._tokens == pytest.approx(3.0, abs=0.1)
        assert bucket.consume() is True

@pytest.mark.asyncio
async def test_sliding_window_redis():
    """Integration test with real Redis (requires a running instance and the pytest-asyncio plugin)."""
    client_id = "test-user-123"
    # Clean up from previous runs
    await redis_client.delete(f"ratelimit:{client_id}")

    for i in range(5):
        allowed, remaining, _ = await sliding_window_rate_limit(
            client_id, limit=5, window_ms=60_000
        )
        assert allowed is True
        assert remaining == 4 - i

    allowed, remaining, retry_after = await sliding_window_rate_limit(
        client_id, limit=5, window_ms=60_000
    )
    assert allowed is False
    assert retry_after > 0
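
An alternative to patching time.monotonic is to pass the clock in as a dependency, which keeps tests free of mocks. The sketch below assumes a variant of the bucket that accepts a clock callable; ClockedTokenBucket and FakeClock are hypothetical names, and the lock from the original TokenBucket is left out to keep the example short.

```python
import time
from typing import Callable


class FakeClock:
    """Manually advanced stand-in for time.monotonic."""

    def __init__(self, start: float = 0.0):
        self.now = start

    def __call__(self) -> float:
        return self.now

    def advance(self, seconds: float) -> None:
        self.now += seconds


class ClockedTokenBucket:
    """Token bucket that takes its clock as a constructor argument (not thread-safe)."""

    def __init__(self, capacity: float, refill_rate: float,
                 clock: Callable[[], float] = time.monotonic):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self._clock = clock
        self._tokens = capacity
        self._last_refill = clock()

    def consume(self, tokens: float = 1.0) -> bool:
        now = self._clock()
        elapsed = now - self._last_refill
        self._tokens = min(self.capacity, self._tokens + elapsed * self.refill_rate)
        self._last_refill = now
        if self._tokens >= tokens:
            self._tokens -= tokens
            return True
        return False


def test_refill_with_fake_clock():
    clock = FakeClock()
    bucket = ClockedTokenBucket(capacity=5, refill_rate=1.0, clock=clock)
    assert all(bucket.consume() for _ in range(5))  # drain the bucket
    assert bucket.consume() is False
    clock.advance(3)  # three seconds pass instantly
    assert [bucket.consume() for _ in range(4)] == [True, True, True, False]
```

Because the fake clock only moves when advance is called, the refill amounts are exact and the test needs neither sleeps nor approximate comparisons.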