Python JWT Authentication: Access Tokens and Refresh Flow

JSON Web Tokens (JWT) are the standard for stateless API authentication. A JWT encodes user identity and claims in a signed token that can be verified without a database lookup. The standard pattern uses two tokens: a short-lived access token (15 minutes) for API authorization, and a long-lived refresh token (7–30 days) stored securely to obtain new access tokens. This guide covers implementing the full JWT auth flow in Python using PyJWT and FastAPI.

JWT Structure and Claims

A JWT consists of three Base64URL-encoded parts separated by dots: Header (algorithm), Payload (claims), and Signature. The payload is only encoded, not encrypted, so anyone holding the token can read it; never put secrets in it. Standard claims (registered claims) include sub (subject/user ID), iss (issuer), aud (audience), exp (expiration), iat (issued at), and jti (JWT ID for uniqueness). Custom claims carry application-specific data like roles or permissions.

import jwt  # PyJWT: pip install PyJWT[crypto]
import base64, json

# A JWT looks like: header.payload.signature
# Each part is Base64URL encoded
sample_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c2VyXzEyMyIsImV4cCI6MTc1MDAwMDAwMCwiaWF0IjoxNzQ5OTk2NDAwLCJyb2xlIjoiYWRtaW4ifQ.SIGNATURE"

# Decode without verification to inspect (NEVER trust without verification)
parts = sample_token.split(".")
header  = json.loads(base64.b64decode(parts[0] + "=="))
payload = json.loads(base64.b64decode(parts[1] + "=="))
print("Header:", header)   # {'alg': 'HS256', 'typ': 'JWT'}
print("Payload:", payload) # {'sub': 'user_123', 'exp': ..., 'role': 'admin'}

# Standard claim types
from datetime import datetime, timezone, timedelta

# Read the clock once so that exp - iat is exactly the intended lifetime
# rather than differing by the time between two separate now() calls.
_issued_at = datetime.now(tz=timezone.utc)

claims = {
    "sub": "user_123",           # Subject: who the token is about
    "iss": "techoral.com",       # Issuer: who created the token
    "aud": "techoral-api",       # Audience: intended recipient
    "exp": _issued_at + timedelta(minutes=15),  # Expiration
    "iat": _issued_at,                          # Issued at
    "jti": "unique-token-id-123",               # JWT ID (for revocation)
    # Custom claims:
    "role": "admin",
    "permissions": ["read:users", "write:posts"],
}

PyJWT: Encoding and Decoding

PyJWT is the standard Python JWT library. Use HS256 (HMAC-SHA256) with a strong secret key for simple setups where you control both issuer and verifier. Use RS256 (RSA) when you need to verify tokens in other services without sharing a secret, or when implementing an OAuth2 authorization server.

import jwt
import secrets
from datetime import datetime, timezone, timedelta

# HS256 is symmetric: one shared key both signs and verifies.
# NOTE: the key is regenerated every time this module is loaded, so tokens
# signed by one process cannot be verified by another or after a restart.
_HS256_KEY_BYTES = 64  # 64 random bytes == 512 bits
SECRET_KEY = secrets.token_urlsafe(_HS256_KEY_BYTES)

def encode_hs256(payload: dict) -> str:
    """Sign *payload* with the module-level ``SECRET_KEY`` using HS256.

    Returns the token produced by ``jwt.encode``.
    """
    signed = jwt.encode(payload, key=SECRET_KEY, algorithm="HS256")
    return signed

def decode_hs256(token: str) -> dict:
    """Verify *token* against ``SECRET_KEY`` and return its claims.

    Only HS256 is accepted. Verification failures propagate as whatever
    exception ``jwt.decode`` raises.
    """
    accepted = ["HS256"]  # explicit allow-list of signing algorithms
    return jwt.decode(token, key=SECRET_KEY, algorithms=accepted)

# RS256: asymmetric — private key signs, public key verifies
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization

# Generate RSA key pair (do this once, store securely)
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key = private_key.public_key()

# Serialize both halves to PEM; the private key is written unencrypted.
private_pem = private_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),
)
public_pem = public_key.public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo,
)

def encode_rs256(payload: dict) -> str:
    """Sign *payload* with the module-level RSA private key (RS256)."""
    signed = jwt.encode(payload, key=private_pem, algorithm="RS256")
    return signed

def decode_rs256(token: str, audience: str = "techoral-api") -> dict:
    """Verify an RS256 *token* with the module-level public key.

    The token's ``aud`` claim is checked against *audience*. Returns the
    decoded claims.
    """
    accepted = ["RS256"]  # explicit allow-list of signing algorithms
    return jwt.decode(token, key=public_pem, algorithms=accepted,
                      audience=audience)

# Error handling: the specific exceptions are listed before
# InvalidTokenError, which is caught last as the general case.
try:
    payload = decode_hs256("some.invalid.token")
except jwt.ExpiredSignatureError:
    print("Token has expired")
except jwt.InvalidAudienceError:
    print("Token not intended for this service")
except jwt.InvalidTokenError as exc:
    reason = f"Invalid token: {exc}"
    print(reason)

Token Service: Access + Refresh

The token service encapsulates all JWT operations: creating access tokens (short-lived, carry user claims), creating refresh tokens (long-lived, carry only the user ID and a unique JTI for rotation), and validating both token types. Separating access and refresh token logic makes it easy to add token rotation and revocation later.

import uuid
import os
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
import jwt

@dataclass
class TokenPair:
    """An access token and refresh token returned together to the client."""

    access_token: str
    refresh_token: str
    # Scheme the client is expected to use when presenting the access token.
    token_type: str = "bearer"
    # Access-token lifetime advertised to the client, in seconds.
    expires_in: int = 15 * 60

class TokenService:
    """Issue and verify the HS256 access and refresh tokens.

    Access tokens are short-lived and carry roles plus an audience. Refresh
    tokens are long-lived and carry only the subject and a unique ``jti``.
    The signing secret is read from the ``JWT_SECRET`` environment variable
    when the service is constructed.
    """

    # Claims every token minted by this class carries. Verification requires
    # them, so a correctly signed token that lacks one is rejected as a
    # jwt.InvalidTokenError subclass instead of causing a KeyError in callers
    # that index payload["sub"] or payload["jti"].
    REQUIRED_CLAIMS = ("sub", "iat", "exp", "jti")

    def __init__(self):
        """Load the signing secret and set lifetimes, issuer and audience.

        Raises:
            KeyError: if ``JWT_SECRET`` is not set in the environment.
        """
        self.secret = os.environ["JWT_SECRET"]
        self.algorithm = "HS256"
        self.access_ttl = timedelta(minutes=15)
        self.refresh_ttl = timedelta(days=30)
        self.issuer = "techoral.com"
        self.audience = "techoral-api"

    def create_access_token(self, user_id: str, roles: list[str] = None) -> str:
        """Return a signed access token for *user_id*.

        Args:
            user_id: value stored in the ``sub`` claim.
            roles: role names stored in the ``roles`` claim; ``None`` is
                stored as an empty list.
        """
        now = datetime.now(tz=timezone.utc)
        payload = {
            "sub": user_id,
            "iss": self.issuer,
            "aud": self.audience,
            "iat": now,
            "exp": now + self.access_ttl,
            "jti": str(uuid.uuid4()),
            "type": "access",
            "roles": roles or [],
        }
        return jwt.encode(payload, self.secret, algorithm=self.algorithm)

    def create_refresh_token(self, user_id: str) -> str:
        """Return a signed refresh token for *user_id*.

        The token has no ``aud`` or ``roles`` claim; its ``jti`` is a fresh
        UUID4 so each refresh token can be tracked individually.
        """
        now = datetime.now(tz=timezone.utc)
        payload = {
            "sub": user_id,
            "iss": self.issuer,
            "iat": now,
            "exp": now + self.refresh_ttl,
            "jti": str(uuid.uuid4()),
            "type": "refresh",
        }
        return jwt.encode(payload, self.secret, algorithm=self.algorithm)

    def create_token_pair(self, user_id: str, roles: list[str] = None) -> TokenPair:
        """Return a new access/refresh pair for *user_id*.

        ``expires_in`` is derived from ``access_ttl`` so the advertised
        lifetime stays correct if the TTL is changed.
        """
        return TokenPair(
            access_token=self.create_access_token(user_id, roles),
            refresh_token=self.create_refresh_token(user_id),
            expires_in=int(self.access_ttl.total_seconds()),
        )

    def verify_access_token(self, token: str) -> dict:
        """Verify *token* as an access token and return its claims.

        Checks signature, expiry, audience, issuer and the required claims.

        Raises:
            jwt.InvalidTokenError: (or a subclass) if verification fails or
                the ``type`` claim is not ``"access"``.
        """
        payload = jwt.decode(token, self.secret, algorithms=[self.algorithm],
                             audience=self.audience, issuer=self.issuer,
                             options={"require": list(self.REQUIRED_CLAIMS)})
        if payload.get("type") != "access":
            raise jwt.InvalidTokenError("Not an access token")
        return payload

    def verify_refresh_token(self, token: str) -> dict:
        """Verify *token* as a refresh token and return its claims.

        Audience verification is disabled because refresh tokens are issued
        without an ``aud`` claim.

        Raises:
            jwt.InvalidTokenError: (or a subclass) if verification fails or
                the ``type`` claim is not ``"refresh"``.
        """
        payload = jwt.decode(token, self.secret, algorithms=[self.algorithm],
                             options={"verify_aud": False,
                                      "require": list(self.REQUIRED_CLAIMS)},
                             issuer=self.issuer)
        if payload.get("type") != "refresh":
            raise jwt.InvalidTokenError("Not a refresh token")
        return payload

FastAPI Integration

FastAPI's dependency injection system makes JWT authentication elegant. A single get_current_user dependency validates the token and returns the user — any route that declares this dependency is automatically protected. The HTTPBearer security scheme auto-populates the Swagger UI's Authorize button.

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel

# Module-level singletons shared by every route and dependency below.
app: FastAPI = FastAPI()
token_service: TokenService = TokenService()  # reads JWT_SECRET from the environment
security: HTTPBearer = HTTPBearer()

class LoginRequest(BaseModel):
    """Request body for the login endpoints."""

    username: str
    password: str

class UserContext(BaseModel):
    """Authenticated caller as derived from a verified access token."""

    user_id: str       # taken from the token's "sub" claim
    roles: list[str]   # taken from the token's "roles" claim

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> UserContext:
    """Resolve the bearer token on the request to a ``UserContext``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` challenge when
            the token is expired, invalid, or has no ``sub`` claim.
    """
    challenge = {"WWW-Authenticate": "Bearer"}
    # Keep the try body to the one call that can raise a JWT error.
    try:
        payload = token_service.verify_access_token(credentials.credentials)
    except jwt.ExpiredSignatureError as exc:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
                            detail="Token expired", headers=challenge) from exc
    except jwt.InvalidTokenError as exc:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
                            detail="Invalid token", headers=challenge) from exc

    # A correctly signed token with no subject is still unusable; answer 401
    # rather than letting a KeyError become a 500.
    subject = payload.get("sub")
    if subject is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
                            detail="Invalid token", headers=challenge)
    return UserContext(user_id=subject, roles=payload.get("roles", []))

def require_role(role: str):
    """Build a dependency that admits only callers holding *role*.

    The returned coroutine depends on ``get_current_user``. It returns the
    user when *role* is among their roles and raises a 403 ``HTTPException``
    otherwise.
    """
    async def checker(user: UserContext = Depends(get_current_user)) -> UserContext:
        if role in user.roles:
            return user
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
                            detail=f"Role '{role}' required")
    return checker

@app.post("/auth/login")
async def login(body: LoginRequest):
    """Exchange a username and password for an access/refresh token pair.

    Raises:
        HTTPException: 401 when the credentials do not match.
    """
    # Verify password against database (simplified)
    # Compare in constant time, and evaluate both fields before branching, so
    # response timing does not reveal how much of a guess was correct.
    # Encoding to bytes lets compare_digest accept non-ASCII input.
    username_ok = secrets.compare_digest(body.username.encode("utf-8"), b"admin")
    password_ok = secrets.compare_digest(body.password.encode("utf-8"), b"secret")
    if username_ok and password_ok:
        # NOTE(review): the refresh token's jti is not registered with any
        # server-side store here. A refresh endpoint that checks stored JTIs
        # would treat this token as reused — confirm and save it if so.
        return token_service.create_token_pair(user_id="user_1", roles=["admin"])
    raise HTTPException(status_code=401, detail="Invalid credentials")

@app.get("/me")
async def get_me(user: UserContext = Depends(get_current_user)):
    """Return the authenticated caller's user ID and roles."""
    profile = dict(user_id=user.user_id, roles=user.roles)
    return profile

@app.delete("/admin/users/{user_id}")
async def delete_user(user_id: str, user: UserContext = Depends(require_role("admin"))):
    """Admin-only endpoint that echoes back the requested user ID.

    No deletion is performed in this body. The ``user`` dependency exists
    only to enforce the "admin" role.
    """
    outcome = {"deleted": user_id}
    return outcome

Refresh Token Rotation

Token rotation issues a new refresh token every time the old one is used. Each refresh token has a unique JTI stored server-side (Redis in the example below). When a refresh is requested, the old JTI is invalidated and a new pair is issued. If the same refresh token is presented twice (a sign of possible theft), all refresh tokens for that user are revoked immediately; access tokens already issued remain valid until they expire unless they are also blacklisted.

import redis

class RefreshTokenStore:
    """Store refresh token JTIs in Redis.

    Each token is one key, ``refresh:<user_id>:<jti>``, holding the value
    ``"valid"`` with a TTL supplied by the caller.
    """

    # Upper bound on keys scanned or deleted per round trip in revoke_all().
    _SCAN_BATCH = 500

    def __init__(self):
        """Connect to the Redis instance named by the ``REDIS_URL`` variable."""
        self.redis = redis.Redis.from_url(os.environ["REDIS_URL"])

    @staticmethod
    def _escape_glob(text: str) -> str:
        """Backslash-escape Redis glob metacharacters so *text* matches literally."""
        return "".join("\\" + ch if ch in "*?[]\\" else ch for ch in text)

    def save(self, user_id: str, jti: str, ttl_seconds: int):
        """Record *jti* as valid for *user_id* for *ttl_seconds* seconds."""
        self.redis.setex(f"refresh:{user_id}:{jti}", ttl_seconds, "valid")

    def is_valid(self, user_id: str, jti: str) -> bool:
        """Return True if *jti* is currently recorded for *user_id*."""
        return self.redis.exists(f"refresh:{user_id}:{jti}") == 1

    def revoke(self, user_id: str, jti: str):
        """Forget a single refresh token."""
        self.redis.delete(f"refresh:{user_id}:{jti}")

    def revoke_all(self, user_id: str):
        """Revoke all refresh tokens for a user (on password change, logout all)."""
        # Escape the user ID so an ID containing '*' or '?' cannot widen the
        # match to other users' keys.
        pattern = f"refresh:{self._escape_glob(user_id)}:*"
        # SCAN walks the keyspace incrementally. KEYS blocks the server while
        # it examines every key.
        pending = []
        for key in self.redis.scan_iter(match=pattern, count=self._SCAN_BATCH):
            pending.append(key)
            if len(pending) >= self._SCAN_BATCH:
                self.redis.delete(*pending)
                pending.clear()
        if pending:
            self.redis.delete(*pending)

# Shared refresh-token store; its constructor reads REDIS_URL from the environment.
store: RefreshTokenStore = RefreshTokenStore()

@app.post("/auth/refresh")
async def refresh_tokens(body: dict):
    """Rotate a refresh token: invalidate the presented one and issue a new pair.

    Raises:
        HTTPException: 400 when ``refresh_token`` is missing from the body;
            401 when the token is invalid, or when its JTI is not in the
            store (treated as reuse, which revokes all of the user's
            refresh tokens).
    """
    refresh_token = body.get("refresh_token")
    if not refresh_token:
        raise HTTPException(status_code=400, detail="refresh_token required")

    # Only the verification call can raise a JWT error; keep the try narrow.
    try:
        payload = token_service.verify_refresh_token(refresh_token)
    except jwt.InvalidTokenError as exc:
        raise HTTPException(status_code=401, detail="Invalid refresh token") from exc

    # A signed token missing these claims cannot be rotated; answer 401
    # instead of letting a KeyError become a 500.
    user_id = payload.get("sub")
    old_jti = payload.get("jti")
    if not user_id or not old_jti:
        raise HTTPException(status_code=401, detail="Invalid refresh token")

    # Verify this refresh token hasn't been used before
    # NOTE(review): the check and the revoke below are separate Redis calls,
    # so two concurrent requests with the same token may both pass — confirm
    # whether an atomic delete-and-check is needed.
    if not store.is_valid(user_id, old_jti):
        # Possible token reuse — revoke everything
        store.revoke_all(user_id)
        raise HTTPException(status_code=401, detail="Token reuse detected — all sessions revoked")

    # Rotate: revoke old, issue new
    store.revoke(user_id, old_jti)
    # NOTE(review): no roles are passed, so the new access token carries an
    # empty roles list; reload the user's roles here if they must survive a
    # refresh.
    pair = token_service.create_token_pair(user_id)
    try:
        new_payload = token_service.verify_refresh_token(pair.refresh_token)
    except jwt.InvalidTokenError as exc:
        raise HTTPException(status_code=401, detail="Invalid refresh token") from exc
    # Keep the stored entry alive exactly as long as the token itself,
    # rather than a separately hard-coded 30 days.
    store.save(user_id, new_payload["jti"],
               ttl_seconds=int(token_service.refresh_ttl.total_seconds()))
    return pair

Token Revocation and Blacklisting

JWTs are stateless by design, but revocation requires some state. The most efficient approach is a short-lived blacklist: when a user logs out, store the access token's JTI under a Redis key whose TTL equals the token's remaining lifetime, so the entry disappears when the token would have expired naturally. This keeps the blacklist small and fast.

class TokenBlacklist:
    """Track revoked access-token JTIs in Redis until the tokens expire."""

    def __init__(self, redis_client: "redis.Redis"):
        """Keep a reference to the Redis client used for every lookup."""
        self.redis = redis_client

    def revoke(self, jti: str, expires_at: datetime):
        """Blacklist *jti* until *expires_at*.

        Nothing is stored when *expires_at* is not in the future. *expires_at*
        must be timezone-aware because it is compared with an aware "now".
        """
        import math  # local import: only this method needs it

        remaining = (expires_at - datetime.now(timezone.utc)).total_seconds()
        # Round up: truncating would let the entry expire up to a second
        # before the token does, and would skip tokens with under a second left.
        ttl = math.ceil(remaining)
        if ttl > 0:
            self.redis.setex(f"blacklist:{jti}", ttl, "revoked")

    def is_revoked(self, jti: str) -> bool:
        """Return True if *jti* is currently blacklisted."""
        return self.redis.exists(f"blacklist:{jti}") == 1

# Shared blacklist backed by its own Redis client built from REDIS_URL.
_blacklist_redis = redis.Redis.from_url(os.environ["REDIS_URL"])
blacklist = TokenBlacklist(_blacklist_redis)

async def get_current_user_checked(
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> UserContext:
    """Resolve the bearer token to a ``UserContext``, rejecting revoked tokens.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` challenge when
            the token is expired, invalid, or its JTI is blacklisted.
    """
    challenge = {"WWW-Authenticate": "Bearer"}
    try:
        payload = token_service.verify_access_token(credentials.credentials)
    except jwt.ExpiredSignatureError as exc:
        raise HTTPException(status_code=401, detail="Token expired",
                            headers=challenge) from exc
    except jwt.InvalidTokenError as exc:
        # Use a fixed message instead of echoing the library's error text to
        # the client; the cause stays attached for server-side logs.
        raise HTTPException(status_code=401, detail="Invalid token",
                            headers=challenge) from exc

    jti = payload.get("jti")
    if jti and blacklist.is_revoked(jti):
        raise HTTPException(status_code=401, detail="Token has been revoked",
                            headers=challenge)
    return UserContext(user_id=payload["sub"], roles=payload.get("roles", []))

@app.post("/auth/logout")
async def logout(user: UserContext = Depends(get_current_user_checked),
                 credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Blacklist the presented access token and revoke the user's refresh tokens."""
    # The dependency returns only a UserContext, so the token is decoded again
    # here to reach its jti and exp claims.
    claims = token_service.verify_access_token(credentials.credentials)
    expires_at = datetime.fromtimestamp(claims["exp"], tz=timezone.utc)
    blacklist.revoke(claims["jti"], expires_at=expires_at)
    # revoke all refresh tokens too
    store.revoke_all(user.user_id)
    return {"message": "Logged out successfully"}

Security Best Practices

JWT security depends on correct implementation at multiple layers. The most common vulnerabilities are algorithm confusion attacks, insecure secret management, storing tokens insecurely on the client, and missing claim validation.

# NOTE(review): `token`, `key` and `secret` are not defined in the visible
# file; these snippets presumably assume the reader supplies them — confirm.

# 1. NEVER allow 'none' algorithm
jwt.decode(token, key=key, algorithms=["HS256"])  # Good: explicit whitelist
# jwt.decode(token, key, algorithms=jwt.algorithms.get_default_algorithms())  # BAD

# 2. Use strong secrets (minimum 256-bit for HS256)
import secrets
SECRET = secrets.token_urlsafe(nbytes=64)  # 512 bits of entropy

# 3. Always validate all standard claims
strict_options = {
    "verify_exp": True,        # always verify expiration
    "verify_iat": True,        # verify issued-at
    "require": ["sub", "exp", "iat", "jti"],  # require these claims
}
jwt.decode(
    token, secret,
    algorithms=["HS256"],
    audience="techoral-api",       # validate aud
    issuer="techoral.com",         # validate iss
    options=strict_options,
)

# 4. Short access token TTL
ACCESS_TTL = timedelta(minutes=15)   # 15 min max
REFRESH_TTL = timedelta(days=7)      # 7 days for most apps

# 5. Cookie storage for refresh tokens (httpOnly, Secure, SameSite=Strict)
from fastapi import Response

@app.post("/auth/login-secure")
async def login_secure(body: LoginRequest, response: Response):
    """Issue a token pair, delivering the refresh token as a hardened cookie.

    The access token is returned in the JSON body. The refresh token is set
    as an httpOnly, Secure, SameSite=Strict cookie.
    """
    # NOTE(review): `body` is never inspected, so tokens for "user_123" are
    # issued to any caller — confirm credential verification happens elsewhere.
    # NOTE(review): the refresh token's jti is not saved to a server-side
    # store here — confirm against the refresh endpoint's validity check.
    pair = token_service.create_token_pair("user_123")
    cookie_settings = dict(
        key="refresh_token",
        value=pair.refresh_token,
        httponly=True,    # not accessible from JS
        secure=True,      # HTTPS only
        samesite="strict",
        max_age=7 * 24 * 3600,
    )
    response.set_cookie(**cookie_settings)
    return {"access_token": pair.access_token, "token_type": "bearer"}

# 6. Key rotation: support multiple valid keys
# Key IDs map to signing secrets read from the environment at import time.
SECRET_KEYS = dict(
    v2=os.environ["JWT_SECRET_V2"],
    v1=os.environ["JWT_SECRET_V1"],  # kept for transition period
)
CURRENT_KEY_ID = "v2"  # key ID assumed for tokens that carry no "kid" header

def decode_with_rotation(token: str) -> dict:
    """Verify *token* against the key set used during key rotation.

    A token with a ``kid`` header is verified with exactly that key. A token
    without one is tried against the current key first and then the
    remaining keys, so tokens issued before ``kid`` headers were introduced
    keep working during the transition.

    Raises:
        jwt.InvalidTokenError: (or a subclass) when the ``kid`` is unknown,
            no configured key verifies the signature, or another claim check
            fails.
    """
    header = jwt.get_unverified_header(token)
    kid = header.get("kid")

    if kid is not None:
        # The header is attacker-controlled; only a string can be a key ID.
        key = SECRET_KEYS.get(kid) if isinstance(kid, str) else None
        if not key:
            raise jwt.InvalidTokenError(f"Unknown key ID: {kid}")
        return jwt.decode(token, key, algorithms=["HS256"])

    # No kid: current key first, then the older ones in declaration order.
    candidates = [CURRENT_KEY_ID] + [k for k in SECRET_KEYS if k != CURRENT_KEY_ID]
    signature_error = None
    for key_id in candidates:
        key = SECRET_KEYS.get(key_id)
        if not key:
            continue
        try:
            return jwt.decode(token, key, algorithms=["HS256"])
        except jwt.InvalidSignatureError as exc:
            # Wrong key; try the next one. Any other failure (for example an
            # expired token) propagates at once, because another key cannot
            # fix it.
            signature_error = exc
    if signature_error is None:
        raise jwt.InvalidTokenError(f"Unknown key ID: {CURRENT_KEY_ID}")
    raise signature_error
Never store JWTs in localStorage: localStorage is accessible from JavaScript, making it vulnerable to XSS attacks. Store access tokens in memory (JavaScript variable) and refresh tokens in httpOnly cookies. In mobile apps, use the OS secure keychain (iOS Keychain, Android Keystore).