Python Rate Limiting: Token Bucket and Sliding Window

Rate limiting protects your API from abuse, prevents runaway clients from overloading your database, and enforces fair usage across tenants. Python offers several rate limiting strategies — fixed window, sliding window log, sliding window counter, and token bucket — each with different trade-offs in accuracy, memory, and fairness. This guide covers pure-Python implementations, Redis-backed distributed rate limiting, and integrating with FastAPI using slowapi.

Rate Limiting Algorithms Compared

| Algorithm | Memory | Accuracy | Burst Handling | Best For |
|---|---|---|---|---|
| Fixed Window | O(1) | Low (boundary spikes) | Poor | Simple quotas |
| Sliding Window Log | O(requests) | High | Good | Accurate limiting |
| Sliding Window Counter | O(1) | Medium | Good | High-traffic APIs |
| Token Bucket | O(1) | High | Excellent | Burst-friendly APIs |
| Leaky Bucket | O(1) | High | None | Smooth output rate |

Token Bucket Implementation

The token bucket is the most practical rate limiting algorithm. Tokens accumulate at a fixed rate up to a maximum capacity. Each request consumes one token. Bursts are allowed up to the bucket capacity; after that, requests are rejected until tokens refill. This gives clients freedom to burst while enforcing a sustainable average rate.

import time
import threading
from dataclasses import dataclass, field

@dataclass
class TokenBucket:
    """Token bucket limiter whose state is guarded by a lock.

    The bucket starts full. Tokens are credited lazily, from elapsed
    monotonic time, each time the bucket is consulted, and never exceed
    ``capacity``.
    """
    capacity: float          # upper bound on stored tokens (burst size)
    refill_rate: float       # tokens credited per second
    tokens: float = field(init=False)
    last_refill: float = field(init=False)
    _lock: threading.Lock = field(default_factory=threading.Lock, init=False, repr=False)

    def __post_init__(self):
        # Start the refill clock at construction time with a full bucket.
        self.last_refill = time.monotonic()
        self.tokens = self.capacity

    def _refill(self):
        """Credit tokens earned since the last refill; callers hold the lock."""
        current = time.monotonic()
        earned = (current - self.last_refill) * self.refill_rate
        self.last_refill = current
        self.tokens = min(self.capacity, self.tokens + earned)

    def consume(self, tokens: float = 1.0) -> bool:
        """Take ``tokens`` from the bucket if that many are available.

        Returns True when the tokens were deducted, False when the bucket
        held too few (in which case nothing is deducted).
        """
        with self._lock:
            self._refill()
            enough = self.tokens >= tokens
            if enough:
                self.tokens -= tokens
            return enough

    @property
    def available(self) -> float:
        """Token count right now, after crediting elapsed time."""
        with self._lock:
            self._refill()
            return self.tokens

# Usage
# The bucket starts full, so requests succeed while tokens remain; once it is
# empty, a request passes only if enough time has elapsed to refill a token.
limiter = TokenBucket(capacity=10, refill_rate=2)  # 10 burst, 2/sec sustained

# Fire 15 back-to-back requests and report the verdict for each one.
for i in range(15):
    allowed = limiter.consume()
    print(f"Request {i+1}: {'✓ allowed' if allowed else '✗ rejected'}")

# Per-client bucket (one bucket per IP/user)
class ClientRateLimiter:
    """Hands each client id its own TokenBucket with shared settings.

    Buckets are created on first sight of a client id and are never evicted.
    """

    def __init__(self, capacity: float, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self._lock = threading.Lock()
        self._buckets: dict[str, TokenBucket] = {}

    def is_allowed(self, client_id: str) -> bool:
        """Consume one token from the client's bucket; True if it was granted."""
        # The lock only guards creation/lookup of the bucket; the bucket
        # does its own locking when tokens are consumed.
        with self._lock:
            bucket = self._buckets.get(client_id)
            if bucket is None:
                bucket = TokenBucket(self.capacity, self.refill_rate)
                self._buckets[client_id] = bucket
        return bucket.consume()

# Every client gets its own bucket, all built with the same capacity and
# refill rate; the first request for a new client id hits a full bucket.
rate_limiter = ClientRateLimiter(capacity=20, refill_rate=5)
print(rate_limiter.is_allowed("user_123"))  # True

Sliding Window with Redis

For distributed systems where multiple app instances serve requests, use Redis for shared rate limit state. The sliding window log uses a Redis sorted set — request timestamps as scores — to count requests within the past N seconds.

import time
import redis.asyncio as aioredis

redis = aioredis.from_url("redis://localhost:6379")

async def sliding_window_check(
    key: str,
    limit: int,
    window_seconds: int,
) -> tuple[bool, int, int]:
    """Check and record one request against a sliding-window log in Redis.

    The log is a sorted set stored at ``key``: every admitted request is one
    member whose score is its arrival time (``time.time()``).

    Args:
        key: Redis key of the sorted set for this client/route.
        limit: Maximum number of requests admitted per window.
        window_seconds: Window length in seconds.

    Returns:
        ``(allowed, current_count, retry_after_seconds)``. When allowed,
        ``current_count`` includes this request and the retry delay is 0.
        When rejected, ``current_count`` is the number of requests already in
        the window and the retry delay is the time until the oldest of them
        leaves the window.
    """
    import math
    import uuid

    now = time.time()
    window_start = now - window_seconds
    # A bare timestamp is not unique: two requests in the same clock tick
    # would collapse into a single member and be counted once.
    member = f"{now}:{uuid.uuid4().hex}"

    pipe = redis.pipeline()
    # Remove expired requests
    pipe.zremrangebyscore(key, 0, window_start)
    # Count the requests still inside the window (before this one)
    pipe.zcard(key)
    # Oldest surviving entry, used to compute an accurate retry delay
    pipe.zrange(key, 0, 0, withscores=True)
    # Optimistically record this request
    pipe.zadd(key, {member: now})
    # Let idle keys expire on their own
    pipe.expire(key, window_seconds + 1)

    _, current_count, oldest, _, _ = await pipe.execute()

    if current_count < limit:
        return True, current_count + 1, 0

    # Rejected requests must not occupy a slot, otherwise a client that keeps
    # retrying would keep its own window full forever.
    await redis.zrem(key, member)

    if oldest:
        oldest_score = oldest[0][1]
        retry_after = max(1, math.ceil(oldest_score + window_seconds - now))
    else:
        # Nothing in the window (e.g. limit <= 0): fall back to a full window.
        retry_after = window_seconds
    return False, current_count, retry_after

# Redis Lua script for atomic fixed-window counter (faster, O(1) memory)
# Inputs: KEYS[1] is the counter key, ARGV[1] the limit, ARGV[2] the window
# length passed to EXPIRE. The script increments the counter, sets the expiry
# only on the first increment of a window, and returns a three-element array:
# {1 or 0 for allowed/rejected, the counter value, retry-after (the key's TTL
# when rejected, otherwise 0)}.
RATE_LIMIT_SCRIPT = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = redis.call('INCR', key)
if current == 1 then
    redis.call('EXPIRE', key, window)
end
if current > limit then
    return {0, current, redis.call('TTL', key)}
end
return {1, current, 0}
"""

# Script handle for RATE_LIMIT_SCRIPT, created on first use and then reused.
_fixed_window_script = None

async def fixed_window_check(key: str, limit: int, window: int):
    """Run the fixed-window Lua counter for ``key``.

    Args:
        key: Redis key holding the counter for the current window.
        limit: Maximum number of requests allowed per window.
        window: Window length, passed to the script as the key's expiry.

    Returns:
        ``(allowed, count, retry_after)`` where ``allowed`` is a bool,
        ``count`` is the counter value after this request and ``retry_after``
        is the third value returned by the script (the key's TTL when
        rejected, 0 otherwise).
    """
    global _fixed_window_script
    # Registering the script builds a new script object each time; do it once
    # instead of on every request.
    if _fixed_window_script is None:
        _fixed_window_script = redis.register_script(RATE_LIMIT_SCRIPT)
    allowed, count, retry_after = await _fixed_window_script(
        keys=[key], args=[limit, window]
    )
    return bool(allowed), count, retry_after

slowapi: FastAPI Rate Limiting

Install slowapi first (shell command): pip install slowapi
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

# The limiter derives each request's rate-limit key from get_remote_address.
limiter = Limiter(key_func=get_remote_address)  # limit by IP
app = FastAPI()
# Store the limiter on the application state and register slowapi's handler
# for RateLimitExceeded errors.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# GET /api/data, limited to 100 requests per minute per key of `limiter`
# (keyed by remote address).
# NOTE(review): slowapi's documentation asks for the endpoint to accept
# `request` explicitly, which is why the otherwise unused parameter is here.
@app.get("/api/data")
@limiter.limit("100/minute")
async def get_data(request: Request):
    return {"data": "your data here"}

# GET /api/search with three limits declared in a single string.
# NOTE(review): presumably each `;`-separated entry (per second, per minute,
# per hour) is enforced independently; confirm against the slowapi/limits docs.
@app.get("/api/search")
@limiter.limit("10/second;100/minute;1000/hour")
async def search(request: Request, q: str):
    return {"results": []}

# Custom key: limit per authenticated user
def get_user_id(request: Request) -> str:
    # Key on the user's id when request.state carries a truthy `user`;
    # otherwise fall back to the client's remote address.
    current_user = getattr(request.state, "user", None)
    if not current_user:
        return get_remote_address(request)
    return str(current_user.id)

limiter_by_user = Limiter(key_func=get_user_id)

# GET /api/expensive, limited through `limiter_by_user`: the 10/minute budget
# is tracked per key returned by get_user_id (user id, or remote address when
# no user is attached to the request).
@app.get("/api/expensive")
@limiter_by_user.limit("10/minute")
async def expensive_op(request: Request):
    return {"status": "ok"}

Per-User and Per-Endpoint Limits

import asyncio
from fastapi import FastAPI, Request, HTTPException, Depends
from typing import Annotated

# Application instance that the tiered-limit example route is registered on.
app = FastAPI()

# Tiered rate limits: free vs paid
# Each row is (tier name, requests per minute, requests per day).
LIMITS = {
    tier: {"requests_per_minute": per_minute, "requests_per_day": per_day}
    for tier, per_minute, per_day in (
        ("free", 60, 1_000),
        ("starter", 300, 10_000),
        ("pro", 1000, 100_000),
    )
}

async def check_rate_limit(request: Request, user=Depends(get_current_user)):
    """FastAPI dependency enforcing the caller's per-minute and per-day quotas.

    Counters live in Redis under keys derived from ``user.id`` and the current
    minute/day bucket, and are incremented on every call (including calls that
    end up rejected).

    Side effects:
        Sets ``request.state.rate_limit``, ``request.state.rate_limit_remaining``
        and ``request.state.rate_limit_reset`` (per-minute values) before any
        rejection, so response middleware can report real numbers on both
        successful and 429 responses.

    Raises:
        HTTPException: 429 with a ``Retry-After`` header when the per-minute
            limit or the daily quota is exceeded.
    """
    tier = getattr(user, "tier", "free")
    # An unrecognized tier falls back to the free plan instead of raising
    # KeyError (which would surface as an HTTP 500).
    limits = LIMITS.get(tier, LIMITS["free"])
    per_minute = limits["requests_per_minute"]

    # Read the clock once so the keys and the reset times agree even when a
    # request lands on a minute/day boundary.
    now = time.time()
    minute_bucket = int(now // 60)
    day_bucket = int(now // 86400)
    minute_reset = (minute_bucket + 1) * 60
    day_reset = (day_bucket + 1) * 86400

    minute_key = f"rl:{user.id}:minute:{minute_bucket}"
    day_key = f"rl:{user.id}:day:{day_bucket}"

    async with redis.pipeline() as pipe:
        pipe.incr(minute_key)
        pipe.expire(minute_key, 61)
        pipe.incr(day_key)
        pipe.expire(day_key, 86401)
        results = await pipe.execute()

    minute_count, _, day_count, _ = results

    # Attach rate limit info to response via request state
    request.state.rate_limit = per_minute
    request.state.rate_limit_remaining = max(0, per_minute - minute_count)
    request.state.rate_limit_reset = minute_reset

    if minute_count > per_minute:
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded: too many requests per minute",
            headers={
                "X-RateLimit-Limit": str(per_minute),
                "X-RateLimit-Remaining": "0",
                "X-RateLimit-Reset": str(minute_reset),
                # Seconds until the current minute bucket rolls over (1..60).
                "Retry-After": str(minute_reset - int(now)),
            },
        )
    if day_count > limits["requests_per_day"]:
        raise HTTPException(
            status_code=429,
            detail="Daily quota exceeded",
            # Seconds until the current day bucket rolls over.
            headers={"Retry-After": str(day_reset - int(now))},
        )

# check_rate_limit runs first as a route dependency and leaves the remaining
# per-minute budget on request.state.
@app.get("/api/query", dependencies=[Depends(check_rate_limit)])
async def run_query(request: Request, q: str):
    budget_left = request.state.rate_limit_remaining
    return {"results": [], "remaining": budget_left}

Client-Side Rate Limiting

import asyncio
import time
from collections import deque

class AsyncRateLimiter:
    """Client-side rate limiter for outgoing API calls.

    Admits at most ``calls`` acquisitions in any ``period``-second span by
    keeping a log of recent acquisition times (monotonic clock). Usable either
    through ``await limiter.acquire()`` or as ``async with limiter:``.

    Args:
        calls: Maximum number of acquisitions per period; must be at least 1.
        period: Length of the window in seconds.

    Raises:
        ValueError: If ``calls`` is less than 1.
    """
    def __init__(self, calls: int, period: float):
        if calls < 1:
            # With calls == 0 no slot can ever free up; fail at construction
            # instead of with an IndexError on the first acquire().
            raise ValueError("calls must be at least 1")
        self.calls = calls
        self.period = period
        self._timestamps: deque[float] = deque()
        self._lock = asyncio.Lock()

    async def acquire(self):
        """Wait until a slot is free in the current window, then claim it."""
        # The lock is held while sleeping on purpose: waiters are served one
        # at a time, in the order they queued on the lock.
        async with self._lock:
            while True:
                now = time.monotonic()
                # Remove timestamps outside the window
                while self._timestamps and now - self._timestamps[0] >= self.period:
                    self._timestamps.popleft()

                if len(self._timestamps) < self.calls:
                    break

                # Wait until the oldest timestamp expires, then loop to
                # re-check: the sleep may return marginally early, and the
                # expired entry still has to be purged before claiming a slot.
                await asyncio.sleep(self.period - (now - self._timestamps[0]))

            self._timestamps.append(time.monotonic())

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, *args):
        # Nothing to release: a slot frees itself when its timestamp ages out.
        pass

# Usage: call an external API at most 10 req/sec
api_limiter = AsyncRateLimiter(calls=10, period=1.0)

async def call_external_api(url: str, session: "aiohttp.ClientSession | None" = None):
    """GET ``url`` through the shared rate limiter and return the decoded JSON body.

    Args:
        url: Address to request.
        session: Optional existing aiohttp session to reuse. Passing one keeps
            connections pooled across calls; when omitted, a temporary session
            is opened and closed for this single request.

    Returns:
        The value produced by ``resp.json()``.
    """
    async with api_limiter:
        if session is not None:
            async with session.get(url) as resp:
                return await resp.json()
        # No session supplied: fall back to a one-off session.
        async with aiohttp.ClientSession() as own_session:
            async with own_session.get(url) as resp:
                return await resp.json()

Rate Limit Headers

from fastapi import Response
from starlette.middleware.base import BaseHTTPMiddleware

class RateLimitHeaderMiddleware(BaseHTTPMiddleware):
    """Add standard rate limit headers to all responses.

    For each header, the value comes from ``request.state`` when a rate limit
    dependency set it (``rate_limit``, ``rate_limit_remaining``,
    ``rate_limit_reset``). Otherwise a header already present on the response
    is left untouched, e.g. the ones carried by a 429 raised further down the
    stack. Only when neither exists is a placeholder default written (100
    requests, reset 60 seconds from now).
    """

    async def dispatch(self, request, call_next):
        response = await call_next(request)
        state = request.state

        def publish(header, value, fallback):
            # State wins; otherwise keep what the handler sent; otherwise default.
            if value is not None:
                response.headers[header] = str(value)
            else:
                response.headers.setdefault(header, str(fallback))

        # Populate from request state if set by rate limit dependency
        remaining = getattr(state, "rate_limit_remaining", None)
        if remaining is not None:
            remaining = max(0, remaining)  # never advertise a negative budget

        publish("X-RateLimit-Limit", getattr(state, "rate_limit", None), 100)
        publish("X-RateLimit-Remaining", remaining, 100)
        publish(
            "X-RateLimit-Reset",
            getattr(state, "rate_limit_reset", None),
            int(time.time()) + 60,
        )
        # Keep the policy in step with whatever limit ended up on the response.
        effective_limit = response.headers["X-RateLimit-Limit"]
        response.headers["X-RateLimit-Policy"] = f"{effective_limit};w=60"
        return response

app.add_middleware(RateLimitHeaderMiddleware)

Frequently Asked Questions

What HTTP status code should I return for rate limit exceeded?
Return 429 Too Many Requests with a Retry-After header indicating how many seconds to wait. Include X-RateLimit-Limit, X-RateLimit-Remaining, and X-RateLimit-Reset headers on every response so clients can throttle proactively.
How do I handle rate limiting in a load-balanced environment?
Use Redis as the shared counter store — never in-process memory. All app instances write to the same Redis keys, so the rate limit is enforced across the entire fleet regardless of which instance handles the request.
How should I rate limit webhooks?
Apply rate limits on the sending side (client) and implement idempotency on the receiving side. Use exponential backoff when you receive 429 responses. Move deliveries that still fail after the retries are exhausted to a dead-letter queue so they can be inspected and replayed later.