Python Background Tasks: Celery Beat and APScheduler
Most production applications need recurring background work: sending daily digest emails, syncing data from external APIs, cleaning up expired records, generating reports, and running health checks. Python has two dominant approaches — Celery Beat for distributed task queues with scheduling, and APScheduler for in-process scheduling that works without a broker. This guide covers both with FastAPI integration patterns.
Table of Contents
Celery Setup with Redis
# Quote the extras spec: shells such as zsh treat [redis] as a glob pattern.
pip install "celery[redis]" flower redis
# celery_app.py
from celery import Celery
import os
celery = Celery(
    main="myapp",
    broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
    backend=os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/1"),
    include=["myapp.tasks"],
)
celery.conf.update(
    # JSON in, JSON out
    accept_content=["json"],
    task_serializer="json",
    result_serializer="json",
    # Keep timestamps and schedules in UTC
    enable_utc=True,
    timezone="UTC",
    # Report a STARTED state while a task is running
    task_track_started=True,
    # Acknowledge a message only after the task finishes, so a message whose
    # worker goes away mid-task can be delivered again (Celery's
    # task_reject_on_worker_lost covers worker child processes that get killed)
    task_acks_late=True,
    # Each worker process reserves one message at a time -> fairer distribution
    worker_prefetch_multiplier=1,
    # Drop stored results after 3600 s (1 hour)
    result_expires=3600,
)
# tasks.py
from celery_app import celery
import time
@celery.task(bind=True, max_retries=3, default_retry_delay=60)
def send_email(self, to: str, subject: str, body: str):
    """Deliver one email; on error, schedule a retry with exponential backoff.

    At most 3 retries (max_retries=3). The delay doubles on each attempt:
    60 s, 120 s, 240 s.
    """
    try:
        # import smtplib; send email...
        print(f"Sending email to {to}: {subject}")
        return {"status": "sent", "to": to}
    except Exception as err:
        backoff = 60 * 2 ** self.request.retries
        raise self.retry(exc=err, countdown=backoff)
@celery.task
def process_report(report_id: int) -> dict:
    """Build report *report_id* (simulated by a 2-second sleep) and return a summary."""
    time.sleep(2)  # stand-in for the real work
    summary = {"report_id": report_id, "status": "complete", "rows": 1500}
    return summary
# Dispatch tasks (.delay(*args) is shorthand for .apply_async(args=...))
result = send_email.apply_async(
    args=("alice@example.com", "Welcome!", "Thanks for signing up.")
)
print(result.id)      # task ID
print(result.status)  # PENDING / STARTED / SUCCESS / FAILURE
# Block until the result arrives — acceptable in a script, never in an HTTP handler
data = process_report.apply_async(args=(42,)).get(timeout=30)
# Run a worker with 4 concurrent child processes
celery -A celery_app worker -l info -c 4
# Run a worker that consumes only the "emails" queue
celery -A celery_app worker -l info -Q emails
Celery Beat: Periodic Tasks
# celery_app.py — add beat schedule
from celery.schedules import crontab
# Times below are UTC (the app is configured with timezone="UTC").
celery.conf.beat_schedule = {
    # Every 60 seconds
    "health-check-every-minute": {
        "task": "myapp.tasks.health_check",
        "schedule": 60.0,
    },
    # Daily at 08:00 UTC
    "daily-digest-email": {
        "task": "myapp.tasks.send_daily_digest",
        "schedule": crontab(minute=0, hour=8),
        "args": (),
    },
    # Mondays at 09:00 UTC
    "weekly-report": {
        "task": "myapp.tasks.generate_weekly_report",
        "schedule": crontab(minute=0, hour=9, day_of_week="monday"),
    },
    # At minutes 0, 15, 30 and 45 of every hour
    "sync-external-data": {
        "task": "myapp.tasks.sync_data",
        "schedule": crontab(minute="*/15"),
        "kwargs": {"source": "crm"},
    },
    # 00:00 UTC on the 1st of every month
    "monthly-billing": {
        "task": "myapp.tasks.run_billing",
        "schedule": crontab(minute=0, hour=0, day_of_month=1),
    },
}
# Periodic tasks in tasks.py
@celery.task
def health_check():
    """Probe the app's /health endpoint and summarise the outcome.

    Returns:
        {"status": "ok", "code": <int>} when the endpoint answers with a code < 400,
        {"status": "error", "code": <int>} when it answers with a 4xx/5xx code,
        {"status": "error", "error": <message>} when the request itself fails.
    """
    import requests

    try:
        r = requests.get("http://localhost:8000/health", timeout=5)
    except requests.RequestException as e:
        # Connection refused, timeout, DNS failure, ... — report it instead of raising.
        return {"status": "error", "error": str(e)}
    # An answer like 503 still means "unhealthy", so don't report it as ok.
    return {"status": "ok" if r.ok else "error", "code": r.status_code}
@celery.task
def send_daily_digest():
    """Queue one "Daily Digest" email for every user subscribed to the digest.

    Returns:
        {"sent": <number of emails queued>}
    """
    from myapp.db import get_users, get_daily_stats

    users = get_users(subscribed_to_digest=True)
    stats = get_daily_stats()
    # The body depends only on the stats, so render it once, not once per user.
    # NOTE(review): format_digest is neither defined nor imported in this
    # snippet — import it from wherever it lives.
    body = format_digest(stats)
    sent = 0
    for user in users:
        send_email.delay(user.email, "Daily Digest", body)
        sent += 1
    # Counting during the loop also works if get_users returns a lazy iterable.
    return {"sent": sent}
# Run the beat scheduler — exactly ONE instance per deployment
celery -A celery_app beat -l info
# Development only: embed beat inside a worker process
celery -A celery_app worker -B -l info
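One beat instance only: Run exactly one Celery Beat process per deployment. Multiple Beat processes running at the same time will duplicate scheduled task executions. In Kubernetes, deploy Beat as a separate Deployment with `replicas: 1` and the `Recreate` update strategy — the default rolling update briefly runs the old and new Beat pods side by side, which causes the same duplicates.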
Task Routing and Priorities
# Option 1: one queue per task, matched by exact task name
celery.conf.task_routes = {
    f"myapp.tasks.{task_name}": {"queue": queue_name}
    for task_name, queue_name in (
        ("send_email", "emails"),
        ("process_report", "reports"),
        ("health_check", "monitoring"),
    )
}
# Option 2: match task names with glob patterns.
# NOTE: assigning task_routes again replaces Option 1 completely.
celery.conf.task_routes = {
    "myapp.tasks.send_*": {"queue": "emails"},
    "myapp.tasks.*_report": {"queue": "reports"},
}
# Dispatch to a specific queue
send_email.apply_async(
    args=["user@example.com", "Alert!", "Something happened"],
    queue="emails",
    # On Redis the priority scale is reversed: 0 is the HIGHEST priority and 9
    # the lowest (RabbitMQ is the other way round). Redis priorities only take
    # effect with broker_transport_options such as
    # {"priority_steps": list(range(10)), "queue_order_strategy": "priority"}.
    priority=0,
    countdown=300,  # delay 5 minutes
    expires=3600,  # discard if not executed within 1 hour
)
# Start workers for specific queues
# celery -A celery_app worker -Q emails --concurrency=8
# celery -A celery_app worker -Q reports --concurrency=2
# Unrouted tasks go to the default "celery" queue, and routed ones need a
# consumer too — e.g. celery -A celery_app worker -Q celery,monitoring
Monitoring with Flower
# Start the Flower web dashboard, then open http://localhost:5555
celery -A celery_app flower --port=5555
# With HTTP basic auth — take credentials from the environment instead of
# hard-coding them (command lines end up in shell history and `ps` output)
celery -A celery_app flower --port=5555 --basic-auth="${FLOWER_USER}:${FLOWER_PASSWORD}"
# Programmatic task inspection
from celery_app import celery
inspect = celery.control.inspect()
# Each call below broadcasts to the workers and returns {worker_name: ...},
# or None when no worker replied in time.
# Tasks currently executing
active = inspect.active()
# Reserved tasks: already received by a worker but not started yet
reserved = inspect.reserved()
# Worker statistics
stats = inspect.stats()
# Revoke (cancel) a task; terminate=True also signals the worker process
# that is running it if it has already started
celery.control.revoke(task_id="abc-123", terminate=True)
# Look up a failed task's outcome. AsyncResult only reads the stored state —
# it does not retry anything; to run the work again, dispatch the task anew.
from celery.result import AsyncResult
result = AsyncResult("abc-123")
print(result.state)  # e.g. FAILURE
print(result.info)  # for a failed task: the exception it raised
APScheduler: In-Process Scheduling
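APScheduler runs inside your Python process — no broker, no worker processes. It suits applications that need scheduled tasks but not distributed execution, and it integrates cleanly with FastAPI through its asyncio scheduler. One caveat mirrors Celery Beat's: every process that starts the scheduler runs its own copy of every job, and APScheduler 3.x does not support several schedulers sharing one persistent job store. If you serve FastAPI with several Uvicorn/Gunicorn workers, start the scheduler in only one process (or in a dedicated service) to avoid duplicate runs.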
# The examples use the APScheduler 3.x API (4.x is a rewrite with a different API);
# the redis package is required by RedisJobStore.
pip install "apscheduler>=3.10,<4" redis
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.executors.asyncio import AsyncIOExecutor
import asyncio
# Configure with Redis job store (persists jobs across restarts)
# Redis db 2 keeps the job store apart from Celery's default broker (db 0) and
# result backend (db 1). Jobs stored here must reference module-level callables.
jobstores = dict(default=RedisJobStore(host="localhost", port=6379, db=2))
scheduler = AsyncIOScheduler(
    timezone="UTC",
    jobstores=jobstores,
    executors=dict(default=AsyncIOExecutor()),
    # coalesce: collapse a backlog of missed runs into one run;
    # max_instances=1: never run two copies of the same job concurrently
    job_defaults=dict(coalesce=True, max_instances=1),
)
# Add jobs — module-level coroutine functions, so a persistent job store
# can save a reference to them
async def send_digests():
    """Placeholder job: announce the daily digest run."""
    message = "Sending daily digests..."
    print(message)


async def sync_crm():
    """Placeholder job: announce the CRM sync."""
    message = "Syncing CRM data..."
    print(message)
# NOTE: a trigger object built by hand does NOT inherit the scheduler's
# timezone (only string aliases such as "cron" do). Without timezone="UTC",
# the cron job below would fire at 08:00 in the host's local time.
# Interval trigger
scheduler.add_job(
    sync_crm,
    trigger=IntervalTrigger(minutes=15, timezone="UTC"),
    id="sync_crm",
    replace_existing=True,
    misfire_grace_time=300,  # run up to 5 min late before skipping
)
# Cron trigger: every day at 08:00 UTC
scheduler.add_job(
    send_digests,
    trigger=CronTrigger(hour=8, minute=0, timezone="UTC"),
    id="daily_digest",
    replace_existing=True,
)
# One-time scheduled job
from datetime import datetime, timedelta, timezone


def send_reminder():
    """Print a reminder.

    Defined at module level because the Redis job store pickles a textual
    reference to the callable; a lambda has none and is rejected with ValueError.
    """
    print("Reminder!")


scheduler.add_job(
    send_reminder,
    trigger="date",
    # Timezone-aware "now"; datetime.utcnow() is deprecated and returns a naive value.
    run_date=datetime.now(timezone.utc) + timedelta(hours=24),
    id="one_time_reminder",
    # The job persists in Redis across restarts. Without replace_existing,
    # scheduler.start() raises ConflictingIdError on the next boot. Trade-off:
    # each restart re-arms the reminder 24 h from that moment.
    replace_existing=True,
)
FastAPI Integration
from fastapi import FastAPI
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the APScheduler for as long as the FastAPI app is up."""
    scheduler.start()
    try:
        yield
    finally:
        # Also runs if the app is torn down by an exception raised at the yield.
        scheduler.shutdown()


app = FastAPI(lifespan=lifespan)
# Trigger tasks from HTTP endpoints
@app.post("/tasks/sync")
async def trigger_sync():
    """Schedule a one-off CRM sync; with no trigger given, APScheduler runs it right away."""
    scheduler.add_job(
        sync_crm,
        id="manual_sync",
        replace_existing=True,
    )
    response = {"status": "scheduled"}
    return response
def _format_next_run(job):
    """Return str(job.next_run_time), or None when the job has no next run time."""
    # Paused jobs have next_run_time=None; jobs still pending (scheduler not
    # started) may not have the attribute at all.
    next_run = getattr(job, "next_run_time", None)
    return None if next_run is None else str(next_run)


@app.get("/tasks")
async def list_jobs():
    """List scheduled jobs: id, next run time (null if paused) and trigger description."""
    return [
        {
            "id": job.id,
            "next_run": _format_next_run(job),
            "trigger": str(job.trigger),
        }
        for job in scheduler.get_jobs()
    ]
from apscheduler.jobstores.base import JobLookupError
from fastapi import HTTPException


@app.delete("/tasks/{job_id}")
async def remove_job(job_id: str):
    """Remove a scheduled job by id.

    Raises:
        HTTPException: 404 when no job with that id exists.
    """
    try:
        scheduler.remove_job(job_id)
    except JobLookupError:
        # Without this, an unknown id surfaces as a 500 Internal Server Error.
        raise HTTPException(status_code=404, detail=f"No job with id {job_id!r}") from None
    return {"removed": job_id}
# Celery + FastAPI — dispatch tasks from HTTP handlers
from celery_app import celery
from myapp.tasks import send_email, process_report
@app.post("/reports/{report_id}/generate")
async def generate_report(report_id: int):
    """Enqueue report generation on a Celery worker and answer with the task id."""
    async_result = process_report.delay(report_id)
    return {"task_id": async_result.id, "status": "queued"}
@app.get("/reports/tasks/{task_id}")
async def get_task_status(task_id: str):
    """Report a Celery task's state and result.

    For FAILURE (and RETRY) states Celery stores the raised exception as the
    result. That is not JSON-serializable, so its message is returned instead.
    """
    result = celery.AsyncResult(task_id)
    payload = result.result
    if isinstance(payload, BaseException):
        payload = str(payload)
    return {"task_id": task_id, "status": result.status, "result": payload}
Celery Beat vs APScheduler
| Feature | Celery Beat | APScheduler |
|---|---|---|
| Requires broker | Yes (Redis/RabbitMQ) | No (optional Redis store) |
| Distributed workers | Yes | No |
| In-process | No | Yes |
| Task persistence | Yes (broker) | Optional (jobstore) |
| Best for | High-volume, distributed | Simple, single-process |
| FastAPI integration | Via .delay() | Native async |
Frequently Asked Questions
- Can I use Celery without Redis?
- Yes. Celery supports RabbitMQ, Amazon SQS, and even a database broker via SQLAlchemy (`celery[sqlalchemy]`, marked experimental in the Celery docs). Redis is the most popular choice because it can also serve as the result backend and is easy to set up.
- What happens if a Beat task misses its scheduled time?
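- It depends on the scheduler. Celery Beat's default `PersistentScheduler` records each entry's last run time in its schedule file. An entry that fell due while Beat was down is therefore sent once when Beat restarts; missed runs are not replayed one by one. If the schedule file is lost, the missed run is simply skipped. To discard a scheduled message that is picked up too late, add `"options": {"expires": <seconds>}` to the beat entry (`options` is passed through to `apply_async`). APScheduler skips a run that would start later than the job's `misfire_grace_time` (1 second by default in APScheduler 3.x). Its `coalesce=True` collapses a backlog of missed runs into a single execution instead of running each one.
- How do I prevent duplicate task execution?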
- Running only one Beat process prevents duplicate scheduling. For task-level deduplication, use a lock in Redis. Set the key atomically before executing (e.g. `SET lock:<task> <id> NX EX <timeout>`) and delete it on completion. The expiry keeps a crashed worker from holding the lock forever. Libraries like `celery-once` automate this pattern.