Spark Structured Streaming: Micro-Batch vs Continuous Processing
Treats streams as unbounded tables; the execution model determines whether your pipeline hits milliseconds or minutes.
The 15-Minute Gap: How a Fraud Team Discovered They Needed Real-Time Streaming
A fintech team runs payment fraud detection with a well-tuned Spark batch job. Every 15 minutes it reads a day's worth of transaction events from S3, scores them against a model, and writes flagged payments to a PostgreSQL alerts table. It works. The false-positive rate is low, the job completes in under three minutes, and the pipeline has been stable for months.
Then the fraud team reviews a case where a stolen card was used 47 times over 22 minutes before anyone noticed. The batch job had run twice during that window and processed none of the suspicious transactions because they had arrived after the last S3 snapshot was written. By the time the next batch job ran, the account was drained.
The team's instinct (run the batch job more frequently, say every 30 seconds) runs into a hard wall. Spark batch jobs carry substantial scheduler overhead: the driver allocates resources, builds an execution plan, stages data, and tears down the job. Firing a full batch cycle every 30 seconds against a Kafka topic is operationally expensive and unreliable. What the team actually needs is a runtime that keeps executors alive, tracks how far it has read, and continuously processes new data as it arrives.
That is exactly the problem Spark Structured Streaming was designed to solve. It delivers sub-second latency on streaming data using the same DataFrame API the team already knows, while providing the fault tolerance and exactly-once guarantees that production fraud detection demands. The key mental model shift is deceptively simple: stop thinking about files and batches. Start thinking about an infinitely growing table where new rows keep arriving, and your query runs continuously over its ever-expanding contents.
Streams as Unbounded Tables: The Core Abstraction of Structured Streaming
Structured Streaming reframes how engineers think about data in motion. In the traditional batch model, a dataset is a finite, bounded table: you read it all, compute results, and write output. In Structured Streaming, a data stream is modelled as an unbounded table: a table that never stops growing because new rows are constantly being appended by the source (a Kafka topic, a file directory, a socket).
Reading and Writing a Stream
To consume a stream, you use readStream on a SparkSession. This produces a DataFrame backed by a DataStreamReader, which abstracts the specifics of the source (Kafka offsets, file modification timestamps, socket connections) behind a standard DataFrame interface. You write query transformations (filters, joins, aggregations) exactly as you would in batch mode.
To produce output, you call writeStream on the result DataFrame, which produces a DataStreamWriter. This is where you declare the sink (Delta Lake, Kafka, console, memory), the checkpoint location, the output mode, and the trigger.
Trigger Modes: When Does Spark Check for New Data?
The trigger mode controls the cadence at which Spark asks "is there new data to process?" Available modes are:
| Trigger Mode | Behaviour | Best For |
| --- | --- | --- |
| Default (unset) | Processes a new micro-batch as soon as the previous one finishes | Maximum throughput, latency ~seconds |
| ProcessingTime("30 seconds") | Waits at least 30s between micro-batches | Controlled latency with predictable load |
| Once | Processes all available data in one batch, then stops | Scheduled batch-style incremental loads |
| AvailableNow | Processes all available data across multiple micro-batches, then stops | Idempotent incremental ETL with backpressure |
| Continuous("1 second") | Experimental continuous processing engine, checkpoint every 1s | Ultra-low latency, limited operator support |
Output Modes: What Gets Written to the Sink?
Output mode determines which rows Spark emits to the sink each trigger:
- Append: only newly completed rows are written. Safe for stateless transformations. The default for sinks like Kafka and Delta Lake.
- Complete: the entire result table is rewritten every trigger. Required for global aggregations (e.g., total counts across all time). Memory-intensive.
- Update: only rows that changed since the last trigger are written. Efficient for aggregations with many stable groups. Not all sinks support it.
The combination of trigger mode and output mode is one of the most consequential design decisions in a Structured Streaming pipeline. A poorly chosen pair (for example, Complete mode on a high-cardinality aggregation with a fast trigger) can exhaust memory within minutes.
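To make the memory implication concrete, here is a toy plain-Python model (not Spark code; the user keys and counts are invented) of how many rows each output mode hands the sink per trigger for a running-count aggregation:

```python
# Toy model of output-mode semantics for a running count aggregation.
# "result_table" plays the role of Spark's internal result table.

def emit(mode, result_table, changed_keys):
    """Return the rows a sink would receive for one trigger."""
    if mode == "complete":
        return dict(result_table)  # entire result table, every trigger
    if mode == "update":
        # only the rows whose value changed in this micro-batch
        return {k: result_table[k] for k in changed_keys}
    raise ValueError("append is invalid for a running aggregation")

# Running count over user_id with one million distinct users already present.
result = {f"user{i}": 1 for i in range(1_000_000)}

# This trigger's micro-batch touches only three users.
changed = {"user1", "user2", "user3"}
for k in changed:
    result[k] += 1

print(len(emit("complete", result, changed)))  # 1000000 rows re-emitted
print(len(emit("update", result, changed)))    # 3 rows emitted
```

The 1,000,000-vs-3 gap is exactly the write amplification that makes Complete mode dangerous on high-cardinality keys.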
Inside MicroBatchExecution: How Every Trigger Fires a Batch Job
The default and most commonly used execution engine in Structured Streaming is MicroBatchExecution. Understanding it moves you from "I follow the docs" to "I can debug production incidents."
The StreamExecution Thread
When you call writeStream.start(), Spark starts a dedicated StreamExecution thread for the query. This thread runs in a loop and is managed by StreamingQueryManager, a singleton component in the SparkSession that tracks all active streaming queries, enforces resource policies, and provides the awaitAnyTermination() API.
The loop proceeds in four phases for every trigger interval:
1. Poll sources for new offsets. MicroBatchExecution calls each source (e.g., KafkaSource) to ask: "what is the latest available offset?" This is a lightweight metadata call; no data is read yet. The result is a map of {partition → offset}.
2. Write the planned end offsets to the offset log. Before any data is processed, Spark durably writes the planned end offsets to the offset log (a write-ahead log stored in <checkpointDir>/offsets/). This is the fault-tolerance anchor: if the job crashes before completing this micro-batch, the batch is never recorded in the commit log, and on restart Spark re-plans exactly the same range from the last committed offsets.
3. Execute the batch. Spark constructs a standard Spark batch job over the bounded range [lastCommittedOffset, newEndOffset]. This is a normal Catalyst-optimized DataFrame execution. All the familiar concepts (physical plan, shuffle partitions, speculative execution) apply in full here. The streaming aspect is simply that the data range is freshly computed on every trigger.
4. Write committed offsets to the commit log. After the batch job completes successfully, the commit is recorded in <checkpointDir>/commits/. On restart, Spark checks both logs: if offsets are in the offset log but not the commit log, the last micro-batch is replayed from scratch using exactly the same offset range, achieving exactly-once processing when the sink is also idempotent.
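The four phases above, including the crash-and-replay behaviour, can be condensed into a plain-Python sketch (not Spark code; the offset values are invented, and two dicts stand in for the files under offsets/ and commits/):

```python
# Toy model of the MicroBatchExecution loop and its recovery behaviour.
offset_log, commit_log = {}, {}

def run_micro_batch(epoch, latest_offset, crash_before_commit=False):
    # Phase 1: poll the source for the latest available offset (metadata only).
    start = offset_log.get(epoch - 1, 0)
    # Phase 2: durably log the planned end offset BEFORE processing any data.
    offset_log[epoch] = latest_offset
    # Phase 3: execute a normal batch job over the bounded range [start, end).
    processed = (start, latest_offset)
    if crash_before_commit:
        return processed  # the commit log entry for this epoch is never written
    # Phase 4: record the successful epoch in the commit log.
    commit_log[epoch] = True
    return processed

run_micro_batch(0, 100)
run_micro_batch(1, 250, crash_before_commit=True)  # crash mid-batch

# On restart: epoch 1 is in the offset log but not the commit log,
# so the engine replays exactly the same range [100, 250).
replay_epoch = max(e for e in offset_log if e not in commit_log)
replayed = run_micro_batch(replay_epoch, offset_log[replay_epoch])
print(replayed)  # (100, 250)
```

The replay is byte-for-byte deterministic because the planned end offset was logged before any processing began.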
Checkpointing and Exactly-Once Semantics
The checkpoint directory is the cornerstone of fault tolerance. Its structure is:
<checkpointDir>/
offsets/ # WAL: planned end offsets per micro-batch epoch
commits/ # WAL: committed epochs after successful batch
state/ # State store data for stateful operators (windows, dedup, joins)
metadata # Query ID, Spark version, config snapshot
The combination of the offset WAL and an idempotent sink delivers exactly-once semantics. For Delta Lake, idempotency comes from transactional writes keyed on the batch ID. For Kafka, the sink is effectively at-least-once: a replayed micro-batch re-sends its records, so downstream consumers must tolerate or deduplicate them (Kafka's idempotent producer only guards against duplicates from producer retries, not batch replays). For console or non-transactional file sinks, you get at-least-once at best.
Stateful Operations and the State Store
Aggregations with time windows, stream-stream joins, and deduplication all require Spark to maintain state across micro-batches. This state is managed by the StateStore, a versioned key-value store whose backing implementation is either the default HDFSBackedStateStoreProvider (state held on the JVM heap) or RocksDBStateStoreProvider (state held off-heap in RocksDB). The state is periodically snapshotted to HDFS or S3 under <checkpointDir>/state/ so that it survives executor failures and application restarts.
A watermark, declared with withWatermark("eventTime", "10 minutes"), tells Spark how long to wait for late data before finalising a window. Once a window is past the watermark, its state is evicted and memory is reclaimed.
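A small stdlib-Python model (not Spark code; the timestamps and window contents are invented) of how the watermark is derived and when window state becomes eligible for eviction:

```python
# Toy model of watermark-based state eviction. The watermark is the maximum
# observed event time minus the declared delay; windows whose end time is at
# or below the watermark are finalised and their state evicted.
from datetime import datetime, timedelta

delay = timedelta(minutes=10)  # as in withWatermark("eventTime", "10 minutes")

event_times = [datetime(2024, 1, 1, 12, 0),
               datetime(2024, 1, 1, 12, 25),
               datetime(2024, 1, 1, 12, 30)]
watermark = max(event_times) - delay
print(watermark)  # 2024-01-01 12:20:00

# Open 5-minute windows keyed by (start, end), value = running count.
windows = {(datetime(2024, 1, 1, 12, 0), datetime(2024, 1, 1, 12, 5)): 4,
           (datetime(2024, 1, 1, 12, 25), datetime(2024, 1, 1, 12, 30)): 2}

evicted = {w for w in windows if w[1] <= watermark}   # finalised, state freed
live = {w: c for w, c in windows.items() if w[1] > watermark}
print(len(evicted), len(live))  # 1 1
```

The window ending at 12:05 is behind the 12:20 watermark and is evicted; the window ending at 12:30 stays open for potential late arrivals.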
How Structured Streaming Guarantees Correctness at Scale
The Internals of MicroBatchExecution
MicroBatchExecution extends StreamExecution, which is the base class for all Spark streaming execution engines. The object hierarchy is worth knowing when reading stack traces or Spark source code:
StreamingQueryManager (1 per SparkSession)
└── StreamingQuery (1 per active stream)
    └── StreamExecution (abstract)
        ├── MicroBatchExecution (default engine)
        └── ContinuousExecution (experimental engine)
Epoch-based offset tracking is how Structured Streaming achieves determinism. Each micro-batch is assigned a monotonically increasing epoch number (the batch ID). The offset log and commit log are both indexed by this epoch number. On a restart, Spark finds the highest committed epoch (from commits/), determines the last planned end offset for that epoch (from offsets/), and starts planning the next batch from that exact position. There is no reliance on wall-clock time, no sampling, and no approximate "where did we leave off?" guesswork.
The WAL (write-ahead log) for offsets is written using HDFSMetadataLog, a Spark class that writes atomic JSON blobs to HDFS or S3-compatible storage. Each file in offsets/ is named by its epoch number and contains the offset ranges for all sources in that micro-batch. Reading the entire log directory gives you a complete, ordered history of every offset range ever processed, which is invaluable for debugging missed data or duplicate writes.
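As a sketch of what reading that log back looks like, the following stdlib-Python snippet reconstructs per-epoch offset ranges from log entries shaped like the per-epoch JSON blobs described above (the payments topic and all offset values are hypothetical):

```python
# Reconstruct (epoch, partition, start, end) offset ranges from an
# offsets/ directory, modelled here as an {epoch: json_string} dict.
import json

offset_files = {
    0: '{"payments":{"0":1000,"1":980}}',
    1: '{"payments":{"0":1500,"1":1450}}',
    2: '{"payments":{"0":2000,"1":1900}}',
}

def offset_ranges(files):
    """Yield (epoch, partition, start_offset, end_offset) in epoch order."""
    prev = {}
    for epoch in sorted(files):
        ends = json.loads(files[epoch])["payments"]
        for partition, end in sorted(ends.items()):
            # The range for this epoch starts where the previous epoch ended.
            yield epoch, partition, prev.get(partition, 0), end
        prev = ends

for row in offset_ranges(offset_files):
    print(row)
# (0, '0', 0, 1000), (0, '1', 0, 980), (1, '0', 1000, 1500), ...
```

This "end offsets only; ranges derived from the previous epoch" layout is why the log must be read in epoch order.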
The state store adds complexity for stateful queries. When you restart a job from a checkpoint, Spark must reload the state store snapshot that matches the last committed epoch. If the state snapshot and the offset log are inconsistent (e.g., a partial state write occurred before a crash), Spark rolls back to the last consistent state snapshot. This is why the spark.sql.streaming.stateStore.providerClass setting matters: the default in-memory provider loses state on executor failure, while RocksDBStateStoreProvider persists state to disk and to the checkpoint store.
Performance Analysis: Tuning Trigger Intervals for Production Pipelines
Trigger interval tuning is one of the most misunderstood aspects of Structured Streaming operations. Engineers often assume that a shorter trigger interval always means lower latency, but this is only true when processing time is shorter than the trigger interval.
The processing time / trigger interval relationship:
| Scenario | Trigger Interval | Processing Time | Observed Latency | Risk |
| --- | --- | --- | --- | --- |
| Healthy | 10s | 3s | ~10s | None; idle time between batches |
| Borderline | 10s | 9s | ~10s | Scheduler overhead can push into next interval |
| Overloaded | 10s | 15s | ~15s+ | Backlog builds; continuous lag increase |
| Misconfigured | 1s | 8s | ~8s | Batches fire back-to-back; the interval setting is meaningless |
When processing time > trigger interval, Spark does not run multiple batches in parallel; it simply fires the next batch as soon as the previous one finishes (default trigger behaviour). The trigger interval becomes irrelevant, and the true latency floor is the processing time itself. No amount of trigger tuning fixes an under-resourced cluster.
Key metrics to watch in the Spark Streaming UI:
- processingTime (ms): time to execute the batch job, excluding scheduling overhead
- triggerExecution (ms): total wall-clock time for the trigger cycle, including scheduling
- inputRowsPerSecond: the rate at which records arrive from the source
- processedRowsPerSecond: the rate at which records are processed by the engine
When inputRowsPerSecond consistently exceeds processedRowsPerSecond, the pipeline is falling behind. The correct remedies are to increase the number of executors, increase shuffle partitions, or redesign expensive stateful operations, not to decrease the trigger interval.
Backpressure in Structured Streaming is not automatic the way it is in Apache Flink. Spark does not throttle the source based on downstream pressure. You must configure maxOffsetsPerTrigger (for Kafka) or maxFilesPerTrigger (for file sources) to bound the input volume per micro-batch and prevent runaway memory growth during lag recovery.
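A toy simulation (plain Python; all numbers illustrative) of why capping per-trigger input matters during lag recovery:

```python
# Compare lag recovery with and without a per-trigger input cap,
# mimicking the effect of maxOffsetsPerTrigger.

def recover(backlog, arrival_per_trigger, cap=None):
    """Simulate triggers draining a backlog; return per-batch input sizes."""
    sizes = []
    while backlog > 0 and len(sizes) < 100:
        batch = backlog if cap is None else min(backlog, cap)
        sizes.append(batch)
        backlog = backlog - batch + arrival_per_trigger
        if cap is not None and backlog <= arrival_per_trigger:
            break  # caught up to steady state
    return sizes

# One million records of backlog, 10k new records arriving per interval.
uncapped = recover(1_000_000, 10_000)
capped = recover(1_000_000, 10_000, cap=50_000)

print(uncapped[0])  # 1000000 -> one enormous catch-up batch, OOM risk
print(max(capped))  # 50000   -> bounded batches, gradual catch-up
```

The uncapped run attempts the whole backlog in its first batch; the capped run never exceeds the configured bound and drains the lag over many small batches.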
The End-to-End Streaming Pipeline: From Kafka to Sink
The following diagram shows how data flows through a Structured Streaming pipeline backed by Kafka and writing to Delta Lake. Each component maps to a concrete class or abstraction in Spark's codebase.
graph TD
A[Kafka Topic - Source] --> B[KafkaSource - offset polling]
B --> C[MicroBatchExecution - trigger loop]
C --> D[Offset WAL - offsets dir]
D --> C
C --> E[Spark Batch Job - Catalyst plan]
E --> F[State Store - windows and dedup]
F --> G[DataStreamWriter - writeStream]
G --> H[Delta Lake Sink - transactional write]
H --> I[Commit WAL - commits dir]
I --> C
C --> J[StreamingQueryManager - lifecycle]
J --> C
The offset WAL (top-right loop) is written before the batch executes. The commit WAL (bottom loop) is written after the batch succeeds. These two logs working together are what make exactly-once delivery possible: on a restart, Spark replays any epoch whose offset is logged but whose commit is missing.
The State Store sits between the Catalyst execution layer and the writer. It is only involved for stateful operators: windowed aggregations, stream-stream joins, and dropDuplicates. Stateless transformations (maps, filters, simple projections) bypass the state store entirely and are cheaper to run.
Real-World Patterns: Kafka, Incremental File ETL, and Delta Lake Sinks
Kafka Source with Structured Streaming
Kafka is the most common source for Structured Streaming production deployments. The KafkaSource maps each Kafka partition to a Spark task and tracks {topic, partition, offset} triples as the offset state. The source guarantees that each offset range is processed exactly once across restarts.
A critical operational detail: Spark does not commit offsets back to Kafka's __consumer_offsets topic by default. Offset state lives entirely in the Structured Streaming checkpoint. This means you cannot use Kafka consumer group lag monitoring tools (like Burrow or Kafka's kafka-consumer-groups.sh) to measure how far behind your pipeline is. Instead, read lag from the Spark Streaming UI or query the offsets/ checkpoint log directly.
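Computing lag from the checkpoint is then simple arithmetic. The sketch below (plain Python; the payments topic and all offsets are hypothetical, and in practice the broker-side numbers would come from Kafka's AdminClient) compares the newest checkpointed end offsets against broker log-end offsets:

```python
# Derive per-partition and total lag for a Structured Streaming query
# from its latest checkpoint entry and the broker's log-end offsets.
import json

# Contents of the newest file under <checkpointDir>/offsets/ (hypothetical).
latest_checkpoint_entry = '{"payments":{"0":2000,"1":1900}}'

# Log-end offsets as reported by the broker (hypothetical).
broker_log_end_offsets = {"0": 2600, "1": 2100}

checkpointed = json.loads(latest_checkpoint_entry)["payments"]
lag = {p: broker_log_end_offsets[p] - end for p, end in checkpointed.items()}

print(lag)                # {'0': 600, '1': 200}
print(sum(lag.values()))  # 800 records behind across all partitions
```

This is the record-count view of lag; as noted later, pair it with a bytes-based metric for a complete picture.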
File Source for Incremental ETL
The file source (readStream.format("parquet").load("/data/incoming/")) monitors a directory for new files using a metadata log stored at <checkpointDir>/sources/0/. Spark tracks which files have been processed and never reprocesses them even across restarts.
The AvailableNow trigger combines the reliability of Structured Streaming checkpointing with the operational simplicity of a scheduled job. It processes all currently available files in bounded micro-batches and shuts down cleanly, making it ideal for hourly or daily incremental ETL pipelines where you want exactly-once file processing without a permanently running cluster.
Delta Lake as a Streaming Sink with ACID Guarantees
Delta Lake integrates tightly with Structured Streaming as both a source and a sink. As a sink, Delta uses the micro-batch ID as a transaction token: each batch write is wrapped in a Delta transaction. If Spark retries a failed batch (same batch ID, same offset range), Delta detects the duplicate transaction and skips the write. This is the idempotency that makes the exactly-once guarantee end-to-end rather than just Spark-side.
As a source, Delta's Change Data Feed (CDF) lets a downstream streaming query consume only the rows that changed in the Delta table since the last checkpoint. This enables efficient streaming ETL chains where each stage reads only the delta of the previous stage, rather than re-scanning the full table.
foreachBatch for Custom Output
When no built-in sink fits your requirements, foreachBatch(func) gives you a micro-batch DataFrame and a batch ID on every trigger. This is the escape hatch for writing to JDBC databases, custom REST APIs, or multi-sink fan-out patterns. The batch ID is passed so your function can implement its own idempotency check.
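A minimal sketch of that idempotency pattern, in plain Python rather than a real JDBC sink (the in-memory list and set stand in for a database table and a dedup ledger; production code would persist the ledger transactionally alongside the data):

```python
# Idempotent sink handler mirroring foreachBatch's (batch_df, batch_id)
# contract: a replayed micro-batch arrives with the SAME batch_id, so a
# ledger check makes the write safe to repeat.

sink = []                   # stands in for the target table
processed_batch_ids = set() # stands in for a persisted dedup ledger

def write_batch(batch_rows, batch_id):
    if batch_id in processed_batch_ids:
        return  # duplicate replay after a crash: skip the write entirely
    sink.extend(batch_rows)
    processed_batch_ids.add(batch_id)

write_batch(["txn-1", "txn-2"], batch_id=7)
write_batch(["txn-1", "txn-2"], batch_id=7)  # replay of epoch 7 after a crash
write_batch(["txn-3"], batch_id=8)

print(sink)  # ['txn-1', 'txn-2', 'txn-3'] -- no duplicates
```

The same batch-ID-keyed check is, conceptually, what Delta Lake performs inside its transaction log.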
Micro-Batch vs Continuous Processing vs Apache Flink
These three execution models represent genuinely different tradeoffs, not just a dial turned from slow to fast.
Micro-Batch (Structured Streaming default)
The micro-batch engine achieves latency in the range of 100ms to several seconds depending on trigger configuration and cluster size. It benefits from Spark's mature Catalyst optimizer, full SQL support including complex joins and window functions, broad connector ecosystem, and deep integration with Delta Lake. Fault tolerance is strong: the WAL-based recovery model is well-understood and battle-tested.
The ceiling is latency. Even with sub-100ms trigger intervals, the scheduling overhead of launching a Spark job (serialising the plan, allocating tasks, shipping shuffle data) imposes a floor that continuous processing can undercut.
Continuous Processing (Spark experimental)
Continuous processing (Trigger.Continuous("1 second")) eliminates the micro-batch boundary. Executors read from sources continuously and checkpoint offsets every N milliseconds. Latency can drop to single-digit milliseconds. However, the feature is still experimental as of Spark 3.x: it supports only a limited set of operators (map, filter, flatMap, simple projections), does not support stateful aggregations, and has known reliability gaps. It is not appropriate for production use unless your team has contributed to and tested the code path specifically.
Apache Flink
Flink's streaming runtime is a true event-at-a-time engine. It delivers sub-10ms latency with full stateful operator support, mature exactly-once guarantees via distributed snapshots (Chandy-Lamport algorithm), and a richer windowing API. If your requirement is sub-second latency with stateful aggregations, Flink is the operationally mature choice today.
Where Spark wins: teams already running Spark clusters for batch and ML workloads get Structured Streaming nearly for free, with the same API, same cluster, and same storage (Delta Lake). Flink requires a separate cluster, a different API, and a different operational skill set.
Comparison Summary
| Dimension | Spark Micro-Batch | Spark Continuous | Apache Flink |
| --- | --- | --- | --- |
| Minimum latency | ~100ms | ~10ms (experimental) | ~1ms |
| Stateful aggregations | Full support | Limited | Full support |
| Exactly-once | Yes (WAL + idempotent sink) | Partial | Yes (distributed snapshots) |
| Production readiness | Mature | Experimental | Mature |
| Operator ecosystem | Very broad | Limited | Broad |
| Batch + stream unification | Yes (same API) | Partial | Table API only |
| Operational overhead | Low (if Spark already deployed) | Low | High (separate cluster) |
Choosing the Right Streaming Execution Mode for Your Pipeline
Use this table to map your pipeline's constraints to the appropriate execution model.
| Latency Requirement | State Complexity | Fault Tolerance Need | Recommended Mode |
| --- | --- | --- | --- |
| > 1 minute | None or simple | Recovery within minutes | Spark AvailableNow trigger (batch-style) |
| 1s – 60s | None or simple | Exactly-once required | Spark Micro-Batch, default trigger |
| 1s – 60s | Complex (windows, joins) | Exactly-once required | Spark Micro-Batch + RocksDB state store |
| 100ms – 1s | None or simple | At-least-once acceptable | Spark Micro-Batch, ProcessingTime("100ms") |
| 100ms – 1s | Complex | Exactly-once required | Apache Flink |
| < 100ms | None | At-least-once | Spark Continuous (experimental) |
| < 100ms | Complex | Exactly-once required | Apache Flink |
A few rules of thumb that will save a post-mortem:
Do not choose Continuous processing for stateful work. It will either not compile or silently produce incorrect results for operators like groupBy, window, and dropDuplicates.
Do not tune your way out of an undersized cluster. If processingTime exceeds your trigger interval, you have a resource problem, not a configuration problem. Add executors or reduce per-batch input volume with maxOffsetsPerTrigger.
Use AvailableNow instead of Once for incremental ETL. Once processes all backlog in a single gigantic micro-batch. AvailableNow spreads it across multiple bounded micro-batches, applies backpressure, and handles schema evolution more gracefully.
Reading a Streaming Query Execution Plan and Checkpoint Layout
This section shows two diagnostic tools every Structured Streaming operator should know: how to read a streaming query's physical plan, and how to interpret the checkpoint directory layout.
Inspecting the Streaming Query Plan with explain()
When you call explain(true) on a streaming DataFrame before starting the query, Spark prints the full analysed and optimised logical and physical plans. The output looks superficially identical to a batch explain() output, but there are streaming-specific nodes to watch for:
The physical plan will contain MicroBatchScan (or StreamingRelation in older versions) at the scan leaf, confirming the source is treated as a streaming relation. For stateful queries, you will see StateStoreSave and StateStoreRestore nodes wrapping aggregate operators. These nodes tell you exactly which operators are touching the state store on every trigger; a high-cardinality key space under StateStoreSave is a common source of memory pressure.
For a windowed aggregation query, a representative physical plan snippet would include:
+- StateStoreSave [window_start, window_end, user_id], state info ...
+- HashAggregate(keys=[window_start, window_end, user_id], ...)
+- StateStoreRestore [window_start, window_end, user_id], state info ...
+- Exchange hashpartitioning(...)
+- MicroBatchScan[topic=payments, partition=0..15]
The Exchange node is a shuffle. For stateful operators, this shuffle is unavoidable: Spark must route all rows with the same key to the same executor so the state store can be collocated with the computation.
Checkpoint Directory Layout After Several Epochs
After a running streaming query has processed a few micro-batches, the checkpoint directory looks like this:
/checkpoints/fraud-alerts-query/
├── metadata                  # {"id":"abc-123","..."}
├── offsets/
│   ├── 0                     # {"payments":{"0":1000,"1":980,...}}
│   ├── 1                     # {"payments":{"0":1500,"1":1450,...}}
│   └── 2                     # {"payments":{"0":2000,"1":1900,...}}
├── commits/
│   ├── 0                     # {"nextBatchWatermarkMs":0}
│   ├── 1                     # {"nextBatchWatermarkMs":0}
│   └── 2                     # (absent if crash occurred mid-batch 2)
└── state/
    └── 0/
        └── 0/                # operator index 0, partition 0
            ├── 1.delta       # incremental state update
            ├── 2.delta
            └── 2.snapshot    # full state snapshot at epoch 2
In the example above, epoch 2's offset is logged but its commit file is absent. On restart, Spark will reprocess the exact same offset range as epoch 2, from where epoch 1 ended to the offsets logged in offsets/2. This is deterministic replay, not guesswork, and it is why the offset WAL must be written before any data is processed.
The state/0/0/2.snapshot file is the RocksDB or in-memory state snapshot at epoch 2. On restart, Spark loads this snapshot, replays 2.delta, and the state is restored to exactly where it was before the crash.
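The snapshot-plus-delta restore can be modelled in a few lines of plain Python (state contents and epoch numbers are invented for illustration; real delta files also carry deletions, omitted here for brevity):

```python
# Toy model of state restoration from <checkpointDir>/state/: load the
# newest snapshot, then apply later delta files in epoch order to reach
# the exact pre-crash state.

snapshot = {"user1": 5, "user2": 3}   # contents of a .snapshot file
deltas = {
    3: {"user1": 6},                  # 3.delta: keys updated in epoch 3
    4: {"user3": 1},                  # 4.delta: keys updated in epoch 4
}

def restore(snapshot, deltas, up_to_epoch):
    state = dict(snapshot)
    for epoch in sorted(deltas):      # deltas must replay in epoch order
        if epoch <= up_to_epoch:
            state.update(deltas[epoch])
    return state

# Last committed epoch was 4: snapshot + deltas 3 and 4.
print(restore(snapshot, deltas, up_to_epoch=4))
# {'user1': 6, 'user2': 3, 'user3': 1}
```

Restoring only up to the last committed epoch is what keeps the state store consistent with the offset and commit logs.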
Structured Streaming Configuration Reference
The following Spark configuration properties are the most operationally significant for Structured Streaming deployments. Apply these in your spark-defaults.conf or pass them at session creation.
# ── Checkpoint and Recovery ──────────────────────────────────────────────────
# Default checkpoint location for all streaming queries in the session.
# Override per-query with .option("checkpointLocation", "...") on writeStream.
spark.sql.streaming.checkpointLocation = /data/checkpoints/streaming
# Number of micro-batch epochs to retain in the offset and commit logs.
# Increase when debugging; decrease to reduce metadata storage.
# Minimum: 1. Recommended for production: 100.
spark.sql.streaming.minBatchesToRetain = 100
# ── State Store ──────────────────────────────────────────────────────────────
# Default: org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider
# Use RocksDB for large state (> 1M keys per partition) to avoid JVM heap pressure.
spark.sql.streaming.stateStore.providerClass = \
org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
# Minimum number of .delta files written before the provider consolidates
# them into a full .snapshot file (fewer deltas per snapshot = faster restore).
spark.sql.streaming.stateStore.minDeltasForSnapshot = 10
# ── Kafka Source ─────────────────────────────────────────────────────────────
# Disable the deprecated consumer group offset fetcher (Spark 3.1+).
# The new fetcher uses AdminClient, which is more reliable on large clusters.
spark.sql.streaming.kafka.useDeprecatedOffsetFetching = false
# Bound the total offsets consumed across all Kafka partitions per micro-batch.
# Critical for preventing runaway memory during lag recovery.
# NOTE: maxOffsetsPerTrigger is a per-query source option, not a global conf;
# set it on the reader, e.g. .option("maxOffsetsPerTrigger", 50000)
# ── Shuffle and Parallelism ──────────────────────────────────────────────────
# For streaming queries on a 16-core cluster, set this to 2x-4x core count.
# The default of 200 is almost always too high for micro-batch workloads.
spark.sql.shuffle.partitions = 64
# Adaptive Query Execution can interfere with state-store partition stability.
# Disable for stateful streaming queries; enable for stateless ones.
spark.sql.adaptive.enabled = false
# ── Continuous Processing (experimental) ─────────────────────────────────────
# The continuous-mode checkpoint interval is set in code via
# Trigger.Continuous(...), not via a conf. These tune executor-side buffering:
spark.sql.streaming.continuous.executorQueueSize = 1024
spark.sql.streaming.continuous.executorPollIntervalMs = 100
For a full deep-dive on Delta Lake's streaming integration, including ACID transaction mechanics and Change Data Feed, see the planned companion post Delta Lake as a Streaming Sink and Source.
Lessons Learned Running Structured Streaming in Production
Lesson 1: The checkpoint is not optional; treat it like a database. Teams routinely point the checkpoint location at a local filesystem or ephemeral NFS mount for development convenience and then forget to change it before deploying to production. The first executor failure wipes the checkpoint, and the query restarts from the source's latest offsets, silently skipping all unprocessed data. Use S3, HDFS, or GCS for checkpoints in every environment. Make the checkpoint path visible in your monitoring dashboard.
Lesson 2: Schema changes silently break stateful queries.
Structured Streaming checkpoints include schema metadata for stateful operators. If you add a column to an upstream Kafka schema after a stateful query has been running, the query may fail on restart with an AnalysisException about schema incompatibility, even if the new column is nullable. Plan schema evolution carefully: use schema registry enforcement, add a transformation layer that absorbs schema drift, or document a deliberate checkpoint migration procedure.
Lesson 3: maxOffsetsPerTrigger is your most important safety valve.
When a Structured Streaming query falls behind (because of a source spike, a slow batch, or a scheduled maintenance window), it accumulates a lag backlog. Without maxOffsetsPerTrigger, the first catch-up batch tries to process hours of backlog in a single micro-batch, OOMs the executors, and restarts, accumulating even more lag. Bounding input per trigger allows the query to catch up gradually without destabilising the cluster.
Lesson 4: Watermarks must be tuned per-pipeline, not copy-pasted.
A watermark of 10 minutes means Spark will hold state open for 10 minutes of event-time lag before evicting a window. If your events routinely arrive 8 minutes late, a 10-minute watermark is correct. If your events arrive within 30 seconds of event time, a 10-minute watermark holds state for 10 unnecessary minutes, bloating the state store and increasing memory pressure. Measure your actual event-time skew before setting the watermark.
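One way to ground that measurement, sketched in plain Python with invented lag samples: compute a high percentile of observed (processing time - event time) skew and add a margin, then use that as the watermark delay:

```python
# Choose a watermark delay from measured event-time skew instead of
# copy-pasting a default. Lag samples are illustrative: seconds between
# each record's event time and the time it actually arrived.

def percentile(samples, p):
    """Nearest-rank percentile over a small in-memory sample."""
    ordered = sorted(samples)
    index = min(len(ordered) - 1, int(round(p * (len(ordered) - 1))))
    return ordered[index]

observed_lag_seconds = [2, 3, 5, 8, 12, 15, 20, 25, 28, 30]

# Cover the 99th percentile of observed lateness, plus a 20% safety margin.
p99 = percentile(observed_lag_seconds, 0.99)
watermark_seconds = int(p99 * 1.2)

print(p99, watermark_seconds)  # 30 36
```

With these samples the result would be a 36-second watermark, far tighter than a copy-pasted 10 minutes, holding state open only as long as the data actually demands.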
Lesson 5: The Streaming UI lag number is in records, not bytes.
The inputRowsPerSecond and processedRowsPerSecond metrics measure row counts. A thousand-row lag might be trivial (small Avro messages) or catastrophic (large JSON payloads). Always expose bytes-per-second metrics from your source (Kafka consumer lag in bytes, or Delta table commit sizes) alongside the Spark Streaming UI to get a complete picture of pipeline health.
Summary and Key Takeaways
TLDR: Spark Structured Streaming models a data stream as an unbounded table and executes queries over it using the micro-batch engine (MicroBatchExecution). Each trigger fires a Spark batch job over a bounded offset range. The offset WAL and commit WAL together enable exactly-once delivery when paired with an idempotent sink like Delta Lake. Tune maxOffsetsPerTrigger to prevent lag avalanches, use RocksDBStateStoreProvider for large stateful workloads, and choose Apache Flink when you need sub-100ms latency with full stateful operator support.
Key concepts to remember:
- Unbounded table abstraction: a stream is an infinite append-only table. Your query is declared once and runs continuously as new rows arrive.
- MicroBatchExecution loop: poll offsets → write offset WAL → run Spark batch job → write commit WAL. This four-step loop is the heartbeat of every Structured Streaming query.
- Checkpoint = offset log + commit log + state store snapshots. All three must be consistent for correct recovery. Store checkpoints on durable, distributed storage.
- Trigger modes determine when Spark checks for new data; output modes determine what rows are emitted to the sink. These two settings together define the cost and latency profile of your pipeline.
- Exactly-once = Spark WAL + idempotent sink. Spark's side is always exactly-once; the end-to-end guarantee requires the sink (Delta transactional writes, or a custom foreachBatch with dedup keyed on batch ID) to honour the batch ID.
- Continuous processing is experimental and stateless-only. Do not use it in production for stateful workloads.
- Choose Flink over Spark for sub-100ms stateful streaming. For teams already on Spark doing batch and ML, Structured Streaming is the right default for latency requirements above 100ms.
Practice Quiz
Test your understanding of Spark Structured Streaming internals.
What is written to `<checkpointDir>/offsets/` before a micro-batch executes, and why is the write-before-execute ordering critical for fault tolerance?

Correct Answer: The planned end offsets for the micro-batch are written to the offset WAL before any data is read from the source. The ordering is critical because if the driver crashes mid-batch, the offset WAL records what range was being processed. On restart, Spark checks whether a matching commit entry exists in `commits/`. If it does not (the batch never completed), Spark replays the exact same offset range. Without writing the offsets first, a crash mid-batch would make it impossible to know which offset range to replay, breaking exactly-once semantics.

A Structured Streaming query is using `Complete` output mode with a `groupBy` aggregation. The number of distinct groups is 10 million. What memory problem will this cause, and what is the correct mitigation?

Correct Answer: In `Complete` mode, Spark writes the entire aggregated result table to the sink on every trigger. With 10 million distinct groups, this means 10 million rows are re-emitted every single trigger cycle, regardless of how many actually changed. This causes massive write amplification, network I/O pressure, and potential executor OOM if the result does not fit in memory for the final collect. The correct mitigation is to switch to `Update` output mode (emits only changed rows) if the sink supports it, or to redesign the aggregation with tighter grouping to reduce cardinality. If global counts across all time are genuinely needed, reconsider whether a batch job scheduled hourly would be more appropriate.

What is the difference between the `Once` trigger and the `AvailableNow` trigger, and in what situation would you prefer `AvailableNow`?

Correct Answer: `Once` processes all available data as a single micro-batch, then terminates. If there is a large backlog, this creates one enormous batch job that can OOM or time out. `AvailableNow` processes all available data across multiple bounded micro-batches (each respecting `maxOffsetsPerTrigger`) and then terminates. Prefer `AvailableNow` when running scheduled incremental ETL with a potentially large backlog, schema evolution risk, or resource-constrained clusters, because the bounded batches apply backpressure and handle failures without losing the partially-processed state.

A Kafka-backed Structured Streaming query reports `inputRowsPerSecond = 100,000` but `processedRowsPerSecond = 40,000` for several consecutive triggers. What does this mean, and what are the two correct remediation steps?

Correct Answer: The pipeline is falling behind: data is arriving 2.5x faster than it can be processed, which means the unprocessed offset backlog is growing. This is a resource or design problem, not a configuration problem. Correct remediations: (1) add executors or increase core allocation to raise `processedRowsPerSecond`; (2) set `maxOffsetsPerTrigger` on the Kafka source to cap per-batch input volume, which controls the growth rate of the backlog during recovery and prevents a single catch-up batch from OOMing the cluster. Decreasing the trigger interval is not a valid remediation: it does not increase throughput and adds scheduling overhead.

What does `spark.sql.streaming.stateStore.providerClass = RocksDBStateStoreProvider` change about how stateful operators work, and why is this preferable to the default for large state workloads?

Correct Answer: By default, Structured Streaming uses `HDFSBackedStateStoreProvider`, which keeps all state in the JVM heap and only writes snapshots to HDFS/S3 periodically. For large state (millions of keys per partition), heap pressure causes frequent GC pauses, executor instability, and ultimately OOM failures. `RocksDBStateStoreProvider` moves the state storage to a RocksDB instance running in native memory (off-heap), dramatically reducing JVM heap pressure. RocksDB also supports incremental delta snapshots and compaction, making checkpoint writes faster and more storage-efficient. The trade-off is slightly higher per-operation latency due to the JNI boundary between the JVM and RocksDB.

Open-ended challenge: A team wants to use Structured Streaming to implement a real-time sessionisation pipeline: group user events into sessions where a session ends if there is more than 30 minutes of inactivity. The team proposes using a tumbling time window with a 30-minute duration. Explain why this is incorrect and describe the correct Structured Streaming approach to implement session-gap logic.
Related Posts
- Spark Architecture: Driver, Executors, and the DAG Scheduler – understand the cluster execution model that underpins every micro-batch job
- Spark Shuffles and GroupBy Performance – stateful streaming aggregations generate shuffles on every trigger; this post explains how to minimise that cost
- Spark DataFrames, SQL, and the Catalyst Optimizer – the same Catalyst optimizer that powers batch queries also optimises your streaming query plans

Written by
Abstract Algorithms
@abstractalgorithms