Python Structlog: Structured Logging for Production

structlog transforms Python logging from unstructured text strings into machine-readable JSON events. In production, structured logs are instantly queryable in Datadog, ELK, or CloudWatch without regex parsing. structlog wraps the standard library logging module so you can adopt it incrementally, and its processor pipeline gives you full control over every log event — censoring passwords, injecting trace IDs, and dropping health-check noise before the event hits the output stream.

Installation and Basic Setup

pip install structlog
import logging
import structlog

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso", utc=True),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.DEBUG),
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
)

log = structlog.get_logger()

log.info("server_started", host="0.0.0.0", port=8000, env="production")
# {"event":"server_started","host":"0.0.0.0","port":8000,"env":"production",
#  "level":"info","timestamp":"2026-06-14T10:00:00.000000Z"}

log.warning("high_memory", used_mb=4800, threshold_mb=5000, service="api")
log.error("db_connection_failed", host="db.internal", error="timeout after 30s")
Key benefit: Every key-value pair becomes a queryable field. log.info("request", status=500, duration_ms=1200) lets you query status:500 AND duration_ms:>500 in Datadog instantly — no regex, no parsing pipeline.

Processor Pipeline

A structlog processor is a callable that receives (logger, method, event_dict) and returns a modified event_dict, or raises structlog.DropEvent to discard the log entirely. Processors run in order, making it easy to compose behaviour: censor secrets, inject request IDs, or drop noise, all in separate focused functions.

import structlog
import uuid


def add_request_id(logger, method, event_dict: dict) -> dict:
    if "request_id" not in event_dict:
        event_dict["request_id"] = str(uuid.uuid4())
    return event_dict


def add_service_name(service: str):
    def processor(logger, method, event_dict: dict) -> dict:
        event_dict["service"] = service
        return event_dict
    return processor


def censor_sensitive_fields(logger, method, event_dict: dict) -> dict:
    sensitive = {"password", "token", "secret", "api_key", "authorization"}
    for key in list(event_dict.keys()):
        if key.lower() in sensitive:
            event_dict[key] = "***REDACTED***"
    return event_dict


def drop_health_checks(logger, method, event_dict: dict) -> dict:
    if event_dict.get("path") in {"/health", "/ping", "/metrics", "/readyz"}:
        raise structlog.DropEvent()
    return event_dict


structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        add_request_id,
        add_service_name("order-service"),
        censor_sensitive_fields,
        drop_health_checks,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso", utc=True),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    logger_factory=structlog.PrintLoggerFactory(),
)

Bound Context Variables

Use log.bind() to create child loggers that automatically include extra key-value pairs in every subsequent call. For async code, use structlog.contextvars which stores context in Python's contextvars.ContextVar — propagated across await boundaries without passing loggers around.

import structlog
from structlog.contextvars import bind_contextvars, clear_contextvars

log = structlog.get_logger()

# bind() creates a child logger with extra fields
request_log = log.bind(request_id="req-abc123", user_id=42, tenant="acme")
request_log.info("order_placed", order_id="ord-789", amount=99.99)
request_log.info("payment_processed", gateway="stripe", status="success")
# Both lines include request_id, user_id, tenant automatically

# Further binding creates another child without mutating the parent
checkout_log = request_log.bind(flow="checkout")
checkout_log.info("cart_validated", items=3)

# contextvars approach for async code
async def handle_request(request_id: str, user_id: int):
    clear_contextvars()
    bind_contextvars(request_id=request_id, user_id=user_id)

    log.info("request_received")       # includes request_id, user_id
    await process_order()
    log.info("request_complete")       # still has context


async def process_order():
    log.info("processing_order")       # context from caller propagates
    bind_contextvars(order_id="ord-999")
    log.info("order_validated")        # now also has order_id

Integration with stdlib logging

Third-party libraries (uvicorn, SQLAlchemy, httpx) use the standard logging module. Route their records through structlog's formatter so all logs appear in the same JSON format, with the same processor chain.

import sys
import logging
import structlog

shared_processors = [
    structlog.contextvars.merge_contextvars,
    structlog.stdlib.add_log_level,
    structlog.stdlib.add_logger_name,
    structlog.processors.TimeStamper(fmt="iso", utc=True),
    structlog.processors.StackInfoRenderer(),
    structlog.processors.format_exc_info,
]

structlog.configure(
    processors=shared_processors + [structlog.processors.JSONRenderer()],
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

# Configure stdlib to use the same formatter
formatter = structlog.stdlib.ProcessorFormatter(
    processor=structlog.processors.JSONRenderer(),
    foreign_pre_chain=shared_processors,
)

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)

root_logger = logging.getLogger()
root_logger.addHandler(handler)
root_logger.setLevel(logging.INFO)

# Now uvicorn, sqlalchemy logs also appear as JSON
logging.getLogger("uvicorn.access").setLevel(logging.INFO)
logging.getLogger("sqlalchemy.engine").setLevel(logging.WARNING)

FastAPI Request Logging Middleware

Bind request context to structlog's contextvars at the start of each request so every log statement in any downstream handler automatically includes the request ID, method, path, and response status.

import time
import uuid
import structlog
from fastapi import FastAPI, Request
from structlog.contextvars import bind_contextvars, clear_contextvars
from starlette.middleware.base import BaseHTTPMiddleware

log = structlog.get_logger()
app = FastAPI()


class StructlogMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        clear_contextvars()
        request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
        bind_contextvars(
            request_id=request_id,
            method=request.method,
            path=request.url.path,
        )

        start = time.perf_counter()
        try:
            response = await call_next(request)
            duration_ms = round((time.perf_counter() - start) * 1000, 2)
            log.info(
                "http_request",
                status=response.status_code,
                duration_ms=duration_ms,
            )
            response.headers["X-Request-ID"] = request_id
            return response
        except Exception as exc:
            log.exception("http_request_failed", exc_info=exc)
            raise


app.add_middleware(StructlogMiddleware)


@app.get("/orders/{order_id}")
async def get_order(order_id: int):
    log.info("fetching_order", order_id=order_id)
    # Any logs here automatically include request_id, method, path
    return {"order_id": order_id}

Async-Safe Logging with contextvars

Python's contextvars.ContextVar propagates context to child coroutines but not to sibling tasks created with asyncio.create_task() unless you explicitly copy the context. structlog's bind_contextvars uses this mechanism, so background tasks inherit the spawning request's context automatically.

import asyncio
import structlog
from structlog.contextvars import bind_contextvars, clear_contextvars

log = structlog.get_logger()


async def send_notification(order_id: str):
    # Inherits request_id from the parent context at task creation time
    log.info("sending_notification", order_id=order_id)
    await asyncio.sleep(0.1)
    log.info("notification_sent", order_id=order_id)


async def process_checkout(request_id: str, user_id: int):
    clear_contextvars()
    bind_contextvars(request_id=request_id, user_id=user_id)

    log.info("checkout_started")

    order_id = "ord-12345"
    bind_contextvars(order_id=order_id)

    # Task inherits a snapshot of current context (request_id, user_id, order_id)
    asyncio.create_task(send_notification(order_id))

    log.info("checkout_complete")


asyncio.run(process_checkout("req-abc", 42))

Shipping Logs to Datadog and ELK

structlog JSON output is ready for any log aggregator. Point Filebeat or Fluentd at your log files, or write directly to stdout and let your container runtime collect the output. For Datadog APM, inject trace IDs from ddtrace so logs and traces correlate in the UI.

# Datadog trace correlation processor
try:
    from ddtrace import tracer

    def inject_datadog_trace(logger, method, event_dict: dict) -> dict:
        span = tracer.current_span()
        if span:
            event_dict["dd.trace_id"] = str(span.trace_id)
            event_dict["dd.span_id"] = str(span.span_id)
            event_dict["dd.service"] = span.service
        return event_dict
except ImportError:
    def inject_datadog_trace(logger, method, event_dict): return event_dict


# Add to processor chain before JSONRenderer:
# inject_datadog_trace,

# For ELK via Filebeat (filebeat.yml):
# filebeat.inputs:
#   - type: log
#     paths: ["/var/log/app/*.json"]
#     json.keys_under_root: true
#     json.add_error_key: true
# output.elasticsearch:
#   hosts: ["elasticsearch:9200"]
#   index: "python-logs-%{+yyyy.MM.dd}"

# Write logs to file for collection
import logging

file_handler = logging.FileHandler("/var/log/app/service.json")
file_handler.setLevel(logging.INFO)
logging.getLogger().addHandler(file_handler)

Frequently Asked Questions

structlog vs standard logging — when to use each?
Use structlog as the application logging API everywhere in your code. The standard library still underpins it, but structlog encourages key-value logging instead of f-string interpolation. Third-party libraries use stdlib; configure structlog's ProcessorFormatter so their records also appear in JSON.
How do I log exceptions properly?
Use log.exception("db_failed") inside an except block, or log.error("db_failed", exc_info=True). The format_exc_info processor serializes the traceback into the exception field of the JSON event — fully indexed and searchable in any log platform.
Does structlog add significant overhead?
Minimal. structlog uses cache_logger_on_first_use=True to avoid repeated processor-chain lookups. For high-throughput services, use PrintLoggerFactory with buffered stdout. The bottleneck is almost always I/O, not structlog's processing.