Continuous Integration and Continuous Deployment (CI/CD) for Machine Learning
Continuous integration and continuous deployment (CI/CD) is where machine learning grows up. A model that only runs on a data scientist's laptop isn't a product; MLOps turns experimental code into reliable, observable, updatable systems that serve users in production.
Why CI/CD for Machine Learning Matters
The hardest bugs in ML systems are almost never in the model — they live in data pipelines, feature stores, deployments and monitors. Strong MLOps practice is what keeps real-world systems honest.
- Version data, features, code and models together.
- Automate the path from commit to production with CI/CD.
- Monitor data drift as aggressively as you monitor model accuracy.
- Design for rollback — every deployment is a hypothesis.
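The "commit to production" and "design for rollback" habits above can be made concrete with a tiny CI quality gate: a step that refuses to promote a candidate model that regresses against the current champion. This is only a sketch under assumptions — the metric, the 0.01 tolerance and the hard-coded scores stand in for values a real evaluation job would produce.

```python
# Minimal CI quality gate: promote the candidate model only when it does not
# regress against the champion beyond a small tolerance. In a real pipeline
# the metrics would come from an evaluation job, not hard-coded stubs.

def passes_gate(champion_auc: float, candidate_auc: float,
                tolerance: float = 0.01) -> bool:
    """Return True when the candidate is allowed to ship."""
    return candidate_auc >= champion_auc - tolerance

# Hypothetical metrics produced earlier in the pipeline.
champion_auc = 0.912
candidate_auc = 0.907

if passes_gate(champion_auc, candidate_auc):
    print("gate passed: promoting candidate")
else:
    # A non-zero exit fails the CI job and blocks the deployment.
    raise SystemExit("gate failed: candidate regressed, aborting deploy")
```

Because the check runs as an ordinary script, any CI system can use its exit code to block the deploy stage — which is exactly what makes every deployment a testable hypothesis.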
How CI/CD Shows Up in Practice
In a typical project, CI/CD for machine learning is combined with the rest of the MLOps & Deployment toolkit. You rarely use any one technique in isolation; the real skill is knowing which combination fits the problem you are trying to solve, and being able to explain that choice to a non-technical stakeholder.
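One small habit from that toolkit — versioning data, features, code and models together — can be sketched as a content-addressed manifest, where changing any one input yields a new overall version. The field names and the truncated `sha256` digests are assumptions for illustration, not a standard format.

```python
# Sketch: fingerprint data, code and model artifacts together so that a change
# to any one of them produces a new, traceable version identifier.
import hashlib
import json

def fingerprint(payload: bytes) -> str:
    """Short, deterministic content hash (first 12 hex chars of sha256)."""
    return hashlib.sha256(payload).hexdigest()[:12]

# In practice these bytes would be the real files; stubs are used here.
manifest = {
    "data": fingerprint(b"raw training extract 2026-01-01"),
    "code": fingerprint(b"training script at commit abc123"),
    "model": fingerprint(b"serialized model bytes"),
}
# The overall version is a hash of the component hashes.
manifest["version"] = fingerprint(json.dumps(manifest, sort_keys=True).encode())
print(json.dumps(manifest, indent=2))
```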
CI/CD becomes non-negotiable once a model leaves the notebook and influences real users, money or safety-critical processes. Related topics from the same toolkit:
- Privacy-preserving Machine Learning and Federated Learning
- Design Implementation ETL ELT Data Pipelines
- A Formal Treatment of Lambda and
- Workflow Orchestration and Management with Apache
Code Examples: CI/CD for Machine Learning (5 runnable snippets)
Copy any block into a file or notebook and run it end-to-end — each example stands alone.
Example 1: Population stability and drift monitor
# Example 1: Population stability and drift monitor -- CI/CD for Machine Learning
import numpy as np
from scipy.stats import ks_2samp

def psi(expected, actual, bins: int = 10) -> float:
    """Population Stability Index between a baseline and a production sample."""
    breaks = np.quantile(expected, np.linspace(0, 1, bins + 1))
    e, _ = np.histogram(expected, breaks)
    a, _ = np.histogram(actual, breaks)
    e, a = e / e.sum(), a / a.sum()
    mask = (e > 0) & (a > 0)
    return float(((a[mask] - e[mask]) * np.log(a[mask] / e[mask])).sum())

rng = np.random.default_rng(0)
baseline = rng.normal(0.0, 1.0, 5_000)
production = rng.normal(0.3, 1.1, 5_000)  # shifted and slightly wider

print(f"PSI = {psi(baseline, production):.3f} (>0.25 = significant drift)")
ks = ks_2samp(baseline, production)
print(f"KS statistic = {ks.statistic:.3f}")
print(f"KS p-value = {ks.pvalue:.4f}")
Example 2: Airflow DAG for a batch feature pipeline
# Example 2: Airflow DAG for a batch feature pipeline -- CI/CD for Machine Learning
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

def extract(**ctx): ...
def transform(**ctx): ...
def load(**ctx): ...

default_args = {
    "owner": "data-platform",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
}

with DAG(
    dag_id="daily_customer_features",
    start_date=datetime(2026, 1, 1),
    schedule_interval="0 3 * * *",  # daily at 03:00
    catchup=False,
    default_args=default_args,
    tags=["features", "batch"],
) as dag:
    t1 = PythonOperator(task_id="extract", python_callable=extract)
    t2 = PythonOperator(task_id="transform", python_callable=transform)
    t3 = PythonOperator(task_id="load", python_callable=load)
    t1 >> t2 >> t3
Example 3: Shadow-deployment traffic splitter
# Example 3: Shadow-deployment traffic splitter -- CI/CD for Machine Learning
import random
import statistics
import time

def call_champion(x):
    time.sleep(0.002)
    return x * 1.03

def call_challenger(x):
    time.sleep(0.003)
    return x * 1.05

diffs, latencies = [], []

def route(x, shadow_pct: float = 0.10):
    """Always serve the champion; mirror a fraction of traffic to the challenger."""
    prod = call_champion(x)
    if random.random() < shadow_pct:
        t0 = time.perf_counter()
        shadow = call_challenger(x)
        shadow_latency = (time.perf_counter() - t0) * 1_000
        diffs.append(shadow - prod)
        latencies.append(shadow_latency)
    return prod  # users only ever see the champion's answer

for _ in range(1_000):
    route(random.uniform(10, 100))

print(f"shadow calls       : {len(diffs)}")
print(f"mean output delta  : {statistics.mean(diffs):+.3f}")
print(f"p95 shadow latency : {sorted(latencies)[int(len(latencies) * 0.95)]:.2f} ms")
Example 4: FastAPI model-serving microservice
# Example 4: FastAPI model-serving microservice -- CI/CD for Machine Learning
import joblib
import numpy as np
from fastapi import FastAPI
from pydantic import BaseModel

model = joblib.load("model.joblib")  # sklearn Pipeline persisted at build time
app = FastAPI(title="Credit risk API", version="1.0")

class LoanRequest(BaseModel):
    income: float
    debt: float
    age: int
    history_score: float

@app.post("/predict")
def predict(req: LoanRequest) -> dict:
    x = np.array([[req.income, req.debt, req.age, req.history_score]])
    proba = float(model.predict_proba(x)[0, 1])
    return {
        "default_probability": round(proba, 4),
        "decision": "decline" if proba > 0.3 else "approve",
    }

# run with: uvicorn service:app --host 0.0.0.0 --port 8080
Example 5: MLflow experiment tracking across a sweep
# Example 5: MLflow experiment tracking across a sweep -- CI/CD for Machine Learning
import mlflow
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score

X, y = load_iris(return_X_y=True)
mlflow.set_experiment("iris-rf-sweep")

for n in [50, 100, 200]:
    for depth in [None, 4, 8]:
        with mlflow.start_run(run_name=f"rf-{n}-{depth}"):
            mlflow.log_params({"n_estimators": n, "max_depth": depth})
            clf = RandomForestClassifier(n_estimators=n, max_depth=depth,
                                         random_state=0)
            score = cross_val_score(clf, X, y, cv=5).mean()
            mlflow.log_metric("cv_accuracy", score)
            clf.fit(X, y)
            mlflow.sklearn.log_model(clf, artifact_path="model")