Celery: Distributed Task Queue with Python and Redis (2026)
Published June 6, 2026 • 14 min read
Celery is the most widely deployed distributed task queue in the Python ecosystem. It decouples long-running work — sending emails, processing images, generating reports, syncing with third-party APIs — from your HTTP request cycle, keeping response times fast and servers responsive. This guide covers the full Celery stack: Redis broker, workers, Beat scheduler, retry strategies, Flower monitoring, Django integration, and the canvas primitives that let you build complex task workflows.
Celery Architecture
Understanding the four components prevents most production surprises:
| Component | Role | Common Choice |
|---|---|---|
| Broker | Receives task messages from producers; queues them for workers | Redis, RabbitMQ |
| Worker | Pulls tasks from broker, executes them | Celery worker process |
| Result Backend | Stores task results/status for retrieval by producers | Redis, PostgreSQL, Django DB |
| Beat | Periodic task scheduler — puts tasks on the broker on a schedule | Celery Beat process |
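To make the division of labor concrete, here is a toy, in-process analogy that uses only the standard library. A `queue.Queue` plays the broker, a thread plays the worker, and a dict plays the result backend. This is not how Celery is implemented: Celery serializes messages and talks to Redis or RabbitMQ over the network. The names `broker`, `results`, `TASKS`, `enqueue` and `worker_loop` are illustrative only.

```python
import queue
import threading
import uuid

# Toy stand-ins: NOT Celery internals, just the same division of labor.
broker = queue.Queue()               # plays the broker (Redis/RabbitMQ)
results = {}                         # plays the result backend
TASKS = {"add": lambda x, y: x + y}  # task registry: name -> function


def enqueue(name, *args):
    """Producer side: publish a message and return its task id."""
    task_id = str(uuid.uuid4())
    broker.put((task_id, name, args))
    return task_id


def worker_loop():
    """Worker side: consume messages until a None sentinel arrives."""
    while True:
        message = broker.get()
        if message is None:
            break
        task_id, name, args = message
        results[task_id] = TASKS[name](*args)  # store the result for the producer


worker = threading.Thread(target=worker_loop)
worker.start()
ids = [enqueue("add", i, i) for i in range(3)]
broker.put(None)  # stop signal, queued after the real messages
worker.join()
print([results[t] for t in ids])  # [0, 2, 4]
```

In this picture, Beat would simply be one more producer that calls `enqueue` on a timer.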
Setup with Redis
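```bash
# Quote the extra so shells like zsh don't treat the brackets as a glob pattern;
# [redis] pulls in the redis client, and kombu is already a Celery dependency
pip install "celery[redis]"
# Run Redis locally (Docker)
docker run -d -p 6379:6379 --name redis redis:7-alpine
```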
```python
# celery_app.py
from celery import Celery

app = Celery(
    'myproject',
    broker='redis://localhost:6379/0',       # Redis DB 0: task messages
    backend='redis://localhost:6379/1',      # Redis DB 1: task results
    include=['tasks.email', 'tasks.media'],  # task modules imported when a worker starts
)

app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    task_track_started=True,          # report a STARTED state, not just PENDING/SUCCESS
    task_acks_late=True,              # ack after execution, not on receipt
    worker_prefetch_multiplier=1,     # each worker process reserves one message at a time
    result_expires=3600,              # delete results after 1 hour
)
```
```bash
# Start a worker (development)
celery -A celery_app worker --loglevel=info
# Set concurrency explicitly (the default is the number of CPU cores);
# about one process per core for CPU-bound work, more for I/O-bound work
celery -A celery_app worker --concurrency=4 --loglevel=warning
# List the tasks each worker is currently executing
celery -A celery_app inspect active
```
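Why `task_acks_late=True` matters can be shown with a small standard-library simulation. The `FakeBroker` class and `run_worker` function below are hypothetical and model only one rule: an unacknowledged message is redelivered when its consumer dies, while an acknowledged one is not. Redis-backed Celery actually redelivers after a visibility timeout, which this sketch ignores.

```python
from collections import deque


class FakeBroker:
    """Hypothetical broker: un-acked messages go back on the queue if the consumer dies."""

    def __init__(self, messages):
        self.ready = deque(messages)
        self.unacked = []

    def fetch(self):
        msg = self.ready.popleft()
        self.unacked.append(msg)
        return msg

    def ack(self, msg):
        self.unacked.remove(msg)

    def consumer_died(self):
        # Redeliver everything the dead consumer never acknowledged, in original order
        self.ready.extendleft(reversed(self.unacked))
        self.unacked.clear()


def run_worker(broker, acks_late, crash_on):
    """Process messages, simulating a crash while executing `crash_on`; return completed work."""
    done = []
    while broker.ready:
        msg = broker.fetch()
        if not acks_late:
            broker.ack(msg)          # early ack: acknowledged before execution
        if msg == crash_on:
            broker.consumer_died()   # worker dies mid-task
            return done
        done.append(msg)
        if acks_late:
            broker.ack(msg)          # late ack: acknowledged after execution
    return done


outcomes = {}
for acks_late in (False, True):
    broker = FakeBroker(["a", "b", "c"])
    first = run_worker(broker, acks_late, crash_on="b")
    second = run_worker(broker, acks_late, crash_on=None)  # a restarted worker
    outcomes[acks_late] = first + second

print(outcomes)  # {False: ['a', 'c'], True: ['a', 'b', 'c']}
```

With early acks, message "b" is simply lost; with late acks it is redelivered. The flip side is that a task which crashed halfway runs again from the top, which is why late acks should be paired with idempotent tasks.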
Defining and Calling Tasks
Two real-world tasks: sending a welcome email and processing an uploaded image.
```python
# tasks/email.py
import smtplib
from email.message import EmailMessage

from celery.utils.log import get_task_logger

from celery_app import app

logger = get_task_logger(__name__)


@app.task(bind=True, name='tasks.send_welcome_email',
          autoretry_for=(smtplib.SMTPException,),  # retry only on SMTP errors
          retry_backoff=True, max_retries=3)
def send_welcome_email(self, user_id: int, email: str, username: str) -> dict:
    logger.info("Sending welcome email to %s (attempt %d)", email, self.request.retries + 1)
    msg = EmailMessage()
    msg['Subject'] = f"Welcome to Techoral, {username}!"
    msg['From'] = 'noreply@techoral.com'
    msg['To'] = email
    msg.set_content(f"Hi {username},\n\nYour account is ready.")
    # Host and credentials are placeholders; load real ones from settings or env vars
    with smtplib.SMTP('smtp.example.com', 587) as smtp:
        smtp.starttls()
        smtp.login('user', 'pass')
        smtp.send_message(msg)
    return {'status': 'sent', 'user_id': user_id}
```
```python
# tasks/media.py
import os

from PIL import Image

from celery_app import app


@app.task(name='tasks.process_image', bind=True, max_retries=2)
def process_image(self, image_path: str, output_dir: str) -> dict:
    """Resize an uploaded image to thumbnail and web sizes."""
    try:
        os.makedirs(output_dir, exist_ok=True)
        with Image.open(image_path) as img:
            # thumbnail() keeps the aspect ratio and never upscales
            sizes = {'thumbnail': (150, 150), 'web': (800, 600)}
            results = {}
            for name, size in sizes.items():
                img_copy = img.copy()
                img_copy.thumbnail(size, Image.LANCZOS)
                out_path = os.path.join(output_dir, f"{name}_{os.path.basename(image_path)}")
                img_copy.save(out_path, optimize=True, quality=85)
                results[name] = out_path
            return {'status': 'ok', 'files': results}
    except Exception as exc:
        # Retries on any error, including a missing file; narrow this in real code
        raise self.retry(exc=exc, countdown=30)
```
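`Image.thumbnail()` fits the image inside the requested box while keeping its aspect ratio and never enlarges it, so the "web" output is exactly 800x600 only when the source is 4:3. The helper below approximates that arithmetic in plain Python so you can predict output sizes. `fit_within` is a hypothetical name, and Pillow's own rounding can differ from it by a pixel.

```python
def fit_within(width: int, height: int, box_w: int, box_h: int) -> tuple[int, int]:
    """Approximate the size Image.thumbnail((box_w, box_h)) produces."""
    if width <= box_w and height <= box_h:
        return width, height                    # already fits: no upscaling
    scale = min(box_w / width, box_h / height)  # the tighter dimension wins
    return max(1, round(width * scale)), max(1, round(height * scale))


print(fit_within(3000, 1500, 800, 600))  # (800, 400)
print(fit_within(3000, 1500, 150, 150))  # (150, 75)
print(fit_within(640, 480, 800, 600))    # (640, 480)
```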
```python
# Calling tasks
from datetime import datetime, timedelta, timezone

from tasks.email import send_welcome_email
from tasks.media import process_image

# Fire and forget
send_welcome_email.delay(user_id=42, email='alice@example.com', username='alice')

# Keep an AsyncResult handle
result = process_image.delay('/uploads/photo.jpg', '/processed/')
# Block until the task finishes; raises celery.exceptions.TimeoutError after 60 s.
# Avoid calling .get() inside a web request or inside another task.
output = result.get(timeout=60)

# Schedule for a specific time
send_welcome_email.apply_async(
    args=[42, 'alice@example.com', 'alice'],
    eta=datetime.now(timezone.utc) + timedelta(minutes=5),
    queue='email',  # route to a specific queue; a worker must consume it (-Q email)
)
```
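Passing `queue='email'` on every call is easy to forget. Celery's `task_routes` setting can route tasks by name instead. Below is a minimal sketch that assumes the task names used above; the `media` queue and the `TASK_ROUTES` constant name are hypothetical.

```python
# celery_app.py (sketch): route tasks by name instead of passing queue= on each call.
# "email" and "media" are illustrative queue names; unrouted tasks use the default queue "celery".
TASK_ROUTES = {
    'tasks.send_welcome_email': {'queue': 'email'},
    'tasks.process_image': {'queue': 'media'},
}

# In celery_app.py this would be applied with:
#     app.conf.task_routes = TASK_ROUTES
# and each queue gets its own worker(s):
#     celery -A celery_app worker -Q email --concurrency=8
#     celery -A celery_app worker -Q media --concurrency=4
```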
Celery Beat — Periodic Tasks
```python
# celery_app.py: add a beat schedule
from celery.schedules import crontab

app.conf.beat_schedule = {
    'cleanup-expired-sessions': {
        'task': 'tasks.maintenance.cleanup_sessions',
        'schedule': crontab(hour=3, minute=0),  # daily at 3 AM UTC
    },
    'send-weekly-digest': {
        'task': 'tasks.email.send_weekly_digest',
        'schedule': crontab(day_of_week='monday', hour=9, minute=0),
    },
    'refresh-cache-every-5min': {
        'task': 'tasks.cache.refresh_homepage_cache',
        'schedule': 300.0,  # seconds (float)
    },
    'process-pending-exports': {
        'task': 'tasks.exports.process_pending',
        'schedule': crontab(minute='*/15'),  # every 15 minutes
        'args': (),
        'kwargs': {'batch_size': 50},
    },
}
```
```bash
# Start Beat: run exactly ONE Beat process per cluster, or every scheduled task is sent once per Beat
celery -A celery_app beat --loglevel=info
# Combined worker + beat for development only
celery -A celery_app worker --beat --loglevel=info
```
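To sanity-check what the crontab entries above mean, the standard-library sketch below computes the next fire time for two of them: `crontab(hour=3, minute=0)` and `crontab(minute='*/15')`. The helpers `next_daily` and `next_every_n_minutes` are hypothetical and only cover these two shapes in UTC; Beat's real scheduling logic lives in `celery.schedules`.

```python
from datetime import datetime, timedelta, timezone


def next_daily(now: datetime, hour: int, minute: int) -> datetime:
    """Next run of crontab(hour=hour, minute=minute), i.e. once a day."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)  # today's slot has passed: use tomorrow's
    return candidate


def next_every_n_minutes(now: datetime, n: int) -> datetime:
    """Next run of crontab(minute=f'*/{n}'): minutes 0, n, 2n, ... of every hour."""
    base = now.replace(second=0, microsecond=0)
    minute = (base.minute // n + 1) * n
    if minute >= 60:
        return base.replace(minute=0) + timedelta(hours=1)  # wrap to the next hour
    return base.replace(minute=minute)


now = datetime(2026, 6, 6, 14, 7, 30, tzinfo=timezone.utc)
print(next_daily(now, 3, 0))          # 2026-06-07 03:00:00+00:00
print(next_every_n_minutes(now, 15))  # 2026-06-06 14:15:00+00:00
```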
Retries with Exponential Backoff
```python
import requests

from celery_app import app


@app.task(
    bind=True,
    name='tasks.sync_external_api',
    # autoretry_for: exception classes that trigger an automatic retry
    autoretry_for=(requests.exceptions.RequestException, ConnectionError),
    # retry_backoff: exponential backoff factor; delay caps of 2s, 4s, 8s, 16s, 32s
    retry_backoff=2,
    # retry_backoff_max: never wait longer than 600 seconds
    retry_backoff_max=600,
    # retry_jitter: pick a random delay between 0 and the cap to avoid a thundering herd
    retry_jitter=True,
    max_retries=5,
    # Only used by retries that set no countdown; retry_backoff always sets one here
    default_retry_delay=60,
)
def sync_external_api(self, resource_id: int) -> dict:
    resp = requests.get(
        f'https://api.partner.com/resources/{resource_id}',
        timeout=10,
    )
    resp.raise_for_status()  # HTTPError is a RequestException, so 4xx/5xx responses retry too
    data = resp.json()
    # ... process data ...
    return {'synced': resource_id, 'status': data['status']}


# Manual retry inside an exception handler.
# TemporaryServiceError and external_service are placeholders for your own client code.
class TemporaryServiceError(Exception):
    pass


def external_service(payload: dict) -> dict:
    ...


@app.task(bind=True, max_retries=3)
def call_flaky_service(self, payload: dict):
    try:
        return external_service(payload)
    except TemporaryServiceError as exc:
        # Exponential: 60, 120, 240 seconds
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
```
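The delays produced by `retry_backoff`, `retry_backoff_max` and `retry_jitter` can be reproduced in a few lines. The function below follows the formula that, as far as we can tell, Celery uses in `celery.utils.time.get_exponential_backoff_interval`: the countdown is `min(maximum, factor * 2**retries)`, and full jitter then picks a random whole number from 0 to that countdown. Treat `backoff_interval` as an illustration rather than Celery's code.

```python
import random
from typing import Optional


def backoff_interval(factor: int, retries: int, maximum: int,
                     full_jitter: bool = False,
                     rng: Optional[random.Random] = None) -> int:
    """Seconds to wait before the next retry, given how many retries already happened."""
    countdown = min(maximum, factor * (2 ** retries))
    if full_jitter:
        rng = rng or random.Random()
        countdown = rng.randrange(countdown + 1)  # uniform in [0, countdown]
    return max(0, countdown)


# retry_backoff=2, retry_backoff_max=600, jitter off: caps for the first five retries
print([backoff_interval(2, n, 600) for n in range(5)])  # [2, 4, 8, 16, 32]
print(backoff_interval(2, 9, 600))                      # 600 (2 * 512 = 1024, capped)

# With jitter on, each delay is random but never above its cap
rng = random.Random(0)
print([backoff_interval(2, n, 600, True, rng) for n in range(5)])
```

With `retry_backoff=True`, as in the welcome-email task, the factor is 1, so the caps are 1, 2 and 4 seconds.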
Flower — Real-Time Monitoring
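```bash
pip install flower
# Start Flower (port 5555)
celery -A celery_app flower --port=5555
# With basic auth (replace these placeholder credentials)
celery -A celery_app flower --basic-auth=admin:password --port=5555
# Flower as a Docker service; assumes Redis is reachable as "redis"
# (for example, a container on the same Docker network).
# Celery 5 expects global options such as --broker before the subcommand.
docker run -p 5555:5555 mher/flower \
  celery --broker=redis://redis:6379/0 flower
```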
Flower shows active workers, task throughput, success/failure rates, active vs. queued tasks, and lets you revoke tasks from the UI. Access it at http://localhost:5555.
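Flower also serves an HTTP API alongside the dashboard, which is handy for scripts and health checks. The sketch below uses only the standard library to call `/api/workers` with the basic-auth credentials from the command above. `flower_request` and `flower_get` are hypothetical helpers, and the JSON shape depends on your Flower version, so inspect a response before relying on specific fields.

```python
import base64
import json
import urllib.request


def flower_request(base_url: str, path: str, user: str, password: str) -> urllib.request.Request:
    """Build a GET request to a Flower API endpoint with HTTP basic auth."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        base_url.rstrip("/") + path,
        headers={"Authorization": f"Basic {token}"},
    )


def flower_get(base_url: str, path: str, user: str, password: str, timeout: float = 5.0):
    """Fetch and decode a JSON response (needs a running Flower instance)."""
    req = flower_request(base_url, path, user, password)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.load(resp)


# Usage, with Flower running on localhost:5555 and the placeholder credentials above:
#     workers = flower_get("http://localhost:5555", "/api/workers", "admin", "password")
#     print(workers)
```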
Django + Celery Integration
```python
# myproject/celery.py
import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()  # finds tasks.py in every INSTALLED_APPS app
```

```python
# myproject/__init__.py
# Import the app when Django starts so @shared_task functions bind to it
from .celery import app as celery_app

__all__ = ('celery_app',)
```
```python
# settings.py: Celery settings use the CELERY_ prefix
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'django-db'  # stores results via the Django ORM
CELERY_CACHE_BACKEND = 'default'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'UTC'

# The django-db backend comes from the django-celery-results package:
# install it, add it to INSTALLED_APPS, then run `python manage.py migrate`.
#     'django_celery_results',
#     'django_celery_beat',  # only if using Beat with a DB-backed schedule
```
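With `namespace='CELERY'`, Celery reads only the Django settings that start with `CELERY_` and maps each one to the matching lowercase setting name, so `CELERY_TASK_SERIALIZER` configures `task_serializer`. The function below is a hypothetical illustration of that mapping, handy for checking a settings module. It is not Celery's loader.

```python
def celery_settings_from_django(settings: dict, namespace: str = "CELERY") -> dict:
    """Illustrate how namespaced Django settings map to Celery setting names."""
    prefix = f"{namespace}_"
    return {
        key[len(prefix):].lower(): value  # CELERY_BROKER_URL -> broker_url
        for key, value in settings.items()
        if key.startswith(prefix)         # non-Celery settings are ignored
    }


django_settings = {
    "DEBUG": True,
    "CELERY_BROKER_URL": "redis://localhost:6379/0",
    "CELERY_TASK_SERIALIZER": "json",
}
print(celery_settings_from_django(django_settings))
# {'broker_url': 'redis://localhost:6379/0', 'task_serializer': 'json'}
```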
```python
# myapp/tasks.py
from celery import shared_task
from django.core.mail import send_mail

from .models import Order


# autoretry_for=(Exception,) also retries permanent errors such as Order.DoesNotExist.
# Each retry re-runs the whole function, so payment and inventory steps must be idempotent.
@shared_task(bind=True, max_retries=3, autoretry_for=(Exception,), retry_backoff=True)
def fulfill_order(self, order_id: int) -> str:
    order = Order.objects.select_related('user').get(pk=order_id)
    # ... process payment, update inventory ...
    order.status = 'fulfilled'
    order.save(update_fields=['status'])
    send_mail(
        subject=f'Order #{order_id} Confirmed',
        message=f'Hi {order.user.first_name}, your order is on the way!',
        from_email='orders@techoral.com',
        recipient_list=[order.user.email],
    )
    return f'Order {order_id} fulfilled'
```
Task Canvas: chain, group, chord
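```python
from celery import chain, chord, group

# generate_thumbnail, upload_to_cdn and send_notification are not defined in this guide;
# treat them as your own tasks with the signatures implied below.
from tasks.email import send_notification
from tasks.media import generate_thumbnail, process_image, upload_to_cdn

# chain: sequential pipeline; each step's return value becomes the next step's first argument
pipeline = chain(
    process_image.s('/uploads/photo.jpg', '/tmp/'),
    generate_thumbnail.s(),
    upload_to_cdn.s(bucket='media-bucket'),
)
pipeline.delay()

# group: parallel execution of independent tasks
parallel = group(
    process_image.s(f'/uploads/img{i}.jpg', '/tmp/')
    for i in range(10)
)
result = parallel.delay()
outputs = result.get()  # list of 10 results, in the order the tasks were added

# chord: parallel tasks + a callback once ALL of them complete (requires a result backend)
# Great for fan-out processing with a final aggregation step
header = group(
    process_image.s(f'/uploads/img{i}.jpg', '/tmp/')
    for i in range(10)
)
# The callback receives the list of header results as its first positional argument,
# so send_notification must accept it, e.g. send_notification(results, user_id, message)
callback = send_notification.s(user_id=42, message="All images processed!")
workflow = chord(header)(callback)  # calling the chord sends it and returns an AsyncResult
```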
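The data flow of the three primitives can be modelled with plain functions and a thread pool. This is an analogy, not Celery: `run_chain`, `run_group` and `run_chord` are hypothetical helpers that only show who receives which value. A chain passes each result as the first argument of the next step, a group returns results in order, and a chord hands the whole list to its callback.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def run_chain(first, *steps):
    """chain: call first(), then feed each result into the next step as its first argument."""
    value = first()
    for step in steps:
        value = step(value)
    return value


def run_group(calls):
    """group: run independent calls in parallel; results come back in submission order."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(call) for call in calls]
        return [f.result() for f in futures]


def run_chord(header_calls, callback):
    """chord: run the group, then pass the list of all header results to the callback."""
    return callback(run_group(header_calls))


def double(x):
    return x * 2


def add(x, y):
    return x + y


print(run_chain(partial(add, 1, 2), double, partial(add, y=10)))  # 16
print(run_group([partial(double, i) for i in range(4)]))         # [0, 2, 4, 6]
print(run_chord([partial(double, i) for i in range(4)], sum))    # 12
```

Here `partial(add, y=10)` plays roughly the role of `add.s(y=10)`: a partial signature whose first argument arrives from the previous step.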
Frequently Asked Questions
- What is the difference between `.delay()` and `.apply_async()`?
  `.delay(*args, **kwargs)` is a shortcut for `.apply_async(args, kwargs)` with no extra options. Use `.apply_async()` when you need to set `eta`, `countdown`, `expires`, `queue`, `priority`, or custom headers. Both return an `AsyncResult` you can poll.
- How do I route different tasks to different queues?
  Define a `task_routes` dict in your Celery config mapping task names to queue names, then start a worker per queue, for example `celery -A celery_app worker -Q email` for email and `celery -A celery_app worker -Q celery` for unrouted tasks (`celery` is the default queue name unless you change `task_default_queue`). This lets you scale email workers independently from CPU-heavy image-processing workers.
- How do I handle task idempotency?
  Design tasks to be safe to run more than once: retries and late acknowledgement can both execute the same task twice. For database writes, use `update_or_create()` instead of `create()`. For external API calls, check state before sending. Store a deduplication key (e.g., the Celery task ID) on your model and skip execution if it has already been processed.
- Should I use Celery or FastAPI's `BackgroundTasks`?
  Use `BackgroundTasks` for lightweight work that can run in the same process after the response is sent and doesn't need retries or result tracking (e.g., logging, analytics pings). Use Celery when tasks are heavy, need retries, need to be scheduled, need result persistence, or need to run on separate worker machines.
- Why can a task be lost when a worker crashes?
  By default (`task_acks_late=False`), Celery acknowledges a message just before executing it, so if the worker dies mid-task the message is gone. Set `task_acks_late=True` to acknowledge only after the task finishes, which gives at-least-once delivery (pair it with idempotent tasks). Even with late acks, the message is still acknowledged when the child process running the task is killed outright (for example by the OOM killer) unless you also set `task_reject_on_worker_lost=True`.
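The deduplication-key advice from the idempotency answer can be sketched without Celery or a database. `ProcessedKeys` and `charge_order` are hypothetical. `ProcessedKeys` stands in for a table with a unique constraint, and the point is that a retried or redelivered message carrying the same key becomes a no-op.

```python
import threading


class ProcessedKeys:
    """Hypothetical store of already-handled keys (stands in for a unique DB column)."""

    def __init__(self):
        self._keys = set()
        self._lock = threading.Lock()

    def claim(self, key: str) -> bool:
        """Atomically record `key`; return False if it was already recorded."""
        with self._lock:
            if key in self._keys:
                return False
            self._keys.add(key)
            return True


processed = ProcessedKeys()
charges = []  # side effects that must not be repeated


def charge_order(order_id: int, dedup_key: str) -> str:
    """Task body: skip the side effect when this key was already processed."""
    if not processed.claim(dedup_key):
        return "skipped"
    charges.append(order_id)
    return "charged"


print(charge_order(42, "order-42-charge"))  # charged
print(charge_order(42, "order-42-charge"))  # skipped (retry or redelivery)
print(charges)                              # [42]
```

Claiming the key before the side effect means a crash in between skips the work on retry; in a real database, write the key and the side effect in the same transaction. A key derived from the business operation (here the order id) also catches duplicate enqueues, which the Celery task ID does not, because each new `.delay()` call gets a fresh ID.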