Python Kafka Consumer with confluent-kafka
The confluent-kafka library is Confluent's officially supported Python Kafka client and one of the fastest. It wraps librdkafka (a C library) and delivers much higher throughput than the pure-Python kafka-python package. Building a production Kafka consumer requires understanding consumer groups for horizontal scaling, offset commit strategies to avoid data loss or duplication, dead-letter queues for poison messages, and Schema Registry integration for Avro/Protobuf deserialization. This guide covers all of these patterns with working code.
Table of Contents
Installation and Basic Consumer
pip install confluent-kafka
from confluent_kafka import Consumer, KafkaError, KafkaException
import json
import signal
import sys

# Flipped to False by the signal handler so the poll loop exits and the
# `finally` block below closes the consumer (leaving the group cleanly).
running = True


def shutdown(sig, frame):
    """Signal handler for SIGTERM/SIGINT: ask the poll loop to stop."""
    global running
    running = False


def process_order(data: dict):
    """Business logic for one decoded order message.

    Defined BEFORE the poll loop: this is a top-level script, so the loop starts
    running as soon as it is reached and anything it calls must already exist.
    """
    print(f"Processing order {data.get('order_id')}")


signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)

conf = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-processor-v1",
    "auto.offset.reset": "earliest",  # start from beginning if no committed offset
    # Manual commit after successful processing = at-least-once delivery: a crash
    # between processing and commit redelivers the message. It is NOT exactly-once
    # by itself, so process_order should be idempotent.
    "enable.auto.commit": False,
    "session.timeout.ms": 30000,
    "max.poll.interval.ms": 300000,
}

consumer = Consumer(conf)
consumer.subscribe(["orders.placed", "orders.updated"])

try:
    while running:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            # _PARTITION_EOF is informational (only emitted when
            # enable.partition.eof=true); every other error is treated as fatal.
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print(f"Reached end of partition {msg.partition()}")
            else:
                raise KafkaException(msg.error())
            continue
        if msg.value() is None:
            # Tombstone (null value): nothing to decode; commit to move past it.
            consumer.commit(message=msg, asynchronous=False)
            continue
        data = json.loads(msg.value().decode("utf-8"))
        print(f"[{msg.topic()}] partition={msg.partition()} offset={msg.offset()} key={msg.key()}")
        print(f" payload: {data}")
        # Process the message
        process_order(data)
        # Commit only after successful processing
        consumer.commit(message=msg, asynchronous=False)
finally:
    consumer.close()
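With enable.auto.commit=True (the default), offsets are committed periodically in the background whether or not your processing succeeded. A crash after such a commit but before processing finishes loses those messages. Use enable.auto.commit=False and commit only after you have durably processed (or stored) the message. This gives at-least-once delivery: a crash between processing and commit causes a redelivery, so make processing idempotent.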
Consumer Groups and Partition Assignment
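A consumer group allows horizontal scaling: the group assigns each partition to exactly one consumer in the group. Add more consumer instances to scale throughput; the maximum useful parallelism equals the number of partitions. Rebalances occur when consumers join or leave the group, including when a consumer misses its session.timeout.ms or max.poll.interval.ms deadline. They also occur when the partitions of the subscribed topics change.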
from confluent_kafka import Consumer, KafkaError, KafkaException, OFFSET_STORED
from confluent_kafka import TopicPartition


class RebalanceHandler:
    """Rebalance callbacks that log assignment changes and can seek on assign.

    Under a cooperative assignor (e.g. "cooperative-sticky"), librdkafka only
    accepts incremental assignment changes inside the rebalance callback, so
    ``consumer.assign()`` fails there. Pass ``incremental=True`` in that case so
    ``on_assign`` uses ``incremental_assign()`` instead.
    """

    def __init__(self, incremental: bool = False):
        # True when the group uses a cooperative (incremental) assignor.
        self.incremental = incremental

    def on_assign(self, consumer, partitions):
        print(f"Assigned partitions: {[p.partition for p in partitions]}")
        # Optionally seek to a specific offset on assignment. The offsets set
        # here only take effect because we (re)assign the partitions ourselves.
        for p in partitions:
            p.offset = self._load_last_offset(p.topic, p.partition)
        if self.incremental:
            consumer.incremental_assign(partitions)
        else:
            consumer.assign(partitions)

    def on_revoke(self, consumer, partitions):
        print(f"Revoking partitions: {[p.partition for p in partitions]}")
        # Commit offsets before giving up partitions. commit() without arguments
        # raises _NO_OFFSET when nothing has been consumed yet (e.g. a rebalance
        # right after startup); that case is harmless and is ignored.
        try:
            consumer.commit(asynchronous=False)
        except KafkaException as e:
            if e.args[0].code() != KafkaError._NO_OFFSET:
                raise

    def on_lost(self, consumer, partitions):
        # Lost partitions may already be owned by another group member, so do
        # not commit offsets for them here.
        print(f"Lost partitions (rebalance timeout): {[p.partition for p in partitions]}")

    def _load_last_offset(self, topic: str, partition: int) -> int:
        # Load from your own offset store (Redis, DB) for exactly-once.
        # OFFSET_STORED (-1000) = resume from the committed offset. The old
        # literal -1001 is OFFSET_INVALID, not OFFSET_STORED.
        return OFFSET_STORED


# incremental=True matches the cooperative-sticky assignor configured below.
handler = RebalanceHandler(incremental=True)
conf = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-processor-v1",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
    # Cooperative sticky assignor — minimizes partition movements on rebalance
    "partition.assignment.strategy": "cooperative-sticky",
}
consumer = Consumer(conf)
consumer.subscribe(
    ["orders"],
    on_assign=handler.on_assign,
    on_revoke=handler.on_revoke,
    # Without on_lost, lost partitions are routed to on_revoke, which would try
    # to commit offsets for partitions this consumer no longer owns.
    on_lost=handler.on_lost,
)
Offset Management
Committing offsets correctly is the most critical aspect of Kafka consumer correctness. Commit too early and you lose messages on crash. Commit too late and you reprocess messages. Batch commits balance throughput and safety.
from confluent_kafka import Consumer, TopicPartition
from collections import defaultdict
import json
import time

COMMIT_INTERVAL_SECONDS = 5
BATCH_SIZE = 500


def process(value: bytes) -> dict:
    """Decode one JSON message payload (stand-in for real processing).

    Defined before the poll loop: module-level code runs top to bottom, so the
    loop would raise NameError if this were defined after it.
    """
    return json.loads(value)


def commit_pending(consumer, pending_offsets: dict) -> int:
    """Synchronously commit the tracked next-offsets, then clear them.

    Returns the number of partitions committed. If the commit raises, the
    offsets are left in place so a later attempt can retry them.
    """
    offsets_to_commit = [
        TopicPartition(topic, partition, offset)
        for (topic, partition), offset in pending_offsets.items()
    ]
    if offsets_to_commit:
        consumer.commit(offsets=offsets_to_commit, asynchronous=False)
    pending_offsets.clear()
    return len(offsets_to_commit)


conf = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-processor-v1",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
}
consumer = Consumer(conf)
consumer.subscribe(["orders"])

# (topic, partition) -> next offset to consume (last processed offset + 1)
pending_offsets: dict[tuple, int] = defaultdict(int)
last_commit_time = time.monotonic()
batch_count = 0

try:
    while True:
        msg = consumer.poll(timeout=0.1)
        if msg and not msg.error():
            process(msg.value())
            # Track highest offset per (topic, partition); +1 = next offset to consume
            pending_offsets[(msg.topic(), msg.partition())] = msg.offset() + 1
            batch_count += 1
        # Commit on batch size or time interval (checked even when poll returned nothing)
        now = time.monotonic()
        should_commit = (
            batch_count >= BATCH_SIZE
            or (batch_count > 0 and now - last_commit_time >= COMMIT_INTERVAL_SECONDS)
        )
        if should_commit:
            committed = commit_pending(consumer, pending_offsets)
            batch_count = 0
            last_commit_time = now
            print(f"Committed {committed} partition offsets")
finally:
    # Flush progress since the last periodic commit so a clean shutdown does not
    # replay up to BATCH_SIZE messages. The offset of a message whose processing
    # raised was never recorded, so this cannot skip unprocessed work.
    try:
        commit_pending(consumer, pending_offsets)
    finally:
        consumer.close()
Error Handling and Dead-Letter Queue
Kafka consumers must handle two classes of errors:
- Retriable: transient failures such as a downstream database timeout or a dependency that is briefly unavailable.
- Non-retriable: deserialization failures, schema violations, and business-logic errors.

Send non-retriable messages, and retriable ones that exhaust their retries, to a dead-letter topic (DLQ) so a single bad message cannot block the consumer.
from confluent_kafka import Consumer, Producer, KafkaException
import json
import traceback
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
@dataclass()
class DeadLetterMessage:
    """Envelope serialized to JSON and written to the dead-letter topic.

    Field order matters: it is both the positional-constructor order and the
    key order that ``asdict`` produces.
    """

    # Coordinates of the message that failed.
    original_topic: str
    original_partition: int
    original_offset: int
    # Failure details.
    error_type: str  # exception class name
    error_message: str
    failed_at: str  # timestamp string (ISO format)
    # Original message value, as text.
    payload: str
dlq_producer = Producer({"bootstrap.servers": "localhost:9092"})
DLQ_TOPIC = "orders.dead-letter"
# Maximum time to wait for the broker to acknowledge one DLQ write.
DLQ_FLUSH_TIMEOUT_SECONDS = 30


def send_to_dlq(msg, error: Exception):
    """Publish *msg* plus failure details to DLQ_TOPIC and wait for the broker ack.

    Raises:
        KafkaException: if the DLQ write fails or is not acknowledged within
            DLQ_FLUSH_TIMEOUT_SECONDS. Raising (instead of returning) stops the
            caller from committing the source offset and losing the message.
    """
    from confluent_kafka import KafkaError

    value = msg.value()
    dlq = DeadLetterMessage(
        original_topic=msg.topic(),
        original_partition=msg.partition(),
        original_offset=msg.offset(),
        error_type=type(error).__name__,
        error_message=str(error),
        failed_at=datetime.now(timezone.utc).isoformat(),
        # Tombstones have a None value; record them as an empty payload.
        payload=(value or b"").decode("utf-8", errors="replace"),
    )

    delivery_errors = []

    def _on_delivery(err, _delivered_msg):
        # Served during flush(); the only way to learn that the broker rejected the write.
        if err is not None:
            delivery_errors.append(err)

    dlq_producer.produce(
        DLQ_TOPIC,
        key=msg.key(),
        value=json.dumps(asdict(dlq)).encode(),
        on_delivery=_on_delivery,
    )
    remaining = dlq_producer.flush(DLQ_FLUSH_TIMEOUT_SECONDS)
    if delivery_errors:
        raise KafkaException(delivery_errors[0])
    if remaining:
        raise KafkaException(
            KafkaError(KafkaError._TIMED_OUT, "DLQ delivery not confirmed within timeout")
        )
import time  # backoff sleeps; imported once instead of inside the retry loop

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-processor-v1",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
})
consumer.subscribe(["orders"])

MAX_RETRIES = 3

# NOTE: process_order is the order handler from the basic consumer example;
# define or import it before running this loop.
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        # Up to MAX_RETRIES attempts; backoff of 2, 4, ... seconds between them.
        for attempt in range(1, MAX_RETRIES + 1):
            try:
                data = json.loads(msg.value())
                process_order(data)
                break
            except (json.JSONDecodeError, KeyError, ValueError) as e:
                # Non-retriable: bad data — go to DLQ immediately
                send_to_dlq(msg, e)
                print(f"Sent to DLQ: {e}")
                break  # treat DLQ send as "handled"
            except Exception as e:
                if attempt == MAX_RETRIES:
                    send_to_dlq(msg, e)
                    print(f"Max retries exceeded, sent to DLQ: {e}")
                else:
                    time.sleep(2 ** attempt)  # exponential backoff
        # Reached only when processing succeeded or the message was handed to the DLQ.
        consumer.commit(message=msg, asynchronous=False)
finally:
    consumer.close()
Avro with Schema Registry
Schema Registry enforces schema compatibility across producers and consumers. Each message carries only a small header (a magic byte followed by a 4-byte schema ID) instead of the full schema. The consumer fetches the schema from the registry by ID, caches it, and deserializes the payload with Avro.
pip install "confluent-kafka[avro]"  # quoted so shells like zsh don't treat [avro] as a glob
from confluent_kafka import DeserializingConsumer
from confluent_kafka.error import ConsumeError, ValueDeserializationError
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer

schema_registry_conf = {"url": "http://localhost:8081"}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

avro_deserializer = AvroDeserializer(
    schema_registry_client,
    schema_str=None,  # fetch schema dynamically from registry by ID
    from_dict=lambda obj, ctx: obj,  # keep as dict
)

consumer_conf = {
    "bootstrap.servers": "localhost:9092",
    "key.deserializer": StringDeserializer("utf_8"),
    "value.deserializer": avro_deserializer,
    "group.id": "order-processor-avro",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
}

consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe(["orders.avro"])

try:
    while True:
        # DeserializingConsumer.poll() does not return errored messages: it
        # raises ConsumeError (ValueDeserializationError for undecodable values).
        try:
            msg = consumer.poll(timeout=1.0)
        except ValueDeserializationError as e:
            # Poison message: log it and commit past it so one bad record cannot
            # stall the partition (route it to a DLQ in production).
            bad = e.kafka_message
            print(f"Skipping undecodable message {bad.topic()}[{bad.partition()}]@{bad.offset()}: {e}")
            consumer.commit(message=bad, asynchronous=False)
            continue
        except ConsumeError as e:
            print(f"Error: {e}")
            continue
        if msg is None:
            continue
        order = msg.value()  # already deserialized dict
        print(f"Order: {order}")
        consumer.commit(message=msg, asynchronous=False)
finally:
    consumer.close()
Async Processing Pattern
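For I/O-bound processing (HTTP calls, DB writes), run a thread pool alongside the Kafka poll loop. Poll messages on the main thread and submit processing to the pool. Commit a partition's offset only after that task and every earlier task from the same partition have completed. Tasks finish out of order, and committing past an unfinished message loses it on a crash. Also cap the number of in-flight tasks so polling cannot outrun the workers.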
from confluent_kafka import Consumer, TopicPartition
from collections import deque
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, as_completed, wait
import json
import time

WORKERS = 8
# Upper bound on submitted-but-uncommitted messages. Without it poll() outpaces
# the workers and the backlog (and memory use) grows without limit.
MAX_IN_FLIGHT = WORKERS * 4


def process_order_io(data: dict):
    """Handle one decoded order; stands in for a blocking DB write or HTTP call."""
    time.sleep(0.05)  # simulate DB write or HTTP call
    print(f"Processed {data.get('order_id')}")


def _decode_and_process(raw: bytes):
    """Worker entry point: decode on the worker so a bad payload fails only its own future."""
    process_order_io(json.loads(raw))


def _pop_committable(in_flight: dict) -> list:
    """Remove finished work from the head of each partition's queue.

    Workers finish out of order, so only the leading run of completed offsets in
    a partition is safe to commit. Committing a later offset while an earlier one
    is still running would lose the earlier message on a crash. Returns
    TopicPartitions holding the next offset to consume.
    """
    commits = []
    for (topic, partition), queue in in_flight.items():
        next_offset = None
        while queue and queue[0][1].done():
            offset, future = queue.popleft()
            error = future.exception()
            if error is not None:
                print(f"Processing failed: {error}")
                # send to DLQ here; the offset still advances past the poison message
            next_offset = offset + 1
        if next_offset is not None:
            commits.append(TopicPartition(topic, partition, next_offset))
    return commits


consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-processor-threaded",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
})
consumer.subscribe(["orders"])

# (topic, partition) -> deque of (offset, future), in the order messages were polled.
# NOTE(review): entries for partitions revoked in a rebalance are not dropped here,
# and committing them after revocation may fail — add on_revoke handling in production.
in_flight: dict[tuple[str, int], deque] = {}

try:
    with ThreadPoolExecutor(max_workers=WORKERS) as pool:
        while True:
            outstanding = sum(len(q) for q in in_flight.values())
            if outstanding >= MAX_IN_FLIGHT:
                # Backpressure: wait for some partition's oldest task to finish
                # instead of fetching more messages.
                heads = [q[0][1] for q in in_flight.values() if q]
                wait(heads, timeout=1.0, return_when=FIRST_COMPLETED)
            else:
                msg = consumer.poll(timeout=0.01)
                if msg and not msg.error():
                    future = pool.submit(_decode_and_process, msg.value())
                    key = (msg.topic(), msg.partition())
                    in_flight.setdefault(key, deque()).append((msg.offset(), future))

            # Collect completed futures and commit contiguous progress per partition.
            commits = _pop_committable(in_flight)
            if commits:
                consumer.commit(offsets=commits, asynchronous=False)
finally:
    consumer.close()
Producer Basics
from confluent_kafka import Producer
import json

producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "acks": "all",  # wait for all ISR replicas to ack
    # Idempotence makes the retries below safe. Without it, a retried batch can be
    # written twice or land behind later batches (reordering).
    "enable.idempotence": True,
    "retries": 5,
    "retry.backoff.ms": 500,
    "compression.type": "lz4",
    "batch.size": 65536,
    "linger.ms": 5,  # wait up to 5ms to batch messages
})
def delivery_report(err, msg):
    """Per-message delivery callback passed to ``produce(callback=...)``.

    Prints a failure line when ``err`` is truthy. Otherwise it prints the topic,
    partition and offset that ``msg`` was written to.
    """
    if not err:
        print(f"Delivered to {msg.topic()}[{msg.partition()}] @ {msg.offset()}")
        return
    print(f"Delivery failed: {err}")
# Produce one JSON-encoded order keyed by its id; delivery_report is invoked
# for it while flush() waits.
order_payload = json.dumps({"order_id": "123", "amount": 99.99}).encode()
producer.produce(
    topic="orders",
    key="order-123",
    value=order_payload,
    callback=delivery_report,
)
# Block until every queued message has been delivered (or has failed).
producer.flush()
Frequently Asked Questions
- confluent-kafka vs kafka-python — which to use?
- Use confluent-kafka for production. It wraps librdkafka (a C library) and typically delivers several times higher throughput and lower latency than the pure-Python kafka-python. It also has first-class Schema Registry support. kafka-python is easier to install (no C dependency) but is not suitable for high-throughput production use.
- How do I achieve exactly-once processing?
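- True exactly-once requires two things:
  1. Disable auto-commit.
  2. Store your output and the consumed offset atomically. For example, write the result and the offset to Postgres in the same transaction, and on partition assignment seek to the offset stored there. When consuming from and producing back to Kafka, use a transactional producer and include the consumer offsets in the transaction (send_offsets_to_transaction).

  Without atomic output+offset storage you get at-least-once delivery, which behaves as exactly-once only if processing is idempotent.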
- What is the right number of consumers per group?
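- Maximum useful consumers = number of partitions; extra consumers sit idle. If processing is slow, first add consumers up to the partition count. Beyond that, increase the number of partitions. This triggers a rebalance, and for keyed topics it changes which partition each key maps to, so per-key ordering is not preserved across the change.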