Change Data Capture Pattern: Log-Based Data Movement Without Full Reloads

Capture insert, update, and delete events from source logs for timely downstream sync.

Abstract Algorithms · 15 min read

TLDR: Change data capture (CDC) moves committed database changes into downstream systems without full reloads. It is most useful when freshness matters, replay matters, and the source database must remain the system of record. CDC becomes production-ready only when teams define snapshot handoff, schema evolution, delete semantics, checkpoint ownership, and downstream idempotency; most incidents come from those edges, not from the connector binary itself.

Shopify runs Debezium on over 200 production tables feeding analytics, search, and fraud systems. In 2020, a routine schema migration on their orders table — adding a nullable column — silently broke their Elasticsearch sink for 4 hours. The Debezium connector was green. The Kafka topic was filling normally. But search was serving stale order status to merchant dashboards. The bug was a missing schema compatibility check, not a connector failure. That distinction — connector health vs. downstream correctness — is the most important thing to understand about CDC in production.

Operator note: Incident reviews usually show that CDC pipelines fail at the seams: the snapshot-to-stream transition duplicates rows, a schema change breaks a sink silently, or downstream consumers cannot explain whether deletes are tombstones, hard deletes, or business events.

📖 When CDC Actually Helps

CDC is the right pattern when operational systems remain authoritative but other systems need timely copies of change events.

Use it for:

  • syncing OLTP changes into a warehouse or lakehouse,
  • maintaining search indexes or caches from transactional writes,
  • propagating inserts, updates, and deletes to other bounded contexts,
  • replayable change streams for audit or bug recovery.

| Operational problem | Why CDC helps |
| --- | --- |
| Nightly full loads are too slow and expensive | CDC ships only committed changes |
| Search or analytics trails production by hours | Log-based capture provides low-latency propagation |
| Teams need replay after a downstream bug | Change log plus checkpoints make replay deterministic |
| Source database must stay authoritative | CDC keeps truth in the source while feeding consumers |

🔍 When Not to Use CDC

CDC is not automatically the best answer for every data sync problem.

Avoid or constrain it when:

  • the source database has unstable schema discipline,
  • the change stream is not needed continuously,
  • downstream consumers cannot handle out-of-order or repeated messages,
  • one-time migration is the real problem rather than ongoing replication.

| Constraint | Better first move |
| --- | --- |
| Need one-time historical backfill only | Batch export or snapshot load |
| Downstream does not understand deletes or updates | Redesign sink model first |
| Source team changes tables without compatibility process | Add schema governance before CDC rollout |
| Low freshness requirements and low volume | Keep batch sync simpler |

⚙️ How CDC Works in Production

Log-based CDC usually follows this control loop (a minimal code sketch follows the list):

  1. Source database writes committed changes to WAL/binlog/redo log.
  2. Connector reads those changes and records source position.
  3. Optional initial snapshot captures current table state.
  4. Change events are emitted to a durable stream or sink.
  5. Consumers apply the events idempotently.
  6. Checkpoints and lag metrics prove where recovery must resume.
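
Below is a minimal, self-contained sketch of that loop with in-memory stand-ins for the log, sink, and checkpoint store. Every class and method name here is hypothetical, not a real connector API, and the snapshot step is elided:

from dataclasses import dataclass

@dataclass
class ChangeEvent:
    position: int        # source log position (LSN / binlog offset analogue)
    op: str              # "c"=insert, "u"=update, "d"=delete
    key: str
    after: dict | None   # new row state; None for deletes

class InMemoryLog:
    """Stand-in for the WAL/binlog plus the connector's read position."""
    def __init__(self):
        self.entries: list[ChangeEvent] = []
    def read_since(self, position: int) -> list[ChangeEvent]:
        return [e for e in self.entries if e.position > position]

class IdempotentSink:
    """Applies each event at most once by remembering source positions."""
    def __init__(self):
        self.rows: dict[str, dict] = {}
        self.applied: set[int] = set()
    def apply(self, event: ChangeEvent) -> None:
        if event.position in self.applied:
            return                          # replayed event: safe no-op
        if event.op == "d":
            self.rows.pop(event.key, None)  # deletes handled explicitly
        else:
            self.rows[event.key] = event.after
        self.applied.add(event.position)

def run_once(log: InMemoryLog, sink: IdempotentSink, checkpoint: int) -> int:
    """One loop iteration: apply new events, then advance the checkpoint."""
    for event in log.read_since(checkpoint):
        sink.apply(event)
        checkpoint = event.position         # persist durably in production
    return checkpoint

log = InMemoryLog()
log.entries.append(ChangeEvent(position=1, op="c", key="o1", after={"status": "new"}))
sink = IdempotentSink()
checkpoint = run_once(log, sink, checkpoint=0)
assert checkpoint == 1 and sink.rows["o1"] == {"status": "new"}
checkpoint = run_once(log, sink, checkpoint=0)   # replay from 0: no duplicates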

| Control point | What operators care about | Why it matters |
| --- | --- | --- |
| Snapshot strategy | Initial load and transition to live stream | Prevents duplicates and gaps |
| Source position checkpoint | LSN or binlog offset persistence | Defines replay starting point |
| Event envelope | Operation type, key, timestamp, schema version | Lets downstream reason about changes |
| Delete semantics | Tombstone, hard delete, soft delete | Prevents sink corruption |
| Sink idempotency | Safe reprocessing during replay | Makes recovery practical |
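
The event envelope row is easiest to see concretely. Here is the shape of a Debezium-style change event written out as a Python dict; the field values are illustrative, but the op/before/after/source/ts_ms layout matches Debezium's documented envelope:

change_event = {
    "op": "u",                                  # c=insert, u=update, d=delete, r=snapshot read
    "before": {"id": 42, "status": "pending"},  # prior row state (None for inserts)
    "after":  {"id": 42, "status": "shipped"},  # new row state (None for deletes)
    "source": {
        "table": "orders",
        "lsn": 45_617_288,                      # source position: replay and idempotency key
        "ts_ms": 1_700_000_000_000,             # commit time at the source
    },
    "ts_ms": 1_700_000_000_123,                 # time the connector processed the event
}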

🧠 Deep Dive: What Incident Reviews Usually Reveal First

| Failure mode | Early symptom | Root cause | First mitigation |
| --- | --- | --- | --- |
| Duplicate rows after rollout | Warehouse or index count jumps unexpectedly | Snapshot-to-stream handoff overlapped | Add explicit cutover watermark; dedupe by primary key plus source position |
| Sink silently misses deletes | Search index shows stale records | Tombstone handling undefined | Standardize delete semantics and tests |
| Connector keeps running, consumers fail | Lag stays low but downstream correctness drifts | Schema evolution broke sink parsing | Add schema compatibility checks before producer deploys |
| Replay is too dangerous to use | Teams fear duplicating every downstream write | Consumers are not idempotent | Design sink keys and merge logic before rollout |
| Source DB pressure increases | Transaction latency rises after CDC enablement | Snapshot or connector configuration is too heavy | Throttle snapshots and isolate connector reads |

Field note: teams often monitor connector health but not downstream correctness. A healthy connector feeding a broken sink is still a production incident.
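
One way to close that gap is a correctness probe that compares source and sink row counts independently of connector health. A minimal sketch; the 0.1% default threshold matches the escalation rule later in this post and is an assumption, not a standard:

def counts_diverged(source_count: int, sink_count: int, threshold: float = 0.001) -> bool:
    """True if the sink diverges from the source by more than the threshold."""
    if source_count == 0:
        return sink_count != 0
    return abs(source_count - sink_count) / source_count > threshold

assert not counts_diverged(1_000_000, 1_000_000)
assert counts_diverged(1_000_000, 998_500)   # 0.15% drift: page someone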

Internals

The internals that matter most are boundary ownership (which component owns the snapshot-to-stream cutover), the order of failure handling (persist the sink write before advancing the source offset, never the reverse), and idempotent state transitions so retries and replays remain safe.

Performance Analysis

Track p95 and p99 source-to-sink latency, replication slot lag, consumer apply lag, retry pressure, and cost per successful sync to catch regressions before they escalate into incidents.
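
As a worked example, here is a tiny percentile computation over commit-to-apply latency pairs; the sample timestamps are made up, and production numbers would come from event ts_ms fields and sink apply times:

def percentile(samples: list[float], pct: float) -> float:
    """Nearest-rank percentile; fine for dashboards, no interpolation."""
    ordered = sorted(samples)
    idx = min(len(ordered) - 1, round(pct / 100 * (len(ordered) - 1)))
    return ordered[idx]

# (commit_ts_ms, apply_ts_ms) pairs observed at the sink
pairs = [(1_000, 1_180), (2_000, 2_090), (3_000, 3_950)]
lag_ms = [apply - commit for commit, apply in pairs]
print(percentile(lag_ms, 95), percentile(lag_ms, 99))   # 950 950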

📊 CDC Runtime Flow

flowchart TD
    A[Source database commit] --> B[WAL or binlog]
    B --> C[CDC connector]
    C --> D{Initial snapshot needed?}
    D -->|Yes| E[Load current table state]
    D -->|No| F[Read live change events]
    E --> G[Emit snapshot records]
    F --> H[Emit insert, update, delete events]
    G --> I[Stream or topic]
    H --> I
    I --> J[Idempotent sinks and consumers]
    C --> K[Persist source offset and lag metrics]

This flowchart shows the full CDC pipeline from a source database commit through the WAL or binlog to downstream consumers. The connector handles both initial snapshots and live change events, emitting them to a shared stream or topic that idempotent sinks can consume at their own pace. The key takeaway is that offset persistence and idempotent sink design are the two properties that make the pipeline resumable after any failure point.

📊 WAL to Kafka: Log-Based CDC Sequence

sequenceDiagram
  participant DB as PostgreSQL
  participant W as WAL / Binlog
  participant D as Debezium Connector
  participant K as Kafka Topic
  participant C as Consumer
  DB->>W: COMMIT row change
  W->>D: stream LSN records
  D->>D: persist source offset
  D->>K: emit change event (op+before+after)
  K->>C: deliver insert/update/delete
  C->>C: idempotent upsert by LSN key
  C-->>K: commit offset on success

This sequence diagram traces a single committed row change from PostgreSQL's WAL all the way through Debezium and into a Kafka topic consumed by a downstream service. Debezium persists its source offset after each batch of LSN records, and consumers commit their Kafka offset only after a successful idempotent upsert — ensuring at-least-once delivery with no data loss. The takeaway is that offset tracking happens at two independent layers, giving the pipeline two durable checkpoints to resume from after a failure.

📊 CDC Method Comparison: Log vs Trigger vs Query

flowchart LR
  subgraph Log-Based
    L1[DB WAL or Binlog] --> L2[Connector reads log]
    L2 --> L3[Emit change event]
  end
  subgraph Trigger-Based
    T1[DB Trigger on DML] --> T2[Write to audit table]
    T2 --> T3[Poll audit table]
  end
  subgraph Query-Based
    Q1[Scheduled poll] --> Q2[SELECT WHERE updated gt last_run]
    Q2 --> Q3[Detect changes by timestamp]
  end
  L3 --> Sink[Downstream Sink]
  T3 --> Sink
  Q3 --> Sink

This diagram compares three CDC strategies side by side: log-based (reading directly from the WAL), trigger-based (writing to an audit table on DML), and query-based (polling with a timestamp filter). All three deliver change records to the same downstream sink, but they differ sharply in latency, write overhead on the source, and completeness. Query-based polling cannot see hard deletes at all, and trigger-based capture sees them only by adding a write to every transaction; log-based CDC captures deletes from the log itself with no additional write overhead on the source database.
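
To make the delete gap concrete, here is a toy sketch of query-based polling; the in-memory table and updated_at filter stand in for a real scheduled SELECT:

rows = {
    1: {"id": 1, "updated_at": 210},   # changed after last_run...
    2: {"id": 2, "updated_at": 205},
}
last_run = 200

def poll_changes(table: dict, since: int) -> list[dict]:
    # SELECT * WHERE updated_at > :since, expressed over the toy table
    return [r for r in table.values() if r["updated_at"] > since]

del rows[1]                            # ...then hard-deleted: no row left to match
print(poll_changes(rows, last_run))    # only id=2 appears; the delete is invisible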

🧪 Concrete Config Example: Debezium PostgreSQL Connector

This Debezium PostgreSQL connector JSON configuration captures all changes from the orders and order_items tables and streams them to Kafka using the pgoutput logical replication plugin. Debezium is the de facto log-based CDC tool for PostgreSQL, and this connector config is the entry point for the entire log-to-Kafka pipeline described in the diagrams above. Pay close attention to plugin.name, slot.name, and table.include.list — these three fields determine the replication mechanism, the durable replication slot name, and the exact set of tables being captured.

{
  "name": "orders-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "orders-db",
    "database.port": "5432",
    "database.user": "debezium",
    "database.dbname": "orders",
    "plugin.name": "pgoutput",
    "slot.name": "orders_cdc_slot",
    "publication.name": "orders_cdc_pub",
    "table.include.list": "public.orders,public.order_items",
    "snapshot.mode": "initial",
    "tombstones.on.delete": "true"
  }
}

Why these fields matter operationally:

  • slot.name is part of recovery and retention behavior on the source database; a stalled slot forces WAL retention (a slot-lag probe is sketched after this list).
  • snapshot.mode changes how first-sync risk is handled.
  • tombstones.on.delete makes delete semantics explicit for downstream consumers.
  • topic.prefix namespaces the emitted topics; it is why the consumer below subscribes to postgres.public.orders.
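
Because a stalled slot forces PostgreSQL to retain WAL, slot lag is worth probing directly. A sketch using standard catalog views (pg_replication_slots with pg_wal_lsn_diff); the connection details and slot name are placeholders:

import psycopg2

def slot_lag_bytes(conn, slot_name: str = "orders_cdc_slot") -> int:
    """Bytes of WAL the slot is holding back; alert when this keeps growing."""
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)
            FROM pg_replication_slots
            WHERE slot_name = %s
            """,
            (slot_name,),
        )
        row = cur.fetchone()
        return int(row[0]) if row else 0

# conn = psycopg2.connect("dbname=orders user=debezium")   # placeholder DSN
# print(slot_lag_bytes(conn))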

Python Consumer: Reading Debezium Change Events from Kafka

from confluent_kafka import Consumer
import json, logging

logger = logging.getLogger(__name__)

consumer = Consumer({
    "bootstrap.servers": "kafka:9092",
    "group.id":          "orders-cdc-consumer",
    "auto.offset.reset": "earliest",
    # Disable auto-commit — commit only after idempotent write succeeds
    "enable.auto.commit": False,
})
consumer.subscribe(["postgres.public.orders"])

def process_change_event(event: dict):
    op     = event.get("op")           # "c"=insert "u"=update "d"=delete "r"=snapshot
    after  = event.get("after")        # new row state (None for deletes)
    before = event.get("before")       # old row state (None for inserts)
    lsn    = event.get("source", {}).get("lsn")   # PostgreSQL LSN — idempotency key

    if op == "d":
        # Handle deletes explicitly — never silently drop them
        order_id = before["id"] if before else None
        logger.info(f"DELETE order_id={order_id} lsn={lsn}")
        upsert_to_warehouse({"id": order_id, "_deleted": True, "_lsn": lsn})
    elif op in ("c", "u", "r"):
        upsert_to_warehouse({**after, "_lsn": lsn})

def upsert_to_warehouse(record: dict):
    # Idempotent write keyed by (id, _lsn) — safe to replay without duplicates
    pass  # replace with actual sink write

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None or msg.error():
            continue
        if msg.value() is None:
            continue   # Kafka tombstone (null value) after a delete; skip it
        payload = json.loads(msg.value())["payload"]
        process_change_event(payload)
        consumer.commit(asynchronous=False)  # commit after successful write only
except KeyboardInterrupt:
    consumer.close()

Key points from Shopify's production pattern:

  • op field determines event type — always handle deletes explicitly or sink correctness drifts silently.
  • Commit offsets only after the sink write succeeds — this is what makes replay safe.
  • lsn is your idempotency key; duplicate events from replay will match and no-op in the sink (a runnable version of this guard follows below).
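
To make that guard runnable end to end, here is a self-contained version of upsert_to_warehouse against SQLite; a real deployment would target the warehouse, and the table and columns are illustrative:

import sqlite3

db = sqlite3.connect(":memory:")
db.execute("""CREATE TABLE orders
              (id TEXT PRIMARY KEY, status TEXT, deleted INTEGER DEFAULT 0, lsn INTEGER)""")

def upsert_to_warehouse(record: dict) -> None:
    # Apply only if this event's LSN is newer than what the sink already holds
    db.execute(
        """
        INSERT INTO orders (id, status, deleted, lsn)
        VALUES (:id, :status, :deleted, :lsn)
        ON CONFLICT(id) DO UPDATE SET
            status  = excluded.status,
            deleted = excluded.deleted,
            lsn     = excluded.lsn
        WHERE excluded.lsn > orders.lsn
        """,
        {"id": record["id"], "status": record.get("status"),
         "deleted": int(record.get("_deleted", False)), "lsn": record["_lsn"]},
    )

upsert_to_warehouse({"id": "o1", "status": "paid", "_lsn": 10})
upsert_to_warehouse({"id": "o1", "status": "pending", "_lsn": 9})   # stale replay
assert db.execute("SELECT status FROM orders WHERE id = 'o1'").fetchone()[0] == "paid"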

🌍 Real-World Applications: What to Instrument and What to Alert On

Shopify: CDC Across 200+ Production Tables

Shopify uses Debezium to stream PostgreSQL changes from orders, products, and inventory into Kafka, feeding Elasticsearch (search), Snowflake (analytics), and fraud detection. Operating numbers: CDC lag under 300ms from WAL commit to Kafka topic. A full snapshot of their 500M-row orders table takes 6–8 hours and is scheduled during low-traffic windows. The 2020 schema incident (a nullable column broke Elasticsearch for 4 hours) drove adoption of mandatory schema compatibility checks in CI before any table migration merges.

LinkedIn: Databus at Millions of Events/Second

LinkedIn built Databus, an open-source CDC framework, to propagate changes from Oracle and MySQL into distributed caches and search indexes. Their 2012 engineering paper reported millisecond-range end-to-end propagation for profile updates at throughput of millions of events per second, replacing slower polling-based sync. Replay is supported via a configurable event buffer of 1–24 hours depending on table criticality.

Stripe: Audit Log Pipeline with Slot Lag SLOs

Stripe routes CDC events from primary PostgreSQL clusters into an append-only audit log via Debezium → Kafka → BigQuery. Each row change records the full before/after payload, enabling point-in-time state reconstruction for compliance queries. Their replication slot SLO: if slot lag exceeds 30 minutes, an alert fires to prevent WAL bloat from stalled consumers.

| Signal | Why it matters | Typical alert threshold |
| --- | --- | --- |
| Source-to-stream lag | Freshness against source of truth | > 500ms for critical tables |
| Replication slot lag (PostgreSQL) | WAL bloat risk if connector stalls | > 30 min behind |
| Snapshot duration | Source pressure and cutover risk | Exceeds maintenance window |
| Consumer apply lag | Sink health separate from connector health | Sink lag grows while connector stays healthy |
| Schema compatibility failures | Drift before silent corruption | Any consumer schema rejection |

What breaks first:

  1. Snapshot handoff logic — the Shopify incident pattern.
  2. Delete semantics — sink serves stale records for deleted rows.
  3. Sink idempotency — replay causes duplicates in the warehouse.

⚖️ Trade-offs & Failure Modes: Pros, Cons, and Alternatives

| Category | Practical impact | Mitigation |
| --- | --- | --- |
| Pros | Low-latency replication from source-of-truth systems | Keep envelope and checkpoints explicit |
| Pros | Replayable recovery path for downstream failures | Design idempotent sinks |
| Cons | Connector, schema, and source-log operational overhead | Limit scope to high-value tables first |
| Cons | More nuanced delete and update handling | Standardize downstream contracts |
| Risk | Silent correctness drift in sinks | Validate counts, deletes, and compatibility continuously |
| Risk | Source database retention issues from stalled slots | Alert on slot lag and connector stoppage |

🧭 Decision Guide for Data Movement

| Situation | Recommendation |
| --- | --- |
| Need sub-minute propagation from OLTP to sinks | Use CDC |
| Need only daily reporting loads | Prefer simpler batch extracts |
| Source and sink both need replayable history | CDC plus durable stream is a strong fit |
| Consumers cannot tolerate duplicates or delete complexity | Fix sink model before rollout |

If you cannot explain how deletes and replays behave, the CDC design is not ready.

Operator Field Note: What Fails First in Production

Shopify's 4-hour silent Elasticsearch corruption: The Debezium connector was green. Kafka consumer lag was zero. But a schema change on orders — adding a nullable updated_reason column — wasn't backward-compatible with Elasticsearch's strict index mapping. The connector serialized null as a missing field; Elasticsearch silently dropped those records from search results. Merchant dashboards showed order counts, but many orders were invisible in search. The incident ran for 4 hours before a merchant escalation triggered the investigation.

Root cause: no schema compatibility gate in the table migration deploy pipeline. Fix: Debezium's Confluent Schema Registry integration was wired to reject producer schema changes that fail Avro backward-compatibility checks. Every table migration now runs a schema-compat-check CI step before merge (a sketch of that gate follows the list below).

  • Early warning signal: consumer apply lag growing while connector lag stays flat — this almost always means the sink is rejecting or silently dropping events.
  • First containment move: pause the consumer, inspect the DLQ or dead-letter topic, and compare the last-processed event schema with the current source schema.
  • Escalate immediately when: sink row count diverges from source by more than 0.1% over two consecutive polling windows.
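
A sketch of that CI gate against Confluent Schema Registry's compatibility endpoint; the registry URL and subject name are placeholders for whatever the pipeline registers:

import json
import urllib.request

def is_backward_compatible(registry_url: str, subject: str, candidate_schema: dict) -> bool:
    """Ask Schema Registry whether the candidate is compatible with the latest version."""
    body = json.dumps({"schema": json.dumps(candidate_schema)}).encode()
    req = urllib.request.Request(
        f"{registry_url}/compatibility/subjects/{subject}/versions/latest",
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp).get("is_compatible", False)

# Fail the migration PR if the new orders schema would break consumers:
# ok = is_backward_compatible("http://schema-registry:8081",
#                             "postgres.public.orders-value", new_schema)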

15-Minute SRE Drill

  1. Replay one bounded failure case in staging.
  2. Capture one metric, one trace, and one log that prove the guardrail worked.
  3. Update the runbook with exact rollback command and owner on call.

🛠️ Debezium Embedded Engine: Running CDC Inside a Java Spring Boot Service

Debezium is an open-source CDC platform built on Apache Kafka Connect; its EmbeddedEngine variant runs the connector inside the JVM of a Spring Boot application — useful for teams that want CDC without operating a full Kafka Connect cluster. For production-scale deployments (like Shopify's 200+ table setup), the standalone Kafka Connect connector configuration shown earlier in this post is the preferred deployment model.

How it solves the problem: The EmbeddedEngine approach lets a Java service own its CDC pipeline from WAL read to idempotent sink write in a single, testable process. Every change event is delivered to the consumer registered via notifying(...), which maps to the same op-based routing shown in the Python consumer above, with the same LSN-based idempotency key and explicit delete handling.

// Spring Boot CDC service using Debezium EmbeddedEngine
// (imports omitted for brevity: Debezium engine API, Jackson, SLF4J, Spring, java.util.concurrent)
@Service
public class OrderCdcService implements InitializingBean, DisposableBean {

    private static final Logger log = LoggerFactory.getLogger(OrderCdcService.class);
    private final ObjectMapper objectMapper = new ObjectMapper();

    private DebeziumEngine<ChangeEvent<String, String>> engine;
    private ExecutorService executor;
    private final WarehouseSinkService warehouseSink;
    private final DlqPublisher dlqPublisher;   // dead-letter publisher used below; type assumed

    public OrderCdcService(WarehouseSinkService warehouseSink, DlqPublisher dlqPublisher) {
        this.warehouseSink = warehouseSink;
        this.dlqPublisher = dlqPublisher;
    }

    @Override
    public void afterPropertiesSet() {
        Properties props = new Properties();

        // Connector identity and source database
        props.setProperty("name",                    "orders-cdc-embedded");
        props.setProperty("connector.class",
            "io.debezium.connector.postgresql.PostgresConnector");
        props.setProperty("database.hostname",       "orders-db");
        props.setProperty("database.port",           "5432");
        props.setProperty("database.user",           "debezium");
        props.setProperty("database.password",       "${DB_PASSWORD}");
        props.setProperty("database.dbname",         "orders");
        props.setProperty("plugin.name",             "pgoutput");
        props.setProperty("slot.name",               "orders_embedded_slot");
        props.setProperty("table.include.list",      "public.orders,public.order_items");

        // Snapshot: capture current state on first run, then switch to live stream
        props.setProperty("snapshot.mode",           "initial");
        // Explicit tombstone for deletes — downstream consumers must handle op=d
        props.setProperty("tombstones.on.delete",    "true");

        // Checkpoint: persist source offset so replay resumes at the right LSN
        props.setProperty("offset.storage",
            "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", "/var/cdc/offsets.dat");
        props.setProperty("offset.flush.interval.ms",     "5000");

        engine = DebeziumEngine.create(Json.class)
            .using(props)
            .notifying(this::handleChangeEvent)
            .build();

        executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
    }

    private void handleChangeEvent(ChangeEvent<String, String> event) {
        if (event.value() == null) return;   // tombstone event emitted after a delete; skip

        try {
            JsonNode payload = objectMapper.readTree(event.value()).get("payload");
            String op  = payload.get("op").asText();           // c/u/d/r
            JsonNode after  = payload.get("after");
            JsonNode before = payload.get("before");
            long lsn = payload.path("source").path("lsn").asLong();

            switch (op) {
                case "d" -> {
                    // Explicit delete handling — never silently drop
                    String orderId = before.get("id").asText();
                    warehouseSink.markDeleted(orderId, lsn);
                }
                case "c", "u", "r" -> {
                    // Insert, update, or snapshot row — idempotent upsert by (id, lsn)
                    warehouseSink.upsert(after, lsn);
                }
                default -> log.warn("Unknown CDC op={} lsn={}", op, lsn);
            }
        } catch (JsonProcessingException ex) {
            log.error("Failed to parse CDC event — routing to DLQ", ex);
            dlqPublisher.publish(event.value());
        }
    }

    @Override
    public void destroy() throws Exception {
        if (engine != null) engine.close();        // stops the WAL polling loop
        if (executor != null) executor.shutdown();
    }
}

// Idempotent sink service — safe to replay without duplicates
@Service
public class WarehouseSinkService {

    private final JdbcTemplate jdbc;

    public WarehouseSinkService(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    public void upsert(JsonNode row, long lsn) {
        // Idempotent: ON CONFLICT (id) DO UPDATE only if lsn is newer
        jdbc.update("""
            INSERT INTO warehouse.orders (id, customer_id, amount, status, source_lsn)
            VALUES (?, ?, ?, ?, ?)
            ON CONFLICT (id) DO UPDATE
              SET customer_id = EXCLUDED.customer_id,
                  amount      = EXCLUDED.amount,
                  status      = EXCLUDED.status,
                  source_lsn  = EXCLUDED.source_lsn
              WHERE warehouse.orders.source_lsn < EXCLUDED.source_lsn
            """,
            row.get("id").asText(),
            row.get("customer_id").asText(),
            row.get("amount").asDouble(),
            row.get("status").asText(),
            lsn);
    }

    public void markDeleted(String orderId, long lsn) {
        jdbc.update("""
            UPDATE warehouse.orders
            SET deleted = true, source_lsn = ?
            WHERE id = ? AND source_lsn < ?
            """, lsn, orderId, lsn);
    }
}

For a full deep-dive on Debezium production deployments with Kafka Connect and schema registry integration, a dedicated follow-up post is planned.

📚 Interactive Review: CDC Readiness Drill

Before enabling a new CDC connector, ask:

  1. What exact source position defines the transition from snapshot to live stream?
  2. How will downstream systems deduplicate replayed update events?
  3. What does a delete event mean in every sink that consumes it?
  4. Which schema changes are backward compatible, and who approves them?
  5. How much source lag can the business tolerate before the pipeline is considered degraded?

Scenario question: if the connector stays healthy but search keeps serving deleted orders for 30 minutes, which metrics and contracts failed you first?

📌 TLDR: Summary & Key Takeaways

  • CDC is a practical way to move committed changes without full reloads.
  • Snapshot cutover, deletes, checkpoints, and idempotent sinks are the hard parts.
  • Connector health alone does not prove downstream correctness.
  • Measure freshness and replay cost separately for connector and consumer layers.
  • Start with a narrow table set and clear schema governance.