Python FastAPI Background Tasks and Lifespan Events
FastAPI provides three mechanisms for background work: the built-in BackgroundTasks for lightweight post-response work (sending emails, logging), the lifespan context manager for startup/shutdown hooks (opening DB pools, starting background workers), and asyncio.create_task() for long-running concurrent tasks that outlive a single request. This guide covers all three with practical patterns for periodic jobs, health checks, and resource lifecycle management.
Table of Contents
BackgroundTasks
BackgroundTasks runs after the response is sent to the client. Use it for work that doesn't need to block the response: sending email confirmations, writing audit logs, updating search indexes. The tasks run in the same process and event loop — don't use for CPU-heavy work or tasks that must survive server restarts.
import asyncio
from fastapi import FastAPI, BackgroundTasks, Depends
from pydantic import BaseModel
app = FastAPI()
async def send_welcome_email(email: str, name: str):
"""Runs after response is sent — doesn't block the client."""
await asyncio.sleep(0) # yield to event loop
print(f"Sending welcome email to {email} for {name}")
# await email_client.send(to=email, subject="Welcome!", ...)
def log_signup(user_id: int, source: str):
"""Sync functions also work — run in a thread pool automatically."""
print(f"Logging signup: user_id={user_id} source={source}")
# analytics_db.insert(...)
class SignupRequest(BaseModel):
email: str
name: str
source: str = "organic"
@app.post("/signup", status_code=201)
async def signup(req: SignupRequest, background_tasks: BackgroundTasks):
# Create user synchronously
user_id = await create_user(req.email, req.name)
# Schedule background work (runs after this handler returns)
background_tasks.add_task(send_welcome_email, req.email, req.name)
background_tasks.add_task(log_signup, user_id, req.source)
# Response returned immediately — email and log happen in background
return {"user_id": user_id, "email": req.email}
async def create_user(email: str, name: str) -> int:
return 42 # stub
asyncio.to_thread() or a thread pool instead.
Lifespan Context Manager
The lifespan parameter replaces the deprecated @app.on_event("startup") / @app.on_event("shutdown") decorators. Code before yield runs at startup; code after yield runs at shutdown. Use it to open connection pools, load ML models, start background consumers, and ensure clean teardown.
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI
import httpx
import redis.asyncio as aioredis
# App-level shared resources
_http_client: httpx.AsyncClient | None = None
_redis: aioredis.Redis | None = None
_background_tasks: list[asyncio.Task] = []
@asynccontextmanager
async def lifespan(app: FastAPI):
global _http_client, _redis
# ---- STARTUP ----
print("Starting up...")
# HTTP client with connection pool
_http_client = httpx.AsyncClient(
timeout=httpx.Timeout(connect=5, read=30, write=10, pool=5),
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
)
# Redis connection pool
_redis = aioredis.from_url("redis://localhost:6379", decode_responses=True, max_connections=20)
# Start background worker
task = asyncio.create_task(background_cache_warmer())
_background_tasks.append(task)
print("Startup complete")
yield # Application is running
# ---- SHUTDOWN ----
print("Shutting down...")
# Cancel background tasks gracefully
for t in _background_tasks:
t.cancel()
await asyncio.gather(*_background_tasks, return_exceptions=True)
# Close connection pools
await _http_client.aclose()
await _redis.aclose()
print("Shutdown complete")
app = FastAPI(lifespan=lifespan)
async def background_cache_warmer():
"""Long-running background coroutine started at app startup."""
while True:
try:
print("Warming cache...")
await asyncio.sleep(300) # run every 5 minutes
except asyncio.CancelledError:
print("Cache warmer cancelled")
break
# Access shared resources in handlers
@app.get("/status")
async def status():
ping = await _redis.ping()
return {"redis": "ok" if ping else "error"}
asyncio.create_task() for Long-Running Jobs
import asyncio
from fastapi import FastAPI, Request
import uuid
app = FastAPI()
_running_jobs: dict[str, asyncio.Task] = {}
async def long_running_report(job_id: str, params: dict):
"""Simulates a report generation job that takes minutes."""
print(f"[{job_id}] Starting report generation")
try:
await asyncio.sleep(60) # simulate work
print(f"[{job_id}] Report complete")
# Store result in Redis or DB
except asyncio.CancelledError:
print(f"[{job_id}] Job cancelled")
raise
@app.post("/reports/generate")
async def generate_report(params: dict):
job_id = str(uuid.uuid4())
task = asyncio.create_task(long_running_report(job_id, params))
_running_jobs[job_id] = task
# Clean up completed tasks
task.add_done_callback(lambda t: _running_jobs.pop(job_id, None))
return {"job_id": job_id, "status": "running"}
@app.get("/reports/{job_id}/status")
async def job_status(job_id: str):
task = _running_jobs.get(job_id)
if not task:
return {"status": "not_found_or_complete"}
if task.done():
exc = task.exception()
return {"status": "failed", "error": str(exc)} if exc else {"status": "complete"}
return {"status": "running"}
@app.delete("/reports/{job_id}")
async def cancel_job(job_id: str):
task = _running_jobs.get(job_id)
if task and not task.done():
task.cancel()
return {"status": "cancelled"}
return {"status": "not_found"}
Periodic Background Jobs
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI
import time
async def periodic(func, interval_seconds: float, name: str = ""):
"""Generic periodic task runner with error isolation."""
while True:
start = time.monotonic()
try:
await func()
except asyncio.CancelledError:
break
except Exception as e:
print(f"Periodic task {name} failed: {e}")
# Sleep for remaining interval (accounting for execution time)
elapsed = time.monotonic() - start
await asyncio.sleep(max(0, interval_seconds - elapsed))
async def cleanup_expired_sessions():
print("Cleaning expired sessions...")
# await db.execute("DELETE FROM sessions WHERE expires_at < NOW()")
async def refresh_exchange_rates():
print("Refreshing exchange rates...")
# rates = await forex_api.get_rates()
# await cache.set("exchange_rates", rates, ex=3600)
async def aggregate_analytics():
print("Aggregating hourly analytics...")
_tasks: list[asyncio.Task] = []
@asynccontextmanager
async def lifespan(app: FastAPI):
_tasks.append(asyncio.create_task(periodic(cleanup_expired_sessions, 300, "session-cleanup")))
_tasks.append(asyncio.create_task(periodic(refresh_exchange_rates, 3600, "exchange-rates")))
_tasks.append(asyncio.create_task(periodic(aggregate_analytics, 3600, "analytics")))
yield
for t in _tasks:
t.cancel()
await asyncio.gather(*_tasks, return_exceptions=True)
app = FastAPI(lifespan=lifespan)
APScheduler Integration
pip install apscheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from contextlib import asynccontextmanager
from fastapi import FastAPI
scheduler = AsyncIOScheduler(timezone="Asia/Kolkata")
async def daily_digest():
print("Sending daily digest emails...")
async def hourly_cleanup():
print("Running hourly cleanup...")
async def weekly_report():
print("Generating weekly report...")
@asynccontextmanager
async def lifespan(app: FastAPI):
# Schedule jobs with cron expressions
scheduler.add_job(daily_digest, CronTrigger(hour=9, minute=0)) # 9am daily
scheduler.add_job(hourly_cleanup, CronTrigger(minute=0)) # every hour
scheduler.add_job(weekly_report, CronTrigger(day_of_week="mon", hour=8)) # Monday 8am
scheduler.start()
yield
scheduler.shutdown(wait=False)
app = FastAPI(lifespan=lifespan)
@app.get("/jobs")
def list_jobs():
return [
{"id": job.id, "name": job.name, "next_run": str(job.next_run_time)}
for job in scheduler.get_jobs()
]
Sharing State Between Requests and Background
from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
import asyncio
# Use app.state for shared mutable state
@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.cache = {}
app.state.job_results = {}
app.state.metrics = {"requests": 0, "errors": 0}
yield
app = FastAPI(lifespan=lifespan)
@app.middleware("http")
async def track_requests(request: Request, call_next):
request.app.state.metrics["requests"] += 1
try:
response = await call_next(request)
return response
except Exception:
request.app.state.metrics["errors"] += 1
raise
@app.get("/metrics")
async def get_metrics(request: Request):
return request.app.state.metrics
Production Patterns
import asyncio
from contextlib import asynccontextmanager
# Pattern: Supervisor that restarts crashed background tasks
class TaskSupervisor:
def __init__(self):
self._tasks: dict[str, tuple] = {} # name -> (coro_factory, task)
def add(self, name: str, coro_factory, restart_delay: float = 5.0):
self._tasks[name] = (coro_factory, restart_delay, None)
async def start(self):
for name in self._tasks:
await self._spawn(name)
async def _spawn(self, name: str):
coro_factory, delay, _ = self._tasks[name]
task = asyncio.create_task(self._supervised(name, coro_factory, delay))
self._tasks[name] = (coro_factory, delay, task)
async def _supervised(self, name, factory, delay):
while True:
try:
await factory()
except asyncio.CancelledError:
break
except Exception as e:
print(f"Task {name} crashed: {e}. Restarting in {delay}s...")
await asyncio.sleep(delay)
async def stop(self):
for name, (_, _, task) in self._tasks.items():
if task:
task.cancel()
tasks = [t for _, _, t in self._tasks.values() if t]
await asyncio.gather(*tasks, return_exceptions=True)
Frequently Asked Questions
- BackgroundTasks vs Celery — when to use each?
- Use BackgroundTasks for lightweight fire-and-forget tasks that run within the same process, don't need retry logic, and can be lost on server restart (email notifications, cache invalidation). Use Celery when you need durability, retries, rate limiting, distributed execution, or tasks that might take minutes.
- How do I run CPU-heavy work in a background task?
- Never run CPU-heavy code in an async function — it blocks the event loop and prevents other requests from being served. Use
await asyncio.to_thread(cpu_function, args)to run it in a thread pool, or useProcessPoolExecutorvialoop.run_in_executor()for true parallel CPU execution. - How do I test lifespan code?
- Use
httpx.AsyncClient(app=app, base_url="http://test")as an async context manager — it triggers the lifespan startup and shutdown automatically. Or usefrom fastapi.testclient import TestClientwithwith TestClient(app) as client:— the with block triggers lifespan events.