Python JWT Authentication: Access Tokens and Refresh Flow

JSON Web Tokens (JWTs) are a widely used mechanism for stateless API authentication. A short-lived access token grants API access; a long-lived refresh token lets clients obtain new access tokens without re-authenticating. This guide covers PyJWT, token generation and validation, the refresh flow, FastAPI integration, and security pitfalls to avoid.

JWT Anatomy

A JWT is three Base64URL-encoded sections joined by dots: header.payload.signature. The header specifies the algorithm. The payload (claims) carries the data — sub (subject), exp (expiry), iat (issued at), and any custom claims. The signature is an HMAC or RSA signature over the first two sections, preventing tampering. JWTs are not encrypted by default — anyone can read the payload. Never put secrets in a JWT.

import base64
import json

# Manually decode a JWT (without verification — for illustration only)
def decode_jwt_payload(token: str) -> dict:
    payload_b64 = token.split(".")[1]
    # JWT uses base64url without padding; restore the "=" that b64decode expects
    payload_b64 += "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(payload_b64))

# Example output for a typical JWT:
# {
#   "sub": "user_123",
#   "iat": 1718265600,
#   "exp": 1718269200,
#   "type": "access",
#   "roles": ["user"]
# }
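
To make the signature step concrete, here is a minimal sketch that builds and checks an HS256 token by hand using only the standard library. It is for illustration, not a replacement for PyJWT: it checks the signature only and ignores exp and the other claims. The helper names and the demo secret are assumptions made for this example.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64URL-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def sign_hs256(header: dict, payload: dict, secret: bytes) -> str:
    # The signing input is "<b64url(header)>.<b64url(payload)>"
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(secret, signing_input.encode("utf-8"), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(signature)}"


def signature_is_valid(token: str, secret: bytes) -> bool:
    try:
        signing_input, signature_b64 = token.rsplit(".", 1)
    except ValueError:
        return False  # no dot at all, so not a JWT
    expected = hmac.new(secret, signing_input.encode("utf-8"), hashlib.sha256).digest()
    # Constant-time comparison avoids leaking how many bytes matched
    return hmac.compare_digest(
        b64url(expected).encode("ascii"), signature_b64.encode("utf-8")
    )


demo_secret = b"demo-secret-for-illustration-only"  # hypothetical key
demo_token = sign_hs256({"alg": "HS256", "typ": "JWT"}, {"sub": "user_123"}, demo_secret)
```

Swapping the payload segment for a different one while keeping the old signature makes signature_is_valid return False, which is the tamper protection described above.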

PyJWT Basics: Encode and Decode

import jwt
import os
from datetime import datetime, timedelta, timezone

SECRET_KEY = os.environ["JWT_SECRET"]  # min 32 random bytes
ALGORITHM = "HS256"

def create_token(payload: dict, expires_in: timedelta) -> str:
    now = datetime.now(timezone.utc)
    payload = {
        **payload,
        "iat": now,
        "exp": now + expires_in,
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

def decode_token(token: str) -> dict:
    try:
        return jwt.decode(
            token,
            SECRET_KEY,
            algorithms=[ALGORITHM],
            options={"require": ["exp", "iat", "sub"]},
        )
    except jwt.ExpiredSignatureError as e:
        # Chain the original exception so tracebacks keep the root cause
        raise ValueError("Token has expired") from e
    except jwt.InvalidTokenError as e:
        raise ValueError(f"Invalid token: {e}") from e

# Usage
token = create_token({"sub": "user_123", "roles": ["admin"]}, timedelta(minutes=15))
claims = decode_token(token)
print(claims["sub"])   # user_123
print(claims["roles"]) # ['admin']
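
Beyond exp, iat, and sub, PyJWT can also validate the registered aud (audience) and iss (issuer) claims at decode time. The sketch below assumes PyJWT 2.x; DEMO_KEY, the issuer string, and the service names billing-api and orders-api are placeholders invented for this example.

```python
from datetime import datetime, timedelta, timezone

import jwt  # PyJWT

# Hypothetical demo key; in practice load it from the environment
DEMO_KEY = "demo-only-secret-at-least-32-bytes-long!"
DEMO_ISSUER = "auth.example.com"  # assumed issuer name


def issue(audience: str) -> str:
    now = datetime.now(timezone.utc)
    claims = {
        "sub": "user_123",
        "aud": audience,
        "iss": DEMO_ISSUER,
        "iat": now,
        "exp": now + timedelta(minutes=15),
    }
    return jwt.encode(claims, DEMO_KEY, algorithm="HS256")


def verify_for(token: str, audience: str) -> dict:
    # decode() raises InvalidAudienceError / InvalidIssuerError on mismatch
    return jwt.decode(
        token,
        DEMO_KEY,
        algorithms=["HS256"],
        audience=audience,
        issuer=DEMO_ISSUER,
        options={"require": ["exp", "iat", "sub", "aud", "iss"]},
    )
```

A token issued for billing-api decodes normally when verified for billing-api, and raises jwt.InvalidAudienceError when presented to a service that verifies for orders-api.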

Access Token + Refresh Token Flow

The dual-token pattern uses a short-lived access token (15 minutes) for API requests and a long-lived refresh token (7–30 days) stored server-side. When the access token expires, the client uses the refresh token to get a new pair. Each refresh rotates the refresh token, invalidating the old one.

import secrets
from datetime import timedelta

ACCESS_TOKEN_EXPIRE = timedelta(minutes=15)
REFRESH_TOKEN_EXPIRE = timedelta(days=7)

class TokenService:
    def __init__(self, redis_client):
        self.redis = redis_client

    def create_token_pair(self, user_id: str, roles: list[str]) -> dict:
        access_token = create_token(
            {"sub": user_id, "roles": roles, "type": "access"},
            ACCESS_TOKEN_EXPIRE,
        )
        refresh_token = secrets.token_urlsafe(48)  # opaque random token
        # Store refresh token in Redis with TTL
        key = f"refresh:{refresh_token}"
        self.redis.setex(key, int(REFRESH_TOKEN_EXPIRE.total_seconds()), user_id)
        return {
            "access_token": access_token,
            "refresh_token": refresh_token,
            "token_type": "bearer",
            "expires_in": int(ACCESS_TOKEN_EXPIRE.total_seconds()),
        }

    def rotate_refresh_token(self, old_refresh_token: str) -> dict:
        key = f"refresh:{old_refresh_token}"
        # GETDEL (Redis 6.2+, redis-py 4+) reads and deletes in one atomic step,
        # so two concurrent requests cannot both redeem the same single-use token
        user_id = self.redis.getdel(key)
        if not user_id:
            raise ValueError("Invalid or expired refresh token")

        # Issue new pair
        user_id = user_id.decode() if isinstance(user_id, bytes) else user_id
        return self.create_token_pair(user_id, roles=self._get_roles(user_id))

    def revoke_refresh_token(self, refresh_token: str) -> None:
        self.redis.delete(f"refresh:{refresh_token}")

    def _get_roles(self, user_id: str) -> list[str]:
        # In production, look up roles from DB
        return ["user"]

FastAPI Integration

FastAPI's dependency injection makes JWT authentication clean and reusable. Define a dependency that extracts and validates the token, then use it on any route that requires authentication.

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel

app = FastAPI()
security = HTTPBearer()

class TokenClaims(BaseModel):
    sub: str
    roles: list[str] = []
    type: str = "access"

def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> TokenClaims:
    token = credentials.credentials
    try:
        payload = decode_token(token)
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=str(e),
            headers={"WWW-Authenticate": "Bearer"},
        )
    if payload.get("type") != "access":
        raise HTTPException(status_code=401, detail="Not an access token")
    return TokenClaims(**payload)

def require_role(role: str):
    def checker(user: TokenClaims = Depends(get_current_user)) -> TokenClaims:
        if role not in user.roles:
            raise HTTPException(status_code=403, detail="Insufficient permissions")
        return user
    return checker

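import redis

class LoginRequest(BaseModel):
    username: str
    password: str

class RefreshRequest(BaseModel):
    refresh_token: str

# Shared instances, created once at import time.
# REDIS_URL is an assumed environment variable name.
redis_client = redis.Redis.from_url(os.environ["REDIS_URL"])
token_service = TokenService(redis_client)

# Credentials arrive in the JSON body, not the query string, so they stay out of URLs and logs.
# Plain def: FastAPI runs it in a threadpool, so blocking Redis calls do not stall the event loop.
@app.post("/auth/login")
def login(body: LoginRequest):
    # Verify credentials against DB (simplified; compare password hashes in production)
    if body.username == "admin" and body.password == "secret":
        return token_service.create_token_pair("user_123", ["admin"])
    raise HTTPException(status_code=401, detail="Invalid credentials")

@app.post("/auth/refresh")
def refresh(body: RefreshRequest):
    try:
        return token_service.rotate_refresh_token(body.refresh_token)
    except ValueError as e:
        raise HTTPException(status_code=401, detail=str(e))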

@app.get("/users/me")
async def get_me(user: TokenClaims = Depends(get_current_user)):
    return {"user_id": user.sub, "roles": user.roles}

@app.delete("/users/{user_id}")
async def delete_user(
    user_id: str,
    admin: TokenClaims = Depends(require_role("admin")),
):
    return {"deleted": user_id}

Token Blacklisting with Redis

Access tokens cannot be revoked before expiry by default — once issued, they're valid until expiry. For logout or security incidents, maintain a Redis blocklist of revoked JTI (JWT ID) claims.

import uuid

def create_access_token(user_id: str, roles: list[str]) -> str:
    jti = str(uuid.uuid4())  # unique token ID
    return create_token(
        {"sub": user_id, "roles": roles, "type": "access", "jti": jti},
        ACCESS_TOKEN_EXPIRE,
    )

def is_token_revoked(jti: str, redis_client) -> bool:
    return redis_client.exists(f"blocklist:{jti}") > 0

def revoke_token(jti: str, redis_client, ttl_seconds: int) -> None:
    redis_client.setex(f"blocklist:{jti}", ttl_seconds, "1")

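# Updated get_current_user with blocklist check.
# get_redis is assumed to be a dependency that returns the shared Redis client.
def get_current_user_checked(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    redis=Depends(get_redis),
) -> TokenClaims:
    try:
        payload = decode_token(credentials.credentials)
    except ValueError as e:
        # Without this, an expired or malformed token would surface as a 500
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=str(e),
            headers={"WWW-Authenticate": "Bearer"},
        )
    if payload.get("type") != "access":
        raise HTTPException(status_code=401, detail="Not an access token")
    # Tokens without a jti cannot be revoked; mint access tokens via create_access_token
    jti = payload.get("jti")
    if jti and is_token_revoked(jti, redis):
        raise HTTPException(status_code=401, detail="Token has been revoked")
    return TokenClaims(**payload)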
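
revoke_token takes a ttl_seconds argument, and a natural choice is the time the token has left to live: once the token would have expired anyway, the blocklist entry is no longer needed. A minimal sketch of that calculation follows; the helper names are assumptions for this example, and setex stands for any callable with the same (key, ttl, value) shape as redis_client.setex.

```python
import math
import time
from typing import Callable, Optional


def blocklist_ttl(exp: int, now: Optional[float] = None) -> int:
    """Seconds until the token expires, rounded up and never negative."""
    current = time.time() if now is None else now
    return max(0, math.ceil(exp - current))


def revoke_until_expiry(claims: dict, setex: Callable[[str, int, str], object]) -> bool:
    """Blocklist the token's jti for exactly as long as the token is valid."""
    ttl = blocklist_ttl(claims["exp"])
    if ttl == 0:
        # Already expired: nothing to revoke, and Redis rejects a zero TTL
        return False
    setex(f"blocklist:{claims['jti']}", ttl, "1")
    return True
```

In a logout handler this could be called as revoke_until_expiry(payload, redis_client.setex), where payload is the decoded access token.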

RS256 with RSA Key Pairs

HS256 uses a shared secret — whoever can verify can also create tokens. RS256 uses asymmetric keys: the private key signs tokens (only your auth server), the public key verifies them (all services). This is the right choice for multi-service architectures.

from cryptography.hazmat.primitives.asymmetric import rsa

# Generate RSA key pair (do this once, store securely)
# Current cryptography releases no longer need a backend argument
private_key = rsa.generate_private_key(
    public_exponent=65537,
    key_size=2048,
)
public_key = private_key.public_key()

# Sign with private key (auth server only)
def create_rs256_token(payload: dict, expires_in: timedelta) -> str:
    now = datetime.now(timezone.utc)
    claims = {**payload, "iat": now, "exp": now + expires_in}
    return jwt.encode(claims, private_key, algorithm="RS256")

# Verify with public key (any service)
def verify_rs256_token(token: str) -> dict:
    return jwt.decode(
        token,
        public_key,
        algorithms=["RS256"],
        # Same required claims as the HS256 decode_token above
        options={"require": ["exp", "iat", "sub"]},
    )
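
In a multi-service setup the verifying services need the public key in a portable form. One common approach, sketched here under the assumption that PyJWT and cryptography are both installed, is to serialize the keys to PEM and hand only the public PEM to the other services. The variable names are illustrative, and the private key is left unencrypted only to keep the example short.

```python
from datetime import datetime, timedelta, timezone

import jwt  # PyJWT; RS256 support relies on the cryptography package
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa

signing_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

# Private key stays on the auth server. NoEncryption is for illustration only;
# in practice protect it with a passphrase or a secrets manager.
private_pem = signing_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),
)

# Public key is safe to distribute to every service that verifies tokens
public_pem = signing_key.public_key().public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo,
)

now = datetime.now(timezone.utc)
rs256_token = jwt.encode(
    {"sub": "user_123", "iat": now, "exp": now + timedelta(minutes=15)},
    private_pem,
    algorithm="RS256",
)

# A verifying service only ever sees public_pem
rs256_claims = jwt.decode(
    rs256_token,
    public_pem,
    algorithms=["RS256"],
    options={"require": ["exp", "sub"]},
)
```

Because the verifying side holds only public_pem, a compromised downstream service cannot mint tokens of its own.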

Security Best Practices

  • Never log tokens — they're credentials. Redact Authorization headers from access logs.
  • Use HTTPS always — JWTs sent over plain HTTP can be intercepted and replayed.
  • Short access token lifetime — 15 minutes is standard. Don't use hours.
  • Store refresh tokens server-side — opaque tokens in Redis, not JWTs. This allows revocation.
  • Specify algorithm explicitly — always pass algorithms=["HS256"] to decode(). Never use algorithms=["none"].
  • Include audience claim — use aud to prevent tokens for service A from being used on service B.
  • Rotate refresh tokens — one refresh token, one use. Reuse of an old token indicates theft; revoke the entire token family.
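
The last point, revoking a whole token family on reuse, can be sketched with an in-memory store. This is one possible design rather than the only one: spent tokens are kept and marked as used instead of being deleted, because a deleted token cannot be told apart from one that never existed. The class and exception names are hypothetical, and a real deployment would keep these records in Redis or a database with a TTL.

```python
import secrets
from typing import Optional


class ReuseDetected(Exception):
    """Raised when an already-rotated refresh token is presented again."""


class RefreshFamilies:
    def __init__(self) -> None:
        # token -> {"user_id": ..., "family": ..., "used": ...}
        self._tokens: dict = {}
        self._revoked_families: set = set()

    def issue(self, user_id: str, family: Optional[str] = None) -> str:
        token = secrets.token_urlsafe(48)
        self._tokens[token] = {
            "user_id": user_id,
            # A fresh login starts a new family; rotations inherit the old one
            "family": family or secrets.token_hex(8),
            "used": False,
        }
        return token

    def rotate(self, presented: str) -> str:
        record = self._tokens.get(presented)
        if record is None or record["family"] in self._revoked_families:
            raise ValueError("Invalid or expired refresh token")
        if record["used"]:
            # A spent token came back: assume theft and revoke every descendant
            self._revoked_families.add(record["family"])
            raise ReuseDetected(record["family"])
        record["used"] = True
        return self.issue(record["user_id"], family=record["family"])
```

After one legitimate rotation, replaying the first token raises ReuseDetected, and from then on even the newest token in that family is rejected, forcing a fresh login.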

Frequently Asked Questions

Should I store the refresh token in localStorage or a cookie?
Use an HttpOnly; Secure; SameSite=Strict cookie for the refresh token. This prevents JavaScript access (XSS protection) and automatic cross-site submission (CSRF protection). Store the short-lived access token in memory only.

How do I handle token refresh on the client side?
Use an HTTP interceptor (Axios, fetch wrapper) that catches 401 responses, calls /auth/refresh to get a new access token, then retries the original request transparently.

Can I put user permissions in the JWT?
Yes, but only if you can accept permission changes taking effect after the current token expires. For immediate revocation of permissions, check a fast cache (Redis) on each request using the sub claim as the key.
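
The refresh-and-retry behavior described in the client-side question can be sketched in Python as well. The transport is injected as plain callables so the logic stays independent of any HTTP library; RefreshingClient, send, and refresh are names invented for this example, with send standing in for the real request and refresh for a call to /auth/refresh.

```python
from typing import Callable, Tuple

# send(access_token) -> (status_code, body)
Send = Callable[[str], Tuple[int, str]]


class RefreshingClient:
    def __init__(self, access_token: str, refresh: Callable[[], str]) -> None:
        self.access_token = access_token  # kept in memory only
        self._refresh = refresh           # returns a new access token

    def call(self, send: Send) -> Tuple[int, str]:
        status, body = send(self.access_token)
        if status != 401:
            return status, body
        # Access token rejected: refresh once, then retry the original request.
        # A second 401 is returned to the caller instead of looping forever.
        self.access_token = self._refresh()
        return send(self.access_token)
```

Retrying exactly once keeps a revoked or invalid refresh token from turning into an endless refresh loop; the caller sees the second 401 and can send the user back to login.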