All Posts

Reading and Writing Data in Spark: Parquet, Delta, JSON, and JDBC

The format you choose for Spark I/O determines whether predicate pushdown saves you 90% of a scan or reads every byte

Abstract AlgorithmsAbstract Algorithms
ยทยท34 min read
Share
AI Share on X / Twitter
AI Share on LinkedIn
Copy link

AI-assisted content. This post may have been written or enhanced with the help of AI tools. While efforts are made to ensure accuracy, the content may contain errors or inaccuracies. Please verify critical information independently.

TLDR: Parquet's columnar layout with row-group statistics enables predicate pushdown that can reduce a 500 GB scan to 8 GB. Delta Lake wraps Parquet with a JSON transaction log to add ACID semantics and time travel. JSON and CSV read every byte because they have no statistics. JDBC reads scale only with explicit numPartitions + partitionColumn. The format you choose is not cosmetic โ€” it determines the entire I/O path before a single task runs.

๐Ÿ“– The Team That Turned a 500 GB Daily Scan into an 8 GB Query

A platform analytics team at a mid-size SaaS company had a daily pipeline that read the previous seven days of raw user-event data from S3 and computed a set of funnel metrics. The data was stored as gzipped JSON โ€” one file per hour per region โ€” totalling approximately 500 GB. The pipeline ran for ninety minutes every morning, used forty executors, and cost the team about $180 per run on their cloud cluster.

A senior data engineer proposed a one-time migration: convert the raw JSON landing zone to date-partitioned Parquet and repoint the daily pipeline. The conversion took four hours and produced 78 GB of Parquet files across 7 date partitions โ€” an 84% storage reduction from compression and columnar layout alone. The daily pipeline, unchanged in logic, now completed in eleven minutes and cost $14 per run.

The delta was not faster hardware or more executors. The query used only 3 of the 47 columns in the schema, and its WHERE clause filtered to a single date partition. Against the JSON files, Spark had no way to skip anything โ€” every file, every row, every field had to be deserialized before a single predicate could be evaluated. Against the Parquet files, Spark skipped six of seven date partitions entirely (directory-level partition pruning), read only the 3 required column chunks per file (column projection), and within those column chunks skipped 78% of row groups whose statistics proved they contained no matching rows (predicate pushdown to the Parquet footer).

Format choice in Spark is not cosmetic. It determines whether Spark reads your data or reads around it.


๐Ÿ” The DataFrameReader and DataFrameWriter API and What You Can Control

Every Spark I/O operation passes through two symmetric APIs: DataFrameReader for reads and DataFrameWriter for writes. Understanding the options available at this surface is the prerequisite for understanding what the underlying format can and cannot do.

The DataFrameReader Pattern

The reader is accessed via spark.read and accepts a format identifier, optional key-value options, and a path. The supported built-in formats are:

FormatFormat identifierNotes
ParquetparquetDefault format in Spark SQL; columnar, splittable
Delta LakedeltaRequires delta-core library; adds ACID on top of Parquet
JSONjsonLine-delimited JSON (NDJSON); schema inferred by default
CSVcsvDelimiter-separated text; schema inferred by default
ORCorcHive-native columnar format; similar capabilities to Parquet
JDBCjdbcReads from any JDBC-compatible relational database
AvroavroRow-oriented binary format; schema stored in file header

The reader chain works as: specify format โ†’ apply options โ†’ load a path or paths. Options vary by format but share a common vocabulary for schema handling, inference, and corruption tolerance.

The DataFrameWriter Pattern and Write Modes

The writer is accessed via df.write or df.writeTo and mirrors the reader. Four save modes control what happens when the destination path or table already exists:

ModeBehavior
overwriteDeletes all existing data at the path, then writes new data
appendAdds new data to existing data; does not touch existing files
ignoreNo-op if destination already exists; silently skips the write
errorIfExistsThrows AnalysisException if destination already exists (default)

The partitionBy(column, ...) method on the writer instructs Spark to create a subdirectory per distinct value of the partition column(s) โ€” for example, date=2026-01-15/ โ€” enabling directory-level partition pruning on subsequent reads. This is a write-time decision with read-time consequences: once you choose partition columns, every downstream query benefits from predicate pushdown on those columns automatically.

The Write Path for Partitioned Output

When writing a partitioned DataFrame with partitionBy("date", "region"), each Spark task writes rows belonging to a single (date, region) combination into the corresponding subdirectory. The directory tree produced is:

output/
  date=2026-01-15/
    region=us-east/
      part-00000-abc123.parquet
      part-00001-def456.parquet
    region=eu-west/
      part-00002-ghi789.parquet
  date=2026-01-16/
    region=us-east/
      part-00003-jkl012.parquet

This directory layout is what Spark's file index reads when planning a query. Partition directories matching the query's predicate are included; all others are excluded before any task is launched.


โš™๏ธ Parquet Internals, the Delta Transaction Log, and How Partition Pruning Works

Parquet's Columnar Layout: Row Groups, Column Chunks, and Statistics

A Parquet file is divided horizontally into row groups โ€” each row group is a horizontal slice of the data, typically 128 MB. Within a row group, data for each column is stored contiguously as a column chunk. Column chunks are further divided into pages of approximately 1 MB each, which are the unit of compression and encoding.

Every column chunk stores column statistics in the Parquet footer: the minimum and maximum values of that column across all rows in the row group. Every page stores finer-grained page statistics. When Spark evaluates a predicate like WHERE revenue > 1000, it reads only the lightweight Parquet footer (a few KB at the end of the file), checks each row group's min/max statistics against the predicate, and skips every row group where max(revenue) <= 1000. This is predicate pushdown to the row group filter โ€” entire 128 MB horizontal slices of data are skipped without reading a single data page.

Dictionary encoding amplifies the storage savings further. For low-cardinality columns โ€” country, device_type, event_category โ€” Parquet stores a dictionary of distinct values and then encodes each page as a compact array of integer indices into that dictionary. A column containing 50 distinct country codes across 10 million rows is stored as a 50-entry dictionary plus an array of two-byte indices, dramatically reducing both file size and the amount of data read during a scan.

The vectorized Parquet reader (enabled by default) processes 4,096 rows at a time as columnar arrays rather than creating one Java object per row. This eliminates per-row object allocation overhead and allows SIMD-compatible expression evaluation in the JVM's JIT compiler. The alternative row-by-row reader exists for compatibility with complex nested types that the vectorized path does not yet handle.

Delta Lake's Transaction Log: ACID Without a Database Engine

Delta Lake stores data as Parquet files plus a _delta_log/ directory at the table root. This transaction log is the mechanism behind Delta's ACID guarantees.

Each write to a Delta table appends a new JSON commit file to _delta_log/:

_delta_log/
  00000000000000000000.json   โ† initial commit (table creation)
  00000000000000000001.json   โ† first write
  00000000000000000002.json   โ† second write (e.g., DELETE)
  00000000000000000010.json   โ† tenth write
  00000000000000000010.checkpoint.parquet  โ† checkpoint at commit 10
  _last_checkpoint              โ† pointer to latest checkpoint

Each JSON commit log entry records a list of actions: add actions for newly written Parquet files, remove actions for deleted files (logical deletion โ€” the data files are not immediately deleted), metaData actions for schema changes, and commitInfo for audit metadata. A sample commit file for a simple append looks like:

{"commitInfo":{"timestamp":1713398400000,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"date\"]"}}}
{"add":{"path":"date=2026-01-15/part-00000-abc123-c000.snappy.parquet","partitionValues":{"date":"2026-01-15"},"size":134217728,"modificationTime":1713398400000,"dataChange":true}}
{"add":{"path":"date=2026-01-15/part-00001-def456-c000.snappy.parquet","partitionValues":{"date":"2026-01-15"},"size":127926944,"modificationTime":1713398400000,"dataChange":true}}

Every ten commits, Delta writes a checkpoint file โ€” a single Parquet file summarizing all active add entries across all prior commit logs. Reading the full Delta table state then requires reading only the latest checkpoint plus any subsequent JSON commit files, rather than replaying all commit files from the beginning. The _last_checkpoint metadata file records which checkpoint is current.

Snapshot isolation means that a reader always sees the consistent state of the table as of the commit it started from. Concurrent writers use optimistic concurrency: each writer reads the current commit number, performs its work, and then attempts to write a new commit file. If another writer has advanced the commit number in the meantime, the conflicting writer retries or fails with a ConcurrentModificationException, depending on the conflict type.

Schema evolution in Delta is tightly controlled. By default, Delta rejects writes whose schema does not match the registered table schema. Setting mergeSchema = true on the write operation allows new columns to be appended to the schema; the commit log's metaData action records the updated schema so all subsequent readers see it automatically.

Partition Pruning: How Directory Structure Eliminates I/O Before Task Launch

Partition pruning is the highest-value optimization for partitioned datasets because it operates at the file-listing level, before Spark allocates a single task. The process is:

  1. Spark's FileIndex recursively lists the directories under the table path.
  2. For each directory that encodes a partition value (e.g., date=2026-01-15), Spark extracts the partition column name and value.
  3. When planning a query, the PartitionFilters portion of the predicate is evaluated against the extracted partition values.
  4. Only directories where the partition value satisfies the predicate are included in the file scan plan.
  5. The FileScan physical plan node is created with only the pruned file list โ€” excluded partitions are never read.

For a query like SELECT * FROM events WHERE date = '2026-01-15' over a table with 365 date partitions, Spark lists 365 directories, evaluates date = '2026-01-15' against all 365 partition values, and then plans a FileScan that contains only the single matching directory. The other 364 directories never appear in the task plan. No I/O, no tasks, no data movement for any of them.

The critical constraint: partition pruning only fires when the predicate is on a partition column. A predicate on a non-partition column cannot prune directories โ€” it becomes a DataFilter that Spark evaluates after reading each row, or at best a PushedFilter that Spark pushes into the Parquet footer check. The choice of partition column at write time directly determines which predicates can prune at the directory level.


๐Ÿง  Deep Dive: How Spark Plans and Executes a Parquet Read Internally

The Internals of Parquet Reads in Spark via DataSourceV2

Since Spark 2.3, the Parquet data source (and most others) is implemented via the DataSourceV2 API. This API introduces a clean separation between scan planning (what to read) and scan execution (how to read it), enabling more sophisticated optimizations than the original DataSource V1 API allowed.

When spark.read.parquet("s3://bucket/events") is called, the execution sequence is:

Step 1 โ€” File scan planning. Spark calls the data source's Table.newScanBuilder() method. The scan builder receives the query's predicate expressions and requested column list from the Catalyst optimizer. This is where the data source signals back which predicates it can handle natively (SupportsPushDownFilters) and which columns it will read (SupportsPushDownRequiredColumns).

Step 2 โ€” Footer metadata read. For each Parquet file in the scan plan, Spark reads the Parquet footer. The footer is a compact binary structure at the end of the file containing: row group metadata (byte offset, row count, compressed size per column chunk), column statistics (min/max/null count per column chunk), and the file schema. Reading the footer typically requires one or two small HTTP range requests against S3, regardless of file size.

Step 3 โ€” Row group filtering. Armed with the PushedFilters from the optimizer and the column statistics from the footers, Spark's Parquet reader evaluates each predicate against each row group's min/max statistics. Row groups where the predicate cannot possibly match โ€” for instance, a row group where max(event_type) < 'purchase' when the predicate is event_type = 'purchase' โ€” are marked as skipped. No data pages from those row groups are fetched.

Step 4 โ€” Column projection. Only the column chunks corresponding to the requested columns are fetched and decompressed. If the query selects 3 of 47 schema columns, the remaining 44 column chunks are never read. For a 1 GB row group with 47 columns of roughly equal size, this reduces the data read per row group from 1 GB to approximately 64 MB.

Step 5 โ€” Vectorized decode. The VectorizedParquetRecordReader reads column data as typed columnar arrays of 4,096 values (one batch). Expression evaluation over these arrays operates on contiguous memory and benefits from CPU cache efficiency and JIT vectorization. The output is an InternalRow batch that Spark's execution engine processes as a columnar ColumnarBatch.

The diagram below shows this complete read path from SparkSession.read through to the final DataFrame. Notice that the optimization decisions โ€” partition pruning, filter pushdown, column projection โ€” all happen at planning time, before a single byte of actual data is read from storage.

graph TD
    A[SparkSession.read.parquet - path]
    B[DataSourceV2 scan builder]
    C[FileIndex - list partition directories]
    D[PartitionPruning - evaluate partition filters]
    E[Footer metadata read - row group statistics per file]
    F[Row group filtering - skip groups where predicate cannot match]
    G[Column projection - fetch only requested column chunks]
    H[Vectorized decode - 4096 rows per columnar batch]
    I[DataFrame - InternalRow columnar batches]

    A --> B
    B --> C
    C --> D
    D --> E
    E --> F
    F --> G
    G --> H
    H --> I

The fork between D and E is the key architectural insight: partition pruning eliminates entire files before footer reads happen; footer reads eliminate entire row groups before column data is fetched; column projection eliminates columns before pages are decompressed. Each gate operates on dramatically cheaper metadata than the gate that follows it.

Performance Analysis: Format Comparison Across Compression, Speed, and Pushdown Support

The following table compares the six primary Spark data sources across the dimensions that matter most for I/O-bound workloads.

FormatCompression ratioScan speed (relative)Predicate pushdownSchema evolutionACID writesBest use case
ParquetHigh (3โ€“7x vs raw)Fastest for analytical readsRow group + column statisticsmergeSchema optionNoAppend-only analytical tables
Delta LakeHigh (same as Parquet)Fastest + ACID overheadRow group + column + Z-orderNative, schema enforcementYesMutable data lake tables
ORCHigh (similar to Parquet)Fast; Hive-native pathStripe-level statisticsLimitedNoHive metastore ecosystem
JSONLow (text + whitespace)Slowest โ€” parses every byteNoneFull flexibilityNoLanding zone, API ingestion
CSVLow (text)Slowest โ€” parses every characterNoneSchema inference onlyNoExternal data exchange
JDBCN/A (remote DB)Database-bound; single partition by defaultWHERE clause (partial)Database-controlledDB-controlledOperational DB pipelines

A few observations that the table abstracts away deserve explicit mention:

Parquet vs Delta read performance. For pure append-only workloads, raw Parquet reads are marginally faster than Delta reads because Delta must reconstruct the active file list from the transaction log on every scan. For tables with many small commits, this overhead grows โ€” Delta's checkpoint mechanism (every 10 commits) exists precisely to bound this cost.

JSON's hidden cost beyond decompression. JSON parsing in Spark is not just slow because of text overhead โ€” it is slow because the parser must handle variable-width fields, escape sequences, nested structures, and null handling on every single row. Even a perfectly structured, single-level JSON document with 10 fields requires the parser to process approximately 200โ€“400 characters per row to extract 3 values. Parquet reads those same 3 values from pre-typed binary column chunks.

JDBC's default single-partition bottleneck. Without explicit parallelism configuration, Spark issues a single JDBC query against the database and reads the entire result set on one partition โ€” zero parallelism, one executor thread. For any table larger than a few million rows, this produces a single-task bottleneck that no amount of executor scaling can fix, because the executor count is irrelevant when only one executor is doing any work.

ORC vs Parquet in practice. ORC and Parquet have nearly identical capability profiles. The practical reason to choose ORC is existing Hive infrastructure: ORC's integration with Hive's metastore schema tracking and its ACID support in Hive 3 are mature. For Spark-native workloads without a Hive dependency, Parquet has broader ecosystem support and slightly better Spark optimizer integration.


๐Ÿ“Š Tracing the Parquet Read Path: From Storage to DataFrame

The following diagram shows both the planning path and the execution path for a partitioned Parquet table read with predicate pushdown. The left column shows planning-time decisions (no data read); the right column shows execution-time I/O. The point where control transfers from planning to execution is the FileScan plan node โ€” once that node exists, Spark knows exactly which files and row groups to read, and no planning-time decision can be revisited without replanning.

graph TD
    QP[Query with WHERE predicates]
    CA[Catalyst Analyzer]
    CO[Catalyst Optimizer]
    PP[Partition pruning at FileIndex]
    FP[Filter pushdown to DataSourceV2]
    CP[Column pruning - ReadSchema]
    FS[FileScan plan node - pruned file list]
    DA[DAGScheduler creates FileScanRDD tasks]
    T1[Task 1 - reads file1 footer]
    T2[Task 2 - reads file2 footer]
    TN[Task N - reads fileN footer]
    RG[Row group filter - skip non-matching row groups]
    CD[Column data fetch - only projected columns]
    VD[Vectorized decode - columnar batches]
    DF[DataFrame result]

    QP --> CA
    CA --> CO
    CO --> PP
    PP --> FP
    FP --> CP
    CP --> FS
    FS --> DA
    DA --> T1
    DA --> T2
    DA --> TN
    T1 --> RG
    T2 --> RG
    TN --> RG
    RG --> CD
    CD --> VD
    VD --> DF

The planning phase (top half of the diagram) operates entirely on metadata: directory listings, footer statistics, and schema information. All three major I/O reductions โ€” partition pruning, row group filtering, column projection โ€” are encoded into the FileScan plan node before the first byte of actual data is read. Each task in the execution phase independently reads its assigned file's footer, applies the row group filter, and fetches only the projected columns. Tasks share no state during execution โ€” the planning-time metadata is the sole communication channel.


๐ŸŒ Delta Lake Merges, JDBC Parallel Reads, and Dynamic Partition Overwrites

Delta Lake MERGE: The Upsert Pattern for Mutable Data Lakes

The most common mutation pattern in data lake workloads is the upsert (insert or update): for each incoming record, update the existing row if a matching key exists, or insert a new row if it does not. Relational databases handle this natively with SQL MERGE or INSERT ... ON CONFLICT. For Parquet-based lakes, implementing upsert without Delta requires reading all existing data, merging the changes in memory, and rewriting entire partitions โ€” an O(table size) operation regardless of change volume.

Delta Lake implements this as a first-class MERGE INTO operation backed by the transaction log:

MERGE INTO target_table AS target
USING source_data AS source
ON target.user_id = source.user_id
WHEN MATCHED THEN UPDATE SET
    target.last_seen = source.event_ts,
    target.session_count = target.session_count + 1
WHEN NOT MATCHED THEN INSERT *

Internally, Delta executes a MERGE as two phases: first, it identifies which data files contain rows matching the ON condition (using partition pruning and Delta's file statistics); second, it rewrites only those affected files, adding new add and remove entries in a single atomic commit to the transaction log. The key insight is that unaffected files are never read or rewritten โ€” Delta's transaction log allows file-level selective mutation that raw Parquet cannot express.

For Delta tables with millions of small files, the OPTIMIZE command compacts them into larger files and writes a new commit recording the replacements, keeping the table readable with minimal overhead throughout the compaction.

JDBC Parallel Reads: Breaking the Single-Partition Bottleneck

JDBC reads in Spark default to a single partition: one executor runs one SQL query, fetches all rows, and materializes them into a DataFrame. This is safe but not scalable. For any operational database table larger than a few million rows, the correct approach is to partition the JDBC read across multiple concurrent queries using three options:

  • partitionColumn: the column used to split the read (must be numeric or date type)
  • lowerBound: the minimum value of the partition column to include
  • upperBound: the maximum value of the partition column to include
  • numPartitions: how many parallel JDBC queries to issue

Given these four values, Spark generates numPartitions SQL queries, each adding a range predicate on partitionColumn:

Query 1: WHERE order_id >= 1         AND order_id < 1000001
Query 2: WHERE order_id >= 1000001   AND order_id < 2000001
...
Query N: WHERE order_id >= (N-1)*1M  AND order_id <= 4000000

All N queries run concurrently, each on a separate executor thread connecting to the database. The parallelism is bounded only by the database's connection limit and the executor count. For numPartitions = 20 against a 4-million-row table, each query fetches 200,000 rows โ€” a reasonable per-partition size for most JDBC-to-Spark pipelines.

The critical constraint: if the data distribution is skewed across the partitionColumn range, partition sizes are uneven. An order_id column with values clustered around a recent date range will produce a handful of very large partitions and many empty ones. In such cases, AQE can coalesce small post-read partitions, but it cannot rebalance the underlying JDBC queries after they have been issued.

Dynamic Partition Overwrite: Replacing Only What Changed

Static partition overwrite (the default) deletes all directories under the write path and rewrites all partitions present in the new data. This is safe but destructive: if the new DataFrame contains only date=2026-01-15, static overwrite deletes date=2026-01-14, date=2026-01-13, and all other prior dates before writing the new partition.

Dynamic partition overwrite changes this semantics: Spark deletes and replaces only the partition directories present in the new DataFrame, leaving all other partitions untouched. For an incremental daily pipeline that writes a single new date partition, dynamic overwrite is the correct mode:

  • Static overwrite on date=2026-01-15 deletes the entire table, then writes one partition.
  • Dynamic overwrite on date=2026-01-15 deletes only date=2026-01-15/, then writes the new data for that date.

This is controlled via the Spark configuration spark.sql.sources.partitionOverwriteMode=dynamic (default is static).

Reading Partitioned Hive Tables with Metastore-Registered Partitions

When a Parquet or ORC table is registered in the Hive metastore, partition metadata is stored in the metastore rather than inferred from directory listing. Spark's HiveTableScanExec queries the metastore for partition locations matching the query predicate, avoiding the directory listing entirely. For tables with thousands of partitions, this can cut scan planning time from seconds (directory listing against S3) to milliseconds (metastore query). The trade-off is that the metastore must be kept in sync via MSCK REPAIR TABLE after new partitions are written, or via ALTER TABLE ... ADD PARTITION per partition.


โš–๏ธ Trade-offs: Small File Accumulation, Format Mismatches, and JDBC Bottlenecks

The Small File Problem in Streaming Writes

Structured Streaming writes to Parquet produce one or more files per micro-batch per partition. At a 60-second batch interval, 500 output partitions, and 10 micro-batches per hour, a single streaming job generates 300,000 small files per hour. Each file is a valid Parquet file with its own footer, row group metadata, and storage overhead โ€” but each row group may contain only a few thousand rows instead of the 128 MB target.

The consequences are significant: Spark's file listing phase must enumerate hundreds of thousands of files, the task scheduler creates one task per file (or one per file per core depending on maxPartitionBytes), and each task reads a tiny amount of data relative to its startup overhead. A 10 MB file contains approximately 80,000 rows of typical event data. With 300,000 files, that is 300,000 tasks for a query that could have run as 2,000 tasks against 150 properly sized 128 MB Parquet files.

Delta Lake's OPTIMIZE command and AUTO OPTIMIZE (on Databricks) solve this by compacting small files into target-sized files in a background process. For plain Parquet, the remedy is periodic compaction jobs that re-read a partition and write it back with coalesce(N) targeting the desired file count.

Parquet vs Delta: When the Transaction Log Cost Matters

Delta Lake is strictly more capable than plain Parquet โ€” it adds ACID, time travel, schema enforcement, and MERGE support. But this capability comes with a cost that matters for some workloads:

Read overhead: Every Delta table read begins with a transaction log reconstruction โ€” reading the latest checkpoint file plus any subsequent JSON commit files. For a table with a very large number of active files and frequent small commits, log reconstruction can take 10โ€“30 seconds. Checkpointing every 10 commits bounds this, but very active tables may still have observable log-read overhead.

Write overhead: Delta writes involve committing to the transaction log atomically, which requires a distributed lock (usually Hadoop-compatible file renaming or DynamoDB-based locking on S3). Under high write concurrency, lock contention slows individual writes.

For append-only archival tables with no need for mutation, time travel, or concurrent writes, plain Parquet is simpler, faster to read, and free of infrastructure dependencies. For any table that receives updates, deletes, or schema changes, Delta's ACID guarantees pay for their overhead many times over by eliminating the manual partition-rewrite patterns that plain Parquet requires.

JSON and CSV for Production Analytical Workloads

JSON and CSV exist in production Spark pipelines primarily as landing zone formats โ€” the format in which raw data arrives from external sources. API webhooks, CDC event streams, and file exports from operational databases frequently produce JSON or CSV. The correct pattern is to accept these formats at the boundary, validate and type-coerce the data, and immediately convert to Parquet or Delta for all downstream analytical access.

Using JSON directly for repeated analytical queries is expensive because Spark must deserialize the full text representation on every query, regardless of which columns are needed. There is no footer to consult, no statistics to evaluate, no columns to skip โ€” every byte of every file must be read and parsed. A 500 GB JSON dataset that could be answered from an 8 GB Parquet file will always cost 500 GB of I/O unless it is converted.

Schema inference from JSON (the default) compounds the cost: Spark reads an entire sample of the data to infer types, which can add minutes to the planning phase for large datasets. Disabling schema inference by providing an explicit schema avoids this overhead.


๐Ÿงญ Choosing the Right Format: A Decision Guide for Spark I/O

Use this table when choosing a data format for a new Spark pipeline or migrating an existing one. The "format recommendation" column names the best default; the "conditions" column lists the circumstances that change the recommendation.

Use caseRecommended formatConditions that override
Append-only event logs at scaleParquet with date partitioningIf mutations are ever needed, use Delta instead
Mutable fact tables with upserts or deletesDelta LakeNone โ€” there is no Parquet-only solution for ACID updates
Time-travel or audit trail queriesDelta LakeNone โ€” transaction log is required
External operational database (live reads)JDBC with numPartitions + partitionColumnIf you can export to S3 first, use Parquet instead
API webhook landing zoneJSON (schema inference disabled, explicit schema provided)Always convert to Parquet/Delta immediately after landing
Hive metastore ecosystem with Hive ACIDORCOnly if Hive 3 ACID is required; otherwise prefer Parquet
Schema-flexible exploration of unknown dataJSON with samplingRatio=0.1Convert to typed Parquet once schema is understood
Streaming writes with low-latency micro-batchesDelta Lake with Auto OptimizePlain Parquet accumulates small files without compaction
Machine-to-machine file exchange with CSV consumersCSVOnly for the exchange boundary; convert immediately

Two meta-rules that override any specific cell in this table:

Never keep analytical data in JSON or CSV beyond the landing zone. The scan cost is too high. Convert on ingestion and never look back.

Default to Delta over plain Parquet for any new table. Delta's overhead is small, its safety guarantees are significant, and migrating from plain Parquet to Delta after an accidental overwrite destroys data is a painful experience.


๐Ÿงช Reading the Evidence: Explain Output and a Delta Transaction Log Walk-Through

This section demonstrates two diagnostic skills: reading a Spark explain() output to understand what predicate pushdown is doing, and reading a Delta transaction log to understand what a write actually committed.

Example 1 โ€” Interpreting a Parquet Scan's explain() Output

Consider a query selecting purchase events for a single date from a partitioned Parquet table with 47 schema columns. The explain(true) physical plan output contains the following FileScan node:

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet events[event_type#5, user_id#6, revenue#7]
   Batched: true,
   DataFilters: [isnotnull(event_type#5), (event_type#5 = purchase)],
   Format: Parquet,
   Location: InMemoryFileIndex(1 paths)[s3://data-lake/events/date=2026-01-15],
   PartitionFilters: [isnotnull(date#0), (date#0 = 2026-01-15)],
   PushedFilters: [IsNotNull(event_type), EqualTo(event_type,purchase)],
   ReadSchema: struct<event_type:string,user_id:string,revenue:double>

Reading this output section by section tells the full I/O story:

Location shows a single path after partition pruning. The original table has 365 date partitions but only date=2026-01-15 survived the PartitionFilters evaluation โ€” Spark's file index listed 365 directories and included exactly one. The phrase "1 paths" is the signal: only one partition directory will be scanned.

PartitionFilters lists the predicate components that operated at the directory level: (date#0 = 2026-01-15). These were evaluated against partition metadata, not data. Zero data files were opened for the other 364 partitions.

PushedFilters lists the predicates sent to the Parquet row-group filter: IsNotNull(event_type) and EqualTo(event_type, purchase). Before reading any data page, Spark evaluates these against each row group's min/max(event_type) statistics. Row groups where max(event_type) is alphabetically less than purchase โ€” or min(event_type) is greater โ€” are skipped entirely. In this table, approximately 78% of row groups were skipped by this check.

DataFilters lists the same predicates, now shown in their Spark-side form. These are applied to rows that passed the row group filter and were decoded. DataFilters re-evaluates what PushedFilters could not fully eliminate at the row group level โ€” because min/max statistics are approximate, some row groups may pass the statistics check but contain no matching rows.

ReadSchema shows struct<event_type:string, user_id:string, revenue:double> โ€” only 3 of the 47 schema columns. Column projection eliminated 44 column chunks from every row group. If each row group is 128 MB distributed uniformly across 47 columns, reading only 3 columns reduces each row group's data fetch from 128 MB to approximately 8 MB.

Example 2 โ€” Reading a Delta Transaction Log Entry

After a Delta MERGE upsert that updated 12,000 rows and inserted 3,500 new rows across two date partitions, the commit log entry at _delta_log/00000000000000000042.json contains:

{"commitInfo":{"timestamp":1713484800000,"operation":"MERGE","operationParameters":{"predicate":"(target.user_id = source.user_id)","matchedPredicates":"[{\"actionType\":\"update\"}]","notMatchedPredicates":"[{\"actionType\":\"insert\"}]"},"readVersion":41,"isolationLevel":"Serializable","isBlindAppend":false}}
{"remove":{"path":"date=2026-01-14/part-00003-old-abc.snappy.parquet","deletionTimestamp":1713484800000,"dataChange":true}}
{"remove":{"path":"date=2026-01-15/part-00007-old-def.snappy.parquet","deletionTimestamp":1713484800000,"dataChange":true}}
{"add":{"path":"date=2026-01-14/part-00003-new-xyz.snappy.parquet","partitionValues":{"date":"2026-01-14"},"size":131072000,"modificationTime":1713484800000,"dataChange":true}}
{"add":{"path":"date=2026-01-15/part-00007-new-uvw.snappy.parquet","partitionValues":{"date":"2026-01-15"},"size":127926944,"modificationTime":1713484800000,"dataChange":true}}

This single commit atomically represents the entire MERGE operation. The two remove entries mark the original files as logically deleted (the physical files remain on disk until vacuum is run). The two add entries point to the newly written files containing the merged result. Any reader that starts a query after this commit sees the new files; any reader mid-query before this commit sees the old files. No partial state is ever visible โ€” this is the ACID guarantee in action.

The readVersion: 41 field records which snapshot the writer read from. If another writer concurrently modified these same files between reading at version 41 and committing at version 42, Delta's optimistic concurrency check would detect the conflict and retry or reject the write โ€” preventing a silent data corruption that would be invisible in a plain Parquet overwrite.


๐Ÿ› ๏ธ Spark Data Source Configuration Reference

The following properties control Parquet read optimizations, Delta Lake write behavior, file sizing, and JDBC parallelism. Apply them in spark-defaults.conf, as --conf arguments to spark-submit, or programmatically via SparkConf.

# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# PARQUET PREDICATE PUSHDOWN
# Enables pushing down filter predicates to the Parquet footer
# reader, allowing row groups to be skipped before data is read.
# Default: true  |  Set false only for debugging โ€” disabling this
# causes Spark to read every row group regardless of statistics.
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
spark.sql.parquet.filterPushdown=true

# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# PARQUET SCHEMA MERGING
# When true, reads the footer of every file to construct the
# superset schema across all files (schema union). Required when
# different files in the same table have different columns.
# Default: false  |  Enable only when necessary โ€” reading every
# footer adds significant overhead for large numbers of files.
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
spark.sql.parquet.mergeSchema=false

# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# PARQUET VECTORIZED READER
# Reads Parquet data as columnar batches of 4096 rows rather than
# one Java object per row. Dramatically reduces GC pressure and
# enables CPU-efficient SIMD expression evaluation.
# Default: true  |  Set false only for complex nested types that
# the vectorized path does not support.
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
spark.sql.parquet.enableVectorizedReader=true

# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# TARGET FILE SIZE FOR READS
# Maximum number of bytes per partition when reading from file
# sources. Controls how many files Spark groups into a single task.
# Default: 128m  |  Increase to 256mโ€“512m for large Parquet files
# to reduce task count and scheduling overhead.
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
spark.sql.files.maxPartitionBytes=134217728

# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# DELTA LAKE OPTIMIZED WRITES (Databricks / Delta 2.x)
# Automatically coalesces small shuffle partitions into larger
# output files to avoid the small file problem on write.
# Default: false  |  Enable for streaming Delta writes and any
# pipeline that produces high partition counts per micro-batch.
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
spark.databricks.delta.optimizeWrite.enabled=true

# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# DELTA SCHEMA ENFORCEMENT
# Controls whether Delta rejects writes with schema mismatches.
# Default: true (strict enforcement)
# Set mergeSchema=true per-write to allow safe column additions.
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
spark.databricks.delta.schema.autoMerge.enabled=false

# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# PARTITION OVERWRITE MODE
# static: overwrites ALL partition directories, then writes new data
# dynamic: overwrites ONLY the partition directories present in
#          the new DataFrame, preserving all other partitions.
# Default: static  |  Set dynamic for incremental daily pipelines
# that write a subset of partitions per run.
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
spark.sql.sources.partitionOverwriteMode=dynamic

# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# JDBC PARALLELISM (passed as DataSource options, not spark-defaults)
# These are not spark.sql properties โ€” they are options passed
# directly to spark.read.format("jdbc").option(key, value):
#
#   numPartitions    : number of parallel JDBC queries (and tasks)
#   partitionColumn  : numeric/date column to split the range on
#   lowerBound       : minimum partition column value (inclusive)
#   upperBound       : maximum partition column value (exclusive)
#
# Example: numPartitions=20, partitionColumn=order_id,
#          lowerBound=1, upperBound=4000001
# Generates 20 concurrent SQL queries, each fetching ~200K rows.
# Warning: data skew in partitionColumn produces uneven task sizes.
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# spark.sql.jdbc.numPartitions (not a real Spark property โ€” see above)

For a full guide on Spark's DAGScheduler, stage planning, and how file scans become tasks on executor JVMs, see the companion post on Spark architecture linked in the Related Posts section below.


๐Ÿ“š Lessons Learned from Production Spark I/O Failures

Lesson 1: Every JSON query is a full table scan, always. The most common mistake in new Spark pipelines is leaving raw event data in JSON because "it's easier to debug." Easier to debug, yes โ€” but it costs full-table I/O on every query. A properly partitioned Parquet table that serves the same analytical queries for 10% of the I/O cost should be the first engineering decision, not the last.

Lesson 2: A predicate on a non-partition column cannot prune directories. Engineers frequently add a WHERE clause on a non-partition column and expect it to reduce the number of files scanned. It does not โ€” it becomes a DataFilter evaluated per row after reading. Partition pruning requires the predicate to be on a partition column. This means the partition column choice at write time is an architectural decision that determines query performance for the table's lifetime.

Lesson 3: spark.sql.parquet.mergeSchema=true has a hidden cost proportional to file count, not data volume. Because schema merging reads every file's footer, a table with 100,000 small Parquet files requires 100,000 footer reads at planning time โ€” potentially minutes of planning overhead before a single data page is read. Keep this setting at its default false unless schema merging is genuinely required, and run explicit schema unification jobs instead.

Lesson 4: The Delta transaction log checkpoint interval determines read performance under high write frequency. A table that receives thousands of small appends per day without checkpointing accumulates thousands of JSON commit files. Every reader must replay all of them on open. Delta checkpoints every 10 commits by default; for high-frequency write workloads, reducing the checkpoint interval eliminates this log-read overhead.

Lesson 5: JDBC without numPartitions is not a Spark job โ€” it is a single-threaded JDBC fetch. Adding 100 executors to a cluster does not help a JDBC read that issues one query. The first question for any JDBC-based Spark pipeline is: "what is the partitionColumn, what are the bounds, and how many partitions?" Without answers to these three questions, the job will not scale.

Lesson 6: Dynamic partition overwrite prevents accidental full-table deletion in incremental pipelines. The default static overwrite mode deletes all partitions, then writes only the ones present in the new DataFrame. An incremental daily job that writes date=2026-01-15 with static overwrite deletes all historical data before writing the single new day. This mistake has destroyed production tables. Dynamic partition overwrite is always the correct mode for incremental pipelines; the only case for static overwrite is when you intentionally want to replace the entire table.

Lesson 7: Column projection in the explain() ReadSchema is the fastest validation that your query is efficient. Before trusting any Spark job to production, run explain(true) and check the ReadSchema in the FileScan node. If it lists all 47 schema columns for a query that only needs 3, either the column pruning optimization is not firing (uncommon) or upstream transformations are referencing columns that could be dropped earlier in the plan. Projecting away unused columns before wide transformations is one of the highest-ROI query rewrites available.


๐Ÿ“Œ Format Is an Architectural Choice, Not a File Extension

TLDR: Parquet's columnar layout with row-group statistics enables predicate pushdown that can reduce a 500 GB scan to 8 GB. Delta Lake wraps Parquet with a JSON transaction log to add ACID semantics and time travel. JSON and CSV read every byte because they have no statistics. JDBC reads scale only with explicit numPartitions + partitionColumn. The format you choose is not cosmetic โ€” it determines the entire I/O path before a single task runs.

Every format choice in Spark I/O has a corresponding I/O path. Parquet enables three-level I/O reduction: directory-level partition pruning, row-group-level predicate pushdown, and column-level projection. Together these three gates can reduce a 500 GB full-table scan to an 8 GB targeted read for analytical queries. Delta Lake adds a transaction log on top of Parquet, enabling ACID writes, time travel, and schema enforcement โ€” at the cost of log reconstruction overhead that checkpointing bounds.

JSON and CSV are full-scan formats by nature. They have no statistics, no columnar layout, and no footer. They belong in landing zones, not analytical layers. JDBC is a single-partition-by-default source that requires explicit numPartitions and partitionColumn configuration to scale beyond one executor thread.

The three most impactful decisions in any Spark I/O design:

  1. Partition column choice โ€” determines which predicates become directory-level pruning (zero I/O) versus row-level filtering (full I/O). Choose the column most commonly used in range predicates.
  2. Format choice โ€” Parquet for append-only analytical workloads; Delta Lake for anything that requires mutations, upserts, time travel, or concurrent writes.
  3. Dynamic vs static partition overwrite โ€” always use dynamic for incremental pipelines unless intentional full-table replacement is required.

The platform analytics team from the opening story did not change a single line of query logic. They changed the format, the partition column, and the write mode โ€” and reduced their daily pipeline cost by 92%.


๐Ÿ“ Practice Quiz: Spark I/O Internals and Format Selection

  1. A Spark query runs SELECT user_id, revenue FROM events WHERE date = '2026-01-15' AND event_type = 'purchase'. The table has 47 columns, is partitioned by date, and stored as Parquet. Which part of the explain() output tells you that column projection is working correctly, and what should it show?
Show Answer Correct Answer: The ReadSchema field in the FileScan node. If column projection is working, ReadSchema should show only struct<user_id:string, revenue:double> โ€” the two columns actually selected. If it shows all 47 schema columns, either a wildcard reference upstream in the plan is preventing projection, or a column in a preceding transformation references columns that Catalyst cannot eliminate. Seeing only the requested columns in ReadSchema confirms that 45 of 47 column chunks are being skipped per row group โ€” reducing per-row-group I/O by approximately 94%.

  1. A Parquet table has 365 date partitions. A query includes WHERE date BETWEEN '2026-01-01' AND '2026-01-31'. How many partition directories does Spark scan, and how is this determined?
Show Answer Correct Answer: Spark scans 31 partition directories โ€” only those matching date values in January 2026. The FileIndex lists all 365 directories, extracts the date partition value from each directory name, and evaluates the BETWEEN predicate against each extracted value. The 334 directories outside the range are excluded from the FileScan plan before any task is launched. The PartitionFilters field in explain() will show (date#0 >= 2026-01-01 AND date#0 <= 2026-01-31). No data from the excluded 334 directories is ever read.

  1. A Delta Lake table receives 200 small appends per day. A Spark reader opens the table and the query takes 45 seconds just for planning before any data scan begins. What is the most likely cause, and what configuration change resolves it?
Show Answer Correct Answer: The Delta transaction log has accumulated too many JSON commit files without checkpointing. With 200 daily appends and the default checkpoint interval of 10 commits, Delta creates one checkpoint per 10 commits โ€” 20 checkpoints per day โ€” but with 200 commits, the reader must still read up to 10 JSON commit files after the latest checkpoint. If checkpointing is disabled or the log has accumulated from many prior days, the reader may replay hundreds or thousands of JSON commit files before it can determine the active file list. The fix is to trigger OPTIMIZE on the table (which also triggers checkpointing) and to reduce the checkpoint interval via spark.databricks.delta.checkpoint.interval=5 for high-frequency write workloads.

  1. A JDBC read of a 10-million-row PostgreSQL table using the default Spark JDBC configuration completes, but only one Spark task is created. What configuration changes are needed to read this table with 20-way parallelism, and what constraint must the partitionColumn satisfy?
Show Answer Correct Answer: The JDBC read must be configured with four options: numPartitions=20, partitionColumn=<column_name>, lowerBound=<min_value>, and upperBound=<max_value+1>. Spark generates 20 SQL queries, each fetching a range of partitionColumn values. The constraint is that partitionColumn must be a numeric (integer, bigint) or date/timestamp column โ€” string columns are not supported. The column should also be reasonably uniformly distributed across its range; a heavily skewed column produces uneven partition sizes and one slow straggler task that holds up the entire stage.

  1. A Spark streaming job writes to a partitioned Parquet table with 500 output partitions every 30 seconds. After 24 hours, queries against the table are dramatically slower than expected. What is the root cause, and what are the two remedies?
Show Answer Correct Answer: The root cause is the small file problem. At 500 partitions ร— 2 micro-batches per minute ร— 60 minutes ร— 24 hours = 1,440,000 small files. Each file is a valid Parquet file but contains very few rows, far smaller than the 128 MB target. Query planning must list all 1.44 million files, creating one task per file (or per maxPartitionBytes grouping), resulting in millions of tiny tasks. The two remedies are: (1) migrate the sink to Delta Lake and enable spark.databricks.delta.optimizeWrite.enabled=true, which automatically coalesces small write partitions into larger output files; or (2) run a periodic Parquet compaction job that reads each partition and rewrites it with coalesce(N) to produce a bounded number of right-sized files.

  1. Open-ended challenge: A Parquet table with 50 columns is queried daily for aggregations that use 4 specific columns. The explain() output shows ReadSchema with all 50 columns. What could cause this, and what steps would you take to diagnose and fix it?

Abstract Algorithms

Written by

Abstract Algorithms

@abstractalgorithms