Data Anomalies in Distributed Systems: Split Brain, Clock Skew, Stale Reads, and More

The anomalies that distributed databases produce even when nodes are healthy — and the engineering patterns that prevent them

Abstract Algorithms · 37 min read

TLDR: Distributed systems produce anomalies not because the code is buggy — but because physics makes it impossible to be perfectly consistent, available, and partition-tolerant simultaneously. Split brain, stale reads, clock skew, causality violations, conflicting writes, zombie leaders, cascading failures, and thundering herd are the canonical failure modes. Each has a distinct root cause and a set of engineering patterns — fencing tokens, vector clocks, CRDTs, quorum reads, jittered backoff, circuit breakers — that contain or eliminate the damage.


In 2012, a DynamoDB bug caused 30 minutes of degraded availability for Amazon services. The root cause: a replication lag anomaly that caused nodes to return stale reads while the system appeared healthy. The data was correct on disk. The problem was in the coordination layer.

Distributed systems produce anomalies not because the code is buggy — but because physics makes it impossible to be consistent, available, and partition-tolerant all at once. A message sent from node A to node B may be delayed by 200ms, 2 seconds, or forever. Two clocks on two different servers will drift apart even if they were synchronized a minute ago. A node that stops sending heartbeats may be dead — or it may be alive and paused by a garbage collector.

Every one of these realities creates a distinct class of anomaly. This post catalogs them, explains their root causes, and maps each to the engineering patterns that prevent or contain them. It deliberately does not re-cover dirty reads, phantom reads, or write skew — those belong to the isolation levels domain. This post is about the anomalies that arise specifically from distribution: network delays, clock differences, node failures, replication topology, and independent node state.


📖 Why Distributing Data Breaks Rules Physics Never Mentioned

A single-node relational database is a marvel of engineering predictability. Every write goes through one process, in one memory space, serialized to one disk. ACID properties — Atomicity, Consistency, Isolation, Durability — hold because there is no concurrency problem that can't be resolved by a simple mutex. The database scheduler sees every transaction and can order them cleanly.

Distribute that database across five nodes in two datacenters and the world changes entirely. You now have five independent processes that must coordinate using a network — a channel that can drop, delay, reorder, and duplicate messages. You have five clocks that drift at different rates. You have five independent failure domains.

The Eight Fallacies That Cause Distributed System Failures

L. Peter Deutsch's eight fallacies of distributed computing were first articulated at Sun Microsystems in the 1990s (the eighth was added later by James Gosling). They are still the best taxonomy of the hidden assumptions that cause distributed systems to fail:

  1. The network is reliable. It is not. Packets are dropped, connections are reset, routes flap. A TCP connection can silently fail for minutes before either end notices.
  2. Latency is zero. A cross-datacenter round trip is 60–100ms. A GC pause is 200ms–5 seconds. A disk I/O under load is 20–200ms. None of these are zero.
  3. Bandwidth is infinite. A replication stream, a batch snapshot, and a monitoring pipeline can all compete for the same NIC.
  4. The network is secure. Assumptions about trusted internal traffic have caused incidents from BGP hijacks to lateral movement after breach.
  5. Topology doesn't change. Nodes are added, removed, promoted, and demoted. A static configuration that was correct at startup may be wrong by noon.
  6. There is one administrator. Different teams may manage different parts of the infrastructure with conflicting assumptions.
  7. Transport cost is zero. Serialization, deserialization, and connection overhead add up at scale.
  8. The network is homogeneous. Mixed hardware, mixed OS versions, and mixed kernel parameters create unpredictable timing behavior.

Every distributed anomaly in this post can be traced back to one or more of these fallacies. Split brain follows from fallacy 1. Clock skew follows from fallacy 2. Cascading failure follows from the intersection of fallacies 1, 2, and 5.

The Anomaly Taxonomy

Distributed anomalies fall into four families, which we will use as a conceptual frame throughout this post:

| Family | Description | Examples |
| --- | --- | --- |
| Visibility anomalies | A read returns data that is wrong — stale, missing, or partially applied | Stale reads, torn reads, replication lag |
| Ordering anomalies | Events that are causally related appear in the wrong order to some observers | Causality violations, clock skew |
| Availability anomalies | A component appears failed when it is healthy, or healthy when it is failed | Split brain, zombie leader, phantom failures |
| Integrity anomalies | Two nodes diverge in data state and cannot deterministically agree on a canonical value | Conflicting writes, data divergence |

Understanding which family an anomaly belongs to immediately narrows the solution space. Visibility anomalies are fixed by routing and consistency tokens. Ordering anomalies are fixed by logical clocks. Availability anomalies are fixed by epoch/term tracking and fencing. Integrity anomalies are fixed by merge semantics or conflict-free data structures.


⚙️ Split Brain: When Two Nodes Both Believe They Are the Leader

Split brain is one of the most dangerous distributed anomalies because it is an availability anomaly that produces an integrity anomaly. It is dangerous precisely because the system looks healthy — all nodes are up, all nodes are accepting writes, no alarms are firing.

How Split Brain Happens

Consider a three-node replica set: one primary (P) and two secondaries (S1, S2). The primary sends heartbeats to secondaries every 2 seconds. A network partition isolates P from S1 and S2.

From P's perspective: the network is down, but P is alive. P continues serving writes to clients on its side of the partition.

From S1 and S2's perspective: they have not heard from P in 10 seconds. They trigger an election, S1 wins, and S1 promotes itself to primary (with a new term number).

Now both P (term 1) and S1 (term 2) are accepting writes to the same logical dataset. A user writes user.email = a@example.com to P and user.email = b@example.com to S1. When the partition heals, both primaries have committed conflicting writes. The data cannot be merged deterministically without application-level intervention.

This sequence diagram shows the split brain progression: once the partition takes effect, two primaries independently accept writes. When the partition heals and the term numbers are compared, the old primary must discard its writes — data is permanently lost.

sequenceDiagram
    participant C1 as Clients A
    participant P as Old Primary (Term 1)
    participant NET as Network Partition
    participant S1 as New Primary (Term 2)
    participant C2 as Clients B

    P->>S1: Heartbeat blocked by partition
    S1->>S1: Election timeout fires
    S1->>S1: Wins election, promotes to Term 2
    C1->>P: Write email=a@example.com (Term 1)
    P->>P: Accepts write
    C2->>S1: Write email=b@example.com (Term 2)
    S1->>S1: Accepts write
    NET->>P: Partition heals
    S1->>P: I am primary at Term 2
    P->>P: Steps down, rolls back Term 1 writes
    Note over P,S1: Write email=a@example.com is permanently lost

Detecting and Preventing Split Brain

Term/epoch numbers are the core detection mechanism. In Raft, every message carries a term number. A node that receives a message with a higher term number than its own immediately steps down. This is why S1's term-2 message causes P to step down after the partition heals.

Quorum writes prevent split brain from causing data loss. If a write requires acknowledgment from a majority of nodes (e.g., 2 of 3), then P cannot commit any writes during the partition because it only has 1 of 3 votes — below majority. Writes on P's side fail, which surfaces the partition to clients rather than silently accepting conflicting writes.

STONITH (Shoot The Other Node In The Head) is the most aggressive prevention: when a new leader is elected, it sends a forced-shutdown command to the old leader via an out-of-band channel (IPMI, AWS EC2 instance stop API) to guarantee the old leader is dead before the new leader serves writes.

Fencing tokens provide a softer alternative — a monotonically increasing number associated with each leadership lease. The storage layer rejects writes from a leader whose token is lower than the highest seen. We explore this further in the 🧪 section.
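As a sketch of how a storage layer can enforce fencing (class and field names here are illustrative, not taken from any particular system), the check is a single comparison against the highest token ever seen:

```python
class FencedStore:
    """Storage layer that rejects writes carrying a stale fencing token.

    Illustrative sketch: token issuance (by a lock service or election
    mechanism) is assumed to be monotonically increasing per leadership lease.
    """
    def __init__(self):
        self.highest_token = 0
        self.data = {}

    def write(self, token, key, value):
        # Reject any writer whose lease token is older than one already seen.
        if token < self.highest_token:
            raise PermissionError(
                f"stale fencing token {token} < {self.highest_token}")
        self.highest_token = token
        self.data[key] = value

store = FencedStore()
store.write(token=1, key="email", value="a@example.com")  # old primary, term 1
store.write(token=2, key="email", value="b@example.com")  # new primary, term 2
try:
    store.write(token=1, key="email", value="c@example.com")  # zombie write
except PermissionError:
    pass  # the stale leader's write is fenced off, not silently accepted
```

The crucial property is that the check lives in the storage layer, not in the leader: a paused or partitioned old leader cannot bypass it.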

A real-world example: a 2012 MongoDB bug allowed two primaries to coexist simultaneously after certain network partition patterns because the term-number comparison did not cover all code paths. The fix was to add explicit term checks at every write path entry point.


🧠 Stale Reads and Clock Skew: The Two Anomalies That Break Your Sense of Time

Stale reads and clock skew are both visibility anomalies, but they come from different root causes. Stale reads come from replication lag — the inherent delay in propagating writes to replicas. Clock skew comes from the impossibility of perfectly synchronizing physical clocks across machines.

Stale Reads: The Replica That Didn't Get the Memo

In an asynchronous replication setup, the primary commits a write and immediately acknowledges success to the client. The replication to secondaries happens in the background, with no hard latency bound.

This creates a window — typically milliseconds, but potentially seconds under load — during which a secondary holds a value that the primary has already superseded. Any read routed to that secondary during this window returns a stale value.

Scenario 1 — Read-your-writes violation: A user updates their profile avatar. The write lands on the primary. The client is immediately redirected to load the profile page, which reads from a secondary that hasn't received the update yet. The user sees their old avatar.

Scenario 2 — Inventory oversell: An e-commerce system's primary decrements stock from 5 to 0 after the last purchase. A replica is still showing 5. A second buyer's availability check hits the replica and concludes stock is available. An oversell occurs.

Scenario 3 — Monitoring dashboard inconsistency: A dashboard reading from a replica shows CPU metrics from 8 seconds ago. An alert threshold that should have fired 6 seconds ago has not yet triggered.

The following diagram shows the replication lag window: the critical period between the primary committing a write and the replica applying it, during which any read from the replica returns the stale value.

sequenceDiagram
    participant C as Client
    participant P as Primary
    participant R as Replica (async)

    C->>P: Write: stock=0
    P->>P: Committed at T=100ms
    P-->>R: Replication in-flight
    C->>R: Read: stock? at T=110ms
    R-->>C: stock=5 (stale — replica at T=50ms)
    Note over P,R: Replication lag window (~200ms)
    P->>R: Apply: stock=0 at T=300ms
    C->>R: Read: stock? at T=400ms
    R-->>C: stock=0 (fresh)

Prevention strategies:

  • Read-your-writes (session consistency): Route reads for a session's own writes to the primary, or include a replication position token (LSN) with the request. A replica serving that read waits until it has applied at least that LSN before responding.
  • Bounded staleness: Configure a maximum acceptable lag (e.g., 5 seconds). If a replica's lag exceeds this, route reads to the primary. PostgreSQL exposes pg_stat_replication.write_lag; MySQL exposes Seconds_Behind_Master; Kafka exposes consumer group lag.
  • Synchronous replication for critical paths: For inventory-decrement or payment-debit reads, require quorum: R + W > N. If N=3 and W=2, then R=2 guarantees overlap with the quorum that received the write.
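A toy model of the LSN-token mechanism, assuming one primary and one asynchronous replica (class names are illustrative; real systems would wait with a timeout or redirect to the primary instead of spinning):

```python
class Primary:
    def __init__(self):
        self.lsn = 0
        self.log = []          # (lsn, key, value) WAL records

    def write(self, key, value):
        self.lsn += 1
        self.log.append((self.lsn, key, value))
        return self.lsn        # the client keeps this as its session token

class Replica:
    def __init__(self, primary):
        self.primary = primary
        self.replay_lsn = 0
        self.data = {}

    def apply_next(self):
        # Apply one pending WAL record (replication is asynchronous).
        for lsn, key, value in self.primary.log:
            if lsn == self.replay_lsn + 1:
                self.data[key] = value
                self.replay_lsn = lsn
                return

    def read(self, key, min_lsn=0):
        # Read-your-writes: refuse to answer until caught up to the token.
        while self.replay_lsn < min_lsn:
            self.apply_next()   # real systems: wait, or fall back to primary
        return self.data.get(key)

primary = Primary()
replica = Replica(primary)
primary.write("stock", 5)
replica.apply_next()                    # replica caught up to stock=5
token = primary.write("stock", 0)       # primary now has stock=0
stale = replica.read("stock")           # unguarded read returns the old value
fresh = replica.read("stock", min_lsn=token)  # waits for the token's LSN
```

The unguarded read returns 5 even though the primary committed 0; the token-guarded read cannot.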

Clock Skew: When Two Nodes Disagree About What Time It Is

NTP-synchronized clocks typically stay within 100ms of each other under normal conditions. Under network load or in VMs with paused clocks, skew can reach 1–2 seconds. Across datacenters, it can exceed 5 seconds without tight clock discipline.

This matters because many distributed systems use wall-clock timestamps to resolve conflicts. Last Write Wins (LWW) is the canonical example: when two nodes both received a write for the same key, the write with the later timestamp wins. If node A's clock is 2 seconds behind node B's, a write to node A at physical time T will carry a timestamp T-2000ms — and will lose to a write on node B that happened 1 second earlier in physical time but carries timestamp T-1000ms. The correct write is discarded.

Cassandra uses LWW with client-supplied timestamps as its default conflict resolution strategy. A client with a 2-second clock skew can permanently overwrite data that was written after it — silently, with no error.

Prevention strategies:

Hybrid Logical Clocks (HLC) combine a wall-clock component with a logical counter. The wall-clock provides rough ordering; the logical counter breaks ties. Crucially, HLC timestamps are always monotonically increasing — a node never issues a timestamp lower than the last one it observed, regardless of wall-clock drift.

Pseudocode for HLC timestamp generation:

function hlc_now(local_wall_clock, last_hlc):
    wall = max(local_wall_clock, last_hlc.wall)
    if wall == last_hlc.wall:
        logical = last_hlc.logical + 1
    else:
        logical = 0
    last_hlc = {wall: wall, logical: logical}
    return last_hlc

function hlc_receive(msg_hlc, local_wall_clock, last_hlc):
    wall = max(local_wall_clock, msg_hlc.wall, last_hlc.wall)
    if wall == msg_hlc.wall and wall == last_hlc.wall:
        logical = max(msg_hlc.logical, last_hlc.logical) + 1
    elif wall == msg_hlc.wall:
        logical = msg_hlc.logical + 1
    elif wall == last_hlc.wall:
        logical = last_hlc.logical + 1
    else:
        logical = 0
    last_hlc = {wall: wall, logical: logical}
    return last_hlc
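The pseudocode above translates almost directly into runnable Python (the `HLC` dataclass and field names are illustrative):

```python
from dataclasses import dataclass

@dataclass
class HLC:
    wall: int = 0      # physical component (e.g., ms since epoch)
    logical: int = 0   # tie-breaking counter

def hlc_now(local_wall_clock, last):
    # Local event: never go backwards, even if the wall clock did.
    wall = max(local_wall_clock, last.wall)
    logical = last.logical + 1 if wall == last.wall else 0
    return HLC(wall, logical)

def hlc_receive(msg, local_wall_clock, last):
    # Message receipt: fold in the sender's timestamp as well.
    wall = max(local_wall_clock, msg.wall, last.wall)
    if wall == msg.wall == last.wall:
        logical = max(msg.logical, last.logical) + 1
    elif wall == msg.wall:
        logical = msg.logical + 1
    elif wall == last.wall:
        logical = last.logical + 1
    else:
        logical = 0
    return HLC(wall, logical)

# A wall clock that jumps backwards cannot make HLC timestamps regress:
t1 = hlc_now(1000, HLC(0, 0))   # HLC(wall=1000, logical=0)
t2 = hlc_now(900, t1)           # clock regressed -> HLC(wall=1000, logical=1)
```

Note how the second call keeps `wall=1000` and bumps the logical counter instead of emitting a timestamp from the past.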

TrueTime (Google Spanner) takes the opposite approach: rather than hiding uncertainty, it exposes it as an interval [earliest, latest]. Spanner uses GPS receivers and atomic clocks to bound this interval to ±7ms. Transactions carry a commit timestamp; before returning success to the client, Spanner waits until now.earliest > commit_timestamp — guaranteeing that no future transaction on any node can claim an earlier timestamp. This commit-wait delay is the cost of external consistency.

Internals: How Asynchronous Replication Propagates State

The mechanics behind replication lag are worth understanding precisely, because the lag window is where stale reads live. In a primary-secondary replication chain:

  1. The client sends a write to the primary. The primary acquires a lock, writes to the WAL (Write-Ahead Log), commits to its data files, and returns ACK to the client.
  2. The primary's replication stream sends the WAL record(s) to each connected secondary. In PostgreSQL streaming replication, this is a continuous binary log stream over TCP. In MySQL, it is the binlog. In Kafka, it is the log segment.
  3. The secondary receives the WAL record, applies it to its own data files, and updates its replication position pointer (pg_last_wal_replay_lsn() in PostgreSQL).
  4. A reader hitting the secondary before step 3 completes sees the pre-write value.

The replication lag observable from the primary is measured in bytes (write_lag, flush_lag, replay_lag in pg_stat_replication) or in seconds (Seconds_Behind_Master in MySQL). Monitoring these metrics is the first step to detecting when staleness exceeds acceptable bounds.

Performance Characteristics: The Cost of Each Staleness Bound

Different consistency guarantees carry different latency costs. Understanding the performance profile of each option is essential for making the right choice per read path:

| Consistency Model | Additional Latency | Availability During Partition | Suitable For |
| --- | --- | --- | --- |
| Async replica read (no bound) | ~0ms | High (always reads something) | Analytics, dashboards, non-critical reads |
| Bounded staleness (max lag N seconds) | 0ms if in-bound; primary fallback adds ~1ms | Medium | Feeds, recommendations, soft real-time |
| Read-your-writes (LSN token) | 0–200ms (wait for replica to catch up) | High (can fall back to primary) | User-visible writes (profile, cart, settings) |
| Quorum read (R + W > N) | 1 extra network round trip (50–100ms cross-AZ) | Reduced (requires R nodes up) | Inventory, financial balances, authoritative state |
| Linearizable read (primary only) | Round trip to primary (~1ms intra-AZ) | Single point of failure | Leader election state, session tokens |

The key insight: most applications do not need linearizable reads on all paths. The performance cost scales directly with the consistency strength. Applying linearizable reads only to the 5% of paths that require them and bounded staleness to the rest achieves both correctness and scale.

Mathematical Model: Quorum Intersection and the W+R > N Formula

The quorum formula provides a mathematical guarantee that a read will always overlap with the set of nodes that received the most recent write. Given:

  • N = total number of replicas
  • W = number of replicas that must acknowledge a write before it is committed
  • R = number of replicas that must respond to a read

Guarantee: If W + R > N, then the read quorum and write quorum overlap by at least one node. That overlapping node holds the latest written value.

Example with N=5:

  • W=3, R=3: quorum overlap of 1 node (3+3-5=1). Tolerates 2 node failures on reads or writes.
  • W=5, R=1: strong write durability and cheap single-node reads, but any one node failure blocks all writes; viable only for read-heavy workloads with stable membership.
  • W=1, R=5: fast writes; all reads must hit every replica (impractical at scale).
  • W=2, R=2: W+R=4 < N=5. No overlap guaranteed. May return stale data. Useful for high availability at the cost of consistency.
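The arithmetic above can be packaged as a small helper (the function name and return keys are illustrative):

```python
def quorum_properties(n, w, r):
    """Derive the guarantees of an (N, W, R) replica configuration."""
    return {
        "overlap": w + r - n,            # guaranteed read/write intersection
        "consistent": w + r > n,         # some read node holds the latest write
        "write_fault_tolerance": n - w,  # node losses before writes fail
        "read_fault_tolerance": n - r,   # node losses before reads fail
    }

p = quorum_properties(n=5, w=3, r=3)
# overlap of 1 node; tolerates 2 failures on either path
q = quorum_properties(n=5, w=2, r=2)
# W + R = 4 <= 5: no overlap guaranteed, stale reads possible
```

Running the two configurations from the list above makes the trade-off concrete: W=3/R=3 is consistent with fault tolerance of 2 on both paths, while W=2/R=2 gives up the overlap guarantee.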

The catch: the formula assumes clocks are synchronized and that "latest" is determined by logical ordering. In Cassandra with client-side timestamps, clock skew can corrupt the "latest" determination even when quorum conditions hold — a reason to separate quorum logic from wall-clock conflict resolution.


🏗️ Causality Violations and Vector Clocks: When the Reply Arrives Before the Post

A causality violation is an ordering anomaly: events that are causally dependent (A caused B, B caused C) appear in the wrong order to some observers. Unlike clock skew, causality violations can occur even in a system with perfect clocks — they arise purely from asynchronous message delivery.

The Threaded Conversation Problem

User A posts "Hello, is anyone there?" and user B sees it and replies "Yes, hi!" User C is observing the thread. Due to different network paths, C receives B's reply before A's original post. From C's perspective, someone replied to a question that hasn't been asked yet.

This isn't a network bug. It's the expected behavior of an asynchronous system where different messages take different routes. The same pattern appears in distributed cache invalidation (the invalidation message arrives before the new value), in distributed logging (a log entry for a downstream operation appears before the upstream trigger), and in multi-datacenter replication (an ACK for a write arrives at the client before the write propagates to all replicas).

Vector Clocks: Making Causality Explicit

A vector clock is a data structure that captures the causal history of an event. Each node maintains a vector of counters, one per node in the system. When a node sends a message, it increments its own counter and attaches the full vector. When a node receives a message, it merges the received vector with its own by taking the component-wise maximum, then increments its own counter.

The following diagram shows a causality violation and how vector clocks detect it: B's reply carries the vector (A:1, B:1) which is causally after A's post (A:1, B:0). When C receives B's reply before A's post, the vector clock lets C detect that it is missing A's post before it can display B's reply.

sequenceDiagram
    participant A as Node A
    participant B as Node B
    participant C as Node C

    A->>B: Post "Hello" (A:1, B:0, C:0)
    B->>B: Reply "Yes, hi!" (A:1, B:1, C:0)
    B->>C: Deliver reply first (A:1, B:1, C:0)
    Note over C: C has not seen A:1 yet
    C->>C: Detect: missing causal predecessor A:1
    A->>C: Deliver post (delayed) (A:1, B:0, C:0)
    C->>C: Now can display post, then reply in order

Comparing two vector clocks: V1 happens-before V2 if and only if every component of V1 is ≤ the corresponding component of V2, and at least one component is strictly <. If neither V1 ≤ V2 nor V2 ≤ V1, the events are concurrent — neither causally precedes the other, and they may be delivered in any order without violating causality.
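A minimal Python version of this comparison, representing each vector clock as a dict from node ID to counter (names are illustrative):

```python
def happens_before(v1, v2):
    """True if the event with clock v1 causally precedes the event with v2."""
    nodes = set(v1) | set(v2)
    le = all(v1.get(n, 0) <= v2.get(n, 0) for n in nodes)
    lt = any(v1.get(n, 0) < v2.get(n, 0) for n in nodes)
    return le and lt

def concurrent(v1, v2):
    # Neither precedes the other: safe to deliver in any order.
    return not happens_before(v1, v2) and not happens_before(v2, v1)

post  = {"A": 1, "B": 0}           # A's original post
reply = {"A": 1, "B": 1}           # B's reply, issued after seeing the post
other = {"A": 0, "B": 0, "C": 1}   # an unrelated event on node C
```

Here `happens_before(post, reply)` holds, so a node holding `reply` but not `post` knows it is missing a causal predecessor, while `reply` and `other` are concurrent and can be shown in either order.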

Prevention strategies:

  • Causal consistency middleware: Each request carries the client's current causal token (a vector clock or logical timestamp). A server delays the response until it has applied all events causally preceding the client's token.
  • Total order broadcast: All nodes deliver messages in the same global order (implemented by Raft or Paxos logs). Total order is stronger than causal consistency and eliminates all ordering anomalies — at the cost of latency and availability.
  • Operational transforms / CRDTs: For collaborative editing and eventually-consistent applications, conflict-free data structures sidestep ordering requirements entirely by making all operations commutative.


📊 Network Partition Anomalies: What CAP Looks Like in Production

CAP theorem is often taught as a theoretical trade-off — a dry academic statement about impossibility proofs. In production, a partition is a concrete operational event with concrete observable effects that differ dramatically depending on whether your system chose CP or AP.

CP Systems Under Partition: Correct but Unavailable

A ZooKeeper or etcd cluster uses Raft consensus. Writes require a majority quorum. Consider a 5-node cluster with 3 nodes in US-EAST and 2 nodes in US-WEST. A network partition cuts the US-EAST/US-WEST link.

US-EAST (majority, 3 nodes): Quorum is intact. The leader continues serving reads and writes normally. Clients connected to US-EAST see no degradation.

US-WEST (minority, 2 nodes): Quorum is broken. US-WEST cannot elect a new leader — it needs 3 of 5 votes. All writes from US-WEST clients return errors. Reads may also return errors if the system enforces linearizable reads (which require quorum). Clients connected to US-WEST see hard failures even though their nodes are healthy.

This is the correct behavior for a CP system: it refuses to serve potentially stale reads rather than risk inconsistency. But users don't experience it as "correct" — they experience it as an outage.

AP Systems Under Partition: Available but Divergent

A Cassandra or DynamoDB cluster in AP mode keeps all nodes serving reads and writes during a partition. US-EAST writes user.cart = [A, B]. US-WEST writes user.cart = [C, D] to the same user key. Both writes succeed with a 200 OK. Clients see no errors.

When the partition heals, the cluster enters a read-repair / anti-entropy phase to reconcile the two values. Without application-level conflict resolution, one value will overwrite the other (LWW). With CRDTs, both values may be merged. Either way, the divergence that accumulated during the partition must be resolved — and the resolution may surprise users.

The SLO Trap

Many teams monitor service availability by measuring error rates on reads and writes. During a partition, a CP system returns errors on the minority partition — these errors count against the SLO. An AP system returns successes during the partition — the errors appear later, during reconciliation, as data inconsistencies, which are harder to count. Teams with AP systems sometimes discover they have been reporting 99.99% availability while silently accumulating data divergence.

The honest accounting: both CP and AP systems lose something during a partition. CP loses availability. AP loses consistency. Neither is "better" — the choice is which failure mode your application can tolerate.


🌍 Conflicting Writes, Data Divergence, and CRDTs in the Wild

In leader-based replication, the single leader serializes all writes, eliminating concurrent conflicts. In multi-leader and leaderless (Dynamo-style) replication, two nodes can independently accept writes to the same key. When they sync, they must resolve the conflict.

How Conflicts Happen

In a multi-leader setup with two datacenters, a user updates their shipping address. The request lands on DC1's leader simultaneously with another update on DC2's leader (perhaps from a mobile sync). Both updates are committed. Both datacenters have diverged. When the replication stream bridges the two leaders, the same key has two different values.

In leaderless replication (Cassandra, DynamoDB), a write with consistency level ONE succeeds as soon as a single replica acknowledges it. A concurrent write can hit a different replica. Both succeed. On the next read, a coordinator receives conflicting values from different replicas and must choose one.

Conflict Resolution Strategies

Last Write Wins (LWW): Use the write timestamp to determine the winner. Simple, but data loss is silent and guaranteed — the losing write is simply discarded. LWW is the default in Cassandra and the behavior in many NoSQL systems when no explicit merge is configured. It is appropriate only when losing some writes is acceptable (e.g., sensor readings where the latest value is all that matters).

First Write Wins: The opposite policy. Used in some compare-and-swap (CAS) patterns where the first commit wins and subsequent updates are rejected.

Application-level merge: The application receives all conflicting versions and decides how to merge them. Riak exposes "sibling values" — all conflicting versions — and requires the application to resolve them on the next write. DynamoDB conditional expressions can implement optimistic locking at the application layer.

CRDTs (Conflict-free Replicated Data Types): CRDTs are data structures whose merge operation is mathematically guaranteed to converge to the same result regardless of the order in which replicas are merged. They eliminate the conflict problem by design.

A G-Counter (Grow-only Counter) is the simplest CRDT. Each node maintains its own counter. The global value is the sum of all per-node counters. Incrementing always goes to the local node's counter; merging takes the component-wise maximum.

// G-Counter: each node has its own slot
state: {nodeId -> count}   // e.g., {A:5, B:3, C:7}

increment(nodeId):
    state[nodeId] += 1

value():
    return sum(state.values())

merge(local, remote):
    for each nodeId:
        result[nodeId] = max(local[nodeId], remote[nodeId])
    return result

Because merge is commutative, associative, and idempotent, two replicas with the same set of increments will always converge to the same value regardless of network delays or partial synchronization. A PN-Counter (Positive-Negative Counter) extends G-Counter with a separate decrement counter, enabling arbitrary increment/decrement operations with CRDT convergence.
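The pseudocode above, made runnable as a small Python class (a sketch only; a production CRDT also needs serialization and handling of retired node slots):

```python
class GCounter:
    """Grow-only counter CRDT: one slot per node, merge = pointwise max."""
    def __init__(self, counts=None):
        self.counts = dict(counts or {})

    def increment(self, node_id, amount=1):
        # Increments always land in the local node's own slot.
        self.counts[node_id] = self.counts.get(node_id, 0) + amount

    def value(self):
        return sum(self.counts.values())

    def merge(self, other):
        # Commutative, associative, idempotent: order of merging is irrelevant.
        nodes = set(self.counts) | set(other.counts)
        return GCounter({n: max(self.counts.get(n, 0), other.counts.get(n, 0))
                         for n in nodes})

# Two replicas diverge during a partition, then merge in either order:
a = GCounter(); a.increment("A"); a.increment("A")   # A saw 2 increments
b = GCounter(); b.increment("B")                     # B saw 1 increment
merged_ab = a.merge(b).value()
merged_ba = b.merge(a).value()
```

Both merge orders converge to the same total, which is exactly the convergence property the surrounding text describes.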

Amazon's shopping cart famously used a CRDT: when two datacenters diverged, the cart's merge operation took the union of both carts. This is why items sometimes "reappear" in Amazon carts after a page refresh — the item was in one datacenter's version, and after a partition healed, the union merge brought it back. Amazon's engineers documented this explicitly: they chose to show users a slightly larger cart (annoyance) rather than silently drop items (data loss).


⚖️ Cascading Failures and Thundering Herd: When One Node's Death Becomes Many

Single-node failures are routine in a well-designed distributed system — they are expected and handled by redundancy. Cascading failures and thundering herd are pathological patterns where a single failure triggers a self-reinforcing chain reaction that takes down healthy components.

Cascading Failure: The Feedback Loop

The classic cascading failure pattern in a database cluster:

  1. One replica dies under load.
  2. The same read traffic that was spread across N replicas is now spread across N-1 replicas.
  3. Each surviving replica now handles more requests per second.
  4. Latency on each replica increases due to higher load.
  5. Some clients hit read timeouts and retry — doubling effective read traffic.
  6. A second replica dies under the doubled load.
  7. Repeat. The cluster collapses from the outside in.

Cloudflare's July 2020 incident followed this pattern: a BGP advertisement error caused a traffic spike that overloaded edge nodes, which timed out and were marked unhealthy, routing more traffic to the remaining nodes, which overloaded them in turn. Within minutes, 50+ services were unreachable.

Prevention:

  • Circuit breakers: An open circuit stops sending traffic to a failing component. The circuit trips when the error rate exceeds a threshold (e.g., 50% errors in 10 seconds). This halts the retry amplification loop.
  • Bulkheads: Partition client connection pools by upstream service. A failure in service A cannot consume the connections allocated to service B.
  • Load shedding: Under extreme load, deliberately reject a percentage of requests with 503 before the system collapses. A degraded service that serves 60% of traffic is better than a collapsed service serving 0%.
  • Backpressure: Downstream services signal overload upstream, which slows its request rate. Reactive Streams and gRPC flow control implement this at the protocol level.
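A minimal count-based circuit breaker sketch (thresholds, names, and the trip policy are illustrative; production breakers typically track error rates over sliding windows rather than raw counts):

```python
import time

class CircuitBreaker:
    """Stops sending traffic to a failing dependency after repeated errors."""
    def __init__(self, failure_threshold=5, reset_timeout=30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None   # None => circuit closed (traffic flows)

    def allow_request(self):
        if self.opened_at is None:
            return True
        # Half-open: allow a probe request after the cooldown elapses.
        return time.monotonic() - self.opened_at >= self.reset_timeout

    def record_success(self):
        self.failures = 0
        self.opened_at = None

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            # Trip: halt the retry amplification loop at the source.
            self.opened_at = time.monotonic()

cb = CircuitBreaker(failure_threshold=3, reset_timeout=30.0)
for _ in range(3):
    cb.record_failure()
blocked = not cb.allow_request()   # circuit open: shed load, don't retry
```

Once tripped, callers fail fast instead of queueing retries against an overloaded dependency, which is what breaks step 5 of the cascade above.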

Thundering Herd and Cache Stampede

Thundering herd: A server restarts after a failure. One hundred clients, each implementing exponential backoff with a fixed base, all computed the same next retry time (because they all saw the failure at the same time). They retry simultaneously. The server is overwhelmed immediately after restart and may fail again.

Cache stampede: A hot cache key expires. One thousand concurrent requests, all waiting on that key, each detect a cache miss and independently fire a database query for the same data. The database receives 1,000 simultaneous queries for the same record.

Both patterns are variants of synchronized load bursts triggered by a common event.

Prevention for thundering herd: Jittered exponential backoff adds randomness to the retry delay, spreading retries over a window rather than synchronizing them.

function jittered_backoff(attempt, base_delay_ms, max_delay_ms):
    exponential = base_delay_ms * (2 ^ attempt)
    capped = min(exponential, max_delay_ms)
    jitter = random(0, capped)   // full jitter: random in [0, cap]
    return jitter

// Example: attempt=3, base=100ms, max=30000ms
// exponential = 800ms, capped = 800ms
// jitter = random(0, 800) = e.g., 347ms

AWS's own retry guidance recommends "full jitter" — multiplying the capped backoff by a random value in [0, 1] — over "decorrelated jitter" or fixed exponential backoff for high-contention scenarios.
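Full jitter in runnable form (a sketch of that scheme; the parameter defaults are illustrative):

```python
import random

def full_jitter_backoff(attempt, base_ms=100, cap_ms=30_000):
    """Full jitter: sleep a uniform random amount in [0, min(cap, base * 2^attempt)]."""
    exponential = base_ms * (2 ** attempt)
    return random.uniform(0, min(exponential, cap_ms))

# 1000 clients retrying attempt 3 spread across [0, 800ms]
# instead of all firing at the same instant:
delays = [full_jitter_backoff(3) for _ in range(1000)]
```

The point is that a thousand clients that failed at the same moment now retry at a thousand different moments, so the recovering server sees a trickle rather than a wall.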

Prevention for cache stampede:

  • Probabilistic early expiration (PER): Instead of expiring a cache key at the exact TTL, begin probabilistically recomputing it slightly before expiry. A single request triggers the recomputation; the rest are served the still-valid cached value.
  • Request coalescing (dog-pile lock): On cache miss, only one request acquires a lock and fetches the database value. All other concurrent requests wait for that one result. Used in Redis with SETNX or in Varnish with grace mode.
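A threaded sketch of request coalescing (illustrative only; a production version also needs TTLs, fetch timeouts, and error propagation to waiters):

```python
import threading

class CoalescingCache:
    """Cache that coalesces concurrent misses: one fetch, many waiters."""
    def __init__(self):
        self.data = {}
        self.lock = threading.Lock()
        self.inflight = {}   # key -> Event set once the value is cached
        self.fetches = 0     # how many times the backing store was hit

    def get(self, key, compute):
        with self.lock:
            if key in self.data:
                return self.data[key]
            waiter = self.inflight.get(key)
            if waiter is None:
                # This thread becomes the single fetcher for the key.
                self.inflight[key] = threading.Event()
        if waiter is not None:
            waiter.wait()              # another thread is already fetching
            return self.data[key]
        value = compute()              # only one thread reaches the database
        with self.lock:
            self.data[key] = value
            self.fetches += 1
            self.inflight.pop(key).set()
        return value

cache = CoalescingCache()
results = []
threads = [threading.Thread(target=lambda: results.append(
    cache.get("hot", lambda: 42))) for _ in range(50)]
for t in threads: t.start()
for t in threads: t.join()
# 50 concurrent requests for the same key; the backing store was hit once
```

Fifty concurrent misses on the hot key produce exactly one `compute()` call, which is the stampede protection the bullet describes.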

🧭 Anomaly Prevention Playbook: The Cheatsheet Every Distributed Systems Engineer Needs

The table below maps each anomaly in this post to its root cause, detection signal, prevention pattern, and the databases or frameworks where it is most relevant.

When you encounter an anomaly in production, start by classifying it into one of the four families from the opening taxonomy. This flowchart shows the triage path from observed symptom to the correct prevention pattern.

flowchart TD
    A[Anomaly Detected in Production] --> B{Classify by Family}
    B -->|Visibility| C[Stale or torn data returned to reader]
    B -->|Ordering| D[Events appear in wrong causal order]
    B -->|Availability| E[Node falsely healthy or falsely dead]
    B -->|Integrity| F[Two nodes hold conflicting values]
    C --> C1{Stale read or torn?}
    C1 -->|Stale| C2[Bounded staleness / LSN token / Quorum read]
    C1 -->|Torn| C3[WAL atomic replay / Commit marker gating]
    D --> D1{Clock drift or async delivery?}
    D1 -->|Clock drift| D2[HLC / TrueTime / Avoid LWW]
    D1 -->|Async delivery| D3[Vector clocks / Causal middleware]
    E --> E1{Split brain or zombie?}
    E1 -->|Split brain| E2[Quorum writes / Raft terms / STONITH]
    E1 -->|Zombie leader| E3[Fencing tokens / Epoch rejection]
    F --> F1{Merge or overwrite?}
    F1 -->|Overwrite acceptable| F2[LWW with NTP discipline]
    F1 -->|Must preserve all| F3[CRDTs / Application merge]

The flowchart leads to the same prevention patterns described in the table below. Use the classification step first — it narrows the solution space from twelve options to two or three before you consult documentation.

| Anomaly | Root Cause | Detection Signal | Prevention Pattern | Relevant Systems |
|---|---|---|---|---|
| Split brain | Network partition isolates leader; secondary elects new leader | Two nodes with same shard claim leadership | Quorum writes, fencing tokens, STONITH, Raft term numbers | MongoDB, etcd, Redis Sentinel, PostgreSQL |
| Stale reads | Async replication lag; reads routed to lagging replica | pg_stat_replication.write_lag; Seconds_Behind_Master; consumer lag | Read-your-writes, causal tokens (LSN), bounded staleness, synchronous replication | PostgreSQL, MySQL, Cassandra, Kafka |
| Clock skew | NTP synchronization imprecision; VM clock pauses | Clock offset metrics; ntpstat; chrony tracking | Hybrid Logical Clocks (HLC), TrueTime (Spanner), avoid wall-clock LWW | Cassandra (LWW), Spanner, CockroachDB |
| Causality violation | Async message delivery via different network paths | Out-of-order events in event log; consumer sees reply before post | Vector clocks, causal consistency middleware, total order broadcast | Riak, DynamoDB Streams, Kafka (single partition) |
| Conflicting writes / divergence | Multi-leader or leaderless replication accepts concurrent writes | Version conflicts on read (sibling values); anti-entropy reports | LWW (lossy), application merge, CRDTs, optimistic locking | Cassandra, DynamoDB, Riak, CouchDB |
| Zombie leader | GC pause or slow I/O causes leader to miss heartbeats; new leader elected; old leader resumes | Multiple leaders in monitoring; duplicate writes in logs | Fencing tokens (monotonic token from lock service), STONITH, Raft term rejection | ZooKeeper, etcd, Redis Redlock |
| Partial write / torn read | Replication applied row-by-row; reader catches DB mid-replication | Invariant violations in consistency checks | WAL-based atomic replay, logical replication with commit markers, read-your-writes | PostgreSQL logical replication, MySQL binlog |
| Thundering herd | All clients retry simultaneously after common failure event | Synchronized retry spike in traffic metrics | Jittered exponential backoff, staggered restarts | All HTTP clients, gRPC |
| Cache stampede | Hot cache key expires; all waiters hit the DB simultaneously | DB query spike on cache expiry | Probabilistic early expiration, request coalescing (dog-pile lock) | Redis, Memcached, Varnish |
| Network partition (CP) | Minority partition loses quorum; cannot serve reads/writes | Error spike from specific AZ/region | Partition-aware routing, graceful degradation to stale reads | ZooKeeper, etcd, Consul |
| Network partition (AP) | Both partitions accept writes; diverge during partition | Post-heal anti-entropy logs show key conflicts | CRDTs, version vectors, application merge on read | Cassandra, DynamoDB, Riak |
| Cascading failure | Single node failure increases load on remaining nodes | Latency/error rate climbing across all replicas | Circuit breakers, bulkheads, load shedding, backpressure | Resilience4j, Istio, Netflix Hystrix |

🧪 Zombie Leaders, Fencing Tokens, and Partial Writes: The Sneakiest Production Bugs

Zombie leaders and partial writes share a common property: they pass all unit tests, generate no exceptions, and produce no error-level log messages. The data is wrong, but the system reports success.

The Zombie Leader

A node is the current leader of a shard. Under the hood, the JVM triggers a stop-the-world garbage collection. The GC pause lasts 8 seconds. During those 8 seconds, the leader has not sent heartbeats. The cluster's election timeout (5 seconds) fires. A new leader is elected.

The GC pause ends. The old leader wakes up. From its own perspective, it has been the leader continuously — it has no awareness of the 8-second gap. It begins accepting writes again, with its old lease and old epoch number.

The storage layer receives two concurrent writes for the same key: one from the new leader (epoch 6) and one from the zombie leader (epoch 5). Without a fencing mechanism, the zombie write may succeed — overwriting the new leader's write, corrupting the dataset.

The following diagram shows the zombie leader scenario and how a fencing token at the storage layer rejects the stale write. The monotonically increasing token is assigned by the lock service (ZooKeeper, etcd) at election time; the storage layer keeps track of the highest token it has seen and rejects any write below that watermark.

sequenceDiagram
    participant LS as Lock Service
    participant OL as Old Leader (token=5)
    participant NL as New Leader (token=6)
    participant DB as Storage Layer

    OL->>OL: GC pause begins (8 seconds)
    LS->>NL: Grant lock with token=6
    NL->>DB: Write key=X value=new, token=6
    DB->>DB: Stored. high_water_token=6
    OL->>OL: GC pause ends, resumes as leader
    OL->>DB: Write key=X value=stale, token=5
    DB-->>OL: REJECTED: token 5 less than high_water 6
    Note over DB: Fencing prevents zombie write

The fencing token pattern requires that every write path in the storage layer check the token before applying the write. This is why it is called "fencing" — the storage layer acts as a fence that blocks stale actors from writing.
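As a concrete illustration, a storage layer that enforces fencing needs only a high-water mark and a comparison on every write path. This is a hypothetical sketch, not any particular database's API:

```python
class FencedStore:
    """Storage layer that rejects writes carrying a stale fencing token."""

    def __init__(self):
        self._data = {}
        self._high_water = -1            # highest token seen so far

    def write(self, key, value, token):
        if token < self._high_water:     # stale actor: fence it out
            raise PermissionError(
                f"fenced: token {token} below high-water {self._high_water}")
        self._high_water = max(self._high_water, token)
        self._data[key] = value

store = FencedStore()
store.write("X", "new", token=6)         # new leader's write: accepted
try:
    store.write("X", "stale", token=5)   # zombie leader's write
except PermissionError as err:
    print(err)                           # the stale write never lands
```

Note that a write with a token equal to the high-water mark is accepted — the current leader writes repeatedly under the same token; only strictly lower tokens are fenced.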

Partial Writes and Torn Reads

A partial write is a visibility anomaly where a multi-object operation appears to have been applied only partially to a reader. It differs from a dirty read (uncommitted data) because the data has been committed — it just hasn't fully replicated yet.

Consider a funds transfer: debit account A, credit account B, write a transaction record. On the primary, this is a single atomic transaction. On an asynchronous replica, the three rows are applied in order as the replication stream delivers them.

A reader hitting the replica between the first and second row application sees account A debited (new balance $900) and account B not yet credited (old balance $100). The invariant A + B = $1,100 appears to be violated. The reader has observed a torn transaction.
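The arithmetic can be seen in a tiny simulation (illustrative only): the replica applies the committed rows one at a time, and a read between the two applications observes a total the primary never stored.

```python
# Primary state before the transfer: A=$1,000, B=$100 (total $1,100).
replica = {"A": 1000, "B": 100}

replica["A"] = 900                          # replication applies the debit row
torn_total = replica["A"] + replica["B"]    # a reader hits the replica here
assert torn_total == 1000                   # $100 has vanished from the total

replica["B"] = 200                          # replication applies the credit row
assert replica["A"] + replica["B"] == 1100  # invariant restored
```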

Prevention:

  • Write-ahead log (WAL) replay: Replicas apply transactions atomically by replaying the WAL segment that covers the full commit. A replica only advances its read pointer after the full transaction commit record is applied. This is how PostgreSQL streaming replication works.
  • Logical replication with commit markers: Logical replication publishes a full transaction bundle, including the commit marker, before any consumer can read the rows. MySQL Group Replication and PostgreSQL logical replication both enforce this.
  • Read from standby with LSN check: Before serving a read, verify that the replica's pg_last_wal_replay_lsn() is at or beyond the LSN returned to the client at write time. Only then serve the read from the replica.
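The LSN check in the last bullet amounts to a small gate in front of replica reads. A hedged sketch, with a hypothetical `FakeReplica` standing in for a lagging standby:

```python
import time

class FakeReplica:
    """Hypothetical stand-in for a lagging standby (illustration only)."""

    def __init__(self):
        self._lsn = 0
        self._rows = {}

    def replayed_lsn(self):
        return self._lsn

    def get(self, key):
        return self._rows.get(key)

    def apply(self, lsn, key, value):    # the replication stream delivers a row
        self._rows[key] = value
        self._lsn = lsn

def read_your_writes(replica, key, min_lsn, timeout_s=1.0, poll_s=0.005):
    # Serve from the replica only once it has replayed at least the LSN
    # the primary returned at commit time; otherwise escalate to the primary.
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if replica.replayed_lsn() >= min_lsn:
            return replica.get(key)
        time.sleep(poll_s)
    raise TimeoutError("replica still lagging; route this read to the primary")
```

In PostgreSQL the two sides of this gate are the LSN returned by `pg_current_wal_lsn()` on the primary at write time and `pg_last_wal_replay_lsn()` on the standby at read time.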

🛠️ ZooKeeper: Distributed Coordination and Fencing in Practice

ZooKeeper is a distributed coordination service built on ZAB (ZooKeeper Atomic Broadcast), a total-order broadcast protocol related to Paxos. It was designed explicitly to provide the primitives needed to implement distributed locking, leader election, and fencing tokens — the exact mechanisms that prevent split brain and zombie leader anomalies.

How ZooKeeper Leader Election Works

ZooKeeper's ephemeral sequential nodes are the building block for leader election. An ephemeral node is automatically deleted when the client session that created it disconnects. A sequential node is assigned a monotonically increasing suffix by ZooKeeper.

Each candidate for leadership creates an ephemeral sequential node under a common path:

/election/candidate-0000000001
/election/candidate-0000000002
/election/candidate-0000000003

The node with the lowest sequence number is the leader. All other candidates watch the node immediately preceding them. When the leader's session expires (because the leader died or paused), its node is deleted, and the candidate with the next-lowest sequence number becomes the new leader.

The sequence number itself functions as a fencing token. Because ZooKeeper guarantees that sequence numbers are monotonically increasing and unique, the leader with sequence number 0000000002 has a higher token than the leader with sequence number 0000000001. Any storage system that rejects writes with lower sequence numbers is protected from zombie writes.
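Extracting the token from a sequential node name is a one-liner, and the election rule is a `min` over the candidates. A minimal sketch in Python, using hypothetical node names:

```python
def fencing_token(znode_name: str) -> int:
    # ZooKeeper appends a zero-padded monotonic sequence to sequential
    # nodes; the numeric suffix doubles as the fencing token.
    return int(znode_name.rsplit("-", 1)[1])

candidates = [
    "candidate-0000000003",
    "candidate-0000000001",
    "candidate-0000000002",
]
leader = min(candidates, key=fencing_token)   # lowest sequence number leads
```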

Curator: Production-Grade ZooKeeper Client

Apache Curator is the standard production library for ZooKeeper in the Java ecosystem. It wraps the low-level ZooKeeper client with recipes for leader election, distributed locks, and barrier synchronization.

// Leader election with fencing token retrieval using Apache Curator
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;

CuratorFramework client = CuratorFrameworkFactory.newClient(
    "zk1:2181,zk2:2181,zk3:2181",
    new ExponentialBackoffRetry(1000, 3)   // retry ZK connection with backoff
);
client.start();

LeaderSelector selector = new LeaderSelector(
    client,
    "/election/my-service",
    new LeaderSelectorListenerAdapter() {
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            // Retrieve the sequential node name as the fencing token
            String leaderPath = client
                .getChildren()
                .forPath("/election/my-service")
                .stream()
                .sorted()
                .findFirst()
                .orElseThrow();

            long fencingToken = Long.parseLong(
                leaderPath.substring(leaderPath.lastIndexOf('-') + 1)
            );

            log.info("Became leader with fencing token: {}", fencingToken);

            // Pass fencingToken to storage layer on every write.
            // Storage layer rejects writes with token < high_water_mark.
            serveAsLeader(fencingToken);
        }
    }
);

selector.autoRequeue();
selector.start();

The key principle: the fencingToken retrieved from the sequential node name is passed to the storage layer on every write. The storage layer maintains a high_water_mark and rejects any write whose token is below it. This is the full implementation of the fencing token pattern — not a theoretical construct, but four lines of ZooKeeper interaction.

For a full deep-dive on ZooKeeper internals, including ZAB consensus, snapshot-based recovery, and watch semantics, see Consensus Algorithms: Raft and Paxos for the consensus background, and the Apache ZooKeeper documentation for implementation details.


📚 Lessons Learned from Real Distributed System Failures

1. Healthy nodes can produce incorrect results. The most dangerous distributed anomalies — stale reads, clock skew conflicts, zombie writes — occur when all nodes are running and all health checks are green. Monitoring for node health is necessary but not sufficient. You must also monitor for data correctness invariants.

2. Async replication creates windows, not guarantees. "Eventual consistency" means that replicas will converge — eventually. It does not bound how long that takes or guarantee that a particular read will be fresh. If your application has correctness requirements, you must express them explicitly: read-your-writes, monotonic reads, or full linearizability. The default is not what you want.

3. Clocks are observations, not facts. Any system that uses wall-clock timestamps for conflict resolution is vulnerable to clock skew. The 2012 Amazon DynamoDB incident, the 2017 Cassandra consistency bug reports, and numerous MongoDB ticket overwrite incidents all trace to LWW on drifted clocks. Use logical clocks (vector clocks, HLC) for any conflict resolution that must be causally correct.

4. Election timeouts and GC pauses are in a race. If your election timeout is 5 seconds and your JVM's GC can pause for 8 seconds, you will produce zombie leaders. Either tune GC to bound max pause time, increase the election timeout, or use a language/runtime with bounded pause behavior. G1GC and ZGC help; they do not eliminate the problem.

5. The circuit breaker is the single most impactful distributed resilience pattern. It doesn't prevent failures. It prevents one component's failure from cascading to all components. Every service that calls a downstream dependency should have a circuit breaker on that call path.

6. Jitter is not optional. The reason AWS, Google, and Netflix all recommend jittered backoff is that synchronized retry storms have taken down production systems at all three companies. "Retry" without jitter is "synchronized attack on a recovering service."

7. CRDTs are not magic. They guarantee convergence, but the converged value may not be what your users expect. Amazon's cart "reappearing items" is not a bug — it is the correct CRDT behavior. Design your conflict resolution policy for the user experience, not just for mathematical convergence.

8. Testing distributed anomalies requires fault injection. Unit tests and integration tests do not produce split brain, clock skew, or cascading failures. Chaos engineering tools (Chaos Monkey, Gremlin, Litmus) are the only way to verify that your defenses work under the actual conditions that trigger anomalies.


📌 TLDR: The Distributed Anomaly Map

  • Split brain: network partition → two leaders → conflicting writes. Prevent with quorum writes + fencing tokens + Raft terms.
  • Stale reads: async replication lag → replica returns old data. Prevent with read-your-writes, causal LSN tokens, bounded staleness.
  • Clock skew: NTP drift → LWW resolves conflicts incorrectly. Prevent with HLC or TrueTime; avoid wall-clock conflict resolution.
  • Causality violation: async message delivery → reply seen before post. Detect with vector clocks; prevent with causal consistency or total order broadcast.
  • Conflicting writes: multi-leader/leaderless concurrent writes → divergence. Resolve with LWW (lossy), application merge, or CRDTs.
  • Zombie leader: GC pause → missed heartbeats → new election → stale leader resumes. Prevent with fencing tokens at the storage layer.
  • Partial writes: replication applied row-by-row → torn reads mid-replication. Prevent with WAL-based atomic replay and commit-marker gating.
  • Cache stampede: hot key expires → 1,000 concurrent DB hits. Prevent with probabilistic early expiration and request coalescing.
  • Thundering herd: synchronized retries after common failure. Prevent with full-jitter exponential backoff.
  • Cascading failure: single node failure amplifies load → chain of failures. Prevent with circuit breakers, bulkheads, and load shedding.

The meta-lesson: every anomaly in this post has a known, implementable solution. The challenge is not knowing which pattern exists — it's recognizing which anomaly you are dealing with in production, under time pressure, when the monitoring dashboard is showing conflicting signals.


📝 Practice Quiz: Distributed Anomaly Identification

  1. An e-commerce site runs primary-secondary replication. A user adds an item to their cart. The write succeeds. The user immediately clicks "View Cart" — the request hits a secondary that hasn't received the replication yet. Which anomaly is this, and what prevention pattern addresses it?

    • A) Split brain — fix with quorum writes
    • B) Stale read / read-your-writes violation — fix with LSN token or primary routing
    • C) Causality violation — fix with vector clocks
    • D) Clock skew — fix with HLC timestamps

    Correct Answer: B. The secondary hasn't applied the write yet, producing a stale read that violates the user's expectation of seeing their own write. Fix: include the primary's LSN in the session; the secondary waits until it has applied at least that LSN before serving the cart read. Alternatively, route all reads for a user's own writes to the primary.


  2. A Cassandra cluster uses Last Write Wins with client-provided timestamps. Server A's clock is 3 seconds ahead of Server B. A user updates their email from old@x.com to new@x.com via Server B. A stale mobile sync concurrently writes old@x.com back via Server A. After both writes, what does the cluster store?

    • A) new@x.com, because the intent of the most recent user action wins
    • B) old@x.com, because Server A's timestamp is higher due to clock skew, so its LWW entry wins
    • C) A merge of both values, stored as sibling versions
    • D) An exception is raised and neither write is committed

    Correct Answer: B. Server A's clock is 3 seconds ahead, giving the stale-mobile write a higher timestamp. LWW discards new@x.com in favor of the earlier-intent but later-timestamped old@x.com. This is the clock skew anomaly in LWW conflict resolution.


  3. A leader is elected with fencing token 42. After a 12-second GC pause, a new election fires and a new leader receives token 43. The old leader wakes up and writes to the storage layer. What happens?

    • A) The write succeeds because the old leader's lease has not yet expired
    • B) The write succeeds but is flagged for reconciliation
    • C) The write is rejected because token 42 is below the storage layer's high-water mark of 43
    • D) The write succeeds and causes split brain until the next heartbeat cycle

    Correct Answer: C. The storage layer tracks the highest token it has seen (43, set when the new leader wrote with token 43). Any write whose token is below the high-water mark is rejected. Token 42 < 43 — the zombie leader's write is fenced out.


  4. A three-node Redis Sentinel cluster has one master and two replicas. A network partition isolates the master from both replicas. Sentinel elects a new master. The original master — still serving a subset of clients — accepts writes for 90 seconds until the partition heals. Which two mechanisms would prevent data loss during this scenario?

    • A) Increasing the Sentinel quorum from 2 to 3, and enabling min-replicas-to-write 1
    • B) Enabling asynchronous replication and increasing heartbeat frequency
    • C) Using a Raft-based system (etcd or ZooKeeper) for leader election instead of Sentinel, and configuring min-replicas-to-write 1 on the original master
    • D) Increasing the TCP keepalive interval to detect the partition faster

    Correct Answer: C. min-replicas-to-write 1 causes the original master to stop accepting writes when it cannot replicate to any replica, surfacing the partition to clients rather than silently accepting conflicting writes. A Raft-based system prevents split brain by construction — a quorum of nodes must acknowledge each write, so the isolated master cannot form a quorum and halts writes automatically.


  5. Your team is designing a globally distributed inventory system. Items can be reserved from any of three regional datacenters simultaneously. Describe which CRDT types are appropriate for tracking available inventory count and explain the trade-off between using a CRDT versus using strong consistency (quorum writes). Under what business conditions would you choose each? (Open-ended — no single correct answer.)

    Discussion guidance: A PN-Counter CRDT allows decrement (reservation) operations to converge without cross-region coordination — no global lock required. The trade-off: during a replication lag window, two regions may simultaneously believe they can fulfill the last item. CRDT convergence will eventually show 0 inventory, but both reservations already succeeded — the system oversells. A quorum write prevents oversell by requiring majority acknowledgment before confirming a reservation, but adds 60–100ms of cross-region latency per reservation. Choose CRDT when the cost of an occasional oversell is low and compensable (cancel and refund). Choose quorum writes when oversell is legally or financially catastrophic (airline seats, pharmaceutical inventory, limited-edition releases).
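The oversell scenario in the discussion guidance is easy to reproduce with a toy PN-Counter. This is a minimal sketch of the CRDT, not a production library:

```python
class PNCounter:
    """Toy PN-Counter: per-replica increment/decrement totals; merge takes
    the per-replica max, so concurrent updates converge deterministically."""

    def __init__(self, replica_id):
        self.replica_id = replica_id
        self.incs = {}                   # replica_id -> total increments
        self.decs = {}                   # replica_id -> total decrements

    def increment(self, n=1):
        self.incs[self.replica_id] = self.incs.get(self.replica_id, 0) + n

    def decrement(self, n=1):
        self.decs[self.replica_id] = self.decs.get(self.replica_id, 0) + n

    def value(self):
        return sum(self.incs.values()) - sum(self.decs.values())

    def merge(self, other):
        for rid, v in other.incs.items():
            self.incs[rid] = max(self.incs.get(rid, 0), v)
        for rid, v in other.decs.items():
            self.decs[rid] = max(self.decs.get(rid, 0), v)

us = PNCounter("us")
us.increment()                           # one item in stock
eu = PNCounter("eu")
eu.merge(us)                             # eu replica syncs the stock level
us.decrement()                           # us reserves the "last" item...
eu.decrement()                           # ...and so does eu, concurrently
us.merge(eu)                             # after the partition heals
```

After the final merge, `us.value()` is -1: both reservations succeeded without coordination, and the converged count reveals the oversell — exactly the trade-off the question asks you to weigh.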


Written by Abstract Algorithms (@abstractalgorithms)