Most healthcare data warehouses still rely on full daily reloads: every night, an ETL job copies the entire patient table, the entire encounter table, and the entire observation table from the EHR database into the warehouse. For a mid-size health system with 500 million observation records, this nightly reload takes 4-6 hours, costs hundreds of dollars in compute, and means clinical dashboards show data that is up to 24 hours stale.
Change Data Capture (CDC) eliminates this waste by capturing only the rows that actually changed since the last sync. Instead of copying 500 million records, you process only the 50,000 that were inserted, updated, or deleted. The result: near-real-time data freshness, 95% lower compute costs, and minimal load on the source EHR database. This guide covers both database-level CDC with Debezium and application-level CDC with FHIR Subscriptions, with example code for each approach.

Why CDC for Healthcare Data Warehouses
The case for CDC in healthcare is both technical and clinical. Stale data in healthcare is not just an inconvenience; it can directly impact patient safety. A clinical dashboard showing yesterday's lab results when today's critical values exist is a liability.
Full Reload vs CDC: The Numbers
| Metric | Full Daily Reload | CDC (Debezium) | Improvement |
|---|---|---|---|
| Records processed/day | 500M (full copy) | ~50K (changes only) | 10,000x reduction |
| Data freshness | 12-24 hours stale | <5 seconds | Near real-time |
| Source database impact | High (full table scans) | Low (reads the WAL through a replication slot) | No table scans after the initial snapshot |
| Pipeline runtime | 4-6 hours | Continuous streaming | No batch window needed |
| Compute cost | ~$200/day | ~$10/day | 95% reduction |
| Failure recovery | Re-run entire reload | Resume from last offset | Minutes vs hours |
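The Improvement column follows directly from the table's own estimates (500M vs ~50K records per day, ~$200 vs ~$10 of compute per day). A quick check of that arithmetic, using only those illustrative figures:

```python
def reduction(full: float, cdc: float) -> tuple[float, float]:
    """Return (reduction factor, percent saved) when moving from full reload to CDC."""
    return full / cdc, (1 - cdc / full) * 100


# Figures taken from the comparison table above (estimates, not measurements)
records_factor, _ = reduction(500_000_000, 50_000)
_, cost_saving_pct = reduction(200, 10)
print(f"{records_factor:,.0f}x fewer records, {cost_saving_pct:.0f}% lower compute cost")
# prints: 10,000x fewer records, 95% lower compute cost
```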
For healthcare organizations still running batch pipelines, see our comparison of ETL vs ELT architectures to understand where CDC fits in the modern data stack.

Database-Level CDC with Debezium
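Debezium is the open-source standard for database CDC. It reads the database transaction log (the WAL in PostgreSQL, the binlog in MySQL) and produces a Kafka event for every INSERT, UPDATE, and DELETE. After the initial snapshot, Debezium does not query the clinical tables at all: on PostgreSQL it consumes changes through a logical replication slot, so the costs to the EHR database are limited to logical decoding and the WAL the slot retains until Debezium confirms it.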
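With the connector configuration that follows, Debezium names each change topic <topic.prefix>.<schema>.<table> (for example ehr.cdc.clinical.patients), and the RegexRouter transform then renames it. The small Python sketch below mimics that rename so you can check which Kafka topic a table lands in before deploying. It assumes RegexRouter rewrites only topics that fully match the regex, and it uses Python's \1 where the Java replacement string uses $1; the helper names are illustrative, not part of Debezium or Kafka Connect.

```python
import re

# Values copied from the connector's "route" transform
ROUTE_REGEX = r"ehr\.cdc\.clinical\.(.*)"
ROUTE_REPLACEMENT = r"healthcare.cdc.\1"  # Java "$1" becomes Python "\1"


def debezium_topic(topic_prefix: str, schema: str, table: str) -> str:
    """Default Debezium PostgreSQL topic name: <topic.prefix>.<schema>.<table>."""
    return f"{topic_prefix}.{schema}.{table}"


def route_topic(topic: str) -> str:
    """Rename a topic the way RegexRouter does: only a full match is rewritten."""
    match = re.fullmatch(ROUTE_REGEX, topic)
    return match.expand(ROUTE_REPLACEMENT) if match else topic


for table in ["patients", "observations"]:
    raw = debezium_topic("ehr.cdc", "clinical", table)
    print(raw, "->", route_topic(raw))
```

Topics that do not match, such as Debezium's heartbeat topic, pass through unchanged.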
Debezium Connector Configuration for EHR PostgreSQL
```json
{
  "name": "ehr-postgres-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "ehr-db-primary.internal",
    "database.port": "5432",
    "database.user": "debezium_cdc",
    "database.password": "${file:/secrets/debezium-cdc-password}",
    "database.dbname": "ehr_production",
    "schema.include.list": "clinical",
    "table.include.list": "clinical.patients,clinical.encounters,clinical.observations,clinical.medication_requests,clinical.conditions",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_ehr_cdc",
    "publication.name": "ehr_cdc_publication",
    "topic.prefix": "ehr.cdc",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "ehr\\.cdc\\.clinical\\.(.*)",
    "transforms.route.replacement": "healthcare.cdc.$1",
    "tombstones.on.delete": true,
    "snapshot.mode": "initial",
    "snapshot.locking.mode": "none",
    "heartbeat.interval.ms": 10000,
    "poll.interval.ms": 500,
    "max.batch.size": 2048
  }
}
```
PostgreSQL Configuration for CDC
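```sql
-- Enable logical replication (requires a restart)
-- postgresql.conf:
--   wal_level = logical
--   max_replication_slots = 4
--   max_wal_senders = 4

-- Create a dedicated CDC user with minimal permissions
CREATE ROLE debezium_cdc WITH LOGIN REPLICATION PASSWORD '...';

-- Grant SELECT on the CDC tables only (used by the initial snapshot)
GRANT USAGE ON SCHEMA clinical TO debezium_cdc;
GRANT SELECT ON
    clinical.patients,
    clinical.encounters,
    clinical.observations,
    clinical.medication_requests,
    clinical.conditions
TO debezium_cdc;

-- Log the full old row so UPDATE/DELETE events carry a complete "before"
-- image (the default replica identity logs only the primary key).
-- This increases WAL volume for these tables.
ALTER TABLE clinical.patients REPLICA IDENTITY FULL;
ALTER TABLE clinical.encounters REPLICA IDENTITY FULL;
ALTER TABLE clinical.observations REPLICA IDENTITY FULL;
ALTER TABLE clinical.medication_requests REPLICA IDENTITY FULL;
ALTER TABLE clinical.conditions REPLICA IDENTITY FULL;

-- Create the publication for the CDC tables (run as the tables' owner)
CREATE PUBLICATION ehr_cdc_publication FOR TABLE
    clinical.patients,
    clinical.encounters,
    clinical.observations,
    clinical.medication_requests,
    clinical.conditions;

-- Verify the replication slot after Debezium starts
SELECT slot_name, plugin, slot_type, active
FROM pg_replication_slots
WHERE slot_name = 'debezium_ehr_cdc';
```
Debezium CDC Event Structure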
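With REPLICA IDENTITY FULL set on the captured tables, every Debezium update event carries the full before and after state of the row, and every delete carries the full before state, plus metadata about the operation. Under PostgreSQL's default replica identity, before is null for updates that do not change the primary key and holds only the primary key for deletes. The example below uses the JSON envelope that Kafka Connect's JsonConverter produces with schemas enabled; with the AvroConverter configured above, the same fields arrive Avro-encoded. Here is what a patient address change looks like: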
```json
{
  "schema": { "...": "..." },
  "payload": {
    "before": {
      "patient_id": "P-12345",
      "family_name": "Smith",
      "given_name": "John",
      "address_line": "123 Oak Street",
      "address_city": "Springfield",
      "address_state": "IL",
      "last_modified": "2026-01-15T10:30:00Z"
    },
    "after": {
      "patient_id": "P-12345",
      "family_name": "Smith",
      "given_name": "John",
      "address_line": "456 Maple Avenue",
      "address_city": "Chicago",
      "address_state": "IL",
      "last_modified": "2026-03-16T14:22:00Z"
    },
    "source": {
      "version": "2.5.0.Final",
      "connector": "postgresql",
      "name": "ehr.cdc",
      "ts_ms": 1773670920000,
      "db": "ehr_production",
      "schema": "clinical",
      "table": "patients",
      "lsn": 123456789,
      "txId": 987654
    },
    "op": "u",
    "ts_ms": 1773670920500
  }
}
```
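Because an update event carries both row images, you can compute exactly which columns changed, which is useful for audit logs or for skipping updates that only touch bookkeeping columns. A minimal sketch, assuming the JSON envelope shown above; changed_columns is a hypothetical helper, not part of Debezium. If a table does not use REPLICA IDENTITY FULL, before is null for most updates and every column will appear changed.

```python
def changed_columns(event: dict) -> dict:
    """Map each changed column to its (before, after) values for a CDC event."""
    payload = event["payload"]
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    return {
        column: (before.get(column), after.get(column))
        for column in sorted(set(before) | set(after))
        if before.get(column) != after.get(column)
    }


# Trimmed-down version of the address-change event above
event = {
    "payload": {
        "op": "u",
        "before": {"patient_id": "P-12345", "address_line": "123 Oak Street",
                   "address_city": "Springfield", "address_state": "IL"},
        "after": {"patient_id": "P-12345", "address_line": "456 Maple Avenue",
                  "address_city": "Chicago", "address_state": "IL"},
    }
}
for column, (old, new) in changed_columns(event).items():
    print(f"{column}: {old!r} -> {new!r}")
```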
Handling Healthcare-Specific CDC Patterns
Soft Deletes: The Healthcare Standard
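Healthcare records are almost never hard-deleted. HIPAA retention requirements, legal holds, and clinical safety all demand that records persist even when "deleted." In practice, healthcare systems use soft deletes: a status flag or is_deleted column marks the record as inactive while preserving the data. A soft delete in the source therefore reaches the pipeline as an ordinary update (op "u"); when the source does issue a hard DELETE (op "d"), the warehouse should still record it as a soft delete, which is what the handler below does.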
```python
import json


def process_cdc_event(event):
    """Process a single CDC event with healthcare soft-delete logic.

    valid_from, valid_to, and deleted_at are epoch milliseconds taken
    from the Debezium source block.
    """
    payload = event["payload"]
    op = payload["op"]
    after = payload.get("after")
    before = payload.get("before")
    source_ts = payload["source"]["ts_ms"]
    lsn = payload["source"].get("lsn")

    if op == "c":  # INSERT
        return {
            **after,
            "cdc_operation": "INSERT",
            "is_deleted": False,
            "valid_from": source_ts,
            "valid_to": None,
            "cdc_sequence": lsn
        }
    elif op == "u":  # UPDATE
        return {
            **after,
            "cdc_operation": "UPDATE",
            "is_deleted": False,
            "valid_from": source_ts,
            "valid_to": None,
            "previous_state": json.dumps(before),
            "cdc_sequence": lsn
        }
    elif op == "d":  # DELETE (soft delete in warehouse)
        # `before` holds the full row only with REPLICA IDENTITY FULL;
        # otherwise it contains just the primary key columns.
        return {
            **before,
            "cdc_operation": "DELETE",
            "is_deleted": True,
            "deleted_at": source_ts,
            # Same epoch-ms unit as the other branches: the delete takes
            # effect at the commit time of the DELETE
            "valid_from": source_ts,
            "valid_to": source_ts,
            "cdc_sequence": lsn
        }
    elif op == "r":  # READ (initial snapshot)
        return {
            **after,
            "cdc_operation": "SNAPSHOT",
            "is_deleted": False,
            "valid_from": source_ts,
            "valid_to": None,
            "cdc_sequence": 0  # snapshot rows sort before streamed changes
        }
    else:
        # e.g. "t" (truncate): fail loudly instead of silently returning None
        raise ValueError(f"Unsupported CDC operation: {op!r}")
```
Versioned Records with Validity Periods
For clinical data, you often need to maintain the full history of every change, not just the current state. This is a Type 2 Slowly Changing Dimension (SCD Type 2), and CDC makes it straightforward:
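```python
from pyspark.sql import Window
from pyspark.sql.functions import (
    coalesce, col, current_timestamp, lead, lit, max as max_, row_number,
)

HISTORY_PATH = "/data/healthcare/silver/patients_history"


def apply_scd2_merge(delta_table, incoming_changes):
    """Apply SCD Type 2 logic to maintain full change history.

    Assumes delta_table = DeltaTable.forPath(spark, HISTORY_PATH) and that
    incoming_changes has the columns produced by process_cdc_event, with
    deleted_at null except on DELETE rows.
    """
    by_patient = Window.partitionBy("patient_id").orderBy("cdc_sequence")
    # Moment a change takes effect: deletes use deleted_at, others valid_from
    change_ts = coalesce(col("deleted_at"), col("valid_from"))

    # Step 1: Close out existing current records that have updates or deletes.
    # MERGE allows one source row per target row, so use each patient's
    # earliest change in this batch.
    first_changes = incoming_changes \
        .filter(col("cdc_operation").isin("UPDATE", "DELETE")) \
        .withColumn("rn", row_number().over(by_patient)) \
        .filter(col("rn") == 1) \
        .withColumn("close_ts", change_ts)
    delta_table.alias("existing").merge(
        first_changes.alias("changes"),
        "existing.patient_id = changes.patient_id AND existing.is_current = true"
    ).whenMatchedUpdate(
        set={
            "is_current": lit(False),
            "valid_to": col("changes.close_ts"),
            "updated_at": current_timestamp()
        }
    ).execute()

    # Step 2: Insert new versions for snapshots, inserts, and updates. Each
    # version is closed by the patient's next change in the batch; only a
    # version with no later change stays current. Version numbers continue
    # from the highest version already stored for the patient.
    prior = delta_table.toDF() \
        .groupBy("patient_id") \
        .agg(max_("version").alias("prior_version"))
    new_records = incoming_changes \
        .withColumn("valid_to", lead(change_ts).over(by_patient)) \
        .filter(col("cdc_operation").isin("SNAPSHOT", "INSERT", "UPDATE")) \
        .withColumn("is_current", col("valid_to").isNull()) \
        .join(prior, "patient_id", "left") \
        .withColumn("version", coalesce(col("prior_version"), lit(0))
                    + row_number().over(by_patient)) \
        .drop("prior_version")
    new_records.write.format("delta") \
        .mode("append") \
        .save(HISTORY_PATH)
```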
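To see what the history table should end up containing, here is a small in-memory model of SCD Type 2 for a single patient: each change closes the previous version at the change time and opens a new one, and a delete closes the last version without opening another. It illustrates the technique only and is not the Delta Lake implementation; the Version class, apply_scd2 helper, and epoch-millisecond timestamps are made up for the example.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Version:
    address_line: str
    valid_from: int
    valid_to: Optional[int] = None

    @property
    def is_current(self) -> bool:
        return self.valid_to is None


def apply_scd2(history: list, op: str, ts: int, address_line: Optional[str] = None) -> None:
    """Apply one CDC change (in commit order) to a single patient's version list."""
    if history and history[-1].is_current and op in ("u", "d"):
        history[-1].valid_to = ts  # close the current version
    if op in ("c", "r", "u"):
        history.append(Version(address_line, valid_from=ts))


history: list[Version] = []
apply_scd2(history, "c", 1_000, "123 Oak Street")
apply_scd2(history, "u", 2_000, "456 Maple Avenue")
apply_scd2(history, "d", 3_000)
for version_number, v in enumerate(history, start=1):
    print(version_number, v.address_line, v.valid_from, v.valid_to, v.is_current)
```

After the delete, both versions are closed and none is current, which is the state the two-step MERGE above is meant to produce.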
Application-Level CDC with FHIR Subscriptions
Not every healthcare organization has direct access to the EHR database. Many EHR vendors do not expose database-level access, especially in cloud-hosted deployments. FHIR Subscriptions provide an application-level alternative: the EHR notifies you via webhook whenever a resource is created, updated, or deleted.
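Because an R4 Subscription's criteria name a single resource type, tracking several types means creating one Subscription per type. The sketch below builds those resources as Python dictionaries, assuming every type posts to the same webhook endpoint; build_subscriptions is an illustrative helper, not part of any FHIR library. It uses status "requested" because R4 expects clients to submit Subscriptions in that state and lets the server move them to "active".

```python
import json


def build_subscriptions(resource_types, endpoint,
                        token_placeholder="${CDC_WEBHOOK_TOKEN}",
                        since="2026-01-01"):
    """Build one FHIR R4 rest-hook Subscription per resource type."""
    return [
        {
            "resourceType": "Subscription",
            "status": "requested",  # the server activates it
            "reason": f"CDC pipeline for data warehouse sync ({rt})",
            "criteria": f"{rt}?_lastUpdated=gt{since}",
            "channel": {
                "type": "rest-hook",
                "endpoint": endpoint,
                "payload": "application/fhir+json",
                "header": [f"Authorization: Bearer {token_placeholder}"],
            },
        }
        for rt in resource_types
    ]


subs = build_subscriptions(["Patient", "Encounter", "Observation"],
                           "https://cdc-bridge.internal/fhir-webhook")
print(json.dumps([s["criteria"] for s in subs], indent=2))
```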
FHIR Subscription Configuration for CDC
```json
{
  "resourceType": "Subscription",
  "status": "requested",
  "reason": "CDC pipeline for data warehouse sync",
  "criteria": "Patient?_lastUpdated=gt2026-01-01",
  "channel": {
    "type": "rest-hook",
    "endpoint": "https://cdc-bridge.internal/fhir-webhook",
    "payload": "application/fhir+json",
    "header": [
      "Authorization: Bearer ${CDC_WEBHOOK_TOKEN}"
    ]
  }
}
```
FHIR CDC Bridge Service
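```python
import hmac
import json
import os
import threading
from datetime import datetime, timezone

from confluent_kafka import Producer
from flask import Flask, jsonify, request

app = Flask(__name__)

# Shared secret that the Subscription sends in its Authorization header
WEBHOOK_TOKEN = os.environ["CDC_WEBHOOK_TOKEN"]

producer = Producer({
    "bootstrap.servers": "kafka:9092",
    "enable.idempotence": True,
    "acks": "all",
    # Must be unique per running bridge instance, or instances fence each other
    "transactional.id": "fhir-cdc-bridge"
})
producer.init_transactions()
# A transactional producer allows one open transaction at a time, so
# serialize requests that Flask handles on concurrent threads
producer_lock = threading.Lock()

# Map HTTP method to CDC operation
OP_MAP = {
    "POST": "c",   # create
    "PUT": "u",    # update
    "DELETE": "d"  # delete
}


@app.route("/fhir-webhook", methods=["POST"])
def handle_fhir_cdc():
    """Convert a FHIR notification Bundle to CDC events.

    Assumes entries carry request.method and request.url (as in history or
    transaction Bundles); adapt this to the notification format your FHIR
    server actually sends.
    """
    auth = request.headers.get("Authorization", "")
    if not hmac.compare_digest(auth.encode(), f"Bearer {WEBHOOK_TOKEN}".encode()):
        return jsonify({"error": "Unauthorized"}), 401

    bundle = request.get_json(silent=True) or {}
    if bundle.get("resourceType") != "Bundle":
        return jsonify({"error": "Expected Bundle"}), 400

    with producer_lock:
        producer.begin_transaction()
        try:
            for entry in bundle.get("entry", []):
                resource = entry.get("resource") or {}
                req = entry.get("request", {})
                # DELETE entries often have no resource body; fall back to a
                # relative request.url such as "Patient/123"
                url_type, _, url_rest = req.get("url", "").partition("/")
                resource_type = resource.get("resourceType") or url_type or "unknown"
                resource_id = resource.get("id") or url_rest.split("/")[0] or None

                cdc_event = {
                    "payload": {
                        "op": OP_MAP.get(req.get("method", "PUT"), "u"),
                        "before": None,  # FHIR Subscriptions lack before-state
                        "after": resource or None,
                        "source": {
                            "connector": "fhir-subscription",
                            "ts_ms": int(datetime.now(timezone.utc).timestamp() * 1000),
                            "resource_type": resource_type,
                            "resource_id": resource_id
                        }
                    }
                }
                # Extract patient ID for partitioning
                patient_id = extract_patient_reference(resource) or (
                    resource_id if resource_type == "Patient" else None
                )
                producer.produce(
                    topic=f"healthcare.cdc.fhir_{resource_type.lower()}",
                    key=patient_id.encode("utf-8") if patient_id else None,
                    value=json.dumps(cdc_event).encode("utf-8")
                )
            producer.commit_transaction()
            return jsonify({"status": "ok"}), 200
        except Exception:
            producer.abort_transaction()
            # Do not echo exception text back; it may contain PHI
            return jsonify({"error": "Failed to publish CDC events"}), 500


def extract_patient_reference(resource):
    """Extract patient ID from any FHIR resource."""
    rt = resource.get("resourceType")
    if rt == "Patient":
        return resource.get("id")
    ref = resource.get("subject", resource.get("patient", {}))
    if isinstance(ref, dict):
        r = ref.get("reference", "")
        return r.split("/")[-1] if "/" in r else r
    return None
```
For comprehensive coverage of FHIR Subscription patterns, see our guide on event-driven EHR architecture with FHIR Subscriptions.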

Delta Lake MERGE for CDC Upserts
The sink side of the CDC pipeline uses Delta Lake MERGE to apply changes to the warehouse. This is the most critical component because it must handle inserts, updates, and deletes atomically while maintaining the audit trail.
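Before wiring up Spark, it can help to pin down the intended MERGE semantics in plain Python, for example as an oracle for unit tests. The sketch below is a hypothetical model, not Delta Lake's API: the target table is a dict keyed by patient_id, only the highest-LSN event per key in a batch is applied, creates, snapshot reads, and updates are upserted, and deletes become soft deletes.

```python
def apply_cdc_batch(table: dict, events: list) -> dict:
    """Apply a batch of CDC events to a dict-based table keyed by patient_id."""
    latest = {}
    for e in events:  # keep only the newest event per key, by LSN
        key = (e["after"] or e["before"])["patient_id"]
        if key not in latest or e["lsn"] > latest[key]["lsn"]:
            latest[key] = e
    for key, e in latest.items():
        if e["op"] in ("c", "r", "u"):
            table[key] = {**e["after"], "is_deleted": False}
        elif e["op"] == "d" and key in table:
            table[key] = {**table[key], "is_deleted": True}  # soft delete
    return table


table = {}
apply_cdc_batch(table, [
    {"op": "c", "lsn": 10, "before": None, "after": {"patient_id": "P-1", "city": "Springfield"}},
    {"op": "u", "lsn": 20, "before": None, "after": {"patient_id": "P-1", "city": "Chicago"}},
    {"op": "c", "lsn": 15, "before": None, "after": {"patient_id": "P-2", "city": "Peoria"}},
])
apply_cdc_batch(table, [{"op": "d", "lsn": 30, "before": {"patient_id": "P-2"}, "after": None}])
print(table)
```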
Streaming CDC Consumer with Delta Lake MERGE
```python
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import (
    coalesce, col, current_timestamp, desc, from_json, lit, row_number,
)
from pyspark.sql.types import LongType, StringType, StructField, StructType
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName("CDC-to-DeltaLake") \
    .config("spark.sql.extensions",
            "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Row image of clinical.patients; every column is read as a string for
# simplicity (adjust types to your converter's encodings)
patient_row = StructType([
    StructField(name, StringType())
    for name in ["patient_id", "family_name", "given_name", "birth_date",
                 "gender", "address_line", "address_city", "address_state",
                 "last_modified"]
])
# Debezium envelope. Assumes JSON values (JsonConverter, as in the event
# example); Avro-encoded values must be decoded with the registry schema.
cdc_event_schema = StructType([
    StructField("payload", StructType([
        StructField("op", StringType()),
        StructField("before", patient_row),
        StructField("after", patient_row),
        StructField("source", StructType([
            StructField("ts_ms", LongType()),
            StructField("lsn", LongType()),
        ])),
    ])),
])

# Read CDC events from Kafka
cdc_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "healthcare.cdc.patients") \
    .option("startingOffsets", "earliest") \
    .option("kafka.isolation.level", "read_committed") \
    .load()

# Parse CDC events, skipping delete tombstones (null values). Deletes have
# no after-state, so patient_id falls back to the before-state key.
parsed_cdc = cdc_stream \
    .filter(col("value").isNotNull()) \
    .select(
        from_json(col("value").cast("string"), cdc_event_schema).alias("event"),
        col("offset"),
        col("timestamp").alias("kafka_timestamp")
    ) \
    .select(
        col("event.payload.op").alias("operation"),
        col("event.payload.after.*"),
        col("event.payload.before.patient_id").alias("before_patient_id"),
        col("event.payload.source.ts_ms").alias("source_ts"),
        col("event.payload.source.lsn").alias("lsn"),
        col("offset").alias("kafka_offset"),
        col("kafka_timestamp")
    ) \
    .withColumn("patient_id",
                coalesce(col("patient_id"), col("before_patient_id"))) \
    .drop("before_patient_id")


def apply_cdc_to_delta(batch_df, batch_id):
    """Apply a micro-batch of CDC events to the Delta table."""
    if batch_df.isEmpty():
        return

    # Deduplicate: keep only the latest event per patient_id
    # (MERGE fails if several source rows match one target row)
    deduped = batch_df \
        .withColumn("row_num",
                    row_number().over(
                        Window.partitionBy("patient_id")
                        .orderBy(desc("lsn"))
                    )) \
        .filter(col("row_num") == 1) \
        .drop("row_num")

    delta_table = DeltaTable.forPath(
        spark, "/data/healthcare/silver/patients"
    )

    # Column values shared by the update and insert clauses
    upsert_values = {
        "family_name": "source.family_name",
        "given_name": "source.given_name",
        "birth_date": "source.birth_date",
        "gender": "source.gender",
        "address_line": "source.address_line",
        "address_city": "source.address_city",
        "address_state": "source.address_state",
        "last_modified": "source.last_modified",
        "is_deleted": lit(False),
        "updated_at": current_timestamp(),
        "cdc_source_ts": "source.source_ts"
    }

    delta_table.alias("target").merge(
        deduped.alias("source"),
        "target.patient_id = source.patient_id"
    ).whenMatchedUpdate(
        # Create/snapshot events for existing rows (e.g. a re-run snapshot)
        # are applied as updates
        condition="source.operation IN ('u', 'c', 'r')",
        set=upsert_values
    ).whenMatchedUpdate(
        condition="source.operation = 'd'",
        set={
            "is_deleted": lit(True),
            "deleted_at": current_timestamp(),
            "updated_at": current_timestamp(),
            "cdc_source_ts": "source.source_ts"
        }
    ).whenNotMatchedInsert(
        # An update for a row the target has never seen becomes an insert
        condition="source.operation IN ('c', 'r', 'u')",
        values={
            **upsert_values,
            "patient_id": "source.patient_id",
            "created_at": current_timestamp()
        }
    ).execute()


# Run streaming CDC consumer
parsed_cdc.writeStream \
    .foreachBatch(apply_cdc_to_delta) \
    .option("checkpointLocation",
            "/data/healthcare/cdc/_checkpoint/patients") \
    .trigger(processingTime="30 seconds") \
    .start() \
    .awaitTermination()
```
Choosing the Right CDC Approach
| Approach | Mechanism | Latency | Source Impact | Before State | Best For |
|---|---|---|---|---|---|
| Debezium (WAL) | Database transaction log | <5 seconds | Low (logical decoding, WAL retention) | Yes (full before/after with REPLICA IDENTITY FULL) | Direct database access available |
| FHIR Subscriptions | Application webhooks | <10 seconds | Minimal | No (after-state only) | Cloud-hosted EHR, no DB access |
| Timestamp polling | Query _lastUpdated | Minutes to hours | Medium | No | Legacy systems, simplest setup |
| Database triggers | SQL triggers write audit table | <1 second | High (slows writes) | Yes | Avoid in healthcare (performance) |
Timestamp-Based Polling: The Fallback
When neither Debezium nor FHIR Subscriptions are available, timestamp-based polling is the simplest CDC approach. It queries the source for records modified since the last poll:
```python
from datetime import datetime, timedelta, timezone

import requests


class FhirTimestampCDC:
    """Timestamp-based CDC for FHIR servers."""

    def __init__(self, fhir_base_url, token):
        self.base_url = fhir_base_url.rstrip("/")
        self.headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/fhir+json"
        }
        self.last_poll_time = None

    def poll_changes(self, resource_type, since=None):
        """Poll for resources modified since the last check (times in UTC)."""
        # Record the start time before querying so that changes made while
        # paging are picked up by the next poll instead of being skipped
        poll_started = datetime.now(timezone.utc)
        if since is None:
            since = self.last_poll_time or (poll_started - timedelta(hours=1))
        since_str = since.strftime("%Y-%m-%dT%H:%M:%SZ")
        url = (
            f"{self.base_url}/{resource_type}"
            f"?_lastUpdated=gt{since_str}"
            f"&_count=100&_sort=_lastUpdated"
        )
        all_resources = []
        while url:
            resp = requests.get(url, headers=self.headers, timeout=30)
            resp.raise_for_status()
            bundle = resp.json()
            for entry in bundle.get("entry", []):
                resource = entry.get("resource", {})
                all_resources.append({
                    "op": "u",  # Cannot distinguish insert vs update
                    "after": resource,
                    "source_ts": resource.get("meta", {}).get("lastUpdated")
                })
            # Follow pagination
            url = next(
                (link["url"] for link in bundle.get("link", [])
                 if link.get("relation") == "next"),
                None
            )
        self.last_poll_time = poll_started
        return all_resources
```
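Timestamp polling has a boundary problem: second-precision _lastUpdated values and clock differences between the poller and the FHIR server can make consecutive polls leave gaps. One common mitigation, sketched here as an assumption rather than part of the class above, is to start each poll slightly before the previous cutoff and drop resource versions already emitted, keyed by resource type, id, and meta.versionId (falling back to meta.lastUpdated for servers that omit versionId). Both helper names are illustrative.

```python
from datetime import datetime, timedelta, timezone


def overlapped_since(last_poll_time: datetime, overlap_seconds: int = 60) -> datetime:
    """Start the next poll a little before the previous cutoff."""
    return last_poll_time - timedelta(seconds=overlap_seconds)


def drop_seen(changes: list, seen: set) -> list:
    """Keep only resource versions not emitted before; updates `seen` in place."""
    fresh = []
    for change in changes:
        resource = change["after"]
        meta = resource.get("meta", {})
        version_key = (
            resource.get("resourceType"),
            resource.get("id"),
            meta.get("versionId") or meta.get("lastUpdated"),
        )
        if version_key not in seen:
            seen.add(version_key)
            fresh.append(change)
    return fresh


seen = set()
v3 = {"op": "u", "after": {"resourceType": "Patient", "id": "1", "meta": {"versionId": "3"}}}
v4 = {"op": "u", "after": {"resourceType": "Patient", "id": "1", "meta": {"versionId": "4"}}}
print(len(drop_seen([v3], seen)), len(drop_seen([v3, v4], seen)))  # 1 1
```

With the class above, a poll would then look like cdc.poll_changes("Patient", since=overlapped_since(cdc.last_poll_time)) once a first poll has set last_poll_time, followed by drop_seen on the result.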
Handling Out-of-Order Events
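In distributed systems, CDC events can arrive out of order, for example when events for the same patient land on different Kafka partitions, come from different sources, or are redelivered after a failure. A phone number update committed at T1 might arrive after an address update committed at T2, even though T1 happened first. Because each event carries the full row image, applying the T1 event last overwrites the new address with the old one, and without proper ordering the warehouse ends up with stale data.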
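A small simulation makes the failure concrete. Two full-row events for the same patient arrive in the wrong order: applying them blindly in arrival order keeps the stale address, while an LSN guard (apply an event only if its LSN is higher than the last one applied) converges to the correct row regardless of arrival order. The row shapes and LSN values are made up for illustration.

```python
def apply_in_arrival_order(events):
    """Naively overwrite the row with each event as it arrives."""
    row = None
    for e in events:
        row = e["after"]
    return row


def apply_with_lsn_guard(events):
    """Apply an event only if it is newer (higher LSN) than the last applied one."""
    row, last_lsn = None, -1
    for e in events:
        if e["lsn"] > last_lsn:
            row, last_lsn = e["after"], e["lsn"]
    return row


# T1 changes the phone; T2 (later) changes the address. Both carry full rows.
t1 = {"lsn": 100, "after": {"phone": "555-0199", "address": "123 Oak Street"}}
t2 = {"lsn": 200, "after": {"phone": "555-0199", "address": "456 Maple Avenue"}}
arrived = [t2, t1]  # the earlier change (T1) arrives last
print(apply_in_arrival_order(arrived)["address"])  # 123 Oak Street (stale)
print(apply_with_lsn_guard(arrived)["address"])    # 456 Maple Avenue
```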
Watermark-Based Ordering
```python
from pyspark.sql.functions import expr, max as max_, max_by, window

# Group CDC events into 30-second windows of source commit time (ts_ms) and
# keep only the latest event per patient in each window, chosen by LSN.
# Events older than the 30-second watermark may be dropped, and ordering
# across windows still depends on the LSN guard in the idempotent merge below.
ordered_cdc = parsed_cdc \
    .withColumn("event_time", expr("timestamp_millis(source_ts)")) \
    .withWatermark("event_time", "30 seconds") \
    .groupBy(
        window("event_time", "30 seconds"),
        "patient_id"
    ) \
    .agg(
        # Keep only the latest event per patient within the window
        max_by("operation", "lsn").alias("final_operation"),
        max_by("family_name", "lsn").alias("family_name"),
        max_by("given_name", "lsn").alias("given_name"),
        max_by("address_line", "lsn").alias("address_line"),
        max_("lsn").alias("latest_lsn"),
        max_("source_ts").alias("latest_source_ts")
    )

# Alternative: LSN-based idempotency
# Store the last-processed LSN per table
# Skip events with LSN <= last-processed
```
Idempotent CDC Processing
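```python
from pyspark.sql import Window
from pyspark.sql.functions import col, current_timestamp, desc, row_number


def idempotent_cdc_merge(delta_table, cdc_batch):
    """Apply insert/update CDC events idempotently using LSN tracking."""
    # MERGE allows one source row per key: keep only the newest event
    latest = cdc_batch \
        .withColumn("rn", row_number().over(
            Window.partitionBy("patient_id").orderBy(desc("lsn")))) \
        .filter(col("rn") == 1) \
        .drop("rn")
    delta_table.alias("target").merge(
        latest.alias("source"),
        "target.patient_id = source.patient_id"
    ).whenMatchedUpdate(
        # Only update if the CDC event is newer than the current record;
        # redelivered or late events become no-ops
        condition="target.cdc_last_lsn IS NULL OR source.lsn > target.cdc_last_lsn",
        set={
            "family_name": "source.family_name",
            "given_name": "source.given_name",
            "address_line": "source.address_line",
            "last_modified": "source.last_modified",
            "cdc_last_lsn": "source.lsn",
            "updated_at": current_timestamp()
        }
    ).whenNotMatchedInsert(
        # Explicit values: the source column is "lsn", not "cdc_last_lsn",
        # so whenNotMatchedInsertAll() would not populate the LSN
        values={
            "patient_id": "source.patient_id",
            "family_name": "source.family_name",
            "given_name": "source.given_name",
            "address_line": "source.address_line",
            "last_modified": "source.last_modified",
            "cdc_last_lsn": "source.lsn",
            "updated_at": current_timestamp()
        }
    ).execute()
```
For organizations using Delta Lake as the CDC sink, see our comprehensive guide on Delta Lake for healthcare with ACID transactions and time travel. The time travel feature is particularly valuable for CDC because you can replay and verify any point-in-time state.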

Production CDC Pipeline: Monitoring and Reliability
CDC Health Monitoring
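```yaml
# Prometheus alerting rules for CDC pipeline health.
# Metric names depend on your exporters (Kafka lag, Kafka Connect/Debezium
# JMX, postgres_exporter); adjust them to what your stack actually exposes.
groups:
  - name: cdc-healthcare-alerts
    rules:
      # Alert if consumer lag stays above 30,000 messages for 5 minutes.
      # Spark's Kafka source tracks offsets in its checkpoint and does not
      # commit them to a Kafka consumer group, so this lag needs a custom metric.
      - alert: CDCLagExceedsThreshold
        expr: |
          kafka_consumer_group_lag_sum{
            group="cdc-delta-lake-consumer"
          } > 30000
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "CDC consumer lag > 30,000 messages for 5 minutes"
      # Alert if the Debezium connector is not running
      - alert: DebeziumConnectorDown
        expr: |
          debezium_connector_status{
            connector="ehr-postgres-cdc"
          } != 1
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Debezium connector ehr-postgres-cdc is down"
      # Alert if the replication slot falls more than 1 GiB behind
      - alert: ReplicationSlotLagHigh
        expr: |
          pg_replication_slot_bytes_lag{
            slot_name="debezium_ehr_cdc"
          } > 1073741824
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Replication slot lag > 1GB"
```
For comprehensive monitoring strategies including PagerDuty integration, see our guide on alerting for healthcare systems.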
Frequently Asked Questions
Does Debezium affect EHR database performance?
Minimally. During streaming, Debezium reads changes from the write-ahead log (WAL) through a logical replication slot instead of querying the tables; the database spends some CPU on logical decoding, and only the initial snapshot runs SELECTs against the monitored tables. The main operational risk is WAL retention: the replication slot prevents the database from recycling WAL segments until Debezium confirms them. If Debezium falls behind or stops, WAL segments accumulate and consume disk space. Monitor the replication slot lag and alert if it exceeds 1GB.
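The slot lag in that alert is a byte distance between two LSNs. PostgreSQL prints an LSN as two hexadecimal halves, X/Y, standing for the 64-bit position (X << 32) + Y; on the server, pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) over pg_replication_slots returns the same difference. The Python below is a minimal sketch of that arithmetic for checking values by hand; the sample LSNs are made up.

```python
GIB = 1024 ** 3  # the 1073741824-byte alert threshold


def parse_lsn(lsn: str) -> int:
    """Convert a PostgreSQL LSN such as '16/B374D848' to a byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) + int(low, 16)


def slot_lag_bytes(current_wal_lsn: str, confirmed_flush_lsn: str) -> int:
    """How far the slot's confirmed position trails the current WAL position."""
    return parse_lsn(current_wal_lsn) - parse_lsn(confirmed_flush_lsn)


lag = slot_lag_bytes("17/00000000", "16/80000000")
print(lag, lag > GIB)  # 2147483648 True
```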
How do I handle the initial snapshot?
Debezium performs an initial snapshot when first started, reading all existing rows from the monitored tables. For large EHR databases, use snapshot.mode=initial with snapshot.locking.mode=none to avoid locking the source database. The snapshot can take hours for tables with hundreds of millions of rows, so schedule it for a low-traffic window. Running Debezium against a read replica instead of the primary requires PostgreSQL 16 or later, which added logical decoding on standby servers.
What happens if Debezium crashes?
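Debezium stores its last-read position (LSN) in Kafka Connect offsets and confirms it to the PostgreSQL replication slot. When it restarts, it resumes from the last committed offset, and because the slot retains all WAL that Debezium has not yet confirmed, no events are lost as long as the slot is not dropped. Some events may be redelivered, so your sink consumer must be idempotent (the MERGE pattern with LSN tracking handles this).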
Can I use CDC with cloud-hosted EHR systems like Epic?
Not at the database level, because you do not have database access. Use FHIR Subscriptions as your CDC source instead, after confirming with your EHR vendor which Subscription channels and resource types your version supports (Epic and other vendors offer FHIR R4 APIs, but Subscription support varies). The tradeoff is that FHIR Subscriptions provide after-state only (no before-state for updates), and you must create a separate Subscription for each resource type you want to track.
How do I handle schema changes in the source database?
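Debezium picks up schema changes automatically (for PostgreSQL, when the next change event for the altered table arrives) and, with the AvroConverter, registers the new schema in the Schema Registry. For additive changes (new columns), the CDC stream continues without interruption, but the new columns reach Delta Lake only after the consumer's parsing schema and MERGE column lists include them, or when the MERGE uses updateAll/insertAll with automatic schema evolution (spark.databricks.delta.schema.autoMerge.enabled); the mergeSchema=true option applies to append and overwrite writes. For breaking changes (renamed columns, type changes), you need to update the consumer code. Use Schema Registry compatibility checks (FULL compatibility) to catch breaking changes before they reach production.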
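The additive-versus-breaking distinction can be checked mechanically before a change ships. A simplified sketch, assuming schemas are represented as column-to-type dictionaries (real Schema Registry compatibility checks also consider defaults, nullability, and the configured compatibility mode): added columns are additive, while removed columns and type changes are breaking, and a rename shows up as one removal plus one addition.

```python
def classify_schema_change(old: dict, new: dict) -> dict:
    """Split differences between two {column: type} schemas into additive and breaking."""
    added = sorted(set(new) - set(old))
    removed = sorted(set(old) - set(new))
    retyped = sorted(c for c in set(old) & set(new) if old[c] != new[c])
    return {
        "additive": added,
        "breaking": removed + retyped,
        "is_breaking": bool(removed or retyped),
    }


old = {"patient_id": "string", "family_name": "string", "birth_date": "date"}
new = {"patient_id": "string", "last_name": "string", "birth_date": "string",
       "preferred_language": "string"}
print(classify_schema_change(old, new))
```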
What is the difference between CDC and FHIR Bulk Data Export?
FHIR Bulk Data Export ($export) produces a point-in-time dump of all resources. It is the equivalent of a full reload, not CDC. Use Bulk Data Export for initial loads and periodic reconciliation. Use CDC (Debezium or FHIR Subscriptions) for ongoing incremental sync. Many healthcare organizations combine both: Bulk Data Export for the initial load, then CDC for continuous updates, with periodic Bulk Data Export reconciliation to catch any drift.
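The periodic reconciliation step amounts to comparing two snapshots: what a Bulk Data Export says exists now versus what the CDC-fed warehouse holds. A minimal sketch, assuming both sides have already been reduced to a mapping of resource id to meta.versionId, with soft-deleted warehouse rows excluded (loading the NDJSON export and querying the warehouse are left out; find_drift is an illustrative name):

```python
def find_drift(exported: dict, warehouse: dict) -> dict:
    """Compare {resource_id: version} maps from a bulk export and the warehouse."""
    return {
        # In the export but never delivered by CDC
        "missing_in_warehouse": sorted(set(exported) - set(warehouse)),
        # Active in the warehouse but absent from the export (a missed delete?)
        "missing_in_export": sorted(set(warehouse) - set(exported)),
        # Present in both, but the warehouse holds a different version
        "version_mismatch": sorted(
            rid for rid in set(exported) & set(warehouse)
            if exported[rid] != warehouse[rid]
        ),
    }


exported = {"P-1": "3", "P-2": "1", "P-4": "2"}
warehouse = {"P-1": "3", "P-2": "2", "P-3": "5"}
print(find_drift(exported, warehouse))
```

Each non-empty list points at resources to re-fetch or re-check, rather than at a full reload.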


