Python Pika: RabbitMQ Messaging with AMQP
Pika is a pure-Python AMQP 0-9-1 client and the Python client recommended by the RabbitMQ team. RabbitMQ's exchange-queue-binding model is more flexible than Kafka for task distribution patterns: work queues, pub/sub fan-out, topic routing, and RPC-over-messaging. This guide implements the main messaging patterns with Pika (work queues, fanout pub/sub, and topic routing), adds dead-letter exchange (DLX) handling for failed messages, and shows the async aio-pika library for FastAPI integration.
Table of Contents
Installation and Connection
pip install pika aio-pika
import pika
import os
RABBITMQ_URL = os.environ.get("RABBITMQ_URL", "amqp://guest:guest@localhost:5672/")
def get_connection() -> pika.BlockingConnection:
    """Open a blocking connection to the broker addressed by ``RABBITMQ_URL``.

    The URL-derived parameters are adjusted before connecting: the heartbeat
    is set to 600 and the blocked-connection timeout to 300.

    Returns:
        A newly opened ``pika.BlockingConnection``; the caller closes it.
    """
    connection_params = pika.URLParameters(RABBITMQ_URL)
    connection_params.blocked_connection_timeout = 300
    connection_params.heartbeat = 600
    return pika.BlockingConnection(connection_params)
# Simple test: open a connection and a channel, then report success.
conn = get_connection()
try:
    channel = conn.channel()
    print("Connected to RabbitMQ")
finally:
    # Release the connection even if opening the channel fails.
    conn.close()
Work Queue (Task Queue)
A work queue distributes tasks across multiple worker processes. Each message is delivered to exactly one worker. Set prefetch_count=1 so the broker hands a worker a new task only after it has acknowledged the previous one — this prevents a busy worker from accumulating a backlog while other workers sit idle.
import pika
import json
import time
QUEUE = "task_queue"
def publisher():
    """Send ten sample tasks to the durable work queue.

    Each task is JSON-encoded and published through the default exchange
    with ``QUEUE`` as the routing key, marked persistent. The connection is
    closed when publishing finishes, including when a declare or publish
    call raises.
    """
    conn = get_connection()
    try:
        ch = conn.channel()
        ch.queue_declare(
            queue=QUEUE,
            durable=True,  # survive broker restarts
        )
        # Identical for every message, so build it once outside the loop.
        persistent = pika.BasicProperties(
            delivery_mode=pika.DeliveryMode.Persistent,  # persist to disk
        )
        for i in range(10):
            task = {"task_id": i, "payload": f"process item {i}", "priority": i % 3}
            ch.basic_publish(
                exchange="",
                routing_key=QUEUE,
                body=json.dumps(task).encode(),
                properties=persistent,
            )
            print(f"Published task {i}")
    finally:
        conn.close()
def worker():
    """Process tasks from the work queue until consuming stops.

    Messages are taken one at a time (``prefetch_count=1``). A message is
    acked after it is handled; if decoding or handling raises, it is nacked
    with ``requeue=False`` instead. The connection is closed when consuming
    ends for any reason.
    """
    conn = get_connection()
    try:
        ch = conn.channel()
        ch.queue_declare(queue=QUEUE, durable=True)
        ch.basic_qos(prefetch_count=1)  # process one message at a time

        def callback(ch, method, properties, body):
            try:
                # Decoding is inside the try so a malformed body is rejected
                # like any other failure instead of crashing the consumer.
                task = json.loads(body)
                print(f"Processing task {task['task_id']}...")
                time.sleep(0.1)  # simulate work
                print(f"Task {task['task_id']} complete")
            except Exception as e:
                print(f"Task failed: {e}")
                # nack and requeue=False → goes to DLX if configured
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
            else:
                ch.basic_ack(delivery_tag=method.delivery_tag)

        ch.basic_consume(queue=QUEUE, on_message_callback=callback)
        print("Worker started. Waiting for tasks...")
        ch.start_consuming()
    finally:
        # The connection may already be closed if the broker dropped it.
        if conn.is_open:
            conn.close()
if __name__ == "__main__":
    import sys

    # Run as a publisher only when the first argument is "publish".
    # With no argument, or any other argument, run as a worker; checking the
    # length first avoids an IndexError when no argument is given.
    if len(sys.argv) > 1 and sys.argv[1] == "publish":
        publisher()
    else:
        worker()
Publish/Subscribe with Fanout
A fanout exchange delivers every message to all bound queues. Use it for broadcasting events — for example, an order-placed notification should go to the email service, SMS service, analytics service, and inventory service simultaneously.
import pika
import json
EXCHANGE = "order.events"
def setup_fanout(channel):
    """Declare the durable fanout exchange named by ``EXCHANGE`` on *channel*."""
    channel.exchange_declare(
        durable=True,
        exchange_type="fanout",
        exchange=EXCHANGE,
    )
def publish_event(event_type: str, payload: dict):
    """Publish one event to the fanout exchange.

    Args:
        event_type: Stored under the ``"event"`` key of the message.
        payload: Stored under the ``"data"`` key of the message.

    The message is JSON-encoded and marked persistent. The connection is
    closed whether or not publishing succeeds; the confirmation line is
    printed only on success.
    """
    conn = get_connection()
    try:
        ch = conn.channel()
        setup_fanout(ch)
        message = {"event": event_type, "data": payload}
        ch.basic_publish(
            exchange=EXCHANGE,
            routing_key="",  # ignored by fanout
            body=json.dumps(message).encode(),
            properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
        )
    finally:
        conn.close()
    print(f"Published event: {event_type}")
def subscribe(service_name: str, handler):
    """Consume fanout events on a durable queue dedicated to one service.

    Each service gets its own durable queue, named
    ``f"{EXCHANGE}.{service_name}"``, bound to the fanout exchange.

    Args:
        service_name: Used to build the queue name and in log output.
        handler: Called with each decoded event. If it raises, the message
            is nacked with ``requeue=False``.

    The connection is closed when consuming ends for any reason.
    """
    conn = get_connection()
    try:
        ch = conn.channel()
        setup_fanout(ch)
        queue_name = f"{EXCHANGE}.{service_name}"
        ch.queue_declare(queue=queue_name, durable=True)
        ch.queue_bind(exchange=EXCHANGE, queue=queue_name)

        def callback(ch, method, properties, body):
            try:
                # Decode inside the try so a malformed body is rejected
                # instead of crashing the consumer loop.
                event = json.loads(body)
                handler(event)
            except Exception as e:
                print(f"{service_name} failed to process: {e}")
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
            else:
                ch.basic_ack(delivery_tag=method.delivery_tag)

        ch.basic_qos(prefetch_count=10)
        ch.basic_consume(queue=queue_name, on_message_callback=callback)
        print(f"{service_name} listening on {queue_name}")
        ch.start_consuming()
    finally:
        # The connection may already be closed if the broker dropped it.
        if conn.is_open:
            conn.close()
# Usage: publish one sample "order.placed" event when this snippet runs.
publish_event("order.placed", {"order_id": "ord-123", "amount": 99.99})
# subscribe() ends by calling start_consuming(), so the example subscribers
# are left commented out; enable one per process to receive the event.
# subscribe("email-service", lambda e: print(f"Email: {e}"))
# subscribe("analytics", lambda e: print(f"Analytics: {e}"))
Topic Routing
Topic exchanges route messages by pattern-matching the routing key. * matches one word, # matches zero or more words. This enables fine-grained routing: send order.europe.placed to European order handlers only.
import pika
import json
def setup_topic_exchange(ch, exchange: str = "events.topic"):
    """Declare *exchange* as a durable topic exchange on *ch*.

    Returns:
        The exchange name, unchanged.
    """
    declare_kwargs = {
        "exchange": exchange,
        "exchange_type": "topic",
        "durable": True,
    }
    ch.exchange_declare(**declare_kwargs)
    return exchange
def publish_topic(routing_key: str, payload: dict, exchange: str = "events.topic"):
    """Publish one JSON message to a topic exchange.

    Args:
        routing_key: Routing key attached to the message.
        payload: Message content; JSON-encoded before sending.
        exchange: Topic exchange to declare and publish to.

    The message is marked persistent. The connection is closed whether or
    not publishing succeeds.
    """
    conn = get_connection()
    try:
        ch = conn.channel()
        setup_topic_exchange(ch, exchange)
        ch.basic_publish(
            exchange=exchange,
            routing_key=routing_key,
            body=json.dumps(payload).encode(),
            properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
        )
    finally:
        conn.close()
def subscribe_topic(queue: str, binding_pattern: str, handler, exchange: str = "events.topic"):
    """Consume messages from a topic exchange that match a binding pattern.

    Args:
        queue: Durable queue to declare and consume from.
        binding_pattern: Routing-key pattern used to bind *queue*.
        handler: Called as ``handler(data, routing_key)`` for each decoded
            message. If it raises, the message is nacked with
            ``requeue=False``.
        exchange: Topic exchange to declare and bind to.

    The connection is closed when consuming ends for any reason.
    """
    conn = get_connection()
    try:
        ch = conn.channel()
        setup_topic_exchange(ch, exchange)
        ch.queue_declare(queue=queue, durable=True)
        ch.queue_bind(exchange=exchange, queue=queue, routing_key=binding_pattern)

        def callback(ch, method, properties, body):
            try:
                # Decode inside the try so a malformed body is rejected
                # instead of crashing the consumer loop.
                data = json.loads(body)
                handler(data, method.routing_key)
            except Exception as e:
                # Report the failure rather than dropping the message silently.
                print(f"{queue} failed to process {method.routing_key}: {e}")
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
            else:
                ch.basic_ack(delivery_tag=method.delivery_tag)

        ch.basic_consume(queue=queue, on_message_callback=callback)
        ch.start_consuming()
    finally:
        # The connection may already be closed if the broker dropped it.
        if conn.is_open:
            conn.close()
# Routing examples:
# publish_topic("order.europe.placed", {...})
# publish_topic("order.asia.cancelled", {...})
# subscribe_topic("all-orders", "order.#", handler) # all order events
# subscribe_topic("europe-orders", "order.europe.*", h) # only Europe events
# subscribe_topic("placed-orders", "*.*.placed", h) # only placed events
Dead-Letter Exchange
Configure a dead-letter exchange (DLX) to capture failed messages — those that were rejected (nacked without requeue), expired, or exceeded the queue's max-length. This prevents message loss and gives operators visibility into processing failures.
import pika
import json
def setup_dlx(ch):
    """Declare the dead-letter exchange and queue, and bind them together.

    Declares the durable direct exchange ``dlx`` and the durable queue
    ``dead.letters``, then binds the queue to the exchange with routing key
    ``failed``.
    """
    dlx_exchange = "dlx"
    dead_letter_queue = "dead.letters"
    seven_days_ms = 7 * 24 * 60 * 60 * 1000  # keep 7 days

    # Dead-letter exchange
    ch.exchange_declare(exchange=dlx_exchange, exchange_type="direct", durable=True)

    # Dead-letter queue — operators inspect this for failed messages
    dead_letter_args = {"x-message-ttl": seven_days_ms}
    ch.queue_declare(queue=dead_letter_queue, durable=True, arguments=dead_letter_args)

    ch.queue_bind(exchange=dlx_exchange, queue=dead_letter_queue, routing_key="failed")
def setup_work_queue_with_dlx(ch, queue_name: str):
    """Declare a durable work queue that dead-letters to the ``dlx`` exchange.

    Calls ``setup_dlx`` first, then declares *queue_name* with a dead-letter
    exchange and routing key, a one-hour message TTL, and a maximum length.

    Returns:
        *queue_name*, unchanged.
    """
    setup_dlx(ch)

    one_hour_ms = 60 * 60 * 1000
    queue_args = {
        "x-dead-letter-exchange": "dlx",
        "x-dead-letter-routing-key": "failed",
        "x-message-ttl": one_hour_ms,  # messages expire after 1h
        "x-max-length": 100_000,  # max queue depth
    }
    ch.queue_declare(queue=queue_name, durable=True, arguments=queue_args)
    return queue_name
conn = get_connection()
try:
    ch = conn.channel()
    setup_work_queue_with_dlx(ch, "orders.processing")
    # Any message nacked with requeue=False or expired goes to dead.letters queue
    # Monitor dead.letters to detect systematic failures
finally:
    # Release the connection even if a declaration fails.
    conn.close()
Async with aio-pika and FastAPI
import asyncio
import json
import aio_pika
from contextlib import asynccontextmanager
from fastapi import FastAPI
_connection: aio_pika.abc.AbstractConnection | None = None
_channel: aio_pika.abc.AbstractChannel | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open the shared RabbitMQ connection and channel for the app's lifetime.

    On startup this connects, opens a channel with ``prefetch_count=10``,
    and declares the durable ``orders`` queue. On shutdown it closes the
    connection and clears the module-level references, including when an
    exception is thrown into the generator.
    """
    global _connection, _channel
    _connection = await aio_pika.connect_robust(
        "amqp://guest:guest@localhost/",
        heartbeat=600,
    )
    try:
        _channel = await _connection.channel()
        await _channel.set_qos(prefetch_count=10)
        # Messages published to the default exchange are dropped when no
        # queue matches the routing key, so make sure "orders" exists before
        # the first request arrives.
        await _channel.declare_queue("orders", durable=True)
        yield
    finally:
        await _connection.close()
        _connection = None
        _channel = None
app = FastAPI(lifespan=lifespan)


@app.post("/orders")
async def create_order(order: dict):
    """Queue the posted order as a persistent message routed to ``orders``.

    Returns:
        ``{"status": "queued"}`` once the publish call completes.
    """
    encoded_order = json.dumps(order).encode()
    outgoing = aio_pika.Message(
        body=encoded_order,
        delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
    )
    await _channel.default_exchange.publish(outgoing, routing_key="orders")
    return {"status": "queued"}
async def start_consumer():
    """Consume and process messages from the durable ``orders`` queue.

    Opens its own connection and a channel with ``prefetch_count=5``. Each
    message is handled inside ``message.process()``. The connection is
    closed when the consumer exits, including on cancellation or error.
    """
    conn = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
    # The connection context manager closes the connection on exit.
    async with conn:
        channel = await conn.channel()
        await channel.set_qos(prefetch_count=5)
        queue = await channel.declare_queue("orders", durable=True)
        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process():
                    order = json.loads(message.body)
                    print(f"Processing order: {order}")
                    await asyncio.sleep(0.1)  # async work here
Production Patterns
import pika
import time
import logging
log = logging.getLogger(__name__)
class RobustConsumer:
    """Auto-reconnecting RabbitMQ consumer.

    Consumes from one durable queue, one message at a time, and reconnects
    after a connection error.
    """

    def __init__(self, url: str, queue: str, handler, reconnect_delay: float = 5.0):
        """Store the consumer configuration; no connection is opened here.

        Args:
            url: AMQP URL passed to ``pika.URLParameters``.
            queue: Name of the durable queue to consume from.
            handler: Called with the raw message body. If it raises, the
                message is nacked with ``requeue=False``.
            reconnect_delay: Seconds to wait before reconnecting after a
                connection error.
        """
        self.url = url
        self.queue = queue
        self.handler = handler
        self.reconnect_delay = reconnect_delay

    def run(self):
        """Consume until interrupted, reconnecting after connection errors.

        The loop ends on ``KeyboardInterrupt``. The connection opened in
        each iteration is closed before the next attempt or on exit.
        """
        while True:
            conn = None
            try:
                conn = pika.BlockingConnection(pika.URLParameters(self.url))
                ch = conn.channel()
                ch.queue_declare(queue=self.queue, durable=True)
                ch.basic_qos(prefetch_count=1)

                def callback(ch, method, properties, body):
                    try:
                        self.handler(body)
                    except Exception:
                        log.exception("Handler failed")
                        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
                    else:
                        ch.basic_ack(delivery_tag=method.delivery_tag)

                ch.basic_consume(queue=self.queue, on_message_callback=callback)
                log.info("Consuming from %s", self.queue)
                ch.start_consuming()
            except pika.exceptions.AMQPConnectionError as e:
                log.warning(
                    "Connection lost: %s. Reconnecting in %gs...",
                    e,
                    self.reconnect_delay,
                )
                time.sleep(self.reconnect_delay)
            except KeyboardInterrupt:
                break
            finally:
                # Don't leak the connection; it may already be closed after
                # a connection error, so check before closing.
                if conn is not None and conn.is_open:
                    conn.close()
Frequently Asked Questions
- RabbitMQ vs Kafka — when to use each?
- Use RabbitMQ for task queues, RPC, complex routing, and when messages should be deleted after consumption. Use Kafka for high-throughput event streaming, log aggregation, replay, and when consumers need to process historical events. RabbitMQ is easier to set up; Kafka scales to millions of events/second.
- What is the difference between ack and nack?
- `basic_ack` tells RabbitMQ the message was processed successfully — it is removed from the queue. `basic_nack(requeue=True)` returns it to the front of the queue for retry. `basic_nack(requeue=False)` discards it (goes to DLX if configured). Never leave messages unacked — the queue holds them in memory until the connection closes.
- How do I monitor queue depth?
- Use the RabbitMQ Management API (http://localhost:15672/api/queues) or the `rabbitmq-management` plugin. For production, export RabbitMQ metrics to Prometheus via the `rabbitmq_prometheus` plugin and alert when queue depth exceeds N messages.