Python Scikit-learn: Machine Learning Pipelines

Scikit-learn is the standard Python library for classical machine learning — everything from linear regression and random forests to dimensionality reduction and clustering. Its Pipeline API chains preprocessing and model training into a single reusable object that prevents data leakage and simplifies deployment. This guide covers the complete ML workflow: data preparation, pipelines, cross-validation, hyperparameter tuning, evaluation, and serving predictions via FastAPI.

Data Preparation

# fastapi and uvicorn are needed for the prediction API section below;
# scipy (used for the random-search distributions) is installed with scikit-learn.
pip install scikit-learn pandas numpy joblib fastapi uvicorn
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import (
    StandardScaler, MinMaxScaler, OneHotEncoder, LabelEncoder,
    OrdinalEncoder, PowerTransformer
)
from sklearn.impute import SimpleImputer, KNNImputer

# Load and inspect data
df = pd.read_csv("churn.csv")
# DataFrame.info() prints its report itself and returns None, so wrapping it
# in print() would add a stray "None" line after the summary.
df.info()
print(df.describe())
print(df.isnull().sum())

# Split features and target
X = df.drop(columns="churned")
y = df["churned"]

# Train/test split — stratify so both splits keep the same class ratio,
# which matters for imbalanced classes
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.2,
    random_state=42,
    stratify=y,
)

# Identify column types
numeric_cols = X.select_dtypes(include=np.number).columns.tolist()
categorical_cols = X.select_dtypes(include="object").columns.tolist()

print(f"Numeric: {numeric_cols}")
print(f"Categorical: {categorical_cols}")

# Columns that are neither numeric nor object (e.g. bool, datetime, category)
# end up in neither list. A ColumnTransformer built only from these two lists
# drops such columns by default (remainder="drop"), so report them here
# instead of losing them silently.
unassigned_cols = [
    col for col in X.columns
    if col not in numeric_cols and col not in categorical_cols
]
if unassigned_cols:
    print(f"Unassigned (neither numeric nor object): {unassigned_cols}")

Scikit-learn Pipelines

A Pipeline chains transformers and a final estimator. Calling fit() fits and applies each step in order; predict() passes inputs through every fitted preprocessing step before the model predicts. Because fit() is called only on the training split (and, during cross-validation, only on each training fold), scalers and encoders never learn from the test set — this is what prevents data leakage.

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier

# Numeric branch: fill missing values with the column median, then standardise
numeric_pipeline = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="median")),
    ("scaler", StandardScaler()),
])

# Categorical branch: fill missing values with the most frequent category,
# then one-hot encode into a dense array (unknown categories are ignored
# rather than raising at predict time)
categorical_pipeline = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
])

# Route each column list to its branch
preprocessor = ColumnTransformer(transformers=[
    ("num", numeric_pipeline, numeric_cols),
    ("cat", categorical_pipeline, categorical_cols),
])

# Settings for the final estimator
forest_params = {
    "n_estimators": 200,
    "max_depth": 10,
    "class_weight": "balanced",
    "random_state": 42,
    "n_jobs": -1,
}

# End-to-end pipeline: preprocessing followed by the classifier. The step
# names are the prefixes used when addressing parameters ("classifier__...").
full_pipeline = Pipeline(steps=[
    ("preprocessor", preprocessor),
    ("classifier", RandomForestClassifier(**forest_params)),
])

# Fit preprocessing and model together on the training split
full_pipeline.fit(X_train, y_train)

# Score the held-out split; the fitted preprocessing is applied automatically.
# Column 1 of predict_proba is the probability of the second entry in classes_.
predictions = full_pipeline.predict(X_test)
probabilities = full_pipeline.predict_proba(X_test)[:, 1]

Classification

from sklearn.ensemble import (
    RandomForestClassifier, GradientBoostingClassifier,
    VotingClassifier, StackingClassifier
)
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier

from sklearn.model_selection import cross_val_score

# Candidate algorithms, keyed by the label printed in the comparison below
models = {
    "Logistic Regression": LogisticRegression(max_iter=1000, C=1.0),
    "Random Forest": RandomForestClassifier(n_estimators=100, random_state=42),
    "Gradient Boosting": GradientBoostingClassifier(n_estimators=100, random_state=42),
    "SVM": SVC(probability=True, kernel="rbf"),
}

# Score every candidate behind the same preprocessing with 5-fold ROC-AUC
for label, estimator in models.items():
    candidate = Pipeline(steps=[
        ("preprocessor", preprocessor),
        ("model", estimator),
    ])
    fold_auc = cross_val_score(
        candidate, X_train, y_train,
        cv=5, scoring="roc_auc", n_jobs=-1,
    )
    print(f"{label}: {fold_auc.mean():.3f} ± {fold_auc.std():.3f}")

# NOTE(review): the two ensembles below are only constructed in this snippet;
# they are neither combined with `preprocessor` nor fitted here.

# Ensemble: soft voting over three base learners
voting_members = [
    ("lr", LogisticRegression(max_iter=1000)),
    ("rf", RandomForestClassifier(n_estimators=100, random_state=42)),
    ("gb", GradientBoostingClassifier(n_estimators=100, random_state=42)),
]
voting = VotingClassifier(estimators=voting_members, voting="soft")

# Stacking: two tree ensembles feeding a logistic-regression meta-learner
stacking_members = [
    ("rf", RandomForestClassifier(n_estimators=100, random_state=42)),
    ("gb", GradientBoostingClassifier(n_estimators=100, random_state=42)),
]
stacking = StackingClassifier(
    estimators=stacking_members,
    final_estimator=LogisticRegression(),
    cv=5,
)

Regression

from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import numpy as np

from sklearn.base import clone

# House price prediction example
# NOTE(review): X_train / y_train and `preprocessor` are reused from the churn
# example, whose target is the `churned` label. For a real regression task,
# rebuild them from a dataset with a continuous target first.
reg_pipeline = Pipeline([
    # clone() gives this pipeline its own unfitted copy of the preprocessor.
    # Passing `preprocessor` itself would share one object with full_pipeline,
    # and fitting here would silently refit that shared object.
    ("preprocessor", clone(preprocessor)),
    ("regressor", GradientBoostingRegressor(
        n_estimators=300,
        learning_rate=0.05,
        max_depth=5,
        random_state=42,
    )),
])

reg_pipeline.fit(X_train, y_train)
y_pred = reg_pipeline.predict(X_test)

rmse = np.sqrt(mean_squared_error(y_test, y_pred))
mae = mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
print(f"RMSE: {rmse:.2f}, MAE: {mae:.2f}, R²: {r2:.3f}")

# Feature importance (for tree models)
gbr = reg_pipeline.named_steps["regressor"]
# Ask the fitted preprocessor for its output column names rather than
# rebuilding the list by hand. The names then always line up with the
# transformed matrix, even if a transformer drops or reorders columns.
# With default settings the names carry the branch prefix ("num__", "cat__").
feature_names = reg_pipeline.named_steps["preprocessor"].get_feature_names_out()
importances = pd.Series(gbr.feature_importances_, index=feature_names)
print(importances.nlargest(10))

Evaluation and Cross-Validation

from sklearn.model_selection import (
    cross_validate, StratifiedKFold, learning_curve
)
from sklearn.metrics import (
    classification_report, confusion_matrix,
    roc_auc_score, average_precision_score,
    precision_recall_curve
)

# Multi-metric cross-validation
# One list drives both the computation and the report, so every metric that
# is computed is also shown.
scoring = ["accuracy", "roc_auc", "f1", "precision", "recall"]
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
cv_results = cross_validate(
    full_pipeline, X_train, y_train,
    cv=cv,
    scoring=scoring,
    return_train_score=True,
    n_jobs=-1,
)

# Mean train vs. validation score per metric; a large gap between the two
# columns points to overfitting.
for metric in scoring:
    train_score = cv_results[f"train_{metric}"].mean()
    val_score = cv_results[f"test_{metric}"].mean()
    print(f"{metric:12s}: train={train_score:.3f}, val={val_score:.3f}")

# Full evaluation on test set
y_pred = full_pipeline.predict(X_test)
y_prob = full_pipeline.predict_proba(X_test)[:, 1]

print(classification_report(y_test, y_pred))
print(f"ROC-AUC: {roc_auc_score(y_test, y_prob):.3f}")
print(f"Avg Precision: {average_precision_score(y_test, y_prob):.3f}")

Hyperparameter Tuning

from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
from scipy.stats import randint, uniform

# Settings shared by both searches: 5-fold CV, ROC-AUC scoring, all cores
search_options = {"cv": 5, "scoring": "roc_auc", "n_jobs": -1, "verbose": 1}

# Grid search — exhaustive: every combination below is tried (3 x 3 x 3 = 27)
param_grid = {
    "classifier__n_estimators": [100, 200, 300],
    "classifier__max_depth": [5, 10, None],
    "classifier__min_samples_split": [2, 5, 10],
}
grid_search = GridSearchCV(
    estimator=full_pipeline,
    param_grid=param_grid,
    **search_options,
)
grid_search.fit(X_train, y_train)
print(f"Best params: {grid_search.best_params_}")
print(f"Best ROC-AUC: {grid_search.best_score_:.3f}")

# Random search — faster for large spaces: samples n_iter candidates
# from the distributions instead of enumerating a grid
param_distributions = {
    "classifier__n_estimators": randint(50, 500),
    "classifier__max_depth": randint(3, 20),
    "classifier__min_samples_split": randint(2, 20),
    # scipy's uniform takes (loc, scale), so this samples from [0.3, 1.0]
    "classifier__max_features": uniform(0.3, 0.7),
}
random_search = RandomizedSearchCV(
    estimator=full_pipeline,
    param_distributions=param_distributions,
    n_iter=50,
    random_state=42,
    **search_options,
)
random_search.fit(X_train, y_train)

# The winning pipeline from the random search
best_model = random_search.best_estimator_

Model Persistence

import joblib
from pathlib import Path

import json
from datetime import datetime, timezone

import sklearn

# Save the entire pipeline (preprocessor + model)
model_path = Path("models/churn_pipeline_v1.joblib")
# parents=True so a nested target directory is created as well
model_path.parent.mkdir(parents=True, exist_ok=True)
joblib.dump(best_model, model_path, compress=3)

# Load and use
# NOTE: joblib files are pickle-based, and unpickling can execute arbitrary
# code — only load model files from a trusted source.
loaded_pipeline = joblib.load(model_path)
predictions = loaded_pipeline.predict(X_test)

# Save with metadata
test_auc = roc_auc_score(y_test, loaded_pipeline.predict_proba(X_test)[:, 1])
metadata = {
    "model_version": "1.0.0",
    # Record the actual date of this run (UTC) instead of a hard-coded one
    "trained_at": datetime.now(timezone.utc).date().isoformat(),
    # A persisted estimator should be loaded with the scikit-learn version
    # that produced it, so keep that version next to the artifact
    "sklearn_version": sklearn.__version__,
    "roc_auc": float(test_auc),
    "features": list(X_train.columns),
    "target": "churned",
}
# Derive the metadata file name from the model path so the two cannot drift
# apart: models/churn_pipeline_v1_metadata.json
metadata_path = model_path.with_name(f"{model_path.stem}_metadata.json")
metadata_path.write_text(json.dumps(metadata, indent=2), encoding="utf-8")

FastAPI Prediction Endpoint

from fastapi import FastAPI
from pydantic import BaseModel
import joblib
import pandas as pd

# Location of the persisted pipeline (preprocessor + model)
MODEL_PATH = "models/churn_pipeline_v1.joblib"

# Load the pipeline once, when this module is imported, so the request
# handlers reuse the same object
pipeline = joblib.load(MODEL_PATH)

app = FastAPI(title="Churn Prediction API")

# Request body for the prediction endpoints: the feature values of one customer.
# The handlers build a DataFrame directly from model_dump(), so these field
# names become the column names passed to the loaded pipeline.
# NOTE(review): presumably these mirror the feature columns of the training
# data — confirm against the dataset the pipeline was fitted on.
class CustomerFeatures(BaseModel):
    account_age_days: int
    monthly_charges: float
    total_charges: float
    contract_type: str
    payment_method: str
    num_products: int
    has_tech_support: bool
    has_streaming: bool

# Response body of POST /predict/churn (declared as that route's response_model).
class PredictionResponse(BaseModel):
    churned: bool  # predict_churn sets this from probability >= 0.5
    churn_probability: float  # probability from the pipeline, rounded to 4 decimals
    risk_level: str  # predict_churn emits "high", "medium" or "low"

@app.post("/predict/churn", response_model=PredictionResponse)
async def predict_churn(features: CustomerFeatures):
    """Score a single customer.

    Builds a one-row DataFrame from the request body, takes column 1 of the
    pipeline's predict_proba output as the churn probability, and returns it
    together with a yes/no decision (probability >= 0.5) and a risk band
    ("high" above 0.7, "medium" above 0.4, otherwise "low").
    """
    # NOTE(review): predict_proba is synchronous work inside an async handler;
    # consider a plain `def` route if event-loop latency matters.
    frame = pd.DataFrame([features.model_dump()])

    # Convert the numpy scalar to a builtin float once, so the comparison
    # below yields a builtin bool rather than numpy.bool_ for the response.
    prob = float(pipeline.predict_proba(frame)[0, 1])

    if prob > 0.7:
        risk = "high"
    elif prob > 0.4:
        risk = "medium"
    else:
        risk = "low"

    return PredictionResponse(
        churned=prob >= 0.5,
        churn_probability=round(prob, 4),
        risk_level=risk,
    )

@app.post("/predict/batch")
async def batch_predict(customers: list[CustomerFeatures]):
    """Score several customers in one call.

    Returns one {"churn_probability": ...} dict per input customer, in input
    order, with each probability rounded to 4 decimals. An empty request
    yields an empty list.
    """
    # An empty list would produce a DataFrame without any columns, which the
    # pipeline cannot score; answer directly instead of failing the request.
    if not customers:
        return []

    frame = pd.DataFrame([c.model_dump() for c in customers])
    probs = pipeline.predict_proba(frame)[:, 1]
    return [{"churn_probability": round(float(p), 4)} for p in probs]

Frequently Asked Questions

Scikit-learn vs XGBoost vs LightGBM — which to choose?
Scikit-learn's gradient boosting is easy to use and well-integrated. XGBoost and LightGBM are generally faster and more accurate for tabular data — LightGBM is especially fast for large datasets. In competitions and production, LightGBM or XGBoost with Scikit-learn wrappers is the standard choice for structured data.
How do I handle imbalanced classes?
Use class_weight="balanced" on classifiers that support it, or adjust the decision threshold instead of resampling. The imbalanced-learn library provides SMOTE and other sampling techniques as Pipeline-compatible transformers. Always evaluate with precision-recall AUC, not just accuracy.
Can I use GPU acceleration with Scikit-learn?
Scikit-learn is primarily CPU-only; only a small set of estimators has experimental GPU support through the Array API. For GPU-accelerated equivalents, use RAPIDS cuML — it implements many sklearn estimators on GPU with a closely matching API. For neural networks on tabular data, use a tabular-focused library such as fastai or PyTorch Tabular (which is built on PyTorch Lightning).