Spark Architecture: Driver, Executors, DAG Scheduler, and Task Scheduler Explained
Why your Spark job fails with OOM at the same stage every time — and how understanding the driver, DAG scheduler, and executors fixes it
Abstract Algorithms
TLDR: Spark's architecture is a precise chain of responsibility. The Driver converts user code into a DAG, the DAGScheduler breaks it into stages at shuffle boundaries, the TaskScheduler dispatches tasks to Executors respecting data locality, and the Cluster Manager handles raw resource allocation. Understanding what each component does — and what kills it — is the difference between tuning Spark intentionally and tuning it by luck.
🔥 Forty-Five Minutes of Work, One Error, No Answers
A data engineer on a mid-sized e-commerce team submits a Spark job that reads 200 GB of clickstream events, aggregates them by country and device type, and writes a summary table. The job runs for 45 minutes and then fails with:
```
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec...
```
The engineer checks executor memory: 4 GB. They bump it to 8 GB and resubmit. Same failure. Same stage. Same line in the stack trace. They try 12 GB. Again. At this point they have spent three hours on a job that should take twenty minutes, and every configuration change is a guess.
The root problem is not memory size. The problem is a mental model mismatch. When you write df.groupBy("country", "device").count(), it looks like a local computation. It is not. That single line triggers a chain of events across a Driver JVM, a DAGScheduler, a TaskScheduler, a Cluster Manager, and dozens of Executor JVMs distributed across worker nodes — and a misconfiguration anywhere in that chain can kill the job in ways that look identical from the outside.
This post builds the mental model from first principles. By the end you will know exactly what each Spark component does, which component is responsible when a job fails, and how to read the Spark UI to confirm your diagnosis — turning those three-hour debugging sessions into fifteen-minute fixes.
📖 Why Spark's Architecture Is the Missing Mental Model for Every Data Engineer
Spark is often introduced as "distributed pandas" or "fast MapReduce." Both descriptions are technically accurate and pedagogically dangerous. They invite engineers to treat Spark like a local computation library with a bigger machine behind it — which is exactly the mental model that produces the OOM story above.
Spark is a distributed execution engine with a layered architecture designed around a specific division of labor. Each layer has a precise scope: the Driver layer owns planning and coordination, the execution layer owns data processing, and the resource layer owns container allocation. Confusing which layer owns what leads directly to misconfiguration: adding executor memory when the real bottleneck is shuffle partitions, enabling speculative execution when the real problem is a hot key, or scaling the cluster when the real issue is a Driver OOM.
Understanding the architecture pays off in two concrete ways. First, it makes every Spark configuration option legible — instead of cargo-culting settings from Stack Overflow, you know which component a setting controls and why. Second, it makes failures diagnosable — when you see a FetchFailedException in Stage 3, you immediately know the problem is in Stage 2's shuffle output, not Stage 3's executor.
The mental model starts with five components. Everything else is a detail.
🔍 The Five Components Every Spark Job Runs Through
Every Spark application, regardless of size or cluster manager, passes through the same five components. Before examining any one of them in depth, it helps to see how they relate.
The diagram below maps all major participants in a running Spark application. Notice the two distinct zones: everything inside the Driver JVM (planning and coordination) and everything in the Executor JVMs (data processing). The Cluster Manager sits at the boundary between them.
```mermaid
graph TD
    UserCode[User Application Code]
    SS[SparkSession]
    Driver[Driver JVM Process]
    DAG[DAGScheduler]
    Task[TaskScheduler]
    BE[SchedulerBackend]
    CM[Cluster Manager]
    WN1[Worker Node 1]
    WN2[Worker Node 2]
    WN3[Worker Node 3]
    E1[Executor JVM]
    E2[Executor JVM]
    E3[Executor JVM]
    Th1[Task Threads]
    Th2[Task Threads]
    Th3[Task Threads]
    UserCode --> SS
    SS --> Driver
    Driver --> DAG
    DAG --> Task
    Task --> BE
    BE --> CM
    CM --> WN1
    CM --> WN2
    CM --> WN3
    WN1 --> E1
    WN2 --> E2
    WN3 --> E3
    E1 --> Th1
    E2 --> Th2
    E3 --> Th3
```
Read this diagram from top to bottom: user code enters via SparkSession, passes through two scheduler layers inside the Driver JVM, crosses the Cluster Manager boundary, and arrives at Executor JVMs on worker nodes where actual tasks run. Each component owns exactly one job in this chain.
| Component | Zone | Primary responsibility |
| --- | --- | --- |
| Driver JVM | Planning | Runs user code; creates SparkSession; holds the entire job state |
| DAGScheduler | Planning (inside Driver) | Converts RDD lineage into stages separated by shuffle boundaries |
| TaskScheduler | Planning (inside Driver) | Assigns tasks from a stage to specific executors using data locality |
| Cluster Manager | Resource boundary | Allocates CPU and memory containers to the Driver's requests |
| Executor JVM | Execution | Runs assigned tasks; manages data in memory; writes shuffle output |
The most critical architectural fact: if the Driver JVM dies, the entire job dies. There is no recovery. If a single Executor JVM dies, only its in-progress tasks are affected — the job can recover by rescheduling those tasks elsewhere. This asymmetry defines everything about how Spark handles failures and why driver resource sizing is never optional.
⚙️ How the Driver and Schedulers Convert User Code Into Distributed Tasks
The Driver is the entry point for your Spark application. When you run spark-submit my_job.py, a JVM process starts on the driver node, runs your main() (or Python equivalent via Py4J), and creates the SparkSession. From that point, the Driver acts as the central coordinator for the entire distributed computation.
From SparkSession Call to Logical Plan
The moment you call a DataFrame transformation, SparkSession captures it as a lazy logical plan — a description of what you want, not how to execute it. Nothing runs yet. When you call an action like .show(), .write(), or .count(), lazy evaluation ends and the Driver begins executing:
```mermaid
sequenceDiagram
    participant App as User Code
    participant SS as SparkSession
    participant Cat as Catalyst Optimizer
    participant DAG as DAGScheduler
    App->>SS: df.groupBy(country).count().show()
    SS->>Cat: Build logical plan from transformation chain
    Cat->>Cat: Apply predicate pushdown and column pruning
    Cat->>Cat: Choose physical plan - partial aggregate strategy
    Cat->>DAG: Submit optimized RDD DAG
    DAG-->>App: Job submitted - execution begins
```
Catalyst's plan optimization matters enormously in practice. For a groupBy().count(), Catalyst recognizes it can split the computation into a partial aggregation (one count per partition, Stage 1) and a final merge (Stage 2), avoiding the need to shuffle all raw rows across the network. This optimization alone can reduce shuffle volume by 10-100x compared to a naive implementation.
Why Driver OOM kills the entire job. The Driver holds the job's entire metadata state: task status for every stage, the RDD lineage graph, shuffle map output locations, and broadcast variable data. If --driver-memory runs out — common when df.collect() pulls millions of rows into driver heap, or when broadcast joins use multi-GB lookup tables — the JVM exits and the entire application terminates. No recovery path exists. This is why collect() in production code is almost always the wrong design choice.
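A quick back-of-envelope calculation shows why collecting large result sets is terminal for the driver. The per-row size and the JVM overhead factor below are illustrative assumptions, not Spark constants — but the order of magnitude is what matters:

```python
# Rough, illustrative estimate of driver heap consumed by collect().
# avg_row_bytes and jvm_overhead are assumptions for the sketch, not
# Spark internals: deserialized JVM Row objects are typically 2x+ their
# serialized size due to object headers and boxing.

def collect_heap_estimate_gb(rows, avg_row_bytes=200, jvm_overhead=2.0):
    """Approximate driver heap (GB) needed to hold `rows` collected rows."""
    return rows * avg_row_bytes * jvm_overhead / 1024**3

# 50 million modest rows against a typical 4 GB --driver-memory:
needed = collect_heap_estimate_gb(50_000_000)
print(f"~{needed:.1f} GB of driver heap needed")  # far beyond 4 GB -> OOM
```

Even with generous driver sizing, the estimate scales linearly with row count, which is why writing results to storage instead of collecting them is the structural fix.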
The DAGScheduler: Narrow vs. Wide Transformations
When the Driver submits a job, the DAGScheduler takes the logical RDD lineage graph and converts it into a Directed Acyclic Graph (DAG) of stages. A stage is a group of tasks that can all run in parallel without shuffling data between nodes. Stage boundaries appear at every wide transformation — any operation that requires data to move between partitions across the network.
| Transformation type | Stage boundary? | Examples | Why |
| --- | --- | --- | --- |
| Narrow | No | map, filter, flatMap, select, withColumn | Each output partition depends on exactly one input partition |
| Wide | Yes | groupBy, join, repartition, sortBy, distinct | Output partitions depend on data from multiple input partitions |
A map after a filter stays in the same stage. A groupBy forces the DAGScheduler to end the current stage, write shuffle map files to local executor disk, and start a new stage that reads those files across the network. The shuffle boundary is the most expensive operation in any Spark job — both in time (network I/O + disk I/O) and space (shuffle files persist to disk and must be fetched by all downstream tasks).
Job → Stage → Task hierarchy:
- One Action call (.show(), .write(), .count()) creates one Job
- The DAGScheduler breaks that job into Stages — one new stage per shuffle boundary
- Each stage is divided into Tasks — one task per input partition
- A job reading 400 partitions with one groupBy produces Stage 1 with 400 tasks, then Stage 2 with spark.sql.shuffle.partitions tasks (default: 200)
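The job-to-task arithmetic, and the way rows are routed across the shuffle boundary, can be sketched in a few lines of plain Python. This is an illustrative model, not Spark's actual partitioner code:

```python
# Illustrative sketch: task counts for a job with one wide transformation,
# and hash routing of keys to downstream shuffle partitions.

def task_counts(input_partitions, shuffle_partitions=200):
    """One stage per shuffle boundary: Stage 1 gets one task per input
    partition; Stage 2 gets spark.sql.shuffle.partitions tasks."""
    return {"stage1_tasks": input_partitions, "stage2_tasks": shuffle_partitions}

def shuffle_partition_for(key, num_partitions=200):
    """Hash partitioning: every row with the same key lands in the same
    downstream partition -- which is why a hot key creates a hot task."""
    return hash(key) % num_partitions

print(task_counts(400))  # {'stage1_tasks': 400, 'stage2_tasks': 200}
assert shuffle_partition_for("US") == shuffle_partition_for("US")
```

The second function is also the mechanism behind data skew: if 60% of rows share one key, 60% of the shuffle lands in one partition no matter how many partitions exist.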
Here is what the DAG looks like for a word count job — two stages separated by a single shuffle boundary:
```mermaid
graph TD
    subgraph Stage1[Stage 1 - Tokenize and Partial Count]
        A[Read Input Partitions from Storage]
        B[flatMap - split lines into words]
        C[map - emit word and count 1]
        A --> B --> C
    end
    subgraph Stage2[Stage 2 - Shuffle Merge and Write]
        D[Shuffle Read - fetch word buckets by hash]
        E[reduceByKey - sum counts per word]
        F[Write output partitions]
        D --> E --> F
    end
    Stage1 --> Shuffle[Wide Transformation Boundary - reduceByKey causes shuffle]
    Shuffle --> Stage2
```
Stage 1 tasks all run fully in parallel. Stage 2 cannot start until every single Stage 1 task completes — one slow task in Stage 1 holds Stage 2 hostage. This is the mechanism behind data skew being such a severe Spark performance problem.
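The two-stage structure can be simulated in plain Python with toy data (three "partitions" of a few lines each — an assumption for the sketch, not PySpark code):

```python
# Pure-Python simulation of the two-stage word count: partial counts per
# partition (Stage 1), then a merge across partitions (Stage 2).
from collections import Counter

partitions = [
    ["spark spark driver", "driver executor"],
    ["executor spark", "driver"],
    ["spark executor executor"],
]

# Stage 1: each "task" tokenizes and partially counts its own partition.
# Fully parallel -- no data crosses partition boundaries.
partial = [Counter(w for line in part for w in line.split()) for part in partitions]

# Shuffle boundary + Stage 2: merge the partial counts into final totals.
# Only small (word, partial_count) pairs cross the boundary, not raw lines.
final = sum(partial, Counter())

print(dict(final))  # {'spark': 4, 'driver': 3, 'executor': 4}
```

Note what crosses the simulated shuffle: three small Counters, not the raw text — the same reason Catalyst's partial-aggregation rewrite cuts shuffle volume so dramatically.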
The TaskScheduler: Data Locality and Task Assignment
Once the DAGScheduler hands a runnable stage to the TaskScheduler, the TaskScheduler turns abstract tasks into physical assignments — which executor runs which task, with what retry policy, and in what preference order based on data location.
The TaskScheduler prefers tasks that read data local to the executor running them, trying five locality levels in order:
| Locality level | Meaning | Typical cost |
| --- | --- | --- |
| PROCESS_LOCAL | Data is in the same executor JVM cache | Zero network overhead |
| NODE_LOCAL | Data is on the same physical node (HDFS block on this worker) | Minimal — in-node IPC |
| NO_PREF | Data has no location preference (RDD operation result) | Low |
| RACK_LOCAL | Data is on a different node but the same rack | Medium — cross-node |
| ANY | Data must be fetched from a remote rack or object storage | High — full network hop |
The TaskScheduler will wait spark.locality.wait (default: 3 seconds) at each level before relaxing to the next. On Kubernetes clusters with data in cloud object storage (S3, GCS, ADLS), ANY is the correct and expected outcome — all executors have equally remote access to object storage, so no locality preference exists. Increasing spark.locality.wait in this environment only adds scheduling delay.
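The cost of inflating spark.locality.wait on an object-storage cluster is easy to quantify with a simplified model (assumption: the scheduler waits the full interval at every level before relaxing, which is the worst case):

```python
# Back-of-envelope sketch of worst-case scheduling delay from locality
# fallback. The level ordering and 3-second default mirror
# spark.locality.wait; treating the wait as uniform per level is a
# simplification for illustration.
LEVELS = ["PROCESS_LOCAL", "NODE_LOCAL", "NO_PREF", "RACK_LOCAL", "ANY"]

def worst_case_delay(target_level, wait_per_level=3.0):
    """Seconds spent waiting before the scheduler relaxes to target_level."""
    return LEVELS.index(target_level) * wait_per_level

# On cloud object storage every task ends at ANY, so the wait is pure delay:
print(worst_case_delay("ANY"))        # 12.0 s with the 3 s default
print(worst_case_delay("ANY", 30.0))  # 120.0 s if the wait is "tuned" up
```

On an HDFS cluster the wait buys real locality; on S3/GCS/ADLS it buys nothing, which is why the same setting is a fix in one environment and a regression in the other.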
Speculative execution handles straggler tasks: if one task is running dramatically slower than the median for its stage, Spark launches a duplicate on a different executor. The first copy to finish wins; the other is killed. Enable with spark.speculation=true. It helps when slowness is caused by a degraded node, but does not help for genuine data skew — a task processing 10x more data than its peers will still be slow on any executor.
🧠 Deep Dive: How Executors Manage Memory and How the Shuffle Layer Actually Works
The Internals: Unified Memory Model, Shuffle Files, and Task Concurrency
Each Executor is a separate JVM process on a worker node. It receives serialized tasks from the TaskScheduler, deserializes them, runs them against data in memory or storage, and either returns results to the Driver or writes shuffle output for downstream stages.
Unified Memory Model. Every executor JVM has a configurable heap (--executor-memory). Spark divides this heap into three regions:
```mermaid
graph TD
    Heap[Executor JVM Heap - set by executor-memory]
    Reserved[Reserved Memory - 300 MB fixed - JVM and Spark internals]
    SparkMem[Spark Memory Region - 60 pct of remaining heap]
    UserMem[User Memory Region - 40 pct of remaining heap]
    ExecMem[Execution Memory - joins, sorts, aggregations, shuffle buffers]
    StoreMem[Storage Memory - cached RDDs, DataFrames, broadcast variables]
    Heap --> Reserved
    Heap --> SparkMem
    Heap --> UserMem
    SparkMem --> ExecMem
    SparkMem --> StoreMem
```
The Spark Memory Region is shared dynamically between Execution Memory and Storage Memory. When execution needs more heap for a sort or join and storage has idle cached data, it can evict cache blocks to reclaim that space — and vice versa. This unified model replaced the static 60/40 split in Spark 1.6 and is generally better, but it means large .cache() calls can crowd out the execution memory that joins and aggregations need, causing spills to disk.
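The region arithmetic is worth doing once by hand. A sketch, using the defaults described above (300 MB reserved, a 0.6 memory fraction, and an even execution/storage split — the split is a soft boundary, not a hard wall):

```python
# Arithmetic sketch of the unified memory model for one executor heap,
# following the defaults described above (spark.memory.fraction = 0.6,
# spark.memory.storageFraction = 0.5, 300 MB reserved).
RESERVED_MB = 300

def memory_regions_mb(executor_memory_mb, memory_fraction=0.6, storage_fraction=0.5):
    usable = executor_memory_mb - RESERVED_MB
    spark_region = usable * memory_fraction               # shared, dynamic pool
    return {
        "user": usable * (1 - memory_fraction),           # UDF objects, user structures
        "execution": spark_region * (1 - storage_fraction),  # joins, sorts, shuffles
        "storage": spark_region * storage_fraction,          # caches, broadcasts
    }

regions = memory_regions_mb(8 * 1024)  # an 8 GB executor
print({k: round(v) for k, v in regions.items()})
# {'user': 3157, 'execution': 2368, 'storage': 2368}
```

Note that an "8 GB executor" leaves only ~2.3 GB of nominal execution memory — and because the pool is shared, a large cache can shrink that further at exactly the moment a join needs it.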
Shuffle map files. When a Stage 1 task finishes, it writes its output as shuffle map files on the executor's local disk — one file per downstream shuffle partition, indexed for fast access. Stage 2 tasks then use this shuffle map location information (reported to the Driver via the MapOutputTracker) to fetch exactly the data buckets they are responsible for from all Stage 1 executors. This disk-backed shuffle design is deliberate: it tolerates executor failure (the Driver can request Stage 1 re-execution to regenerate lost shuffle files) and avoids the need to keep all intermediate data in memory simultaneously.
Task concurrency per executor. Each executor runs up to --executor-cores tasks simultaneously. More cores per executor means more memory contention between tasks sharing the JVM heap. A common configuration anti-pattern is --executor-cores 1 to avoid GC pressure — it works, but pays the full JVM process overhead for every single task slot. The practical sweet spot is 4–5 cores per executor, with executor memory sized to give each task 2–4 GB of working space (executor-memory / executor-cores).
Heartbeat and liveness. Executors send heartbeats to the Driver every spark.executor.heartbeatInterval (default: 10 seconds). If the Driver does not receive a heartbeat within spark.network.timeout (default: 120 seconds), it marks the executor as lost and reschedules all in-progress tasks on surviving executors.
Performance Analysis: Where Each Component Adds Latency
Not all Spark slowness looks the same. Each component contributes to latency in a distinctive pattern visible in the Spark UI:
| Component | Bottleneck mechanism | Spark UI signal | Fix |
| --- | --- | --- | --- |
| Driver | Oversized broadcast variable serialization | Long "broadcast exchange" time in SQL tab | Keep broadcasts under 200 MB; use spark.broadcast.compress=true |
| Driver | collect() or toPandas() on large result sets | Driver OOM; application exits entirely | Write to storage instead; use show(n) for inspection |
| DAGScheduler | Unnecessary repartitions creating extra shuffle boundaries | More stages than expected in stages tab | Remove redundant repartition() calls; prefer coalesce() for size reduction |
| Shuffle boundary | Default 200 shuffle partitions wrong for data size | Stage 2 tasks either instant (too many) or OOM (too few) | Set spark.sql.shuffle.partitions to 2–3x total executor cores |
| TaskScheduler | Locality fallback to ANY on every task | High shuffle read time; tasks not collocated with HDFS data | Increase spark.locality.wait on HDFS clusters; acceptable on cloud object storage |
| Executor | GC pressure from large partitions | High GC time column in task metrics table | Increase partition count to reduce per-task data volume |
| Executor | Storage memory crowding execution memory | Spill-to-disk events in task metrics | Use MEMORY_AND_DISK storage level; reduce cache scope |
| Executor | Data skew — one task has 50x more data than peers | One long-running task while 199 finish; visible in task duration bar | Salt groupBy keys; enable AQE skew join optimization |
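The "salt groupBy keys" fix from the skew row deserves a concrete illustration. This is a pure-Python sketch of the idea, not the PySpark API: append a random salt to each hot key so its rows hash to several shuffle partitions, then aggregate in two steps (per salted key, then per original key):

```python
# Illustrative sketch of key salting for a skewed groupBy. NUM_SALTS and
# the hot-key set are assumptions for the example.
import random

NUM_SALTS = 8

def salted_key(key, hot_keys):
    """Append a random salt to known hot keys; leave other keys alone."""
    if key in hot_keys:
        return f"{key}#{random.randrange(NUM_SALTS)}"
    return key

random.seed(0)
rows = [("US", 1)] * 1000 + [("NZ", 1)] * 10   # "US" is the hot key
buckets = {salted_key(k, {"US"}) for k, _ in rows}

# "US" now appears as up to 8 distinct shuffle keys instead of one,
# so its 1000 rows spread across up to 8 partitions.
assert len([b for b in buckets if b.startswith("US#")]) == NUM_SALTS
```

In real Spark the second aggregation step (stripping the salt and merging "US#0"…"US#7" back into "US") is a cheap follow-up groupBy, because each partial result is already small. AQE's skew-join optimization automates a similar idea without code changes.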
📊 End-to-End Flow: What Happens When You Call df.groupBy("country").count().show()
Every component described above participates in executing that single line. The sequence below traces the complete path from user call to printed results, touching every layer of the architecture.
```mermaid
sequenceDiagram
    participant App as User Code
    participant Driver as Driver JVM
    participant DAG as DAGScheduler
    participant Task as TaskScheduler
    participant CM as Cluster Manager
    participant Exec as Executor JVMs
    App->>Driver: df.groupBy(country).count().show()
    Driver->>Driver: Catalyst builds and optimizes plan
    Driver->>DAG: Submit job with optimized RDD DAG
    DAG->>DAG: Identify shuffle boundary after groupBy
    DAG->>DAG: Create Stage 1 - partial aggregate per partition
    DAG->>Task: Submit Stage 1 task set - 400 tasks
    Task->>CM: Request executor slots
    CM-->>Task: Executor endpoints returned
    Task->>Exec: Serialize and send tasks with data location
    Exec->>Exec: Deserialize task - read partition - hash-aggregate locally
    Exec->>Exec: Write shuffle map files to local disk
    Exec-->>Task: Task complete - shuffle output location reported to Driver
    Task-->>DAG: All Stage 1 tasks complete
    DAG->>DAG: Create Stage 2 tasks - one per shuffle partition
    DAG->>Task: Submit Stage 2 task set - 200 tasks
    Task->>Exec: Send Stage 2 tasks with shuffle fetch locations
    Exec->>Exec: Fetch shuffle buckets from Stage 1 executors
    Exec->>Exec: Merge partial aggregates - final count per country
    Exec-->>Task: Stage 2 tasks complete
    Task-->>DAG: All stages done
    DAG-->>Driver: Job complete
    Driver->>Driver: Collect result rows from executors
    Driver-->>App: Print rows to console
```
Steps 1–6 (Driver planning). The moment .show() is called, lazy evaluation ends. Catalyst fires and rewrites the plan — for this groupBy().count(), it generates a two-phase aggregation: partial counts per partition (Stage 1) then a merge (Stage 2). This optimization avoids shuffling all raw rows and instead shuffles only partial (country, partial_count) pairs. The DAGScheduler then creates Stage 1 with one task per input partition (400 tasks if the source has 400 partitions).
Steps 7–12 (Stage 1 execution). The TaskScheduler requests executor slots from the Cluster Manager and distributes tasks. Each Stage 1 task reads one input partition, applies the partial aggregation, and writes its output to local shuffle map files on that executor's disk. The task reports its shuffle file locations back to the Driver's MapOutputTracker so Stage 2 tasks know where to fetch data.
Steps 13–19 (Stage 2 execution). Stage 2 begins only after every Stage 1 task completes — this is the stage barrier. Each Stage 2 task is responsible for one shuffle partition and fetches the matching data buckets from every Stage 1 executor. The number of Stage 2 tasks equals spark.sql.shuffle.partitions (default: 200). This default is the source of most Spark performance problems: 200 is correct for some jobs and catastrophically wrong for others.
Steps 20–21 (result collection). Final results are collected back to the Driver JVM and printed. This step is where Driver OOM most commonly occurs: .show() limits the collection, but .collect() or .toPandas() will pull all rows into Driver heap simultaneously.
🌍 How Spark's Architecture Plays Out in Three Production Scenarios
Scenario 1: The e-commerce nightly aggregation. A retailer runs a nightly Spark job reading 500 GB of purchase events from S3 and computing revenue by country and product category. The cluster runs on EMR with YARN. Because S3 has no data locality, all tasks run at ANY locality — this is expected and acceptable. The real bottleneck is the groupBy(country, category) shuffle with the default 200 partitions. A few countries (US, UK, DE) represent 60% of all events, so 3 of the 200 shuffle partitions are enormous while 150 are nearly empty. Enabling AQE (spark.sql.adaptive.enabled=true) lets Spark automatically coalesce the small partitions and split the large ones, cutting Stage 2 runtime from 40 minutes to 8 minutes with no code changes.
Scenario 2: The streaming pipeline with executor churn. A media company runs Structured Streaming ingest from Kafka into Delta Lake on Databricks. During traffic spikes, Kubernetes kills executor pods that exceed memory limits. Because the External Shuffle Service is not configured, the shuffle output from those pods is lost — triggering Stage 1 re-execution every time an executor dies. What should be a recoverable failure becomes a cascade of re-executions. Enabling the External Shuffle Service decouples shuffle file lifetime from executor lifetime: files survive executor death, and only the in-progress tasks need to be retried.
Scenario 3: The BI team's ad-hoc notebook. A data analyst runs a Spark notebook with df.collect() to pull query results into a Pandas DataFrame for plotting. The query returns 50 million rows. The Driver OOM appears immediately, the Spark application crashes, and every other notebook sharing the same SparkSession context is also killed. The fix is not more driver memory — it is redesigning the notebook to write results to Delta Lake and read only an aggregated summary (a few thousand rows) back into Pandas for visualization.
⚖️ Failure Modes and the Recovery Limits of Each Component
Understanding which failures are recoverable and which are terminal is the most operationally useful aspect of Spark architecture knowledge.
Driver OOM — terminal. Cause: collect(), oversized broadcast variable, or driver-side aggregation of large results. Result: the Driver JVM exits, and the entire Spark application terminates with no recovery. Every executor is killed immediately. Fix: eliminate collect() from production code, cap broadcast join size with spark.sql.autoBroadcastJoinThreshold, and size --driver-memory based on actual driver-side data volume — not as an afterthought.
Executor OOM — recoverable (within limits). Cause: a task's data volume exceeds available execution memory after GC cannot reclaim enough heap. Result: the task fails and is rescheduled on another executor (up to spark.task.maxFailures times, default: 4). If all retries fail, the stage fails. If the stage fails enough times (spark.stage.maxConsecutiveAttempts, default: 4), the job is marked failed. Fix: increase partition count to reduce per-task data volume. Adding raw executor memory without reducing partition size often just delays the OOM by one data size increment.
Executor lost (container killed) — recoverable with caveat. Cause: YARN/Kubernetes kills the executor container due to memory limit enforcement, node failure, or preemption. Result: in-progress tasks are rescheduled. However, if shuffle output files from a completed stage were stored on that executor and no External Shuffle Service is running, the upstream stage must be re-executed to regenerate the shuffle data. Fix: enable the External Shuffle Service to decouple shuffle file lifetime from executor process lifetime.
FetchFailedException — cascading failure. Cause: a Stage 2 task cannot fetch shuffle data from a Stage 1 executor because that executor died after Stage 1 completed. Result: Stage 2 tasks fail, Spark re-executes the relevant Stage 1 tasks to regenerate shuffle files, then retries Stage 2. This failure pattern is confusing because the visible error appears in Stage 2 but the root cause is the missing Stage 1 shuffle output — and the first failure entry in the Stage 1 retry history always has the most useful stack trace. Fix: External Shuffle Service (server-side) or remote shuffle service (e.g., Celeborn, Uniffle) for large clusters.
Stage retry limit exceeded — terminal. After spark.stage.maxConsecutiveAttempts consecutive failures (default: 4), the stage is marked failed and the job exits. The failure entry in the Spark UI's failed stages tab shows the last attempt's error, but the first attempt's error is often more diagnostic. Always check the earliest stage attempt in the history to understand the root cause.
🧭 Choosing Your Cluster Manager and Sizing Spark Resources
Spark supports three cluster managers, each with a different operational context. The choice affects resource allocation behavior, but not Spark's internal DAG or task execution logic.
| Cluster Manager | Best environment | Key characteristic | Dynamic allocation support |
| --- | --- | --- | --- |
| Standalone | Dev/test; single-tenant Spark clusters | Spark-native; simplest setup; no queue management | Yes (built-in) |
| YARN | Enterprise Hadoop clusters; multi-tenant | Queue-based fairness; integrates with existing Hadoop infrastructure | Yes (requires External Shuffle Service) |
| Kubernetes | Cloud-native; container-first environments | Pod-based executors; strong isolation; fine-grained resource control | Yes (requires shuffle service or remote shuffle) |
Sizing decisions that depend on architecture understanding:
| Decision | Principle | Common mistake |
| --- | --- | --- |
| --driver-memory | Size for: broadcast variable data + collected result rows + job metadata | Default 1 GB; underprovisioned for any job with broadcast joins |
| --executor-memory | Size for: per-task partition size × 3 safety factor × executor-cores | Increasing this to fix OOM without increasing partition count — just delays the failure |
| --executor-cores | 4–5 per executor is the production sweet spot | --executor-cores 1 pays a whole JVM's overhead per task slot; --executor-cores 10 causes GC contention |
| spark.sql.shuffle.partitions | Target: 100–200 MB of data per shuffle partition; set to 2–3x total executor cores | Default 200 is wrong for nearly every production job |
| spark.dynamicAllocation.enabled | Use on shared clusters with variable job parallelism | Disable if job has steady-state parallelism — dynamic allocation adds overhead for stable jobs |
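The shuffle-partition row combines two constraints — cluster shape and data volume — and the larger answer wins. A sketch of that heuristic (the 150 MB target and 3x core factor are midpoints of the ranges above, not Spark defaults):

```python
# Sizing heuristic from the table above, as a sketch: pick shuffle
# partitions from both the cluster (core_factor x total cores) and the
# data (target_mb per partition), and take whichever is larger.

def shuffle_partitions(total_cores, shuffle_bytes, target_mb=150, core_factor=3):
    by_cores = total_cores * core_factor
    by_data = shuffle_bytes // (target_mb * 1024 * 1024) + 1
    return max(by_cores, int(by_data))

# 80 cores, ~50 GB of shuffle data: the data-size constraint dominates.
print(shuffle_partitions(80, 50 * 1024**3))  # 342
# 80 cores, a tiny 1 GB shuffle: the cluster floor wins.
print(shuffle_partitions(80, 1 * 1024**3))   # 240
```

With AQE enabled, overshooting slightly is safe — small partitions get coalesced automatically — which is why erring toward the larger of the two numbers is the usual advice.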
When to use fixed vs. dynamic executors. Dynamic allocation is the right default for shared clusters where jobs have variable parallelism across stages — a job with 10 stages of varying task counts wastes cluster resources holding 100 executors during the single-task stages. Fixed executor counts are better for recurring jobs with predictable parallelism where the startup cost of container allocation is a significant fraction of job runtime.
🧪 Using the Spark UI Stages Tab to Diagnose Architecture Problems
Every bottleneck described in this post has a visible signature in the Spark UI's Stages tab. Knowing what to look for converts a three-hour debugging session into a fifteen-minute diagnosis.
Reading the task duration distribution. The stages tab shows a task duration bar chart for each stage. A healthy stage has a roughly normal distribution: most tasks finish at the same time, with a small tail. A skewed stage shows a long tail — a few tasks running 10–100x longer than the median. This is data skew, and it points to a groupBy or join key with a hot value. The fix is salting or enabling AQE's skew join optimization.
Reading the GC time column. The task metrics table in each stage includes a GC Time column. If GC time is more than 10–20% of task duration, the executor is spending more time collecting garbage than running your code. This almost always means partition sizes are too large: each task is creating too many intermediate objects that the JVM cannot reclaim fast enough. Increase spark.sql.shuffle.partitions to spread data across more smaller tasks.
Reading shuffle read and write sizes. The shuffle read size per task tells you the actual data volume each Stage 2 task processed. If shuffle sizes vary by a factor of 10x or more across tasks in the same stage, you have data skew at the key level. If every task's shuffle read size is under 10 MB, you have too many partitions — each task's scheduling overhead is dominating its compute time. Target 100–500 MB of shuffle read per task as a rough calibration.
Reading the Task Deserialization Time column. If this is high (more than 1–2 seconds per task), the task payload being sent from the Driver to the executor is too large. The usual cause is an oversized broadcast variable or a UDF that captures a large closure. Reduce the broadcast size, or materialize the data to storage and read it from executors directly rather than serializing it through the Driver.
🛠️ PySpark and spark-submit: The Architecture-Aware Configuration Reference
The configuration options below directly control the components described in this post. Each one should be set based on your job's data volume and cluster size — not left at defaults.
```shell
# Sizing notes:
#   --driver-memory      must hold broadcast data, job metadata, collected results
#   --executor-memory    JVM heap per executor process
#   --executor-cores     task threads per executor; 4 is a safe default
#   shuffle.partitions   2-3x total executor cores; the 200 default is wrong
#                        for nearly every workload outside small test datasets
#   dynamicAllocation    Spark requests and releases executors with the workload
#                        (a fixed --num-executors applies only when this is off)
#   speculation          launches duplicate copies of straggler tasks
#   shuffle.service      decouples shuffle files from executor JVM lifetime
#   adaptive (AQE)       coalesces small shuffle partitions and handles skew
spark-submit \
  --driver-memory 4g \
  --executor-memory 8g \
  --executor-cores 4 \
  --conf spark.sql.shuffle.partitions=400 \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=5 \
  --conf spark.dynamicAllocation.maxExecutors=50 \
  --conf spark.speculation=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.sql.adaptive.skewJoin.enabled=true \
  my_job.py
```
The most impactful single change for most production jobs is setting spark.sql.shuffle.partitions explicitly. The default of 200 was chosen for small demonstration datasets. If your cluster has 200 executor cores and your groupBy produces 10 billion rows, 200 shuffle partitions means each partition task processes 50 million rows — likely to OOM regardless of how much executor memory you allocate. Set shuffle partitions to approximately 2–3x your total executor core count, then let AQE coalesce down automatically if the actual data turns out to be smaller than expected.
For a full deep-dive on Spark performance tuning — AQE, broadcast threshold tuning, skew join handling, and the Catalyst optimizer — see the Apache Spark Engineering series roadmap.
📚 Hard-Won Lessons from Production Spark Jobs
Driver memory is chronically underprovisioned everywhere. Most teams allocate 2–4 GB for the driver "because it's just coordination." Then they add a broadcast join for a reference table. Then they add a UDF that captures a large Python object. Then they add a nightly collect() for a metrics export. The driver OOM appears two months later with no obvious recent cause. Start with 4–8 GB for driver memory on any job with broadcast joins or result collection, and monitor the Driver Memory metric in the Spark UI — it is rarely checked and frequently alarming.
The default 200 shuffle partitions is wrong for almost every production job. It was chosen as a reasonable default for documentation examples and local testing. A job processing 50 GB across 200 shuffle partitions creates ~250 MB per partition task — usually fine. The same configuration on a 5 TB job creates ~25 GB per task — catastrophic. Set spark.sql.shuffle.partitions explicitly for every production job, or enable AQE to let Spark manage it dynamically.
FetchFailedException's root cause is always in the stage before it. Engineers new to Spark inevitably try to debug FetchFailedException by looking at Stage 2's error details. But FetchFailedException means shuffle data from Stage 1 is missing, so the root cause is always in Stage 1's executor or the absence of an External Shuffle Service. Always open the stages tab, find Stage 1's retry history, and read the first failure — not the most recent one.
Killing and resubmitting beats debugging a stuck stage. When a stage has been running for 4× its expected time with no visible progress — no newly completed tasks, no failed tasks, just running — the most productive action is to kill the job, increase shuffle partitions by 2×, and resubmit. A stuck shuffle is almost always a data distribution problem that configuration changes fix faster than live debugging.
The Spark UI stages tab is the single most valuable debugging tool in the ecosystem. Every bottleneck pattern — GC pressure, data skew, locality misses, oversized task payloads, empty shuffle partitions — has a distinct signature in the task metrics table. Learn to read it before changing any configuration. It takes five minutes to read and saves hours of guessing.
📌 TLDR & Key Takeaways
- The Driver is the control plane. It runs user code, builds the execution plan, and coordinates all distributed work via DAGScheduler and TaskScheduler. Driver OOM is a terminal failure — there is no recovery path.
- The DAGScheduler creates stage boundaries at wide transformations. Every `groupBy`, `join`, `repartition`, and `sortBy` ends one stage and starts another with a shuffle in between. Count your wide transformations to predict your stage count.
- The TaskScheduler optimizes for data locality. It prefers `PROCESS_LOCAL` > `NODE_LOCAL` > `RACK_LOCAL` > `ANY`. On cloud-native clusters with object storage, `ANY` is expected and correct — do not inflate `spark.locality.wait` in this environment.
- Executors share one unified memory pool between execution (joins, sorts, shuffles) and storage (caches). Large `.cache()` calls can crowd out execution memory and cause shuffle spills to disk.
- The Cluster Manager handles only resources. YARN, Kubernetes, and Standalone all do the same thing from Spark's perspective: allocate executor containers. They are completely unaware of stages, tasks, or shuffle data.
- `spark.sql.shuffle.partitions=200` is wrong for most production jobs. Set it to 2–3x total executor cores, or enable AQE to manage it automatically. This single setting change fixes more Spark OOM and slowness issues than any other configuration.
- FetchFailedException is Stage 1's fault, not Stage 2's. When shuffle data disappears because an executor died, the failure surfaces in the consuming stage — but the root cause is always the producing stage or the missing External Shuffle Service.
- Every failure mode has a specific fix. Driver OOM → eliminate `collect()` and right-size the driver heap. Executor OOM → increase the partition count. FetchFailedException → add an External Shuffle Service. Data skew → salt keys or enable AQE skew join optimization.
📝 Practice Quiz
Test your understanding of Spark's internal architecture. Questions 1–4 have explicit answers; questions 5 and 6 are open-ended and have no single correct answer.
- A Spark job has this transformation chain: `read → filter → groupBy → map → write`. How many stages does the DAGScheduler create, and where is the stage boundary?
Correct Answer: Two stages. Stage 1 covers read → filter (both narrow transformations). The groupBy is a wide transformation that triggers a shuffle, creating the stage boundary. Stage 2 covers the groupBy reduce side → map → write. The map after groupBy is narrow and stays in Stage 2.
- Your Stage 2 fails with `FetchFailedException: Failed to connect to executor X`. Stage 2 reads shuffle output from Stage 1. What actually went wrong, and what is the Spark recovery path?
Correct Answer: Executor X — which ran Stage 1 tasks and stored their shuffle map files — has died or become unreachable. The shuffle files are no longer accessible. Spark's recovery path: mark the missing shuffle output as lost, re-execute only the Stage 1 tasks that produced output on Executor X, then retry the Stage 2 tasks that failed. If no External Shuffle Service is configured, all Stage 1 tasks that ran on Executor X must be re-run.
- You have 40 executors × 5 cores = 200 total executor cores. `spark.sql.shuffle.partitions` is left at the default 200. After the job runs, the Spark UI shows Stage 3 and Stage 4 have tasks completing in under 100ms each. What is the performance problem, and how do you fix it?
Correct Answer: Sub-100ms task completion means the tasks are trivially small. Most of the elapsed time is scheduling overhead — serializing the task, shipping it to the executor, deserializing it, and reporting completion — rather than actual computation. This is the "too many tiny tasks" problem. Fix: reduce spark.sql.shuffle.partitions to a value where each task runs 100–500ms of actual work. Alternatively, enable AQE (spark.sql.adaptive.enabled=true), which automatically coalesces small post-shuffle partitions at runtime.
- A data engineer caches a 300 GB DataFrame with `.cache()`. Later join operations in the same job start spilling to disk. What is happening in Spark's unified memory model?
Correct Answer: .cache() stores the DataFrame in Storage Memory. Because Storage Memory and Execution Memory share the same Spark Memory Region, the large cache consumes a significant fraction of the available pool. When downstream joins need Execution Memory for sort buffers and hash tables, Spark cannot reclaim enough space and spills the join intermediate data to disk. Fix: cache only DataFrames that are reused multiple times; use StorageLevel.MEMORY_AND_DISK so that evicted cache blocks spill to disk rather than preventing eviction entirely; or increase executor heap if the cache is genuinely necessary.
- (Open-ended) Your job runs on Kubernetes with data in GCS. The Spark UI shows that 100% of tasks have `ANY` data locality. A colleague suggests increasing `spark.locality.wait` from 3s to 30s to improve data locality. Evaluate this suggestion and explain your reasoning.
Correct Answer: No single correct answer — this is a design reasoning question. Key considerations: On Kubernetes with data in GCS, all executor pods have equally remote access to object storage — there is no locality advantage to waiting for a specific pod. ANY locality is correct and expected in this environment. Increasing spark.locality.wait to 30s would add up to 30 seconds of scheduling delay per stage before tasks are assigned, with zero benefit in terms of actual data access speed. The correct action is to leave spark.locality.wait at its default (or reduce it) and focus optimization effort on partition sizing, executor memory, and shuffle configuration.
- (Open-ended — no single correct answer) A production Spark job runs for 3 hours. The Spark UI shows Stage 7 has 499 completed tasks and 1 task running for 2.5 hours. All other stages finish in under 15 minutes. Speculative execution is disabled. What are the possible root causes, and what would you try first?
Correct Answer: The most likely cause is data skew: one shuffle partition received a disproportionate fraction of the data — possibly because the groupBy key has a hot value (e.g., one user account drives 40% of all events). Other possible causes: the task was assigned to a degraded node with slow disk or network; the task encountered a pathological input (e.g., a deeply nested JSON record that takes O(n^2) time to parse). First action: enable speculative execution (spark.speculation=true) for the immediate run to launch a duplicate of the straggler task on a fresh executor — this handles the slow-node case. For a permanent fix: profile the shuffle read size distribution in Stage 7 to confirm skew, then apply salting to the groupBy key or enable AQE skew join optimization (spark.sql.adaptive.skewJoin.enabled=true).
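As a closing illustration of the salting fix from question 6, here is a minimal sketch in plain Python, with no Spark required. The two-stage aggregate mimics what salting does to a skewed `groupBy`: the hot key is split across several buckets in a partial aggregation, then the salt is stripped and the partials are merged:

```python
import random
from collections import Counter

SALT_BUCKETS = 8  # illustrative; pick roughly the skew factor you observe

def salted(key: str) -> str:
    # Stage 1: append a random salt so one hot key spreads across
    # SALT_BUCKETS shuffle partitions instead of landing on one task.
    return f"{key}#{random.randrange(SALT_BUCKETS)}"

# One million events where a single hot key dominates (the skew scenario):
events = ["hot_user"] * 900_000 + [f"user_{i}" for i in range(100_000)]

# Stage 1: partial counts on salted keys (distributable across tasks).
partial = Counter(salted(k) for k in events)

# Stage 2: strip the salt and merge the partials back to the true counts.
final = Counter()
for salted_key, count in partial.items():
    final[salted_key.rsplit("#", 1)[0]] += count

print(final["hot_user"])  # 900000 (same answer, computed across 8 buckets)
```

The result is identical to a direct `Counter(events)`; the extra shuffle stage is the price paid for spreading the hot key's work across many tasks, which is exactly the trade-off AQE's skew-join optimization automates.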
🔗 Related Posts
- Apache Spark for Data Engineers: RDDs, DataFrames, and Structured Streaming — The companion foundations post: how RDDs and DataFrames work, lazy evaluation, partitioning strategy, and the Catalyst optimizer from a user-facing API perspective.
- How Kafka Works: Brokers, Partitions, Consumer Groups, and Replication Explained — Kafka's internal architecture as a counterpart to Spark: understand how the distributed log, partition assignment, and consumer group coordination work in the messaging layer that feeds many Spark Structured Streaming pipelines.
- Big Data Architecture Patterns: Lambda, Kappa, Medallion, and Data Mesh — How Spark fits into the larger big data architecture picture: Lambda vs. Kappa pipeline design, the Medallion lakehouse pattern, and when to use batch Spark vs. Structured Streaming.
