
Caching and Persistence in Spark: Storage Levels and When to Use Them

Caching in Spark is lazy and partition-level — understanding how StorageLevel and eviction work is the difference between a speedup and an OOM

Abstract Algorithms
25 min read

AI-assisted content. This post may have been written or enhanced with the help of AI tools. While efforts are made to ensure accuracy, the content may contain errors or inaccuracies. Please verify critical information independently.

TLDR: Calling cache() or persist() does not immediately store anything — Spark caches lazily at the first action, partition by partition, managed by a per-executor BlockManager. When memory fills up, LRU eviction silently drops or spills partitions. Choosing the right StorageLevel and knowing when NOT to cache is what separates a genuine 5× speedup from an OOM-driven cluster reboot.


📖 The 10× Speedup That Crashed the Cluster

A senior data engineer is tasked with speeding up a multi-stage Spark pipeline that scores a hundred million user records through a feature transformation graph. The pipeline reads the same large lookup DataFrame in four separate stages, each time triggering a full re-scan and re-join of 80 GB of Parquet files. The engineer has heard caching is the answer for repeated access patterns, so they add a single line before the first use:

lookupDf.cache()

The first run completes in four minutes — legitimately fast. The team cheers. They scale the input to three hundred million records, resubmit, and watch the Spark UI. Thirty minutes in, executor after executor starts dying:

ExecutorLostFailure (executor 7 exited caused by one of the running tasks)
  Reason: Container killed by YARN for exceeding memory limits.
  16.5 GB of 16 GB physical memory used.

The job retries, recomputes from scratch, re-caches, fills memory again, crashes again. The cluster spends the next two hours in a thrashing retry loop. The final runtime is four times longer than the uncached baseline.

What went wrong? The engineer cached the right DataFrame — the one that was scanned four times. But they cached it at the wrong storage level on a cluster where the executor heap was already under pressure from large shuffle buffers. Spark's LRU eviction policy quietly spilled and re-fetched partitions during every shuffle, turning the cache into expensive disk I/O. The engineer didn't realise that caching competes directly with shuffle memory inside the same unified memory pool, and on this workload, the shuffle always won.

Caching is one of the most misused Spark optimisations precisely because it feels mechanical — add one line and everything gets faster. The reality is subtler. This post explains what actually happens when you call cache(), how Spark's BlockManager stores and evicts data, which storage level suits which workload, and how to read the Spark UI to confirm whether caching is helping or hurting.


🔍 cache(), persist(), and the Six Storage Levels

cache() and persist() are not the same call

cache() is a shorthand for persist() with a default storage level. The default differs between the RDD API and the DataFrame API:

| API | cache() default StorageLevel |
| --- | --- |
| RDD | MEMORY_ONLY |
| DataFrame / Dataset | MEMORY_AND_DISK |

This distinction matters. Engineers who learned Spark on RDDs and assume cache() means "store in memory only" will be surprised to find their DataFrames quietly spilling to disk. persist(StorageLevel.MEMORY_ONLY) is the explicit way to get in-memory-only caching on a DataFrame.

persist() accepts a StorageLevel argument, giving you precise control. unpersist() (discussed at the end of this section) removes the cached blocks when you are done.

The six storage levels and what they actually store

Spark defines six primary storage levels. Each is a combination of binary flags — useDisk, useMemory, useOffHeap, and deserialized — plus a replication factor.

| Storage Level | Memory | Disk | Serialized | Replication | When to use |
| --- | --- | --- | --- | --- | --- |
| MEMORY_ONLY | Yes | No | No | 1 | Small DataFrames that fit comfortably in executor heap |
| MEMORY_AND_DISK | Yes | Yes | No | 1 | Default for DataFrames; tolerates memory pressure |
| MEMORY_ONLY_SER | Yes | No | Yes | 1 | Heap-constrained clusters; use Kryo serializer |
| MEMORY_AND_DISK_SER | Yes | Yes | Yes | 1 | Large DataFrames on memory-constrained clusters |
| DISK_ONLY | No | Yes | Yes | 1 | Fault-tolerant but slow; rarely the right choice |
| OFF_HEAP | Yes (off-heap) | No | Yes | 1 | Reduces GC pressure; requires off-heap memory configuration |

Levels with a _2 suffix (e.g., MEMORY_AND_DISK_2) replicate each partition to two executor nodes, providing fault tolerance at the cost of double the storage.
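The flag decomposition can be written out in plain Python — a toy model mirroring the table above, not Spark's actual StorageLevel class:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Level:
    """Toy stand-in for Spark's StorageLevel flag combination."""
    use_disk: bool
    use_memory: bool
    use_off_heap: bool
    deserialized: bool      # "Serialized: No" in the table means deserialized=True
    replication: int = 1

MEMORY_ONLY         = Level(False, True,  False, True)
MEMORY_AND_DISK     = Level(True,  True,  False, True)
MEMORY_ONLY_SER     = Level(False, True,  False, False)
MEMORY_AND_DISK_SER = Level(True,  True,  False, False)
DISK_ONLY           = Level(True,  False, False, False)
OFF_HEAP            = Level(False, True,  True,  False)
# _2 variants differ only in the replication factor
MEMORY_AND_DISK_2   = Level(True,  True,  False, True, replication=2)
```

Seen this way, "choosing a storage level" is just choosing which of these flags to turn on — every trade-off in this post maps to one of them.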

Caching is lazy — it fires at the first action

This is the most important behavioural fact about Spark caching. Calling cache() or persist() marks the DataFrame for caching but does not immediately store any data. The cache is populated partition by partition the first time an action (count(), show(), write(), etc.) forces computation. If you call cache() and then immediately call unpersist() before any action, nothing is ever stored.

The consequence: the first action after cache() is just as slow as an uncached execution — you pay the full read and compute cost. Speed improvements only appear on the second and subsequent actions.
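The lazy, partition-by-partition population can be sketched with a toy model (my own illustration, not Spark's API — `compute_partition` and `computations` are invented names):

```python
class CachedDataset:
    """Toy model of lazy, partition-level caching."""
    def __init__(self, compute_partition, num_partitions):
        self.compute_partition = compute_partition
        self.num_partitions = num_partitions
        self.cache_marked = False
        self._blocks = {}          # partition index -> materialised data
        self.computations = 0      # how many partition computations actually ran

    def cache(self):
        self.cache_marked = True   # marking alone stores nothing
        return self

    def count(self):               # an "action": forces every partition
        total = 0
        for p in range(self.num_partitions):
            if p in self._blocks:
                rows = self._blocks[p]            # cache hit
            else:
                rows = self.compute_partition(p)  # cache miss: compute
                self.computations += 1
                if self.cache_marked:
                    self._blocks[p] = rows        # populate on first action
            total += len(rows)
        return total

ds = CachedDataset(lambda p: list(range(10)), num_partitions=4).cache()
assert ds.computations == 0   # cache() alone computed nothing
ds.count()                    # first action pays the full compute cost
assert ds.computations == 4
ds.count()                    # second action is served entirely from cache
assert ds.computations == 4
```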

unpersist() is mandatory in long-running applications

Spark does not automatically free cached blocks when a DataFrame variable goes out of scope. In a long-running Spark application (a Structured Streaming job, a scheduled batch loop, or a notebook that stays open), every cache() call without a corresponding unpersist() accumulates blocks in the BlockManager until memory exhaustion causes eviction cascades or out-of-memory failures. Always pair caching with explicit cleanup:

  • df.unpersist() — asynchronous; releases blocks in the background
  • df.unpersist(blocking=True) (PySpark) — waits until all blocks are freed before returning
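One pattern that guarantees the cleanup happens even when a stage throws is to wrap the cached lifetime in a context manager. This is a sketch of my own, assuming a Spark-style object exposing cache() and unpersist(blocking) — it is not a built-in Spark utility:

```python
from contextlib import contextmanager

@contextmanager
def cached(df, blocking=False):
    """Cache `df` for the duration of the block, always unpersisting afterwards."""
    df.cache()
    try:
        yield df
    finally:
        df.unpersist(blocking)   # runs even if the body raises
```

Used as `with cached(lookup_df) as df: ...`, the unpersist() can no longer be forgotten on an error path, which is exactly the leak mode described above.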

⚙️ Inside BlockManager: How Spark Physically Stores Cached Data

Caching in Spark is implemented through the BlockManager, a storage subsystem running inside every executor JVM. Without a mental model of its architecture, caching behaviour can feel unpredictable.

The BlockManager topology

Every running Spark application has one BlockManager per executor (the driver hosts one as well), plus a coordinating BlockManagerMaster running inside the Driver JVM. The master does not store data itself — it maintains a registry of which executor holds which blocks and routes lookup requests.

The diagram below shows how the three components interact. When an executor needs a partition that belongs to another executor, it first queries the BlockManagerMaster for the location, then fetches the block directly over the network using NettyBlockTransferService.

graph TD
    Driver[Driver JVM]
    BMM[BlockManagerMaster]
    BM1[Executor 1 BlockManager]
    BM2[Executor 2 BlockManager]
    MS1[MemoryStore - Executor 1]
    DS1[DiskStore - Executor 1]
    MS2[MemoryStore - Executor 2]
    DS2[DiskStore - Executor 2]
    NBTS[NettyBlockTransferService]

    Driver --> BMM
    BMM -->|block registry| BM1
    BMM -->|block registry| BM2
    BM1 --> MS1
    BM1 --> DS1
    BM2 --> MS2
    BM2 --> DS2
    BM1 -->|direct block fetch| NBTS
    BM2 -->|direct block fetch| NBTS

Each executor's BlockManager has two storage backends: MemoryStore for in-memory blocks and DiskStore for spilled or DISK_ONLY blocks. When a partition is cached as MEMORY_AND_DISK and memory fills up, the MemoryStore serialises and writes the evicted partition to the DiskStore — the executor's local temp directory, controlled by spark.local.dir.

How blocks are identified and tracked

Every cached partition is stored as a named block. RDD and DataFrame partitions use an RDDBlockId (e.g., rdd_42_3 for partition 3 of RDD 42). When a task writes a cached partition, the executor's BlockManager registers the block with the BlockManagerMaster, which updates its global registry. When a later task on a different executor needs that partition, it asks the master for its location and fetches it via the NettyBlockTransferService.

If an executor dies and its blocks are not replicated, the BlockManagerMaster removes those block registrations. Subsequent tasks that need the lost partitions will find no entry in the registry and recompute them from the original source — which is correct but can be expensive if the source is a complex multi-stage computation.
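The registry bookkeeping can be modelled in a few lines — a toy of my own (class and method names invented), showing both the naming scheme and why unreplicated blocks vanish on executor loss:

```python
class BlockRegistry:
    """Toy model of BlockManagerMaster bookkeeping (not Spark's classes)."""
    def __init__(self):
        self.locations = {}                    # block id -> set of executor ids

    def register(self, rdd_id, partition, executor):
        block = f"rdd_{rdd_id}_{partition}"    # naming scheme from the post
        self.locations.setdefault(block, set()).add(executor)

    def lookup(self, rdd_id, partition):
        return self.locations.get(f"rdd_{rdd_id}_{partition}", set())

    def executor_lost(self, executor):
        # Blocks whose only holder died disappear from the registry;
        # later lookups miss and the partition is recomputed from lineage.
        for block, holders in list(self.locations.items()):
            holders.discard(executor)
            if not holders:
                del self.locations[block]

reg = BlockRegistry()
reg.register(42, 3, executor="exec-1")
assert reg.lookup(42, 3) == {"exec-1"}
reg.executor_lost("exec-1")
assert reg.lookup(42, 3) == set()   # cache miss -> recompute from source
```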

LRU eviction: what actually gets dropped

When the MemoryStore cannot allocate space for a new block, it evicts the least recently used blocks first. What happens to an evicted block depends on its storage level:

  • MEMORY_ONLY: the block is simply dropped. It will be recomputed from scratch the next time it is needed.
  • MEMORY_AND_DISK: the block is serialised and written to the DiskStore before being removed from memory.
  • DISK_ONLY: blocks were never in memory; eviction does not apply.

This asymmetry is critical. Under MEMORY_ONLY, memory pressure silently causes expensive recomputation — and nothing in the Spark UI warns you that this is happening unless you inspect the storage tab's "fraction cached" metric carefully.
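The drop-vs-spill asymmetry is easy to see in a toy LRU loop (my own sketch; sizes and block names are made up, and real eviction also respects per-RDD constraints):

```python
from collections import OrderedDict

def evict_until_fits(memory, disk, needed, capacity, used, levels):
    """Toy LRU eviction: drop MEMORY_ONLY blocks, spill MEMORY_AND_DISK to disk."""
    while used + needed > capacity and memory:
        block_id, size = memory.popitem(last=False)   # least recently used
        if levels[block_id] == "MEMORY_AND_DISK":
            disk[block_id] = size                     # serialise to DiskStore
        # MEMORY_ONLY blocks are simply dropped (recomputed on next access)
        used -= size
    return used

memory = OrderedDict([("rdd_1_0", 4), ("rdd_2_0", 4)])   # oldest first
levels = {"rdd_1_0": "MEMORY_ONLY", "rdd_2_0": "MEMORY_AND_DISK"}
disk = {}
used = evict_until_fits(memory, disk, needed=8, capacity=10, used=8, levels=levels)
assert "rdd_1_0" not in disk       # dropped outright -> silent recomputation later
assert disk.get("rdd_2_0") == 4    # spilled to the DiskStore before removal
```

Both blocks were "cached", but only one survives eviction in any retrievable form — which is why the same memory pressure is a nuisance under MEMORY_AND_DISK and a hidden recomputation tax under MEMORY_ONLY.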


🧠 Deep Dive: Caching Internals and Memory Competition

The Internals of BlockManager

The BlockManagerMaster uses an RPC endpoint (BlockManagerMasterEndpoint) to receive registration and update messages from executor BlockManagers. Every time an executor caches, evicts, or removes a block, it sends an asynchronous message to the master. The master's registry is the single source of truth for block locations across the entire application.

Block replication for fault tolerance is handled at write time. When a storage level includes replication (e.g., MEMORY_AND_DISK_2), the writing executor's BlockManager picks a peer executor at random, transfers a copy of the block over NettyBlockTransferService, and only after the peer confirms receipt does it report the block as cached to the master. Replicated blocks survive a single executor failure without triggering recomputation.

Recomputation vs. retrieval is a conscious trade-off in Spark's design. When a partition is lost (executor failure, eviction under MEMORY_ONLY), Spark does not attempt a recovery protocol — it simply re-executes the lineage DAG for that partition from its last reliable source (a shuffle read boundary, a checkpoint, or the original data source). This means that if a cached DataFrame has a long and expensive lineage (many transformations, expensive UDFs, large reads), and that lineage executes multiple times due to eviction, the overall cost of caching can exceed the cost of not caching at all.

Checkpointing breaks this recomputation risk by materialising the lineage to a reliable, external storage (HDFS, S3). Unlike caching, a checkpoint write is synchronous and permanent — it survives both executor failure and application restart. This makes checkpointing the right tool for long-lineage DataFrames in iterative algorithms where recomputation would be prohibitively expensive.

Performance Analysis: When Caching Competes with Shuffle Memory

The most underappreciated source of caching failure is memory competition inside the unified memory model, introduced in Spark 1.6.

Before the unified model, storage memory (for caching) and execution memory (for shuffles, sorts, and aggregations) were allocated as fixed fractions of the executor heap. Under heavy shuffle workloads, execution memory would fill up even as storage memory sat half-empty — and neither region could borrow from the other.

The unified model solves this rigidity by sharing a single pool between storage and execution:

Executor Heap
├── Reserved Memory (300 MB fixed)
└── Usable Memory = (Heap − 300 MB) × spark.memory.fraction  [default: 0.6]
    ├── Initial Storage Region = Usable Memory × spark.memory.storageFraction  [default: 0.5]
    └── Remaining = Initial Execution Region
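Worked through for a concrete heap size (a sketch of the arithmetic only; the function name is mine, and real executors also carve out user memory and overhead):

```python
def unified_memory_regions(heap_bytes, memory_fraction=0.6, storage_fraction=0.5):
    """Unified memory pool arithmetic with the spark.memory.* defaults."""
    reserved = 300 * 1024 * 1024                # fixed 300 MB
    usable = (heap_bytes - reserved) * memory_fraction
    storage = usable * storage_fraction         # region storage can "defend"
    execution = usable - storage
    return usable, storage, execution

gib = 1024 ** 3
usable, storage, execution = unified_memory_regions(16 * gib)
# On a 16 GiB heap with defaults: ~9.4 GiB shared pool,
# ~4.7 GiB protected storage region, ~4.7 GiB initial execution region.
```

Note how little of a "16 GB executor" is actually available for cached blocks before execution starts borrowing — under half, before a single shuffle runs.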

The critical behaviour: execution memory can evict storage blocks when it needs space. Storage memory can reclaim execution memory only if execution is not actively using it. In practice, during a heavy shuffle or sort, execution aggressively expands at the expense of storage. Your cached partitions get evicted to disk or dropped — and they may be evicted repeatedly, forcing the cluster to re-read source data over and over.

This is exactly what happened in the opening scenario. The lookup DataFrame was cached, but the four join stages each triggered large shuffles. The shuffle's execution memory claims overwhelmed the storage region, evicting cached partitions mid-job. Each eviction triggered a re-scan of the original 80 GB Parquet files, which then competed with the next shuffle cycle for the newly freed memory. The result was pathological thrashing.

The levers to tune this competition are:

  • spark.memory.fraction (default 0.6): raise to give more heap to the unified pool; lower to protect heap for user objects and overhead
  • spark.memory.storageFraction (default 0.5): within the unified pool, the fraction that storage can "defend" against execution eviction — blocks in this region are only evicted if absolutely necessary, not speculatively

Raising spark.memory.storageFraction to 0.6 or 0.7 biases the pool toward storage, which helps when caching is the dominant operation (iterative ML, repeated lookup joins) and shuffles are moderate. Lowering it to 0.3 or 0.4 biases toward execution, which helps shuffle-heavy aggregation pipelines where caching is incidental.


📊 Cache Hit vs. Cache Miss: The Full Execution Path

The two paths a Spark task can take when it needs a cached partition are quite different in cost. Understanding the difference helps you reason about whether a given cache miss is acceptable or a signal of a configuration problem.

The diagram below traces both paths from when an action is triggered to when the task returns data. The key branching point is the BlockManager lookup — a cache hit skips all source I/O and lineage recomputation, while a cache miss recomputes the full lineage and then stores the result for future tasks.

graph TD
    A[Action triggered on DataFrame]
    B[DAGScheduler submits task to Executor]
    C{Check BlockManager registry}
    D[BlockManager reports block location]
    E{Block in MemoryStore?}
    F[Return deserialized partition from memory]
    G{Block in DiskStore?}
    H[Deserialize partition from disk]
    I[CACHE MISS: block not found]
    J[Recompute partition from lineage DAG]
    K[Read from source - file scan or shuffle read]
    L[Apply transformations]
    M{StorageLevel includes memory or disk?}
    N[Store block in MemoryStore or DiskStore]
    O[Return partition to task]

    A --> B
    B --> C
    C -->|found| D
    D --> E
    E -->|yes| F
    F --> O
    E -->|no| G
    G -->|yes| H
    H --> O
    G -->|no| I
    I --> J
    J --> K
    K --> L
    L --> M
    M -->|yes| N
    N --> O
    M -->|no| O

Notice that a DiskStore retrieval (G → H → O) is substantially cheaper than a full cache miss (I → J → K → L → N → O), but still involves disk I/O and deserialization. If your Spark UI shows that a large fraction of partitions live on disk rather than in memory, the effective speedup from caching may be far below expectations — especially for DataFrames with fast source reads (e.g., co-located Parquet on HDFS).
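The branch structure of the diagram reduces to a short lookup function — a toy trace of my own, not Spark's task code:

```python
def fetch_partition(block_id, memory_store, disk_store, recompute,
                    cache_on_miss=True):
    """Toy trace of the cache hit / miss branches in the diagram above."""
    if block_id in memory_store:
        return memory_store[block_id], "memory hit"   # cheapest path
    if block_id in disk_store:
        return disk_store[block_id], "disk hit"       # disk I/O + deserialization
    data = recompute()                                # full lineage replay
    if cache_on_miss:                                 # StorageLevel keeps memory/disk
        memory_store[block_id] = data                 # store for future tasks
    return data, "miss"

mem, dsk = {}, {}
_, path = fetch_partition("rdd_42_3", mem, dsk, recompute=lambda: [1, 2, 3])
assert path == "miss"         # first task pays for recomputation
_, path = fetch_partition("rdd_42_3", mem, dsk, recompute=lambda: [1, 2, 3])
assert path == "memory hit"   # later tasks skip source I/O entirely
```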


🌍 Where Caching Actually Pays Off in Production

Caching delivers genuine value in four specific production patterns. Outside these patterns, the costs often exceed the benefits.

Iterative machine learning algorithms

Algorithms like gradient descent, k-means clustering, and alternating least squares (ALS) for collaborative filtering make dozens to hundreds of passes over the same training dataset. Without caching, every iteration reloads the data from object storage — adding seconds or minutes per pass to a training loop that runs hundreds of passes. Caching the training DataFrame in MEMORY_ONLY or MEMORY_AND_DISK reduces each subsequent iteration to an in-memory scan, often cutting total training time by 5× to 10×. MLlib's iterative algorithms do this automatically — they call persist() on the input dataset internally before beginning the iteration loop.

Small-to-medium lookup tables used in broadcast joins

When a lookup table (country codes, product categories, user tier mappings) is joined to a large fact table, Spark broadcasts the small side to every executor. If that join appears in multiple downstream stages — a common pattern in multi-step enrichment pipelines — the broadcast DataFrame is reconstructed from source each time unless explicitly cached. Caching the broadcast side in MEMORY_ONLY ensures it survives across stages without repeated network transfers from the driver.

Intermediate results in multi-step transformation pipelines

Complex pipelines that compute an intermediate DataFrame (e.g., a cleaned and normalised feature matrix) and then branch it into several downstream transformations benefit from caching the branch point. Without caching, each branch independently recomputes the shared intermediate from source, wasting both compute and I/O. This is the "diamond DAG" pattern: one input feeds two or more outputs, and caching the shared node eliminates redundant work.

Long-lineage DataFrames and checkpoint() as an alternative

When a DataFrame is the product of many chained transformations — twenty-plus stages of filtering, joining, and aggregating — its lineage DAG becomes very deep. If any partition of this DataFrame needs to be recomputed (due to executor failure or cache eviction), Spark must replay the full chain from the original source. Beyond a certain lineage depth, the recomputation cost exceeds the cost of a fresh read, and the risk of task failures cascading up the lineage becomes real.

checkpoint() solves this by materialising the lineage to HDFS or S3 and truncating the DAG at that point. Unlike cache(), a checkpoint is permanent and survives executor restarts. The trade-off is a synchronous, blocking write to external storage — checkpoint writes are slower than cache writes, but the resulting fault tolerance is stronger. Use checkpointing for iterative algorithms after every N iterations and for any DataFrame whose lineage depth makes recomputation expensive.
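The trade-off can be put into a toy cost model (my own simplification: uniform per-stage cost, failure recovery replays back to the nearest checkpoint):

```python
def recompute_cost(lineage_depth, per_stage_cost, checkpoint_every=None):
    """Toy model: recomputing a lost partition replays lineage back to the
    nearest checkpoint, or all the way to the source if there is none."""
    if checkpoint_every is None:
        return lineage_depth * per_stage_cost
    return (lineage_depth % checkpoint_every) * per_stage_cost

assert recompute_cost(50, 1.0) == 50.0                      # full replay
assert recompute_cost(47, 1.0, checkpoint_every=10) == 7.0  # capped below 10
```

Checkpointing every N iterations bounds recovery work at N stages regardless of how long the job has been running — the synchronous write is the premium you pay for that bound.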


⚖️ Storage Level Trade-offs and When NOT to Cache

Deserialized vs. serialized: speed vs. memory footprint

Deserialized storage (MEMORY_ONLY, MEMORY_AND_DISK) keeps data as Java objects directly in the JVM heap. Reads are fast — no deserialization overhead — but the memory footprint of Java objects is 2× to 5× larger than the raw data size. A 10 GB DataFrame can consume 25–50 GB of heap as deserialized Java objects.

Serialized storage (MEMORY_ONLY_SER, MEMORY_AND_DISK_SER) stores data as compact byte arrays. With the Kryo serializer (spark.serializer = org.apache.spark.serializer.KryoSerializer), the footprint is close to the raw on-disk size — often 3× to 5× smaller than deserialized storage. The cost is CPU time for serialization at write and deserialization at read. On CPU-abundant clusters with GC pressure, serialized storage is almost always the right choice.

When NOT to cache

Not every DataFrame benefits from caching. Three scenarios where caching hurts more than it helps:

Single-use DataFrames. If a DataFrame is used in exactly one action and then discarded, caching provides no benefit — you pay the cost of writing to the BlockManager with no subsequent reads to amortise it.

DataFrames that are faster to re-scan than to deserialize. If the source data lives on co-located HDFS DataNodes and the read involves no complex transformations, a sequential disk scan can be faster than deserializing a large cached block from a remote executor's DiskStore. This is common with DISK_ONLY cached DataFrames on clusters with fast local SSDs attached to the data nodes.

DataFrames that are larger than available executor memory with no disk fallback. Under MEMORY_ONLY, a DataFrame that cannot fully fit in memory will have its excess partitions silently dropped, providing no caching benefit while consuming heap space for the partitions that do fit. Use MEMORY_AND_DISK for any DataFrame whose size is uncertain relative to available executor memory.


🧭 Choosing a Storage Level for Your Use Case

Use this reference table to pick the right storage level before reaching for cache() by default:

| Use Case | Recommended Storage Level | Reasoning |
| --- | --- | --- |
| Iterative ML training (gradient descent, k-means) | MEMORY_ONLY or MEMORY_AND_DISK_SER with Kryo | Multiple passes demand fast reads; serialized saves heap on large datasets |
| Large DataFrame with uncertain memory fit | MEMORY_AND_DISK | Prevents silent partition drops under memory pressure |
| Shared intermediate on a heap-constrained cluster | MEMORY_AND_DISK_SER | Minimises memory footprint while preserving disk fallback |
| Fault-tolerant caching on a multi-tenant cluster | MEMORY_AND_DISK_2 | Replication survives a single executor failure without recomputation |
| Long-lineage DataFrame in an iterative algorithm | checkpoint() | Truncates DAG, survives restarts, eliminates cascading recomputation |
| Small broadcast lookup table | MEMORY_ONLY | Small enough to fit; deserialised speed matters for repeated lookup reads |
| Cost-sensitive write-once pipeline checkpoint | DISK_ONLY | Reliable but slow; use only when heap is too small for any in-memory option |

A useful rule of thumb: start with MEMORY_AND_DISK_SER plus Kryo as the default for any new caching decision. It is almost universally more memory-efficient than deserialized storage, and the disk fallback prevents the silent-drop failure mode of MEMORY_ONLY. Upgrade to MEMORY_ONLY only after profiling confirms that deserialization CPU cost is a bottleneck, which is rare.
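The rule of thumb distils into a small decision function — a sketch of my own (argument names invented; real decisions need profiling, not flags):

```python
def pick_storage_level(needs_fault_tolerance=False,
                       deser_cpu_is_bottleneck=False,
                       fits_in_memory=False):
    """Toy distillation of the guidance above into an explicit decision rule."""
    if needs_fault_tolerance:
        return "MEMORY_AND_DISK_2"     # replication survives one executor loss
    if deser_cpu_is_bottleneck and fits_in_memory:
        return "MEMORY_ONLY"           # only after profiling confirms it
    return "MEMORY_AND_DISK_SER"       # the safe default, paired with Kryo

assert pick_storage_level() == "MEMORY_AND_DISK_SER"
assert pick_storage_level(needs_fault_tolerance=True) == "MEMORY_AND_DISK_2"
```

The point of writing it out is the ordering: fault tolerance and heap safety come first, and MEMORY_ONLY is the exception you earn with a profile, not the default.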


🧪 Reading the Spark UI Storage Tab to Catch Over-Caching

The Spark UI's Storage tab is the primary diagnostic tool for caching decisions. It is often overlooked — many engineers check the Jobs and Stages tabs but ignore Storage entirely. That oversight allows over-caching, eviction thrashing, and memory fragmentation to go undetected.

The Storage tab shows one row per cached RDD or DataFrame. Each row reports:

  • RDD Name / Storage Level — the DataFrame name (if set via setName()) and the active storage level
  • Cached Partitions — the number of partitions currently stored
  • Fraction Cached — Cached Partitions / Total Partitions. A value below 1.0 means some partitions were evicted or never written.
  • Size in Memory / Size on Disk — actual bytes consumed in MemoryStore and DiskStore, respectively

When diagnosing a caching problem, look for these signals:

Fraction Cached below 0.5: More than half the partitions have been evicted. Tasks that need evicted partitions are recomputing from source, and you are paying both the cache write cost and the recomputation cost. The most common fix is switching from MEMORY_ONLY to MEMORY_AND_DISK, or increasing spark.memory.storageFraction.

Large Size on Disk with small Size in Memory: The DataFrame is overwhelmingly on disk, not in memory. If the source data is fast to read (co-located HDFS, local SSD), the disk-cached copy may offer little benefit over a direct re-read. Consider whether caching is actually helping at all, or whether removing the cache() call would simplify the job without impacting performance.

Multiple DataFrames cached simultaneously with high total memory usage: Over-caching is common in notebooks and multi-stage pipelines where cache() calls accumulate without corresponding unpersist() calls. If the Storage tab shows five or more cached DataFrames competing for memory, review each one for necessity. A DataFrame that is used in only two stages separated by a short computation does not need to be cached — the recomputation cost is lower than the memory pressure it creates.

Inspecting block distribution with the Storage detail page: Clicking a row in the Storage tab opens a per-executor breakdown showing which executor holds which partitions. Skewed block distribution — many partitions on one executor, few on others — can indicate data skew in the upstream computation. Skewed caching leads to uneven memory pressure across the cluster.


🛠️ Apache Spark Caching Configuration Reference

The following settings control the behaviour described in this post. They belong in spark-defaults.conf, a job's SparkConf, or cluster-level configuration in EMR, Databricks, or GKE.

# Unified memory pool: fraction of (heap - 300 MB) allocated to storage + execution combined
# Default: 0.6 — raise to 0.75 on heap-rich clusters; lower on clusters with large off-heap usage
spark.memory.fraction=0.6

# Within the unified pool: minimum fraction protected for storage against execution eviction
# Default: 0.5 — raise to 0.6–0.7 for ML training jobs with heavy caching; lower for shuffle-heavy ETL
spark.memory.storageFraction=0.5

# Enable Kryo serializer for serialized storage levels (MEMORY_ONLY_SER, MEMORY_AND_DISK_SER)
# Typically 3–5x smaller than Java serialization for columnar or numeric data
spark.serializer=org.apache.spark.serializer.KryoSerializer

# Compress serialized RDD/DataFrame partitions before storing
# Reduces disk footprint for DISK_ONLY and spilled MEMORY_AND_DISK blocks; adds CPU overhead
spark.rdd.compress=true

# Local directories for DiskStore spill (comma-separated)
# Use local SSDs for best MEMORY_AND_DISK performance
spark.local.dir=/mnt/fast-ssd/tmp

# Remove checkpoints from local filesystem when their consuming RDD is GC'd
# Set to false to keep checkpoints for debugging or manual recovery
spark.cleaner.referenceTracking.cleanCheckpoints=true

# Off-heap memory size per executor (for OFF_HEAP storage level)
# Requires spark.memory.offHeap.enabled=true; reduces GC pressure on large heap machines
spark.memory.offHeap.enabled=false
spark.memory.offHeap.size=0

For iterative ML workloads on Databricks or EMR, a recommended starting point is spark.memory.fraction=0.7 with spark.memory.storageFraction=0.6 and Kryo enabled. This configuration favours the unified pool and protects cached training data from aggressive shuffle eviction. Profile with the Storage tab after the first training run and adjust if Fraction Cached drops below 0.8.


📚 Lessons Learned from Caching Gone Wrong

Lesson 1: Always check the Storage tab, not just the Stages tab. A job that completes "successfully" with a Fraction Cached of 0.3 is recomputing 70% of its supposedly-cached data. Without the Storage tab, you only see that the job ran — you do not see that caching was ineffective.

Lesson 2: The first action after cache() delivers no speedup — it pays the full compute cost plus the cache write. Engineers frequently benchmark the first run and conclude that caching did not help. The correct benchmark compares the second and subsequent runs to a fully uncached baseline.

Lesson 3: MEMORY_AND_DISK is almost always a safer default than MEMORY_ONLY for DataFrames. The common advice to "cache in memory only for maximum speed" ignores the silent-drop failure mode. On any cluster where memory is not dedicated exclusively to caching (which is most production clusters), MEMORY_ONLY creates a reliability hazard. The deserialization overhead of MEMORY_AND_DISK_SER is almost always worth the reduction in eviction risk.

Lesson 4: Shuffles eat caching budget. The unified memory model means that heavy shuffle stages actively evict your cached data. If your job has both large caches and large shuffles, size the cluster to accommodate both — or increase spark.memory.fraction and tune spark.memory.storageFraction to balance the two workloads.

Lesson 5: unpersist() is not optional in production pipelines. In a Spark Streaming job or a scheduled batch loop that runs inside a long-lived SparkSession, every cache() without unpersist() is a memory leak. A job that runs cleanly for two hours can OOM after twelve hours purely from accumulated cached DataFrames that are no longer needed.

Lesson 6: Checkpoint long-lineage DataFrames in iterative algorithms. If you are running k-means or ALS for fifty iterations and a single executor failure causes Spark to recompute from the original data source, you may lose thirty minutes of progress. Checkpointing every ten iterations caps the recomputation cost to ten iterations' worth of work regardless of how many iterations have elapsed.


📌 Understanding Spark Caching Will Change How You Debug Performance

TLDR: Spark caching is lazy, partition-level, and managed by a per-executor BlockManager that competes for the same unified memory pool as shuffle execution. cache() defaults to MEMORY_AND_DISK for DataFrames; persist() gives explicit control over six storage levels from MEMORY_ONLY to DISK_ONLY. LRU eviction silently drops or spills blocks when memory pressure hits, and the Spark UI's Storage tab is the only way to confirm whether caching is actually working. Use MEMORY_AND_DISK_SER with Kryo as a safe default, always call unpersist() when done, and reach for checkpoint() rather than cache() when lineage depth makes recomputation expensive.


📝 Practice Quiz: Spark Caching and Persistence

Test your understanding of the concepts covered in this post.

  1. What does calling cache() on a DataFrame immediately do to the data?

    Show Answer Correct Answer: Nothing immediately. cache() (and persist()) marks the DataFrame for caching but does not materialise any data. The cache is populated lazily — partition by partition — the first time an action forces computation. Until an action is called, no blocks are written to any BlockManager.
  2. A Spark job runs four aggregation stages, each scanning a 50 GB DataFrame from S3. You add df.cache() before the first stage. After all four stages, the Spark UI shows "Fraction Cached: 0.4". What is the most likely cause?

    Show Answer Correct Answer: LRU eviction caused by memory pressure. The default storage level for a DataFrame cache() is MEMORY_AND_DISK, but if the available storage region in the unified memory pool was smaller than the DataFrame size, the MemoryStore evicted the least recently used partitions. If those partitions were on disk (spilled), they would count against "Size on Disk" but still appear as cached. A Fraction Cached of 0.4 under MEMORY_AND_DISK means roughly 60% of partitions were not successfully stored even on disk — likely because the DiskStore also filled up or the eviction happened under MEMORY_ONLY with partitions being silently dropped. Check whether Size on Disk is non-zero in the Storage tab to distinguish disk-spill from silent drop.
  3. What is the key difference between MEMORY_ONLY and MEMORY_ONLY_SER storage levels?

    Show Answer Correct Answer: MEMORY_ONLY stores data as deserialized Java objects directly in the JVM heap — fast to read (no deserialization needed) but 2–5× larger footprint than raw data. MEMORY_ONLY_SER stores data as serialized byte arrays — smaller footprint (especially with Kryo) but requires deserialization on every read, adding CPU overhead. Choose MEMORY_ONLY when CPU is the bottleneck and heap is abundant; choose MEMORY_ONLY_SER with Kryo when heap is constrained or GC pressure is high.
  4. What component is responsible for tracking which executor holds which cached partition across the entire Spark application?

    Show Answer Correct Answer: The BlockManagerMaster, which runs inside the Driver JVM. Each executor's BlockManager registers its blocks with the BlockManagerMaster via RPC. When a task needs a cached partition, it queries the master for the block's location and then fetches it directly from the holding executor using the NettyBlockTransferService. The master does not store data itself — it only maintains the registry.
  5. An iterative gradient descent job runs for 100 iterations over a training DataFrame cached with MEMORY_ONLY. After iteration 40, an executor OOMs and is restarted. What happens to the cached training data?

    Show Answer Correct Answer: All partitions held by the failed executor are lost. Since MEMORY_ONLY does not replicate blocks, the BlockManagerMaster removes those block registrations when it detects the executor loss. During iteration 41, tasks that need the lost partitions will find no entry in the BlockManager registry and will recompute those partitions by replaying the full lineage DAG from the original data source. The recomputed partitions will then be re-cached in the surviving executors' MemoryStore, but the recomputation adds latency proportional to the lineage depth. To prevent this, use MEMORY_AND_DISK_2 for fault-tolerant replication, or checkpoint() to truncate the lineage entirely.
  6. Open-ended challenge: A data engineer reports that adding cache() to the shared lookup DataFrame in a four-stage enrichment pipeline made the job slower, not faster. The Storage tab shows Fraction Cached at 1.0, all data in memory, and no eviction. What other factors could explain why caching a DataFrame with full memory coverage is still causing a slowdown?

    No single correct answer — think it through Consider: (1) Deserialization overhead on every read — if the source DataFrame reads from co-located HDFS with vectorised reads and the cache uses unoptimised Java serialization, deserializing from the MemoryStore can be slower than a sequential disk scan. Switch to Kryo or columnar-optimised caching. (2) Cache write latency on the first pass — if the pipeline is only run once end-to-end (not iterated), the first action pays both the compute cost and the cache write cost, and there are no subsequent reads to amortise it. (3) GC pressure from deserialized objects — caching a large DataFrame in MEMORY_ONLY (deserialized) adds significant heap pressure, which can increase GC pause frequency and duration across all tasks, slowing the entire job even though the cache hits are fast. (4) Columnar query pruning lost — Spark's Catalyst optimizer can push column and predicate pruning down to Parquet readers, skipping columns and row groups. Once data is cached, it is stored as full rows in the MemoryStore, and Catalyst cannot apply source-level pruning on subsequent reads from the cache. This is especially impactful when the downstream stages read only a few columns from a wide DataFrame.


Written by

Abstract Algorithms

@abstractalgorithms