Python Feature Flags with LaunchDarkly and Custom Toggles

Feature flags decouple code deployment from feature releases, letting you ship code to production with features disabled, then enable them for specific users, percentages, or conditions without redeploying. They also serve as kill switches — instantly disabling broken features without a rollback. This guide implements a production-ready feature flag system in Python: a self-hosted Redis-backed engine with percentage rollouts and user targeting, plus LaunchDarkly SDK integration for managed flag infrastructure.

Custom Redis-Backed Feature Flags

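A self-hosted flag system stores flag configurations in Redis as JSON. The evaluation engine reads flags through a local in-process cache that is reloaded from Redis at most once every 30 seconds, so almost all evaluations involve no Redis round-trip; only the first evaluation after the cache expires pays for the reload. Flag changes made by other processes propagate within the cache TTL.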

import hashlib
import json
import time
import threading
import redis
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Flag:
    """Configuration record for a single feature flag.

    FeatureFlags.set_flag() serializes these fields to JSON, and
    FeatureFlags._load_flags() rebuilds instances with ``Flag(**data)``, so the
    field names double as the keys of the stored JSON document.
    """
    # Flag identifier; appended to the store's key prefix to form the Redis key.
    name: str
    # Master switch: is_enabled() and get_value() treat a disabled flag as off.
    enabled: bool = False
    rollout_percentage: float = 100.0    # 0-100, only relevant when enabled=True
    # User ids that is_enabled() accepts before it looks at the percentage.
    allowed_users: list[str] = field(default_factory=list)
    # Group names that is_enabled() accepts before it looks at the percentage.
    allowed_groups: list[str] = field(default_factory=list)
    value: Any = None                    # multivariate flag value


class FeatureFlags:
    CACHE_TTL = 30  # seconds

    def __init__(self, redis_client: redis.Redis, prefix: str = "ff:"):
        self._redis = redis_client
        self._prefix = prefix
        self._cache: dict[str, Flag] = {}
        self._cache_expires: float = 0
        self._lock = threading.Lock()

    def _load_flags(self):
        now = time.monotonic()
        if now < self._cache_expires:
            return
        with self._lock:
            if now < self._cache_expires:
                return
            keys = self._redis.keys(f"{self._prefix}*")
            flags = {}
            for key in keys:
                raw = self._redis.get(key)
                if raw:
                    data = json.loads(raw)
                    flag_name = key.decode().removeprefix(self._prefix)
                    flags[flag_name] = Flag(**data)
            self._cache = flags
            self._cache_expires = now + self.CACHE_TTL

    def set_flag(self, flag: Flag):
        key = f"{self._prefix}{flag.name}"
        self._redis.set(key, json.dumps({
            "name": flag.name,
            "enabled": flag.enabled,
            "rollout_percentage": flag.rollout_percentage,
            "allowed_users": flag.allowed_users,
            "allowed_groups": flag.allowed_groups,
            "value": flag.value,
        }))
        self._cache_expires = 0  # invalidate cache

    def _hash_user(self, user_id: str, flag_name: str) -> float:
        """Consistent hash 0-100 for stable percentage rollout."""
        key = f"{flag_name}:{user_id}".encode()
        digest = hashlib.sha256(key).hexdigest()
        return (int(digest[:8], 16) / 0xFFFFFFFF) * 100

    def is_enabled(self, flag_name: str, user_id: str = "", groups: list[str] | None = None) -> bool:
        self._load_flags()
        flag = self._cache.get(flag_name)
        if flag is None or not flag.enabled:
            return False
        if flag.allowed_users and user_id in flag.allowed_users:
            return True
        if flag.allowed_groups and groups:
            if any(g in flag.allowed_groups for g in groups):
                return True
        if flag.rollout_percentage >= 100:
            return True
        if flag.rollout_percentage <= 0:
            return False
        return self._hash_user(user_id, flag_name) < flag.rollout_percentage

    def get_value(self, flag_name: str, default: Any = None) -> Any:
        self._load_flags()
        flag = self._cache.get(flag_name)
        return flag.value if flag and flag.enabled else default

Percentage Rollouts

import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)
ff = FeatureFlags(r)

# Register three flags: a 10% rollout, a 50% rollout, and one that is switched off
ff.set_flag(Flag(name="new_checkout_flow", enabled=True, rollout_percentage=10))
ff.set_flag(Flag(name="dark_mode", enabled=True, rollout_percentage=50))
ff.set_flag(Flag(name="ai_recommendations", enabled=False))   # kill switch

# Evaluate the 10% flag for 1000 synthetic users and record each outcome
results = {
    f"user-{i}": ff.is_enabled("new_checkout_flow", f"user-{i}")
    for i in range(1000)
}

# Booleans sum as 0/1, so this counts the users that got the feature
enabled_count = sum(results.values())
print(f"Enabled for {enabled_count}/1000 users = {enabled_count/10:.1f}%")
# Should be ~10%

# Evaluating twice for one user must agree (the bucket comes from a stable hash)
first_answer = ff.is_enabled("new_checkout_flow", "user-42")
second_answer = ff.is_enabled("new_checkout_flow", "user-42")
assert first_answer == second_answer

User Targeting Rules

from dataclasses import dataclass

# Allow specific beta users regardless of rollout percentage:
# is_enabled() checks allowed_users and allowed_groups before the percentage hash,
# so listed users and groups get the feature even though the rollout is only 5%.
ff.set_flag(Flag(
    name="new_billing_page",
    enabled=True,
    rollout_percentage=5,
    allowed_users=["admin@techoral.com", "beta-user-123"],
    allowed_groups=["beta_testers", "internal"],
))

# Check with user context
def check_feature(user_id: str, email: str, plan: str) -> dict:
    """Evaluate the demo flags for one user and return them keyed by flag name.

    Users on the "enterprise" plan are treated as members of the
    "beta_testers" group; every other plan gets no groups. ``email`` is
    accepted but not used by the evaluation.
    """
    groups = []
    if plan == "enterprise":
        groups = ["beta_testers"]

    # These two flags are evaluated with the user's groups ...
    evaluated = {
        flag_name: ff.is_enabled(flag_name, user_id, groups)
        for flag_name in ("new_billing_page", "ai_recommendations")
    }
    # ... while dark_mode is evaluated on the user id alone.
    evaluated["dark_mode"] = ff.is_enabled("dark_mode", user_id)
    return evaluated

# Evaluate all demo flags for one user; the "enterprise" plan maps to the beta_testers group
features = check_feature("user-42", "user@example.com", "enterprise")
print(features)

# Multivariate flag — return a value instead of boolean
ff.set_flag(Flag(
    name="checkout_variant",
    enabled=True,
    value="variant_b",  # could be "control", "variant_a", "variant_b"
))

# get_value() returns the stored value while the flag is enabled, otherwise the default
variant = ff.get_value("checkout_variant", default="control")
print(f"Checkout variant: {variant}")

LaunchDarkly SDK Integration

pip install launchdarkly-server-sdk
import ldclient
from ldclient.config import Config
from ldclient import Context
import os

# Configure the SDK from the environment; os.environ[...] raises KeyError
# if LAUNCHDARKLY_SDK_KEY is not set. The functions below share this client.
ldclient.set_config(Config(os.environ["LAUNCHDARKLY_SDK_KEY"]))
client = ldclient.get()

# Simple boolean flag
def is_new_dashboard_enabled(user_id: str, email: str) -> bool:
    """Evaluate the "new-dashboard" flag for a user.

    Builds a "user" context keyed by ``user_id`` with an ``email`` attribute
    and passes False as the default value.
    """
    builder = Context.builder(user_id).kind("user").set("email", email)
    user_context = builder.build()
    return client.variation("new-dashboard", user_context, False)

# Multivariate flag — returns a string variant
def get_pricing_variant(user_id: str, plan: str) -> str:
    """Evaluate the "pricing-page-variant" flag for a user.

    Builds a "user" context keyed by ``user_id`` with a ``plan`` attribute and
    passes "control" as the default value.
    """
    builder = Context.builder(user_id).kind("user").set("plan", plan)
    user_context = builder.build()
    return client.variation("pricing-page-variant", user_context, "control")

# Flag with detail (reason for evaluation result)
def check_flag_with_reason(user_id: str, flag_key: str) -> dict:
    """Evaluate ``flag_key`` for a user and report the evaluation details.

    Returns a dict with the keys "value", "reason" and "variation_index",
    copied from the detail object returned by the client. False is passed as
    the default value.
    """
    user_context = Context.builder(user_id).kind("user").build()
    detail = client.variation_detail(flag_key, user_context, False)
    return dict(
        value=detail.value,
        reason=detail.reason,
        variation_index=detail.variation_index,
    )

# Always close the client on shutdown: atexit runs client.close when the
# interpreter exits normally
import atexit
atexit.register(client.close)

FastAPI Middleware

Inject feature flags into request state via middleware so every handler has access to flags for the current user without re-evaluating them on each call.

import redis
from fastapi import FastAPI, Request, Depends
from starlette.middleware.base import BaseHTTPMiddleware

# Module-level flag store and FastAPI app shared by the middleware and the
# route handlers defined below
r = redis.Redis(host="localhost", port=6379, decode_responses=True)
ff = FeatureFlags(r)
app = FastAPI()


class FeatureFlagMiddleware(BaseHTTPMiddleware):
    """Evaluate the feature flags for the requesting user once per request.

    The results are stored on ``request.state.features`` (a dict of booleans)
    and the user id on ``request.state.user_id`` before the request is passed
    on, so handlers read them instead of re-evaluating flags.
    """

    async def dispatch(self, request: Request, call_next):
        """Attach flag results to ``request.state`` and continue the request.

        The user is identified by the ``X-User-ID`` header ("anonymous" when
        absent) and a comma-separated ``X-User-Groups`` header.
        """
        # NOTE(review): both headers are supplied by the caller. Unless a
        # trusted gateway sets or strips them, any client can claim a group
        # and opt itself into group-targeted flags — confirm the deployment.
        user_id = request.headers.get("X-User-ID", "anonymous")
        raw_groups = request.headers.get("X-User-Groups", "")
        # "".split(",") yields [""], so a missing header would otherwise
        # produce one bogus empty group. Entries are also stripped so that
        # "a, b" matches the group names "a" and "b".
        groups = [
            name
            for name in (part.strip() for part in raw_groups.split(","))
            if name
        ]

        # Evaluate all flags once per request.
        # NOTE(review): ff.is_enabled is synchronous; when the flag cache has
        # expired it performs Redis I/O on the event loop thread.
        request.state.features = {
            "new_checkout": ff.is_enabled("new_checkout_flow", user_id, groups),
            "dark_mode": ff.is_enabled("dark_mode", user_id),
            "ai_recommendations": ff.is_enabled("ai_recommendations", user_id, groups),
        }
        request.state.user_id = user_id

        return await call_next(request)


# Register the middleware; its dispatch() fills request.state.features
# before handing the request on to the route handlers
app.add_middleware(FeatureFlagMiddleware)


@app.get("/checkout")
async def checkout(request: Request):
    """Return the checkout flow selected by the "new_checkout" feature.

    Reads the value the middleware stored on ``request.state.features``.
    """
    use_new_flow = request.state.features["new_checkout"]
    if not use_new_flow:
        return {"flow": "legacy", "steps": ["cart", "address", "payment"]}
    return {"flow": "new", "steps": ["cart", "shipping", "payment", "confirm"]}


@app.get("/")
async def home(request: Request):
    """Echo the feature results and user id stored on ``request.state``."""
    state = request.state
    return dict(features=state.features, user_id=state.user_id)

Usage Patterns

# 1. Kill switch — instantly disable broken feature
# (set_flag() invalidates only this process's cache; other processes keep
# their cached copy until it expires, up to CACHE_TTL seconds)
ff.set_flag(Flag(name="payment_v2", enabled=False))   # disable immediately

# 2. Canary release — 1% of traffic
ff.set_flag(Flag(name="new_algorithm", enabled=True, rollout_percentage=1))

# 3. Dark launch — run new code but ignore its output, verify in logs
async def process_order(order_id: str, user_id: str):
    """Process an order with the legacy processor, optionally shadowing the new one.

    The legacy result is always returned. When "new_processor_shadow" is
    enabled for the user, the new processor also runs; a differing result or
    any exception it raises is printed and otherwise ignored.
    """
    legacy_result = await legacy_processor(order_id)
    if not ff.is_enabled("new_processor_shadow", user_id):
        return legacy_result

    # The comparison and the report stay inside the try so that nothing in
    # the shadow path can fail the real request.
    try:
        shadow_result = await new_processor(order_id)
        diverged = shadow_result != legacy_result
        if diverged:
            print(f"Shadow divergence for {order_id}: {shadow_result} vs {legacy_result}")
    except Exception as exc:
        print(f"Shadow processor failed: {exc}")
    return legacy_result

# 4. A/B test — run experiment and track metrics
async def show_pricing(user_id: str):
    """Return the pricing variant and its price, recording which variant was shown.

    The variant comes from the "pricing_experiment" flag value ("control" by
    default); "variant_a" is priced at 99 and every other variant at 79.
    """
    variant = ff.get_value("pricing_experiment", default="control")
    # Track which variant the user saw.
    # NOTE(review): `analytics` is not defined in this snippet — presumably
    # supplied by the application; confirm.
    await analytics.track("pricing_shown", user_id, {"variant": variant})
    if variant == "variant_a":
        price = 99
    else:
        price = 79
    return {"variant": variant, "price": price}


async def legacy_processor(order_id):
    """Stub for the existing order processor; always returns an empty dict."""
    return {}


async def new_processor(order_id):
    """Stub for the replacement order processor; always returns an empty dict."""
    return {}

Testing with Feature Flags

import pytest
from unittest.mock import patch, MagicMock


# Mock the feature flags in tests to test both branches
@pytest.fixture
def flags_enabled():
    """Patch FeatureFlags.is_enabled to return True for the duration of a test."""
    patcher = patch.object(FeatureFlags, "is_enabled", return_value=True)
    patcher.start()
    try:
        yield
    finally:
        # Restore the real method even if the test body raised.
        patcher.stop()


@pytest.fixture
def flags_disabled():
    """Patch FeatureFlags.is_enabled to return False for the duration of a test."""
    patcher = patch.object(FeatureFlags, "is_enabled", return_value=False)
    patcher.start()
    try:
        yield
    finally:
        # Restore the real method even if the test body raised.
        patcher.stop()


def test_new_checkout_flow_enabled(client, flags_enabled):
    """With every flag forced on, /checkout reports the new flow."""
    request_headers = {"X-User-ID": "test-user"}
    payload = client.get("/checkout", headers=request_headers).json()
    assert payload["flow"] == "new"


def test_legacy_checkout_flow(client, flags_disabled):
    """With every flag forced off, /checkout reports the legacy flow."""
    request_headers = {"X-User-ID": "test-user"}
    payload = client.get("/checkout", headers=request_headers).json()
    assert payload["flow"] == "legacy"


# Test percentage consistency
def test_rollout_consistency():
    """The rollout hash is repeatable for one user and differs between these two users."""
    fake_redis = MagicMock()
    fake_redis.keys.return_value = []
    flags = FeatureFlags(fake_redis)

    # Hashing the same (user, flag) pair twice gives the same bucket
    first, second = (flags._hash_user("user-42", "my-flag") for _ in range(2))
    assert first == second

    # A different user id lands in a different bucket for the same flag
    other_user = flags._hash_user("user-43", "my-flag")
    assert first != other_user

Frequently Asked Questions

LaunchDarkly vs self-hosted — which to choose?
Use LaunchDarkly (or Unleash, Flagsmith, Split) when you want a management UI, audit logs, targeting rules, A/B test analytics, and team collaboration out of the box. Build self-hosted when you have strict data residency requirements, want to avoid another SaaS dependency, or have simple on/off flag needs.
How do feature flags affect performance?
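With a local in-process cache, flag evaluation is an in-memory dictionary lookup plus, for percentage rollouts, one SHA-256 hash — typically on the order of microseconds. The cache is reloaded from Redis at most once every 30 seconds (configurable via CACHE_TTL); the reload runs lazily on the first evaluation after the cache expires rather than in a background thread, so only that call pays a Redis round-trip and every other evaluation stays off the network. LaunchDarkly's SDK streams flag changes via Server-Sent Events, updating the in-memory flag store instantly without polling.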
When should I clean up feature flags?
Feature flags are technical debt. Remove flags once a rollout reaches 100% and has been stable for 1–2 weeks. Treat flags like branches — merge and delete when done. Add flag expiry dates to your flag registry and run a weekly audit of flags older than 30 days.