
Spark DataFrames and Spark SQL: Schema, DDL, and the Catalyst Optimizer

Catalyst turns your DataFrame ops into an optimized physical plan — understanding it explains why column pruning and predicate pushdown are free

Abstract Algorithms · 25 min read

AI-assisted content. This post may have been written or enhanced with the help of AI tools. While efforts are made to ensure accuracy, the content may contain errors or inaccuracies. Please verify critical information independently.

TLDR: Catalyst is Spark's query compiler. It takes any DataFrame operation or SQL string, parses it into an abstract syntax tree, resolves column references against the catalog, applies a library of algebraic rewrite rules to produce an optimized logical plan, then selects a physical execution strategy and generates JVM bytecode — all before a single row of data moves. Column pruning, predicate pushdown, and join reordering happen automatically inside this pipeline, which is why a 3-line DataFrame query consistently outperforms hand-tuned RDD code.


📖 The Junior Engineer Who Rewrote Your Three Days of RDD Tuning in Three Lines

A senior data engineer at a mid-sized fintech company had spent three days squeezing performance out of a PySpark RDD pipeline. They had manually partitioned the dataset by account region, broadcast-joined a small currency table by hand, written a custom map function to filter inactive accounts before the join, and added a .persist() call at exactly the right stage boundary to avoid redundant recomputation. The job ran in eleven minutes. It was, by any measure, well-tuned.

A junior engineer, new to Spark and unaware of the senior's work, was given the same task as a training exercise. She wrote three lines using the DataFrame API: read the source data, filter where status == 'active', and join to the currency table. She ran df.explain() out of curiosity, saw something about "PushedFilters" and "BroadcastHashJoin," shrugged, and submitted. The job ran in nine minutes.

The senior engineer's reaction — confusion followed by grudging acceptance — is the exact moment Catalyst becomes worth understanding.

What happened was not luck or coincidence. Catalyst, Spark's built-in query optimizer, looked at the junior engineer's declarative intent ("I want rows where status is active, joined to the currency table") and automatically applied every optimization the senior had implemented by hand: it pushed the filter before the join so fewer rows participated in the shuffle, detected that the currency table was small enough to broadcast, and pruned unneeded columns from the scan. It did all of this without being asked, in milliseconds, before execution began.

This post explains exactly how Catalyst does that — and, more importantly, where its limits are, because understanding those limits is what separates an engineer who writes fast Spark jobs from one who writes jobs that happen to be fast today but break under production load.


🔍 DataFrames vs RDDs: Schema, Types, and What Makes Catalyst Possible

Before Catalyst can optimize anything, Spark needs to understand what the data looks like. That is the key distinction between the RDD API and the DataFrame API, and it is the entire reason Catalyst exists.

An RDD (Resilient Distributed Dataset) is a distributed collection of opaque JVM objects. Spark knows that an RDD contains objects, but it has no visibility into what those objects contain — their field names, types, or null constraints. When you call .map() on an RDD, Spark executes that function as a black box. It cannot reorder it, merge it with the next operation, or push it closer to the data source. It simply runs your code.

A DataFrame is a distributed dataset organized into named, typed columns. Every DataFrame has a schema — a description of column names and their data types. Because Spark can see inside the data structure, it can make intelligent decisions: it knows that selecting only name and age from a 50-column CSV means it can skip reading the other 48 columns entirely; it knows that a filter on age > 30 can be applied before a join so fewer rows participate in the expensive shuffle step.

This schema awareness is the prerequisite for everything Catalyst does.

Creating DataFrames and Defining Schemas

DataFrames can be created from files, JDBC connections, in-memory collections, or Spark SQL queries. When reading from a structured format like Parquet, Spark infers the schema from embedded metadata. For CSV or JSON files, schema inference reads a sample of the data, which adds startup cost and can produce incorrect types for edge-case values.

The preferred approach for production pipelines is to declare the schema explicitly using StructType and StructField. A StructType is an ordered list of StructField definitions, where each field carries a name, a DataType, and a nullable flag:

| StructField component | Purpose |
| --- | --- |
| name | Column identifier used in query plans |
| dataType | Spark SQL type: StringType, IntegerType, TimestampType, ArrayType, etc. |
| nullable | Whether Spark should expect null values; false enables tighter optimizations |

Declaring nullable=False on non-null columns is not just documentation — Catalyst uses nullability information when generating filter predicates. If a column is non-null, Catalyst can skip the isnotnull() check that it would otherwise inject automatically, saving a comparison per row across potentially billions of rows.

The Basic DataFrame Operations That Feed into Catalyst

The DataFrame API provides a set of transformations that are lazy — they build up a query plan but do not trigger execution. The primary operations are:

  • select() — choose a subset of columns or derive computed columns; Catalyst uses this to build the column pruning list
  • filter() / where() — add row-level predicates; Catalyst lifts these as early in the plan as possible
  • groupBy() + agg() — express aggregations; Catalyst selects between hash aggregation and sort aggregation based on data characteristics
  • join() — combine two DataFrames; Catalyst chooses between BroadcastHashJoin, SortMergeJoin, and ShuffleHashJoin based on table sizes and the join key

None of these operations run immediately. Every call adds a node to a tree data structure called the logical plan. Execution begins only when you invoke an action such as .count(), .collect(), or a write like .write.parquet(...).


⚙️ The Four Phases Catalyst Uses to Rewrite Your Query Before It Runs

Catalyst is not a single algorithm. It is a pipeline of four distinct phases, each operating on a tree representation of the query. Understanding what each phase does — and what it can change — explains every performance characteristic you will observe in production.

Phase 1: Parsing → Unresolved Logical Plan

The first phase converts your DataFrame API calls (or SQL string) into an Unresolved Logical Plan. This is a tree of LogicalPlan nodes that captures your intent: "select these columns, filter on this condition, join to this table." At this stage, column references are unresolved — employees.age is just a symbol; Spark does not yet know its data type or whether it actually exists in the schema. Attempting to reference a nonexistent column produces an error in the next phase, not this one.

Phase 2: Analysis → Analyzed Logical Plan

The Analyzer walks the unresolved tree and resolves every column reference against the Catalog — Spark's metadata registry for tables, views, and registered DataFrames. It assigns unique IDs to each column reference (e.g., age#1 rather than age), resolves data types, checks for type compatibility in expressions, and resolves function names. After this phase, every node in the plan is fully typed and column references are concrete.

The Analyzer also handles implicit casts: if you filter on age > "30" (a string literal instead of an integer), the Analyzer either inserts a cast expression or raises an AnalysisException depending on whether the cast is safe.

Phase 3: Logical Optimization → Optimized Logical Plan

This is where Catalyst earns its reputation. The Optimizer applies a library of algebraic rewrite rules to the analyzed plan. Each rule is a tree transformation that replaces one subtree pattern with a semantically equivalent but more efficient pattern. Rules fire repeatedly in fixed-point iterations until no more rules apply.

Two rules dominate performance in real workloads:

Predicate Pushdown relocates filter operations as close to the data source as possible. If you filter after a join, Catalyst rewrites the plan to filter before the join so fewer rows participate in the expensive shuffle. For Parquet and ORC sources, the filter is pushed all the way into the file reader, which uses column statistics and bloom filters stored in the file metadata to skip entire row groups without reading them.

Column Pruning identifies which columns are actually needed by the final query and removes all others from the scan. If your table has 80 columns and your query needs 5, the physical plan only reads 5. For columnar formats like Parquet, this is a direct I/O reduction — unneeded columns are never read from disk.

Additional rules include: constant folding (age + 0 becomes age), null propagation, boolean simplification, subquery elimination, and join reordering using the Cost-Based Optimizer (CBO) when table statistics are available.

Phase 4: Physical Planning and Code Generation

The Physical Planner takes the optimized logical plan and generates one or more Physical Plans — concrete execution strategies that reference actual Spark operators like SortMergeJoinExec or BroadcastHashJoinExec. When multiple strategies are possible, the CBO selects the lowest-cost option based on row count statistics collected with ANALYZE TABLE.

The selected physical plan is then handed to Whole-Stage Code Generation (WSCG), which compiles an entire pipeline of operators into a single JVM method. Instead of executing each operator in turn — which requires materializing intermediate rows in memory — WSCG fuses the operators into a tight loop that processes one row from ingestion through all transformations to output without creating intermediate objects. This eliminates virtual function dispatch overhead and dramatically improves CPU cache utilization.


🧠 Inside Catalyst: AST Rules, Cost Models, and Code Generation

The Internals of Catalyst: TreeNode, RuleExecutor, and Transformation Rules

Every entity in Catalyst — expressions, operators, plans — is a subclass of TreeNode. A TreeNode is an immutable algebraic tree node with a list of children. The expression age > 30 is a GreaterThan(UnresolvedAttribute("age"), Literal(30)) tree. The SELECT clause becomes a Project node carrying the column expressions. The FROM clause becomes a Relation node. The WHERE clause becomes a Filter node wrapping its input.

The RuleExecutor drives optimization. It holds a sequence of batches, where each batch contains a set of Rule[LogicalPlan] instances and a strategy (Once or FixedPoint). A FixedPoint strategy applies all rules in the batch repeatedly until the plan stops changing — which guarantees convergence but requires each rule to be idempotent. A Once strategy fires each rule exactly once, used for rules that are expensive or cannot converge.
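The fixed-point mechanism can be sketched in plain Python with a toy constant-folding rule — purely an illustration of the idea, not Spark's actual Scala implementation:

```python
# Toy illustration of fixed-point rule application — NOT Spark's implementation.
# Expressions are tuples: ("add", left, right), ("lit", n), or ("col", name).

def fold_constants(node):
    """One rule: add(lit(a), lit(b)) -> lit(a+b), and add(x, lit(0)) -> x."""
    if isinstance(node, tuple) and node[0] == "add":
        left, right = fold_constants(node[1]), fold_constants(node[2])
        if left[0] == "lit" and right[0] == "lit":
            return ("lit", left[1] + right[1])
        if right == ("lit", 0):
            return left
        return ("add", left, right)
    return node

def fixed_point(rule, plan):
    """Apply the rule repeatedly until the tree stops changing."""
    while True:
        new_plan = rule(plan)
        if new_plan == plan:
            return plan
        plan = new_plan

# age + (2 + (-2))  simplifies all the way down to just the column reference.
expr = ("add", ("col", "age"), ("add", ("lit", 2), ("lit", -2)))
print(fixed_point(fold_constants, expr))  # → ('col', 'age')
```

The idempotence requirement shows up here too: once the tree is fully folded, applying the rule again must return it unchanged, or the loop would never terminate.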

To see how predicate pushdown is implemented: the PushDownPredicates rule pattern-matches on any Filter node that sits above a Join node. When it finds this pattern, it inspects the filter predicate to determine which side of the join it references. A predicate that only uses columns from the left table gets pushed below the join to the left child; one that only uses right-table columns moves to the right child; one that references both sides stays at the join level. This single rule eliminates a full shuffle of filtered-out rows.

When statistics are available — either computed via ANALYZE TABLE or inferred from Parquet footer metadata — the CBO uses a Selectivity Model to estimate output row counts after each filter or join. Join ordering then becomes a cost minimization problem: given N tables to join, what order minimizes the estimated total data movement? Spark uses a dynamic programming approach similar to the classic System R optimizer when CBO is enabled via spark.sql.cbo.enabled.

Performance Analysis: Reading explain() Output and Understanding Codegen

The single most powerful diagnostic tool Spark provides is df.explain(True) (or df.explain(mode="extended")). It prints all four plan stages: Parsed, Analyzed, Optimized, and Physical. Reading the Physical Plan from bottom to top tells you exactly what Spark will do when the action fires.

The Physical Plan annotates operators with *(N) where N is the code generation stage ID. All operators sharing the same stage ID are fused by WSCG into a single compiled loop. An operator without a stage annotation is interpreted — typically because it involves a complex expression type that WSCG cannot inline, such as a Python UDF.

The PushedFilters field in a FileScan node confirms that predicate pushdown reached the data source layer. If your filter is present in PushedFilters, Spark's Parquet reader will use column statistics in the Parquet footer to skip row groups that cannot satisfy the filter — a dramatic I/O reduction on large datasets with skewed value distributions.

Adaptive Query Execution (AQE) interacts with Catalyst plans at runtime. AQE can re-optimize the physical plan mid-execution using statistics collected from completed shuffle stages. The most impactful AQE intervention is dynamically switching a SortMergeJoin to a BroadcastHashJoin when AQE detects at runtime that one side of the join is smaller than spark.sql.autoBroadcastJoinThreshold — an optimization that Catalyst's static planning could not make if statistics were unavailable at plan time. When AQE fires a plan change, df.explain() output will show an AdaptiveSparkPlan root node.

The difference between an interpreted operator and a code-generated one is not cosmetic. Benchmarks consistently show 2–5x throughput improvement from WSCG for CPU-bound operations, primarily because fused loops eliminate the cost of creating intermediate row objects and enable the JIT compiler to optimize the full pipeline as a single method.


📊 Tracing a DataFrame Query Through the Full Catalyst Pipeline

The diagram below shows the complete path a DataFrame API call travels through Catalyst before execution. Read it top to bottom: each box represents a phase or component, and the arrows show data flow between them. The key insight the diagram illustrates is that by the time execution begins, the query has been transformed three times — and each transformation is an opportunity for the optimizer to reduce work.

graph TD
    A[DataFrame API Call or SQL String] --> B[Parser]
    B --> C[Unresolved Logical Plan]
    C --> D[Analyzer: Catalog and Type Resolution]
    D --> E[Analyzed Logical Plan]
    E --> F[RuleExecutor: Optimization Passes]
    F --> G[Optimized Logical Plan]
    G --> H[Physical Planner]
    H --> I[Physical Plan Candidates]
    I --> J[CBO: Cost Model Selection]
    J --> K[Selected Physical Plan]
    K --> L[Whole-Stage Code Generation]
    L --> M[JVM Bytecode]
    M --> N[Executor: Distributed Execution]

Notice that the path from your API call to actual execution passes through the Analyzer (which resolves against the Catalog), the RuleExecutor (which rewrites the tree), and WSCG (which compiles to bytecode). The Catalog lookup is why Spark SQL registered views participate in exactly the same optimization path as DataFrame API calls — both routes converge on the Analyzed Logical Plan step. The RuleExecutor box is where column pruning and predicate pushdown live; by the time the plan reaches the Physical Planner, filtered and pruned columns are already gone from the plan tree.


🌍 Where Catalyst Makes the Real Difference: SQL Views, UDFs, and the Dataset API

Spark SQL and DataFrames Share One Optimizer

When you register a DataFrame as a temporary view with createOrReplaceTempView("employees") and then query it with spark.sql("SELECT name, age FROM employees WHERE age > 30"), you are not taking a different code path. The SQL string is parsed into the same LogicalPlan tree as the equivalent DataFrame API chain. The Analyzer resolves the view's schema from the Catalog, and the same optimization rules fire. This is why Spark SQL queries and DataFrame API queries are performance-equivalent — they are literally the same thing after the parsing phase.

This equivalence is architecturally significant: it means you can choose between SQL strings and the DataFrame API based on team familiarity and readability, not performance characteristics.

The UDF Problem: Black Boxes in a Transparent Pipeline

Python UDFs are the most common Catalyst optimization killer in production Spark jobs. When you register a Python function as a UDF, Catalyst treats it as an opaque black box. It cannot inspect the function's logic, apply algebraic rules to it, or push it before a join. Worse, Python UDFs require serializing each row from the JVM to a Python process via Apache Arrow or raw pickle, executing the Python function, and deserializing the result back. This inter-process serialization cost can reduce throughput by an order of magnitude compared to equivalent built-in functions.

The rule is simple: any operation expressible using Spark's built-in functions should use them. Built-in functions like to_date(), regexp_extract(), when()/otherwise(), explode(), and the full set of SQL aggregate functions are implemented in Scala, visible to Catalyst, and eligible for code generation. A Python UDF doing the same work will not be pruned, cannot be pushed down, and breaks the WSCG fusion boundary — every operator touching the UDF's output becomes interpreted.

Scala and Java UDFs are better than Python UDFs because they eliminate the cross-language serialization overhead, but they are still black boxes to the optimizer. The rule holds: built-in functions first.

Dataset[T]: Type Safety at the Cost of Some Optimizations

Spark's Dataset[T] API provides compile-time type safety by encoding your schema as a JVM type parameter. A Dataset[Employee] gives you IDE autocompletion for field names and catches column-name typos at compile time instead of at runtime. Catalyst can still optimize most Dataset operations because the encoder that maps JVM objects to Spark's internal row format is schema-aware.

However, Dataset encoder/decoder operations at API boundaries introduce some overhead and can interrupt WSCG fusion. In practice, the difference is small for most workloads, and type safety in large production codebases is worth the tradeoff.


⚖️ DataFrame vs Dataset vs RDD: Choosing the Right Abstraction for the Job

Each Spark API layer offers a different contract. Choosing incorrectly is not merely a style problem — it has direct performance and maintainability consequences.

| API | Type safety | Catalyst optimized | Serialization | Best for |
| --- | --- | --- | --- | --- |
| DataFrame | No (runtime schema) | Full | None (internal Row) | ETL, SQL-style transformations, cross-language portability |
| Dataset[T] | Yes (compile-time) | Full for most ops | Encoder at boundaries | Scala/Java applications needing type-safe domain models |
| RDD | Yes (JVM type) | None | Full Java serialization | Custom partitioning logic, iterative graph algorithms, legacy code |

The primary failure mode is using RDDs where DataFrames would work, because engineers mistake familiarity with the functional .map() / .filter() API for performance. An RDD .filter() is a black box to Catalyst; a DataFrame .filter() becomes a pushed predicate. The performance gap grows with data volume because predicate pushdown saves I/O, not just CPU.

The secondary failure mode is writing Python UDFs where built-in functions exist. The next section on the decision guide makes this concrete.

When Predicate Pushdown Fails

Predicate pushdown is not guaranteed. Several patterns defeat it:

  • Filters on derived columns created by non-trivial expressions may not translate to storage-level predicates. Parquet pushdown supports equality, range, and null checks but not arbitrary expressions.
  • Filters after a UDF call stay above the UDF in the plan and are never pushed to the source.
  • Dynamic partition pruning depends on broadcast join results, so it only applies when the join partner is small enough to broadcast.

Understanding these boundaries explains why df.explain() is not optional for any Spark job that runs at scale — it is the only way to verify that the optimizations you expect are actually happening.


🧭 Choosing Between SQL Strings, DataFrame API, Dataset[T], and Built-in Functions

The right API choice depends on context. Use this table as a decision framework, not a rigid rule:

| Situation | Recommendation | Reason |
| --- | --- | --- |
| Ad-hoc exploration, SQL-fluent team | spark.sql() with SQL strings | Readable, identical optimizer path, easy to share |
| Production ETL in Python/Scala | DataFrame API | Composable, programmatic, full Catalyst optimization |
| Scala/Java with strict type contracts | Dataset[T] | Compile-time safety, mostly equivalent performance |
| Custom transformation expressible in SQL | Built-in functions | Catalyst can see inside, eligible for pushdown and WSCG |
| Custom transformation not in built-ins | Scala/Java UDF > Python UDF | Avoid Python serialization; accept optimizer blindness |
| Complex graph traversal or iterative ML | RDD API | No columnar benefit; custom partitioning required |
| Statistical joins with known table sizes | Enable CBO + ANALYZE TABLE | CBO selects optimal join strategy; prevents mis-estimation |

The most consequential choice in production is the UDF decision: before writing a Python UDF, check whether the transformation can be expressed as a combination of when(), regexp_extract(), transform() (for arrays), explode(), higher-order functions, or any of the ~350 built-in Spark SQL functions. If it can, the built-in version will be meaningfully faster at scale.


🧪 Reading a Real explain(True) Output: Walking Through Predicate Pushdown in Action

The following scenario demonstrates what happens when Catalyst successfully pushes a filter into a Parquet scan. The setup is a DataFrame that reads an employee Parquet table, selects three columns, and filters for employees older than 30. Without looking at the explain output, you might assume the filter runs after the full table is read. The explain output proves otherwise — and understanding the difference between "DataFilters" and "PushedFilters" is the key diagnostic skill.

== Parsed Logical Plan ==
'Filter ('age > 30)
+- 'Project ['name, 'age, 'department]
   +- 'UnresolvedRelation [employees]

== Analyzed Logical Plan ==
name: string, age: int, department: string
Filter (age#1 > 30)
+- Project [name#0, age#1, department#2]
   +- SubqueryAlias employees
      +- Relation [name#0,age#1,department#2] parquet

== Optimized Logical Plan ==
Filter (isnotnull(age#1) AND (age#1 > 30))
+- Relation [name#0,age#1,department#2] parquet

== Physical Plan ==
*(1) Filter (isnotnull(age#1) AND (age#1 > 30))
+- *(1) ColumnarToRow
   +- FileScan parquet [name#0,age#1,department#2]
      Batched: true
      DataFilters: [isnotnull(age#1), (age#1 > 30)]
      Format: Parquet
      PartitionFilters: []
      PushedFilters: [IsNotNull(age), GreaterThan(age,30)]
      ReadSchema: struct<name:string,age:int,department:string>

Walk through each stage to see what Catalyst changed:

Parsed Logical Plan: The UnresolvedRelation [employees] shows that Catalyst has not yet looked up the table. Column references are symbolic ('age, with a tick prefix indicating unresolved). The Project sits above the Filter — this is the order in which the DataFrame API calls were chained, before any reordering.

Analyzed Logical Plan: Column references are resolved to concrete IDs (age#1, name#0). The SubqueryAlias employees wrapper shows the Catalog resolved the relation. Data types are now known.

Optimized Logical Plan: Three critical changes appeared. First, the Project node is gone — Catalyst eliminated it because the Relation now only contains the three required columns, making the explicit projection redundant. Second, isnotnull(age#1) was injected automatically because Catalyst knows that age > 30 is false for null values; making the null check explicit lets the Parquet reader skip null-heavy row groups earlier. Third, the Filter now sits directly above the Relation — the predicate has been pushed as close to the source as possible.

Physical Plan: The *(1) prefix on both operators confirms Whole-Stage Code Generation will fuse them. The FileScan line is the critical diagnostic: PushedFilters: [IsNotNull(age), GreaterThan(age,30)] confirms the filter was handed to Spark's Parquet reader. This means the Parquet reader will consult the column statistics in the file footer and skip any row group where max(age) <= 30 without reading those rows at all. On a 100 GB Parquet file where 80% of employees are age 30 or under, this is the difference between reading 20 GB and reading 100 GB.

ReadSchema: struct<name:string,age:int,department:string> confirms column pruning: only three fields are in the read schema, regardless of how many columns the full table contains.


🛠️ Apache Spark SQL Config Reference: Tuning Catalyst's Behavior in Production

Catalyst's behavior is configurable through Spark SQL properties. The four most impactful settings govern broadcast thresholds, cost-based optimization, adaptive execution, and code generation:

# Broadcast threshold: tables smaller than this value are automatically broadcast
# Default: 10MB. Increase carefully — each executor must hold the broadcast in memory.
spark.sql.autoBroadcastJoinThreshold=10485760

# Enable the Cost-Based Optimizer. Requires ANALYZE TABLE to collect statistics.
# Without CBO, join strategies are selected by rule of thumb (size-based heuristics only).
spark.sql.cbo.enabled=true

# Enable Adaptive Query Execution. Re-optimizes plans at runtime using shuffle statistics.
# Enables dynamic broadcast join switching and skew join handling. Default: true in Spark 3.x.
spark.sql.adaptive.enabled=true

# Enable Whole-Stage Code Generation. Fuses operator pipelines into single JVM methods.
# Disable only for debugging — WSCG is the primary throughput driver for CPU-bound operations.
spark.sql.codegen.wholeStage=true

For AQE to dynamically switch joins, ensure spark.sql.adaptive.localShuffleReader.enabled is also true (the default). AQE collects post-shuffle statistics after each stage completes and rechecks whether a BroadcastHashJoin would now be cheaper than the originally planned SortMergeJoin.

To enable CBO effectively: run ANALYZE TABLE employees COMPUTE STATISTICS FOR ALL COLUMNS on your registered tables or Delta Lake tables before submitting jobs that involve multi-table joins. Without fresh statistics, CBO falls back to the same heuristics as when disabled.

For a complete deep-dive on how these settings interact with the DAG Scheduler and executor memory, see the companion post on Spark Architecture: Driver, Executors, DAG Scheduler.


📚 Lessons Learned from Catalyst Optimization in Production

Always run explain() before profiling with Spark UI. The explain output tells you in seconds whether the optimizations you expect are happening. If PushedFilters is empty when you expect pushdown, you will never find the root cause by looking at stage durations.

UDFs are a tax, not a feature. Every Python UDF in a production pipeline is a performance debt. When a new UDF is proposed, the first question should be "can this be expressed using built-in functions?" The second question should be "can we rewrite this in Scala?" Python UDFs should be a last resort, not a convenience.

Schema-on-read is for exploration, not production. Allowing Spark to infer schema at job startup adds latency, risks inferring incorrect types from edge-case values, and produces non-deterministic schemas when file contents change. Declaring schema explicitly with StructType/StructField makes jobs deterministic and enables the nullability optimizations described earlier.

CBO requires fresh statistics. Enabling spark.sql.cbo.enabled without running ANALYZE TABLE is a no-op for join ordering. CBO with stale statistics can actually choose worse plans than pure heuristics. Make statistics collection part of your data ingestion pipeline, not an ad-hoc step.

Whole-Stage Code Generation breaking can be a diagnostic signal. If explain output shows an operator without a *(N) prefix in the physical plan, something in that pipeline is not code-generated. Common causes include unsupported expression types, certain window functions, and Python UDFs. Identifying which operator breaks the WSCG chain often reveals a rewrite opportunity.

AQE does not replace good upfront planning. AQE's dynamic broadcast switching is powerful for workloads where partition cardinalities are hard to predict at planning time, but it cannot fix a fundamentally unbalanced data model. Heavily skewed join keys — where one key value represents 30% of the dataset — require explicit salting strategies that AQE cannot auto-apply.


📌 TLDR

TLDR: Catalyst transforms every DataFrame operation or SQL string through four phases — parsing, analysis, logical optimization, and physical planning — before generating JVM bytecode via Whole-Stage Code Generation. Column pruning and predicate pushdown are free because they are algebraic rewrite rules applied to a tree representation of your query, not manual optimizations you implement. UDFs break the optimizer because they are opaque: Catalyst cannot see inside them, push them toward the data source, or fuse them into code-generated pipelines. Run df.explain(True) and verify PushedFilters and *(N) prefixes to confirm you are getting the optimizations you expect. Use built-in functions over UDFs, declare schemas explicitly, enable CBO with ANALYZE TABLE, and let AQE handle runtime join strategy adjustments.


📝 Practice Quiz

Test your understanding of Catalyst and the DataFrame optimizer.

  1. You write a DataFrame query that filters on status == 'active' and then joins to a second table. After running df.explain(True), you notice the Filter appears above the Join in the optimized logical plan rather than below it. What does this tell you, and what should you investigate?
Answer: The filter predicate references columns from both sides of the join, which prevents Catalyst from pushing it below the join. When a filter expression contains references to columns from the left table AND the right table (for example, left.status = right.valid_status), the filter cannot move below the join because both inputs are required to evaluate it. Investigate whether the filter can be decomposed into two separate single-table filters, which can each be pushed to their respective sides.
  2. You enable spark.sql.cbo.enabled=true but observe no change in join strategy between SortMergeJoin and BroadcastHashJoin for a table you believe is small enough to broadcast. What is the most likely cause?
Answer: The CBO has no statistics to work with. Enabling CBO does not automatically collect table statistics — it only enables the cost-based optimizer to use statistics when they exist. Without running ANALYZE TABLE <table> COMPUTE STATISTICS FOR ALL COLUMNS, CBO falls back to size-based heuristics identical to what it uses when disabled. Run the ANALYZE TABLE command and re-run explain to confirm the strategy changes.
  3. Your explain output shows *(1) Filter and *(1) FileScan operators in the Physical Plan. A teammate adds a Python UDF to the pipeline and re-runs explain. The filter now shows Filter without a *(N) prefix. Why did this happen, and what is the performance consequence?
Answer: The Python UDF broke the Whole-Stage Code Generation (WSCG) fusion boundary. WSCG fuses multiple operators into a single compiled JVM method. Python UDFs cannot be inlined into the generated code because they execute in a separate Python process — the JVM must serialize each row, send it to Python, wait for the result, and deserialize it back. This cross-process boundary prevents WSCG from fusing any operator that depends on the UDF's output. The *(N) prefix disappears because those operators are now interpreted, not compiled. The performance consequence is 2–10x throughput reduction for that pipeline stage depending on UDF complexity.
  4. What is the difference between DataFilters and PushedFilters in a FileScan node in the Physical Plan? Why does PushedFilters represent a more powerful optimization?
Answer: DataFilters are filters applied by Spark's row-processing pipeline after reading data from the file — they eliminate rows that did not satisfy the predicate but only after those rows were read from disk into memory. PushedFilters are filters that have been handed to the file format reader itself (e.g., Spark's Parquet reader). The Parquet reader uses column statistics stored in the Parquet footer to skip entire row groups — blocks of ~128MB — that cannot contain matching rows, without reading them from disk at all. PushedFilters is therefore an I/O reduction, while DataFilters is only a CPU reduction. On large datasets with selective filters, PushedFilters can reduce I/O by 80–90%, whereas DataFilters without PushedFilters reads the full file before filtering.
  5. You have a Spark pipeline that reads a 500 GB Parquet table, applies three transformations, and writes the output. After enabling spark.sql.adaptive.enabled=true, you observe that a SortMergeJoin was replaced mid-execution with a BroadcastHashJoin. The job completes 40% faster. What AQE mechanism caused this, and what prerequisite condition made it possible?
Answer: AQE's dynamic join strategy switching re-evaluated the physical plan after the first shuffle stage completed. When the shuffle stage finished, AQE read the partition statistics from the completed shuffle — specifically the total output bytes for each join side — and determined that one join side had shrunk below spark.sql.autoBroadcastJoinThreshold. At planning time this was not known (statistics were stale or unavailable), so Catalyst chose SortMergeJoin. With post-shuffle statistics, AQE substituted BroadcastHashJoin, which eliminates the second shuffle entirely. The prerequisite is that spark.sql.adaptive.enabled=true and that the join side genuinely fits in executor memory at the broadcast threshold — AQE cannot override memory limits.
  6. Open-ended challenge: You are given a Spark job that runs in 45 minutes on a 200 GB Parquet dataset. Your task is to reduce it to under 15 minutes without adding cluster resources. Describe a systematic approach using the Catalyst tools discussed in this post — explain output, CBO, AQE, and built-in function substitution — to identify the bottlenecks and apply targeted optimizations. What would you check first, what would you change based on each finding, and how would you verify each change was effective?

Written by Abstract Algorithms (@abstractalgorithms)