
Apache Spark for Data Engineers: RDDs, DataFrames, and Structured Streaming

Process petabytes with Python: understand Spark's execution model, DataFrame transformations, partitioning strategy, and real-time streaming.

Abstract Algorithms · 20 min read

TLDR: Apache Spark distributes Python DataFrame jobs across a cluster of executors, using lazy evaluation and the Catalyst query optimizer to process terabytes with the same code that works on gigabytes. Master partitioning, shuffle-awareness, and Structured Streaming and you have a single engine for both batch and real-time pipelines.


πŸ“– When Pandas Hits the Wall: The 500GB Problem

Picture this: a data engineer writes a clean pandas script that reads 500MB of e-commerce event data, filters it, joins it to a product catalog, and produces a category rollup. It runs in 40 seconds on a laptop. The team ships it to production.

Three months later, the raw file is 500GB. The script runs for two hours and crashes with MemoryError: Unable to allocate 48.3 GiB. The machine simply ran out of RAM.

The instinctive fix — buy a bigger machine — works once. It doesn't work at 5TB or 50TB. And vertical scaling is expensive, creates a single point of failure, and has a hard ceiling. What you actually need is a way to split the problem across many machines so that each machine only processes its share.

That is exactly the problem Apache Spark was designed to solve.

Spark lets you express transformations (filter, join, groupBy) in Python using a DataFrame API that looks almost identical to pandas. Underneath, it compiles that plan into a parallel execution graph, splits your data into partitions, ships each partition to a different worker node, and runs the work concurrently. The result lands in distributed storage like S3 or HDFS. Your Python code never changes β€” the engine handles the distribution.

| Scale | Tool | Why |
|---|---|---|
| < 1 GB | pandas | Fits in memory; simple API |
| 1 GB – 100 GB | pandas + chunking / Dask | Single machine stretched |
| 100 GB – 100 TB | Apache Spark | Distributed execution, SQL optimizer |
| Streaming events | Spark Structured Streaming | Same DataFrame API, micro-batch or continuous |

πŸ” RDDs: Spark's Original Distributed Dataset Abstraction

Before DataFrames, Spark exposed data as Resilient Distributed Datasets (RDDs). An RDD is an immutable, fault-tolerant, distributed collection of records partitioned across a cluster. Three properties define it:

  • Immutable: you never modify an RDD in place; every transformation produces a new RDD.
  • Distributed: partitions live on different executor nodes and are processed in parallel.
  • Resilient: Spark tracks the lineage of transformations, so if a partition is lost (executor crash), it can be recomputed from the source without re-reading the entire dataset.

RDD operations split into two families:

  • Transformations (map, filter, flatMap, reduceByKey) — describe what to compute but do not trigger execution. Spark records them in a DAG.
  • Actions (collect, count, saveAsTextFile) — trigger the actual execution of the recorded DAG.

This split is the foundation of Spark's lazy evaluation model. Nothing runs until an action is called.

# RDD-style wordcount — verbose and unoptimized
lines = sc.textFile("s3://bucket/logs/*.txt")
words = lines.flatMap(lambda line: line.split())
pairs = words.map(lambda w: (w, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("s3://bucket/output/wordcount")   # ← action triggers execution

RDDs are still available and occasionally necessary (custom serialization, truly unstructured data, legacy RDD-based MLlib APIs). But for most data engineering work, DataFrames have superseded them — and for good reason.


βš™οΈ Driver, Executors, and the DAG: How Spark Distributes Work

Understanding where your code runs is the most important mental model for a Spark engineer.

Driver: the process running your Python script. It hosts the SparkSession, parses your DataFrame transformations, builds a logical plan, hands it to the Catalyst optimizer, and ultimately sends tasks to executors. The driver is the brain — it does no data crunching.

Executors: JVM processes running on worker nodes. Each executor holds a fraction of the total data (one or more partitions), executes tasks assigned by the driver, and caches intermediate results in memory or on disk. Executors are the muscle.

Partitions: the unit of parallelism. Each partition is an independent slice of the data that one task processes on one executor core. If you have 200 partitions and 50 executor cores, Spark runs 50 tasks in parallel and processes all 200 in 4 rounds.
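That scheduling arithmetic can be sketched as a tiny helper (illustrative only, not a Spark API):

```python
import math

def execution_waves(num_partitions: int, total_executor_cores: int) -> int:
    """Toy model: each core runs one task at a time, so a stage with more
    partitions than cores completes in ceil(partitions / cores) waves."""
    return math.ceil(num_partitions / total_executor_cores)

# 200 partitions on 50 executor cores: 4 sequential waves of 50 parallel tasks
print(execution_waves(200, 50))  # 4
```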

Lazy Evaluation: Building the Plan Before Touching Data

Every time you call filter(), select(), join(), or groupBy(), Spark does not run anything. It records the transformation in a logical query plan (a DAG of operators). Only when you call an action (write, count, show, collect) does Spark:

  1. Pass the logical plan through the Catalyst optimizer to produce an optimized physical plan.
  2. Split the physical plan into stages separated by shuffle boundaries.
  3. Split stages into tasks (one per partition).
  4. Schedule tasks on executor cores.

This means Spark can see the entire computation before touching the first byte of data, enabling rewrites that a row-by-row loop could never achieve: predicate pushdown, column pruning, join reordering, and broadcast join selection.
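The record-then-execute pattern can be made concrete with a toy (non-Spark) sketch: transformations only append to a recorded plan, and the action walks the plan over the data. This is an illustration of the idea, not Spark's implementation:

```python
class LazyDataset:
    """Toy illustration of lazy evaluation, NOT Spark's implementation.
    Transformations only record operations; the action executes the plan."""
    def __init__(self, rows, plan=None):
        self._rows = rows
        self._plan = plan or []          # the recorded "logical plan"

    def filter(self, predicate):         # transformation: records, runs nothing
        return LazyDataset(self._rows, self._plan + [("filter", predicate)])

    def map(self, fn):                   # transformation: records, runs nothing
        return LazyDataset(self._rows, self._plan + [("map", fn)])

    def collect(self):                   # action: executes the recorded plan
        rows = self._rows
        for op, fn in self._plan:
            rows = [fn(r) for r in rows] if op == "map" else [r for r in rows if fn(r)]
        return rows

ds = LazyDataset([1, 2, 3, 4]).filter(lambda x: x % 2 == 0).map(lambda x: x * 10)
# Nothing has executed yet; collect() triggers the whole pipeline:
print(ds.collect())  # [20, 40]
```

Because the whole plan is visible before execution, a real engine can rewrite it (e.g., reorder a cheap filter ahead of an expensive map) before touching any data.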

DataFrames vs RDDs: The API Gap

The same word count written with the DataFrame API:

# DataFrame-style — shorter, and Catalyst-optimized
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col

spark = SparkSession.builder.appName("wordcount").getOrCreate()
df = spark.read.text("s3://bucket/logs/*.txt")
counts = (
    df.select(explode(split(col("value"), " ")).alias("word"))
      .groupBy("word")
      .count()
)
counts.write.parquet("s3://bucket/output/wordcount")

The DataFrame version is shorter, carries schema information, and benefits from the Catalyst optimizer. The RDD version is a black box to the optimizer — it must execute every lambda as-is.


πŸ“Š Visualizing Spark's Execution: From Python Call to JVM Task

The journey from a Python groupBy().count() call to bytes on disk passes through five layers:

flowchart TD
    A["🐍 Python DataFrame API\n(filter · join · groupBy)"] --> B["Logical Plan\n(unresolved operators)"]
    B --> C["Catalyst Optimizer\n(predicate pushdown Β· column pruning\njoin reordering)"]
    C --> D["Optimized Physical Plan\n(scan Β· hash-aggregate Β· sort-merge-join)"]
    D --> E["DAG of Stages\n(split at shuffle boundaries)"]
    E --> F["Tasks\n(one per partition)"]
    F --> G["Executor Cores\n(JVM Β· Tungsten UnsafeRow)"]
    G --> H["πŸ“¦ Output\n(S3 / HDFS / Delta Lake)"]

Key insight: stages are separated by shuffles — the most expensive operation in Spark, because shuffle requires moving data across the network. Every groupBy, join (non-broadcast), and repartition triggers a shuffle. Minimizing shuffles is the single most impactful Spark performance tuning lever.


🧠 Deep Dive: Catalyst, Tungsten, and How Python DataFrames Become JVM Bytecode

Spark's Internals: DAGScheduler, BlockManager, and Shuffle

When an action fires, the DAGScheduler takes the optimized physical plan and divides it into stages. A new stage begins whenever a shuffle is required. Within a stage, tasks are independent β€” they can run fully in parallel without exchanging data.

The TaskScheduler assigns tasks to available executor slots (cores × threads). Each task reads its assigned partition from the BlockManager, which manages in-memory and on-disk storage on each node. Block replication for fault tolerance and shuffle data caching both flow through BlockManager.

During a shuffle, the writing side (map stage) hashes output rows by the partition key and writes shuffle files to local disk. The reading side (reduce stage) fetches the correct file segment from every mapper node. This network transfer is why groupBy on a high-cardinality column over a 5TB dataset can take 20 minutes: it moves 5TB across the cluster.
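The map-side routing described above boils down to hashing each row's key modulo the number of reduce partitions. A toy plain-Python sketch (a deliberate simplification of Spark's HashPartitioner, not the real implementation):

```python
from collections import defaultdict

def shuffle_write(rows, num_partitions):
    """Toy map-side shuffle: route each (key, value) row to a reduce
    partition by hashing the key. Simplified stand-in for Spark's
    HashPartitioner; real shuffle writes sorted files to local disk."""
    buckets = defaultdict(list)
    for key, value in rows:
        buckets[hash(key) % num_partitions].append((key, value))
    return buckets

# All rows sharing a key land in the same reduce partition, so the
# reduce stage can aggregate each key without further data movement.
buckets = shuffle_write([("a", 1), ("b", 1), ("a", 1), ("c", 1)], num_partitions=4)
```

The cost is visible in the structure: every mapper produces output for (up to) every reducer, so the reduce stage must fetch segments over the network from every node.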

The Catalyst optimizer works in four phases:

  1. Analysis — resolve column names against the schema; throw errors for unknown columns.
  2. Logical optimization — push filters below joins, eliminate unused columns.
  3. Physical planning — choose between hash join, sort-merge join, and broadcast join based on table sizes.
  4. Code generation — emit JVM bytecode for the physical operators.

The Tungsten execution engine replaces JVM object layout with compact binary UnsafeRow format in off-heap memory, avoiding garbage collection pressure and enabling SIMD-friendly memory access patterns. The result: Python DataFrame code compiled to memory-efficient JVM bytecode that can sustain hundreds of millions of rows per second per executor core.

Performance Analysis: Shuffle Cost, Skew, and Partition Sizing

| Factor | Impact | Mitigation |
|---|---|---|
| Too few partitions | Under-utilizes executor cores; one giant task | repartition(n) before wide transforms |
| Too many partitions | Task scheduling overhead dominates | coalesce(n) before writing output |
| Data skew | One partition 10x larger; that task is the bottleneck | Salt the key; use a skew join hint; pre-aggregate |
| Shuffle spill | Partition too large for executor memory; spills to disk | Increase spark.executor.memory; reduce partition size |
| Broadcast join threshold | Default 10MB; large tables re-shuffled unnecessarily | Tune spark.sql.autoBroadcastJoinThreshold |

The 128MB rule: the default target partition size is 128MB (controlled by spark.sql.files.maxPartitionBytes). At this size, each task does meaningful work without spilling. When reading Parquet from S3, Spark coalesces file chunks to hit this target automatically. After a wide transform (shuffle), partition count defaults to spark.sql.shuffle.partitions (200) — often far too many for small datasets and too few for large ones. Tune it explicitly.


πŸ§ͺ Processing a 1-Billion-Row E-Commerce Clickstream in PySpark

Example: Category-Level Add-to-Cart Funnel

The dataset: {user_id, product_id, event_type, timestamp} stored as Parquet on S3, partitioned by date. Goal: count add_to_cart events per product category over the last 30 days.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, current_date, date_sub

spark = (
    SparkSession.builder
    .appName("clickstream-category-funnel")
    .config("spark.sql.shuffle.partitions", "400")          # tune for ~128 MB per partition
    .config("spark.sql.autoBroadcastJoinThreshold", "500mb") # broadcast tables under 500 MB (covers the ~200 MB products table)
    .getOrCreate()
)

# ── 1. Read partitioned Parquet — Spark only scans the last-30-days date partitions
events = (
    spark.read
    .parquet("s3://data-lake/clickstream/")
    .filter(col("event_type") == "add_to_cart")
    .filter(col("date") >= date_sub(current_date(), 30))    # partition pruning
)

# ── 2. Load the products dimension table (small — fits in broadcast memory)
products = spark.read.parquet("s3://data-lake/products/")   # ~200 MB — triggers broadcast join

# ── 3. Join to get category — broadcast join avoids a shuffle on the products side
enriched = events.join(products, on="product_id", how="inner")

# ── 4. Aggregate: count add_to_cart events per category
result = (
    enriched
    .groupBy("category")
    .count()
    .withColumnRenamed("count", "add_to_cart_events")
    .orderBy(col("add_to_cart_events").desc())
)

# ── 5. Write the report as Parquet for downstream consumers
result.write.mode("overwrite").parquet("s3://data-lake/reports/category_funnel/")

spark.stop()

What's happening under the hood:

  • filter(col("date") >= ...) triggers partition pruning — Spark lists and reads only the last 30 date-partitioned directories from S3, cutting the scan of a 365-day, ~30 TB archive down to roughly 2.5 TB.
  • The products table is small enough for a broadcast join: the driver fetches a copy and pushes it to every executor, eliminating the shuffle entirely.
  • groupBy("category") is a wide transformation — it triggers a shuffle, hashing rows by category key across spark.sql.shuffle.partitions (400) output partitions.
  • result.write.parquet(...) is the action that fires the entire DAG. Nothing ran before this line.
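To see why the broadcast join eliminates the shuffle, here is a toy single-process sketch of its semantics (illustrative data, not Spark code): the small table becomes an in-memory hash map available everywhere, and each event row is enriched with a local lookup instead of being moved across the network:

```python
# Toy model of a broadcast join: the small dimension table is turned into
# a hash map shipped to every executor; each event row is enriched locally.
products = {101: "electronics", 102: "books"}    # broadcast side (small)

events = [                                       # large fact side, stays put
    {"user_id": 1, "product_id": 101},
    {"user_id": 2, "product_id": 102},
    {"user_id": 3, "product_id": 101},
]

enriched = [
    {**e, "category": products[e["product_id"]]}
    for e in events
    if e["product_id"] in products               # inner-join semantics
]
print([e["category"] for e in enriched])  # ['electronics', 'books', 'electronics']
```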

πŸ—οΈ Partitioning Strategy: The 128MB Rule, repartition vs coalesce, and Column Partitioning

Partition count controls parallelism. Get it wrong and you'll either starve your executors or drown the scheduler.

repartition(n) triggers a full shuffle to produce exactly n evenly distributed partitions. Use it before a wide transform when the current partition count is too low, or to redistribute skewed data.

coalesce(n) merges partitions without a shuffle by combining local partitions on the same executor. Use it to reduce the number of output files before writing — 400 shuffle partitions would otherwise produce 400 tiny Parquet files on S3, creating a small-file problem for downstream readers.

# Bad: 400 shuffle files written directly — small-file problem
result.write.parquet("s3://...")

# Good: coalesce to a reasonable output file count before writing
result.coalesce(10).write.parquet("s3://...")

partitionBy("column") on write: tells Spark to create a directory per column value in the output (e.g., date=2026-03-28/). Downstream jobs that filter by that column only read the matching directories — this is partition pruning. It is one of the highest-leverage optimizations available for columnar formats like Parquet on S3.

# Write with date partitioning — enables downstream partition pruning
result.write.partitionBy("run_date").mode("overwrite").parquet("s3://...")

The heuristic: target 128–256 MB per partition for processing, and 500 MB–1 GB per output file for storage. Scale spark.sql.shuffle.partitions to (total_data_size_in_bytes / 128_000_000).
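That heuristic can be captured in a small helper (the function name is hypothetical, not a Spark API):

```python
import math

TARGET_PARTITION_BYTES = 128 * 1024 * 1024   # the 128 MB processing target

def suggested_shuffle_partitions(total_bytes: int) -> int:
    """Suggest a spark.sql.shuffle.partitions value so each shuffle
    partition lands near the 128 MB target. Illustrative helper only."""
    return max(1, math.ceil(total_bytes / TARGET_PARTITION_BYTES))

# 4 TB of shuffle data -> ~32768 partitions of ~128 MB each
print(suggested_shuffle_partitions(4 * 1024**4))  # 32768
```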


🌍 Real-World Applications: Spark SQL and Structured Streaming at Scale

Spark SQL: Running SQL on DataFrames

Any DataFrame can be registered as a temporary view and queried with standard SQL. This is not a conversion — it is the same Catalyst optimizer, the same physical plan. SQL and DataFrame API are interchangeable:

# Register the enriched DataFrame as a SQL-queryable view
enriched.createOrReplaceTempView("clickstream_enriched")

# Run SQL — Catalyst compiles it identically to the DataFrame API
top_categories = spark.sql("""
    SELECT category,
           COUNT(*) AS add_to_cart_events
    FROM   clickstream_enriched
    WHERE  event_type = 'add_to_cart'
    GROUP  BY category
    ORDER  BY add_to_cart_events DESC
    LIMIT  20
""")
top_categories.show()

Spark SQL integrates with the Hive Metastore: you can define tables with schemas, partitioning, and SerDes, then query them with spark.sql("SELECT ...") without ever specifying a file path. This is how most warehouse-style ETL jobs are structured in practice.

Structured Streaming: The Unbounded DataFrame

Spark's streaming API treats a live data stream as a DataFrame with no end. The schema is declared upfront; the same filter, groupBy, and join operators apply. Spark reads micro-batches from the source (Kafka, Kinesis, S3 landing zone), processes each batch through the DAG, and commits offsets and output atomically.

# Streaming word count from Kafka — same DataFrame API, continuous execution
words = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "logs-topic")
    .load()
    .selectExpr("CAST(value AS STRING) AS line")
    .selectExpr("explode(split(line, ' ')) AS word")
)

word_counts = words.groupBy("word").count()

query = (
    word_counts.writeStream
    .format("delta")                          # Delta Lake for ACID output
    .outputMode("update")                     # emit only changed rows each micro-batch
    .option("checkpointLocation", "s3://checkpoints/wordcount/")
    .start("s3://data-lake/wordcount/")
)

query.awaitTermination()

Micro-batch vs continuous: by default, Structured Streaming uses micro-batch mode — it collects events for a configurable trigger interval (e.g., 1 minute) and processes them as a batch. Latency is bounded by the trigger interval. Continuous mode (experimental) targets roughly millisecond-scale end-to-end latency with a persistent polling loop, at the cost of weaker guarantees (at-least-once rather than exactly-once).
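The update-mode micro-batch semantics can be simulated in a few lines of plain Python (a toy model of the bookkeeping, not Spark's engine): each trigger processes one batch and emits only the counts that changed:

```python
from collections import Counter

def run_micro_batches(batches):
    """Toy model of Structured Streaming's update output mode: maintain
    running counts across micro-batches and emit, per trigger, only the
    keys whose counts changed. NOT Spark's implementation."""
    state = Counter()
    emitted = []
    for batch in batches:                  # each trigger interval yields one batch
        delta = Counter(batch)
        state.update(delta)
        emitted.append({word: state[word] for word in delta})  # changed rows only
    return emitted

updates = run_micro_batches([["spark", "spark", "kafka"], ["kafka"]])
print(updates)  # [{'spark': 2, 'kafka': 1}, {'kafka': 2}]
```

In the real engine, `state` lives in the checkpointed state store, which is what lets a restarted query resume with correct running counts.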


βš–οΈ Trade-offs & Failure Modes: Data Skew, Shuffle Hell, and Driver OOM

Performance vs. Operational Complexity

Spark delivers horizontal scale but introduces non-trivial operational overhead: cluster sizing, executor memory tuning, shuffle partition selection, and network-intensive shuffle operations all require deliberate configuration. Pandas on a large machine is often faster and simpler for datasets below 50 GB.

Failure Modes

Data skew — one partition is 10× larger than the rest because the join key is highly concentrated (e.g., a single user_id generated 40% of events). That task runs while all other tasks finish and sit idle. The fix: add a random salt suffix to the key before groupBy, then aggregate twice.
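The two-phase salting trick can be sketched in plain Python (a toy model of the pattern, not PySpark code): phase 1 spreads a hot key across several salted sub-keys so no single partition carries it all, and phase 2 strips the salt and combines the partial counts:

```python
import random
from collections import Counter

def salted_two_phase_count(keys, num_salts=4, seed=0):
    """Toy model of the salting pattern: spread a hot key across num_salts
    sub-keys, pre-aggregate per salted key (phase 1), then strip the salt
    and combine the partial counts (phase 2)."""
    rng = random.Random(seed)
    phase1 = Counter()
    for key in keys:                                  # phase 1: salted partial counts
        phase1[(key, rng.randrange(num_salts))] += 1
    phase2 = Counter()
    for (key, _salt), partial in phase1.items():      # phase 2: strip salt, combine
        phase2[key] += partial
    return dict(phase2)

keys = ["hot_seller"] * 8 + ["cold_seller"] * 2
print(salted_two_phase_count(keys))  # {'hot_seller': 8, 'cold_seller': 2}
```

The totals are identical to a direct count; the win is that in a distributed setting the hot key's work is spread across num_salts tasks instead of one.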

collect() on a large DataFrame — df.collect() pulls every row to the driver's JVM heap. On a 10 GB DataFrame this kills the driver with an OOM error. Use write to persist results, or take(n) / limit(n).toPandas() for sampling.

Recomputation across DAG branches — a DataFrame that feeds three separate joins or actions without an intermediate cache() will re-read and re-compute its source each time. Persist hot DataFrames with df.cache() or df.persist(StorageLevel.MEMORY_AND_DISK).

Shuffle spill to disk — when a shuffle partition exceeds available executor memory, Spark spills it to local disk. Disk spill is 10–100× slower than in-memory processing. Symptoms: tasks running for minutes instead of seconds; executor disk I/O pegged at 100%. Fix: increase spark.sql.shuffle.partitions so each partition is smaller, increase spark.executor.memory, or pre-aggregate before the shuffle.

Mitigation Patterns

  • Profile with Spark UI (port 4040): examine stage timelines, task duration distributions, and shuffle read/write sizes.
  • Enable Adaptive Query Execution (spark.sql.adaptive.enabled=true) — Spark dynamically coalesces shuffle partitions and switches to broadcast joins at runtime based on actual data sizes.
  • Use Delta Lake for output: ACID writes, schema enforcement, and time-travel eliminate the half-written partition problem that corrupts downstream reads.

πŸ› οΈ Apache Spark's Architecture: How Python DataFrame Calls Become JVM Bytecode

Apache Spark is a fully open-source distributed compute engine (Apache License 2.0) with four major subsystems working together when you run a PySpark job:

PySpark / Py4J bridge: your Python process communicates with the JVM driver via Py4J sockets. DataFrame operations are serialized as logical plan nodes and passed to the JVM. Unless you use Python UDFs or collect results to pandas, data never moves through the Python process — only plan instructions do. This is why PySpark DataFrame performance is comparable to Scala Spark.

Catalyst query optimizer: a rule-based + cost-based optimizer written in Scala. It applies ~100 rewrite rules (predicate pushdown, constant folding, column pruning) and selects physical join strategies based on table statistics. The same optimizer powers Spark SQL and the DataFrame API — they share the same AST.

Tungsten execution engine: replaces JVM object serialization with UnsafeRow, a binary row format stored off-heap. Tungsten uses whole-stage code generation: instead of interpreting an operator tree row by row, it emits a single fused JVM method that processes a full partition in a tight loop. The result approaches the performance of hand-written C code for CPU-bound operations.

Spark's cluster managers: Spark supports YARN (Hadoop), Kubernetes (container-native), and standalone mode. On cloud platforms (Databricks, EMR, Dataproc), the cluster manager is abstracted — you specify executor count and instance type, and the platform handles scheduling.

For a full treatment of Delta Lake as the output layer for Spark pipelines, see the Big Data Architecture Patterns companion post linked in Related Posts.


🧭 Decision Guide: When to Use Spark vs Alternatives

| Situation | Recommendation |
|---|---|
| Use Spark when | Dataset exceeds single-machine RAM; need SQL + streaming from one engine; output goes to a data lake (S3/ADLS/GCS) |
| Avoid Spark when | Dataset fits in pandas (< 50 GB); low-latency sub-second queries required; team lacks Spark ops experience |
| Alternatives | Dask for pandas-compatible distributed compute on smaller clusters; DuckDB for fast in-process SQL on files up to ~100 GB; Flink for true continuous streaming (sub-second latency) |
| Edge cases | Spark Structured Streaming for micro-batch streaming (latency > 30s acceptable); Flink for strict low-latency SLAs; ClickHouse for OLAP queries on pre-aggregated data |

πŸ“š Lessons Learned: What Production Spark Teams Get Wrong

Don't use the default shuffle partition count (spark.sql.shuffle.partitions=200). For a 10 GB dataset that's 50 MB per partition — fine. For a 10 TB dataset it's 50 GB per partition — guaranteed disk spill. Always set it explicitly based on dataset size.

Don't collect() DataFrames in production jobs. Even a "small" result from a groupBy can be 500 MB if you forgot a filter. Write results to storage and read them back if you need them as pandas.

Don't skip partitionBy on write. Downstream jobs that read the same dataset every day will re-scan the entire history if there's no date partition. A single partitionBy("date") call on write eliminates that cost permanently.

Do enable Adaptive Query Execution (spark.sql.adaptive.enabled=true, default true since Spark 3.2). AQE coalesces small shuffle partitions, handles skew join automatically, and switches from sort-merge to broadcast join when it discovers a table is small at runtime. It requires no manual tuning.

Do cache DataFrames used multiple times in a multi-step job. An un-cached DataFrame referenced in two branches of a DAG will be read and recomputed twice. df.cache() materializes it in executor memory on the first pass.

Do use Parquet with predicate pushdown. Parquet stores column statistics (min/max) in row group metadata. Spark reads this metadata before reading data, skipping entire row groups that can't match a filter. On a well-organized dataset, a filter on a low-cardinality column can reduce I/O by 90%.
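The row-group skipping logic can be sketched as follows (a toy model; real readers consult min/max statistics in the Parquet footer metadata rather than Python dicts):

```python
# Toy model of Parquet predicate pushdown: each row group carries
# min/max statistics, and the reader skips groups that cannot match.
row_groups = [
    {"min": 1,  "max": 10, "rows": [3, 7, 9]},
    {"min": 11, "max": 20, "rows": [12, 15]},
    {"min": 21, "max": 30, "rows": [22, 28]},
]

def scan_where_greater_than(groups, threshold):
    """Read only row groups whose max could satisfy value > threshold."""
    matched = []
    for group in groups:
        if group["max"] <= threshold:   # whole group provably excluded
            continue                    # -> skipped without reading any rows
        matched.extend(v for v in group["rows"] if v > threshold)
    return matched

print(scan_where_greater_than(row_groups, 14))  # [15, 22, 28]
```

The first row group is never read: its max (10) proves no row can exceed 14, which is exactly the I/O savings the statistics buy you.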


πŸ“Œ TLDR: Summary & Key Takeaways

  • Spark's core model: the Driver builds a DAG of lazy transformations; Executors run tasks in parallel, one per partition. Nothing executes until an action fires.
  • DataFrames over RDDs: the DataFrame API carries schema information and routes through the Catalyst optimizer — the same code runs faster with zero RDD changes.
  • Lazy evaluation enables whole-plan optimization: Spark rewrites, prunes, and reorders operators before touching the first byte of data. This is why df.filter(...).select(...) is faster than you'd expect.
  • Partitioning is the primary performance dial: too few partitions = serial bottleneck; too many = scheduler overhead; skewed partitions = one slow task blocks the stage.
  • Structured Streaming extends the same model to real time: treat a Kafka topic as an unbounded DataFrame, apply the same operators, and write to Delta Lake with exactly-once semantics.
  • AQE + broadcast join + partition pruning are the three highest-leverage optimizations that require the least code change; enable them before reaching for manual tuning.
  • The golden rule for production Spark: never ship data to an executor that doesn't need it — filter early, prune partitions, broadcast small tables.

πŸ“ Practice Quiz

  1. You run df.filter(...).select(...).count(). When does Spark actually read data from S3?

    • A) When filter() is called
    • B) When select() is called
    • C) When count() is called

    Correct answer: C — filter and select are lazy transformations that only build the DAG. count() is a DataFrame action that triggers execution and causes Spark to read the data.
  2. Your Spark job has spark.sql.shuffle.partitions=200 and processes 4 TB of data. Each shuffle partition ends up at ~20 GB, and tasks are spilling to disk. What is the most direct fix?

    • A) Increase spark.executor.cores from 4 to 8
    • B) Increase spark.sql.shuffle.partitions to 32000 so each partition is ~128 MB
    • C) Enable spark.sql.adaptive.enabled=true and leave shuffle partitions at 200

    Correct answer: B — shuffle partition count determines partition size. At 4 TB and 200 partitions, each partition is ~20 GB, which is guaranteed to spill. Raising the count to ~32000 brings each partition to ~128 MB, the target size for in-memory processing.
  3. A Spark streaming job reads from Kafka and aggregates events with a 5-minute tumbling window. After a rolling Kafka broker restart, some windows show only half the expected event count. Which configuration change most directly prevents this from recurring?

    • A) Increase spark.executor.memory to avoid task OOM
    • B) Set outputMode("complete") to recompute all windows on every micro-batch
    • C) Configure a checkpoint location so Spark can recover offset state and replay missed micro-batches

    Correct answer: C — Structured Streaming checkpointing stores Kafka offsets and aggregation state. Without it, a restart causes Spark to start from the latest offset, permanently losing events that arrived during the outage.
  4. Open-ended challenge: Your PySpark job joins a 2 TB events table to a 500 MB product catalog. After the join, a groupBy("seller_id").count() runs. You notice one task in the groupBy stage takes 45 minutes while all other tasks finish in 2 minutes. Describe how you would diagnose whether the bottleneck is a skewed seller_id key or an under-resourced executor, what metrics in the Spark UI you would examine, and what code changes (salting, pre-aggregation, or AQE skew handling) you would apply to resolve it.

