Python Kubernetes Operator with kopf

A Kubernetes operator extends the Kubernetes API with custom resources and business logic that manages those resources — automating what a human operator would do. kopf (Kubernetes Operator Pythonic Framework) makes writing Python operators as simple as decorating async functions with @kopf.on.create, @kopf.on.update, and @kopf.on.delete. This guide builds a complete operator that manages a custom Database resource — creating PostgreSQL instances, updating configurations, and cleaning up on deletion.

Operator Concepts

Key terms:
  • CRD (Custom Resource Definition) — extends the Kubernetes API with new resource types like Database or RedisCluster
  • CR (Custom Resource) — an instance of a CRD, like a specific Database named orders-db
  • Controller/Operator — a process that watches CRs and reconciles actual state with desired state
  • Reconciliation — compare desired state (spec) with current state (status) and make changes to converge them
  • kopf — handles the watch/event loop, retry logic, and structured logging, so you write only the business logic

Install the dependencies:

pip install kopf kubernetes pydantic
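The reconciliation idea from the key terms above can be sketched without any Kubernetes machinery. The following is a minimal illustration, not kopf's implementation: `Plan`, `plan_reconcile`, and the resource names are hypothetical, and child resources are modelled as plain strings.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Plan:
    """Actions needed to move the observed state toward the desired state."""
    create: frozenset
    delete: frozenset


def plan_reconcile(desired: set, observed: set) -> Plan:
    # Anything desired but not observed must be created;
    # anything observed but no longer desired must be removed.
    return Plan(
        create=frozenset(desired - observed),
        delete=frozenset(observed - desired),
    )


# Hypothetical child resources for a Database named orders-db
desired = {"statefulset/orders-db", "service/orders-db", "cronjob/orders-db-backup"}
observed = {"statefulset/orders-db", "service/orders-db-old"}

plan = plan_reconcile(desired, observed)
print("create:", sorted(plan.create))
print("delete:", sorted(plan.delete))
```

Running the same function again once the plan has been carried out yields an empty plan, which is the property that makes reconciliation safe to repeat.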

Custom Resource Definition

# crd.yaml — install with: kubectl apply -f crd.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: databases.techoral.com
spec:
  group: techoral.com
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              required: [engine, version, storage]
              properties:
                engine:
                  type: string
                  enum: [postgresql, mysql, mariadb]
                version:
                  type: string
                storage:
                  type: string
                  pattern: '^[0-9]+(Gi|Mi)$'
                replicas:
                  type: integer
                  minimum: 1
                  maximum: 5
                  default: 1
                backupEnabled:
                  type: boolean
                  default: true
            status:
              type: object
              x-kubernetes-preserve-unknown-fields: true
      subresources:
        status: {}
  scope: Namespaced
  names:
    plural: databases
    singular: database
    kind: Database
    shortNames: [db]

# example-database.yaml
apiVersion: techoral.com/v1
kind: Database
metadata:
  name: orders-db
  namespace: production
spec:
  engine: postgresql
  version: "16"
  storage: 100Gi
  replicas: 2
  backupEnabled: true
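The API server enforces the schema above when a Database is applied. For local checks (for example in tests or a CI step), the same constraints can be mirrored in plain Python. This is only a sketch: `validate_spec` is a hypothetical helper, and it covers just the rules visible in the CRD (required fields, the engine enum, the storage pattern, the replicas range).

```python
import re

ENGINES = {"postgresql", "mysql", "mariadb"}
# Same pattern as the CRD's `storage` field, applied with fullmatch
STORAGE_RE = re.compile(r"[0-9]+(Gi|Mi)")


def validate_spec(spec: dict) -> list[str]:
    """Return a list of problems; an empty list means the spec passes these checks."""
    errors = []
    for field in ("engine", "version", "storage"):
        if field not in spec:
            errors.append(f"missing required field: {field}")
    if "engine" in spec and spec["engine"] not in ENGINES:
        errors.append(f"unsupported engine: {spec['engine']}")
    # The CRD types `version` as a string, so an unquoted YAML 16 would be rejected
    if "version" in spec and not isinstance(spec["version"], str):
        errors.append(f"version must be a string: {spec['version']!r}")
    if "storage" in spec and not STORAGE_RE.fullmatch(str(spec["storage"])):
        errors.append(f"invalid storage size: {spec['storage']}")
    replicas = spec.get("replicas", 1)  # the CRD defaults replicas to 1
    if isinstance(replicas, bool) or not isinstance(replicas, int) or not 1 <= replicas <= 5:
        errors.append(f"replicas must be an integer from 1 to 5: {replicas!r}")
    return errors


print(validate_spec({"engine": "postgresql", "version": "16", "storage": "100Gi", "replicas": 2}))
print(validate_spec({"engine": "oracle", "version": "16", "storage": "100GB"}))
```

The version check shows why the example manifest quotes `"16"`.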

kopf Event Handlers

import asyncio
import logging
import kopf
from kubernetes import client, config as k8s_config
from kubernetes.client.rest import ApiException

log = logging.getLogger(__name__)


@kopf.on.startup()
async def startup(logger, **kwargs):
    """Called once when the operator starts."""
    logger.info("Database operator starting up")
    # Load kube config (in-cluster in prod, local ~/.kube/config in dev)
    try:
        k8s_config.load_incluster_config()
    except k8s_config.ConfigException:
        k8s_config.load_kube_config()


@kopf.on.create("techoral.com", "v1", "databases")
async def on_database_create(name, namespace, spec, status, logger, patch, **kwargs):
    """Called when a new Database resource is created."""
    logger.info(f"Creating database {name} in {namespace}")
    engine = spec["engine"]
    version = spec["version"]
    storage = spec["storage"]
    replicas = spec.get("replicas", 1)

    try:
        # Create the underlying Kubernetes resources
        await create_statefulset(name, namespace, engine, version, storage, replicas)
        await create_service(name, namespace)
        await create_pvc(name, namespace, storage)
        if spec.get("backupEnabled", True):
            await create_backup_cronjob(name, namespace)

        # Update status
        patch.status["state"] = "Provisioning"
        patch.status["engine"] = engine
        patch.status["version"] = version
        logger.info(f"Database {name} provisioning started")

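    except Exception as e:
        logger.error(f"Failed to create database {name}: {e}")
        patch.status["state"] = "Failed"
        patch.status["error"] = str(e)
        # PermanentError tells kopf not to retry this handler. For failures that may
        # clear up on their own (e.g. API timeouts), raise kopf.TemporaryError instead.
        raise kopf.PermanentError(f"Database creation failed: {e}") from e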


@kopf.on.update("techoral.com", "v1", "databases")
async def on_database_update(name, namespace, spec, old, new, diff, logger, patch, **kwargs):
    """Called when a Database resource is modified."""
    logger.info(f"Updating database {name}: {diff}")

    for op, field, old_val, new_val in diff:
        if field == ("spec", "replicas"):
            logger.info(f"Scaling {name} from {old_val} to {new_val} replicas")
            await scale_statefulset(name, namespace, new_val)
            patch.status["replicas"] = new_val

        elif field == ("spec", "storage"):
            logger.warning(f"Storage resize for {name} requires manual PVC expansion")
            patch.status["pendingAction"] = f"Resize storage to {new_val}"


@kopf.on.delete("techoral.com", "v1", "databases")
async def on_database_delete(name, namespace, spec, logger, **kwargs):
    """Called when a Database resource is deleted."""
    logger.info(f"Deleting database {name} from {namespace}")

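    # Clean up child resources explicitly. Kubernetes garbage-collects children automatically
    # only if they carry an ownerReference to this Database (for example, set via kopf.adopt()).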
    await delete_backup_cronjob(name, namespace)
    logger.info(f"Database {name} cleanup complete")


@kopf.on.field("techoral.com", "v1", "databases", field="spec.backupEnabled")
async def on_backup_toggled(name, namespace, old, new, logger, **kwargs):
    """Called when backupEnabled changes, including its first appearance at creation (old is None)."""
    if new and not old:
        await create_backup_cronjob(name, namespace)
        logger.info(f"Enabled backups for {name}")
    elif not new and old:
        await delete_backup_cronjob(name, namespace)
        logger.info(f"Disabled backups for {name}")
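The update handler above iterates over `diff`, whose entries are `(operation, field_path, old_value, new_value)` tuples with the field path given as a tuple of keys. A small sketch of that dispatch logic, kept free of I/O so it can be unit-tested: `plan_actions`, the action names, and the sample diff are hypothetical and exist only for illustration.

```python
def plan_actions(diff):
    """Map diff entries to the actions an operator would take (names are illustrative)."""
    actions = []
    for op, field, old, new in diff:
        if field == ("spec", "replicas"):
            # If the field was removed, fall back to the CRD default of 1
            actions.append(("scale", new if new is not None else 1))
        elif field == ("spec", "storage"):
            # Storage changes are only flagged, mirroring the handler above
            actions.append(("flag-storage-resize", new))
        elif field[:1] == ("spec",):
            # Any other spec change is reported rather than silently dropped
            actions.append(("unhandled", ".".join(field)))
    return actions


sample_diff = [
    ("change", ("spec", "replicas"), 2, 3),
    ("change", ("spec", "storage"), "100Gi", "200Gi"),
    ("change", ("spec", "version"), "15", "16"),
]

for action in plan_actions(sample_diff):
    print(action)
```

Separating the decision (which actions) from the execution (API calls) keeps the handler itself short and lets the decision be tested without a cluster.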

Reconciliation Loop

@kopf.timer("techoral.com", "v1", "databases", interval=60.0)
async def reconcile_database(name, namespace, spec, status, logger, patch, **kwargs):
    """Runs every 60 seconds to reconcile actual vs desired state."""
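    apps_v1 = client.AppsV1Api()
    desired_replicas = spec.get("replicas", 1)

    try:
        # The kubernetes client is synchronous; run it in a worker thread so the
        # operator's event loop is not blocked (asyncio is imported at the top of the file).
        sts = await asyncio.to_thread(
            apps_v1.read_namespaced_stateful_set, name=f"{name}-db", namespace=namespace
        )
        configured_replicas = sts.spec.replicas
        ready_replicas = sts.status.ready_replicas or 0

        # Drift is a mismatch in the configured replica count. Readiness lags while pods
        # start, so it is reported in status but does not trigger another scale call.
        if configured_replicas != desired_replicas:
            logger.warning(f"{name}: configured={configured_replicas} desired={desired_replicas}, reconciling")
            await scale_statefulset(name, namespace, desired_replicas)

        # Update status with current state
        patch.status["readyReplicas"] = ready_replicas
        patch.status["state"] = "Ready" if ready_replicas == desired_replicas else "Degraded"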

    except ApiException as e:
        if e.status == 404:
            logger.warning(f"StatefulSet for {name} missing, recreating")
            await create_statefulset(name, namespace, spec["engine"], spec["version"],
                                     spec["storage"], desired_replicas)
        else:
            raise
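A timer like the one above has to answer two separate questions: does the StatefulSet need to be rescaled, and what state should be reported? One possible way to keep that logic testable is a pure function. This is a sketch under assumptions: `derive_state` is a hypothetical helper, and the "Reconciling" state is an illustrative addition alongside the "Provisioning", "Ready", and "Degraded" states used in this guide.

```python
def derive_state(desired: int, configured: int, ready: int) -> tuple[str, bool]:
    """Return (state, needs_scaling) from replica counts.

    desired    - replicas requested in the Database spec
    configured - replicas currently set on the StatefulSet spec
    ready      - replicas the StatefulSet reports as ready
    """
    needs_scaling = configured != desired
    if needs_scaling:
        state = "Reconciling"   # the StatefulSet spec has drifted from the CR
    elif ready == desired:
        state = "Ready"
    elif ready == 0:
        state = "Provisioning"  # correct size, but no pod is ready yet
    else:
        state = "Degraded"      # correct size, only some pods ready
    return state, needs_scaling


for counts in [(3, 3, 3), (3, 2, 2), (3, 3, 1), (2, 2, 0)]:
    print(counts, derive_state(*counts))
```

Only a mismatch between `desired` and `configured` calls for scaling; a gap between `configured` and `ready` usually just means pods are still starting or unhealthy.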

Status and Conditions

from datetime import datetime, timezone


def make_condition(type_: str, status: str, reason: str, message: str) -> dict:
    return {
        "type": type_,
        "status": status,
        "reason": reason,
        "message": message,
        "lastTransitionTime": datetime.now(timezone.utc).isoformat(),
    }


@kopf.on.create("techoral.com", "v1", "databases")
async def on_create_with_conditions(name, namespace, spec, patch, logger, **kwargs):
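    # Set initial conditions. Note: kopf applies `patch` once, after the handler returns,
    # so these values are overwritten below before they reach the API server.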
    patch.status["conditions"] = [
        make_condition("Ready", "False", "Provisioning", "Database is being provisioned"),
        make_condition("Available", "False", "Provisioning", "Waiting for first replica"),
    ]

    try:
        await provision_database(name, namespace, spec)

        # Update to ready
        patch.status["conditions"] = [
            make_condition("Ready", "True", "DatabaseReady", "All replicas healthy"),
            make_condition("Available", "True", "DatabaseAvailable", "Database accepting connections"),
        ]
        patch.status["state"] = "Ready"
        patch.status["endpoint"] = f"{name}-db.{namespace}.svc.cluster.local:5432"

    except Exception as e:
        patch.status["conditions"] = [
            make_condition("Ready", "False", "ProvisionFailed", str(e)),
        ]
        patch.status["state"] = "Failed"
        raise


async def provision_database(name, namespace, spec):
    await asyncio.sleep(0)  # stub

Using the Kubernetes API

from kubernetes import client
from kubernetes.client.rest import ApiException


async def create_statefulset(name: str, namespace: str, engine: str, version: str, storage: str, replicas: int):
    apps_v1 = client.AppsV1Api()
    image = f"bitnami/{engine}:{version}"

    sts = client.V1StatefulSet(
        metadata=client.V1ObjectMeta(name=f"{name}-db", namespace=namespace,
                                      labels={"app": name, "managed-by": "database-operator"}),
        spec=client.V1StatefulSetSpec(
            replicas=replicas,
            selector=client.V1LabelSelector(match_labels={"app": f"{name}-db"}),
            service_name=f"{name}-db",
            template=client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(labels={"app": f"{name}-db"}),
                spec=client.V1PodSpec(
                    containers=[
                        client.V1Container(
                            name="db",
                            image=image,
                            ports=[client.V1ContainerPort(container_port=5432)],
                            resources=client.V1ResourceRequirements(
                                requests={"memory": "256Mi", "cpu": "100m"},
                                limits={"memory": "1Gi", "cpu": "500m"},
                            ),
                        )
                    ]
                ),
            ),
        ),
    )

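    try:
        # The kubernetes client is synchronous; run the call in a worker thread so it does
        # not block kopf's event loop (needs `import asyncio`, as at the top of the operator file).
        await asyncio.to_thread(apps_v1.create_namespaced_stateful_set, namespace=namespace, body=sts)
    except ApiException as e:
        if e.status != 409:  # 409 Conflict means the StatefulSet already exists
            raise


async def scale_statefulset(name: str, namespace: str, replicas: int):
    apps_v1 = client.AppsV1Api()
    await asyncio.to_thread(
        apps_v1.patch_namespaced_stateful_set_scale,
        name=f"{name}-db",
        namespace=namespace,
        body={"spec": {"replicas": replicas}},
    )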


async def create_service(name, namespace): pass
async def create_pvc(name, namespace, storage): pass
async def create_backup_cronjob(name, namespace): pass
async def delete_backup_cronjob(name, namespace): pass
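The StatefulSet built above carries no owner reference, so deleting the Database does not remove it automatically. Kubernetes garbage collection relies on `metadata.ownerReferences` entries on the child; kopf provides `kopf.adopt()` for this purpose. The sketch below builds the same kind of entry by hand on plain dict manifests, to show what ends up in the metadata. `owner_reference`, `with_owner`, and the sample uid are hypothetical.

```python
def owner_reference(owner: dict) -> dict:
    """Build an ownerReferences entry pointing at the given custom resource."""
    return {
        "apiVersion": owner["apiVersion"],
        "kind": owner["kind"],
        "name": owner["metadata"]["name"],
        "uid": owner["metadata"]["uid"],
        "controller": True,
        "blockOwnerDeletion": True,
    }


def with_owner(child: dict, owner: dict) -> dict:
    """Return a copy of the child manifest owned by `owner`, placed in the owner's namespace."""
    meta = dict(child.get("metadata", {}))
    # A namespaced owner can only own objects in its own namespace
    meta["namespace"] = owner["metadata"]["namespace"]
    # Drop any stale reference to the same owner so repeated calls do not duplicate it
    refs = [r for r in meta.get("ownerReferences", []) if r["uid"] != owner["metadata"]["uid"]]
    meta["ownerReferences"] = refs + [owner_reference(owner)]
    return {**child, "metadata": meta}


database = {
    "apiVersion": "techoral.com/v1",
    "kind": "Database",
    "metadata": {"name": "orders-db", "namespace": "production", "uid": "example-uid-123"},
}
service = {"apiVersion": "v1", "kind": "Service", "metadata": {"name": "orders-db-db"}}

owned = with_owner(service, database)
print(owned["metadata"])
```

With such a reference in place, the explicit cleanup in a delete handler becomes a safety net rather than the only cleanup path.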

Deploying the Operator

# Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY operator.py .
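# Recent kopf releases expect an explicit scope: --all-namespaces or --namespace=<name>
CMD ["kopf", "run", "operator.py", "--all-namespaces", "--liveness=http://0.0.0.0:8080/healthz"]

# operator-deployment.yaml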
apiVersion: apps/v1
kind: Deployment
metadata:
  name: database-operator
  namespace: operators
spec:
  replicas: 1
  selector:
    matchLabels:
      app: database-operator
  template:
    metadata:
      labels:
        app: database-operator
    spec:
      serviceAccountName: database-operator
      containers:
        - name: operator
          image: registry.techoral.com/database-operator:v1.0.0
          ports:
            - containerPort: 8080
              name: healthz
          livenessProbe:
            httpGet:
              path: /healthz
              port: healthz
            initialDelaySeconds: 15
            periodSeconds: 20
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: database-operator
  namespace: operators
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: database-operator
rules:
  - apiGroups: [techoral.com]
    resources: [databases, databases/status]
    verbs: [get, list, watch, create, update, patch, delete]
  - apiGroups: [apps]
    resources: [statefulsets]
    verbs: [get, list, watch, create, update, patch, delete]
  - apiGroups: [""]
    resources: [services, persistentvolumeclaims]
    verbs: [get, list, watch, create, update, patch, delete]
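  - apiGroups: [batch]
    resources: [cronjobs]
    verbs: [get, list, watch, create, update, patch, delete]
  # kopf posts Kubernetes events about handler progress
  - apiGroups: [""]
    resources: [events]
    verbs: [create]
  # kopf observes CRDs and namespaces at runtime when running cluster-wide
  - apiGroups: [apiextensions.k8s.io]
    resources: [customresourcedefinitions]
    verbs: [list, watch]
  - apiGroups: [""]
    resources: [namespaces]
    verbs: [list, watch]
---
# Bind the ClusterRole to the operator's ServiceAccount; without a binding the rules grant nothing
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: database-operator
subjects:
  - kind: ServiceAccount
    name: database-operator
    namespace: operators
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: database-operator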

Frequently Asked Questions

kopf vs Operator SDK (Go): when to use Python?
Use kopf when your team is primarily Python, when rapid iteration matters more than raw performance, or when the operator's business logic is complex and benefits from Python's rich ecosystem. Use the Go Operator SDK for high-scale operators managing thousands of resources, where Go's lower memory footprint and faster startup matter.

How do I handle idempotency in operator handlers?
Operators must be idempotent: a handler can be called several times for the same event (after a retry or an operator restart). Use create-or-update patterns (apply in kubectl terms, patch in API terms) rather than a bare create. Either check whether the resource exists and update it if it does, or attempt the create and fall back to a patch on a 409 Conflict.

What is the difference between @kopf.on.create and @kopf.timer?
@kopf.on.create runs when a CR is first created, and is retried until it succeeds or fails permanently; use it for initial provisioning. @kopf.timer runs periodically for as long as the CR exists; use it to detect drift and correct it. A production operator typically uses both.
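
The create-or-update pattern from the idempotency answer can be sketched against an in-memory stand-in for the API server. Everything here is hypothetical scaffolding (`FakeApi`, `Conflict`, `ensure`); with the real kubernetes client, the equivalent of `Conflict` is an `ApiException` whose `status` is 409, as in `create_statefulset` earlier in this guide.

```python
class Conflict(Exception):
    """Stands in for an HTTP 409 response from the API server."""


class FakeApi:
    """In-memory stand-in for a Kubernetes API, used only for this sketch."""

    def __init__(self):
        self.objects = {}

    def create(self, name, body):
        if name in self.objects:
            raise Conflict(name)
        self.objects[name] = dict(body)

    def patch(self, name, body):
        self.objects[name].update(body)


def ensure(api, name, body):
    """Create the object, or patch it to the desired body if it already exists."""
    try:
        api.create(name, body)
        return "created"
    except Conflict:
        api.patch(name, body)
        return "patched"


api = FakeApi()
first = ensure(api, "orders-db", {"replicas": 2})
second = ensure(api, "orders-db", {"replicas": 2})   # a retry of the same event
third = ensure(api, "orders-db", {"replicas": 3})    # a later spec change
print(first, second, third, api.objects)
```

Calling `ensure` any number of times with the same body leaves the stored object unchanged, which is what makes a retried handler harmless.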