MLOps (Machine Learning Operations) applies DevOps principles to the full ML lifecycle: data versioning, experiment tracking, model training pipelines, reproducible builds, staged deployments, and continuous monitoring. Without MLOps, ML projects suffer from "experiment chaos" — untracked experiments, unversioned models, and no path from notebook to production. In 2026, MLOps tooling has matured significantly, with MLflow, DVC, and BentoML emerging as the open-source leaders for teams that want control without vendor lock-in.
This guide covers the practical MLOps stack: experiment tracking with MLflow, data versioning with DVC, model serving with FastAPI, containerization with Docker, drift monitoring, and CI/CD pipelines for automated model retraining and deployment.
MLflow Tracking records experiment parameters, metrics, artifacts, and code versions so you can compare runs and reproduce results. Every training run is logged to the MLflow server with full provenance — which dataset, which hyperparameters, which code commit. This replaces the anti-pattern of naming files model_final_v3_REAL.pkl.
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
from sklearn.datasets import load_breast_cancer
import numpy as np
# Connect to MLflow tracking server (local or remote)
mlflow.set_tracking_uri("http://localhost:5000") # Or "mlruns" for local file storage
mlflow.set_experiment("breast-cancer-classifier")
# Load data
data = load_breast_cancer()
X_train, X_test, y_train, y_test = train_test_split(
    data.data, data.target, test_size=0.2, random_state=42, stratify=data.target
)
# Hyperparameter grid to try
configs = [
    {"n_estimators": 100, "max_depth": 5, "min_samples_split": 2},
    {"n_estimators": 200, "max_depth": 10, "min_samples_split": 5},
    {"n_estimators": 50, "max_depth": None, "min_samples_split": 2},
]
for params in configs:
    with mlflow.start_run(run_name=f"rf-{params['n_estimators']}-depth{params['max_depth']}"):
        # Log parameters
        mlflow.log_params(params)
        mlflow.log_param("dataset", "breast_cancer_sklearn")
        mlflow.log_param("test_size", 0.2)
        # Train
        model = RandomForestClassifier(**params, random_state=42, n_jobs=-1)
        model.fit(X_train, y_train)
        # Evaluate
        y_pred = model.predict(X_test)
        y_prob = model.predict_proba(X_test)[:, 1]
        metrics = {
            "accuracy": accuracy_score(y_test, y_pred),
            "f1": f1_score(y_test, y_pred),
            "roc_auc": roc_auc_score(y_test, y_prob),
        }
        mlflow.log_metrics(metrics)
        print(f"AUC: {metrics['roc_auc']:.4f} | params: {params}")
        # Log model with signature
        from mlflow.models import infer_signature
        signature = infer_signature(X_train, model.predict(X_train))
        mlflow.sklearn.log_model(model, "model", signature=signature)
        # Log feature importances as artifact
        importances = dict(zip(data.feature_names, model.feature_importances_.tolist()))
        mlflow.log_dict(importances, "feature_importances.json")
DVC (Data Version Control) versions large datasets and model files using small Git-tracked metadata files, while the actual data lives in remote storage (S3, GCS, Azure Blob). This gives you Git-style data versioning without storing binaries in Git: check out a Git revision, run dvc checkout, and the workspace holds exactly the data version that revision recorded.
# Initialize DVC in a Git repo
git init ml-project && cd ml-project
dvc init
git add .dvc .gitignore
git commit -m "Initialize DVC"
# Configure remote storage (S3)
dvc remote add -d s3remote s3://my-ml-bucket/dvc-cache
dvc remote modify s3remote region us-east-1
git add .dvc/config && git commit -m "Add S3 remote"
# Track a dataset
dvc add data/train.csv
git add data/train.csv.dvc data/.gitignore
git commit -m "Add training data v1"
# Push data to remote
dvc push
# Later: check out a specific version
git checkout v1.0
# Restores the data linked to this Git commit from the local DVC cache;
# run `dvc pull` first if that version is not cached locally
dvc checkout
# dvc.yaml — define reproducible pipeline stages
stages:
  preprocess:
    cmd: python src/preprocess.py --input data/raw.csv --output data/processed.csv
    deps:
      - src/preprocess.py
      - data/raw.csv
    outs:
      - data/processed.csv
  train:
    cmd: python src/train.py --data data/processed.csv --output models/
    deps:
      - src/train.py
      - data/processed.csv
    params:
      - params.yaml:
          - train.n_estimators
          - train.max_depth
    outs:
      - models/model.pkl
    metrics:
      - metrics/eval.json
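To make the idea behind reproducible stages concrete, here is a minimal sketch of how a pipeline tool can decide whether a stage needs to rerun: hash every dependency and compare against the hashes recorded after the last successful run. This is an illustration only, not DVC's implementation (DVC keeps its own hashes in dvc.lock); the function names and the JSON lock file are assumptions made for the example.

```python
import hashlib
import json
from pathlib import Path


def file_sha256(path: Path) -> str:
    """Return the SHA-256 hex digest of a file, read in 1 MiB chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(1 << 20), b""):
            digest.update(chunk)
    return digest.hexdigest()


def current_hashes(deps: list[Path]) -> dict[str, str]:
    """Map each dependency path to its current content hash."""
    return {str(path): file_sha256(path) for path in deps}


def stage_is_stale(deps: list[Path], lock_file: Path) -> bool:
    """True if no hashes were recorded yet or any dependency has changed."""
    if not lock_file.exists():
        return True
    recorded = json.loads(lock_file.read_text())
    return recorded != current_hashes(deps)


def record_stage(deps: list[Path], lock_file: Path) -> None:
    """Store dependency hashes after a successful stage run (hypothetical lock format)."""
    lock_file.write_text(json.dumps(current_hashes(deps), indent=2))
```

A stage whose dependencies hash to the recorded values can be skipped; touching data/raw.csv or src/preprocess.py changes a hash and marks the stage, and everything downstream of it, for rerun.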
A reproducible training pipeline produces the same model given the same data, code, and configuration. Key ingredients: fixed random seeds, pinned dependency versions, deterministic data splits, and logged configuration. Use a configuration file or typed configuration object (YAML, a dataclass, or a Pydantic model) rather than hardcoded values, so every training run's configuration can be logged in a single call.
from dataclasses import dataclass, asdict
from pathlib import Path
import json
import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import cross_val_score
@dataclass
class TrainConfig:
    """All training configuration in one place, logged to MLflow via asdict()."""
    data_path: str = "data/processed.csv"
    target_column: str = "label"
    test_size: float = 0.2
    random_seed: int = 42
    n_estimators: int = 200
    max_depth: int = 5
    learning_rate: float = 0.1
    cv_folds: int = 5
    model_output: str = "models/model.pkl"
def train(config: TrainConfig):
    np.random.seed(config.random_seed)
    with mlflow.start_run():
        mlflow.log_params(asdict(config))
        df = pd.read_csv(config.data_path)
        X = df.drop(columns=[config.target_column])
        y = df[config.target_column]
        pipeline = Pipeline([
            ("scaler", StandardScaler()),
            ("model", GradientBoostingClassifier(
                n_estimators=config.n_estimators,
                max_depth=config.max_depth,
                learning_rate=config.learning_rate,
                random_state=config.random_seed,
            ))
        ])
        # Cross-validation for robust evaluation
        cv_scores = cross_val_score(pipeline, X, y, cv=config.cv_folds, scoring="roc_auc")
        mlflow.log_metric("cv_roc_auc_mean", cv_scores.mean())
        mlflow.log_metric("cv_roc_auc_std", cv_scores.std())
        # Final fit on full training data
        pipeline.fit(X, y)
        mlflow.sklearn.log_model(pipeline, "pipeline")
        metrics = {"cv_auc_mean": float(cv_scores.mean()), "cv_auc_std": float(cv_scores.std())}
        # Write metrics where dvc.yaml expects them; create the directory if needed
        Path("metrics/eval.json").parent.mkdir(parents=True, exist_ok=True)
        Path("metrics/eval.json").write_text(json.dumps(metrics))
        mlflow.log_artifact("metrics/eval.json")
    return pipeline, metrics
if __name__ == "__main__":
    config = TrainConfig()
    model, metrics = train(config)
    print(f"CV AUC: {metrics['cv_auc_mean']:.4f} ± {metrics['cv_auc_std']:.4f}")
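One way to check the "same data, code, and configuration" claim in practice is to collapse those three inputs into a single fingerprint and log it with the run. The sketch below is an assumption-laden illustration: RunConfig is a hypothetical, trimmed-down stand-in for a training config, and the data hash and commit ID are expected to be supplied by the caller.

```python
import hashlib
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class RunConfig:
    """Hypothetical subset of a training configuration."""
    random_seed: int = 42
    n_estimators: int = 200
    learning_rate: float = 0.1


def config_fingerprint(config: RunConfig, data_sha256: str, git_commit: str) -> str:
    """Hash config, data hash, and code commit into one short identifier."""
    payload = json.dumps(
        {"config": asdict(config), "data": data_sha256, "commit": git_commit},
        sort_keys=True,  # stable key order so equal inputs hash equally
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:12]
```

Two runs with the same fingerprint should produce the same model; if they do not, some source of nondeterminism (an unpinned dependency, an unseeded shuffle) is still unaccounted for.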
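The MLflow Model Registry provides a central hub for managing model versions and their lifecycle stages: Staging, Production, and Archived. Model versions are promoted through stages via code or the MLflow UI, with each version linked back to the run that produced it. This replaces ad-hoc file management with a governed, versioned model store. Note that stages are deprecated as of MLflow 2.9 in favor of model version aliases and tags; the stage-based calls used here still work in MLflow 2.x but emit deprecation warnings.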
import mlflow
from mlflow.tracking import MlflowClient
TRACKING_URI = "http://localhost:5000"
# mlflow.register_model() uses the global tracking URI, so set it alongside the client
mlflow.set_tracking_uri(TRACKING_URI)
client = MlflowClient(TRACKING_URI)
MODEL_NAME = "cancer-classifier"
def promote_best_model(experiment_name: str, metric: str = "roc_auc", stage: str = "Staging"):
    """Find the best run and register/promote it."""
    experiment = client.get_experiment_by_name(experiment_name)
    if experiment is None:
        raise ValueError(f"Experiment not found: {experiment_name}")
    runs = client.search_runs(
        experiment_ids=[experiment.experiment_id],
        order_by=[f"metrics.{metric} DESC"],
        max_results=1,
    )
    if not runs:
        raise ValueError("No runs found")
    best_run = runs[0]
    run_id = best_run.info.run_id
    print(f"Best run: {run_id} | {metric}={best_run.data.metrics[metric]:.4f}")
    # Register model
    model_uri = f"runs:/{run_id}/model"
    mv = mlflow.register_model(model_uri, MODEL_NAME)
    print(f"Registered: {MODEL_NAME} v{mv.version}")
    # Transition to target stage
    client.transition_model_version_stage(
        name=MODEL_NAME,
        version=mv.version,
        stage=stage,
        archive_existing_versions=False,
    )
    client.update_model_version(
        name=MODEL_NAME,
        version=mv.version,
        description=f"Promoted from run {run_id} | {metric}={best_run.data.metrics[metric]:.4f}",
    )
    return mv
def load_production_model():
    """Load the current production model."""
    return mlflow.sklearn.load_model(f"models:/{MODEL_NAME}/Production")
promote_best_model("breast-cancer-classifier", metric="roc_auc", stage="Staging")
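Because MLflow has deprecated registry stages in favor of model version aliases, the same promotion step can also be expressed with an alias. The sketch below assumes an MLflow version that supports aliases and takes the client as a parameter so it can be exercised without a server; the alias name "champion" and the function name are illustrative choices, not part of the original workflow.

```python
def promote_with_alias(client, name: str, version: str, alias: str = "champion") -> str:
    """Point `alias` at a registered model version and return its loadable URI.

    `client` is expected to be an mlflow.tracking.MlflowClient (or any object
    exposing set_registered_model_alias with the same keyword arguments).
    """
    client.set_registered_model_alias(name=name, alias=alias, version=version)
    # Alias-based model URIs use "@", e.g. models:/cancer-classifier@champion
    return f"models:/{name}@{alias}"
```

A serving process would then load the returned URI with mlflow.sklearn.load_model instead of the stage-based models:/cancer-classifier/Production form; reassigning the alias to a new version switches traffic without changing the URI.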
FastAPI makes it straightforward to wrap a trained model in a production REST API. The model is loaded once at startup, prediction requests are validated with Pydantic, and the endpoint returns structured JSON predictions. Add background tasks for async logging and middleware for authentication in production.
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
import mlflow.sklearn
import numpy as np
import logging
from contextlib import asynccontextmanager
MODEL_NAME = "cancer-classifier"
model = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    global model
    logging.info("Loading model from MLflow registry...")
    model = mlflow.sklearn.load_model(f"models:/{MODEL_NAME}/Production")
    logging.info("Model loaded successfully")
    yield
    model = None
app = FastAPI(title="Cancer Classifier API", version="1.0.0", lifespan=lifespan)
class PredictionRequest(BaseModel):
    features: list[float] = Field(..., min_length=30, max_length=30,
                                  description="30 cancer features from sklearn breast_cancer dataset")
class PredictionResponse(BaseModel):
    prediction: int  # 0 = malignant, 1 = benign
    probability_benign: float
    probability_malignant: float
    model_version: str = MODEL_NAME
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")
    try:
        X = np.array(request.features).reshape(1, -1)
        prediction = int(model.predict(X)[0])
        probabilities = model.predict_proba(X)[0]
        return PredictionResponse(
            prediction=prediction,
            probability_malignant=float(probabilities[0]),
            probability_benign=float(probabilities[1]),
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health():
    return {"status": "healthy", "model_loaded": model is not None}
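To show what a request against the /predict endpoint might look like, here is a small standard-library client sketch. It assumes the API is reachable at http://localhost:8000 (the port used elsewhere in this guide); build_payload and call_predict are hypothetical helper names, and the client-side length check simply mirrors the 30-feature constraint on PredictionRequest.

```python
import json
import urllib.request

N_FEATURES = 30  # mirrors the min_length/max_length constraint on the request model


def build_payload(features: list[float]) -> bytes:
    """Validate the feature vector length and encode the JSON request body."""
    if len(features) != N_FEATURES:
        raise ValueError(f"expected {N_FEATURES} features, got {len(features)}")
    return json.dumps({"features": [float(x) for x in features]}).encode("utf-8")


def call_predict(features: list[float],
                 base_url: str = "http://localhost:8000",
                 timeout: float = 5.0) -> dict:
    """POST the features to /predict and return the decoded JSON response."""
    request = urllib.request.Request(
        f"{base_url}/predict",
        data=build_payload(features),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

Failing fast on the client avoids a round trip that the server would reject with a 422 validation error anyway.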
Docker containers ensure the serving environment exactly matches the training environment, eliminating "it worked in the notebook" failures. Use multi-stage builds to keep production images lean — the build stage installs all dependencies; the final stage contains only what's needed to serve predictions.
# Dockerfile for ML model serving
FROM python:3.12-slim AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --user -r requirements.txt
FROM python:3.12-slim AS production
WORKDIR /app
COPY --from=builder /root/.local /root/.local
COPY src/ src/
COPY models/ models/
ENV PATH=/root/.local/bin:$PATH
ENV MLFLOW_TRACKING_URI=http://mlflow-server:5000
ENV MODEL_NAME=cancer-classifier
EXPOSE 8000
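# python:3.12-slim does not ship curl, so probe the endpoint with Python instead
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=5)" || exit 1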
CMD ["uvicorn", "src.api:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]
# docker-compose.yml — full MLOps stack
version: "3.9"
services:
  mlflow:
    image: ghcr.io/mlflow/mlflow:v2.13.0
    ports: ["5000:5000"]
    volumes: ["./mlruns:/mlruns"]
    command: mlflow server --host 0.0.0.0 --backend-store-uri /mlruns
  model-api:
    build: .
    ports: ["8000:8000"]
    depends_on: [mlflow]
    environment:
      - MLFLOW_TRACKING_URI=http://mlflow:5000
    deploy:
      replicas: 2
      resources:
        limits: {cpus: "1.0", memory: 512M}
Model performance degrades when the real-world data distribution shifts away from the training distribution (data drift) or when the relationship between features and labels changes (concept drift). Monitoring catches drift before it silently degrades user experience. Log predictions and ground truth, compute statistical tests on rolling windows, and alert when drift is detected.
import numpy as np
from scipy import stats
from dataclasses import dataclass
from datetime import datetime, timezone
@dataclass
class DriftAlert:
    feature: str
    p_value: float
    drift_detected: bool
    timestamp: str
def detect_feature_drift(
    reference: np.ndarray,
    current: np.ndarray,
    alpha: float = 0.05,
) -> dict:
    """Kolmogorov-Smirnov test for distribution shift."""
    ks_stat, p_value = stats.ks_2samp(reference, current)
    return {
        "ks_statistic": float(ks_stat),
        "p_value": float(p_value),
        "drift_detected": bool(p_value < alpha),
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
def monitor_prediction_drift(
    reference_preds: np.ndarray,
    current_preds: np.ndarray,
    alert_threshold: float = 0.1,  # aligned with the "warning" severity cutoff
) -> dict:
    """Monitor for significant shift in prediction distribution."""
    # PSI (Population Stability Index) for prediction scores
    bins = np.linspace(0, 1, 11)
    ref_counts, _ = np.histogram(reference_preds, bins=bins)
    cur_counts, _ = np.histogram(current_preds, bins=bins)
    # PSI is defined on per-bin proportions (summing to 1), not on densities
    ref_hist = ref_counts / ref_counts.sum()
    cur_hist = cur_counts / cur_counts.sum()
    # Add small epsilon to avoid log(0)
    eps = 1e-8
    ref_hist = ref_hist + eps
    cur_hist = cur_hist + eps
    psi = np.sum((cur_hist - ref_hist) * np.log(cur_hist / ref_hist))
    severity = "none" if psi < 0.1 else "warning" if psi < 0.2 else "critical"
    return {"psi": float(psi), "severity": severity, "alert": bool(psi >= alert_threshold)}
# Example usage
reference_scores = np.random.beta(2, 5, 1000) # Training distribution
current_scores = np.random.beta(5, 2, 500) # Shifted distribution
result = monitor_prediction_drift(reference_scores, current_scores)
print(f"PSI: {result['psi']:.3f} | Severity: {result['severity']}")
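The rolling-window part of the monitoring advice can be sketched as follows: keep the most recent N prediction scores in a fixed-size buffer and recompute PSI against the reference distribution once the buffer is full. This is a dependency-free illustration under stated assumptions: scores lie in [0, 1], bins are equal-width, and the class name, window size, and function names are choices made for the example.

```python
import math
from collections import deque
from typing import Iterable, Optional


def bin_proportions(scores: Iterable[float], n_bins: int = 10, eps: float = 1e-6) -> list[float]:
    """Share of scores in each equal-width bin on [0, 1], floored at eps."""
    counts = [0] * n_bins
    total = 0
    for score in scores:
        # Clamp so that 1.0 (and any out-of-range score) lands in an edge bin
        index = min(max(int(score * n_bins), 0), n_bins - 1)
        counts[index] += 1
        total += 1
    if total == 0:
        raise ValueError("need at least one score")
    return [max(count / total, eps) for count in counts]


def psi(reference: Iterable[float], current: Iterable[float], n_bins: int = 10) -> float:
    """Population Stability Index: sum of (cur - ref) * ln(cur / ref) over bins."""
    ref = bin_proportions(reference, n_bins)
    cur = bin_proportions(current, n_bins)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref, cur))


class RollingPSIMonitor:
    """Tracks the last `window_size` scores and reports PSI against a reference."""

    def __init__(self, reference: Iterable[float], window_size: int = 500):
        self.reference = list(reference)
        self.window: deque = deque(maxlen=window_size)

    def add(self, score: float) -> Optional[float]:
        """Record one score; return PSI once the window is full, else None."""
        self.window.append(score)
        if len(self.window) < self.window.maxlen:
            return None
        return psi(self.reference, self.window)
```

In a serving process, add() would be called once per prediction and the returned PSI compared against the warning and critical cutoffs (0.1 and 0.2 in this guide) to decide whether to raise an alert.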
ML CI/CD pipelines automate training, evaluation, and deployment when code or data changes. The pipeline must gate deployment on metric thresholds — only promote a new model if it beats the current production model on the evaluation set. Use GitHub Actions or GitLab CI with DVC for pipeline reproducibility and MLflow for experiment comparison.
# .github/workflows/ml-pipeline.yml
name: ML Training and Deployment
on:
  push:
    paths: ["src/**", "data/**", "params.yaml"]
jobs:
  train-and-evaluate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with: {python-version: "3.12"}
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Pull data with DVC
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_KEY }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET }}
        run: dvc pull
      - name: Run training pipeline
        run: dvc repro
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
      - name: Evaluate vs production
        id: eval
        run: |
          NEW_AUC=$(python src/get_metric.py --metric roc_auc --stage latest)
          PROD_AUC=$(python src/get_metric.py --metric roc_auc --stage Production)
          echo "new_auc=$NEW_AUC" >> "$GITHUB_OUTPUT"
          echo "prod_auc=$PROD_AUC" >> "$GITHUB_OUTPUT"
          if python -c "exit(0 if $NEW_AUC > $PROD_AUC + 0.005 else 1)"; then
            echo "promote=true" >> "$GITHUB_OUTPUT"
          fi
      - name: Promote model if better
        if: steps.eval.outputs.promote == 'true'
        run: python src/promote_model.py --stage Production
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
      - name: Deploy updated API
        if: steps.eval.outputs.promote == 'true'
        run: |
          # Build and push under the same fully qualified tag so the push can find the image
          IMAGE=registry.example.com/cancer-api:${{ github.sha }}
          docker build -t "$IMAGE" .
          docker push "$IMAGE"
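The metric gate in the workflow is easier to test and harder to break when it lives in a small Python function rather than an inline shell expression. The sketch below is one possible shape for that logic; the function names are hypothetical, and treating a missing production metric as "promote" (for the very first deployment) is an assumption you may want to reverse.

```python
from typing import Optional


def parse_metric(raw: str) -> Optional[float]:
    """Convert a metric printed by a script to float; empty output means 'no model yet'."""
    text = raw.strip()
    return float(text) if text else None


def should_promote(new_auc: float, prod_auc: Optional[float], min_gain: float = 0.005) -> bool:
    """Promote only if the candidate beats production by more than `min_gain`."""
    if prod_auc is None:
        # Assumption: with no production model, any trained candidate is promoted
        return True
    return new_auc > prod_auc + min_gain
```

Requiring a minimum gain rather than any improvement keeps evaluation noise from triggering a redeploy on every retraining run.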