Apache Spark for Data Engineers: RDDs, DataFrames, and Structured Streaming
Process petabytes with Python: understand Spark's execution model, DataFrame transformations, partitioning strategy, and real-time streaming.
TLDR: Apache Spark distributes Python DataFrame jobs across a cluster of executors, using lazy evaluation and the Catalyst query optimizer to process terabytes with the same code that works on gigabytes. Master partitioning, shuffle awareness, and Structured Streaming, and you have a single engine for both batch and real-time pipelines.
When Pandas Hits the Wall: The 500GB Problem
Picture this: a data engineer writes a clean pandas script that reads 500MB of e-commerce event data, filters it, joins it to a product catalog, and produces a category rollup. It runs in 40 seconds on a laptop. The team ships it to production.
Three months later, the raw file is 500GB. The script runs for two hours and crashes with `MemoryError: Unable to allocate 48.3 GiB`. The machine simply ran out of RAM.
The instinctive fix, buying a bigger machine, works once. It doesn't work at 5TB or 50TB. And vertical scaling is expensive, a single point of failure, and subject to a hard ceiling. What you actually need is a way to split the problem across many machines so that each machine only needs to process its share.
That is exactly the problem Apache Spark was designed to solve.
Spark lets you express transformations (filter, join, groupBy) in Python using a DataFrame API that looks almost identical to pandas. Underneath, it compiles that plan into a parallel execution graph, splits your data into partitions, ships each partition to a different worker node, and runs the work concurrently. The result lands in distributed storage like S3 or HDFS. Your Python code never changes; the engine handles the distribution.
| Scale | Tool | Why |
| --- | --- | --- |
| < 1 GB | pandas | Fits in memory; simple API |
| 1 GB – 100 GB | pandas + chunking / Dask | Single machine stretched |
| 100 GB – 100 TB | Apache Spark | Distributed execution, SQL optimizer |
| Streaming events | Spark Structured Streaming | Same DataFrame API, micro-batch or continuous |
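To make the pandas-to-Spark resemblance concrete, here is an illustrative sketch of the same filter-and-rollup in both APIs. The paths and column names are hypothetical, not from a real dataset:

```python
# Illustrative only -- paths and column names are hypothetical.

# pandas: the whole file must fit in one machine's RAM
import pandas as pd

events_pd = pd.read_parquet("events.parquet")
rollup_pd = (
    events_pd[events_pd["event_type"] == "add_to_cart"]
    .groupby("category")
    .size()
)

# PySpark: the same shape of code, but each call builds a lazy, distributed plan
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("rollup").getOrCreate()
events = spark.read.parquet("s3://bucket/events/")
rollup = (
    events.filter(col("event_type") == "add_to_cart")
    .groupBy("category")
    .count()
)
rollup.write.parquet("s3://bucket/rollup/")  # the action that triggers execution
```

The pandas version executes each line eagerly; the Spark version runs nothing until the final write.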
RDDs: Spark's Original Distributed Dataset Abstraction
Before DataFrames, Spark exposed data as Resilient Distributed Datasets (RDDs). An RDD is an immutable, fault-tolerant, distributed collection of records partitioned across a cluster. Three properties define it:
- Immutable: you never modify an RDD in place; every transformation produces a new RDD.
- Distributed: partitions live on different executor nodes and are processed in parallel.
- Resilient: Spark tracks the lineage of transformations, so if a partition is lost (executor crash), it can be recomputed from the source without re-reading the entire dataset.
RDD operations split into two families:
- Transformations (`map`, `filter`, `flatMap`, `reduceByKey`): describe what to compute but do not trigger execution. Spark records them in a DAG.
- Actions (`collect`, `count`, `saveAsTextFile`): trigger the actual execution of the recorded DAG.
This split is the foundation of Spark's lazy evaluation model. Nothing runs until an action is called.
```python
# RDD-style wordcount -- verbose and unoptimized
lines = sc.textFile("s3://bucket/logs/*.txt")
words = lines.flatMap(lambda line: line.split())
pairs = words.map(lambda w: (w, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("s3://bucket/output/wordcount")  # <- action triggers execution
```
RDDs are still available and occasionally necessary (custom serialization, unstructured data, ML pipelines via MLlib). But for most data engineering work, DataFrames have superseded them, and for good reason.
Driver, Executors, and the DAG: How Spark Distributes Work
Understanding where your code runs is the most important mental model for a Spark engineer.
Driver: the process running your Python script. It hosts the SparkSession, parses your DataFrame transformations, builds a logical plan, hands it to the Catalyst optimizer, and ultimately sends tasks to executors. The driver is the brain; it does no data crunching.
Executors: JVM processes running on worker nodes. Each executor holds a fraction of the total data (one or more partitions), executes tasks assigned by the driver, and caches intermediate results in memory or on disk. Executors are the muscle.
Partitions: the unit of parallelism. Each partition is an independent slice of the data that one task processes on one executor core. If you have 200 partitions and 50 executor cores, Spark runs 50 tasks in parallel and processes all 200 in 4 rounds.
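That rounds arithmetic generalizes. A tiny helper (illustrative, not part of Spark's API) makes the relationship explicit:

```python
import math

def task_rounds(num_partitions: int, executor_cores: int) -> int:
    """Waves of tasks Spark needs when each partition becomes one task."""
    return math.ceil(num_partitions / executor_cores)

print(task_rounds(200, 50))  # 4 waves of 50 parallel tasks
print(task_rounds(201, 50))  # 5: a whole extra wave for a single straggler task
```

The second call shows why partition counts are often sized as a multiple of total executor cores: one leftover partition costs an entire scheduling wave.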
Lazy Evaluation: Building the Plan Before Touching Data
Every time you call filter(), select(), join(), or groupBy(), Spark does not run anything. It records the transformation in a logical query plan (a DAG of operators). Only when you call an action (write, count, show, collect) does Spark:
- Pass the logical plan through the Catalyst optimizer to produce an optimized physical plan.
- Split the physical plan into stages separated by shuffle boundaries.
- Split stages into tasks (one per partition).
- Schedule tasks on executor cores.
This means Spark can see the entire computation before touching the first byte of data, enabling rewrites that a row-by-row loop could never achieve: predicate pushdown, column pruning, join reordering, and broadcast join selection.
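You can watch these rewrites happen: `DataFrame.explain()` prints the plan Catalyst built without executing anything. A sketch, with an illustrative path and column names:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("plan-inspect").getOrCreate()

df = spark.read.parquet("s3://bucket/events/")
plan = df.filter(col("event_type") == "add_to_cart").select("category")

# Nothing runs and no data is read -- this prints the parsed, optimized,
# and physical plans, including pushed filters and the pruned column list
plan.explain(mode="formatted")
```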
DataFrames vs RDDs: The API Gap
The same word count written with the DataFrame API:
```python
# DataFrame-style -- shorter, and Catalyst-optimized
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col

spark = SparkSession.builder.appName("wordcount").getOrCreate()

df = spark.read.text("s3://bucket/logs/*.txt")
counts = (
    df.select(explode(split(col("value"), " ")).alias("word"))
    .groupBy("word")
    .count()
)
counts.write.parquet("s3://bucket/output/wordcount")
```
The DataFrame version is shorter, carries schema information, and benefits from the Catalyst optimizer. The RDD version is a black box to the optimizer; it must execute every lambda as-is.
Visualizing Spark's Execution: From Python Call to JVM Task
The journey from a Python groupBy().count() call to bytes on disk passes through several layers:
```mermaid
flowchart TD
    A["Python DataFrame API<br/>(filter · join · groupBy)"] --> B["Logical Plan<br/>(unresolved operators)"]
    B --> C["Catalyst Optimizer<br/>(predicate pushdown · column pruning · join reordering)"]
    C --> D["Optimized Physical Plan<br/>(scan · hash-aggregate · sort-merge-join)"]
    D --> E["DAG of Stages<br/>(split at shuffle boundaries)"]
    E --> F["Tasks<br/>(one per partition)"]
    F --> G["Executor Cores<br/>(JVM · Tungsten UnsafeRow)"]
    G --> H["Output<br/>(S3 / HDFS / Delta Lake)"]
```
Key insight: stages are separated by shuffles, the most expensive operation in Spark, because a shuffle moves data across the network. Every groupBy, non-broadcast join, and repartition triggers a shuffle. Minimizing shuffles is the single most impactful Spark performance-tuning lever.
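One concrete shuffle-elimination lever is the explicit `broadcast()` hint from `pyspark.sql.functions`, which tells Catalyst to copy a small table to every executor instead of shuffling both join sides. A runnable local sketch with toy data:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.master("local[2]").appName("bcast").getOrCreate()

facts = spark.createDataFrame([(1, 9.99), (2, 4.50)], ["product_id", "price"])
dims = spark.createDataFrame([(1, "toys"), (2, "books")], ["product_id", "category"])

# broadcast() forces a map-side join regardless of the auto-broadcast threshold
joined = facts.join(broadcast(dims), on="product_id", how="inner")
joined.explain()  # physical plan shows BroadcastHashJoin instead of SortMergeJoin
```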
Deep Dive: Catalyst, Tungsten, and How Python DataFrames Become JVM Bytecode
Spark's Internals: DAGScheduler, BlockManager, and Shuffle
When an action fires, the DAGScheduler takes the optimized physical plan and divides it into stages. A new stage begins whenever a shuffle is required. Within a stage, tasks are independent; they can run fully in parallel without exchanging data.
The TaskScheduler assigns tasks to available executor slots (cores Γ threads). Each task reads its assigned partition from the BlockManager, which manages in-memory and on-disk storage on each node. Block replication for fault tolerance and shuffle data caching both flow through BlockManager.
During a shuffle, the writing side (map stage) hashes output rows by the partition key and writes shuffle files to local disk. The reading side (reduce stage) fetches the correct file segment from every mapper node. This network transfer is why groupBy on a high-cardinality column over a 5TB dataset can take 20 minutes: it moves 5TB across the cluster.
The Catalyst optimizer works in four phases:
- Analysis: resolve column names against the schema; throw errors for unknown columns.
- Logical optimization: push `filter` below `join`, eliminate unused columns.
- Physical planning: choose between hash join, sort-merge join, and broadcast join based on table sizes.
- Code generation: emit JVM bytecode for the physical operators.
The Tungsten execution engine replaces JVM object layout with compact binary UnsafeRow format in off-heap memory, avoiding garbage collection pressure and enabling SIMD-friendly memory access patterns. The result: Python DataFrame code compiled to memory-efficient JVM bytecode that can sustain hundreds of millions of rows per second per executor core.
Performance Analysis: Shuffle Cost, Skew, and Partition Sizing
| Factor | Impact | Mitigation |
| --- | --- | --- |
| Too few partitions | Under-utilizes executor cores; one giant task | repartition(n) before wide transforms |
| Too many partitions | Task scheduling overhead dominates | coalesce(n) before writing output |
| Data skew | One partition 10x larger; that task is the bottleneck | Salt the key; enable AQE skew-join handling; pre-aggregate |
| Shuffle spill | Partition too large for executor memory; spills to disk | Increase spark.executor.memory; reduce partition size |
| Broadcast join threshold | Default 10MB; large tables re-shuffled unnecessarily | Tune spark.sql.autoBroadcastJoinThreshold |
The 128MB rule: the default target partition size is 128MB (controlled by `spark.sql.files.maxPartitionBytes`). At this size, each task does meaningful work without spilling. When reading Parquet from S3, Spark coalesces file chunks to hit this target automatically. After a wide transform (shuffle), partition count defaults to `spark.sql.shuffle.partitions` (200), often far too many for small datasets and too few for large ones. Tune it explicitly.
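The rule can be turned into a back-of-the-envelope helper (illustrative, not a Spark API):

```python
import math

TARGET_PARTITION_BYTES = 128 * 1024 * 1024  # the 128 MB rule

def suggested_shuffle_partitions(total_bytes: int) -> int:
    """Shuffle-partition count that lands each partition near 128 MB."""
    return max(1, math.ceil(total_bytes / TARGET_PARTITION_BYTES))

print(suggested_shuffle_partitions(10 * 1024**3))  # 80 -- the default 200 gives ~50 MB each
print(suggested_shuffle_partitions(4 * 1024**4))   # 32768 -- the default 200 gives ~20 GB each
```

You would then apply the result with `spark.conf.set("spark.sql.shuffle.partitions", str(n))` before the wide transformation runs.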
Processing a 1-Billion-Row E-Commerce Clickstream in PySpark
Example: Category-Level Add-to-Cart Funnel
The dataset: {user_id, product_id, event_type, timestamp} stored as Parquet on S3, partitioned by date. Goal: count add_to_cart events per product category over the last 30 days.
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_date, date_sub

spark = (
    SparkSession.builder
    .appName("clickstream-category-funnel")
    .config("spark.sql.shuffle.partitions", "400")  # tune for ~128 MB per partition
    .config("spark.sql.autoBroadcastJoinThreshold", "250mb")  # let the ~200 MB products table broadcast
    .getOrCreate()
)

# -- 1. Read partitioned Parquet -- Spark only scans the last-30-days date partitions
events = (
    spark.read
    .parquet("s3://data-lake/clickstream/")
    .filter(col("event_type") == "add_to_cart")
    .filter(col("date") >= date_sub(current_date(), 30))  # partition pruning
)

# -- 2. Load the products dimension table (small enough to broadcast)
products = spark.read.parquet("s3://data-lake/products/")  # ~200 MB -> broadcast join

# -- 3. Join to get category -- the broadcast join avoids shuffling the huge events side
enriched = events.join(products, on="product_id", how="inner")

# -- 4. Aggregate: count add_to_cart events per category
result = (
    enriched
    .groupBy("category")
    .count()
    .withColumnRenamed("count", "add_to_cart_events")
    .orderBy(col("add_to_cart_events").desc())
)

# -- 5. Write as Parquet -- the write is the action that fires the whole DAG
result.write.mode("overwrite").parquet("s3://data-lake/reports/category_funnel/")
spark.stop()
```
What's happening under the hood:
- `filter(col("date") >= ...)` triggers partition pruning: Spark only lists and reads the last 30 date-partitioned directories from S3, cutting a 30 TB, 365-day archive down to perhaps 2.5 TB scanned.
- The `products` table is small enough for a broadcast join: the driver fetches a copy and pushes it to every executor, eliminating the shuffle entirely.
- `groupBy("category")` is a wide transformation; it triggers a shuffle, hashing rows by category key across `spark.sql.shuffle.partitions` (400) output partitions.
- `result.write.parquet(...)` is the action that fires the entire DAG. Nothing ran before this line.
Partitioning Strategy: The 128MB Rule, repartition vs coalesce, and Column Partitioning
Partition count controls parallelism. Get it wrong and you'll either starve your executors or drown the scheduler.
repartition(n) triggers a full shuffle to produce exactly n evenly distributed partitions. Use it before a wide transform when the current partition count is too low, or to redistribute skewed data.
coalesce(n) merges partitions without a shuffle by combining local partitions on the same executor. Use it to reduce the number of output files before writing: 400 shuffle partitions would otherwise produce 400 tiny Parquet files on S3, creating a small-file problem for downstream readers.
```python
# Bad: 400 shuffle partitions written directly -> small-file problem
result.write.parquet("s3://...")

# Good: coalesce to a reasonable output file count before writing
result.coalesce(10).write.parquet("s3://...")
```
partitionBy("column") on write: tells Spark to create a directory per column value in the output (e.g., date=2026-03-28/). Downstream jobs that filter by that column only read the matching directories; this is partition pruning. It is one of the highest-leverage optimizations available for columnar formats like Parquet on S3.
```python
# Write with date partitioning -- enables downstream partition pruning
result.write.partitionBy("run_date").mode("overwrite").parquet("s3://...")
```
The heuristic: target 128–256 MB per partition for processing, and 500 MB–1 GB per output file for storage. Scale `spark.sql.shuffle.partitions` to roughly total_data_size_in_bytes / 128_000_000.
Real-World Applications: Spark SQL and Structured Streaming at Scale
Spark SQL: Running SQL on DataFrames
Any DataFrame can be registered as a temporary view and queried with standard SQL. This is not a conversion; it is the same Catalyst optimizer and the same physical plan. SQL and the DataFrame API are interchangeable:
```python
# Register the enriched DataFrame as a SQL-queryable view
enriched.createOrReplaceTempView("clickstream_enriched")

# Run SQL -- Catalyst compiles it identically to the DataFrame API
top_categories = spark.sql("""
    SELECT category,
           COUNT(*) AS add_to_cart_events
    FROM clickstream_enriched
    WHERE event_type = 'add_to_cart'
    GROUP BY category
    ORDER BY add_to_cart_events DESC
    LIMIT 20
""")
top_categories.show()
```
Spark SQL integrates with the Hive Metastore: you can define tables with schemas, partitioning, and SerDes, then query them with spark.sql("SELECT ...") without ever specifying a file path. This is how most warehouse-style ETL jobs are structured in practice.
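A hedged sketch of that pattern using `saveAsTable`; the `analytics` database and the column names are made up for illustration, and the database must already exist in the metastore:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("metastore-demo").enableHiveSupport().getOrCreate()

report = spark.createDataFrame(
    [("toys", 120, "2026-03-28"), ("books", 80, "2026-03-28")],
    ["category", "add_to_cart_events", "run_date"],
)

# Register a managed, partitioned table in the metastore
report.write.mode("overwrite").partitionBy("run_date").saveAsTable("analytics.category_funnel")

# Any later job queries it by name -- no file paths involved
spark.sql(
    "SELECT * FROM analytics.category_funnel WHERE run_date = '2026-03-28'"
).show()
```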
Structured Streaming: The Unbounded DataFrame
Spark's streaming API treats a live data stream as a DataFrame with no end. The schema is declared upfront; the same filter, groupBy, and join operators apply. Spark reads micro-batches from the source (Kafka, Kinesis, S3 landing zone), processes each batch through the DAG, and commits offsets and output atomically.
```python
# Streaming word count from Kafka -- same DataFrame API, continuous execution
words = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "logs-topic")
    .load()
    .selectExpr("CAST(value AS STRING) AS line")
    .selectExpr("explode(split(line, ' ')) AS word")
)

word_counts = words.groupBy("word").count()

query = (
    word_counts.writeStream
    .format("delta")  # Delta Lake for ACID output
    .outputMode("update")  # emit only changed rows each micro-batch
    .option("checkpointLocation", "s3://checkpoints/wordcount/")
    .start("s3://data-lake/wordcount/")
)
query.awaitTermination()
```
Micro-batch vs continuous: by default, Structured Streaming uses micro-batch mode; it collects events for a configurable trigger interval (e.g., 1 minute) and processes them as a batch. Latency is bounded by the trigger interval. Continuous mode (experimental) targets millisecond-level latency with a persistent polling loop, at the cost of reduced fault tolerance guarantees.
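Trigger selection is a one-line change on the stream writer. A sketch reusing a streaming DataFrame like the `word_counts` above (paths are illustrative); note that continuous mode supports only map-like operations, so an aggregation such as this word count could not actually run under it:

```python
# Micro-batch: buffer events for 1 minute, then process them as one batch
query = (
    word_counts.writeStream
    .trigger(processingTime="1 minute")
    .format("delta")
    .outputMode("update")
    .option("checkpointLocation", "s3://checkpoints/wc/")
    .start("s3://data-lake/wc/")
)

# Continuous (experimental): map-only pipelines; the "1 second" below is the
# checkpoint interval, not the latency target
# query2 = (
#     mapped_stream.writeStream
#     .trigger(continuous="1 second")
#     .format("kafka")
#     .option("kafka.bootstrap.servers", "broker:9092")
#     .option("topic", "words-out")
#     .option("checkpointLocation", "s3://checkpoints/wc-cont/")
#     .start()
# )
```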
Trade-offs & Failure Modes: Data Skew, Shuffle Hell, and Driver OOM
Performance vs. Operational Complexity
Spark delivers horizontal scale but introduces non-trivial operational overhead: cluster sizing, executor memory tuning, shuffle partition selection, and network-intensive shuffle operations all require deliberate configuration. Pandas on a large machine is often faster and simpler for datasets below 50 GB.
Failure Modes
Data skew: one partition is 10× larger than the rest because the join key is highly concentrated (e.g., a single user_id generated 40% of events). That task keeps running while all other tasks finish and sit idle. The fix: add a random salt suffix to the key before groupBy, then aggregate twice.
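The two-stage salted aggregation looks like this in practice; a runnable local sketch with toy data (the bucket count and column names are illustrative):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[2]").appName("salting").getOrCreate()

# Toy skewed input: one user dominates the key distribution
events = spark.createDataFrame([("hot_user",)] * 8 + [("u2",), ("u3",)], ["user_id"])

SALT_BUCKETS = 4  # size the fan-out to the observed skew

# Stage 1: scatter each key across SALT_BUCKETS synthetic sub-keys so the hot
# key's rows land in several shuffle partitions, then pre-aggregate
partial = (
    events
    .withColumn("salt", (F.rand() * SALT_BUCKETS).cast("int"))
    .groupBy("user_id", "salt")
    .count()
)

# Stage 2: drop the salt and merge the partial counts
final = partial.groupBy("user_id").agg(F.sum("count").alias("count"))
final.show()  # same totals as an unsalted groupBy, but no single giant task
```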
collect() on a large DataFrame: df.collect() pulls every row into the driver's JVM heap. On a 10 GB DataFrame this kills the driver with an OOM error. Use write to persist results, or take(n) / limit(n).toPandas() for sampling.
Wide transformation cascades: a DataFrame referenced by three separate join branches without an intermediate cache() will be re-read and re-computed for each branch. Persist hot DataFrames with df.cache() or df.persist(StorageLevel.MEMORY_AND_DISK).
Shuffle spill to disk: when a shuffle partition exceeds executor memory, Spark spills it to local disk. Disk spill is 10–100× slower than in-memory processing. Symptoms: tasks running for minutes instead of seconds; executor disk I/O pegged at 100%. Fix: increase spark.sql.shuffle.partitions (so each partition shrinks), increase spark.executor.memory, or pre-aggregate before the shuffle.
Mitigation Patterns
- Profile with the Spark UI (port 4040): examine stage timelines, task duration distributions, and shuffle read/write sizes.
- Enable Adaptive Query Execution (`spark.sql.adaptive.enabled=true`): Spark dynamically coalesces shuffle partitions and switches to broadcast joins at runtime based on actual data sizes.
- Use Delta Lake for output: ACID writes, schema enforcement, and time travel eliminate the half-written-partition problem that corrupts downstream reads.
Apache Spark's Architecture: How Python DataFrame Calls Become JVM Bytecode
Apache Spark is a fully open-source distributed compute engine (Apache License 2.0) with four major subsystems working together when you run a PySpark job:
PySpark / Py4J bridge: your Python process communicates with the JVM driver via Py4J sockets. DataFrame operations are serialized as logical plan nodes and passed to the JVM. Data never moves through the Python process; only plan instructions do. This is why PySpark DataFrame performance is comparable to Scala Spark.
Catalyst query optimizer: a rule-based + cost-based optimizer written in Scala. It applies ~100 rewrite rules (predicate pushdown, constant folding, column pruning) and selects physical join strategies based on table statistics. The same optimizer powers Spark SQL and the DataFrame API; they share the same AST.
Tungsten execution engine: replaces JVM object serialization with UnsafeRow, a binary row format stored off-heap. Tungsten uses whole-stage code generation: instead of interpreting an operator tree row by row, it emits a single fused JVM method that processes a full partition in a tight loop. The result approaches the performance of hand-written C code for CPU-bound operations.
Spark's cluster managers: Spark supports YARN (Hadoop), Kubernetes (container-native), and standalone mode. On cloud platforms (Databricks, EMR, Dataproc), the cluster manager is abstracted away: you specify executor count and instance type, and the platform handles scheduling.
For a full treatment of Delta Lake as the output layer for Spark pipelines, see the Big Data Architecture Patterns companion post linked in Related Posts.
Decision Guide: When to Use Spark vs Alternatives
| Situation | Recommendation |
| --- | --- |
| Use Spark when | Dataset exceeds single-machine RAM; need SQL + streaming from one engine; output goes to a data lake (S3/ADLS/GCS) |
| Avoid Spark when | Dataset fits in pandas (< 50 GB); low-latency sub-second queries required; team lacks Spark ops experience |
| Alternatives | Dask for pandas-compatible distributed compute on smaller clusters; DuckDB for fast in-process SQL on files up to ~100 GB; Flink for true continuous streaming (sub-second latency) |
| Edge cases | Spark Structured Streaming for micro-batch streaming (latency > 30s acceptable); Flink for strict low-latency SLAs; ClickHouse for OLAP queries on pre-aggregated data |
Lessons Learned: What Production Spark Teams Get Wrong
Don't use the default shuffle partition count (spark.sql.shuffle.partitions=200). For a 10 GB dataset that's 50 MB per partition: fine. For a 10 TB dataset it's 50 GB per partition: guaranteed disk spill. Always set it explicitly based on dataset size.
Don't collect() DataFrames in production jobs. Even a "small" result from a groupBy can be 500 MB if you forgot a filter. Write results to storage and read them back if you need them as pandas.
Don't skip partitionBy on write. Downstream jobs that read the same dataset every day will re-scan the entire history if there's no date partition. A single partitionBy("date") call on write eliminates that cost permanently.
Do enable Adaptive Query Execution (spark.sql.adaptive.enabled=true, default true since Spark 3.2). AQE coalesces small shuffle partitions, handles skew join automatically, and switches from sort-merge to broadcast join when it discovers a table is small at runtime. It requires no manual tuning.
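A sketch of the relevant session settings. The coalesce and skew-join flags shown are real Spark 3.x configs, though their defaults vary by version:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("aqe-on")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")  # merge tiny shuffle partitions
    .config("spark.sql.adaptive.skewJoin.enabled", "true")            # split oversized partitions at join time
    .getOrCreate()
)
```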
Do cache DataFrames used multiple times in a multi-step job. An un-cached DataFrame referenced in two branches of a DAG will be read and recomputed twice. df.cache() materializes it in executor memory on the first pass.
Do use Parquet with predicate pushdown. Parquet stores column statistics (min/max) in row group metadata. Spark reads this metadata before reading data, skipping entire row groups that can't match a filter. On a well-organized dataset, a filter on a low-cardinality column can reduce I/O by 90%.
TLDR: Summary & Key Takeaways
- Spark's core model: the Driver builds a DAG of lazy transformations; Executors run tasks in parallel, one per partition. Nothing executes until an action fires.
- DataFrames over RDDs: the DataFrame API carries schema information and routes through the Catalyst optimizer; equivalent logic runs faster with no hand-tuning.
- Lazy evaluation enables whole-plan optimization: Spark rewrites, prunes, and reorders operators before touching the first byte of data. This is why `df.filter(...).select(...)` is faster than you'd expect.
- Partitioning is the primary performance dial: too few partitions = serial bottleneck; too many = scheduler overhead; skewed partitions = one slow task blocks the stage.
- Structured Streaming extends the same model to real time: treat a Kafka topic as an unbounded DataFrame, apply the same operators, and write to Delta Lake with exactly-once semantics.
- AQE + broadcast join + partition pruning are the three highest-leverage optimizations that require the least code change; enable them before reaching for manual tuning.
- The golden rule for production Spark: never let data reach an executor that doesn't need to be there β filter early, prune partitions, broadcast small tables.
Practice Quiz
1. You run `df.filter(...).groupBy("category").count()`. When does Spark actually read data from S3?
   - A) When `filter()` is called
   - B) When `groupBy()` is called
   - C) When `count()` is called

   Correct answer: C. `filter` and `groupBy` are lazy transformations that build the DAG. `count()` is an action that triggers execution and causes Spark to read the data.

2. Your Spark job has `spark.sql.shuffle.partitions=200` and processes 4 TB of data. Each shuffle partition ends up at ~20 GB, and tasks are spilling to disk. What is the most direct fix?
   - A) Increase `spark.executor.cores` from 4 to 8
   - B) Increase `spark.sql.shuffle.partitions` to 32000 so each partition is ~128 MB
   - C) Enable `spark.sql.adaptive.enabled=true` and leave shuffle partitions at 200

   Correct answer: B. Shuffle partition count determines partition size. At 4 TB and 200 partitions, each partition is 20 GB, guaranteed to spill. Setting it to ~32000 brings each partition to ~128 MB, the target size for in-memory processing.

3. A Spark streaming job reads from Kafka and aggregates events with a 5-minute tumbling window. After a rolling Kafka broker restart, some windows show only half the expected event count. Which configuration change most directly prevents this from recurring?
   - A) Increase `spark.executor.memory` to avoid task OOM
   - B) Set `outputMode("complete")` to recompute all windows on every micro-batch
   - C) Configure a checkpoint location so Spark can recover offset state and replay missed micro-batches

   Correct answer: C. Structured Streaming checkpointing stores Kafka offsets and aggregation state. Without it, a restart causes Spark to start from the latest offset, permanently losing events that arrived during the outage.

4. Open-ended challenge: Your PySpark job joins a 2 TB events table to a 500 MB product catalog. After the join, a `groupBy("seller_id").count()` runs. You notice one task in the groupBy stage takes 45 minutes while all other tasks finish in 2 minutes. Describe how you would diagnose whether the bottleneck is a skewed `seller_id` key or an under-resourced executor, what metrics in the Spark UI you would examine, and what code changes (salting, pre-aggregation, or AQE skew handling) you would apply to resolve it.
Related Posts
Written by Abstract Algorithms (@abstractalgorithms)