Partitioning in Spark: HashPartitioner, RangePartitioner, and Custom Strategies
Partition count and strategy determine whether your Spark job scales linearly or falls apart at 10x data — here is what happens under the hood
TLDR: Spark's partition count and partitioning strategy are the two levers that determine whether a job scales linearly or crumbles under data growth. HashPartitioner distributes keys by hash modulo — fast and uniform for well-distributed keys, catastrophic for hot keys. RangePartitioner samples the dataset to find natural boundaries, preserving sort order at the cost of an extra pass. Custom partitioners route by domain logic when neither built-in strategy fits. Diagnosing skew in the Spark UI stage detail view — not guessing at spark.sql.shuffle.partitions — is the only reliable path to the right fix.
📖 The Pipeline That Ran Fine at 200 Partitions and Collapsed When Data Grew 5x
A retail analytics team had a daily aggregation pipeline: read 50 GB of order events from HDFS, group by store_id, sum revenue per store, and write a summary table. With the default 200 shuffle partitions the job ran in 8 minutes. Six months later, after an acquisition, the dataset grew to 250 GB. The team added executors — the job took longer. They raised executor memory — the job took longer. They doubled spark.sql.shuffle.partitions to 400 — the job took longer and then one executor ran for 47 minutes while the other 49 were idle.
The Spark UI told the real story. In the shuffle-reduce stage, 197 out of 200 tasks finished in under 90 seconds each. Three tasks never finished at all — they ran until the job timed out. The shuffle read size on those three tasks was 48 GB, 51 GB, and 49 GB respectively. The remaining 197 tasks averaged 600 MB. A single chain of retail stores acquired in the acquisition contributed 62% of all order volume, and their store_id values mapped deterministically to three partition buckets under the hash function. Every row for those stores landed in those three buckets. No amount of hardware or memory tuning helps when three tasks are doing 80% of the work.
This is the partition skew problem, and it is the most common Spark performance failure in production. It has nothing to do with cluster size and everything to do with how Spark assigns rows to partitions. Understanding HashPartitioner, RangePartitioner, and custom strategies — and knowing which to apply when — is the skill that separates a Spark engineer who debugs jobs from one who prevents failures.
🔍 What a Partition Is and How Spark Decides the Initial Count
A partition is the fundamental unit of parallelism in Spark. It is a contiguous slice of data that one task processes on one executor thread. If you have 200 partitions, Spark creates exactly 200 tasks for that stage, and each task runs independently on whichever executor has a free thread. No communication happens between tasks within a stage — they are entirely parallel. This is what makes Spark fast: the program looks sequential but executes as hundreds of independent units.
Spark determines the initial partition count at read time through three mechanisms:
File-based sources (HDFS, S3, GCS): Spark splits input files into chunks based on spark.sql.files.maxPartitionBytes, which defaults to 128 MB. A 1.2 GB Parquet file with the default setting produces approximately 10 partitions. Spark also respects HDFS block boundaries — it will not split a block across two partitions because that would force a remote read.
In-memory RDD creation: When you call sc.parallelize(data), the initial partition count defaults to sc.defaultParallelism, which equals the total number of CPU cores across all executors. On a 10-node cluster with 4 cores per executor, defaultParallelism is 40. You can override it by passing a second argument: sc.parallelize(data, 500).
After a shuffle: Every wide transformation (groupBy, join, repartition) creates new partitions for the downstream stage. That count is controlled by spark.sql.shuffle.partitions (default: 200 for Spark SQL DataFrames) or by the explicit argument passed to repartition(N). Post-shuffle partitions are entirely artificial — they do not correspond to any physical file boundary.
The critical insight is that read-time partitions and shuffle-time partitions are independently controlled. A job reading 2 TB of Parquet files might start with 16,000 input partitions (128 MB each) but collapse to 200 shuffle partitions after the first aggregation, creating 200 tasks each processing 10 GB — a recipe for OOM failures.
⚙️ How HashPartitioner, RangePartitioner, and Custom Strategies Assign Keys to Partitions
HashPartitioner: The Default Strategy
HashPartitioner is Spark's default for all shuffle operations. The assignment formula is deceptively simple:
partitionId = Math.abs(key.hashCode()) % numPartitions
For every row, Spark extracts the grouping or join key, calls Java's hashCode() method on it, and maps the hash into [0, numPartitions) by modulo — the remainder is the partition assignment. (The formula above is the usual simplification; Spark's actual implementation uses a non-negative modulo rather than Math.abs, because Math.abs(Integer.MIN_VALUE) is itself still negative.) This happens independently on every executor during the shuffle write phase, which means no coordination is needed — each task can compute the destination partition locally.
The assumption embedded in HashPartitioner is uniform hash distribution: that no key (or small group of keys) dominates the input. When keys are naturally distributed — user IDs in the billions, random UUIDs, ISO country codes across 200 countries — HashPartitioner performs excellently. It spreads rows across partitions with near-equal sizes, and all tasks finish in roughly the same time.
Two failure modes break this assumption:
Hot key problem: A small number of keys appear disproportionately often. In the retail scenario above, a few acquired store_id values contributed 62% of rows. Their hashes land on the same partition buckets repeatedly because the hash function is deterministic. No configuration change fixes this — the fix requires changing how keys are assigned before the shuffle.
Null key problem: Calling hashCode() on a null key would throw a NullPointerException, so Spark special-cases nulls and routes all null-keyed rows to partition 0. If 40% of your rows have a null join key, partition 0 receives 40% of total data. The Spark UI shows it as a single outlier task with a massive shuffle read. The fix is always explicit: filter nulls before the join, coalesce them into a sentinel value, or handle them in a pre-processing step.
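A few lines of plain Python make the hot-key collapse visible. This is an illustrative simulation, not Spark code: the store names and row counts are made up, and Python's hash() stands in for Java's hashCode().

```python
from collections import Counter

def hash_partition(key, num_partitions):
    # Spark's real implementation uses a non-negative modulo rather than
    # Math.abs; Python's % is already non-negative for a positive modulus.
    return 0 if key is None else hash(key) % num_partitions

# Hypothetical workload: one hot store contributes 620k of ~1M rows.
rows = ["store_1234"] * 620_000 + [f"store_{i}" for i in range(380_000)]
sizes = Counter(hash_partition(k, 200) for k in rows)

hot = max(sizes.values())
typical = sorted(sizes.values())[len(sizes) // 2]
print(f"hottest partition: {hot} rows, median partition: ~{typical} rows")
```

However the uniform keys land, the hot key's rows always pile into a single bucket, so the hottest partition holds hundreds of times the median.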
RangePartitioner: Sorted Distribution
RangePartitioner is used by sortBy() and orderBy() operations and is available explicitly via repartitionByRange(). Instead of hashing, it partitions by value range: all rows with keys in the range [A, B) go to partition 0, [B, C) to partition 1, and so on. The goal is equal-sized partitions by actual data volume, not by hash modulo arithmetic.
To find the correct range boundaries, RangePartitioner executes a two-pass algorithm:
Pass 1 (sampling): Spark samples a fraction of the RDD to estimate the key distribution. The sampler uses reservoir sampling, drawing approximately 20 * numPartitions samples from the dataset. This pass requires reading the data once before the actual computation begins.
Pass 2 (boundary calculation and assignment): Using the sample, Spark sorts the sampled keys and divides them into numPartitions equal-sized buckets. The boundary values become the partition cutpoints. During the actual shuffle write, each task uses a binary search against the boundary array to find the correct partition for each key.
RangePartitioner produces sorted output within each partition and near-equal partition sizes when the sample is representative. Its cost is the extra sampling pass — for large datasets this adds measurable latency, which is why it is only invoked on sort operations and not as the default shuffle strategy.
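The two passes can be sketched in plain Python. This is an illustration of the idea, not Spark's implementation, which draws and weights samples per input partition.

```python
import random
from bisect import bisect_left

def range_boundaries(keys, num_partitions, per_partition=20):
    # Pass 1: fixed-size sample of the keys, then sort it.
    sample = sorted(random.sample(keys, min(len(keys), per_partition * num_partitions)))
    step = len(sample) / num_partitions
    # Evenly spaced quantiles of the sample become the cut points.
    return [sample[int(i * step)] for i in range(1, num_partitions)]

def range_partition(key, boundaries):
    # Pass 2, per row: binary search returns an index in [0, num_partitions).
    return bisect_left(boundaries, key)

keys = [random.randint(0, 1_000_000) for _ in range(100_000)]
bounds = range_boundaries(keys, 200)        # 199 boundary values
counts = [0] * 200
for k in keys:
    counts[range_partition(k, bounds)] += 1
print("largest partition:", max(counts), "smallest:", min(counts))
```

Because the boundaries come from the observed key distribution, the resulting partition sizes cluster tightly around the ideal 500 rows each.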
Custom Partitioner: Domain-Specific Routing
Neither HashPartitioner nor RangePartitioner handles every case. Consider a geo-sharding scenario: you want all events from Europe in partitions 0–49, North America in 50–99, and Asia-Pacific in 100–149. Hash or range partitioning cannot express this routing without preprocessing. A custom partitioner can.
In the RDD API, a custom partitioner is a class extending org.apache.spark.Partitioner that implements two methods: numPartitions (returns the total partition count) and getPartition(key) (returns an integer in [0, numPartitions) for the given key). Spark calls getPartition for every row during the shuffle write phase, exactly as it does for HashPartitioner. The implementation can contain arbitrary routing logic — geographic lookup tables, business-tier mappings, or time-bucketing logic.
The DataFrame API does not expose custom partitioners directly. The standard approach is to add a computed partition column to the DataFrame — a column whose value equals the desired partition index — and then call repartition(numPartitions, col("partition_col")). Spark uses the column value as the partition assignment during the shuffle.
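The Partitioner contract is small enough to sketch in plain Python. The region table and slice widths below are hypothetical; in the Scala RDD API this logic would live in a subclass of org.apache.spark.Partitioner, and in PySpark a plain function can be supplied as the partitionFunc argument to rdd.partitionBy.

```python
class GeoPartitioner:
    # Hypothetical routing table: region -> (first partition, slice width).
    REGION_RANGES = {"EU": (0, 50), "NA": (50, 50), "APAC": (100, 50)}

    @property
    def numPartitions(self):
        return 150

    def getPartition(self, key):
        # key is a (region, store_id) pair; stores spread hash-uniformly
        # inside their region's slice of the partition space.
        region, store_id = key
        start, width = self.REGION_RANGES[region]
        return start + hash(store_id) % width

p = GeoPartitioner()
print(p.getPartition(("APAC", "store_42")))   # an index in [100, 150)
```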
🧠 Deep Dive into How Spark Partitioners Work Internally
The Internals of RangePartitioner
The sampling pass in RangePartitioner is more sophisticated than a simple random sample. Spark uses a reservoir sampling algorithm (specifically a variant of Algorithm R) to draw a fixed-size sample from a stream of unknown length without loading the full dataset into memory. Each sampled element is held in a reservoir of size k = 20 * numPartitions. As the stream progresses, each new element has a probability of k / n (where n is the count of elements seen so far) of replacing a randomly chosen element already in the reservoir. The result is a statistically uniform sample regardless of total dataset size.
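Algorithm R itself fits in a dozen lines; this standalone Python version mirrors the probabilities described above (it is not Spark's internal sampler, which runs per partition in parallel).

```python
import random

def reservoir_sample(stream, k):
    """Uniform fixed-size sample from a stream of unknown length, one pass."""
    reservoir = []
    for n, item in enumerate(stream, start=1):
        if n <= k:
            reservoir.append(item)        # fill phase
        else:
            j = random.randrange(n)       # item survives with probability k/n
            if j < k:
                reservoir[j] = item
    return reservoir

sample = reservoir_sample(range(1_000_000), 20 * 200)   # k = 20 * numPartitions
print(len(sample))
```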
The boundary calculation takes the sorted reservoir sample and extracts evenly spaced quantiles. For numPartitions = 200, Spark extracts the 0.5th percentile, 1.0th percentile, 1.5th percentile, ... up to the 99.5th percentile of the sample — producing 199 boundary values that divide the space into 200 equal-probability buckets. These boundaries are broadcast to all executors as a small array.
During the shuffle write phase, each task binary-searches this boundary array for each key it processes. Binary search over a sorted array of 199 elements completes in at most 8 comparisons — negligible overhead per row. The partition assignment is the index returned by the binary search.
One subtle consequence: RangePartitioner is non-deterministic across runs. The reservoir sample changes with each execution because the sampling probability depends on the order in which rows are processed, and row processing order in a distributed system is not guaranteed. Two identical datasets with different file orderings or different task scheduling produce different boundary values and therefore different partition assignments. This means RangePartitioner output cannot be reliably cached and reused across separate Spark sessions without explicitly saving and reloading the partition boundaries.
Performance Analysis: Tuning Partition Count for Production
Partition count is a dial with hard failure modes at both extremes:
Too few partitions (under-partitioned): Each task receives more data than the executor's shuffle memory region can hold. Spark spills excess data to local disk, triggering reads and writes against the OS page cache. The symptom in the Spark UI is uniform spill across all tasks simultaneously — every task is spilling, not just one or two. In extreme cases, tasks fail with java.lang.OutOfMemoryError during the sort-merge phase. Doubling spark.sql.shuffle.partitions usually fixes this immediately.
Too many partitions (over-partitioned): Each task processes only a few megabytes of data, but the scheduler must launch, track, and complete thousands of tasks. Per-task scheduling, serialization, and launch overhead is small individually, but at 100,000 partitions processing 1 KB each it dominates the actual computation time. Additionally, writing too many small output files creates fragmentation that degrades downstream read performance — a 1 TB dataset written as 50,000 files of 20 MB each reads 3–5x slower than the same data written as 400 files of 2.5 GB each.
The production rule of thumb: Target 128 MB to 256 MB of input data per partition. For a 500 GB shuffle, set spark.sql.shuffle.partitions to approximately 500,000 MB / 200 MB per partition ≈ 2,500. More precisely:
| Dataset Size | Target Shuffle Partitions | Per-Partition Target |
| --- | --- | --- |
| < 10 GB | 50–100 | ~100 MB per partition |
| 10 GB – 100 GB | 200–1,000 | ~128 MB per partition |
| 100 GB – 1 TB | 1,000–8,000 | ~128–256 MB per partition |
| > 1 TB | 8,000–50,000 | ~200 MB per partition |
A complementary rule: set spark.sql.shuffle.partitions to 2–4 times the total number of available CPU cores. This ensures all cores stay busy while preventing the scheduler from becoming a bottleneck. On a 100-core cluster processing a 200 GB shuffle, 200–400 partitions (using 200 MB each) is typically optimal.
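The two rules above can be folded into one hedged helper. The function name and thresholds are this post's guidance, not any Spark API, and the example uses binary units.

```python
def shuffle_partitions(shuffle_bytes, total_cores, target_bytes=200 * 1024**2):
    by_size = shuffle_bytes // target_bytes + 1   # size rule, rounded up
    by_cores = 2 * total_cores                    # keep every core busy
    return max(by_size, by_cores)

# 500 GiB shuffle on a 100-core cluster: the size rule dominates.
print(shuffle_partitions(500 * 1024**3, 100))
# 10 GiB shuffle on the same cluster: the core rule dominates.
print(shuffle_partitions(10 * 1024**3, 100))
```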
Spark 3.0 introduced Adaptive Query Execution (AQE), which coalesces small post-shuffle partitions automatically at runtime. With AQE enabled, it is safer to over-estimate spark.sql.shuffle.partitions — AQE will merge small partitions together before tasks start. However, AQE cannot split oversized partitions for non-join operations, so under-estimation still causes OOM even with AQE enabled.
📊 How Data Flows Through HashPartitioner Versus RangePartitioner
The diagram below traces a single shuffle operation from input data through key extraction, partition assignment, and task execution. The two paths diverge at the partitioner choice, and the critical difference — uniform distribution versus hot-key collapse — is visible in the task duration column at the bottom.
graph TD
A[Input Dataset - 500 GB order events] --> B[Map Phase - each executor reads its partitions]
B --> C[Key Extraction - extract store_id from each row]
C --> D{Partitioner Choice}
D -->|HashPartitioner| E[Compute Math.abs store_id.hashCode mod 200]
D -->|RangePartitioner| F[Reservoir sample 4000 keys - find 199 boundaries]
E --> G[Partition 0 - 2.3 GB uniform stores]
E --> H[Partition 1 - 41 GB hot acquired stores]
E --> I[Partition 2 - 2.1 GB uniform stores]
F --> J[Partition 0 - stores with low revenue range - 2.5 GB]
F --> K[Partition 1 - stores with mid revenue range - 2.5 GB]
F --> L[Partition 2 - stores with high revenue range - 2.5 GB]
G --> M[Task duration: 18 sec]
H --> N[Task duration: 47 min - STRAGGLER]
I --> O[Task duration: 16 sec]
J --> P[Task duration: 21 sec]
K --> Q[Task duration: 23 sec]
L --> R[Task duration: 20 sec]
The hash path creates straggler task N — a single task holding 41 GB of data that extends the stage duration from 18 seconds to 47 minutes, while 49 other executors sit idle waiting for it to finish. The range path produces uniform 2.5 GB partitions because the boundaries were derived from the actual key distribution, not from a mathematical hash function that has no knowledge of frequency. The stage finishes when the slowest task finishes, so the hash path's stage time is 47 minutes regardless of how many additional resources are added.
🌍 repartition vs coalesce, Salting Hot Keys, and Bucketing Pre-Joined Tables
repartition() vs coalesce()
Both operations change partition count, but they behave fundamentally differently:
repartition(N) triggers a full shuffle. Every row is moved across the network to a new partition based on a round-robin or key-based assignment. This is expensive but produces exactly N partitions of uniform size. Use it when you need to increase partition count, change the partitioning key, or guarantee uniform distribution before a join or aggregation.
coalesce(N) uses a narrow dependency — no shuffle. It merges adjacent partitions on the same executor by simply combining their partition metadata. No rows move across the network. This is cheap but only works when reducing partition count, and it produces unequal partition sizes because it cannot rebalance data between executors. Use it when writing output files to reduce the number of small files written to storage, after a filter has already reduced data volume significantly.
The performance difference between repartition(50) and coalesce(50) on a 1 TB dataset that was previously 2,000 partitions is enormous: coalesce takes seconds with no network I/O, while repartition triggers a full 1 TB shuffle. However, coalesce from 2,000 to 50 partitions on the same executors creates 50 partitions of wildly unequal size if data is not uniformly distributed — some coalesced partitions may hold 10x more rows than others.
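A toy simulation makes the behavioral difference concrete. This is not how Spark implements either operation (real coalesce groups by executor locality, and repartition hashes or round-robins through a shuffle), but the size outcomes match the description above.

```python
def coalesce(partitions, n):
    # Narrow dependency: whole partitions are concatenated into n groups.
    # No row ever leaves its group, so input skew survives.
    groups = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        groups[i % n].extend(part)
    return groups

def repartition(partitions, n):
    # Wide dependency: every individual row is redistributed.
    groups = [[] for _ in range(n)]
    for i, row in enumerate(r for part in partitions for r in part):
        groups[i % n].append(row)
    return groups

parts = [[0] * 900, [1] * 50, [2] * 50]          # skewed input partitions
print([len(g) for g in coalesce(parts, 2)])      # [950, 50]  skew survives
print([len(g) for g in repartition(parts, 2)])   # [500, 500] uniform
```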
Salting to Cure Hot Keys
When a few keys dominate the data distribution, the fix is salting: artificially increasing key cardinality by appending a random integer to the hot key before the first shuffle, then removing it after partial aggregation.
The process has two stages. In stage one, a random integer between 0 and K − 1 is appended to each key — store_id_1234 becomes store_id_1234_0, store_id_1234_1, ..., store_id_1234_(K−1). This transforms one hot-key bucket into K uniform buckets. Each bucket computes a partial aggregate (partial sum, partial count). In stage two, the salt suffix is stripped and the partial results are combined by the original key. The hot key's work is now distributed across K parallel tasks in stage one, with only a tiny final aggregation in stage two.
The tradeoff: salting doubles the number of shuffle stages and requires the aggregation function to be decomposable (sum → partial sums, count → partial counts). Functions that cannot be split — median, exact distinct count — cannot use salting without approximations.
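Here is the two-stage flow as a self-contained Python sketch, with dictionaries standing in for the two shuffles; the store IDs and the choice K = 8 are hypothetical.

```python
import random
from collections import Counter

K = 8   # salt cardinality: the hot key fans out over up to K buckets
events = [("store_1234", 1)] * 10_000 + [(f"store_{i}", 1) for i in range(100)]

# Stage 1: aggregate by (key, salt). The hot key's rows now split
# across up to K partial sums instead of one giant bucket.
stage1 = Counter()
for key, v in events:
    stage1[(key, random.randrange(K))] += v

# Stage 2: strip the salt and combine partial sums per original key.
stage2 = Counter()
for (key, _salt), partial in stage1.items():
    stage2[key] += partial

print(stage2["store_1234"])   # 10000, same answer as an unsalted count
```

The result is identical to a direct count because sum is decomposable; only the work distribution in stage one changes.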
Bucketing for Pre-Partitioned Joins
Bucketing is a write-time optimization that pre-partitions and pre-sorts data so that Spark can skip the shuffle on future joins and groupBys. When you write a table with bucketBy(numBuckets, "store_id").sortBy("store_id"), Spark writes the output such that all rows with the same store_id hash land in the same set of files, pre-sorted by store_id. When two tables bucketed on the same key and bucket count are joined, Spark recognizes that no shuffle is necessary — the data is already co-partitioned.
The requirement is strict: both tables must use the same bucket count and the same bucketing key for the shuffle elimination to apply. Mismatched bucket counts force Spark to fall back to a full sort-merge join with shuffle. Bucketing is most valuable for fact tables that are joined repeatedly against dimension tables in ETL pipelines — the up-front cost of bucketing on write is paid back on every subsequent join read.
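A toy model shows why co-bucketing removes the shuffle: both tables assign rows with the same hash-modulo rule at write time, so the join only ever pairs same-numbered buckets. The tables and B = 4 are made up for illustration.

```python
B = 4
bucket = lambda key: hash(key) % B   # same rule applied to both tables at write time

orders = [("s1", 10), ("s2", 20), ("s3", 30), ("s1", 5)]
stores = [("s1", "Berlin"), ("s2", "Austin"), ("s3", "Tokyo")]

orders_b = [[] for _ in range(B)]
stores_b = [[] for _ in range(B)]
for k, v in orders:
    orders_b[bucket(k)].append((k, v))
for k, city in stores:
    stores_b[bucket(k)].append((k, city))

# The join touches only same-numbered buckets. No row ever needs to
# cross buckets, which is exactly the shuffle Spark gets to skip.
joined = [(k, v, city)
          for i in range(B)
          for k, v in orders_b[i]
          for k2, city in stores_b[i] if k == k2]
print(sorted(joined))
```

With mismatched bucket counts the modulo results no longer line up, and the bucket-by-bucket join above would miss matches, which is why Spark falls back to a full shuffle in that case.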
⚖️ HashPartitioner vs RangePartitioner vs Custom: Every Choice Has a Cost
| Strategy | Distribution quality | Sort order preserved | Extra pass required | Hot key resilience | Best fit |
| --- | --- | --- | --- | --- | --- |
| HashPartitioner | Excellent for uniform keys | No | No | Poor — deterministic collapse | Default joins and aggregations |
| RangePartitioner | Excellent for any key distribution | Yes — within partitions | Yes — sampling pass | Good — boundaries from real data | sortBy, orderBy, sorted writes |
| Custom Partitioner | Depends on implementation | Possible | Depends on logic | Configurable | Geo-sharding, tier routing, time-bucketing |
| Salted Hash | Good — hot keys split across K buckets | No | No (adds a stage) | Excellent | Known hot keys with decomposable aggregation |
| Bucketing | Pre-computed — no runtime cost | Yes | Write-time only | Depends on bucket count | Repeated joins on the same key |
The most expensive mistake in Spark partitioning is the repartition–coalesce confusion: teams routinely call repartition(200) after every transformation to "normalize" partition count, paying a full shuffle each time. The only valid reason to call repartition mid-pipeline is when the current partitioning key is wrong for the next operation. If you only need to reduce partition count before writing, coalesce almost always suffices.
AQE's skew join optimization (spark.sql.adaptive.skewJoin.enabled=true) detects partitions significantly larger than the median and splits them automatically — but only for join operations. It does not apply to groupBy aggregations. For skewed aggregations, salting remains the primary tool.
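AQE's detection rule (both a relative and an absolute condition must hold, matching the factor and threshold settings in the configuration reference) is easy to state as a predicate. The partition sizes below mirror the retail incident.

```python
from statistics import median

def is_skewed(sizes_bytes, idx, factor=5, threshold=256 * 1024**2):
    # Skewed only if BOTH conditions hold: relative (vs. median) and absolute.
    med = median(sizes_bytes)
    return sizes_bytes[idx] > factor * med and sizes_bytes[idx] > threshold

# 197 healthy 600 MB partitions plus the three ~50 GB stragglers.
sizes = [600 * 1024**2] * 197 + [48 * 1024**3, 51 * 1024**3, 49 * 1024**3]
print([i for i in range(len(sizes)) if is_skewed(sizes, i)])   # [197, 198, 199]
```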
🧭 Choosing the Right Partitioning Strategy for Your Use Case
Use this table to match the data scenario and workload pattern to the correct partitioning approach. The "Symptom in Spark UI" column is the diagnostic signal that confirms you have reached the right row.
| Scenario | Partitioning strategy | Why | Symptom in Spark UI when wrong strategy is used |
| --- | --- | --- | --- |
| Uniform key distribution — standard join or aggregation | HashPartitioner (default) | Uniform distribution, no extra pass, lowest overhead | None — performance is baseline |
| Hot keys from known skewed columns (user_id, store_id in retail) | Salt + HashPartitioner | Spreads hot bucket across K parallel tasks in first stage | One or few tasks running 10–100x longer than median |
| Sorted output required — downstream range scans or sorted writes | RangePartitioner via repartitionByRange | Preserves key order within each partition | Sort on write is slow without pre-partitioning by range |
| Repeated joins between two large tables on the same key | Bucketing at write time | Eliminates shuffle on every future join read | Shuffle write size is large for every join execution |
| Reducing partition count before writing to storage | coalesce(N) | No shuffle, merges adjacent partitions locally | Downstream reads slow due to excessive small files |
| Domain-specific routing — geography, business tier, time window | Custom Partitioner | Hash and range cannot express the routing logic | Misrouted rows causing incorrect aggregation results |
🧪 Diagnosing Partition Skew Using the Spark UI Stage Detail View
This section walks through the exact UI navigation and metric interpretation for a live skew diagnosis. The goal is to identify whether a slow stage is caused by too few partitions, partition skew, or a configuration mismatch — each has a distinct signature.
Navigate to the affected stage. Open the Spark UI at http://<driver-host>:4040. Click "Stages" in the top navigation. Sort by "Duration" descending. The slowest stage is almost always the skew culprit. Click the stage to open its task detail view.
Read the Task Duration Distribution. The stage detail view shows a summary statistics table for task metrics: minimum, 25th percentile, median, 75th percentile, and maximum. The metric to focus on is "Duration."
- Uniform skew (all tasks slow): All percentiles are similar — median 4 min, max 5 min. This is the too-few-partitions signature: every task is overloaded. Fix: increase spark.sql.shuffle.partitions.
- Point skew (one or a few outlier tasks): Median is 20 seconds, max is 45 minutes. This is the hot-key signature — the 25th and 75th percentiles sit close to the median, and only the maximum is extreme. Fix: salting for aggregations, AQE skew join for joins.
- Stragglers with GC pressure: Median is 2 minutes, max is 8 minutes, and GC Time is elevated (15%+) even at the 25th percentile. Tasks are spending significant time in garbage collection — the too-large-partitions signature. Fix: increase spark.sql.shuffle.partitions and raise executor memory (spark.executor.memory, or tune spark.memory.fraction).
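The first two signatures can be separated mechanically from just the median and max task durations. The thresholds below are illustrative, not Spark defaults; calibrate them against your own stage history.

```python
def diagnose(median_s, max_s):
    # Hypothetical cutoffs: a 10x max/median ratio flags a hot key,
    # a multi-minute median flags uniformly overloaded tasks.
    if max_s > 10 * median_s:
        return "point skew: salt the key, or enable AQE skew join for joins"
    if median_s > 120:
        return "uniform slowness: raise spark.sql.shuffle.partitions"
    return "healthy"

print(diagnose(20, 45 * 60))      # hot-key signature
print(diagnose(4 * 60, 5 * 60))   # too-few-partitions signature
```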
Read Shuffle Read Size per Task. In the same task summary table, check "Shuffle Read Size / Records." A healthy job shows shuffle read size close to the median across all percentiles. Point skew is confirmed when the maximum shuffle read size is 5–50x the median. For the retail scenario above: median 600 MB, maximum 48 GB — an 80x ratio. This single metric is the most definitive confirmation of a hot-key problem.
Read the SQL tab for the physical plan. Click "SQL" in the top navigation and select the query. The physical plan tree shows whether the aggregation includes a map-side partial step — a HashAggregate in partial mode before the exchange. Certain UDAFs and typed aggregates force Spark into ObjectHashAggregate or sort-based aggregation, where map-side pre-aggregation is less effective or absent — this amplifies hot-key skew because rows for the hot key must be shuffled before they are combined.
The Spark UI evidence chain: Duration distribution → Shuffle Read Size → SQL Physical Plan → Configuration choice. Skipping any step leads to treating the symptom rather than the cause.
🛠️ Apache Spark Partitioning Configuration Reference
The following configuration properties govern initial partition sizing, shuffle partition count, and AQE-based skew mitigation. Apply them in spark-defaults.conf, as --conf arguments to spark-submit, or in a cluster-level configuration profile.
# ─────────────────────────────────────────────────────────────────
# INPUT PARTITION SIZING
# Controls how large each partition is when reading file-based sources.
# Default: 128m. Increase to 256m for very large datasets to reduce
# total partition count and scheduling overhead.
# ─────────────────────────────────────────────────────────────────
spark.sql.files.maxPartitionBytes=134217728
# ─────────────────────────────────────────────────────────────────
# SHUFFLE PARTITION COUNT
# Number of partitions produced after every wide transformation.
# Default: 200 — almost always too low for production workloads.
# Rule: target 128–256 MB per partition.
# Example: 500 GB shuffle → set to 2000–4000.
# ─────────────────────────────────────────────────────────────────
spark.sql.shuffle.partitions=2000
# ─────────────────────────────────────────────────────────────────
# RDD DEFAULT PARALLELISM
# Controls initial partition count for sc.parallelize() and
# operations on bare RDDs. Defaults to total executor core count.
# ─────────────────────────────────────────────────────────────────
spark.default.parallelism=400
# ─────────────────────────────────────────────────────────────────
# ADAPTIVE QUERY EXECUTION — master switch
# Introduced in Spark 3.0; enabled by default since Spark 3.2.
# Enables runtime plan changes
# including partition coalescing and skew join splitting.
# ─────────────────────────────────────────────────────────────────
spark.sql.adaptive.enabled=true
# ─────────────────────────────────────────────────────────────────
# AQE SKEW JOIN OPTIMIZATION
# Automatically detects and splits skewed join partitions at runtime.
# Applies to sort-merge joins only — does not help groupBy skew.
# ─────────────────────────────────────────────────────────────────
spark.sql.adaptive.skewJoin.enabled=true
# ─────────────────────────────────────────────────────────────────
# SKEW DETECTION THRESHOLD
# A partition is considered skewed if its size is greater than
# (skewedPartitionFactor * median partition size) AND greater than
# skewedPartitionThresholdInBytes. Both conditions must be true.
# Defaults: factor=5, threshold=256MB.
# ─────────────────────────────────────────────────────────────────
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=268435456
# ─────────────────────────────────────────────────────────────────
# AQE COALESCE — small partition merging
# After shuffle, AQE merges adjacent small partitions until each
# merged partition is approximately advisoryPartitionSizeInBytes.
# Default: 64m. Increase to 128m–256m for write-heavy pipelines.
# ─────────────────────────────────────────────────────────────────
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.advisoryPartitionSizeInBytes=134217728
For a full deep-dive on how Adaptive Query Execution makes these decisions at the operator level, see the companion post on Spark's Catalyst optimizer and DataFrame execution model.
📚 Lessons Learned from Partition Skew Incidents
Lesson 1: Adding executors never fixes skew. The most common incorrect response to a skewed stage is "give it more resources." More executors do not help because the bottleneck is a single task that cannot be parallelized further without changing the partitioning strategy. Adding ten more executors to the retail scenario above would leave them all idle while the one straggler ran for 47 minutes. Diagnose before you scale.
Lesson 2: The default 200 shuffle partitions is wrong for almost every production workload. It was calibrated for demonstration clusters with small datasets. In production, spark.sql.shuffle.partitions should be computed from the actual shuffle size: measure the shuffle write size in a representative run, divide by your target partition size (128–200 MB), and set the property accordingly. AQE's coalescing handles over-estimation gracefully; under-estimation causes spill and OOM.
Lesson 3: repartition mid-pipeline is often the most expensive line in the job. A repartition(200) call inserted to "normalize" partitions before a join triggers a full shuffle of the entire dataset — potentially terabytes of data — for no reason if the data is already partitioned by the join key. Profile the DAG in the SQL tab before adding any repartition call. If the upstream stage already produces the key-partitioned output you need, remove it.
Lesson 4: RangePartitioner's sampling pass is invisible in the logical plan but visible in the physical plan. When you call repartitionByRange() or sortBy(), Spark inserts a sampling job before the main job in the execution queue. If your pipeline suddenly takes longer for no apparent reason after adding a sort, check whether a RangePartitioner sampling job is serializing upstream of your main job. In pipelines with tight latency requirements, this extra pass is sometimes the deciding factor against using range-based partitioning.
Lesson 5: Bucketing is underused because it requires write discipline. Most teams reach for repartition at query time because the data was never bucketed at write time. Establishing bucketing conventions for high-frequency join tables at the data layer — in the data lakehouse or warehouse — eliminates the most expensive shuffles in ETL and reporting pipelines. The investment compounds: one bucketed write eliminates shuffle for every downstream reader for the life of that table partition.
📌 TLDR and Key Takeaways
TLDR: Spark's partition count and partitioning strategy are the two levers that determine whether a job scales linearly or crumbles under data growth. HashPartitioner distributes keys by hash modulo — fast and uniform for well-distributed keys, catastrophic for hot keys. RangePartitioner samples the dataset to find natural boundaries, preserving sort order at the cost of an extra pass. Custom partitioners route by domain logic when neither built-in strategy fits. Diagnosing skew in the Spark UI stage detail view — not guessing at spark.sql.shuffle.partitions — is the only reliable path to the right fix.
The five decisions that matter most:
- Set spark.sql.shuffle.partitions from measured shuffle size. Divide expected shuffle write bytes by 150–200 MB. Never leave it at 200 for production data volumes.
- Use RangePartitioner only when sort order is required. The extra sampling pass has real cost. For standard aggregations and joins, HashPartitioner is faster.
- Diagnose before tuning. The Spark UI stage detail view distinguishes uniform slowness (too few partitions) from point skew (hot keys) within 60 seconds. These require opposite fixes.
- Salt hot keys; do not add resources. For aggregations on skewed keys, a two-stage salted aggregation distributes what was a serial bottleneck into K parallel tasks with a cheap final merge.
- Bucket high-frequency join tables at write time. Pre-partitioning eliminates shuffle permanently for every future reader.
📝 Practice Quiz
Test your understanding of Spark partitioning internals. Use the <details> blocks to check each answer after attempting it.
- A 600 GB shuffle is running with the default `spark.sql.shuffle.partitions=200`. Each task handles 3 GB of shuffle input. What is the most likely symptom visible in the Spark UI, and what is the correct fix?
Answer
Correct Answer: The Spark UI will show "Shuffle Spill (Disk)" on nearly all tasks, and GC Time will be elevated uniformly across the task summary percentiles. Because every task is receiving 3 GB of data but executor shuffle memory (roughly `spark.executor.memory` × `spark.memory.fraction`, which defaults to 0.6) can only hold a fraction of that, all tasks spill to disk simultaneously. The fix is to increase `spark.sql.shuffle.partitions` to approximately 600,000 MB / 150 MB = 4,000. With AQE enabled, you can safely set it higher (e.g., 6,000) because small partitions will be coalesced automatically.
- You have a `groupBy("user_id").count()` job where user_id has a highly skewed distribution — 5% of users account for 70% of events. The Spark UI shows the 95th percentile task duration is 25 seconds, but the maximum is 48 minutes. AQE skew join optimization is enabled. Why does the skew persist, and what is the correct mitigation?
Answer
Correct Answer: AQE's skew join optimization only applies to join operations — it detects skewed partitions in sort-merge joins and splits them. It does not apply to `groupBy` aggregations. The skew from hot user IDs lands in a single partition during the groupBy shuffle and cannot be automatically split. The correct mitigation is key salting: append a random integer (0 to K−1) to user_id before the first aggregation, compute partial counts by the salted key, then strip the salt and sum the partial counts in a second pass. This distributes the hot user's work across K parallel tasks.
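The two-stage salted aggregation is easy to model without a cluster. A pure-Python sketch — `salted_count` and the fan-out `K` are illustrative names; in a real job each stage would be a Spark `groupBy` on the salted and unsalted key respectively:

```python
import random
from collections import Counter

K = 8  # salt fan-out: the hot key's work spreads across up to K reduce tasks

def salted_count(events):
    # Stage 1: count by (user_id, salt). Each salt bucket of a hot key
    # becomes an independent task instead of one giant partition.
    partial = Counter()
    for user_id in events:
        salt = random.randrange(K)  # random salt in [0, K)
        partial[(user_id, salt)] += 1
    # Stage 2: strip the salt and sum the partial counts -- a cheap merge,
    # since there are at most K rows per user to combine.
    final = Counter()
    for (user_id, _salt), n in partial.items():
        final[user_id] += n
    return final

events = ["hot_user"] * 7000 + ["quiet_user"] * 30
counts = salted_count(events)
assert counts["hot_user"] == 7000 and counts["quiet_user"] == 30
```

The final counts are exact — salting reshapes where the work happens, not what is computed.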
- Your team is debating whether to use `repartition(500)` or `coalesce(500)` before writing a 150 GB result to Parquet. The data currently has 3,000 shuffle partitions. Which operation should you use, and why?
Answer
Correct Answer: Use `coalesce(500)`. The goal is to reduce the partition count before writing to avoid 3,000 tiny output files. `coalesce(N)` reduces partition count using a narrow dependency — no shuffle — by combining adjacent partitions on the same executor. This operation is nearly free. `repartition(500)` would trigger a full shuffle of 150 GB across the network purely to reduce file count, with no benefit to data quality or correctness. The only reason to prefer repartition here would be if the existing 3,000 partitions are extremely unequal in size and you need uniform output file sizes — but for standard post-processing writes, coalesce is correct.
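The narrow dependency can be modeled as nothing more than grouping contiguous partition indices. A simplified sketch — `coalesce_plan` is a hypothetical name, and real `coalesce` also weighs executor locality when choosing which partitions to merge:

```python
def coalesce_plan(num_partitions: int, target: int) -> dict:
    # Assign each existing partition to one of `target` output partitions
    # by grouping contiguous runs. No row ever crosses the network:
    # each output partition simply reads several existing ones in place.
    per_group = -(-num_partitions // target)  # ceiling division
    groups = {}
    for p in range(num_partitions):
        groups.setdefault(p // per_group, []).append(p)
    return groups

plan = coalesce_plan(3000, 500)  # the quiz scenario: 3,000 -> 500
assert len(plan) == 500
assert all(len(members) == 6 for members in plan.values())
```

Contrast with `repartition`, which would rehash every row to a new partition — the assignment there depends on the row's key, not its current partition index, which is exactly what forces the shuffle.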
- RangePartitioner's output is described as "non-deterministic across runs." What causes this non-determinism, and in what practical scenario does it matter?
Answer
Correct Answer: RangePartitioner uses reservoir sampling to estimate key distribution. Reservoir sampling is order-dependent: the final sample changes based on the order in which rows are processed across executors, and that order varies with task scheduling, executor availability, and file scan order. Different runs of the same job on the same data can produce different boundary values, and therefore different partition assignments for the same rows. This matters in incremental or partitioned update scenarios: if you recompute a RangePartitioned dataset and try to merge it with a previously RangePartitioned version, the partition boundaries may not align, requiring a full re-read and re-shuffle. For repeatability, save the boundary values externally or use HashPartitioner instead.
- A Spark join between a 1 TB fact table and a 500 MB dimension table is showing `SortMergeJoin` with `Exchange` (shuffle) operators on both sides in the physical plan. The default `spark.sql.autoBroadcastJoinThreshold` is 10 MB. What change resolves this, and what is the mechanism by which the shuffle is eliminated?
Answer
Correct Answer: Set `spark.sql.autoBroadcastJoinThreshold=600m` (or any value above 500 MB). When the threshold exceeds the dimension table's size, Catalyst replaces the SortMergeJoin plan with a BroadcastHashJoin. The mechanism: Spark serializes the entire dimension table on the driver and broadcasts it — sends a copy — to every executor's memory. Each executor then performs the join locally against its own fact table partitions using a hash lookup against the in-memory dimension table. No shuffle of the dimension table occurs at all, and no shuffle of the fact table is required for the join itself (though earlier stages may still shuffle the fact table for other reasons). For a 500 MB table, the broadcast serializes and transfers in under 30 seconds, compared to minutes for a full sort-merge shuffle.
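The local hash lookup that replaces the shuffle can be simulated directly. A sketch with invented toy data — `dim` plays the broadcast copy that every executor holds, and each inner list stands in for one fact-table partition:

```python
# The small dimension table is copied ("broadcast") to every executor;
# fact rows never move off their partitions.
dim = {"store_17": "Acme East", "store_99": "Acme West"}  # broadcast copy

fact_partitions = [
    [("store_17", 120.0), ("store_99", 40.0)],  # fact partition 0
    [("store_17", 75.0)],                        # fact partition 1
]

# Each partition joins locally via an O(1) hash lookup per fact row --
# an inner join, so rows with no dimension match are dropped.
joined = [
    (store_id, revenue, dim[store_id])
    for partition in fact_partitions
    for store_id, revenue in partition
    if store_id in dim
]
assert len(joined) == 3 and joined[0][2] == "Acme East"
```

The asymmetry is the whole trick: copying 500 MB to every executor once is far cheaper than moving 1 TB of fact rows across the network to meet their matching keys.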
- Open-ended challenge: A data platform team wants to pre-bucket their main event table by `event_type` (20 distinct values) for downstream joins. A colleague argues they should use `numBuckets=20` to match the cardinality exactly. Another argues for `numBuckets=400`. What are the tradeoffs between these two choices, and what additional information would you need to make a definitive recommendation?
🔗 Further Reading in the Apache Spark Engineering Series
These posts cover the surrounding architecture and execution model that partitioning operates within:
- Shuffles in Spark: Why groupBy Kills Performance — how the sort-based shuffle write and read phases work, why `groupBy` triggers a full shuffle, and which operations avoid the network transfer entirely
- Spark Architecture: Driver, Executors, DAG Scheduler, and Task Scheduler Explained — how the DAGScheduler breaks a job into stages at shuffle boundaries, how the TaskScheduler dispatches tasks to executors respecting data locality, and what kills each component
- Spark DataFrames and SQL: How the Catalyst Optimizer Rewrites Your Queries — how Catalyst's logical and physical planning phases select HashAggregate vs ObjectHashAggregate, when predicate pushdown eliminates partitions before a shuffle, and how AQE modifies the plan at runtime

Written by
Abstract Algorithms
@abstractalgorithms
