Watermarking and Late Data Handling in Spark Structured Streaming
Late events are inevitable in distributed systems — watermarks tell Spark exactly when it is safe to finalize a window without waiting forever
TLDR: A watermark tells Spark Structured Streaming: "I will accept events up to N minutes late, and then I am done waiting." Spark tracks the maximum event time seen per partition, takes the global minimum across all partitions, subtracts the threshold, and uses that as the watermark. Windows whose end time falls before the watermark are finalized and evicted from the state store. Setting the threshold too short drops real data; setting it too long inflates state memory — every production deployment must make this tradeoff consciously.
📖 The Late Payment Problem: When Waiting Forever Is Not an Option
A mobile payments company processes hundreds of thousands of transactions per second. Their fraud detection pipeline aggregates payment events by merchant and computes rolling five-minute totals to spot unusual velocity patterns. The source is a Kafka topic; the sink is a Delta Lake table consumed by a risk-scoring service.
On a quiet Tuesday afternoon, an engineer notices that the Spark Structured Streaming job has been running for six hours — far longer than the expected two. The Spark UI shows that the state store, which holds partial aggregation results for all open windows, has grown to 47 GB. The executor pods are approaching their memory limit. By midnight the job will be killed by Kubernetes, losing six hours of aggregate state.
The cause is not a code bug. The cause is that Spark does not know when it is safe to finalize a window and evict it from state. Because mobile phones can be offline, tunnels drop connections, and CDNs buffer retries, payment events can arrive minutes — or even hours — after they occurred. If Spark holds open every window until it is certain no more late events can arrive, it must hold every window forever. State grows without bound.
This is exactly the problem watermarks solve. A watermark is a contractual statement: "I acknowledge that events older than the maximum observed event time minus this threshold will be treated as too late and dropped." With that contract in place, Spark knows the precise moment it is safe to close a window, emit its result, and release its memory. Without it, the state store is a slow memory leak.
🔍 Event Time, Processing Time, and What It Means for a Window to Be "Late"
Before watermarks make sense, it is essential to understand the three different clocks that exist inside a streaming system — and why only one of them reflects what actually happened in the physical world.
Event time is the timestamp embedded in the event itself, reflecting when the action occurred. For a payment, it is the moment the cardholder tapped their phone. This is the timestamp that matters for business logic: a fraud velocity check must use event time, not arrival time, because a genuine burst of spending looks different from a delayed batch of retransmissions.
Processing time is the wall-clock time at the Spark executor when the event is processed. It always advances monotonically and is never late — but it tells you nothing about when the underlying event occurred. A payment processed at 14:05 might have a card-swipe timestamp of 13:47. Windowing on processing time would place that payment in the wrong window.
Ingestion time is a hybrid: Spark assigns a monotonic timestamp when an event first enters the streaming system (for example, when it is read from Kafka). It avoids some out-of-order issues compared to processing time, but it still does not reflect the true event timestamp.
| Clock type | Where the timestamp comes from | Monotonically increasing at Spark? | Reflects real event occurrence? |
| --- | --- | --- | --- |
| Event time | Inside the event payload | No — events arrive out of order | Yes — accurate to the source |
| Ingestion time | Assigned on Kafka/Spark intake | Yes — assigned on arrival | Partially — bounded by intake delay |
| Processing time | Executor wall clock | Yes — always | No — affected by queuing and lag |
For any windowed aggregation that must produce results grounded in real-world time, event time is the only correct choice. Spark Structured Streaming's withWatermark() API is designed exclusively for event-time windowing.
Window Types in Structured Streaming
Spark supports three kinds of event-time windows, each with a different definition of which events belong to which window:
- Tumbling windows are fixed, non-overlapping intervals. A 5-minute tumbling window starting at 14:00 captures events with event time in [14:00, 14:05). No event belongs to two windows. These are the most memory-efficient and are the default choice for aggregations like "total payments per merchant per 5-minute period."
- Sliding windows overlap. A 10-minute window sliding every 5 minutes means an event at 14:03 belongs to both [13:55, 14:05) and [14:00, 14:10). Each event is counted multiple times, producing smoother trend lines at the cost of higher state footprint — every event is held in multiple window entries.
- Session windows are dynamic gaps. A session window stays open while events keep arriving within a gap duration. The window closes when no new event arrives for the gap period. This is natural for user activity tracking (a "session" of user actions on an app) but produces unpredictable window sizes and is more complex to reason about for watermarking.
The `withWatermark()` call attaches a maximum late-arrival threshold to the event-time column. Spark will accept events that trail the maximum observed event time by up to that threshold, and may drop events that fall behind the resulting watermark. Critically, `withWatermark()` must be called before the window aggregation for the watermark to control state cleanup.
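A sketch of that ordering for the payments scenario, in PySpark. The broker address, topic name, schema fields, and sink paths are all illustrative assumptions, not values from the original pipeline:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, window, sum as sum_, count
from pyspark.sql.types import StructType, StringType, TimestampType, DoubleType

spark = SparkSession.builder.appName("merchant-velocity").getOrCreate()

# Assumed payload schema for this sketch.
schema = (StructType()
          .add("merchant_id", StringType())
          .add("amount", DoubleType())
          .add("event_time", TimestampType()))

events = (spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")  # assumed broker
          .option("subscribe", "payments")                   # assumed topic
          .load()
          .select(from_json(col("value").cast("string"), schema).alias("e"))
          .select("e.*"))

# withWatermark MUST precede the windowed aggregation for state cleanup to apply.
agg = (events
       .withWatermark("event_time", "10 minutes")            # late-arrival threshold
       .groupBy(col("merchant_id"), window(col("event_time"), "5 minutes"))
       .agg(sum_("amount").alias("total_amount"), count("*").alias("n_payments")))

query = (agg.writeStream
         .outputMode("append")           # emit each window once, after finalization
         .format("delta")                # Delta sink, per the scenario
         .option("checkpointLocation", "/chk/merchant_velocity")  # assumed path
         .start("/tables/merchant_velocity"))
```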
⚙️ How Spark Tracks the Watermark and Decides When a Window Is Safe to Close
The watermark is not a static value. It advances forward as Spark observes newer event times in the data stream. Understanding how that advancement works — and what it controls — is the key to sizing thresholds correctly.
The watermark formula is simple:
current watermark = max event time seen so far − late threshold
When Spark processes each micro-batch, it scans all the event timestamps in that batch, finds the maximum, and updates its internal watermark tracker if this maximum exceeds the previous recorded maximum. The watermark then moves to (new max − threshold).
Window finalization is triggered by watermark advancement. Spark's rule is: a window is eligible to be finalized and emitted when the watermark passes the window's end time. For a tumbling window [14:00, 14:05), finalization happens when the watermark exceeds 14:05. At that point, Spark is asserting: "I have seen events with event time up to at least watermark + threshold, so any event timestamped before 14:05 that has not arrived yet is considered too late."
State store entries for each open window consist of partial aggregation results (for example, a running sum and count for groupBy(merchantId, window)). These entries accumulate in RocksDB (or the in-memory state store, depending on configuration) until the corresponding window is finalized. Finalization triggers both the result emission (in append output mode) and the deletion of that window's state entry, reclaiming memory.
Late events that arrive after their window has been finalized are silently dropped — they do not update the already-emitted result, and they are not buffered for a future retry. This is the core tradeoff: correctness (waiting for late data) vs. bounded state (evicting finalized windows). Watermarks give you a knob to tune this tradeoff rather than forcing an all-or-nothing choice.
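The two decisions described above reduce to a pair of one-line predicates. This is a pure-Python sketch of the logic, not Spark code; times are expressed as minutes since midnight for readability:

```python
def is_finalizable(window_end, watermark):
    # A window closes only when the watermark strictly exceeds its end time.
    return watermark > window_end

def is_too_late(event_time, watermark):
    # Events older than the watermark are silently dropped, never retried.
    return event_time < watermark

# With the watermark at 14:06 (846): the [14:00, 14:05) window closes,
# a window ending exactly at 14:06 stays open, and a 14:03 straggler drops.
assert is_finalizable(window_end=845, watermark=846) is True
assert is_finalizable(window_end=846, watermark=846) is False  # equal: stays open
assert is_too_late(event_time=843, watermark=846) is True
```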
🧠 Deep Dive into Watermark Internals and State Store Performance
The Internals of Watermark Tracking Across Partitions and Micro-Batches
The phrase "max event time seen so far" conceals an important distributed-systems subtlety. Spark does not have a single global event-time tracker. Each executor processes its own partition of the Kafka input stream independently. Events in partition 0 may be several minutes ahead of events in partition 3 if one Kafka consumer is lagging. This means there is no single canonical "max event time" — there are as many max event times as there are partitions.
Spark resolves this with a two-step protocol repeated at every micro-batch boundary:
- Per-partition max tracking. Each executor tracks the maximum event timestamp it has seen in its partition during the current micro-batch.
- Global watermark = minimum of all per-partition maxes, minus threshold. The Driver collects the per-partition maxes reported at the end of each micro-batch, takes the minimum across all of them, and subtracts the late-arrival threshold to produce the new global watermark.
Why the minimum, not the maximum or average? Because correctness requires that the watermark only advances as fast as the slowest partition. If partition 0 has seen events up to 14:10 and partition 3 is stuck at 13:55, the global max would be 14:10 — but that would be wrong. Declaring the watermark as 14:10 minus threshold would mean the engine considers windows before that time complete, even though partition 3 may still produce events that legitimately belong to those windows. Using the minimum guarantees that no partition has any remaining events older than the declared watermark: every partition has confirmed it has seen at least up to that time.
This minimum-across-partitions design has an important operational implication: a single slow or stalled partition freezes the watermark for the entire pipeline. If one Kafka partition stops producing data — due to a dead producer, a consumer group lag spike, or a network partition — its per-partition max stops advancing. The global watermark stops advancing. No windows get finalized. The state store grows unbounded. This exact failure mode is one of the most common causes of unexplained state bloat in production Spark streaming pipelines.
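A minimal pure-Python sketch of this driver-side step (the timestamps are illustrative; the real computation happens inside Spark's micro-batch planner):

```python
from datetime import datetime, timedelta

def global_watermark(per_partition_max, threshold):
    """per_partition_max: dict of partition id -> max event time seen so far."""
    # The slowest partition's max sets the pace; subtract the late threshold.
    return min(per_partition_max.values()) - threshold

maxes = {
    0: datetime(2024, 1, 1, 14, 10),  # fast partition
    3: datetime(2024, 1, 1, 13, 55),  # lagging partition — controls the result
}
wm = global_watermark(maxes, timedelta(minutes=5))
assert wm == datetime(2024, 1, 1, 13, 50)  # 13:55 − 5 min, not 14:10 − 5 min
```

If partition 3 stops reporting newer timestamps entirely, `min()` never moves, which is exactly the frozen-watermark failure mode described above.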
The watermark value is visible in the Spark UI's Structured Streaming tab for each micro-batch. The `eventTime.watermark` field in the streaming query progress JSON is the canonical programmatic way to monitor watermark advancement.
Performance Analysis: State Store Size, Threshold Sensitivity, and Monitoring Watermark Lag
The late-arrival threshold directly controls the size of the state store. A 10-minute threshold means Spark must keep all window entries whose end time is within 10 minutes of the current watermark alive, because late events for those windows may still arrive. A 60-minute threshold keeps all window entries from the last 60 minutes in state.
For a pipeline with 5-minute tumbling windows, a 60-minute threshold means the state store holds roughly 12 active windows per aggregation key simultaneously. If the pipeline has 100,000 unique merchant IDs, that is 1.2 million state entries — each holding partial aggregation data. Multiply by RocksDB's per-entry overhead and the state store can easily consume tens of gigabytes of executor memory.
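The arithmetic behind those figures, as a quick sketch (the open-window count assumes evenly spaced tumbling windows; actual per-entry byte overhead varies by aggregation schema and state store backend):

```python
def open_windows_per_key(threshold_min, window_min):
    # Windows stay in state until the watermark passes their end time, so the
    # threshold divided by the window duration bounds the open-window count.
    return threshold_min // window_min

def state_entries(threshold_min, window_min, num_keys):
    return open_windows_per_key(threshold_min, window_min) * num_keys

assert open_windows_per_key(60, 5) == 12
assert state_entries(60, 5, 100_000) == 1_200_000  # 12 windows × 100k merchants
```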
| Threshold | Active windows per key (5-min tumbling) | Memory pressure | Late-data coverage |
| --- | --- | --- | --- |
| 2 minutes | ~1 window | Very low | Only very minor delays (fast mobile networks) |
| 10 minutes | ~2–3 windows | Low | Typical mobile retries, brief outages |
| 30 minutes | ~6 windows | Moderate | Extended connectivity loss (subway tunnels, roaming) |
| 60 minutes | ~12 windows | High | Long-haul retransmission, offline-first apps |
| 4 hours | ~48 windows | Very high — often unsustainable | Batch uploads from disconnected devices |
The single most impactful performance concern is the stalled-partition problem. A Kafka topic with 50 partitions where one producer goes silent causes the global watermark to freeze at that partition's last-seen event time. The remaining 49 partitions continue generating fresh events, but no windows get finalized because the watermark does not advance. In practice, this means:
- Monitor watermark lag with the `eventTime.watermark` metric in streaming query progress events.
- Set up an alert when watermark lag (current wall time minus current watermark) exceeds two to three times the expected late-arrival threshold.
- Consider whether truly dead partitions should be treated as absent (not stalling), which requires careful Spark configuration or custom watermark strategies.
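The alerting rule from the list above, as a small sketch (the 2.5× factor is the midpoint of the suggested 2–3× band, and the timestamps are illustrative):

```python
from datetime import datetime, timedelta, timezone

def watermark_lag_alert(watermark, threshold, factor=2.5, now=None):
    """True when watermark lag exceeds factor × the late-arrival threshold."""
    now = now or datetime.now(timezone.utc)
    lag = now - watermark
    return lag > factor * threshold

# A partition stalled at 13:55 with a 10-minute threshold: at 14:30 the lag is
# 35 minutes, well past the 25-minute alert line.
now = datetime(2024, 1, 1, 14, 30, tzinfo=timezone.utc)
frozen_wm = datetime(2024, 1, 1, 13, 55, tzinfo=timezone.utc)
assert watermark_lag_alert(frozen_wm, timedelta(minutes=10), now=now) is True
```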
The state store backend also affects memory scaling behavior. The default in-memory HDFSBackedStateStoreProvider serializes all state to HDFS on every micro-batch checkpoint. RocksDB-backed state (RocksDBStateStoreProvider) is generally more efficient for large state footprints because it stores state on local SSD with compaction, rather than re-serializing the entire state to HDFS every batch. For pipelines with more than a few million state entries, RocksDB is strongly preferred.
📊 How Events Flow Through Watermark Tracking to Window Finalization
The following diagram traces the complete path an event takes from Kafka partition arrival through per-partition tracking, global watermark computation, window finalization, and state eviction. Each step is a distinct responsibility in the Spark internals.
```mermaid
graph TD
    KafkaP0[Kafka Partition 0 - max event time 14:12]
    KafkaP1[Kafka Partition 1 - max event time 14:09]
    KafkaP2[Kafka Partition 2 - max event time 14:11]
    Executor0[Executor 0 - tracks local max 14:12]
    Executor1[Executor 1 - tracks local max 14:09]
    Executor2[Executor 2 - tracks local max 14:11]
    Driver[Driver - collects per-partition maxes]
    GlobalMin[Global Min = 14:09 with threshold 5 min]
    Watermark[New Watermark = 14:04]
    StateStore[State Store - scan for finalizable windows]
    FinalizeWindow[Finalize windows ending before 14:04]
    EmitResult[Emit window results in append mode]
    EvictState[Evict finalized state entries]
    DropLate[Drop late events with event time before 14:04]
    KafkaP0 --> Executor0
    KafkaP1 --> Executor1
    KafkaP2 --> Executor2
    Executor0 --> Driver
    Executor1 --> Driver
    Executor2 --> Driver
    Driver --> GlobalMin
    GlobalMin --> Watermark
    Watermark --> StateStore
    StateStore --> FinalizeWindow
    FinalizeWindow --> EmitResult
    FinalizeWindow --> EvictState
    Watermark --> DropLate
```
Read this diagram from top to bottom: three Kafka partitions deliver events processed by three executors, each tracking its local maximum event time. The Driver collects these maxes and takes the global minimum (14:09 — the slowest partition sets the pace). Subtracting the 5-minute threshold gives a watermark of 14:04. The state store is scanned for windows ending before 14:04; those windows are finalized, their results are emitted, and their entries are evicted. Any event arriving with an event time before 14:04 is dropped as too late.
The critical insight from this diagram is the bottleneck at the Global Min step: Executor 1's lagging partition (14:09) — not the fastest partition (14:12) — controls the entire pipeline's watermark and therefore the entire pipeline's state retention and output latency.
🌍 Where Watermarks Shape Real Production Streaming Pipelines
Tumbling-window payment aggregations with withWatermark. The canonical use case: aggregate payment events by merchant in 5-minute tumbling windows to feed a fraud detection model. A 10-minute watermark threshold accommodates typical mobile network retries and brief signal loss. Windows are finalized 15 minutes after their start time in steady-state operation (5-minute window duration plus 10-minute threshold): a healthy trade-off between latency and completeness.
Out-of-order Kafka messages from multi-region producers. When a payment gateway in São Paulo writes events to a Kafka cluster in us-east-1, replication lag and producer retry logic mean events from the same transaction can arrive in any order. A 2-minute watermark threshold is usually sufficient here because the skew comes from network transit time, not device-level offline buffering. The watermark prevents unbounded state without sacrificing legitimate near-real-time events.
Session-window user activity tracking. An analytics pipeline tracking app sessions uses session windows with a 30-minute inactivity gap. A user who opens the app, backgrounds it for 20 minutes, and then makes a purchase should have all three events in the same session. A watermark threshold of 10 minutes ensures that delayed session-end events — which are often last in the retry queue — are still captured before the session window closes. Combining session windows with watermarks is complex because session boundaries are dynamic, but Spark 3.2+ handles this correctly when withWatermark is called before the session window groupBy.
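A sketch of that session-window pattern using PySpark's `session_window` function (available since Spark 3.2). The `activity` DataFrame and its column names are assumptions for illustration:

```python
from pyspark.sql.functions import session_window, count

# activity: a streaming DataFrame with event_time (timestamp) and user_id —
# both names are illustrative, not from the original pipeline.
sessions = (
    activity
    .withWatermark("event_time", "10 minutes")            # late threshold first
    .groupBy(session_window("event_time", "30 minutes"),  # 30-min inactivity gap
             "user_id")
    .agg(count("*").alias("events_in_session"))
)
```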
Combining watermarks with output modes. The output mode determines which rows Spark writes on each micro-batch, and not all modes are compatible with watermarks:
| Output mode | Watermark required? | Behavior | Use case |
| --- | --- | --- | --- |
| Append | Yes — required for aggregations | Only emits rows for finalized windows (watermark has passed window end) | Low-latency final results; immutable downstream sinks (Delta Lake, Parquet) |
| Update | No — but watermark still cleans state | Emits rows whenever a window's value changes; re-emits updated rows | Mutable downstream sinks (databases, Cassandra); real-time dashboards |
| Complete | Not compatible with watermark cleanup | Re-emits the entire result table on every batch | Small result sets only; no watermark-based state eviction |
For the fraud pipeline, append mode with a watermark is the correct choice: each finalized window's result is written exactly once, with no re-emission. For a real-time dashboard showing running totals, update mode is more appropriate even though it requires the downstream system to handle updates.
Watermarks with stream-stream joins. When joining two streaming DataFrames (for example, matching payment events with fraud-check response events), Spark requires both streams to have a watermark. The engine maintains a state buffer for unmatched rows from each side, using the watermarks to bound how long it keeps waiting for a matching event from the other stream. Without watermarks on both sides, stream-stream joins are prohibited in append output mode — Spark cannot bound the join state.
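A sketch of that stream-stream join shape in PySpark. `payments` and `fraud_checks` are assumed streaming DataFrames; the column names and the 15-minute match window are illustrative:

```python
from pyspark.sql.functions import expr

# Both sides must carry a watermark before an append-mode stream-stream join.
payments_wm = payments.withWatermark("pay_time", "10 minutes")
checks_wm = fraud_checks.withWatermark("check_time", "20 minutes")

joined = payments_wm.join(
    checks_wm,
    expr("""
        payment_id = check_payment_id AND
        check_time BETWEEN pay_time AND pay_time + INTERVAL 15 minutes
    """),
    "inner",  # a left_outer join would emit null-padded rows after the watermark passes
)
```

The time-range condition in the join predicate is what lets Spark bound the buffered state for unmatched rows on each side.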
⚖️ The Completeness-vs-Latency Tradeoff and Where Output Mode Fits In
Every watermark threshold decision is a point on a single spectrum:
Short threshold (1–5 minutes):
- Watermark advances quickly → windows finalize sooner → state is small → output latency is low.
- Any event whose timestamp falls behind the watermark (that is, it arrives more than the threshold later than the newest event seen) is silently dropped.
- Appropriate when: events are generated by infrastructure components on reliable networks (micro-services, IoT on enterprise WiFi), late data is genuinely unexpected, and low-latency results matter more than 100% completeness.
- Risk: a brief network outage or producer restart can cause a spike of legitimate events to arrive past the threshold and be dropped with no error — data loss is silent.
Long threshold (30–120 minutes):
- Watermark advances slowly → windows stay open much longer → state is large → output latency is high.
- Very few events are dropped; the pipeline absorbs extended connectivity loss.
- Appropriate when: events come from mobile devices with unreliable connectivity, compliance requirements demand high completeness, and the downstream system can tolerate result latency.
- Risk: state memory consumption can grow into tens or hundreds of gigabytes. A single stalled Kafka partition freezes the watermark and causes unbounded state growth until the partition recovers.
Append vs. Update output mode: In append mode, consumers see each window's result exactly once, after finalization — this is clean but adds the threshold duration to visible output latency. In update mode, consumers see incremental updates to window values as events arrive, enabling a real-time view, but the downstream system must handle overwrites and the state store does not evict windows as aggressively.
Inner join vs. outer join behavior with watermarks: For inner stream-stream joins, Spark emits matched rows as soon as a match is found and uses watermarks to eventually clean unmatched state. For outer joins, Spark must emit null-padded rows for unmatched events after the watermark passes the join's time range — this requires careful watermark alignment between the two joined streams. A watermark mismatch (one stream significantly slower than the other) can block outer join output entirely until the slower stream catches up.
🧭 Choosing Your Watermark Threshold: Latency Requirement Versus Late Data Tolerance
Use this table as a starting point. Measure your actual late-event distribution from Kafka consumer lag metrics before committing to a threshold in production.
| Latency requirement | Expected late-data range | Recommended threshold | Output mode | Notes |
| --- | --- | --- | --- | --- |
| Real-time ( < 30 seconds) | < 30 seconds | 30 seconds – 2 minutes | Update | Accept small correctness loss for maximum freshness |
| Near real-time (1–5 minutes) | < 5 minutes | 5–10 minutes | Append | Standard mobile app pipeline on good networks |
| Analytical (5–15 minutes) | 5–30 minutes | 15–30 minutes | Append | Mobile payments, consumer IoT, retail POS |
| Batch-equivalent (> 15 minutes) | Up to 2 hours | 60–120 minutes | Append | Offline-first apps, long-haul trucking telemetry |
| Compliance-grade (any) | Unknown / long tail | 4–24 hours + fallback batch layer | Append + Lambda reconciliation | Regulatory reporting; combine with a nightly batch correction job |
The compliance-grade row is important: when business requirements demand both real-time output and guaranteed completeness, a single watermark threshold cannot satisfy both. The standard solution is a Lambda architecture: the streaming pipeline with a conservative threshold provides near-real-time results, and a nightly batch job re-processes the raw events with exact correctness to produce the official record.
For any pipeline where you are unsure about the late-event distribution, start with a histogram of (kafka_consumer_read_time - event_timestamp) across a representative one-week sample. The 99th percentile of that distribution is your minimum threshold; the 99.9th percentile is your safety margin.
🧪 Walking Through Three Micro-Batches: What Gets Finalized and What Gets Dropped
This walkthrough uses a simple scenario: a tumbling 5-minute window on payment events, with a 10-minute watermark threshold. Each micro-batch processes a new set of events. The goal is to show concretely how the watermark advances and which windows get finalized.
Setup: Window size = 5 minutes. Watermark threshold = 10 minutes. Output mode = Append.
Timeline table for reference:
| Window label | Window interval | Finalizes when watermark exceeds |
| --- | --- | --- |
| W1 | [14:00, 14:05) | 14:05 |
| W2 | [14:05, 14:10) | 14:10 |
| W3 | [14:10, 14:15) | 14:15 |
Micro-batch 1 — Events arrive with timestamps: 14:03, 14:07, 14:08, 14:12.
- Max event time seen = 14:12.
- New watermark = 14:12 − 10 min = 14:02.
- 14:02 does not exceed any window end time (W1 ends at 14:05). No windows finalized.
- State store: W1 holds the 14:03 event. W2 holds 14:07, 14:08. W3 holds 14:12.
- Output emitted: none (no window has been finalized yet).
Micro-batch 2 — Events arrive with timestamps: 14:14, 14:16, 14:03 (late!).
- Max event time seen = 14:16.
- New watermark = 14:16 − 10 min = 14:06.
- 14:06 exceeds W1's end time (14:05). W1 is finalized.
- The late event at 14:03 has event time 14:03 < watermark 14:06: it is dropped. W1 will not be updated with this event.
- W2 (ends 14:10) and W3 (ends 14:15) are still open: 14:06 < 14:10.
- State store: W1 evicted. W2 holds 14:07, 14:08. W3 holds 14:12, 14:14. A new window [14:15, 14:20) holds 14:16.
- Output emitted: W1 result (aggregate of the 14:03 event only, because 14:07/14:08 belong to W2).
Micro-batch 3 — Events arrive with timestamps: 14:22, 14:23.
- Max event time seen = 14:23.
- New watermark = 14:23 − 10 min = 14:13.
- 14:13 exceeds W2's end time (14:10). W2 is finalized.
- W3 (ends 14:15) is still open: 14:13 < 14:15. Not yet finalized.
- State store: W2 evicted. W3 holds 14:12, 14:14. [14:15, 14:20) holds 14:16. [14:20, 14:25) holds 14:22, 14:23.
- Output emitted: W2 result (aggregate of 14:07, 14:08).
Key takeaways from this walkthrough:
- The 14:03 event in Micro-batch 2 was silently dropped — it arrived after the watermark had passed W1's end time. In a production fraud pipeline, this means that transaction was not counted in W1's aggregate.
- W3 is still open at the end of Micro-batch 3 even though the watermark is at 14:13 — because 14:13 < 14:15 (W3's end time). It will not finalize until Micro-batch 4 produces events pushing the watermark past 14:15.
- There is always a lag between the last event's time and when its window is emitted: in steady state, results appear approximately `window_duration + threshold` after the window's start time.
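The whole walkthrough can be replayed with a short pure-Python simulation of the bookkeeping (not Spark code; times are minutes past 14:00, so 3 means 14:03):

```python
THRESHOLD, WINDOW = 10, 5  # 10-minute watermark threshold, 5-minute tumbling windows

def run(batches):
    max_seen, state, dropped, emitted = float("-inf"), {}, [], []
    for events in batches:
        max_seen = max([max_seen] + events)     # advance the observed maximum
        watermark = max_seen - THRESHOLD
        for t in events:
            if t < watermark:
                dropped.append(t)               # too late — silently dropped
            else:
                state.setdefault((t // WINDOW) * WINDOW, []).append(t)
        for start in sorted(state):
            if start + WINDOW < watermark:      # watermark passed the window end
                emitted.append((start, sorted(state.pop(start))))
    return watermark, dropped, emitted, state

wm, dropped, emitted, state = run([[3, 7, 8, 12], [14, 16, 3], [22, 23]])
assert wm == 13                            # final watermark 14:23 − 10 min = 14:13
assert dropped == [3]                      # the late 14:03 event in micro-batch 2
assert emitted == [(0, [3]), (5, [7, 8])]  # W1 in batch 2, then W2 in batch 3
assert 10 in state                         # W3 still open: 14:13 < 14:15
```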
🛠️ Spark Configuration Reference for Watermark and State Store Tuning
The following configuration options directly control watermark propagation behavior, state store backend, and correctness guarantees. These belong in spark-submit or SparkSession configuration — not in application code.
```properties
# Enable stateful operation correctness checks during development.
# Validates that withWatermark() is called before stateful operations.
# Disable in production for performance; enable during testing.
spark.sql.streaming.statefulOperator.checkCorrectness.enabled=true

# Switch the state store to RocksDB for large-state pipelines.
# Default is HDFSBackedStateStoreProvider (in-memory, HDFS checkpoint).
# RocksDB is preferred when state exceeds a few hundred MB per executor.
spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider

# Maximum delay allowed in watermark propagation across micro-batches.
# If a micro-batch takes longer than this threshold to propagate a watermark
# update, Spark logs a warning. Useful for diagnosing stalled-watermark issues.
# Default: 0 (no threshold). Set to your SLA, e.g., 5 minutes.
spark.sql.streaming.watermarkPropagationDelayThreshold=5 minutes

# Number of micro-batches between state store maintenance operations
# (compaction, cleanup). Lower values keep state size smaller at the cost
# of more frequent compaction overhead.
spark.sql.streaming.stateStore.maintenanceInterval=60s

# RocksDB-specific: bound the in-memory write buffer size per state store.
# Larger values reduce compaction frequency but increase memory footprint.
spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB=64

# Enable changelog checkpointing for RocksDB state store.
# Writes incremental changes rather than full snapshots, reducing
# checkpoint I/O by 50–90% for large state stores.
spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled=true
```
Monitoring watermark lag programmatically. Every streaming query progress event — available via query.lastProgress in Python or through the StreamingQueryListener interface — contains an eventTime map with the current watermark timestamp. Subtract the watermark from the current wall clock time to get the watermark lag. Alert if this lag exceeds two to three times your configured threshold: it almost always means a stalled Kafka partition.
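A sketch of that computation against a progress payload. The structure mirrors the `eventTime.watermark` field of Spark's streaming progress JSON (an ISO-8601 UTC string); the sample values are illustrative:

```python
from datetime import datetime, timezone

# Shaped like a query.lastProgress payload; values are illustrative.
progress = {"eventTime": {"watermark": "2024-01-01T13:55:00.000Z"}}

wm = datetime.strptime(progress["eventTime"]["watermark"],
                       "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
now = datetime(2024, 1, 1, 14, 30, tzinfo=timezone.utc)  # fixed wall clock for demo
lag_min = (now - wm).total_seconds() / 60
assert lag_min == 35.0  # well past 2–3× a 10-minute threshold — alert
```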
For a full deep-dive on Spark Structured Streaming trigger modes, micro-batch internals, and the continuous processing engine, see the Apache Spark Engineering series roadmap.
📚 Hard-Won Lessons from Production Watermark Deployments
Watermark threshold should be set from data, not intuition. The most common mistake is choosing a round number — "let's use 10 minutes" — without measuring actual event delay distributions. Pull a week of Kafka consumer offset timestamps versus event payload timestamps, build a histogram, and look at the 99th percentile. That number is your minimum viable threshold. Add 30–50% as a safety margin. Intuitive guesses are almost always too short for mobile apps and too long for infrastructure telemetry.
A single stalled partition is a state leak, not a pipeline failure. Kafka producers go silent for legitimate reasons (deployment restarts, maintenance windows). When they do, the global watermark freezes, no windows finalize, and the state store grows. Because no error is thrown and the job continues processing data from other partitions, this failure mode is invisible until executor memory approaches its limit — typically hours later. Instrument watermark lag as a first-class SLO metric. Treat a watermark lag greater than twice the threshold as an incident, not a curiosity.
Append mode adds hidden latency that surprises downstream consumers. In append mode, a window's result is visible window_duration + threshold after the window's start time. A 5-minute window with a 10-minute threshold does not appear in the output table until 15 minutes after the window opened. Downstream teams expecting near-real-time data are often surprised by this. Document the output latency explicitly in your data catalog and consider whether update mode is more appropriate for interactive use cases.
withWatermark() must come before the window aggregation, not after. A common coding error is calling withWatermark() after groupBy().agg(). This does not attach the watermark to the aggregation — the aggregation runs without watermark-based state cleanup. The resulting query may appear to work correctly in testing (state never grows large enough to matter) but will slowly run out of memory in production. With spark.sql.streaming.statefulOperator.checkCorrectness.enabled=true, Spark will raise an AnalysisException at query-plan time if the ordering is wrong.
Outer stream-stream joins need both streams to have aligned watermarks. Teams building enrichment joins (for example, joining payment events with a slowly-updating merchant-metadata stream) often watermark the fast stream but forget the slow stream, or set thresholds so different that the join state becomes effectively unbounded. Both streams must have watermarks, and the thresholds must be chosen so the engine can determine when unmatched rows from each stream can be safely dropped.
📌 TLDR & Key Takeaways
TLDR: A watermark is Spark's explicit bound on late-event tolerance. It advances as `global min of per-partition max event times − threshold` and triggers window finalization and state eviction once it passes a window's end time. Short thresholds mean low memory and low latency but more dropped data; long thresholds mean high completeness but expensive state. A single stalled Kafka partition freezes the global watermark — monitor watermark lag as a first-class metric.
- Event time is the only correct clock for business-logic windows. Processing time and ingestion time produce incorrect window boundaries for out-of-order event sources.
- Watermark = global min(per-partition max event time) − threshold. The global minimum ensures correctness across all partitions. The slowest partition controls the pipeline's state lifecycle.
- Windows finalize when the watermark exceeds the window end time. Late events arriving after finalization are silently dropped — no error, no retry.
- State store size scales linearly with threshold duration. A 60-minute threshold for 5-minute tumbling windows keeps 12 window entries per aggregation key in state simultaneously.
- Use the RocksDB state store for large-state pipelines (spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider). The default in-memory provider is unsuitable for state stores exceeding a few hundred MB.
- Append mode adds window_duration + threshold latency to visible results. Plan downstream data contracts around this latency, or use Update mode for interactive read paths.
- Stalled Kafka partitions freeze the watermark. Alert on watermark lag > 2× threshold before the state store exhausts executor memory.
- Measure your actual event delay distribution before choosing a threshold. Use the 99th percentile of (read_time − event_time) as the minimum threshold, and add a 30–50% safety margin.
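The last takeaway can be sketched as a small helper. The delay sample and the 50% margin below are illustrative numbers, not a recommendation:

```python
def suggest_watermark_threshold(delays_s, percentile=0.99, margin=0.5):
    """Return a threshold in seconds: the given percentile of observed
    (read_time - event_time) delays, plus a safety margin."""
    ordered = sorted(delays_s)
    # Nearest-rank percentile: index of the p-th ranked sample.
    idx = min(len(ordered) - 1, int(percentile * len(ordered)))
    return ordered[idx] * (1 + margin)

# Hypothetical delay sample (seconds) collected from the pipeline.
observed = [2, 3, 5, 8, 12, 20, 45, 70, 110, 240]
threshold = suggest_watermark_threshold(observed)  # 240 * 1.5 = 360.0
```

In practice the delay sample would come from a metrics job over the raw topic, and the result would feed the `withWatermark()` duration string (here, "6 minutes").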
📝 Practice Quiz
Test your understanding of watermarks, late-data handling, and state management in Spark Structured Streaming. Questions 1–4 have explicit answers; question 5 is open-ended.
- A pipeline has a 5-minute tumbling window and a 10-minute watermark threshold. The current micro-batch's maximum event time is 14:25. What is the current watermark, and which window (if any) is eligible for finalization in this micro-batch?
Show answer
Correct Answer: Current watermark = 14:25 − 10 minutes = 14:15. A window is eligible for finalization when the watermark exceeds the window's end time. The window [14:05, 14:10) has an end time of 14:10, and 14:15 > 14:10, so it is eligible for finalization. The window [14:10, 14:15) has an end time of 14:15, and 14:15 is not strictly greater than 14:15 — it is equal — so it is NOT yet finalized. The window [14:15, 14:20) has an end time of 14:20 > 14:15, so it is also still open.
- A Structured Streaming job reads from a Kafka topic with 20 partitions. Partition 7 stops producing data at 14:00 because its upstream producer went down. All other 19 partitions continue producing events with timestamps advancing normally. At 14:30, what is the approximate watermark, assuming a 5-minute threshold?
Show answer
Correct Answer: Approximately 13:55 (14:00 − 5 minutes), assuming the watermark was near real-time at 14:00 when partition 7 stalled. After 14:00, partition 7's per-partition max event time stops advancing (it stays at approximately 14:00). The global watermark = min(all per-partition maxes) − threshold = 14:00 − 5 minutes = 13:55. It does not advance regardless of how far ahead the other 19 partitions move. At 14:30, the watermark is still approximately 13:55, no windows have been finalized since 14:00, and the state store has been accumulating unfinalized window entries for 30 minutes.
- A data engineer writes a Structured Streaming query in update output mode without calling withWatermark() on the event-time column before the groupBy().agg() operation. The query runs without errors. What happens to the state store over time, and why?
Show answer
Correct Answer: The state store grows without bound and will eventually exhaust executor memory and crash the job. Without a watermark, Spark has no mechanism to determine when a window is complete. No window end time can ever be declared safe to finalize, so no state entries are ever evicted. The state store accumulates one entry per unique (key, window) combination for every window that has ever received at least one event. In a long-running pipeline, this is unbounded. Spark does not throw an error because the query is valid in update mode — watermarking is optional there from the API perspective. (Append mode is stricter: Spark rejects a streaming aggregation without a watermark at analysis time with an AnalysisException, precisely because no row could ever be finalized for output.)
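For reference, the shape that does bound state — withWatermark() applied to the event-time column before the aggregation — looks like this (topic name, schema, paths, and thresholds are illustrative):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, sum as sum_

spark = SparkSession.builder.appName("watermark-sketch").getOrCreate()

# Hypothetical parsed payment stream with merchant_id, amount, event_time.
payments = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "payments")
    .load()
    .selectExpr("CAST(key AS STRING) AS merchant_id",
                "CAST(value AS DOUBLE) AS amount",
                "timestamp AS event_time"))

totals = (payments
    .withWatermark("event_time", "10 minutes")  # bounds state lifetime
    .groupBy(window(col("event_time"), "5 minutes"), col("merchant_id"))
    .agg(sum_("amount").alias("total")))

query = (totals.writeStream
    .outputMode("append")  # legal here only because the watermark exists
    .format("delta")
    .option("checkpointLocation", "/chk/payments")  # path is illustrative
    .start("/tables/merchant_totals"))
```

With the `withWatermark` line removed, the same query would be rejected in append mode and would accumulate state forever in update mode.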
- A pipeline uses a 5-minute tumbling window with a 30-minute watermark threshold and append output mode. A product manager asks why the fraud dashboard shows data that is always 35 minutes behind real time. What is the exact cause, and is this behavior correct?
Show answer
Correct Answer: Yes, this is correct and expected behavior for append mode with a watermark. In append mode, a window's result is only emitted after it has been finalized — and a window is finalized only when the watermark passes its end time. The watermark lags the latest event time by the threshold (30 minutes). A window that ends at time T is finalized when the watermark exceeds T, which happens when the latest event time reaches T + 30 minutes — roughly 30 minutes after the window closes. Adding the 5-minute window duration, results for a given interval are visible approximately 35 minutes after the interval's start. To reduce this latency, the watermark threshold must be reduced. To eliminate it entirely at the cost of re-emitting updated rows, use update output mode instead.
- (Open-ended challenge) Your streaming pipeline must satisfy two requirements that appear contradictory: (a) the fraud alert must appear within 60 seconds of a suspicious transaction, and (b) the daily compliance report must account for 100% of transactions including those arriving up to 4 hours late. Evaluate whether a single watermarked Structured Streaming pipeline can satisfy both requirements simultaneously, and outline an architectural approach if it cannot.
Show answer
Correct Answer: No single watermarked Structured Streaming pipeline in append mode can satisfy both simultaneously. Requirement (a) demands a watermark threshold of approximately 60 seconds or less to produce results within 60 seconds (window_duration + threshold ≈ 60 seconds). Requirement (b) demands a watermark threshold of at least 4 hours to hold state open for late events. These are mutually exclusive: a 60-second threshold drops events arriving more than 60 seconds late, breaking requirement (b); a 4-hour threshold adds 4-hour output latency, breaking requirement (a). The standard architectural pattern is Lambda: (1) A real-time Spark Structured Streaming pipeline with a short watermark (60–120 seconds) feeds the fraud alerting system with low-latency approximate results. (2) A scheduled batch job (or a second streaming pipeline with a 4-hour watermark) re-processes raw events daily and writes the authoritative, complete totals to the compliance reporting table. The compliance report uses the batch layer's output; the fraud alert uses the speed layer's output. The two outputs serve different consumers with different correctness requirements and should never be sourced from the same pipeline.
🔗 Related Posts
- Spark Architecture: Driver, Executors, DAG Scheduler, and Task Scheduler Explained — How Spark's internal components coordinate distributed execution: the Driver, DAGScheduler, TaskScheduler, and Executor memory model that underpin all Structured Streaming micro-batch processing.
- Spark Shuffles and GroupBy Performance: What Actually Happens Between Stages — The shuffle layer is where stateful streaming aggregations pay their highest cost. Understanding shuffle mechanics — map files, fetch patterns, and skew — is essential for tuning watermarked groupBy pipelines.
- Apache Spark for Data Engineers: RDDs, DataFrames, and Structured Streaming — The companion foundations post covering the DataFrame API, lazy evaluation, the Catalyst optimizer, and the Structured Streaming programming model that watermarks extend.

Written by
Abstract Algorithms
@abstractalgorithms