
Building a machine learning model for healthcare is not the hard part. The hard part is building the pipeline around the model -- the infrastructure that extracts clinical data from FHIR APIs, de-identifies it for training, engineers clinically meaningful features, tracks experiments reproducibly, validates results against clinical thresholds, packages the model for deployment, serves it in production with health checks, monitors for drift, and maintains an audit trail that satisfies both HIPAA and FDA requirements.
Most healthcare ML tutorials skip from "load dataset" to "train model" and call it done. But in production healthcare AI, the model is perhaps 10% of the system. The other 90% is data engineering, compliance infrastructure, deployment automation, and monitoring -- the pipeline that makes the model safe, reliable, and maintainable.
This guide walks through all nine stages of a production healthcare ML pipeline, from FHIR Bulk $export to production monitoring, with Python code for each stage. Every stage is designed to be HIPAA-compliant and to produce the audit artifacts required for regulatory oversight.
Stage 1: Data Extraction -- FHIR Bulk $export

The pipeline starts with clinical data extraction. FHIR R4's Bulk Data Access ($export) operation is the standard mechanism for extracting large datasets from EHR systems. It produces NDJSON (newline-delimited JSON) files, with each line containing a complete FHIR resource.
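Because NDJSON holds exactly one resource per line, it streams naturally without loading whole files into memory. A minimal reader sketch (the `read_ndjson` helper is illustrative, not part of the exporter below):

```python
import json

def read_ndjson(path: str):
    """Yield one parsed FHIR resource per non-empty line of an NDJSON file."""
    with open(path) as f:
        for line in f:
            line = line.strip()
            if line:
                yield json.loads(line)
```

This generator pattern lets downstream stages process exports of any size one resource at a time.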
```python
import requests
import time
import logging
from typing import List, Dict, Optional
from pathlib import Path
from datetime import datetime

logger = logging.getLogger("fhir_extract")


class FHIRBulkExporter:
    """
    FHIR Bulk Data $export client for ML pipeline data extraction.
    Produces NDJSON files with full audit logging.
    """

    def __init__(self, fhir_url: str, client_id: str,
                 client_secret: str, output_dir: str):
        self.fhir_url = fhir_url.rstrip("/")
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.token = self._authenticate(client_id, client_secret)
        self.audit_log = []

    def _authenticate(self, client_id: str, client_secret: str) -> str:
        """OAuth2 client credentials for SMART Backend Services."""
        # In production, use a signed JWT assertion (SMART Backend Services)
        resp = requests.post(
            f"{self.fhir_url}/auth/token",
            data={"grant_type": "client_credentials",
                  "client_id": client_id,
                  "client_secret": client_secret}
        )
        resp.raise_for_status()
        return resp.json()["access_token"]

    def kick_off_export(self, resource_types: List[str],
                        since: Optional[str] = None) -> str:
        """Initiate Bulk $export and return the polling URL."""
        headers = {
            "Authorization": f"Bearer {self.token}",
            "Accept": "application/fhir+json",
            "Prefer": "respond-async"
        }
        params = {"_type": ",".join(resource_types)}
        if since:
            params["_since"] = since
        resp = requests.get(
            f"{self.fhir_url}/$export",
            headers=headers, params=params
        )
        resp.raise_for_status()
        polling_url = resp.headers["Content-Location"]
        self.audit_log.append({
            "action": "export_initiated",
            "timestamp": datetime.utcnow().isoformat(),
            "resource_types": resource_types,
            "since": since,
            "polling_url": polling_url
        })
        return polling_url

    def poll_until_complete(self, polling_url: str,
                            max_wait: int = 3600) -> List[Dict]:
        """Poll export status until completion."""
        headers = {"Authorization": f"Bearer {self.token}"}
        start = time.time()
        while time.time() - start < max_wait:
            resp = requests.get(polling_url, headers=headers)
            if resp.status_code == 200:
                manifest = resp.json()
                self.audit_log.append({
                    "action": "export_completed",
                    "timestamp": datetime.utcnow().isoformat(),
                    "file_count": len(manifest.get("output", []))
                })
                return manifest.get("output", [])
            elif resp.status_code == 202:
                progress = resp.headers.get("X-Progress", "in-progress")
                logger.info(f"Export in progress: {progress}")
                time.sleep(10)
            else:
                raise RuntimeError(f"Export failed: {resp.status_code}")
        raise TimeoutError("Export did not complete within time limit")

    def download_files(self, output_files: List[Dict]) -> Dict[str, Path]:
        """Download NDJSON files from the export manifest."""
        headers = {"Authorization": f"Bearer {self.token}"}
        downloaded = {}
        for file_info in output_files:
            resource_type = file_info["type"]
            url = file_info["url"]
            local_path = self.output_dir / f"{resource_type}.ndjson"
            resp = requests.get(url, headers=headers, stream=True)
            resp.raise_for_status()
            line_count = 0
            with open(local_path, "w") as f:
                for line in resp.iter_lines(decode_unicode=True):
                    if line:
                        f.write(line + "\n")
                        line_count += 1
            downloaded[resource_type] = local_path
            logger.info(f"Downloaded {resource_type}: {line_count} resources")
            self.audit_log.append({
                "action": "file_downloaded",
                "resource_type": resource_type,
                "record_count": line_count,
                "local_path": str(local_path)
            })
        return downloaded
```

Stage 2: De-Identification -- Safe Harbor Automated

Before any data leaves the HIPAA-covered environment for ML training, it must be de-identified. The Safe Harbor method removes 18 categories of identifiers defined in the HIPAA Privacy Rule. Here is an automated implementation for FHIR resources.
```python
import hashlib
import json
from typing import Dict, Any
from datetime import datetime


class SafeHarborDeidentifier:
    """
    HIPAA Safe Harbor de-identification for FHIR resources.
    Removes all 18 identifier categories defined in 45 CFR 164.514(b)(2).
    """

    # Safe Harbor: 18 identifier categories
    SAFE_HARBOR_CATEGORIES = [
        "names", "geographic_data", "dates", "phone_numbers",
        "fax_numbers", "email_addresses", "ssn", "mrn",
        "health_plan_numbers", "account_numbers", "license_numbers",
        "vehicle_identifiers", "device_identifiers", "web_urls",
        "ip_addresses", "biometric_ids", "photos", "other_unique_ids"
    ]

    # Date-bearing FHIR fields to truncate (simplified list)
    DATE_FIELDS = {"effectiveDateTime", "issued", "authoredOn",
                   "recordedDate", "start", "end", "date"}

    def __init__(self, salt: str):
        """Salt for consistent pseudonymization across resources."""
        self.salt = salt
        self.id_map = {}  # Original -> pseudonym mapping
        self.stats = {cat: 0 for cat in self.SAFE_HARBOR_CATEGORIES}

    def _pseudonymize(self, original_id: str) -> str:
        """Create a consistent pseudonym to preserve record linkage."""
        if original_id not in self.id_map:
            hashed = hashlib.sha256(
                f"{self.salt}:{original_id}".encode()
            ).hexdigest()[:16]
            self.id_map[original_id] = hashed
        return self.id_map[original_id]

    def deidentify_patient(self, patient: Dict) -> Dict:
        """De-identify a FHIR Patient resource."""
        deid = patient.copy()
        # 1. Pseudonymize ID (preserve for linkage)
        deid["id"] = self._pseudonymize(deid["id"])
        self.stats["other_unique_ids"] += 1
        # 2. Remove names
        deid.pop("name", None)
        self.stats["names"] += 1
        # 3. Remove telecom (phone, fax, email)
        deid.pop("telecom", None)
        self.stats["phone_numbers"] += 1
        # 4. Remove address detail (generalize zip)
        if "address" in deid:
            for addr in deid.get("address", []):
                addr.pop("line", None)
                addr.pop("city", None)
                addr.pop("district", None)
                # Zip: keep first 3 digits only if population > 20,000
                if "postalCode" in addr:
                    zip3 = addr["postalCode"][:3]
                    # Restricted 3-digit zip prefixes (population <= 20,000)
                    restricted = ["036", "059", "063", "102",
                                  "203", "556", "692", "790",
                                  "821", "823", "830", "831",
                                  "878", "879", "884", "890",
                                  "893"]
                    addr["postalCode"] = "000" if zip3 in restricted else zip3
            self.stats["geographic_data"] += 1
        # 5. Generalize dates (year only for Safe Harbor)
        if "birthDate" in deid:
            birth_year = int(deid["birthDate"][:4])
            current_year = datetime.now().year
            age = current_year - birth_year
            # Safe Harbor: ages over 89 must be aggregated
            if age > 89:
                deid["birthDate"] = "1900"  # Grouped as 90+
            else:
                deid["birthDate"] = str(birth_year)
            self.stats["dates"] += 1
        # 6. Remove identifiers (MRN, SSN, insurance numbers)
        deid.pop("identifier", None)
        self.stats["mrn"] += 1
        # 7. Remove photo
        deid.pop("photo", None)
        self.stats["photos"] += 1
        return deid

    def deidentify_resource(self, resource: Dict) -> Dict:
        """De-identify any FHIR resource by pseudonymizing references."""
        deid = json.loads(json.dumps(resource))  # Deep copy
        # Pseudonymize the resource ID
        if "id" in deid:
            deid["id"] = self._pseudonymize(deid["id"])
        # Pseudonymize all references
        self._walk_and_pseudonymize_refs(deid)
        # Remove text narratives (may contain PHI)
        deid.pop("text", None)
        # Generalize dates to year precision
        self._generalize_dates(deid)
        return deid

    def _walk_and_pseudonymize_refs(self, obj: Any):
        """Recursively find and pseudonymize FHIR references."""
        if isinstance(obj, dict):
            if "reference" in obj and isinstance(obj["reference"], str):
                parts = obj["reference"].split("/")
                if len(parts) == 2:
                    parts[1] = self._pseudonymize(parts[1])
                    obj["reference"] = "/".join(parts)
            for value in obj.values():
                self._walk_and_pseudonymize_refs(value)
        elif isinstance(obj, list):
            for item in obj:
                self._walk_and_pseudonymize_refs(item)

    def _generalize_dates(self, obj: Any):
        """Recursively truncate known date fields to year precision."""
        if isinstance(obj, dict):
            for key, value in obj.items():
                if key in self.DATE_FIELDS and isinstance(value, str):
                    obj[key] = value[:4]
                else:
                    self._generalize_dates(value)
        elif isinstance(obj, list):
            for item in obj:
                self._generalize_dates(item)
```

Stage 3: Feature Engineering -- FHIR Resources to Tabular Features

Feature engineering is where clinical domain knowledge meets data science. FHIR resources are hierarchical JSON documents -- not the tabular format that most ML models expect. This stage transforms FHIR resources into flat feature vectors while preserving clinical meaning.
```python
import numpy as np
from typing import Dict, List, Optional
from datetime import datetime, timedelta


class ClinicalFeatureEngineer:
    """
    Transform FHIR resources into ML-ready feature vectors.
    Handles time-windowed aggregations, categorical encoding,
    and missing value imputation for clinical data.
    """

    # LOINC codes for common lab tests
    LAB_CODES = {
        "2160-0": "creatinine",
        "2823-3": "potassium",
        "2951-2": "sodium",
        "6690-2": "wbc",
        "718-7": "hemoglobin",
        "4548-4": "hba1c",
        "2345-7": "glucose",
        "1742-6": "alt",
        "1920-8": "ast",
        "32623-1": "platelet_count"
    }

    # Vital sign LOINC codes
    VITAL_CODES = {
        "8867-4": "heart_rate",
        "8480-6": "systolic_bp",
        "8462-4": "diastolic_bp",
        "8310-5": "temperature",
        "9279-1": "respiratory_rate",
        "2708-6": "spo2"
    }

    def __init__(self, reference_date: Optional[str] = None):
        self.reference_date = (
            datetime.fromisoformat(reference_date)
            if reference_date
            else datetime.utcnow()
        )

    def build_features(self, patient: Dict,
                       conditions: List[Dict],
                       observations: List[Dict],
                       medications: List[Dict],
                       encounters: List[Dict]) -> Dict:
        """Build a complete feature vector from FHIR resources."""
        features = {}
        # Demographics
        features.update(self._demographic_features(patient))
        # Diagnosis features
        features.update(self._condition_features(conditions))
        # Lab and vital sign features
        features.update(self._observation_features(observations))
        # Medication features
        features.update(self._medication_features(medications))
        # Utilization features
        features.update(self._encounter_features(encounters))
        return features

    def _demographic_features(self, patient: Dict) -> Dict:
        """Extract demographic features from the Patient resource."""
        features = {}
        # Age bucket (de-identified)
        birth_year = patient.get("birthDate", "")[:4]
        if birth_year:
            age = self.reference_date.year - int(birth_year)
            features["age_bucket"] = self._age_to_bucket(age)
            features["age_numeric"] = min(age, 90)  # Cap at 90
        # Sex (binary for model, preserve full coding in metadata)
        features["sex_male"] = 1 if patient.get("gender") == "male" else 0
        return features

    def _condition_features(self, conditions: List[Dict]) -> Dict:
        """Extract diagnosis features from Condition resources."""
        features = {}
        active = [c for c in conditions
                  if c.get("clinicalStatus", {}).get(
                      "coding", [{}])[0].get("code") == "active"]
        features["active_condition_count"] = len(active)
        features["total_condition_count"] = len(conditions)
        # Chronic disease flags via ICD-10 prefix matching
        icd_codes = []
        for c in conditions:
            for coding in c.get("code", {}).get("coding", []):
                if coding.get("system", "").endswith("icd-10-cm"):
                    icd_codes.append(coding.get("code", ""))
        features["has_diabetes"] = int(any(c.startswith("E11") for c in icd_codes))
        features["has_hypertension"] = int(any(c.startswith("I10") for c in icd_codes))
        features["has_ckd"] = int(any(c.startswith("N18") for c in icd_codes))
        features["has_chf"] = int(any(c.startswith("I50") for c in icd_codes))
        features["has_copd"] = int(any(c.startswith("J44") for c in icd_codes))
        return features

    def _observation_features(self, observations: List[Dict]) -> Dict:
        """Extract lab and vital sign features from Observations."""
        features = {}
        # Group observations by LOINC code (assumes chronological order)
        lab_values = {name: [] for name in self.LAB_CODES.values()}
        vital_values = {name: [] for name in self.VITAL_CODES.values()}
        for obs in observations:
            code = obs.get("code", {}).get("coding", [{}])[0].get("code", "")
            value = obs.get("valueQuantity", {}).get("value")
            if value is None:
                continue
            if code in self.LAB_CODES:
                lab_values[self.LAB_CODES[code]].append(float(value))
            elif code in self.VITAL_CODES:
                vital_values[self.VITAL_CODES[code]].append(float(value))
        # Latest lab values and trends
        for name, values in lab_values.items():
            if values:
                features[f"lab_{name}_latest"] = values[-1]
                features[f"lab_{name}_mean"] = np.mean(values)
                if len(values) >= 3:
                    # Linear slope for trend detection
                    x = np.arange(len(values))
                    slope = np.polyfit(x, values, 1)[0]
                    features[f"lab_{name}_slope"] = slope
        # Latest vital signs
        for name, values in vital_values.items():
            if values:
                features[f"vital_{name}_latest"] = values[-1]
                features[f"vital_{name}_mean"] = np.mean(values)
                features[f"vital_{name}_std"] = (
                    np.std(values) if len(values) > 1 else 0.0
                )
        return features

    def _medication_features(self, medications: List[Dict]) -> Dict:
        """Extract medication features from MedicationRequest resources."""
        features = {}
        active = [m for m in medications
                  if m.get("status") == "active"]
        features["active_medication_count"] = len(active)
        features["polypharmacy"] = int(len(active) >= 5)
        features["high_polypharmacy"] = int(len(active) >= 10)
        return features

    def _encounter_features(self, encounters: List[Dict]) -> Dict:
        """Extract utilization features from Encounter resources."""
        features = {}
        # Count encounters by type in the last 6 months
        six_months_ago = self.reference_date - timedelta(days=180)
        recent = []
        for enc in encounters:
            start = enc.get("period", {}).get("start", "")
            if start and datetime.fromisoformat(
                start.replace("Z", "+00:00")
            ).replace(tzinfo=None) > six_months_ago:
                recent.append(enc)
        features["encounters_6mo"] = len(recent)
        enc_classes = [e.get("class", {}).get("code", "") for e in recent]
        features["ed_visits_6mo"] = enc_classes.count("EMER")
        features["inpatient_stays_6mo"] = enc_classes.count("IMP")
        return features

    def _age_to_bucket(self, age: int) -> int:
        """Convert age to a de-identified bucket."""
        buckets = [(18, 0), (30, 1), (45, 2), (55, 3),
                   (65, 4), (75, 5), (85, 6)]
        for threshold, bucket in reversed(buckets):
            if age >= threshold:
                return bucket
        return 0
```

Stage 4: Model Training with MLflow Experiment Tracking

Every training run must be fully reproducible and auditable. This is not just best practice -- it is a regulatory requirement if your model is classified as Software as a Medical Device (SaMD). MLflow provides experiment tracking, model versioning, and artifact storage in a single platform.
```python
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import roc_auc_score
import numpy as np
import json
import hashlib


def train_clinical_model(X_train, y_train, X_val, y_val,
                         experiment_name: str,
                         data_version: str,
                         feature_names: list):
    """
    Train a clinical prediction model with full MLflow tracking.
    Every parameter, metric, and artifact is logged for audit.
    """
    mlflow.set_tracking_uri("https://mlflow.internal.hospital.org")
    mlflow.set_experiment(experiment_name)

    with mlflow.start_run() as run:
        # Log data lineage
        mlflow.log_param("data_version", data_version)
        mlflow.log_param("data_hash", hashlib.sha256(
            X_train.tobytes()).hexdigest()[:16])
        mlflow.log_param("train_samples", len(X_train))
        mlflow.log_param("val_samples", len(X_val))
        mlflow.log_param("positive_rate_train", float(y_train.mean()))
        mlflow.log_param("positive_rate_val", float(y_val.mean()))
        mlflow.log_param("feature_count", len(feature_names))

        # Log feature names as an artifact
        with open("/tmp/feature_names.json", "w") as f:
            json.dump(feature_names, f)
        mlflow.log_artifact("/tmp/feature_names.json")

        # Model definition
        model = GradientBoostingClassifier(
            n_estimators=500,
            max_depth=6,
            learning_rate=0.05,
            subsample=0.8,
            min_samples_leaf=50,
            random_state=42
        )
        # Log all hyperparameters
        mlflow.log_params(model.get_params())

        # Cross-validation for robust estimation
        cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
        cv_scores = []
        for fold, (train_idx, val_idx) in enumerate(cv.split(X_train, y_train)):
            model_cv = GradientBoostingClassifier(**model.get_params())
            model_cv.fit(X_train[train_idx], y_train[train_idx])
            score = roc_auc_score(
                y_train[val_idx],
                model_cv.predict_proba(X_train[val_idx])[:, 1]
            )
            cv_scores.append(score)
            mlflow.log_metric(f"cv_auroc_fold_{fold}", score)
        mlflow.log_metric("cv_auroc_mean", np.mean(cv_scores))
        mlflow.log_metric("cv_auroc_std", np.std(cv_scores))

        # Train the final model on the full training set
        model.fit(X_train, y_train)

        # Validation metrics
        y_prob = model.predict_proba(X_val)[:, 1]
        auroc = roc_auc_score(y_val, y_prob)
        mlflow.log_metric("val_auroc", auroc)

        # Clinical operating point: the highest threshold that still
        # achieves 90% sensitivity (search from high to low, since
        # sensitivity falls as the threshold rises)
        for t in np.arange(0.89, 0.09, -0.01):
            preds = (y_prob >= t).astype(int)
            sens = (preds[y_val == 1] == 1).mean()
            spec = (preds[y_val == 0] == 0).mean()
            if sens >= 0.90:
                mlflow.log_metric("threshold_90sens", float(t))
                mlflow.log_metric("specificity_at_90sens", float(spec))
                break

        # Log and register the model
        mlflow.sklearn.log_model(
            model, "model",
            registered_model_name=experiment_name
        )
    return model, run.info.run_id
```

Stage 5: Clinical Validation
Statistical validation (AUROC, sensitivity) is necessary but not sufficient. Clinical validation asks whether the model's outputs are clinically actionable, whether the operating threshold is appropriate for the care setting, and whether performance is equitable across patient subgroups. See our MLOps for Healthcare guide for the full clinical validation framework.
| Validation Dimension | Metric | Threshold | Rationale |
|---|---|---|---|
| Discrimination | AUROC | >= 0.85 | Minimum for clinical utility in high-stakes prediction |
| Sensitivity | Sensitivity at 90% specificity | >= 0.70 | Balance between catching cases and alert fatigue |
| Calibration | Calibration slope | 0.8 - 1.2 | Predictions match observed frequencies |
| Fairness | AUROC disparity across groups | < 0.05 | Equitable performance across demographics |
| Stability | CV AUROC standard deviation | < 0.03 | Model is not sensitive to training data composition |
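These thresholds are most useful when enforced automatically, so a model cannot reach packaging unless every dimension passes. A minimal sketch of such a validation gate (metric names are illustrative; the thresholds mirror the table, and computing the metrics themselves is assumed to happen upstream):

```python
# Validation gates mirroring the table above. Missing metrics fail
# closed: a model with an uncomputed metric does not pass.
VALIDATION_GATES = {
    "auroc":             lambda v: v >= 0.85,
    "sens_at_90spec":    lambda v: v >= 0.70,
    "calibration_slope": lambda v: 0.8 <= v <= 1.2,
    "auroc_disparity":   lambda v: v < 0.05,
    "cv_auroc_std":      lambda v: v < 0.03,
}

def run_validation_gates(metrics: dict) -> dict:
    """Return a pass/fail result for each gate."""
    results = {}
    for name, check in VALIDATION_GATES.items():
        value = metrics.get(name)
        results[name] = bool(value is not None and check(value))
    return results

def all_gates_pass(metrics: dict) -> bool:
    """True only if every validation dimension meets its threshold."""
    return all(run_validation_gates(metrics).values())
```

In a CI/CD setting, `all_gates_pass` would be the condition for promoting a model from Stage 5 to Stage 6; a failed gate report goes to the clinical review board rather than to deployment.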
Stage 6: Model Packaging -- Docker Container with Model Card

Production models are packaged as Docker containers with the model artifact, inference code, health check endpoints, and a model card documenting the model's characteristics, limitations, and intended use.
```dockerfile
# Dockerfile for clinical model serving
FROM python:3.11-slim

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy model artifacts
COPY model/ /app/model/
COPY inference.py /app/
COPY model_card.json /app/
COPY health_check.py /app/

WORKDIR /app

# Container-level health check
HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
    CMD python health_check.py

# Serve model on port 8080
EXPOSE 8080
CMD ["python", "inference.py", "--port", "8080"]
```

```python
# inference.py - Production model serving with audit logging
from flask import Flask, request, jsonify
import joblib
import numpy as np
import json
import logging
from datetime import datetime
import uuid

app = Flask(__name__)
model = joblib.load("model/sepsis_model.pkl")
with open("model_card.json") as f:
    model_card = json.load(f)
logger = logging.getLogger("inference")


@app.route("/predict", methods=["POST"])
def predict():
    """Run inference with a full audit trail."""
    prediction_id = str(uuid.uuid4())
    start_time = datetime.utcnow()

    features = request.json.get("features", [])
    X = np.array(features).reshape(1, -1)
    probability = float(model.predict_proba(X)[0][1])
    threshold = model_card.get("operating_threshold", 0.5)
    alert = probability >= threshold

    # Audit log (no PHI - just prediction metadata)
    logger.info(json.dumps({
        "prediction_id": prediction_id,
        "timestamp": start_time.isoformat(),
        "model_version": model_card["version"],
        "probability": round(probability, 4),
        "alert": alert,
        "latency_ms": (
            datetime.utcnow() - start_time
        ).total_seconds() * 1000
    }))

    return jsonify({
        "prediction_id": prediction_id,
        "probability": round(probability, 4),
        "alert": alert,
        "model_version": model_card["version"],
        "threshold": threshold
    })


@app.route("/health", methods=["GET"])
def health():
    return jsonify({"status": "healthy",
                    "model_version": model_card["version"]})


@app.route("/model-card", methods=["GET"])
def get_model_card():
    return jsonify(model_card)
```

Stage 7: Deployment -- Kubernetes with Health Checks

Production clinical models run on Kubernetes, which provides scaling, health checks, rolling updates, and A/B routing for canary deployments. The deployment follows the shadow-canary-production progression described in our MLOps guide. For latency-critical applications, consider edge deployment as an alternative or complement to cloud-based Kubernetes serving.
```yaml
# kubernetes/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sepsis-model-v3
  labels:
    app: sepsis-predictor
    version: v3.1.0
spec:
  replicas: 3
  selector:
    matchLabels:
      app: sepsis-predictor
  template:
    metadata:
      labels:
        app: sepsis-predictor
        version: v3.1.0
    spec:
      containers:
        - name: model-server
          image: registry.hospital.org/sepsis-model:v3.1.0
          ports:
            - containerPort: 8080
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 10
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 10
          env:
            - name: MODEL_VERSION
              value: "v3.1.0"
            - name: LOG_LEVEL
              value: "INFO"
```

Stage 8: Production Monitoring -- Drift and Accuracy
Production monitoring tracks three dimensions continuously: data drift (are input distributions changing?), model performance (is accuracy degrading?), and operational health (are latency and error rates acceptable?). See our dedicated guide on model drift detection in clinical AI for the complete monitoring framework with Evidently AI implementation code.
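To make the data-drift dimension concrete, here is a minimal sketch of one widely used statistic, the Population Stability Index (PSI), which compares a production feature distribution against its training-time reference. This is an illustrative stand-in, not the Evidently AI implementation covered in the linked guide:

```python
import numpy as np

def population_stability_index(reference: np.ndarray,
                               current: np.ndarray,
                               bins: int = 10) -> float:
    """PSI between a reference (training) and current (production)
    distribution of one feature. Common rule of thumb:
    < 0.1 stable, 0.1-0.25 moderate shift, > 0.25 significant drift."""
    # Bin edges from the reference distribution (quantile bins)
    edges = np.unique(np.quantile(reference, np.linspace(0, 1, bins + 1)))
    edges[0], edges[-1] = -np.inf, np.inf
    ref_pct = np.histogram(reference, bins=edges)[0] / len(reference)
    cur_pct = np.histogram(current, bins=edges)[0] / len(current)
    # Floor proportions to avoid division by zero / log(0)
    ref_pct = np.clip(ref_pct, 1e-6, None)
    cur_pct = np.clip(cur_pct, 1e-6, None)
    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))
```

Run this per feature on a rolling window of production inputs and alert when any PSI crosses your chosen threshold; a drift alert should trigger investigation, not automatic retraining.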
Stage 9: The Audit Trail -- Connecting Every Stage

The audit trail is the thread connecting all nine stages. It answers the question: for any prediction the model makes in production, can you trace back to the exact training data, the exact model version, the validation results, and the deployment approval? This is required for FDA compliance and is essential for investigating adverse events.
```python
from dataclasses import dataclass, field
from typing import List, Dict
from datetime import datetime
import json
import hashlib


@dataclass
class PipelineAuditTrail:
    """Complete audit trail for a healthcare ML pipeline run."""

    # Pipeline identification
    pipeline_run_id: str
    pipeline_version: str
    initiated_by: str
    initiated_at: str = field(
        default_factory=lambda: datetime.utcnow().isoformat()
    )

    # Stage 1: Data extraction
    data_source: str = ""
    export_timestamp: str = ""
    resource_types: List[str] = field(default_factory=list)
    raw_record_counts: Dict[str, int] = field(default_factory=dict)

    # Stage 2: De-identification
    deident_method: str = "safe_harbor"
    deident_stats: Dict[str, int] = field(default_factory=dict)
    deident_verification: str = ""  # Attestation or verification ID

    # Stage 3: Feature engineering
    feature_version: str = ""
    feature_count: int = 0
    feature_names: List[str] = field(default_factory=list)
    dataset_hash: str = ""  # SHA-256 of final feature matrix
    train_size: int = 0
    val_size: int = 0
    test_size: int = 0

    # Stage 4: Training
    mlflow_run_id: str = ""
    mlflow_experiment: str = ""
    model_architecture: str = ""
    hyperparameters: Dict = field(default_factory=dict)
    training_duration_minutes: float = 0.0

    # Stage 5: Validation
    validation_metrics: Dict[str, float] = field(default_factory=dict)
    fairness_metrics: Dict[str, Dict[str, float]] = field(default_factory=dict)
    clinical_review_status: str = ""  # pending, approved, rejected
    clinical_reviewers: List[str] = field(default_factory=list)

    # Stage 6: Packaging
    docker_image: str = ""
    docker_digest: str = ""  # SHA-256 of container image
    model_card_version: str = ""

    # Stage 7: Deployment
    deployment_environment: str = ""  # shadow, canary, production
    deployment_timestamp: str = ""
    kubernetes_namespace: str = ""

    # Stage 8: Monitoring config
    monitoring_enabled: bool = False
    drift_detection_method: str = ""
    alert_channels: List[str] = field(default_factory=list)

    def generate_hash(self) -> str:
        """Generate a tamper-evident hash of the entire audit trail."""
        content = json.dumps(self.__dict__, sort_keys=True, default=str)
        return hashlib.sha256(content.encode()).hexdigest()

    def save(self, path: str):
        """Save the audit trail with an integrity hash."""
        record = self.__dict__.copy()
        record["integrity_hash"] = self.generate_hash()
        with open(path, "w") as f:
            json.dump(record, f, indent=2, default=str)
```

The audit trail should be immutable once finalized. Store it in a write-once system (S3 with Object Lock, or a dedicated audit database with append-only permissions). Every pipeline run generates a new audit record. For models under FDA oversight, these records form the post-market surveillance documentation.
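During an audit or adverse-event investigation, the first step is checking that a saved record has not been altered. A standalone verification sketch, assuming the saved-record format above (JSON fields plus an `integrity_hash` computed with `sort_keys=True, default=str`):

```python
import json
import hashlib

def verify_audit_record(path: str) -> bool:
    """Recompute the integrity hash of a saved audit record and compare
    it with the stored value. Mirrors the hashing scheme used by
    PipelineAuditTrail.generate_hash: SHA-256 over the JSON-serialized
    fields, excluding the stored hash itself."""
    with open(path) as f:
        record = json.load(f)
    stored = record.pop("integrity_hash", None)
    recomputed = hashlib.sha256(
        json.dumps(record, sort_keys=True, default=str).encode()
    ).hexdigest()
    return stored == recomputed
```

Any mismatch means the record was modified after finalization and the run's lineage can no longer be trusted.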
Frequently Asked Questions
How long does it take to build this pipeline from scratch?
For a well-resourced team (2-3 ML engineers, 1 data engineer, clinical informatics support), expect 4-6 months to build a production pipeline for a single model. The first model takes the longest because you are building shared infrastructure (FHIR extraction, de-identification, experiment tracking, deployment platform) that subsequent models will reuse. The second model through the same pipeline typically takes 4-8 weeks.
Can we use cloud services or must everything be on-premise?
Cloud services are viable for healthcare ML pipelines as long as the cloud provider offers HIPAA-eligible services with a Business Associate Agreement. AWS, GCP, and Azure all offer HIPAA-eligible AI/ML services. The key requirement is that all services processing PHI (data storage, training compute, model serving) must be covered by the BAA. Many organizations use a hybrid approach: sensitive data stays on-premise, de-identified data goes to the cloud for training.
What if our EHR does not support FHIR Bulk $export?
Many EHR systems still use HL7v2, CDA, or proprietary data exports. In this case, add a data translation stage before Stage 1: convert HL7v2 messages or CDA documents to FHIR resources using a transformation engine (Mirth Connect, HAPI FHIR, or a custom mapper). The rest of the pipeline remains the same -- it operates on FHIR resources regardless of the original source format. See our guide on healthcare integration with Mirth and Kafka for data transformation patterns.
How do we handle missing data in clinical features?
Missing data is pervasive in clinical datasets -- lab tests are only ordered when clinically indicated, vital signs may be recorded sporadically, and documentation practices vary across clinicians. Three approaches work in practice: (1) indicator features (add a binary "is_missing" feature for each potentially-missing variable), (2) clinically-informed imputation (use domain knowledge -- a missing creatinine in a non-renal patient is likely normal, so impute with population median), (3) models that handle missing data natively (gradient boosting handles missing values without imputation, which is one reason it is the most popular architecture for clinical tabular data).
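Approaches (1) and (2) can be combined in a single preprocessing step. A minimal sketch (the `add_missingness_features` helper and column names are illustrative, not part of the pipeline code above):

```python
import pandas as pd

def add_missingness_features(df: pd.DataFrame,
                             impute_values: dict) -> pd.DataFrame:
    """For each potentially-missing column: add a binary <col>_missing
    indicator, then impute with a clinically chosen default
    (e.g. the population median for a lab that was not ordered)."""
    out = df.copy()
    for col, default in impute_values.items():
        out[f"{col}_missing"] = out[col].isna().astype(int)
        out[col] = out[col].fillna(default)
    return out
```

The indicator column lets the model learn from the fact that a test was not ordered, which is itself clinically informative, while the imputed value keeps the feature usable by models that cannot handle NaN.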
How do we ensure reproducibility across pipeline runs?
Five practices ensure reproducibility: (1) version all code with git tags matching pipeline runs, (2) hash all input datasets and log hashes in the audit trail, (3) fix random seeds for training and data splitting, (4) pin all dependency versions in requirements files, (5) use Docker containers for training and inference to lock the full environment. MLflow tracks parameters 1-4 automatically; Docker addresses 5. The audit trail (Stage 9) ties everything together into a single verifiable record.
What is the minimum viable pipeline for a first healthcare ML project?
If building the full nine-stage pipeline is impractical for your first project, prioritize these five stages: (1) Data extraction with documented lineage, (2) De-identification (non-negotiable for HIPAA), (3) Training with experiment tracking (MLflow), (4) Validation with clinical metrics (not just AUROC), (5) Monitoring in production (at minimum, track prediction volume and distribution). Add the remaining stages (feature store, containerized deployment, full audit trail, automated retraining) as the program matures.



