
Spark Structured Streaming: Micro-Batch vs Continuous Processing

Treats streams as unbounded tables; the execution model determines whether your pipeline hits milliseconds or minutes.

Abstract Algorithms · 25 min read


📖 The 15-Minute Gap: How a Fraud Team Discovered They Needed Real-Time Streaming

A fintech team runs payment fraud detection with a well-tuned Spark batch job. Every 15 minutes it reads a day's worth of transaction events from S3, scores them against a model, and writes flagged payments to a PostgreSQL alerts table. It works. The false-positive rate is low, the job completes in under three minutes, and the pipeline has been stable for months.

Then the fraud team reviews a case where a stolen card was used 47 times over 22 minutes before anyone noticed. The batch job had run twice during that window and processed none of the suspicious transactions because they had arrived after the last S3 snapshot was written. By the time the next batch job ran, the account was drained.

The team's instinct (run the batch job more frequently, say every 30 seconds) runs into a hard wall. Spark batch jobs carry substantial scheduler overhead: the driver allocates resources, builds an execution plan, stages data, and tears down the job. Firing a full batch cycle every 30 seconds against a Kafka topic is operationally expensive and unreliable. What the team actually needs is a runtime that keeps executors alive, tracks how far it has read, and continuously processes new data as it arrives.

That is exactly the problem Spark Structured Streaming was designed to solve. It delivers sub-second latency on streaming data using the same DataFrame API the team already knows, while providing the fault tolerance and exactly-once guarantees that production fraud detection demands. The key mental model shift is deceptively simple: stop thinking about files and batches. Start thinking about an infinitely growing table where new rows keep arriving, and your query runs continuously over its ever-expanding contents.


๐Ÿ” Streams as Unbounded Tables: The Core Abstraction of Structured Streaming

Structured Streaming reframes how engineers think about data in motion. In the traditional batch model, a dataset is a finite, bounded table: you read it all, compute results, and write output. In Structured Streaming, a data stream is modelled as an unbounded table: a table that never stops growing because new rows are constantly being appended by the source (a Kafka topic, a file directory, a socket).

Reading and Writing a Stream

To consume a stream, you use readStream on a SparkSession. This produces a DataFrame backed by a DataStreamReader, which abstracts the specifics of the source (Kafka offsets, file modification timestamps, socket connections) behind a standard DataFrame interface. You write query transformations (filters, joins, aggregations) exactly as you would in batch mode.

To produce output, you call writeStream on the result DataFrame, which produces a DataStreamWriter. This is where you declare the sink (Delta Lake, Kafka, console, memory), the checkpoint location, the output mode, and the trigger.

Trigger Modes: When Does Spark Check for New Data?

The trigger mode controls the cadence at which Spark asks "is there new data to process?" Available modes are:

| Trigger Mode | Behaviour | Best For |
| --- | --- | --- |
| Default (unset) | Processes a new micro-batch as soon as the previous one finishes | Maximum throughput, latency ~seconds |
| ProcessingTime("30 seconds") | Waits at least 30s between micro-batches | Controlled latency with predictable load |
| Once | Processes all available data in one batch, then stops | Scheduled batch-style incremental loads |
| AvailableNow | Processes all available data across multiple micro-batches, then stops | Idempotent incremental ETL with backpressure |
| Continuous("1 second") | Experimental continuous processing engine, checkpoints every 1s | Ultra-low latency, limited operator support |

Output Modes: What Gets Written to the Sink?

Output mode determines which rows Spark emits to the sink each trigger:

  • Append: only newly completed rows are written. Safe for stateless transformations. The default for sinks like Kafka and Delta Lake.
  • Complete: the entire result table is rewritten every trigger. Required for global aggregations (e.g., total counts across all time). Memory-intensive.
  • Update: only rows that changed since the last trigger are written. Efficient for aggregations with many stable groups. Not all sinks support it.

The combination of trigger mode and output mode is one of the most consequential design decisions in a Structured Streaming pipeline. A poorly chosen pair (for example, Complete mode on a high-cardinality aggregation with a fast trigger) can exhaust memory within minutes.


โš™๏ธ Inside MicroBatchExecution: How Every Trigger Fires a Batch Job

The default and most commonly used execution engine in Structured Streaming is MicroBatchExecution. Understanding it moves you from "I follow the docs" to "I can debug production incidents."

The StreamExecution Thread

When you call writeStream.start(), Spark starts a dedicated StreamExecution thread for the query. This thread runs in a loop and is managed by StreamingQueryManager, a singleton component in the SparkSession that tracks all active streaming queries, enforces resource policies, and provides the awaitAnyTermination() API.

The loop proceeds in four phases for every trigger interval:

  1. Poll sources for new offsets. MicroBatchExecution calls each source (e.g., KafkaSource) to ask: "what is the latest available offset?" This is a lightweight metadata call; no data is read yet. The result is a map of {partition → offset}.

  2. Write the planned end offsets to the offset log. Before any data is processed, Spark durably writes the end offsets to the offset log (a write-ahead log stored in <checkpointDir>/offsets/). This is the fault-tolerance anchor: if the job crashes after this write but before the batch completes, the offset log records exactly which range was in flight, and on restart Spark replays that same range. If the crash happens before the write, nothing references the batch, and Spark simply re-plans from the last logged offsets.

  3. Execute the batch. Spark constructs a standard Spark batch job over the bounded range [lastCommittedOffset, newEndOffset]. This is a normal Catalyst-optimized DataFrame execution. All the familiar concepts (physical plan, shuffle partitions, speculative execution) apply in full here. The streaming aspect is simply that the data range is freshly computed on every trigger.

  4. Write committed offsets to the commit log. After the batch job completes successfully, the commit is recorded in <checkpointDir>/commits/. On restart, Spark checks both logs: if offsets are in the offset log but not the commit log, the last micro-batch is replayed from scratch using exactly the same offset range, achieving exactly-once processing when the sink is also idempotent.
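The offset-log/commit-log handshake in steps 2 and 4 can be sketched with plain files and a recovery rule. This is a simplified model for intuition only; the real implementation is HDFSMetadataLog writing JSON bodies, and the file layout here is an assumption:

```python
# Minimal simulation of the recovery decision: an epoch present in the
# offset log but absent from the commit log must be replayed with exactly
# the same planned range. Layout is simplified vs. real Spark checkpoints.
import json
import os
import tempfile


def write_log(checkpoint: str, log: str, epoch: int, body: dict) -> None:
    d = os.path.join(checkpoint, log)
    os.makedirs(d, exist_ok=True)
    with open(os.path.join(d, str(epoch)), "w") as f:
        json.dump(body, f)


def plan_on_restart(checkpoint: str):
    """Return ('replay', epoch) or ('next', epoch) after a restart."""
    def epochs(log):
        d = os.path.join(checkpoint, log)
        return sorted(int(n) for n in os.listdir(d)) if os.path.isdir(d) else []

    planned, committed = epochs("offsets"), epochs("commits")
    if planned and (not committed or planned[-1] > committed[-1]):
        return ("replay", planned[-1])   # offsets logged, commit missing
    return ("next", (committed[-1] + 1) if committed else 0)


ckpt = tempfile.mkdtemp()
write_log(ckpt, "offsets", 0, {"payments": {"0": 1000}})
write_log(ckpt, "commits", 0, {})
write_log(ckpt, "offsets", 1, {"payments": {"0": 1500}})
# Crash before commit 1 is written: epoch 1 must be replayed verbatim.
print(plan_on_restart(ckpt))   # ('replay', 1)
```

Once the commit file for epoch 1 lands, the same function would plan epoch 2 instead, which is exactly the determinism the two logs buy.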

Checkpointing and Exactly-Once Semantics

The checkpoint directory is the cornerstone of fault tolerance. Its structure is:

<checkpointDir>/
  offsets/        # WAL: planned end offsets per micro-batch epoch
  commits/        # WAL: committed epochs after successful batch
  state/          # State store data for stateful operators (windows, dedup, joins)
  metadata        # Query ID, Spark version, config snapshot

The combination of the offset WAL and an idempotent sink delivers exactly-once semantics. For Delta Lake, idempotency comes from transactional writes keyed on the batch ID. For Kafka, idempotency requires enabling Kafka's idempotent producer mode. For console or non-transactional file sinks, you get at-least-once at best.

Stateful Operations and the State Store

Aggregations with time windows, stream-stream joins, and deduplication all require Spark to maintain state across micro-batches. This state is managed by the StateStore, a versioned key-value store. The default HDFSBackedStateStoreProvider keeps state in executor memory; RocksDBStateStoreProvider (available since Spark 3.2) spills it to local disk. Either way, the state is periodically snapshotted to HDFS or S3 under <checkpointDir>/state/ so that it survives executor failures and application restarts.

A watermark, declared with withWatermark("eventTime", "10 minutes"), tells Spark how long to wait for late data before finalising a window. Once a window is past the watermark, its state is evicted and memory is reclaimed.


🧠 How Structured Streaming Guarantees Correctness at Scale

The Internals of MicroBatchExecution

MicroBatchExecution extends StreamExecution, which is the base class for all Spark streaming execution engines. The object hierarchy is worth knowing when reading stack traces or Spark source code:

StreamingQueryManager  (1 per SparkSession)
  └── StreamingQuery   (1 per active stream)
        └── StreamExecution  (abstract)
              ├── MicroBatchExecution  (default engine)
              └── ContinuousExecution  (experimental engine)

Epoch-based offset tracking is how Structured Streaming achieves determinism. Each micro-batch is assigned a monotonically increasing epoch number (the batch ID). The offset log and commit log are both indexed by this epoch number. On a restart, Spark finds the highest committed epoch (from commits/), determines the last planned end offset for that epoch (from offsets/), and starts planning the next batch from that exact position. There is no reliance on wall-clock time, no sampling, and no approximate "where did we leave off?" guesswork.

The WAL (write-ahead log) for offsets is written using HDFSMetadataLog, a Spark class that writes atomic JSON blobs to HDFS or S3-compatible storage. Each file in offsets/ is named by its epoch number and contains the offset ranges for all sources in that micro-batch. Reading the entire log directory gives you a complete, ordered history of every offset range ever processed โ€” invaluable for debugging missed data or duplicate writes.

The state store adds complexity for stateful queries. When you restart a job from a checkpoint, Spark must reload the state store snapshot that matches the last committed epoch. If the state snapshot and the offset log are inconsistent (e.g., a partial state write occurred before a crash), Spark rolls back to the last consistent state snapshot. This is why the spark.sql.streaming.stateStore.providerClass setting matters: the default in-memory provider loses state on executor failure, while RocksDBStateStoreProvider persists state to disk and to the checkpoint store.

Performance Analysis: Tuning Trigger Intervals for Production Pipelines

Trigger interval tuning is one of the most misunderstood aspects of Structured Streaming operations. Engineers often assume that a shorter trigger interval always means lower latency, but this is only true when processing time is shorter than the trigger interval.

The processing time / trigger interval relationship:

| Scenario | Trigger Interval | Processing Time | Observed Latency | Risk |
| --- | --- | --- | --- | --- |
| Healthy | 10s | 3s | ~10s | None; idle time between batches |
| Borderline | 10s | 9s | ~10s | Scheduler overhead can push into next interval |
| Overloaded | 10s | 15s | ~15s+ | Backlog builds; continuous lag increase |
| Misconfigured | 1s | 8s | ~8s | Every batch queues; memory pressure builds |

When processing time > trigger interval, Spark does not run multiple batches in parallel; it simply fires the next batch as soon as the previous one finishes (default trigger behaviour). The trigger interval becomes irrelevant, and the true latency floor is the processing time itself. No amount of trigger tuning fixes an under-resourced cluster.

Key metrics to watch in the Spark Streaming UI:

  • processingTime (ms): time to execute the batch job, excluding scheduling overhead
  • triggerExecution (ms): total wall clock time for the trigger cycle, including scheduling
  • inputRowsPerSecond: the rate at which records arrive from the source
  • processedRowsPerSecond: the rate at which records are processed by the engine

When inputRowsPerSecond consistently exceeds processedRowsPerSecond, the pipeline is falling behind. The correct remedies are: increase the number of executors, increase shuffle partitions, or redesign expensive stateful operations. Decreasing the trigger interval is not one of them.

Backpressure in Structured Streaming is not automatic the way it is in Apache Flink. Spark does not throttle the source based on downstream pressure. You must configure maxOffsetsPerTrigger (for Kafka) or maxFilesPerTrigger (for file sources) to bound the input volume per micro-batch and prevent runaway memory growth during lag recovery.


📊 The End-to-End Streaming Pipeline: From Kafka to Sink

The following diagram shows how data flows through a Structured Streaming pipeline backed by Kafka and writing to Delta Lake. Each component maps to a concrete class or abstraction in Spark's codebase.

graph TD
    A[Kafka Topic - Source] --> B[KafkaSource - offset polling]
    B --> C[MicroBatchExecution - trigger loop]
    C --> D[Offset WAL - offsets dir]
    D --> C
    C --> E[Spark Batch Job - Catalyst plan]
    E --> F[State Store - windows and dedup]
    F --> G[DataStreamWriter - writeStream]
    G --> H[Delta Lake Sink - transactional write]
    H --> I[Commit WAL - commits dir]
    I --> C
    C --> J[StreamingQueryManager - lifecycle]
    J --> C

The offset WAL (top-right loop) is written before the batch executes. The commit WAL (bottom loop) is written after the batch succeeds. These two logs working together are what make exactly-once delivery possible: on a restart, Spark replays any epoch whose offset is logged but whose commit is missing.

The State Store sits between the Catalyst execution layer and the writer. It is only involved for stateful operators: windowed aggregations, stream-stream joins, and dropDuplicates. Stateless transformations (maps, filters, simple projections) bypass the state store entirely and are cheaper to run.


๐ŸŒ Real-World Patterns: Kafka, Incremental File ETL, and Delta Lake Sinks

Kafka Source with Structured Streaming

Kafka is the most common source for Structured Streaming production deployments. The KafkaSource maps each Kafka partition to a Spark task and tracks {topic, partition, offset} triples as the offset state. The source guarantees that each offset range is processed exactly once across restarts.

A critical operational detail: Spark does not commit offsets back to Kafka's __consumer_offsets topic by default. Offset state lives entirely in the Structured Streaming checkpoint. This means you cannot use Kafka consumer group lag monitoring tools (like Burrow or Kafka's kafka-consumer-groups.sh) to measure how far behind your pipeline is. Instead, read lag from the Spark Streaming UI or query the offsets/ checkpoint log directly.

File Source for Incremental ETL

The file source (readStream.format("parquet").load("/data/incoming/")) monitors a directory for new files using a metadata log stored at <checkpointDir>/sources/0/. Spark tracks which files have been processed and never reprocesses them even across restarts.

The AvailableNow trigger combines the reliability of Structured Streaming checkpointing with the operational simplicity of a scheduled job. It processes all currently available files in bounded micro-batches and shuts down cleanly, making it ideal for hourly or daily incremental ETL pipelines where you want exactly-once file processing without a permanently running cluster.

Delta Lake as a Streaming Sink with ACID Guarantees

Delta Lake integrates tightly with Structured Streaming as both a source and a sink. As a sink, Delta uses the micro-batch ID as a transaction token: each batch write is wrapped in a Delta transaction. If Spark retries a failed batch (same batch ID, same offset range), Delta detects the duplicate transaction and skips the write. This is the idempotency that makes the exactly-once guarantee end-to-end rather than just Spark-side.

As a source, Delta's Change Data Feed (CDF) lets a downstream streaming query consume only the rows that changed in the Delta table since the last checkpoint. This enables efficient streaming ETL chains where each stage reads only the delta of the previous stage, rather than re-scanning the full table.

foreachBatch for Custom Output

When no built-in sink fits your requirements, foreachBatch(func) gives you a micro-batch DataFrame and a batch ID on every trigger. This is the escape hatch for writing to JDBC databases, custom REST APIs, or multi-sink fan-out patterns. The batch ID is passed so your function can implement its own idempotency check.
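The idempotency check a foreachBatch function should perform can be sketched with plain Python. The in-memory ledger here is a deliberate simplification: a real pipeline would persist processed batch IDs transactionally alongside the written data (as Delta does internally).

```python
# Sketch of batch-ID-based idempotency for foreachBatch. The in-memory set
# is illustrative only; persist the ledger transactionally in production.
def make_idempotent_writer(write_fn):
    """Wrap a (df, batch_id) writer so replayed batch IDs are skipped."""
    processed = set()

    def on_batch(df, batch_id: int):
        if batch_id in processed:
            return                      # replay after a crash: skip the write
        write_fn(df, batch_id)          # e.g. JDBC upsert, REST call, fan-out
        processed.add(batch_id)

    return on_batch


# Usage (hypothetical): df.writeStream.foreachBatch(writer).start()
writes = []
writer = make_idempotent_writer(lambda df, bid: writes.append(bid))
writer(None, 7)
writer(None, 7)     # Spark re-delivers batch 7 after a restart
print(writes)       # [7] -- the duplicate delivery was skipped
```

Because Spark replays a failed micro-batch with the same batch ID and offset range, a write that skips already-seen IDs is all the sink needs to close the exactly-once loop.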


⚖️ Micro-Batch, Continuous, and Flink: Three Execution Models Compared

These three execution models represent genuinely different tradeoffs, not just a dial turned from slow to fast.

Micro-Batch (Structured Streaming default)

The micro-batch engine achieves latency in the range of 100ms to several seconds depending on trigger configuration and cluster size. It benefits from Spark's mature Catalyst optimizer, full SQL support including complex joins and window functions, broad connector ecosystem, and deep integration with Delta Lake. Fault tolerance is strong: the WAL-based recovery model is well-understood and battle-tested.

The ceiling is latency. Even with sub-100ms trigger intervals, the scheduling overhead of launching a Spark job (serialising the plan, allocating tasks, shipping shuffle data) imposes a floor that continuous processing can undercut.

Continuous Processing (Spark experimental)

Continuous processing (Trigger.Continuous("1 second")) eliminates the micro-batch boundary. Executors read from sources continuously and checkpoint offsets every N milliseconds. Latency can drop to single-digit milliseconds. However, the feature is still experimental as of Spark 3.x: it supports only a limited set of operators (map, filter, flatMap, simple projections), does not support stateful aggregations, and has known reliability gaps. It is not appropriate for production use unless your team has contributed to and tested the code path specifically.

Apache Flink

Flink's streaming runtime is a true event-at-a-time engine. It delivers sub-10ms latency with full stateful operator support, mature exactly-once guarantees via distributed snapshots (Chandy-Lamport algorithm), and a richer windowing API. If your requirement is sub-second latency with stateful aggregations, Flink is the operationally mature choice today.

Where Spark wins: teams already running Spark clusters for batch and ML workloads get Structured Streaming nearly for free, with the same API, the same cluster, and the same storage (Delta Lake). Flink requires a separate cluster, a different API, and a different operational skill set.

Comparison Summary

| Dimension | Spark Micro-Batch | Spark Continuous | Apache Flink |
| --- | --- | --- | --- |
| Minimum latency | ~100ms | ~10ms (experimental) | ~1ms |
| Stateful aggregations | Full support | Limited | Full support |
| Exactly-once | Yes (WAL + idempotent sink) | Partial | Yes (distributed snapshots) |
| Production readiness | Mature | Experimental | Mature |
| Operator ecosystem | Very broad | Limited | Broad |
| Batch + stream unification | Yes (same API) | Partial | Table API only |
| Operational overhead | Low (if Spark already deployed) | Low | High (separate cluster) |

🧭 Choosing the Right Streaming Execution Mode for Your Pipeline

Use this table to map your pipeline's constraints to the appropriate execution model.

| Latency Requirement | State Complexity | Fault Tolerance Need | Recommended Mode |
| --- | --- | --- | --- |
| > 1 minute | None or simple | Recovery within minutes | Spark AvailableNow trigger (batch-style) |
| 1s–60s | None or simple | Exactly-once required | Spark Micro-Batch, default trigger |
| 1s–60s | Complex (windows, joins) | Exactly-once required | Spark Micro-Batch + RocksDB state store |
| 100ms–1s | None or simple | At-least-once acceptable | Spark Micro-Batch, ProcessingTime("100ms") |
| 100ms–1s | Complex | Exactly-once required | Apache Flink |
| < 100ms | None | At-least-once | Spark Continuous (experimental) |
| < 100ms | Complex | Exactly-once required | Apache Flink |

A few rules of thumb that will save a post-mortem:

Do not choose Continuous processing for stateful work. Spark will either reject the query at start with an AnalysisException or, where the gap goes unchecked, silently produce incorrect results for operators like groupBy, window, and dropDuplicates.

Do not tune your way out of an undersized cluster. If processingTime exceeds your trigger interval, you have a resource problem, not a configuration problem. Add executors or reduce per-batch input volume with maxOffsetsPerTrigger.

Use AvailableNow instead of Once for incremental ETL. Once processes all backlog in a single gigantic micro-batch. AvailableNow spreads it across multiple bounded micro-batches, applies backpressure, and handles schema evolution more gracefully.


🧪 Reading a Streaming Query Execution Plan and Checkpoint Layout

This section shows two diagnostic tools every Structured Streaming operator should know: how to read a streaming query's physical plan, and how to interpret the checkpoint directory layout.

Inspecting the Streaming Query Plan with explain()

When you call explain(true) on a streaming DataFrame before starting the query, Spark prints the full analysed and optimised logical and physical plans. The output looks superficially identical to a batch explain() output, but there are streaming-specific nodes to watch for:

The physical plan will contain MicroBatchScan (or StreamingRelation in older versions) at the scan leaf, confirming the source is treated as a streaming relation. For stateful queries, you will see StateStoreSave and StateStoreRestore nodes wrapping aggregate operators. These nodes tell you exactly which operators are touching the state store on every trigger; a high-cardinality key space under StateStoreSave is a common source of memory pressure.

For a windowed aggregation query, a representative physical plan snippet would include:

+- StateStoreSave [window_start, window_end, user_id], state info ...
   +- HashAggregate(keys=[window_start, window_end, user_id], ...)
      +- StateStoreRestore [window_start, window_end, user_id], state info ...
         +- Exchange hashpartitioning(...)
            +- MicroBatchScan[topic=payments, partition=0..15]

The Exchange node is a shuffle. For stateful operators, this shuffle is unavoidable: Spark must route all rows with the same key to the same executor so the state store can be collocated with the computation.

Checkpoint Directory Layout After Several Epochs

After a running streaming query has processed a few micro-batches, the checkpoint directory looks like this:

/checkpoints/fraud-alerts-query/
├── metadata                          # {"id":"abc-123","..."}
├── offsets/
│   ├── 0                             # {"payments":{"0":1000,"1":980,...}}
│   ├── 1                             # {"payments":{"0":1500,"1":1450,...}}
│   └── 2                             # {"payments":{"0":2000,"1":1900,...}}
├── commits/
│   ├── 0                             # {"nextBatchWatermarkMs":0}
│   ├── 1                             # {"nextBatchWatermarkMs":0}
│   └── 2                             # (absent if crash occurred mid-batch 2)
└── state/
    └── 0/
        └── 0/                        # operator index 0, partition 0
            ├── 1.delta               # incremental state update
            ├── 2.delta
            └── 2.snapshot            # full state snapshot at epoch 2

In the example above, epoch 2's offset is logged but its commit file is absent. On restart, Spark will reprocess the exact same offset range as epoch 2, from where epoch 1 ended to the offsets logged in offsets/2. This is deterministic replay, not guesswork, and it is why the offset WAL must be written before any data is processed.

The state/0/0/2.snapshot file is the RocksDB or in-memory state snapshot at epoch 2. On restart, Spark loads this snapshot, replays 2.delta, and the state is restored to exactly where it was before the crash.


๐Ÿ› ๏ธ Structured Streaming Configuration Reference

The following Spark configuration properties are the most operationally significant for Structured Streaming deployments. Apply these in your spark-defaults.conf or pass them at session creation.

# ── Checkpoint and Recovery ─────────────────────────────────────────────────

# Default checkpoint location for all streaming queries in the session.
# Override per-query with .option("checkpointLocation", "...") on writeStream.
spark.sql.streaming.checkpointLocation = /data/checkpoints/streaming

# Number of micro-batch epochs to retain in the offset and commit logs.
# Increase when debugging; decrease to reduce metadata storage.
# Minimum: 1. Recommended for production: 100.
spark.sql.streaming.minBatchesToRetain = 100

# ── State Store ─────────────────────────────────────────────────────────────

# Default: org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider
# Use RocksDB for large state (> 1M keys per partition) โ€” avoids full JVM heap pressure.
spark.sql.streaming.stateStore.providerClass = \
  org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider

# Minimum number of state delta files written before they are consolidated
# into a full snapshot (a snapshot cadence, not a retention count).
spark.sql.streaming.stateStore.minDeltasForSnapshot = 10

# ── Kafka Source ────────────────────────────────────────────────────────────

# Disable the deprecated consumer group offset fetcher (Spark 3.1+).
# The new fetcher uses AdminClient, which is more reliable on large clusters.
spark.sql.streaming.kafka.useDeprecatedOffsetFetching = false

# maxOffsetsPerTrigger is a per-source read option, not a global Spark conf.
# Set it on the reader: .option("maxOffsetsPerTrigger", "50000").
# It caps the total offsets read per micro-batch (split across partitions)
# and is critical for preventing runaway memory during lag recovery.

# ── Shuffle and Parallelism ─────────────────────────────────────────────────

# For streaming queries on a 16-core cluster, set this to 2x-4x core count.
# The default of 200 is almost always too high for micro-batch workloads.
spark.sql.shuffle.partitions = 64

# Adaptive Query Execution can interfere with state-store partition stability.
# Disable for stateful streaming queries; enable for stateless ones.
spark.sql.adaptive.enabled = false

# ── Continuous Processing (experimental) ────────────────────────────────────

# The checkpoint interval for Continuous mode is set per-query via
# Trigger.Continuous("1 second"), not through a global conf. The settings
# below tune the executor-side ingestion queue and poll cadence.
spark.sql.streaming.continuous.executorQueueSize = 1024
spark.sql.streaming.continuous.executorPollIntervalMs = 100

For a full deep-dive on Delta Lake's streaming integration, including ACID transaction mechanics and Change Data Feed, see the planned companion post Delta Lake as a Streaming Sink and Source.


📚 Lessons Learned Running Structured Streaming in Production

Lesson 1: The checkpoint is not optional; treat it like a database. Teams routinely point the checkpoint location at a local filesystem or ephemeral NFS mount for development convenience and then forget to change it before deploying to production. The first executor failure wipes the checkpoint, and the query restarts from the source's latest offsets, silently skipping all unprocessed data. Use S3, HDFS, or GCS for checkpoints in every environment. Make the checkpoint path visible in your monitoring dashboard.

Lesson 2: Schema changes silently break stateful queries. Structured Streaming checkpoints include schema metadata for stateful operators. If you add a column to an upstream Kafka schema after a stateful query has been running, the query may fail on restart with an AnalysisException about schema incompatibility, even if the new column is nullable. Plan schema evolution carefully: use schema registry enforcement, add a transformation layer that absorbs schema drift, or document a deliberate checkpoint migration procedure.

Lesson 3: maxOffsetsPerTrigger is your most important safety valve. When a Structured Streaming query falls behind (because of a source spike, a slow batch, or a scheduled maintenance window), it accumulates a lag backlog. Without maxOffsetsPerTrigger, the first catch-up batch tries to process hours of backlog in a single micro-batch, OOMs the executors, and restarts, accumulating even more lag. Bounding input per trigger allows the query to catch up gradually without destabilising the cluster.

Lesson 4: Watermarks must be tuned per-pipeline, not copy-pasted. A watermark of 10 minutes means Spark will hold state open for 10 minutes of event-time lag before evicting a window. If your events routinely arrive 8 minutes late, a 10-minute watermark is correct. If your events arrive within 30 seconds of event time, a 10-minute watermark holds state for 10 unnecessary minutes, bloating the state store and increasing memory pressure. Measure your actual event-time skew before setting the watermark.

Lesson 5: The Streaming UI lag number is in records, not bytes. The inputRowsPerSecond and processedRowsPerSecond metrics measure row counts. A thousand-row lag might be trivial (small Avro messages) or catastrophic (large JSON payloads). Always expose bytes-per-second metrics from your source (Kafka consumer lag in bytes, or Delta table commit sizes) alongside the Spark Streaming UI to get a complete picture of pipeline health.


📌 Summary and Key Takeaways

TLDR: Spark Structured Streaming models a data stream as an unbounded table and executes queries over it using the micro-batch engine (MicroBatchExecution). Each trigger fires a Spark batch job over a bounded offset range. The offset WAL and commit WAL together enable exactly-once delivery when paired with an idempotent sink like Delta Lake. Tune maxOffsetsPerTrigger to prevent lag avalanches, use RocksDBStateStoreProvider for large stateful workloads, and choose Apache Flink when you need sub-100ms latency with full stateful operator support.

Key concepts to remember:

  1. Unbounded table abstraction: a stream is an infinite append-only table. Your query is declared once and runs continuously as new rows arrive.
  2. MicroBatchExecution loop: poll offsets → write offset WAL → run Spark batch job → write commit WAL. This four-step loop is the heartbeat of every Structured Streaming query.
  3. Checkpoint = offset log + commit log + state store snapshots. All three must be consistent for correct recovery. Store checkpoints on durable, distributed storage.
  4. Trigger modes determine when Spark checks for new data. Output modes determine what rows are emitted to the sink. These two settings together define the cost and latency profile of your pipeline.
  5. Exactly-once = Spark WAL + idempotent sink. Spark's side is always exactly-once. The end-to-end guarantee requires the sink (Delta, Kafka idempotent producer, or custom foreachBatch with dedup) to honour the batch ID.
  6. Continuous processing is experimental and stateless-only. Do not use it in production for stateful workloads.
  7. Choose Flink over Spark for sub-100ms stateful streaming. For teams already on Spark doing batch and ML, Structured Streaming is the right default for latency requirements above 100ms.

๐Ÿ“ Practice Quiz

Test your understanding of Spark Structured Streaming internals.

  1. What is written to <checkpointDir>/offsets/ before a micro-batch executes, and why is the write-before-execute ordering critical for fault tolerance?

    Answer: The planned end offsets for the micro-batch are written to the offset WAL before any data is read from the source. The ordering is critical because if the driver crashes mid-batch, the offset WAL records what range was being processed. On restart, Spark checks whether a matching commit entry exists in commits/. If it does not (the batch never completed), Spark replays the exact same offset range. Without writing the offsets first, a crash mid-batch would make it impossible to know which offset range to replay, breaking exactly-once semantics.
  2. A Structured Streaming query is using Complete output mode with a groupBy aggregation. The number of distinct groups is 10 million. What memory problem will this cause, and what is the correct mitigation?

    Answer: In Complete mode, Spark writes the entire aggregated result table to the sink on every trigger. With 10 million distinct groups, this means 10 million rows are re-emitted every single trigger cycle, regardless of how many actually changed. This causes massive write amplification, network I/O pressure, and potential executor OOM if the result does not fit in memory for the final collect. The correct mitigation is to switch to Update output mode (emits only changed rows) if the sink supports it, or to redesign the aggregation with tighter grouping to reduce cardinality. If global counts across all time are genuinely needed, reconsider whether a batch job scheduled hourly would be more appropriate.
  3. What is the difference between the Once trigger and the AvailableNow trigger, and in what situation would you prefer AvailableNow?

    Answer: Once processes all available data as a single micro-batch, then terminates. If there is a large backlog, this creates one enormous batch job that can OOM or time out. AvailableNow processes all available data across multiple bounded micro-batches (each respecting maxOffsetsPerTrigger) and then terminates. Prefer AvailableNow when running scheduled incremental ETL with a potentially large backlog, schema evolution risk, or resource-constrained clusters: the bounded batches apply backpressure, and because each batch commits its progress, a failure does not lose work that has already completed.
  4. A Kafka-backed Structured Streaming query reports inputRowsPerSecond = 100,000 but processedRowsPerSecond = 40,000 for several consecutive triggers. What does this mean, and what are the two correct remediation steps?

    Answer: The pipeline is falling behind: data is arriving 2.5x faster than it can be processed, which means the unprocessed offset backlog is growing. This is fundamentally a throughput problem, not something a trigger setting can fix. Correct remediations: (1) add executors or increase core allocation to raise processedRowsPerSecond; (2) set maxOffsetsPerTrigger on the Kafka source to cap per-batch input volume — this bounds each catch-up batch during recovery and prevents a single one from OOMing the cluster. Decreasing the trigger interval is not a valid remediation: it does not increase throughput and adds scheduling overhead.
  5. What does spark.sql.streaming.stateStore.providerClass = RocksDBStateStoreProvider change about how stateful operators work, and why is this preferable to the default for large state workloads?

    Answer: By default, Structured Streaming uses HDFSBackedStateStoreProvider, which keeps all state in the JVM heap and only writes snapshots to HDFS/S3 periodically. For large state (millions of keys per partition), heap pressure causes frequent GC pauses, executor instability, and ultimately OOM failures. RocksDBStateStoreProvider moves state into an embedded RocksDB instance backed by native (off-heap) memory and local disk, dramatically reducing JVM heap pressure. RocksDB also supports incremental snapshots and compaction, making checkpoint writes faster and more storage-efficient. The trade-off is slightly higher per-operation latency due to the JNI boundary between the JVM and RocksDB.
  6. Open-ended challenge: A team wants to use Structured Streaming to implement a real-time sessionisation pipeline: group user events into sessions where a session ends if there is more than 30 minutes of inactivity. The team proposes using a tumbling time window with a 30-minute duration. Explain why this is incorrect and describe the correct Structured Streaming approach to implement session-gap logic.
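For readers checking their answer to the challenge: the defining property of a session gap is that the window's end extends every time a new event arrives within the gap, which a fixed 30-minute tumbling window cannot express — a tumbling window would split a continuously active session at arbitrary boundaries. The gap rule itself is small enough to sketch in plain Python; the `split_sessions` helper and the sample timestamps below are illustrative, not part of any Spark API:

```python
from datetime import datetime, timedelta

GAP = timedelta(minutes=30)  # a session ends after 30 minutes of inactivity

def split_sessions(timestamps, gap=GAP):
    """Group event times into sessions: a new session starts whenever the
    silence since the previous event exceeds the gap."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)  # within the gap: extend the current session
        else:
            sessions.append([ts])    # gap exceeded: open a new session
    return sessions

t0 = datetime(2024, 1, 1, 9, 0)
events = [t0, t0 + timedelta(minutes=10), t0 + timedelta(minutes=55)]
print(len(split_sessions(events)))  # 2 — the 45-minute silence splits them
```

In Structured Streaming itself this corresponds to grouping by `pyspark.sql.functions.session_window(ts_col, "30 minutes")` together with a `withWatermark` on the event-time column, so that expired session state can be evicted from the state store.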


Written by Abstract Algorithms (@abstractalgorithms)