
Spark Adaptive Query Execution: Dynamic Coalescing, Pruning, and Skew Handling

How AQE re-optimizes query plans at runtime using real shuffle statistics to eliminate skew, prune partitions, and coalesce small tasks

Abstract Algorithms
· 34 min read

AI-assisted content.

TLDR: Before AQE, Spark compiled your entire query into a static physical plan using size estimates that were frequently wrong — and a wrong estimate at planning time meant a skewed join, 800 small tasks, or a missed broadcast opportunity that no amount of executor tuning could fix. Adaptive Query Execution (introduced in Spark 3.0, production-hardened in 3.2) rewrites that plan at every shuffle boundary using real partition statistics: coalescing hundreds of tiny post-shuffle partitions into a handful of right-sized ones, splitting skewed partitions into parallel sub-tasks, upgrading sort-merge joins to broadcast hash joins when the data turns out to be small at runtime, and pruning entire fact-table partitions using pushed-down broadcast filters. The result is a query engine that corrects its own mistakes between stages rather than committing to a bad plan and paying for it across the entire job.


📖 The 199 Idle Tasks: When One Skewed Partition Holds an Entire Cluster Hostage

The alert came in at 2:47 AM. A data engineering team at a logistics company had promoted their weekly customer-segment enrichment pipeline to production. The job joined a 600 GB behavioral event table against a 12 GB customer dimension table, partitioned by region. On staging — a 15 GB sample — the job completed in four minutes. In production, it hit the nine-hour timeout and produced no output.

The on-call engineer opened the Spark UI and found Stage 4 — the join stage — frozen at 199/200 complete. One task was still running. It had been running for eight hours and forty minutes. The other 199 tasks had finished in under ninety seconds each. The remaining task was processing 83% of the total join input — approximately 498 GB of data — because every event record for the UNKNOWN region key, which turned out to represent a legacy catch-all bucket created years earlier, had been hashed to the same partition. One task. One executor. One CPU core. Eight hours of the entire cluster waiting.

This is data skew: the pathological case in which a hash-partitioned key has a wildly non-uniform distribution. It is the most common source of Spark job failures at scale, and until Spark 3.0, there was no general-purpose mechanism to detect or mitigate it at runtime. Engineers worked around it by salting keys, over-partitioning, or writing pre-processing steps — all of which required manual inspection and hand-coded fixes for every new dataset irregularity.

Adaptive Query Execution is Spark's answer to this entire class of problem. It changes the fundamental contract of query execution: instead of committing to a plan derived from potentially stale or inaccurate estimates, Spark now re-evaluates and rewrites the plan after every shuffle boundary, using the actual partition size statistics from the completed stage. The skewed join above would have been split automatically. The 199 idle tasks would have processed their sub-partitions in 90 seconds. The one monstrous task would have become eight balanced tasks. The job would have completed in under ten minutes.

This post opens the internals of how AQE achieves that. By the end, you will understand the AdaptiveSparkPlanExec tree, how ShuffleQueryStageExec materializes and captures statistics, what rules fire at each re-planning checkpoint, how dynamic partition pruning eliminates full table scans, and what configuration levers give you precise control over each feature.


โš™๏ธ How AQE Breaks the Static Plan Barrier: The Re-Optimization Loop Between Shuffle Stages

To understand what AQE does differently, you need to understand what Catalyst does without it.

When you submit a Spark SQL query or DataFrame chain, Catalyst transforms it through four phases: analysis (bind column names to the catalog), logical optimization (push predicates, fold constants, eliminate redundant projections), physical planning (select physical operators — sort-merge join vs. broadcast hash join, pick sort orders), and code generation (compile the physical plan to JVM bytecode). The physical plan produced by this process is fixed before a single row of data is read. Every decision — how many partitions to create after a shuffle, which join strategy to apply, which partitions of a table to scan — is made using statistics gathered from the catalog or from ANALYZE TABLE. Those statistics are frequently wrong.

The post-shuffle partition count is a perfect example. Spark defaults to spark.sql.shuffle.partitions = 200. A query that aggregates 1 TB of data ends up with 200 output partitions โ€” averaging 5 GB each, which is far too large for most downstream operations. A query that aggregates 500 MB of data also ends up with 200 output partitions โ€” averaging 2.5 MB each, meaning 200 executor tasks each processing a trivially small slice of data, wasting task scheduling overhead on partitions that could fit in memory in milliseconds. Without AQE, both queries use the same partition count regardless of what the data actually looks like.

AQE inserts a re-optimization checkpoint at each shuffle boundary. The process works like this:

  1. Catalyst produces the initial physical plan as usual. If AQE is enabled, the plan is wrapped in an AdaptiveSparkPlanExec node, which acts as the root of a rewritable plan tree.
  2. Spark identifies the leaf-most stages — those with no shuffle dependencies — and submits them as QueryStageExec nodes. These stages execute normally.
  3. When a ShuffleQueryStageExec stage completes, Spark collects MapOutputStatistics from the Driver's MapOutputTracker: the byte size and record count of each post-shuffle partition.
  4. The AdaptiveSparkPlanExec node evaluates a set of rewrite rules against those statistics. Rules are things like: "these 180 partitions are each under 64 KB; coalesce them into 3 partitions" or "partition 7 is 40 GB while the median is 200 MB; split it into advisory-sized sub-partitions."
  5. If any rule fires, the plan tree is rewritten — the QueryStageExec nodes for the next set of stages are regenerated with the new plan. If no rule fires, the original plan continues unchanged.
  6. The next set of stages is submitted with the rewritten plan. Steps 3โ€“6 repeat at every subsequent shuffle boundary until the job completes.
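The checkpoint in steps 3–5 can be sketched as a toy rule dispatcher over the per-partition byte sizes. This is an illustration of the threshold arithmetic only, not Spark's implementation; the constants mirror the default values of the configuration keys discussed later in this post.

```python
import math
from statistics import median

# Defaults for the relevant configuration keys (Spark 3.2-era values)
ADVISORY = 64 * 1024 * 1024              # spark.sql.adaptive.advisoryPartitionSizeInBytes
SKEW_FACTOR = 5                          # spark.sql.adaptive.skewJoin.skewedPartitionFactor
SKEW_THRESHOLD = 256 * 1024 * 1024       # spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
BROADCAST_THRESHOLD = 10 * 1024 * 1024   # spark.sql.autoBroadcastJoinThreshold

def evaluate_checkpoint(bytes_by_partition):
    """Toy version of step 4: which rewrite rules would fire for these stats?"""
    fired = []
    total = sum(bytes_by_partition)
    med = median(s for s in bytes_by_partition if s > 0)
    if total <= BROADCAST_THRESHOLD:                  # whole output fits the broadcast gate
        fired.append("broadcast-upgrade")
    if any(s > max(SKEW_FACTOR * med, SKEW_THRESHOLD) for s in bytes_by_partition):
        fired.append("skew-split")                    # at least one oversized partition
    if len(bytes_by_partition) > math.ceil(total / ADVISORY):
        fired.append("coalesce")                      # more partitions than the data warrants
    return fired or ["no-op"]

# 200 tiny post-aggregation partitions of 100 KB each: only the coalesce rule fires
print(evaluate_checkpoint([100 * 1024] * 200))
```

Running the same function over a stage with one 300 MB partition and a 10 MB median would instead report a skew split, which is exactly the divergence in plan rewrites that step 5 describes.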

This loop is the core of AQE. It converts query execution from a one-shot compile-and-run model into an iterative compile-execute-recompile model that converges on the right plan using real data. The cost is the overhead of collecting and evaluating statistics at each checkpoint. The benefit is that every downstream stage executes with a plan that is informed by what the data actually looked like in the stage that just finished.


🧠 Deep Dive: QueryStageExec, ShuffleQueryStageExec, and How Spark Rebuilds the Plan Mid-Flight

The Internals: From Catalyst Physical Plan to Adaptive Re-Planning

The entry point for AQE is the AdaptiveSparkPlanExec node. When spark.sql.adaptive.enabled = true, Catalyst's QueryExecution wraps the final physical plan in this node before code generation. AdaptiveSparkPlanExec holds a mutable reference to the current physical plan and maintains the state machine that controls re-planning.
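For reference, the configuration levers discussed throughout this post can be collected in one place. A minimal spark-defaults.conf fragment follows; the values shown are the usual Spark 3.2-era defaults, and you should verify them against your distribution's documentation:

```properties
# Master switch: wraps the physical plan in AdaptiveSparkPlanExec
spark.sql.adaptive.enabled                                     true
# Coalescing of small post-shuffle partitions
spark.sql.adaptive.coalescePartitions.enabled                  true
spark.sql.adaptive.advisoryPartitionSizeInBytes                64m
# Runtime skew join splitting
spark.sql.adaptive.skewJoin.enabled                            true
spark.sql.adaptive.skewJoin.skewedPartitionFactor              5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes    256m
# Broadcast gate used both at planning time and by the runtime upgrade rule
spark.sql.autoBroadcastJoinThreshold                           10m
```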

The plan tree is decomposed into QueryStageExec subtrees at shuffle and broadcast exchange boundaries. Each QueryStageExec is a self-contained unit of execution: it wraps exactly one ShuffleExchangeExec (for shuffle stages) or BroadcastExchangeExec (for broadcast stages), has a unique integer ID, and transitions through three states — Pending, Running, and ResultReady. A stage cannot begin execution until all stages it depends on have reached ResultReady.

The specialized subclass you will see in the Spark UI and physical plan output for join optimization is ShuffleQueryStageExec. When this stage completes, it holds a reference to a MapOutputStatistics object — a compact structure containing two arrays per stage: bytesByPartitionId (post-serialization byte sizes) and recordsByPartitionId (row counts). These are the exact values that AQE rules evaluate.

The re-planning logic lives in AdaptiveSparkPlanExec.reOptimize(). After all child stages of a given plan node have reached ResultReady, reOptimize() runs three categories of rules:

Coalesce Partitions Rule (CoalesceShufflePartitions): Iterates over bytesByPartitionId for the completed shuffle stage. Groups consecutive partitions whose combined byte size falls below spark.sql.adaptive.advisoryPartitionSizeInBytes (default 64 MB), subject to a minimum of spark.sql.adaptive.coalescePartitions.minPartitionNum. The rule rewrites the plan to use a CoalescedShuffleReaderExec that reads multiple physical shuffle blocks as a single logical partition — without re-materializing the shuffle data.

Skew Join Rule (OptimizeSkewedJoin): Computes the median partition size across the completed shuffle. Any partition whose size exceeds max(skewedPartitionFactor × medianSize, skewedPartitionThresholdInBytes) is flagged as skewed. Each skewed partition on the build side (or probe side, or both) is split into ceil(partitionSize / advisoryPartitionSizeInBytes) sub-partitions. The matching rows on the opposite side are replicated for each sub-partition — the plan is rewritten to run multiple parallel join tasks and union their results. This is the mechanism that would have saved the logistics pipeline described in the opening.

Join Strategy Upgrade Rule (OptimizeLocalShuffleReader, DemoteBroadcastHashJoin, and the strategy-switching path in JoinSelectionHelper): If a shuffle stage's total byte size turns out to be below spark.sql.autoBroadcastJoinThreshold at runtime, AQE can upgrade a planned sort-merge join to a broadcast hash join. The already-materialized shuffle data is broadcast rather than re-shuffled. This is the "broadcast upgrade" — it corrects the case where Catalyst estimated a table as too large to broadcast but the actual compressed shuffle output is well within the threshold.

Dynamic Partition Pruning (PartitionPruning): When a query joins a large partitioned fact table against a smaller dimension table on the partition column, AQE can push a broadcast filter of the dimension table's join keys down to the fact table's file scan. The fact table scanner reads only the partitions whose keys appear in the dimension table, eliminating entire HDFS/S3 list and read operations before any shuffle begins. This rule fires at planning time (not at the shuffle checkpoint) but is categorized under AQE because it is enabled and evaluated as part of the adaptive planning framework.

Mathematical Model: The Skew Detection Formula and Coalescing Arithmetic

The AQE rewrite rules are driven by explicit threshold arithmetic applied to the MapOutputStatistics arrays. Understanding the formulas makes it possible to predict exactly which rules will fire for a given dataset and to tune thresholds deliberately rather than by trial and error.

Coalescing model. Let B be the total post-shuffle byte size across all partitions, P be the current partition count, and T be advisoryPartitionSizeInBytes. The coalesce rule computes the target partition count as:

targetPartitions = max(ceil(B / T), minPartitionNum)

Consecutive partitions are greedily merged left-to-right: once the accumulated byte size of a group exceeds T, the group is closed and a new group begins. This produces a CoalescedShuffleReaderExec with targetPartitions logical partitions mapping to ranges of physical shuffle blocks. No data is re-sorted or re-written — the merge is purely a read-side logical grouping. For B = 300 MB, T = 64 MB, and minPartitionNum = 1: targetPartitions = max(ceil(300/64), 1) = max(5, 1) = 5.
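The greedy grouping can be sketched in a few lines of Python. This is an illustration of the rule's arithmetic under the definitions above, not Spark's actual code:

```python
MB = 1024 ** 2

def coalesce_groups(bytes_by_partition, advisory=64 * MB):
    """Greedily merge consecutive partitions left-to-right; a group is closed
    once its accumulated byte size reaches the advisory target T."""
    groups, current, acc = [], [], 0
    for pid, size in enumerate(bytes_by_partition):
        current.append(pid)
        acc += size
        if acc >= advisory:          # group has reached T: close it
            groups.append(current)
            current, acc = [], 0
    if current:                      # close the trailing partial group
        groups.append(current)
    return groups

# 300 MB spread over 30 partitions of 10 MB each, T = 64 MB
groups = coalesce_groups([10 * MB] * 30)
print(len(groups))                   # 5, matching targetPartitions = ceil(300/64) = 5
```

Each returned group is a range of physical shuffle block IDs that one logical coalesced partition would read, mirroring how the coalesced reader maps logical partitions onto existing shuffle output.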

Skew detection model. Let s_i be the byte size of partition i and M be the median of all non-empty s_i values. Partition i is declared skewed if and only if both conditions hold simultaneously:

s_i > skewedPartitionFactor × M
s_i > skewedPartitionThresholdInBytes

With defaults skewedPartitionFactor = 5 and skewedPartitionThresholdInBytes = 256 MB: a 300 MB partition on a dataset with a 10 MB median passes condition 1 (300 > 5 × 10 = 50) and condition 2 (300 > 256). A 200 MB partition on the same dataset passes condition 1 (200 > 50) but fails condition 2 (200 < 256) — it is not flagged. The two-condition gate prevents the rule from splitting moderately-sized partitions when the median is very small.

When a partition is flagged, the split count is:

splitCount = ceil(s_i / advisoryPartitionSizeInBytes)

Each sub-partition reads a contiguous byte range [(k−1) × T, k × T) of the skewed partition's shuffle file, where k ∈ {1 … splitCount}. The matching right-side data is replicated splitCount times — one copy per sub-task. Total right-side data shipped: splitCount × (right-side partition size). This is the skew handling trade-off in quantitative form: you pay splitCount × rightPartitionSize in additional right-side data movement to avoid the s_i-sized straggler task.
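Putting the two-condition gate and the split arithmetic together, a minimal Python sketch (illustrative only; the function names are this post's, not Spark's):

```python
import math
from statistics import median

MB = 1024 ** 2

def is_skewed(size, med, factor=5, threshold=256 * MB):
    """Two-condition gate: the factor test AND the absolute floor must both pass."""
    return size > factor * med and size > threshold

def sub_partition_ranges(size, advisory=64 * MB):
    """Contiguous byte ranges [(k-1)*T, k*T) assigned to each skew sub-task."""
    n = math.ceil(size / advisory)                       # splitCount
    return [(k * advisory, min((k + 1) * advisory, size)) for k in range(n)]

sizes = [10 * MB] * 199 + [300 * MB]
med = median(sizes)                          # 10 MB median
print(is_skewed(300 * MB, med))              # True: 300 > 5*10 and 300 > 256
print(is_skewed(200 * MB, med))              # False: passes the factor test, fails the 256 MB floor
print(len(sub_partition_ranges(300 * MB)))   # 5 sub-tasks of at most 64 MB each
```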

Broadcast upgrade threshold check. After a shuffle stage completes, if the total output byte size B_total satisfies B_total ≤ autoBroadcastJoinThreshold, and the downstream operator is a sort-merge join, the upgrade rule fires. The check is against the compressed, serialized shuffle output — not the in-memory size. In-memory decompression typically inflates this by a factor of 3–5×, which is why the default 10 MB threshold is deliberately conservative.
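The gap between the gated size and the heap footprint is worth quantifying. A small sketch, assuming the 3–5× inflation figure above (4× is used as a midpoint; both functions are illustrations, not Spark APIs):

```python
MB = 1024 ** 2

def should_upgrade_to_broadcast(shuffle_bytes, threshold=10 * MB):
    """Runtime gate: compares the compressed shuffle output size, not heap size."""
    return shuffle_bytes <= threshold

def estimated_heap_bytes(shuffle_bytes, inflation=4):
    """Rough per-executor heap footprint after deserialization (assumed 4x)."""
    return shuffle_bytes * inflation

print(should_upgrade_to_broadcast(8 * MB))   # True: 8 MB passes the 10 MB gate...
print(estimated_heap_bytes(8 * MB) // MB)    # 32: ...but costs ~32 MB on every executor's heap
```

This is the arithmetic behind keeping the threshold conservative: every executor pays the inflated footprint, so a gate that looks generous on disk can be expensive in memory.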

Performance Analysis: What Runtime Re-Planning Costs and What It Returns

The most common concern about AQE is the overhead of statistics collection. In practice, it is negligible relative to the job runtime for any non-trivial workload, but it is worth understanding where the cost lives.

MapOutputStatistics collection happens in the Driver. As each shuffle map task completes, the executor reports the task's shuffle file sizes to the Driver's MapOutputTracker — this communication happens regardless of whether AQE is enabled, because the Driver must know shuffle block locations to schedule downstream tasks. AQE adds only the step of aggregating these byte-size arrays into the statistics objects and running the rule evaluations. For a typical 200-partition shuffle stage, this is a microsecond-range CPU operation on the Driver.

The larger cost is the materialization barrier: AQE can only re-plan after a shuffle stage is fully complete, which means it adds a hard synchronization point before evaluating coalesce or skew rules. In practice, this barrier already exists in Spark's stage scheduler โ€” no task in Stage N+1 can begin until all tasks in Stage N have written their shuffle output. AQE does not add a new barrier; it uses the existing one.

The performance return for each feature class is well-documented in Databricks and Apache Spark benchmarks. Dynamic coalescing reduces task scheduling overhead dramatically for post-aggregation stages, where output is often orders of magnitude smaller than input. In TPC-DS benchmark queries on multi-TB datasets, coalescing alone has been shown to reduce stage latency by 40–70% for post-shuffle stages that would otherwise spin up hundreds of tasks on sub-megabyte partitions. Skew join handling is the highest-impact feature in absolute terms: a single undetected skewed partition can multiply total job runtime by 10x to 50x; eliminating it restores linear scaling. Broadcast upgrades remove entire shuffle phases — a sort-merge join materialization eliminated at runtime saves the equivalent of the full shuffle write and read cost for the upgraded table.


๐Ÿ—๏ธ AQE in Multi-Stage Production DAGs: Where Runtime Re-Planning Fits in the Catalyst Pipeline

AQE does not replace Catalyst — it extends it. Understanding the boundary between static optimization and adaptive re-planning is essential to using both correctly.

What Catalyst still owns: All logical optimization (predicate pushdown, constant folding, subquery decorrelation, CBO statistics-based join reordering) happens before the physical plan is produced and before AQE wraps it. AQE does not re-run logical optimization rules at shuffle boundaries. The join order in a multi-way join is set by Catalyst at planning time and is not reconsidered by AQE unless you explicitly enable spark.sql.adaptive.joinReorder.enabled (an experimental feature in Spark 3.x). Column pruning, partition pruning at the catalog level, and filter pushdown are also entirely Catalyst's domain.

What AQE owns: Physical operator selection (join strategy), partition layout (coalesce/split), and scan range (dynamic partition pruning). These are the three dimensions where runtime statistics provide better signal than catalog estimates.

The interaction with broadcast join threshold: One of the most important multi-stage interactions is between AQE's broadcast upgrade rule and spark.sql.autoBroadcastJoinThreshold. Catalyst evaluates the threshold at planning time against catalog statistics. If those statistics are stale or absent, a table that is actually 30 MB at query time might be estimated as 500 MB, and Catalyst will plan a sort-merge join. When the shuffle stage completes and the actual data size is visible in MapOutputStatistics, AQE's broadcast upgrade rule sees the 30 MB figure and rewrites the SMJ as a BHJ — without the data needing to be re-shuffled, because the shuffle output already exists and can be broadcast directly.

AQE and Exchange reuse: Spark's ReuseExchange optimization deduplicates identical shuffle or broadcast exchanges across a query plan — for example, when the same CTE or subquery result is used in multiple joins. AQE is compatible with exchange reuse, but adds a constraint: a ShuffleQueryStageExec that is reused across plan nodes is only executed once, and its MapOutputStatistics are shared across all consumers. This means coalesce and skew decisions for that stage are applied consistently everywhere the exchange is consumed.

AQE and multi-way joins: For a query with N join operations, the plan tree contains N−1 shuffle boundaries (for sort-merge joins) or fewer (if some joins are broadcast). AQE evaluates re-planning rules at each of these boundaries in a bottom-up traversal. The result is that a five-stage join pipeline can have its partition layout independently optimized after Stage 1 completes, then again after Stage 2 completes, and so on — each stage benefits from the accumulated statistics of all upstream stages.

When AQE does not fire: AQE operates at shuffle exchange boundaries. It does not apply to queries with no shuffle at all (single-stage plans, local sorts, broadcast-only joins with no aggregation). It also does not apply to the first stage of a query — there are no upstream shuffle statistics to collect before Stage 0 executes. For the initial stages, Catalyst's estimates remain the only guide. This is why AQE complements, rather than replaces, good statistical hygiene (running ANALYZE TABLE, keeping partition metadata current, and setting appropriate broadcast thresholds).

AQE and Structured Streaming: AQE is disabled for streaming micro-batches by default. Each micro-batch is a short-lived Spark job, and the overhead of adaptive re-planning — even when small — is non-trivial relative to a 1–5 second trigger interval. The coalesce and skew rules are designed for bulk ETL workloads with stage runtimes in the minute range, not for streaming jobs where the shuffle itself might last only 200ms.


📊 Partition Counts, Skew Detection Thresholds, and the Runtime Metrics That Drive Every AQE Decision

AQE's three core features are each governed by a specific set of statistics derived from MapOutputStatistics. Understanding what those statistics look like and where to find them in the Spark UI is essential for diagnosing whether AQE is helping and tuning it when it is not.

Dynamic Coalescing Metrics:

The coalesce rule works on bytesByPartitionId — the post-serialization byte size of each partition after a shuffle stage completes. Before coalescing, a query with spark.sql.shuffle.partitions = 200 on a 2 GB post-aggregation result produces 200 partitions averaging 10 MB each — acceptable, but potentially worth coalescing to reduce task scheduling overhead. A query that reduces a 500 MB input to a 20 MB result might produce 200 partitions of 100 KB each — 200 tasks processing 100 KB is pure overhead. With advisoryPartitionSizeInBytes = 64 MB, the coalesce rule groups those 200 tiny partitions into a single partition, and the downstream stage schedules 1 task instead of 200.

In the Spark UI, look at the "Tasks" tab for the stage immediately after a shuffle. Before AQE coalescing, you will see hundreds of short-lived tasks with similar durations in the milliseconds range. After coalescing fires correctly, you will see a small number of tasks with substantially longer but more uniform runtimes. The "AQE" label on the stage in the DAG Visualization tab (Spark 3.2+) confirms that AQE re-planning modified the stage.

Skew Join Detection Metrics:

The skew join rule derives two threshold values from bytesByPartitionId for the shuffle stage preceding the join:

  • Median partition size — the 50th percentile byte size across all non-empty partitions.
  • Skew threshold — max(spark.sql.adaptive.skewJoin.skewedPartitionFactor × median, spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes).

Default values: skewedPartitionFactor = 5, skewedPartitionThresholdInBytes = 256 MB. A partition is declared skewed only if it is both 5× the median and larger than 256 MB — this two-condition gate prevents false positives when the median is tiny (e.g., 1 MB median × 5 = 5 MB threshold, which is far too low to matter).

For the logistics pipeline in the opening scenario: median 200 MB × 5 = 1 GB factor threshold, absolute floor 256 MB. The UNKNOWN region partition at 498 GB vastly exceeds both conditions. At the default 64 MB advisory size, the formula yields ceil(498 GB / 64 MB) = 7,968 sub-tasks, each reading a contiguous byte range of the skewed partition's shuffle file; in practice, raising advisoryPartitionSizeInBytes for jobs at this scale keeps the sub-task count more manageable. The matching right-side data is replicated for each sub-task and the results are unioned.
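Plugging the numbers into the splitCount formula makes the tuning lever concrete (a back-of-envelope check, not Spark output):

```python
import math

MB = 1024 ** 2
GB = 1024 ** 3

def split_count(skewed_bytes, advisory_bytes=64 * MB):
    """splitCount = ceil(s_i / advisoryPartitionSizeInBytes)"""
    return math.ceil(skewed_bytes / advisory_bytes)

print(split_count(498 * GB))                    # 7968 sub-tasks at the 64 MB default
print(split_count(498 * GB, advisory_bytes=GB)) # 498 sub-tasks with a 1 GB advisory size
```

Raising spark.sql.adaptive.advisoryPartitionSizeInBytes is the usual lever for keeping the sub-task count manageable on very large skewed partitions.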

Dynamic Partition Pruning Metrics:

DPP does not use MapOutputStatistics — it operates at the scan level, before shuffle. The key metric is the reduction in bytes read at the storage layer. In the Spark UI, compare the "Input" bytes reported for the fact table scan stage with and without DPP. On TPC-DS queries like Q17 (which joins a large date-partitioned sales fact table with a small date dimension), DPP can reduce the fact table scan from scanning all partitions to scanning only the partitions for the requested date range — a 365× reduction in I/O for a date-range query.

Overhead of Statistics Collection:

MapOutputStatistics collection adds a constant-time operation per completed stage on the Driver side — no task scheduling, no RPC to executors beyond the standard shuffle file registration. For a 1000-partition shuffle stage, the statistics arrays are two arrays of 1000 longs (partition bytes and record counts) — approximately 16 KB of Driver-side memory. The rule evaluation CPU cost is a linear scan over these arrays to compute median, identify outliers, and determine coalesce groups — nanoseconds per partition. Total overhead per shuffle boundary: typically under 5 ms on a healthy Driver.


📊 Visualizing the AQE Re-Planning Loop and Skew Join Splitting

The following diagrams show the two most important AQE flows: the full re-planning loop that runs at every shuffle boundary, and the skew join splitting mechanism that converts one blocking task into many parallel ones.

The re-planning loop below captures the lifecycle of an AdaptiveSparkPlanExec node. The key insight is that the plan tree is mutable — each completed shuffle stage produces statistics that feed directly into rule evaluation, and any triggered rule rewrites the plan before the next set of stages is submitted. Stages that pass through without triggering any rule are scheduled with their original structure unchanged.

flowchart TD
    A[Catalyst produces static physical plan] --> B[AdaptiveSparkPlanExec wraps the plan tree]
    B --> C[Identify leaf-most QueryStageExec nodes]
    C --> D[Submit stages with no pending dependencies]
    D --> E[ShuffleQueryStageExec tasks execute and write shuffle files]
    E --> F[MapOutputStatistics collected per partition]
    F --> G{Evaluate AQE rewrite rules}
    G -->|Coalesce rule fires| H[Merge small consecutive partitions into fewer tasks]
    G -->|Skew rule fires| I[Split oversized partitions into sub-tasks and replicate join side]
    G -->|Broadcast upgrade fires| J[Rewrite sort-merge join as broadcast hash join]
    G -->|No rule fires| K[Retain current stage plan unchanged]
    H --> L[Submit next stages with rewritten plan]
    I --> L
    J --> L
    K --> L
    L --> M{More stages remain?}
    M -->|Yes - more shuffle boundaries| C
    M -->|No - final stage| N[Job completes with fully adaptive plan]

The skew join splitting diagram below shows what happens at the partition level when the skew rule fires. Rather than running one catastrophic task that processes the entire skewed partition, Spark creates multiple sub-tasks that each process a contiguous byte range of the same shuffle file, running them in parallel and unioning the results. The right-side join data is replicated per sub-task — a deliberate trade-off: slight data duplication on the right side in exchange for eliminating the blocking skewed task on the left.

flowchart TD
    A[Shuffle stage completes - partition stats available] --> B[Compute median partition byte size]
    B --> C{Check each partition against skew thresholds}
    C -->|Partition within normal range| D[Schedule as single join task]
    C -->|Partition exceeds skewedPartitionFactor x median and 256MB minimum| E[Flag partition as skewed]
    E --> F[Split into N sub-partitions by byte range]
    F --> G[Replicate right-side matching rows for each sub-partition]
    G --> H[Schedule N parallel join tasks - each handles one sub-partition]
    H --> I[Union all N sub-task outputs]
    D --> J[Collect all join results]
    I --> J
    J --> K[Final joined dataset - uniform task durations]

Dynamic Partition Pruning operates at the scan level rather than the shuffle level. The diagram below shows how a broadcast filter derived from the dimension table is pushed down to the fact table's file scanner, allowing the scanner to skip entire partitions before they are read from storage — eliminating both the I/O and the shuffle cost for the pruned data.

flowchart TD
    A[Query: join partitioned fact table with dimension table on partition key] --> B[Catalyst detects DPP-eligible star schema join pattern]
    B --> C[Dimension table size below broadcast threshold]
    C -->|Yes| D[Build broadcast filter from dimension table join keys]
    C -->|No| E[No DPP - full fact table scan proceeds]
    D --> F[Push broadcast filter to fact table FileSourceScanExec]
    F --> G[Fact table scanner reads only matching partitions from storage]
    G --> H[Shuffle only the relevant data subset]
    E --> I[Scan all fact table partitions from storage]
    I --> J[Shuffle entire fact table - much larger network cost]
    H --> K[Final join result - reduced I/O and shuffle]
    J --> K

๐ŸŒ Where AQE Delivers the Biggest Wins: ETL Pipelines, Star Schema Joins, and Cardinality Collapse

AQE's value is not uniform across all workloads. Three categories of production jobs benefit most dramatically, and understanding them helps you prioritize AQE configuration for your cluster.

ETL pipelines with filter-and-aggregate patterns. The canonical ETL pipeline reads raw event data, applies filters that discard 80–95% of the input, aggregates the remainder, and writes the result. The trouble is spark.sql.shuffle.partitions is set for the read scale, not the post-filter scale. If you read 2 TB and filter down to 40 GB, a shuffle.partitions = 2000 setting produces 2000 post-aggregation partitions averaging 20 MB each — borderline workable. But after a second aggregation that reduces output to 400 MB, you still have 2000 partitions at 200 KB each, and the final write stage schedules 2000 tasks to write 2000 tiny files. File system write overhead, task scheduling latency, and small-file proliferation all compound. AQE coalescing catches this at runtime — it sees the 400 MB total and 2000 partitions and reduces them to 7 partitions of ~57 MB, scheduling 7 tasks and producing 7 output files. No configuration change required.

Star schema joins at data warehouse scale. OLAP workloads typically join a central fact table (often hundreds of GB to TB range) with multiple dimension tables (typically MB to low-GB range). The challenge is that dimension tables vary in size by query: a "product category" dimension might be 5 MB for one query and 2 GB for a filtered version of a product dimension in another. Static broadcast thresholds force engineers to choose between a threshold too low (misses valid broadcast candidates) or too high (risks OOM on executors). AQE's broadcast upgrade rule resolves this dynamically — it evaluates the actual materialized shuffle size at runtime and upgrades the join strategy when the data is genuinely small enough. In TPC-DS benchmark results published by the Spark community, AQE's broadcast upgrades reduce total job time by 15–35% on Q1–Q99 without any query-by-query tuning.

High-cardinality group-by aggregations on real-world keys. Keys like customer IDs, device fingerprints, and product SKUs follow power-law distributions in virtually every real-world dataset — a small fraction of key values account for a large fraction of records. A group-by on such a key without AQE produces a shuffle with a handful of massively oversized partitions (the high-frequency keys) and hundreds of negligible partitions (the long tail). The high-frequency partition tasks dominate stage runtime. AQE's skew handling splits these hot-key partitions, turning a stragglers-dominated stage into a balanced one. Databricks internal benchmarks on production customer workloads showed median job latency reductions of 25–50% for group-by-heavy pipelines after enabling skew join optimization in Spark 3.2.


โš–๏ธ When AQE Cannot Save You: Failure Modes, Blind Spots, and Workloads Where Static Plans Win

AQE is powerful but not universal. Senior engineers who rely on it uncritically run into its limits.

The first-stage skew problem. AQE fires at shuffle boundaries — after a stage completes. It cannot fix skew in Stage 0, because there are no upstream shuffle statistics to collect. If your input data is partitioned on a skewed key at the source (e.g., a Parquet file partitioned by a high-frequency date), the first stage will still produce skewed intermediate results. AQE will detect and fix the skew at the next shuffle boundary, but the first-stage execution remains unbalanced. The fix for this is source-level data partitioning strategy, not AQE.

Skew below the detection threshold. The two-condition skew gate (skewedPartitionFactor × median AND skewedPartitionThresholdInBytes) means that moderate skew can fly under the radar. If your median partition is 1 GB and your skewed partition is 4 GB (4× the median, below the default 5× factor), AQE will not flag it. A single 4 GB task still running while the rest finish quickly will still cause a straggler problem — just a less severe one. Lowering skewedPartitionFactor to 3 catches more skew at the cost of more false positives (more unnecessary splits), so it needs to be tuned to your distribution.
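The miss is easy to reproduce with the gate arithmetic (illustrative only; skew_gate is this post's shorthand for the two-condition check):

```python
MB = 1024 ** 2
GB = 1024 ** 3

def skew_gate(size, med, factor=5, floor=256 * MB):
    """Both conditions must hold for a partition to be flagged as skewed."""
    return size > factor * med and size > floor

# 4 GB partition against a 1 GB median
print(skew_gate(4 * GB, 1 * GB))            # False: 4 GB < 5 x 1 GB, so the factor test fails
print(skew_gate(4 * GB, 1 * GB, factor=3))  # True: 4 GB > 3 GB and 4 GB > 256 MB
```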

Broadcast upgrade false positives causing OOM. When AQE upgrades a sort-merge join to a broadcast hash join at runtime, it sends the materialized shuffle data to every executor. If that data is 1 GB and you have 100 executors, each executor receives 1 GB of broadcast data โ€” and if the executor heap is tight, this triggers OOM. The threshold check AQE performs is against the shuffle output byte size, which is compressed and serialized. The in-memory broadcast representation is larger (often 3โ€“5ร— after deserialization). Setting spark.sql.autoBroadcastJoinThreshold conservatively and monitoring executor memory during broadcast upgrade stages is essential.
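A back-of-envelope estimator makes the risk tangible. The 3-5× deserialization blow-up is the heuristic from the paragraph above; the function name, `blowup` factor, and `safe_fraction` cutoff are illustrative assumptions, not Spark APIs or guarantees.

```python
# Rough estimator for broadcast-upgrade memory pressure (illustrative only).
MB = 1024 * 1024

def broadcast_risk(shuffle_bytes, executor_heap_bytes,
                   blowup=4.0, safe_fraction=0.2):
    """Estimate in-memory broadcast size and flag it if it would consume
    more than safe_fraction of each executor's heap."""
    in_memory = shuffle_bytes * blowup
    return in_memory, in_memory > safe_fraction * executor_heap_bytes

# A 1 GB compressed shuffle upgraded to broadcast, on 8 GB-heap executors:
size, risky = broadcast_risk(1024 * MB, 8 * 1024 * MB)
assert risky  # ~4 GB in memory, half the heap: a real OOM risk
```

Running this kind of arithmetic before raising `spark.sql.autoBroadcastJoinThreshold` is cheaper than discovering the blow-up in production.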

Dynamic Partition Pruning requires column-level partition alignment. DPP only fires when the join is on the fact table's partition column โ€” the column used when writing the Parquet/Delta files with partitionBy(...). If the join key is not the partition column, no pruning is possible. Many real-world fact tables are partitioned by event_date for time-range queries, but a query joining on customer_id (which is not the partition column) will not benefit from DPP regardless of AQE settings.

AQE in Structured Streaming and incremental workloads. As noted earlier, AQE is disabled for streaming micro-batches. For incremental batch workloads (e.g., Delta Lake incremental processing), AQE applies normally, but the gains are smaller because incremental batches tend to have uniform small sizes where the partition count is already well-matched to the data volume.

Plan complexity and long AQE re-planning chains. For complex queries with many shuffle stages (15+ in a deeply nested pipeline), AQE introduces planning latency at each boundary. Each re-planning evaluation is fast individually, but the cumulative cost across 15 boundaries adds up. In extreme cases with very fast stages (sub-second), the re-planning overhead can be a non-trivial fraction of total runtime. The spark.sql.adaptive.optimizer.excludedRules configuration lets you disable specific rules if they are adding overhead without benefit.


๐Ÿงญ Configuring AQE for Your Workload: A Decision Guide for Production Engineers

The table below maps workload characteristics to specific AQE configuration decisions. The default values are reasonable starting points for most workloads; the tuning column addresses cases where the defaults under- or over-fit.

| Workload Characteristic | AQE Feature | Default | When to Tune |
|---|---|---|---|
| Many post-shuffle tiny partitions (sub-MB) | Dynamic coalescing | advisoryPartitionSizeInBytes = 64MB | Increase to 128–256 MB for write-heavy pipelines where large output files are preferred |
| Highly skewed join keys (power-law distribution) | Skew join | skewedPartitionFactor = 5, threshold 256 MB | Lower factor to 3 for severe skew; raise threshold to 512 MB to avoid splitting mildly uneven partitions |
| Star schema with small dimension tables | Broadcast upgrade | Uses autoBroadcastJoinThreshold | Keep default 10 MB; raise cautiously and monitor executor memory if upgrading large shuffles |
| Fact table partitioned by query join key | Dynamic partition pruning | dynamicPartitionPruning.enabled = true | No tuning needed if partition column alignment is correct; verify via EXPLAIN output |
| Structured Streaming micro-batch | All AQE features | Disabled for streaming | Do not enable; adds latency overhead with minimal benefit at micro-batch granularity |
| Very fast stages (under 1 second) | Dynamic coalescing | minPartitionNum = spark.default.parallelism | Set minPartitionNum higher to avoid over-merging; re-planning overhead can dominate sub-second stages |
| Deeply nested multi-way joins (10+ shuffles) | All | All enabled | Use excludedRules to disable specific rules adding overhead without triggering |

Use AQE when: your job has multiple shuffle stages, your data has non-uniform distributions, your post-filter cardinality is significantly smaller than your source cardinality, or your job suffers from consistent stragglers in join stages.

Prefer static tuning when: your job has well-characterized data with stable statistics, you use Spark 2.x clusters where AQE is unavailable, your job is a streaming micro-batch pipeline, or you need fully deterministic execution plans for audit/compliance purposes.

Alternative approaches when AQE is insufficient: Source-level data partitioning (rewrite skewed data with a salt column), pre-aggregation before the join stage, or explicit repartition with a custom partitioner for known hot keys.
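The salting alternative can be sketched in plain Python. The helper names and the salt count of 8 are assumptions for illustration: append a bounded salt to the hot key so its records hash across several shuffle partitions instead of one, and remember that the dimension side must then be exploded with every salt value so the join still matches.

```python
# Minimal salting sketch (illustrative helpers, not a Spark API): spread one
# hot key over num_salts variants so it no longer hashes to a single
# shuffle partition.
def salt_key(key, record_index, num_salts=8):
    """Deterministically assign each record one of num_salts key variants."""
    return f"{key}#{record_index % num_salts}"

def partition_for(salted_key, num_partitions=200):
    """Stand-in for the shuffle's hash partitioner."""
    return hash(salted_key) % num_partitions

# The hot key now lands in up to 8 distinct partitions instead of exactly one.
hot_partitions = {partition_for(salt_key("UNKNOWN", i)) for i in range(1000)}
assert 1 < len(hot_partitions) <= 8
```

In a real pipeline the same salting would be done with a column expression on the fact side and a cross join against the salt range on the dimension side; the sketch only shows why the hot key's records stop converging on one task.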


๐Ÿงช Three Walkthroughs: Watching AQE Change the Query Plan in Real Scenarios

These three walkthroughs trace the exact before-and-after execution path for each major AQE feature. The intent is to make the re-planning rules concrete by connecting the abstract mechanism to observable Spark UI and EXPLAIN plan changes.

Scenario 1: Dynamic Coalescing Converts 800 Tiny Tasks into 12 Right-Sized Ones

The query: a daily revenue aggregation that reads 30 GB of transaction events, filters to the last 7 days (retaining ~3 GB), groups by merchant and currency, and writes the result to Delta.

Without AQE: spark.sql.shuffle.partitions = 800 (set for the full 30 GB input). After filtering and grouping, the shuffle output is 3 GB across 800 partitions โ€” averaging 3.75 MB each. The write stage schedules 800 tasks, each writing a 3.75 MB file. Stage runtime: 95 seconds (dominated by task scheduling overhead and small-file write latency). Output: 800 files in the Delta table.

With AQE and advisoryPartitionSizeInBytes = 64MB: MapOutputStatistics shows 800 partitions totaling 3 GB. The coalesce rule groups them into ceil(3GB / 64MB) = 48 coalesced partitions (bounded by minPartitionNum if set). Write stage schedules 48 tasks, each writing a ~62 MB file. Stage runtime: 12 seconds. Output: 48 files. The downstream VACUUM and compaction jobs also benefit from the smaller file count.
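The grouping arithmetic above can be sketched as a greedy packer over consecutive partition sizes. This is a simplification of what the coalesce rule does (Spark's real rule also respects the minimum partition bound and never reorders partitions); the function name is ours.

```python
# Sketch of consecutive-partition packing, in the spirit of AQE's coalesce
# rule: merge adjacent partitions, closing a group just before it would
# exceed the advisory target size.
def coalesce_partitions(sizes, target_bytes):
    """Return groups of consecutive partition indices, each near target_bytes."""
    groups, current, current_size = [], [], 0
    for i, size in enumerate(sizes):
        if current and current_size + size > target_bytes:
            groups.append(current)
            current, current_size = [], 0
        current.append(i)
        current_size += size
    if current:
        groups.append(current)
    return groups

MB = 1024 * 1024
# 800 post-shuffle partitions of ~3.75 MB each, 64 MB advisory target:
groups = coalesce_partitions([int(3.75 * MB)] * 800, 64 * MB)
assert len(groups) == 48  # matches ceil(3 GB / 64 MB) from the scenario
```

The key property is that grouping is over consecutive partitions only, which is what lets the reduce side fetch each coalesced group as a contiguous range of map outputs.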

Scenario 2: Skew Join Splitting Converts a Single 8-Hour Task into 20 Minutes of Parallel Work

The query: a customer event join where event records reference a customer dimension keyed by customer_id. The data has a known hot customer โ€” a test account used for load generation โ€” that accounts for 40% of all event records.

Without AQE: The shuffle stage assigns all events for the hot customer ID to one partition (hash of the ID modulo shuffle.partitions). That partition is 200 GB in a 500 GB total shuffle. One task processes 200 GB while the other 199 tasks average 1.5 GB and finish in 4 minutes. The hot partition task runs for 8 hours and 12 minutes. Total stage time: 8h 12m.

With AQE and skewedPartitionFactor = 5, skewedPartitionThresholdInBytes = 256MB, advisoryPartitionSizeInBytes = 128MB: The 199 non-hot partitions hold the remaining 300 GB, so the median partition size is roughly 1.5 GB. AQE computes the median (not the mean), so the 200 GB hot partition is an extreme outlier: it far exceeds max(5 × 1.5 GB, 256 MB) = 7.5 GB. AQE splits it into ceil(200GB / 128MB) = 1600 sub-partitions, one per advisory-sized chunk of map output, and the matching right-side rows are replicated for each sub-task. 1600 parallel join tasks each process ~128 MB. Stage runtime: approximately 18–22 minutes (limited by cluster parallelism, not by data skew). The 8-hour straggler is gone.
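The split arithmetic can be checked with a few lines of Python; the helper name is ours, and real splits land on map-output block boundaries, so sub-task sizes are approximate rather than exact byte ranges.

```python
# Sketch of the sub-partition split applied to a flagged skewed partition
# (illustrative; Spark splits on map-output block boundaries).
import math

MB = 1024 * 1024
GB = 1024 * MB

def split_skewed(partition_bytes, advisory_bytes):
    """Number of parallel sub-tasks and approximate bytes per sub-task."""
    n = math.ceil(partition_bytes / advisory_bytes)
    return n, partition_bytes / n

n_subtasks, per_task = split_skewed(200 * GB, 128 * MB)
assert n_subtasks == 1600
assert per_task == 128 * MB  # 200 GB happens to divide evenly here
```

With 1600 sub-tasks and, say, 400 cores, the former 8-hour straggler becomes four waves of ~128 MB joins, which is where the 18-22 minute estimate comes from.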

Scenario 3: Dynamic Partition Pruning Eliminates a Full Fact Table Scan

The query: a sales report joining a sales_fact table (2 TB, partitioned by sale_date into 730 daily partitions for 2 years) with a promotion_dim table (50 MB) on sale_date where the promotion dimension is filtered to a specific 14-day campaign period.

Without DPP: Spark scans all 730 partitions of sales_fact (2 TB), shuffles the full result, then joins with the 50 MB promotion filter applied after the shuffle. Input bytes read: 2 TB. Shuffle write: ~800 GB (after projection pushdown).

With DPP enabled: Catalyst identifies the star-schema pattern. The promotion dimension is broadcastable (50 MB). The broadcast join keys are the distinct sale_date values in the 14-day campaign window โ€” a set of 14 dates. This set is pushed down as a dynamic filter to the sales_fact FileSourceScanExec. The scanner reads only 14 of 730 partitions: 2 TB ร— (14/730) โ‰ˆ 38.4 GB. Input bytes read: 38.4 GB. Shuffle write: approximately 15 GB. The query runs in minutes rather than hours, driven purely by the storage-layer scan reduction that DPP enables before any executor work begins.
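The scan-reduction arithmetic is simple enough to verify directly; the helper name is ours, and the model assumes roughly uniform partition sizes, which daily partitions often approximate.

```python
# Bytes read under partition-level pruning scale with the fraction of
# partitions that survive the pushed-down filter (uniform-size assumption).
def pruned_scan_gb(table_gb, total_partitions, surviving_partitions):
    return table_gb * surviving_partitions / total_partitions

# 2 TB fact table, 730 daily partitions, 14-day campaign window:
assert round(pruned_scan_gb(2000, 730, 14), 1) == 38.4
```

Note that this saving is realized at the storage layer, before a single executor task runs, which is why DPP dominates the other AQE features on I/O-bound warehouse queries.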


๐Ÿ› ๏ธ Apache Spark AQE: Enabling and Tuning with Production-Ready Configuration

Apache Spark ships with AQE fully implemented in the open-source codebase since Spark 3.0. In Spark 3.2+, spark.sql.adaptive.enabled defaults to true. The following configuration reference covers the full set of AQE knobs with production-ready values and notes on when to deviate from the defaults.

Enabling AQE and Core Coalescing:

# Enable AQE (default: true in Spark 3.2+; must be explicit in Spark 3.0-3.1)
spark.sql.adaptive.enabled=true

# Target partition size after coalescing (default: 64MB)
# Increase to 128-256MB for write-heavy pipelines to reduce small-file proliferation
spark.sql.adaptive.advisoryPartitionSizeInBytes=64MB

# Minimum number of post-coalesce partitions (default: spark.default.parallelism)
# Raise above 1 to prevent over-coalescing to a single partition when output is tiny
spark.sql.adaptive.coalescePartitions.minPartitionNum=1

# Initial number of shuffle partitions before coalescing (default: spark.sql.shuffle.partitions)
# Set higher than shuffle.partitions to give the coalesce rule finer granularity to merge from
spark.sql.adaptive.coalescePartitions.initialPartitionNum=200

# Enable partition coalescing specifically (default: true when AQE is enabled)
spark.sql.adaptive.coalescePartitions.enabled=true

Skew Join Configuration:

# Enable skew join optimization (default: true when AQE is enabled)
spark.sql.adaptive.skewJoin.enabled=true

# Partition is skewed if size > skewedPartitionFactor * median partition size (default: 5)
# Lower to 3 for datasets with known extreme skew; raise to 10 to only catch severe cases
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5

# Minimum absolute size for a partition to be considered skewed (default: 256MB)
# Prevents splitting small partitions that happen to be > 5x a tiny median
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB

Dynamic Partition Pruning:

# Enable DPP for star-schema joins on partition columns (default: true)
spark.sql.optimizer.dynamicPartitionPruning.enabled=true

# Only apply DPP when the join's broadcast exchange can be reused for the filter (default: true)
# Avoids running an extra subquery scan of the dimension table just to build the pruning filter
spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly=true

# Fallback filter ratio used to estimate the post-pruning size of the partitioned
# table when statistics are unavailable, to judge whether DPP is worthwhile (default: 0.5)
spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio=0.5

Broadcast Upgrade and Join Strategy:

# Threshold for Catalyst static broadcast selection AND AQE broadcast upgrade (default: 10MB)
# AQE uses this value at runtime against actual shuffle output size
spark.sql.autoBroadcastJoinThreshold=10MB

# Enable local shuffle reader optimization (avoids full network fetch when coalescing)
# (default: true) โ€” keep enabled; only disable if seeing driver OOM on large plan trees
spark.sql.adaptive.localShuffleReader.enabled=true

For a production Spark on Kubernetes or EMR deployment, a practical baseline AQE configuration combining all of the above might be:

spark.sql.adaptive.enabled=true
spark.sql.adaptive.advisoryPartitionSizeInBytes=128MB
spark.sql.adaptive.coalescePartitions.minPartitionNum=5
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB
spark.sql.autoBroadcastJoinThreshold=20MB
spark.sql.optimizer.dynamicPartitionPruning.enabled=true
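The same baseline can be expressed as a plain Python dict, suitable for a loop of `.config(key, value)` calls on `SparkSession.builder` or for generating `--conf` flags; it is shown here without importing pyspark so the mapping itself stands alone.

```python
# Baseline AQE configuration as a dict (values mirror the properties above).
aqe_baseline = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.advisoryPartitionSizeInBytes": "128MB",
    "spark.sql.adaptive.coalescePartitions.minPartitionNum": "5",
    "spark.sql.adaptive.skewJoin.skewedPartitionFactor": "5",
    "spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes": "256MB",
    "spark.sql.autoBroadcastJoinThreshold": "20MB",
    "spark.sql.optimizer.dynamicPartitionPruning.enabled": "true",
}

# Applying it (requires pyspark, so shown as comments):
# builder = SparkSession.builder.appName("aqe-baseline")
# for key, value in aqe_baseline.items():
#     builder = builder.config(key, value)
# spark = builder.getOrCreate()

# Or as spark-submit flags:
flags = " ".join(f"--conf {k}={v}" for k, v in aqe_baseline.items())
```

Keeping the configuration in one dict makes it easy to diff against cluster defaults and to reuse across notebooks, spark-submit wrappers, and job schedulers.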

For a full deep-dive on Spark's query planning pipeline, see the companion post on Spark DataFrames and the Catalyst Optimizer.


๐Ÿ“š Lessons Learned: What Running AQE in Production Actually Teaches You

AQE does not replace data quality work โ€” it rescues you from estimation failures. The most common misconception about AQE is that enabling it means you no longer need to think about partition sizing, skew, or join strategy. AQE handles the cases where your estimates were wrong at planning time. If your data is fundamentally broken โ€” corrupted partition statistics, missing ANALYZE TABLE runs, wildly uneven source partitioning โ€” AQE will mitigate the worst consequences, but it cannot substitute for a well-engineered data pipeline. Think of it as the airbag, not the seatbelt.

Broadcast upgrades can cause OOM if you set autoBroadcastJoinThreshold too high. The threshold is checked against compressed shuffle output bytes. In-memory, the broadcast object is typically 3โ€“5ร— larger after deserialization. A 200 MB shuffle that gets upgraded to a broadcast join sends approximately 600 MB โ€“ 1 GB of data to every executor. On a cluster with 100 executors and 8 GB heaps, that is 100 GB of broadcast data distributed across the cluster โ€” 1 GB per executor just for one broadcast. If multiple stages trigger broadcast upgrades simultaneously, executor OOM is a real risk. Set the threshold conservatively (10โ€“25 MB default range is well-chosen) and monitor executor memory peaks in the Spark UI's Executors tab.

Lowering skewedPartitionFactor too aggressively creates its own overhead. Splitting a non-skewed partition into sub-tasks adds join complexity: the right side must be replicated for each sub-task, sub-task outputs must be unioned, and extra tasks must be scheduled. For a dataset where partitions vary by 3โ€“4ร— naturally (not skewed, just uneven), setting skewedPartitionFactor = 2 causes constant unnecessary splits. Benchmark your job with the default factor first, then lower it only if the Spark UI shows a consistent straggler at a specific stage.

Dynamic Partition Pruning requires partition column discipline in your data lake. DPP is the highest-ROI AQE feature for data warehouse workloads, but it only fires when the join key matches the partition column. Many teams write Parquet/Delta files with poor partition column choices (partitioning by low-cardinality string columns instead of high-cardinality date columns, or not partitioning at all), then wonder why DPP is not appearing in their EXPLAIN output. Audit your fact tables with DESCRIBE EXTENDED table_name to verify partition columns, and design new tables with query patterns in mind.

Use EXPLAIN FORMATTED after enabling AQE to verify which rules fired. The physical plan output for an AQE-enabled query includes AdaptiveSparkPlan isFinalPlan=false before execution and AdaptiveSparkPlan isFinalPlan=true after. Comparing the two plans reveals exactly which stages were rewritten, which join strategies were upgraded, and which partitions were coalesced. This is your primary debugging tool when AQE is enabled but the expected optimization is not happening. Look for AQEShuffleRead nodes (named CustomShuffleReaderExec in Spark 3.0-3.1) indicating that coalescing or skew-split reads occurred, SortMergeJoin operators annotated with skew=true indicating skew handling, and BroadcastHashJoin nodes where SortMergeJoin appeared in the pre-execution plan.

AQE and Z-ordering in Delta Lake interact in subtle ways. Delta's Z-order optimization rearranges data within files to improve predicate pushdown for columnar reads. When combined with DPP, the effect is multiplicative: DPP prunes partitions at the partition level, while Z-order + data skipping prunes row groups within partitions. On TPC-DS Q17 with a Z-ordered fact table, the combination of DPP and Delta's data skipping can reduce I/O by 99%+ compared to a full scan. But Z-ordered tables should not be repartitioned after the fact โ€” a coalesce pass that merges Z-ordered partitions into larger files destroys the Z-order locality and eliminates the data skipping benefit. Keep advisoryPartitionSizeInBytes large enough that AQE coalescing does not merge across Z-order partition boundaries.


๐Ÿ“Œ Summary: AQE Is Not Magic โ€” It Is Runtime Statistics Applied at Every Shuffle Boundary

  • AQE wraps the static Catalyst physical plan in AdaptiveSparkPlanExec and re-evaluates the plan after each shuffle stage completes, using real MapOutputStatistics instead of pre-execution catalog estimates.
  • Dynamic coalescing groups consecutive small post-shuffle partitions into fewer, right-sized tasks โ€” eliminating the "800 tasks on 400 MB" scheduling overhead without changing partition count settings manually.
  • Skew join optimization detects partitions that exceed skewedPartitionFactor ร— median at runtime, splits them into parallel sub-tasks, replicates the join counterpart, and unions the results โ€” converting an 8-hour straggler task into a 20-minute parallel operation.
  • Broadcast upgrades correct Catalyst's planning-time mistakes by checking the actual shuffle output size at runtime and upgrading sort-merge joins to broadcast hash joins when the data turns out to be small enough โ€” no code change required.
  • Dynamic Partition Pruning pushes broadcast join key filters down to the fact table scanner, allowing the storage layer to skip entire date or region partitions before any shuffle begins โ€” the highest-I/O-reduction feature in the AQE suite.
  • AQE fires only at shuffle boundaries: it cannot fix Stage 0 skew, streaming micro-batch latency, or join key misalignment with partition columns. It complements good data engineering โ€” it does not replace it.
  • The primary debugging tool is EXPLAIN FORMATTED with the post-execution plan, combined with the Spark UI's Stage and AQE tabs: compare isFinalPlan=false vs. isFinalPlan=true to see exactly which rules fired and which stages were rewritten.
