
Partitioning in Spark: HashPartitioner, RangePartitioner, and Custom Strategies

Partition count and strategy determine whether your Spark job scales linearly or falls apart at 10x data — here is what happens under the hood

Abstract Algorithms · 27 min read

AI-assisted content. This post may have been written or enhanced with the help of AI tools. While efforts are made to ensure accuracy, the content may contain errors or inaccuracies. Please verify critical information independently.

TLDR: Spark's partition count and partitioning strategy are the two levers that determine whether a job scales linearly or crumbles under data growth. HashPartitioner distributes keys by hash modulo — fast and uniform for well-distributed keys, catastrophic for hot keys. RangePartitioner samples the dataset to find natural boundaries, preserving sort order at the cost of an extra pass. Custom partitioners route by domain logic when neither built-in strategy fits. Diagnosing skew in the Spark UI stage detail view — not guessing at spark.sql.shuffle.partitions — is the only reliable path to the right fix.


📖 The Pipeline That Ran Fine at 200 Partitions and Collapsed When Data Grew 5x

A retail analytics team had a daily aggregation pipeline: read 50 GB of order events from HDFS, group by store_id, sum revenue per store, and write a summary table. With the default 200 shuffle partitions the job ran in 8 minutes. Six months later, after an acquisition, the dataset grew to 250 GB. The team added executors — the job took longer. They raised executor memory — the job took longer. They doubled spark.sql.shuffle.partitions to 400 — the job took longer and then one executor ran for 47 minutes while the other 49 were idle.

The Spark UI told the real story. In the shuffle-reduce stage, 197 out of 200 tasks finished in under 90 seconds each. Three tasks never finished at all — they ran until the job timed out. The shuffle read size on those three tasks was 48 GB, 51 GB, and 49 GB respectively. The remaining 197 tasks averaged 600 MB. A single chain of retail stores acquired in the acquisition contributed 62% of all order volume, and their store_id values mapped deterministically to three partition buckets under the hash function. Every row for those stores landed in those three buckets. No amount of hardware or memory tuning helps when three tasks are doing 80% of the work.

This is the partition skew problem, and it is the most common Spark performance failure in production. It has nothing to do with cluster size and everything to do with how Spark assigns rows to partitions. Understanding HashPartitioner, RangePartitioner, and custom strategies — and knowing which to apply when — is the skill that separates a Spark engineer who debugs jobs from one who prevents failures.


🔍 What a Partition Is and How Spark Decides the Initial Count

A partition is the fundamental unit of parallelism in Spark. It is a contiguous slice of data that one task processes on one executor thread. If you have 200 partitions, Spark creates exactly 200 tasks for that stage, and each task runs independently on whichever executor has a free thread. No communication happens between tasks within a stage — they are entirely parallel. This is what makes Spark fast: the program looks sequential but executes as hundreds of independent units.

Spark determines the initial partition count at read time through three mechanisms:

File-based sources (HDFS, S3, GCS): Spark splits input files into chunks based on spark.sql.files.maxPartitionBytes, which defaults to 128 MB. A 1.2 GB Parquet file with the default setting produces approximately 10 partitions. Spark also respects HDFS block boundaries — it will not split a block across two partitions because that would force a remote read.

In-memory RDD creation: When you call sc.parallelize(data), the initial partition count defaults to sc.defaultParallelism, which equals the total number of CPU cores across all executors. On a 10-node cluster with 4 cores per executor, defaultParallelism is 40. You can override it by passing a second argument: sc.parallelize(data, 500).

After a shuffle: Every wide transformation (groupBy, join, repartition) creates new partitions for the downstream stage. That count is controlled by spark.sql.shuffle.partitions (default: 200 for Spark SQL DataFrames) or by the explicit argument passed to repartition(N). Post-shuffle partitions are entirely artificial — they do not correspond to any physical file boundary.

The critical insight is that read-time partitions and shuffle-time partitions are independently controlled. A job reading 2 TB of Parquet files might start with 16,000 input partitions (128 MB each) but collapse to 200 shuffle partitions after the first aggregation, creating 200 tasks each processing 10 GB — a recipe for OOM failures.


⚙️ How HashPartitioner, RangePartitioner, and Custom Strategies Assign Keys to Partitions

HashPartitioner: The Default Strategy

HashPartitioner is Spark's default for all shuffle operations. The assignment formula is deceptively simple:

partitionId = nonNegativeMod(key.hashCode(), numPartitions)

For every row, Spark extracts the grouping or join key, calls Java's hashCode() method on it, computes the remainder modulo the target partition count, and adds numPartitions if that remainder is negative. (A plain Math.abs would be subtly wrong here: in Java, Math.abs(Integer.MIN_VALUE) is still negative.) The result is the partition assignment. This happens independently on every executor during the shuffle write phase, which means no coordination is needed — each task can compute the destination partition locally.
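A pure-Python sketch (not Spark's Scala source) makes the assignment rule concrete. Note that Python's hash() differs from Java's hashCode(), so bucket numbers will not match a real cluster; only the structure of the rule carries over.

```python
# Pure-Python sketch of HashPartitioner's assignment rule (not Spark code).
# Spark uses a non-negative modulo rather than abs(): in Java,
# abs(Integer.MIN_VALUE) is still negative, so abs(hash) % n can misfire.

def non_negative_mod(x: int, mod: int) -> int:
    """Result is always in [0, mod), even for negative hash values."""
    raw = x % mod  # Python's % is already non-negative for mod > 0
    return raw + mod if raw < 0 else raw

def hash_partition(key, num_partitions: int) -> int:
    """Assign a key to a partition the way HashPartitioner does."""
    return non_negative_mod(hash(key), num_partitions)

# Identical keys always land in the same bucket -- the root of hot-key skew.
assert hash_partition("store_1234", 200) == hash_partition("store_1234", 200)
```

Because the function is deterministic, every row carrying a hot key lands in the same bucket on every run; no partition-count change alters that.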

The assumption embedded in HashPartitioner is uniform hash distribution: that no key (or small group of keys) dominates the input. When keys are naturally distributed — user IDs in the billions, random UUIDs, ISO country codes across 200 countries — HashPartitioner performs excellently. It spreads rows across partitions with near-equal sizes, and all tasks finish in roughly the same time.

Two failure modes break this assumption:

Hot key problem: A small number of keys appear disproportionately often. In the retail scenario above, a few acquired store_id values contributed 62% of rows. Their hashes land on the same partition buckets repeatedly because the hash function is deterministic. No configuration change fixes this — the fix requires changing how keys are assigned before the shuffle.

Null key problem: Calling hashCode() on a null key would throw a NullPointerException, so Spark special-cases nulls and routes every null-keyed row to partition 0. If 40% of your rows have a null join key, partition 0 receives 40% of total data. The Spark UI shows it as a single outlier task with a massive shuffle read. The fix is always explicit: filter nulls before the join, coalesce them into a sentinel value, or handle them in a pre-processing step.

RangePartitioner: Sorted Distribution

RangePartitioner is used by sortBy() and orderBy() operations and is available explicitly via repartitionByRange(). Instead of hashing, it partitions by value range: all rows with keys in the range [A, B) go to partition 0, [B, C) to partition 1, and so on. The goal is equal-sized partitions by actual data volume, not by hash modulo arithmetic.

To find the correct range boundaries, RangePartitioner executes a two-pass algorithm:

Pass 1 (sampling): Spark samples a fraction of the RDD to estimate the key distribution. The sampler uses reservoir sampling, drawing approximately 20 * numPartitions samples from the dataset. This pass requires reading the data once before the actual computation begins.

Pass 2 (boundary calculation and assignment): Using the sample, Spark sorts the sampled keys and divides them into numPartitions equal-sized buckets. The boundary values become the partition cutpoints. During the actual shuffle write, each task uses a binary search against the boundary array to find the correct partition for each key.

RangePartitioner produces sorted output within each partition and near-equal partition sizes when the sample is representative. Its cost is the extra sampling pass — for large datasets this adds measurable latency, which is why it is only invoked on sort operations and not as the default shuffle strategy.

Custom Partitioner: Domain-Specific Routing

Neither HashPartitioner nor RangePartitioner handles every case. Consider a geo-sharding scenario: you want all events from Europe in partitions 0–49, North America in 50–99, and Asia-Pacific in 100–149. Hash or range partitioning cannot express this routing without preprocessing. A custom partitioner can.

In the RDD API, a custom partitioner is a class extending org.apache.spark.Partitioner that implements two methods: numPartitions (returns the total partition count) and getPartition(key) (returns an integer in [0, numPartitions) for the given key). Spark calls getPartition for every row during the shuffle write phase, exactly as it does for HashPartitioner. The implementation can contain arbitrary routing logic — geographic lookup tables, business-tier mappings, or time-bucketing logic.
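As a sketch, the routing logic can be expressed as a plain function; in the PySpark RDD API such a function can be passed directly, e.g. rdd.partitionBy(150, geo_partition). The region table and names below are hypothetical examples, not anything defined by Spark.

```python
# Geo-routing partition function, sketched in pure Python. The region
# bands (EU -> 0-49, NA -> 50-99, APAC -> 100-149) mirror the scenario
# in the text; the lookup table is a hypothetical example.

REGION_OFFSET = {"EU": 0, "NA": 50, "APAC": 100}
PARTITIONS_PER_REGION = 50
NUM_PARTITIONS = 150  # what numPartitions would return

def geo_partition(key) -> int:
    """Route (region, entity_id) keys into that region's partition band."""
    region, entity_id = key
    base = REGION_OFFSET[region]
    # Spread entities uniformly within the region's 50-partition band.
    return base + (hash(entity_id) % PARTITIONS_PER_REGION)

p = geo_partition(("EU", "store_42"))
assert 0 <= p < 50  # every EU key stays in partitions 0-49
```

In the Scala RDD API the same logic would live in the getPartition(key) method of a Partitioner subclass; the contract is identical: return an integer in [0, numPartitions) for every key.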

The DataFrame API does not expose custom partitioners directly. The standard approach is to add a computed partition column to the DataFrame — a column whose value equals the desired partition index — and then call repartition(numPartitions, col("partition_col")). Spark uses the column value as the partition assignment during the shuffle.


🧠 Deep Dive into How Spark Partitioners Work Internally

The Internals of RangePartitioner

The sampling pass in RangePartitioner is more sophisticated than a simple random sample. Spark uses a reservoir sampling algorithm (specifically a variant of Algorithm R) to draw a fixed-size sample from a stream of unknown length without loading the full dataset into memory. Each sampled element is held in a reservoir of size k = 20 * numPartitions. As the stream progresses, each new element has a probability of k / n (where n is the count of elements seen so far) of replacing a randomly chosen element already in the reservoir. The result is a statistically uniform sample regardless of total dataset size.

The boundary calculation takes the sorted reservoir sample and extracts evenly spaced quantiles. For numPartitions = 200, Spark extracts the 0.5th percentile, 1.0th percentile, 1.5th percentile, ... up to the 99.5th percentile of the sample — producing 199 boundary values that divide the space into 200 equal-probability buckets. These boundaries are broadcast to all executors as a small array.

During the shuffle write phase, each task binary-searches this boundary array for each key it processes. Binary search over a sorted array of 199 elements completes in at most 8 comparisons — negligible overhead per row. The partition assignment is the index returned by the binary search.
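The whole pipeline (reservoir sample, boundary extraction, binary-search assignment) can be sketched in pure Python. This is an illustration of the algorithm, not Spark's implementation; the sample sizes, seeds, and four-partition setup are arbitrary.

```python
import bisect
import random

def reservoir_sample(stream, k, rng=None):
    """Algorithm R: uniform fixed-size sample from a stream of unknown length."""
    rng = rng or random.Random(0)
    reservoir = []
    for n, item in enumerate(stream, start=1):
        if len(reservoir) < k:
            reservoir.append(item)
        else:
            j = rng.randrange(n)   # item survives with probability k / n
            if j < k:
                reservoir[j] = item
    return reservoir

def range_boundaries(sample, num_partitions):
    """num_partitions - 1 cutpoints, taken as evenly spaced sample quantiles."""
    s = sorted(sample)
    return [s[len(s) * i // num_partitions] for i in range(1, num_partitions)]

def range_partition(key, boundaries):
    """Partition index = position of the key in the sorted boundary array."""
    return bisect.bisect_right(boundaries, key)

rng = random.Random(1)
keys = [rng.random() for _ in range(10_000)]
sample = reservoir_sample(iter(keys), k=20 * 4, rng=random.Random(2))
bounds = range_boundaries(sample, num_partitions=4)
counts = [0] * 4
for key in keys:
    counts[range_partition(key, bounds)] += 1
# With a representative sample, the four partition counts come out near-equal.
```

The binary search per key is the only per-row cost at shuffle time; the sampling cost was paid once, up front, which is exactly the extra pass discussed above.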

One subtle consequence: RangePartitioner is non-deterministic across runs. The reservoir sample changes with each execution because the sampling probability depends on the order in which rows are processed, and row processing order in a distributed system is not guaranteed. Two identical datasets with different file orderings or different task scheduling produce different boundary values and therefore different partition assignments. This means RangePartitioner output cannot be reliably cached and reused across separate Spark sessions without explicitly saving and reloading the partition boundaries.

Performance Analysis: Tuning Partition Count for Production

Partition count is a dial with hard failure modes at both extremes:

Too few partitions (under-partitioned): Each task receives more data than the executor's shuffle memory region can hold. Spark spills excess data to local disk, triggering reads and writes against the OS page cache. The symptom in the Spark UI is uniform spill across all tasks simultaneously — every task is spilling, not just one or two. In extreme cases, tasks fail with java.lang.OutOfMemoryError during the sort-merge phase. Doubling spark.sql.shuffle.partitions usually fixes this immediately.

Too many partitions (over-partitioned): Each task processes only a few megabytes of data, but the scheduler must launch, track, and complete thousands of tasks. Task launch overhead on YARN or Kubernetes is approximately 0.5–2 seconds per task. At 100,000 partitions processing 1 KB each, the scheduling overhead dominates the actual computation time. Additionally, writing too many small output files creates fragmentation that degrades downstream read performance — a 1 TB dataset written as 50,000 files of 20 MB each reads 3–5x slower than the same data written as 400 files of 2.5 GB each.

The production rule of thumb: Target 128 MB to 256 MB of shuffle data per partition. For a 500 GB shuffle at roughly 200 MB per partition, set spark.sql.shuffle.partitions to approximately 500,000 MB / 200 MB = 2,500. More precisely:

| Dataset Size | Target Shuffle Partitions | Per-Partition Target |
|---|---|---|
| < 10 GB | 50–100 | ~100 MB per partition |
| 10 GB – 100 GB | 200–1,000 | ~128 MB per partition |
| 100 GB – 1 TB | 1,000–8,000 | ~128–256 MB per partition |
| > 1 TB | 8,000–50,000 | ~200 MB per partition |

A complementary rule: set spark.sql.shuffle.partitions to at least 2–4 times the total number of available CPU cores, so every core stays busy without turning the scheduler into a bottleneck. When the two rules disagree, take the larger value: on a 100-core cluster processing a 200 GB shuffle, the core rule gives 200–400 partitions but the ~200 MB sizing rule gives roughly 1,000, so 1,000 is the better starting point.
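A small helper can encode both rules together; the function name and defaults below are illustrative, not any Spark API.

```python
# Combine the two sizing rules above: bytes-per-partition target plus a
# floor of 2x the core count, rounded up to whole scheduling waves.
# Illustrative helper -- not part of Spark.

def shuffle_partitions(shuffle_bytes: int,
                       total_cores: int,
                       target_bytes: int = 200 * 1024**2) -> int:
    by_size = -(-shuffle_bytes // target_bytes)   # ceil division
    by_cores = 2 * total_cores                    # keep all cores busy
    n = max(by_size, by_cores)
    # Round up to a whole number of task waves across the cluster.
    return -(-n // total_cores) * total_cores

# 500 GB shuffle on a 100-core cluster at ~200 MB per partition:
print(shuffle_partitions(500 * 1024**3, total_cores=100))  # 2600
```

With AQE enabled it is safe to round this estimate up rather than down, since small post-shuffle partitions are coalesced at runtime.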

Spark 3.0 introduced Adaptive Query Execution (AQE), which coalesces small post-shuffle partitions automatically at runtime. With AQE enabled, it is safer to over-estimate spark.sql.shuffle.partitions — AQE will merge small partitions together before tasks start. However, AQE cannot split oversized partitions for non-join operations, so under-estimation still causes OOM even with AQE enabled.


📊 How Data Flows Through HashPartitioner Versus RangePartitioner

The diagram below traces a single shuffle operation from input data through key extraction, partition assignment, and task execution. The two paths diverge at the partitioner choice, and the critical difference — uniform distribution versus hot-key collapse — is visible in the task duration column at the bottom.

graph TD
    A[Input Dataset - 500 GB order events] --> B[Map Phase - each executor reads its partitions]
    B --> C[Key Extraction - extract store_id from each row]
    C --> D{Partitioner Choice}

    D -->|HashPartitioner| E[Compute Math.abs store_id.hashCode mod 200]
    D -->|RangePartitioner| F[Reservoir sample 4000 keys - find 199 boundaries]

    E --> G[Partition 0 - 2.3 GB uniform stores]
    E --> H[Partition 1 - 41 GB hot acquired stores]
    E --> I[Partition 2 - 2.1 GB uniform stores]

    F --> J[Partition 0 - stores with low revenue range - 2.5 GB]
    F --> K[Partition 1 - stores with mid revenue range - 2.5 GB]
    F --> L[Partition 2 - stores with high revenue range - 2.5 GB]

    G --> M[Task duration: 18 sec]
    H --> N[Task duration: 47 min - STRAGGLER]
    I --> O[Task duration: 16 sec]

    J --> P[Task duration: 21 sec]
    K --> Q[Task duration: 23 sec]
    L --> R[Task duration: 20 sec]

The hash path creates straggler task N — a single task holding 41 GB of data that extends the stage duration from 18 seconds to 47 minutes, while 49 other executors sit idle waiting for it to finish. The range path produces uniform 2.5 GB partitions because the boundaries were derived from the actual key distribution, not from a mathematical hash function that has no knowledge of frequency. The stage finishes when the slowest task finishes, so the hash path's stage time is 47 minutes regardless of how many additional resources are added.


🌍 repartition vs coalesce, Salting Hot Keys, and Bucketing Pre-Joined Tables

repartition() vs coalesce()

Both operations change partition count, but they behave fundamentally differently:

repartition(N) triggers a full shuffle. Every row is moved across the network to a new partition based on a round-robin or key-based assignment. This is expensive but produces exactly N partitions of uniform size. Use it when you need to increase partition count, change the partitioning key, or guarantee uniform distribution before a join or aggregation.

coalesce(N) uses a narrow dependency — no shuffle. It merges adjacent partitions on the same executor by simply combining their partition metadata. No rows move across the network. This is cheap but only works when reducing partition count, and it produces unequal partition sizes because it cannot rebalance data between executors. Use it when writing output files to reduce the number of small files written to storage, after a filter has already reduced data volume significantly.

The performance difference between repartition(50) and coalesce(50) on a 1 TB dataset that was previously 2,000 partitions is enormous: coalesce takes seconds with no network I/O, while repartition triggers a full 1 TB shuffle. However, coalesce from 2,000 to 50 partitions on the same executors creates 50 partitions of wildly unequal size if data is not uniformly distributed — some coalesced partitions may hold 10x more rows than others.
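A pure-Python toy model shows why coalesce can produce unequal sizes: it only merges adjacent source partitions and never rebalances rows. Real coalescing also considers executor locality, which this sketch ignores.

```python
# Toy model of coalesce(n): merge contiguous source partitions into n
# groups by summing their sizes. No rows move, so pre-existing imbalance
# survives. Illustrative only -- Spark's coalescer is locality-aware.

def coalesce_sizes(sizes, n):
    out = [0] * n
    per = -(-len(sizes) // n)  # source partitions per group, ceil
    for i, s in enumerate(sizes):
        out[min(i // per, n - 1)] += s
    return out

# 8 source partitions where a filter emptied the back half:
sizes = [100, 100, 100, 100, 0, 0, 0, 0]
print(coalesce_sizes(sizes, 2))   # [400, 0] -- adjacent merging keeps the skew
```

A repartition(2) of the same data would shuffle rows into two ~200-unit partitions; coalesce(2) leaves one worker with everything and one with nothing.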

Salting to Cure Hot Keys

When a few keys dominate the data distribution, the fix is salting: artificially increasing key cardinality by appending a random integer to the hot key before the first shuffle, then removing it after partial aggregation.

The process has two stages. In stage one, a random integer between 0 and K−1 is appended to each key — store_id_1234 becomes one of store_id_1234_0, store_id_1234_1, ..., store_id_1234_{K−1}. This transforms one hot-key bucket into K uniform buckets. Each bucket computes a partial aggregate (partial sum, partial count). In stage two, the salt suffix is stripped and the partial results are combined by the original key. The hot key's work is now distributed across K parallel tasks in stage one, with only a tiny final aggregation in stage two.

The tradeoff: salting adds a second shuffle stage and requires the aggregation function to be decomposable (sum → partial sums, count → partial counts). Functions that cannot be split — median, exact distinct count — cannot use salting without approximations.
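The two stages can be sketched in pure Python, with plain dicts standing in for the two shuffles; the names and salt count are illustrative.

```python
import random
from collections import Counter

def salted_sum(rows, num_salts=8, rng=None):
    """Two-stage salted sum: salt, partial-aggregate, strip salt, merge."""
    rng = rng or random.Random(0)
    # Stage one: append a random salt, aggregate per (key, salt) bucket.
    partial = Counter()
    for key, value in rows:
        salt = rng.randrange(num_salts)   # salt in [0, num_salts)
        partial[(key, salt)] += value     # hot key now spans num_salts buckets
    # Stage two: strip the salt, merge partial sums per original key.
    final = Counter()
    for (key, _salt), subtotal in partial.items():
        final[key] += subtotal
    return dict(final)

rows = [("store_hot", 1)] * 1000 + [("store_cold", 1)] * 10
assert salted_sum(rows) == {"store_hot": 1000, "store_cold": 10}
```

In Spark the same shape is two groupBy passes: one on concat(key, '_', salt) producing partial sums, one on the stripped key producing the final totals.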

Bucketing for Pre-Partitioned Joins

Bucketing is a write-time optimization that pre-partitions and pre-sorts data so that Spark can skip the shuffle on future joins and groupBys. When you write a table with bucketBy(numBuckets, "store_id").sortBy("store_id"), Spark writes the output such that all rows with the same store_id hash land in the same set of files, pre-sorted by store_id. When two tables bucketed on the same key and bucket count are joined, Spark recognizes that no shuffle is necessary — the data is already co-partitioned.

The requirement is strict: both tables must use the same bucket count and the same bucketing key for the shuffle elimination to apply. Mismatched bucket counts force Spark to fall back to a full sort-merge join with shuffle. Bucketing is most valuable for fact tables that are joined repeatedly against dimension tables in ETL pipelines — the up-front cost of bucketing on write is paid back on every subsequent join read.


⚖️ HashPartitioner vs RangePartitioner vs Custom: Every Choice Has a Cost

| Strategy | Distribution quality | Sort order preserved | Extra pass required | Hot key resilience | Best fit |
|---|---|---|---|---|---|
| HashPartitioner | Excellent for uniform keys | No | No | Poor — deterministic collapse | Default joins and aggregations |
| RangePartitioner | Excellent for any key distribution | Yes — within partitions | Yes — sampling pass | Good — boundaries from real data | sortBy, orderBy, sorted writes |
| Custom Partitioner | Depends on implementation | Possible | Depends on logic | Configurable | Geo-sharding, tier routing, time-bucketing |
| Salted Hash | Good — hot keys split across K buckets | No | No (adds a stage) | Excellent | Known hot keys with decomposable aggregation |
| Bucketing | Pre-computed — no runtime cost | Yes | Write-time only | Depends on bucket count | Repeated joins on the same key |

The most expensive mistake in Spark partitioning is the repartition–coalesce confusion: teams routinely call repartition(200) after every transformation to "normalize" partition count, paying a full shuffle each time. The only valid reason to call repartition mid-pipeline is when the current partitioning key is wrong for the next operation. If you only need to reduce partition count before writing, coalesce almost always suffices.

AQE's skew join optimization (spark.sql.adaptive.skewJoin.enabled=true) detects partitions significantly larger than the median and splits them automatically — but only for join operations. It does not apply to groupBy aggregations. For skewed aggregations, salting remains the primary tool.


🧭 Choosing the Right Partitioning Strategy for Your Use Case

Use this table to match the data scenario and workload pattern to the correct partitioning approach. The "Symptom in Spark UI" column is the diagnostic signal that confirms you have reached the right row.

| Scenario | Partitioning strategy | Why | Symptom in Spark UI when wrong strategy is used |
|---|---|---|---|
| Uniform key distribution — standard join or aggregation | HashPartitioner (default) | Uniform distribution, no extra pass, lowest overhead | None — performance is baseline |
| Hot keys from known skewed columns (user_id, store_id in retail) | Salt + HashPartitioner | Spreads hot bucket across K parallel tasks in first stage | One or few tasks running 10–100x longer than median |
| Sorted output required — downstream range scans or sorted writes | RangePartitioner via repartitionByRange | Preserves key order within each partition | Sort on write is slow without pre-partitioning by range |
| Repeated joins between two large tables on the same key | Bucketing at write time | Eliminates shuffle on every future join read | Shuffle write size is large for every join execution |
| Reducing partition count before writing to storage | coalesce(N) | No shuffle, merges adjacent partitions locally | Downstream reads slow due to excessive small files |
| Domain-specific routing — geography, business tier, time window | Custom Partitioner | Hash and range cannot express the routing logic | Misrouted rows causing incorrect aggregation results |

🧪 Diagnosing Partition Skew Using the Spark UI Stage Detail View

This section walks through the exact UI navigation and metric interpretation for a live skew diagnosis. The goal is to identify whether a slow stage is caused by too few partitions, partition skew, or a configuration mismatch — each has a distinct signature.

Navigate to the affected stage. Open the Spark UI at http://<driver-host>:4040. Click "Stages" in the top navigation. Sort by "Duration" descending. The slowest stage is almost always the skew culprit. Click the stage to open its task detail view.

Read the Task Duration Distribution. The stage detail view shows a summary statistics table for task metrics: minimum, 25th percentile, median, 75th percentile, and maximum. The metric to focus on is "Duration."

  • Uniform skew (all tasks slow): All percentiles are similar — median 4 min, max 5 min. This is the too-few-partitions signature. Every task is overloaded. Fix: increase spark.sql.shuffle.partitions.
  • Point skew (one or few tasks outliers): Median is 20 seconds, max is 45 minutes. This is the hot-key signature. The 75th and 25th percentiles are close to each other and to the median — only the maximum is extreme. Fix: salting for aggregations, AQE skew join for joins.
  • Stragglers with GC pressure: Median is 2 minutes, max is 8 minutes, and GC Time is elevated even at the 25th percentile (say 15% of task time). Tasks are spending significant time in garbage collection. This is the too-large-partition-causing-memory-pressure signature. Fix: increase spark.sql.shuffle.partitions and, if pressure persists, raise spark.executor.memory or spark.memory.fraction.

Read Shuffle Read Size per Task. In the same task summary table, check "Shuffle Read Size / Records." A healthy job shows shuffle read size close to the median across all percentiles. Point skew is confirmed when the maximum shuffle read size is 5–50x the median. For the retail scenario above: median 600 MB, maximum 48 GB — an 80x ratio. This single metric is the most definitive confirmation of a hot-key problem.
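The max-to-median check can be written as a tiny helper; the 5x cutoff mirrors AQE's default skewedPartitionFactor, and the function itself is illustrative, not part of any Spark tooling.

```python
import statistics

# Flag point skew by comparing the largest task's shuffle read against the
# median across all tasks. Illustrative helper, not a Spark API.

def skew_ratio(shuffle_read_bytes):
    med = statistics.median(shuffle_read_bytes)
    return max(shuffle_read_bytes) / med if med else float("inf")

# The retail incident: 197 tasks around 600 MB, three around 48-51 GB.
reads_mb = [600] * 197 + [48_000, 51_000, 49_000]
print(round(skew_ratio(reads_mb)))   # 85 -- far beyond any reasonable threshold
```

A ratio near 1 means healthy distribution; anything past roughly 5 is the hot-key signature described above.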

Read the SQL tab for the physical plan. Click "SQL" in the top navigation and select the query. In the physical plan tree, look for a partial aggregate (e.g., a HashAggregate with partial_sum or partial_count) before the Exchange node. When map-side partial aggregation is present, each mapper pre-collapses its rows for the hot key before the shuffle. When it is absent, or when the aggregate's partial state does not shrink the data (as with collect_list or certain UDAFs), every row for the hot key must cross the shuffle before any aggregation happens, which amplifies hot-key skew.

The Spark UI evidence chain: Duration distribution → Shuffle Read Size → SQL Physical Plan → Configuration choice. Skipping any step leads to treating the symptom rather than the cause.


🛠️ Apache Spark Partitioning Configuration Reference

The following configuration properties govern initial partition sizing, shuffle partition count, and AQE-based skew mitigation. Apply them in spark-defaults.conf, as --conf arguments to spark-submit, or in a cluster-level configuration profile.

# ─────────────────────────────────────────────────────────────────
# INPUT PARTITION SIZING
# Controls how large each partition is when reading file-based sources.
# Default: 128m. Increase to 256m for very large datasets to reduce
# total partition count and scheduling overhead.
# ─────────────────────────────────────────────────────────────────
spark.sql.files.maxPartitionBytes=134217728

# ─────────────────────────────────────────────────────────────────
# SHUFFLE PARTITION COUNT
# Number of partitions produced after every wide transformation.
# Default: 200 — almost always too low for production workloads.
# Rule: target 128–256 MB per partition.
# Example: 500 GB shuffle → set to 2000–4000.
# ─────────────────────────────────────────────────────────────────
spark.sql.shuffle.partitions=2000

# ─────────────────────────────────────────────────────────────────
# RDD DEFAULT PARALLELISM
# Controls initial partition count for sc.parallelize() and
# operations on bare RDDs. Defaults to total executor core count.
# ─────────────────────────────────────────────────────────────────
spark.default.parallelism=400

# ─────────────────────────────────────────────────────────────────
# ADAPTIVE QUERY EXECUTION — master switch
# Enabled by default in Spark 3.0+. Enables runtime plan changes
# including partition coalescing and skew join splitting.
# ─────────────────────────────────────────────────────────────────
spark.sql.adaptive.enabled=true

# ─────────────────────────────────────────────────────────────────
# AQE SKEW JOIN OPTIMIZATION
# Automatically detects and splits skewed join partitions at runtime.
# Applies to sort-merge joins only — does not help groupBy skew.
# ─────────────────────────────────────────────────────────────────
spark.sql.adaptive.skewJoin.enabled=true

# ─────────────────────────────────────────────────────────────────
# SKEW DETECTION THRESHOLD
# A partition is considered skewed if its size is greater than
# (skewedPartitionFactor * median partition size) AND greater than
# skewedPartitionThresholdInBytes. Both conditions must be true.
# Defaults: factor=5, threshold=256MB.
# ─────────────────────────────────────────────────────────────────
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=268435456

# ─────────────────────────────────────────────────────────────────
# AQE COALESCE — small partition merging
# After shuffle, AQE merges adjacent small partitions until each
# merged partition is approximately advisoryPartitionSizeInBytes.
# Default: 64m. Increase to 128m–256m for write-heavy pipelines.
# ─────────────────────────────────────────────────────────────────
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.advisoryPartitionSizeInBytes=134217728
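The two-condition skew test described in the SKEW DETECTION THRESHOLD block above can be expressed as a short predicate; this is a pure-Python illustration of the documented rule, not Spark source.

```python
# AQE marks a join partition as skewed only when BOTH conditions hold:
# larger than factor * median AND larger than the absolute byte threshold.
# Defaults mirror skewedPartitionFactor=5 and
# skewedPartitionThresholdInBytes=256 MB.

def is_skewed(size_bytes: int, median_bytes: int,
              factor: int = 5,
              threshold_bytes: int = 256 * 1024**2) -> bool:
    return size_bytes > factor * median_bytes and size_bytes > threshold_bytes

# A 48 GB partition against a 600 MB median trips both tests:
assert is_skewed(48 * 1024**3, 600 * 1024**2)
# A 100 MB partition 10x above a 10 MB median fails the absolute threshold:
assert not is_skewed(100 * 1024**2, 10 * 1024**2)
```

The absolute threshold is why small jobs rarely trigger AQE skew handling even when their relative imbalance is large.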

For a full deep-dive on how Adaptive Query Execution makes these decisions at the operator level, see the companion post on Spark's Catalyst optimizer and DataFrame execution model.


📚 Lessons Learned from Partition Skew Incidents

Lesson 1: Adding executors never fixes skew. The most common incorrect response to a skewed stage is "give it more resources." More executors do not help because the bottleneck is a single task that cannot be parallelized further without changing the partitioning strategy. Adding ten more executors to the retail scenario above would leave them all idle while the one straggler ran for 47 minutes. Diagnose before you scale.

Lesson 2: The default 200 shuffle partitions is wrong for almost every production workload. It was calibrated for demonstration clusters with small datasets. In production, spark.sql.shuffle.partitions should be computed from the actual shuffle size: measure the shuffle write size in a representative run, divide by your target partition size (128–200 MB), and set the property accordingly. AQE's coalescing handles over-estimation gracefully; under-estimation causes spill and OOM.

Lesson 3: repartition mid-pipeline is often the most expensive line in the job. A repartition(200) call inserted to "normalize" partitions before a join triggers a full shuffle of the entire dataset — potentially terabytes of data — for no reason if the data is already partitioned by the join key. Profile the DAG in the SQL tab before adding any repartition call. If the upstream stage already produces the key-partitioned output you need, remove it.

Lesson 4: RangePartitioner's sampling pass is invisible in the logical plan but visible in the physical plan. When you call repartitionByRange() or sortBy(), Spark inserts a sampling job before the main job in the execution queue. If your pipeline suddenly takes longer for no apparent reason after adding a sort, check whether a RangePartitioner sampling job is serializing upstream of your main job. In pipelines with tight latency requirements, this extra pass is sometimes the deciding factor against using range-based partitioning.

Lesson 5: Bucketing is underused because it requires write discipline. Most teams reach for repartition at query time because the data was never bucketed at write time. Establishing bucketing conventions for high-frequency join tables at the data layer — in the data lakehouse or warehouse — eliminates the most expensive shuffles in ETL and reporting pipelines. The investment compounds: one bucketed write eliminates shuffle for every downstream reader for the life of that table partition.


📌 TLDR and Key Takeaways

TLDR: Spark's partition count and partitioning strategy are the two levers that determine whether a job scales linearly or crumbles under data growth. HashPartitioner distributes keys by hash modulo — fast and uniform for well-distributed keys, catastrophic for hot keys. RangePartitioner samples the dataset to find natural boundaries, preserving sort order at the cost of an extra pass. Custom partitioners route by domain logic when neither built-in strategy fits. Diagnosing skew in the Spark UI stage detail view — not guessing at spark.sql.shuffle.partitions — is the only reliable path to the right fix.

The five decisions that matter most:

  1. Set spark.sql.shuffle.partitions from measured shuffle size. Divide expected shuffle write bytes by 150–200 MB. Never leave it at 200 for production data volumes.
  2. Use RangePartitioner only when sort order is required. The extra sampling pass has real cost. For standard aggregations and joins, HashPartitioner is faster.
  3. Diagnose before tuning. The Spark UI stage detail view distinguishes uniform slowness (too few partitions) from point skew (hot keys) within 60 seconds. These require opposite fixes.
  4. Salt hot keys; do not add resources. For aggregations on skewed keys, a two-stage salted aggregation distributes what was a serial bottleneck into K parallel tasks with a cheap final merge.
  5. Bucket high-frequency join tables at write time. Pre-partitioning eliminates shuffle permanently for every future reader.

📝 Practice Quiz

Test your understanding of Spark partitioning internals. Attempt each question before reading the answer beneath it.

  1. A 600 GB shuffle is running with the default spark.sql.shuffle.partitions=200. Each task handles 3 GB of shuffle input. What is the most likely symptom visible in the Spark UI, and what is the correct fix?
Answer: The Spark UI will show "Shuffle Spill (Disk)" on nearly all tasks, and GC Time will be elevated uniformly across the task summary percentiles. Because every task is receiving 3 GB of data but executor unified memory (roughly (spark.executor.memory − 300 MB) × spark.memory.fraction, where the fraction defaults to 0.6) can only hold a fraction of that, all tasks spill to disk simultaneously. The fix is to increase spark.sql.shuffle.partitions to approximately 600,000 MB / 150 MB = 4,000. With AQE enabled, you can safely set it higher (e.g., 6,000) because small partitions will be coalesced automatically.
  2. You have a groupBy("user_id").count() job where user_id has a highly skewed distribution — 5% of users account for 70% of events. The Spark UI shows the 95th percentile task duration is 25 seconds, but the maximum is 48 minutes. AQE skew join optimization is enabled. Why does the skew persist, and what is the correct mitigation?
Answer: AQE's skew join optimization only applies to join operations — it detects skewed partitions in sort-merge joins and splits them. It does not apply to groupBy aggregations. The skew from hot user IDs lands in a single partition during the groupBy shuffle and cannot be automatically split. The correct mitigation is key salting: append a random integer (0 to K−1) to user_id before the first aggregation, compute partial counts by the salted key, then strip the salt and sum the partial counts in a second pass. This distributes the hot user's work across K parallel tasks.
  3. Your team is debating whether to use repartition(500) or coalesce(500) before writing a 150 GB result to Parquet. The data currently has 3,000 shuffle partitions. Which operation should you use, and why?
Answer: Use coalesce(500). The goal is to reduce the partition count before writing to avoid 3,000 tiny output files. coalesce(N) reduces partition count using a narrow dependency — no shuffle — by combining adjacent partitions on the same executor. This operation is nearly free. repartition(500) would trigger a full shuffle of 150 GB across the network purely to reduce file count, with no benefit to data quality or correctness. The only reason to prefer repartition here would be if the existing 3,000 partitions are extremely unequal in size and you need uniform output file sizes — but for standard post-processing writes, coalesce is correct.
  4. RangePartitioner's output is described as "non-deterministic across runs." What causes this non-determinism, and in what practical scenario does it matter?
Answer: RangePartitioner uses reservoir sampling to estimate key distribution. Reservoir sampling is order-dependent: the final sample changes based on the order in which rows are processed across executors, and that order varies with task scheduling, executor availability, and file scan order. Different runs of the same job on the same data can produce different boundary values, and therefore different partition assignments for the same rows. This matters in incremental or partitioned update scenarios: if you recompute a RangePartitioned dataset and try to merge it with a previously RangePartitioned version, the partition boundaries may not align, requiring a full re-read and re-shuffle. For repeatability, save the boundary values externally or use HashPartitioner instead.
  5. A Spark join between a 1 TB fact table and a 500 MB dimension table is showing SortMergeJoin with Exchange (shuffle) operators on both sides in the physical plan. The default spark.sql.autoBroadcastJoinThreshold is 10 MB. What change resolves this, and what is the mechanism by which the shuffle is eliminated?
Answer: Set spark.sql.autoBroadcastJoinThreshold=600m (or any value above 500 MB). When the threshold exceeds the dimension table's size, Catalyst replaces the SortMergeJoin plan with a BroadcastHashJoin. The mechanism: Spark collects the entire dimension table on the driver, serializes it, and broadcasts it — sends a copy — to every executor's memory (assuming the driver and executors have the heap headroom for a ~500 MB table). Each executor then performs the join locally against its own fact table partitions using a hash lookup against the in-memory dimension table. No shuffle of the dimension table occurs at all, and no shuffle of the fact table is required for the join itself (though earlier stages may still shuffle the fact table for other reasons). For a 500 MB table, the broadcast serializes and transfers in under 30 seconds, compared to minutes for a full sort-merge shuffle.
  6. Open-ended challenge: A data platform team wants to pre-bucket their main event table by event_type (20 distinct values) for downstream joins. A colleague argues they should use numBuckets=20 to match the cardinality exactly. Another argues for numBuckets=400. What are the tradeoffs between these two choices, and what additional information would you need to make a definitive recommendation?
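The two-stage salted aggregation described in the skew answer above can be sketched in plain Python, with a Counter standing in for each Spark groupBy stage. The hot key's 700 rows land in up to K different stage-1 groups instead of one, which is exactly how salting turns a serial bottleneck into K parallel tasks; the key names and K=8 are illustrative.

```python
import random
from collections import Counter

K = 8  # salt fan-out: the hot key's rows spread across up to K reduce tasks

# One hot user contributes 70% of events; 300 cold users contribute one each.
events = ["hot_user"] * 700 + [f"user_{i}" for i in range(300)]

# Stage 1: aggregate by (user_id, salt). In Spark this is a groupBy on a
# salted key column; a Counter plays the same role here.
partial_counts = Counter(
    (user, random.randrange(K)) for user in events
)

# Stage 2: strip the salt and merge. At most K partial rows survive stage 1
# per user, so this pass stays cheap no matter how skewed the input is.
final_counts = Counter()
for (user, _salt), count in partial_counts.items():
    final_counts[user] += count
```

The final counts are identical to an unsalted groupBy's; only the intermediate shape changes, trading one oversized stage-1 task for K small ones plus a trivial merge.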

🔗 Further Reading in the Apache Spark Engineering Series

These posts cover the surrounding architecture and execution model that partitioning operates within:

Written by Abstract Algorithms (@abstractalgorithms)