Python Kubernetes Operator with kopf
A Kubernetes operator extends the Kubernetes API with custom resources and business logic that manages those resources — automating what a human operator would do. kopf (Kubernetes Operator Pythonic Framework) makes writing Python operators as simple as decorating async functions with @kopf.on.create, @kopf.on.update, and @kopf.on.delete. This guide builds a complete operator that manages a custom Database resource — creating PostgreSQL instances, updating configurations, and cleaning up on deletion.
Operator Concepts
Key terms:
- CRD (Custom Resource Definition): extends the Kubernetes API with new resource types such as Database or RedisCluster
- CR (Custom Resource): an instance of a CRD, such as a specific Database named orders-db
- Controller/Operator: a process that watches CRs and reconciles actual state with desired state
- Reconciliation: compare the desired state (spec) with the state actually observed in the cluster (and reported in status), then make changes to converge them
- kopf: handles the watch/event loop, retry logic, and structured logging, so you write only the business logic
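The reconciliation idea can be sketched as a pure function that takes the desired and observed state and returns the actions needed to converge them. This is only an illustration: the `Observed` class and `plan_actions` function are hypothetical names invented for this sketch, not part of kopf or the Kubernetes client.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Observed:
    """What the cluster actually contains (hypothetical shape for this sketch)."""
    statefulset_exists: bool
    replicas: int = 0


def plan_actions(desired_replicas: int, observed: Observed) -> list[str]:
    """Return the actions needed to converge observed state on desired state."""
    if not observed.statefulset_exists:
        return [f"create statefulset with {desired_replicas} replicas"]
    if observed.replicas != desired_replicas:
        return [f"scale statefulset from {observed.replicas} to {desired_replicas}"]
    return []  # already converged: reconciliation is a no-op


print(plan_actions(2, Observed(statefulset_exists=False)))
print(plan_actions(2, Observed(statefulset_exists=True, replicas=1)))
print(plan_actions(2, Observed(statefulset_exists=True, replicas=2)))
```

Keeping the decision logic free of API calls like this makes it easy to unit test without a cluster.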
pip install kopf kubernetes pydantic
Custom Resource Definition
# crd.yaml (install with: kubectl apply -f crd.yaml)
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: databases.techoral.com
spec:
  group: techoral.com
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              required: [engine, version, storage]
              properties:
                engine:
                  type: string
                  enum: [postgresql, mysql, mariadb]
                version:
                  type: string
                storage:
                  type: string
                  pattern: '^[0-9]+(Gi|Mi)$'
                replicas:
                  type: integer
                  minimum: 1
                  maximum: 5
                  default: 1
                backupEnabled:
                  type: boolean
                  default: true
            status:
              type: object
              x-kubernetes-preserve-unknown-fields: true
      subresources:
        status: {}
  scope: Namespaced
  names:
    plural: databases
    singular: database
    kind: Database
    shortNames: [db]
# example-database.yaml
apiVersion: techoral.com/v1
kind: Database
metadata:
  name: orders-db
  namespace: production
spec:
  engine: postgresql
  version: "16"
  storage: 100Gi
  replicas: 2
  backupEnabled: true
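The API server validates spec.storage against the CRD pattern, but a handler may still need the value as a number, for example to tell a grow request from a shrink request. A minimal sketch, assuming only the two units the pattern allows; `storage_to_bytes` is a hypothetical helper, not something kopf provides:

```python
import re

# Same shape as the CRD's spec.storage pattern: digits followed by Gi or Mi.
STORAGE_PATTERN = re.compile(r"([0-9]+)(Gi|Mi)")
UNIT_BYTES = {"Mi": 1024**2, "Gi": 1024**3}


def storage_to_bytes(storage: str) -> int:
    """Convert a quantity such as '100Gi' to a byte count."""
    match = STORAGE_PATTERN.fullmatch(storage)
    if match is None:
        raise ValueError(f"unsupported storage quantity: {storage!r}")
    amount, unit = match.groups()
    return int(amount) * UNIT_BYTES[unit]


def is_shrink(old: str, new: str) -> bool:
    """True when the new size is smaller than the old one."""
    return storage_to_bytes(new) < storage_to_bytes(old)


print(storage_to_bytes("100Gi"))
print(is_shrink("100Gi", "512Mi"))
```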
kopf Event Handlers
import asyncio
import logging

import kopf
from kubernetes import client, config as k8s_config
from kubernetes.client.rest import ApiException

log = logging.getLogger(__name__)


@kopf.on.startup()
async def startup(logger, **kwargs):
    """Called once when the operator starts."""
    logger.info("Database operator starting up")
    # Load kube config (in-cluster in prod, local ~/.kube/config in dev)
    try:
        k8s_config.load_incluster_config()
    except k8s_config.ConfigException:
        k8s_config.load_kube_config()


@kopf.on.create("techoral.com", "v1", "databases")
async def on_database_create(name, namespace, spec, status, logger, patch, **kwargs):
    """Called when a new Database resource is created."""
    logger.info(f"Creating database {name} in {namespace}")
    engine = spec["engine"]
    version = spec["version"]
    storage = spec["storage"]
    replicas = spec.get("replicas", 1)
    try:
        # Create the underlying Kubernetes resources
        await create_statefulset(name, namespace, engine, version, storage, replicas)
        await create_service(name, namespace)
        await create_pvc(name, namespace, storage)
        if spec.get("backupEnabled", True):
            await create_backup_cronjob(name, namespace)
        # Update status
        patch.status["state"] = "Provisioning"
        patch.status["engine"] = engine
        patch.status["version"] = version
        logger.info(f"Database {name} provisioning started")
    except Exception as e:
        logger.error(f"Failed to create database {name}: {e}")
        patch.status["state"] = "Failed"
        patch.status["error"] = str(e)
        raise kopf.PermanentError(f"Database creation failed: {e}")


@kopf.on.update("techoral.com", "v1", "databases")
async def on_database_update(name, namespace, spec, old, new, diff, logger, patch, **kwargs):
    """Called when a Database resource is modified."""
    logger.info(f"Updating database {name}: {diff}")
    for op, field, old_val, new_val in diff:
        if field == ("spec", "replicas"):
            logger.info(f"Scaling {name} from {old_val} to {new_val} replicas")
            await scale_statefulset(name, namespace, new_val)
            patch.status["replicas"] = new_val
        elif field == ("spec", "storage"):
            logger.warning(f"Storage resize for {name} requires manual PVC expansion")
            patch.status["pendingAction"] = f"Resize storage to {new_val}"


@kopf.on.delete("techoral.com", "v1", "databases")
async def on_database_delete(name, namespace, spec, logger, **kwargs):
    """Called when a Database resource is deleted."""
    logger.info(f"Deleting database {name} from {namespace}")
    # Kubernetes garbage-collects child resources only if they carry an
    # ownerReference to this Database (for example, set with kopf.adopt()).
    # Anything created without one must be removed explicitly here.
    await delete_backup_cronjob(name, namespace)
    logger.info(f"Database {name} cleanup complete")


@kopf.on.field("techoral.com", "v1", "databases", field="spec.backupEnabled")
async def on_backup_toggled(name, namespace, old, new, logger, **kwargs):
    """Called when the backupEnabled field specifically changes."""
    if new and not old:
        await create_backup_cronjob(name, namespace)
        logger.info(f"Enabled backups for {name}")
    elif not new and old:
        await delete_backup_cronjob(name, namespace)
        logger.info(f"Disabled backups for {name}")
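The update handler above iterates over kopf's diff, a sequence of (operation, field path, old value, new value) entries where the field path is a tuple of keys. One way to keep that logic testable without a cluster is to translate the diff into a plan first and perform the API calls afterwards. The sketch below assumes that diff shape; `plan_from_diff` and the action names are hypothetical.

```python
def plan_from_diff(diff):
    """Translate kopf-style diff entries into (action, value) pairs."""
    plan = []
    for op, field, old, new in diff:
        if field == ("spec", "replicas"):
            plan.append(("scale", new))
        elif field == ("spec", "storage"):
            plan.append(("resize-storage", new))
        elif field[:1] == ("spec",):
            # A spec field this operator does not act on yet.
            plan.append(("ignored", ".".join(field)))
    return plan


sample_diff = [
    ("change", ("spec", "replicas"), 2, 3),
    ("change", ("spec", "storage"), "100Gi", "200Gi"),
    ("change", ("spec", "version"), "15", "16"),
    ("add", ("metadata", "labels", "team"), None, "orders"),
]
print(plan_from_diff(sample_diff))
```

Changes outside spec, such as the label in the sample, produce no action at all.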
Reconciliation Loop
@kopf.timer("techoral.com", "v1", "databases", interval=60.0)
async def reconcile_database(name, namespace, spec, status, logger, patch, **kwargs):
    """Runs every 60 seconds to reconcile actual vs desired state."""
    apps_v1 = client.AppsV1Api()
    desired_replicas = spec.get("replicas", 1)
    try:
        sts = apps_v1.read_namespaced_stateful_set(name=f"{name}-db", namespace=namespace)
        configured_replicas = sts.spec.replicas
        ready_replicas = sts.status.ready_replicas or 0
        # Drift means the StatefulSet's own replica count differs from the CR;
        # pods that are merely not ready yet cannot be fixed by scaling.
        if configured_replicas != desired_replicas:
            logger.warning(f"{name}: configured={configured_replicas} desired={desired_replicas}, reconciling")
            await scale_statefulset(name, namespace, desired_replicas)
        # Update status with current state
        patch.status["readyReplicas"] = ready_replicas
        patch.status["state"] = "Ready" if ready_replicas == desired_replicas else "Degraded"
    except ApiException as e:
        if e.status == 404:
            logger.warning(f"StatefulSet for {name} missing, recreating")
            await create_statefulset(name, namespace, spec["engine"], spec["version"],
                                     spec["storage"], desired_replicas)
        else:
            raise
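The handlers in this guide are async, but the official kubernetes client used in them is synchronous, so a call like read_namespaced_stateful_set blocks the event loop while it waits on the network. One common remedy is to push the blocking call onto a worker thread with asyncio.to_thread (Python 3.9+). The sketch below uses a stand-in function instead of a real API call; `blocking_read` and `read_without_blocking` are hypothetical names.

```python
import asyncio
import time


def blocking_read(name: str) -> dict:
    """Stand-in for a synchronous client call (assumption for this sketch)."""
    time.sleep(0.05)  # simulate network latency
    return {"name": name, "ready_replicas": 2}


async def read_without_blocking(name: str) -> dict:
    # asyncio.to_thread runs the blocking function in a worker thread,
    # so the event loop stays free to serve other handlers meanwhile.
    return await asyncio.to_thread(blocking_read, name)


async def main() -> list:
    # The two reads overlap instead of running back to back.
    return await asyncio.gather(
        read_without_blocking("orders-db"),
        read_without_blocking("billing-db"),
    )


results = asyncio.run(main())
print([r["name"] for r in results])
```

In a handler, the same wrapper would surround the real client method, for example `await asyncio.to_thread(apps_v1.read_namespaced_stateful_set, name=..., namespace=...)`.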
Status and Conditions
from datetime import datetime, timezone


def make_condition(type_: str, status: str, reason: str, message: str) -> dict:
    return {
        "type": type_,
        "status": status,
        "reason": reason,
        "message": message,
        "lastTransitionTime": datetime.now(timezone.utc).isoformat(),
    }


@kopf.on.create("techoral.com", "v1", "databases")
async def on_create_with_conditions(name, namespace, spec, patch, logger, **kwargs):
    # Set initial conditions
    patch.status["conditions"] = [
        make_condition("Ready", "False", "Provisioning", "Database is being provisioned"),
        make_condition("Available", "False", "Provisioning", "Waiting for first replica"),
    ]
    try:
        await provision_database(name, namespace, spec)
        # Update to ready
        patch.status["conditions"] = [
            make_condition("Ready", "True", "DatabaseReady", "All replicas healthy"),
            make_condition("Available", "True", "DatabaseAvailable", "Database accepting connections"),
        ]
        patch.status["state"] = "Ready"
        patch.status["endpoint"] = f"{name}-db.{namespace}.svc.cluster.local:5432"
    except Exception as e:
        patch.status["conditions"] = [
            make_condition("Ready", "False", "ProvisionFailed", str(e)),
        ]
        patch.status["state"] = "Failed"
        raise


async def provision_database(name, namespace, spec):
    await asyncio.sleep(0)  # stub
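The handler above replaces the whole conditions list on every write, which resets lastTransitionTime even when a condition's status has not changed and drops conditions it does not mention. A common Kubernetes convention is to update one condition at a time and only move lastTransitionTime when the status actually flips. A minimal sketch of that convention follows; `set_condition` is a hypothetical helper, and the `now` parameter exists only to make the example deterministic.

```python
from datetime import datetime, timezone


def set_condition(conditions, type_, status, reason, message, now=None):
    """Return a new list with the given condition inserted or updated."""
    now = now or datetime.now(timezone.utc).isoformat()
    updated = {
        "type": type_,
        "status": status,
        "reason": reason,
        "message": message,
        "lastTransitionTime": now,
    }
    result = []
    replaced = False
    for existing in conditions:
        if existing["type"] != type_:
            result.append(existing)  # leave other condition types untouched
            continue
        if existing["status"] == status:
            # Status did not flip, so keep the original transition time.
            updated["lastTransitionTime"] = existing["lastTransitionTime"]
        result.append(updated)
        replaced = True
    if not replaced:
        result.append(updated)
    return result


conds = set_condition([], "Ready", "False", "Provisioning", "Starting", now="t1")
conds = set_condition(conds, "Ready", "False", "WaitingForPods", "1/2 ready", now="t2")
conds = set_condition(conds, "Ready", "True", "DatabaseReady", "All replicas healthy", now="t3")
print(conds)
```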
Using the Kubernetes API
from kubernetes import client
from kubernetes.client.rest import ApiException


async def create_statefulset(name: str, namespace: str, engine: str, version: str, storage: str, replicas: int):
    apps_v1 = client.AppsV1Api()
    image = f"bitnami/{engine}:{version}"
    sts = client.V1StatefulSet(
        metadata=client.V1ObjectMeta(name=f"{name}-db", namespace=namespace,
                                     labels={"app": name, "managed-by": "database-operator"}),
        spec=client.V1StatefulSetSpec(
            replicas=replicas,
            # The selector must match the pod template labels below
            selector=client.V1LabelSelector(match_labels={"app": f"{name}-db"}),
            service_name=f"{name}-db",
            template=client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(labels={"app": f"{name}-db"}),
                spec=client.V1PodSpec(
                    containers=[
                        client.V1Container(
                            name="db",
                            image=image,
                            # 5432 is the PostgreSQL default; MySQL and MariaDB listen on 3306
                            ports=[client.V1ContainerPort(container_port=5432)],
                            resources=client.V1ResourceRequirements(
                                requests={"memory": "256Mi", "cpu": "100m"},
                                limits={"memory": "1Gi", "cpu": "500m"},
                            ),
                        )
                    ]
                ),
            ),
        ),
    )
    try:
        apps_v1.create_namespaced_stateful_set(namespace=namespace, body=sts)
    except ApiException as e:
        if e.status == 409:
            pass  # already exists
        else:
            raise


async def scale_statefulset(name: str, namespace: str, replicas: int):
    apps_v1 = client.AppsV1Api()
    apps_v1.patch_namespaced_stateful_set_scale(
        name=f"{name}-db",
        namespace=namespace,
        body={"spec": {"replicas": replicas}},
    )


# Stubs for the remaining child resources
async def create_service(name, namespace): pass
async def create_pvc(name, namespace, storage): pass
async def create_backup_cronjob(name, namespace): pass
async def delete_backup_cronjob(name, namespace): pass
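The StatefulSet built above carries no ownerReferences, so Kubernetes will not garbage-collect it when the Database CR is deleted. kopf ships helpers such as kopf.adopt() for attaching an owner. The sketch below spells out by hand what such an owner reference looks like on a plain manifest dictionary, so it runs without a cluster. The `owner_reference` and `adopt` functions here are hypothetical illustrations (not kopf's implementation), and the uid value is a made-up placeholder.

```python
import copy


def owner_reference(owner: dict) -> dict:
    """Build an ownerReferences entry pointing at the given owner object."""
    meta = owner["metadata"]
    return {
        "apiVersion": owner["apiVersion"],
        "kind": owner["kind"],
        "name": meta["name"],
        "uid": meta["uid"],
        "controller": True,
        "blockOwnerDeletion": True,
    }


def adopt(child: dict, owner: dict) -> dict:
    """Return a copy of the child manifest owned by the given owner."""
    adopted = copy.deepcopy(child)
    metadata = adopted.setdefault("metadata", {})
    # A namespaced owner can only own children in its own namespace.
    metadata.setdefault("namespace", owner["metadata"]["namespace"])
    refs = metadata.setdefault("ownerReferences", [])
    ref = owner_reference(owner)
    if not any(r.get("uid") == ref["uid"] for r in refs):
        refs.append(ref)  # adding the same owner twice is a no-op
    return adopted


database = {
    "apiVersion": "techoral.com/v1",
    "kind": "Database",
    "metadata": {"name": "orders-db", "namespace": "production", "uid": "placeholder-uid"},
}
service = {"apiVersion": "v1", "kind": "Service", "metadata": {"name": "orders-db-db"}}
print(adopt(service, database)["metadata"])
```

With an owner reference in place, deleting the Database lets the cluster remove the child on its own, and the delete handler only has to deal with resources that live outside Kubernetes.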
Deploying the Operator
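# Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY operator.py .
# --all-namespaces makes the cluster-wide scope explicit; use --namespace=NAME to restrict it
CMD ["kopf", "run", "operator.py", "--all-namespaces", "--liveness=http://0.0.0.0:8080/healthz"]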
# operator-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: database-operator
  namespace: operators
spec:
  replicas: 1
  selector:
    matchLabels:
      app: database-operator
  template:
    metadata:
      labels:
        app: database-operator
    spec:
      serviceAccountName: database-operator
      containers:
        - name: operator
          image: registry.techoral.com/database-operator:v1.0.0
          ports:
            - containerPort: 8080
              name: healthz
          livenessProbe:
            httpGet:
              path: /healthz
              port: healthz
            initialDelaySeconds: 15
            periodSeconds: 20
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: database-operator
  namespace: operators
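---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: database-operator
rules:
  - apiGroups: [techoral.com]
    resources: [databases, databases/status]
    verbs: [get, list, watch, create, update, patch, delete]
  - apiGroups: [apps]
    resources: [statefulsets]
    verbs: [get, list, watch, create, update, patch, delete]
  - apiGroups: [""]
    resources: [services, persistentvolumeclaims]
    verbs: [get, list, watch, create, update, patch, delete]
  - apiGroups: [batch]
    resources: [cronjobs]
    verbs: [get, list, watch, create, update, patch, delete]
  # kopf posts Kubernetes events for handler progress and errors
  - apiGroups: [""]
    resources: [events]
    verbs: [create]
---
# A ClusterRole grants nothing until it is bound to the service account
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: database-operator
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: database-operator
subjects:
  - kind: ServiceAccount
    name: database-operator
    namespace: operators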
Frequently Asked Questions
- kopf vs Operator SDK (Go) — when to use Python?
- Use kopf when your team is primarily Python, when rapid iteration matters more than raw performance, or when the operator's business logic is complex and benefits from Python's rich ecosystem. Use Go Operator SDK for high-scale operators managing thousands of resources where Go's lower memory footprint and startup time matter.
- How do I handle idempotency in operator handlers?
- Operators must be idempotent: handlers can be called multiple times for the same event (retries, restarts). Always use create-or-update patterns (apply in kubectl terms, patch in API terms) rather than pure create. Check if a resource exists before creating it and update if it does.
- What is the difference between @kopf.on.create and @kopf.timer?
- @kopf.on.create fires when a CR is first created and is not called again once it has succeeded; use it for initial provisioning. @kopf.timer fires periodically for ongoing reconciliation; use it to detect drift and correct it. Both are needed in a production operator.
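The create-or-update pattern from the idempotency answer can be sketched against an in-memory stand-in for the API, so it runs without a cluster. `FakeApi`, `Conflict`, and `ensure` are hypothetical names for this illustration; with the real client, the conflict would surface as an ApiException with status 409.

```python
class Conflict(Exception):
    """Stand-in for an API error with HTTP status 409 (already exists)."""


class FakeApi:
    """In-memory stand-in for a Kubernetes API client (hypothetical)."""

    def __init__(self):
        self.objects = {}

    def create(self, name: str, body: dict) -> None:
        if name in self.objects:
            raise Conflict(name)
        self.objects[name] = dict(body)

    def patch(self, name: str, body: dict) -> None:
        self.objects[name].update(body)


def ensure(api: FakeApi, name: str, body: dict) -> str:
    """Create the object, or patch it if it already exists."""
    try:
        api.create(name, body)
        return "created"
    except Conflict:
        api.patch(name, body)
        return "patched"


api = FakeApi()
print(ensure(api, "orders-db", {"replicas": 2}))
print(ensure(api, "orders-db", {"replicas": 3}))  # safe to call again
print(api.objects)
```

Calling ensure any number of times with the same body leaves the same final state, which is what lets a handler be retried safely.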