SQL Server DBAs learn early to read execution plans. That skill transfers to Spark: the concepts carry over even though the output looks different. This post is the EXPLAIN decoder ring for T-SQL practitioners.
Getting the Plan
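from pyspark.sql import functions as F
# `spark` = active SparkSession (predefined in Databricks); <container>/<storage-account> are placeholders
df = spark.read.parquet("abfss://<container>@<storage-account>.dfs.core.windows.net/events/")
result = (
    df.filter(F.col("event_type") == "purchase")
    .groupBy("user_id")
    .agg(F.sum("amount").alias("total_spend"))
)
# Simple mode: physical plan only
result.explain()
# Extended mode: parsed, analyzed, and optimized logical plans, plus the physical plan
result.explain(True)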
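When you want to diff plans, paste them into a ticket, or feed them to a script, it helps to capture the plan as a string instead of reading it off the console. A minimal sketch, assuming PySpark 3.x, where `DataFrame.explain()` writes its output with Python's `print()` (so `contextlib.redirect_stdout` can capture it) and accepts a `mode` argument (`"simple"`, `"extended"`, `"codegen"`, `"cost"`, `"formatted"`). The helper name `explain_text` is hypothetical.

```python
import contextlib
import io


def explain_text(df, mode=None):
    """Capture what df.explain() prints and return it as a string.

    Hypothetical helper. Assumes PySpark 3.x, where DataFrame.explain()
    writes its output with Python's print(), so redirect_stdout sees it.
    """
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        if mode is None:
            df.explain()  # default "simple" mode: physical plan only
        else:
            df.explain(mode=mode)  # e.g. "extended", "codegen", "cost", "formatted"
    return buffer.getvalue()
```

Usage: `plan = explain_text(result, mode="formatted")`. The `"formatted"` mode prints a numbered operator outline followed by per-operator details, which is often easier to scan than the nested tree.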
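Read the physical plan bottom-up, from the most-indented leaf operators toward the root printed at the top. It is the same habit as reading a SQL Server graphical plan right to left, where the rightmost operators are the data sources. One difference: explain() shows the plan Spark intends to run, so it corresponds to SQL Server's estimated plan. The closest thing to an actual plan, with runtime row counts, is the SQL tab of the Spark UI after the query has run.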
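Reading bottom-up is a post-order walk of the plan tree: every operator is visited after all of its inputs. A sketch that recovers that order from simple-mode `explain()` text, assuming Spark's tree-drawing convention where each child line starts with `:- ` (every child except the last) or `+- ` (the last child) and indentation encodes depth. The function names are hypothetical; detail lines that are not operators (such as a wrapped `PushedFilters:` line) are skipped.

```python
import re

# A child line: optional indentation/":" guides, then "+- " or ":- ".
_CHILD = re.compile(r"^[ :|]*(?:\+- |:- )")
# Whole-stage codegen prefix such as "*(1) ".
_CODEGEN = re.compile(r"^\*\(\d+\)\s*")


def operator_name(node_text):
    """'*(1) FileScan parquet [...]' -> 'FileScan'."""
    text = _CODEGEN.sub("", node_text.strip())
    return re.split(r"[\s(\[]", text, maxsplit=1)[0]


def parse_plan(plan):
    """Return (depth, operator) pairs in printed, top-down order."""
    nodes = []
    for line in plan.splitlines():
        if not line.strip() or line.startswith("=="):
            continue  # blank line or a "== Physical Plan ==" header
        match = _CHILD.match(line)
        if match:
            nodes.append((match.end(), operator_name(line[match.end():])))
        elif not nodes:
            nodes.append((0, operator_name(line)))  # the root operator
        # Any other line is detail text wrapped from the previous operator.
    return nodes


def bottom_up(plan):
    """Post-order walk: each operator is listed after all of its inputs."""
    order, stack = [], []
    for depth, name in parse_plan(plan):
        # Operators at the same or a deeper level are complete: emit them.
        while stack and stack[-1][0] >= depth:
            order.append(stack.pop()[1])
        stack.append((depth, name))
    while stack:
        order.append(stack.pop()[1])
    return order
```

For a linear plan this is simply the printed operators reversed; the stack only matters once a join gives an operator two inputs, where the whole left subtree is emitted before the right one.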
Common Physical Plan Operators
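FileScan: reading data from files. Look for PushedFilters in its output. A pushed filter means Spark hands the predicate to the file reader, which can skip data before rows are loaded into memory. The closest SQL Server analogy is a seek predicate rather than a residual predicate; more precisely for Parquet, it resembles columnstore segment elimination, because the reader skips whole row groups whose statistics rule out a match.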
== Physical Plan ==
*(2) HashAggregate(keys=[user_id#10], functions=[sum(amount#11)])
+- Exchange hashpartitioning(user_id#10, 200), true, [id=#42]   <-- SHUFFLE
   +- *(1) HashAggregate(keys=[user_id#10], functions=[partial_sum(amount#11)])
      +- *(1) Project [user_id#10, amount#11]
         +- *(1) Filter (isnotnull(event_type#12) AND (event_type#12 = purchase))
            +- *(1) FileScan parquet [user_id#10,amount#11,event_type#12]
                  PushedFilters: [IsNotNull(event_type), EqualTo(event_type,purchase)],
                  ...
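The Filter node still appears above FileScan, but the PushedFilters entry on the FileScan tells you the predicate was also pushed down into the file reader. Parquet stores min/max statistics per column for each row group; if the predicate value falls outside a row group's min/max range, Spark skips reading that row group entirely. Skipping is coarse-grained (a row group that might contain a match is read in full), which is why Spark keeps the Filter node to re-check every row it does read.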
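The skip decision for an equality predicate can be sketched in a few lines. This is a simplified model, not Parquet's or Spark's actual reader code: `RowGroupStats` is a hypothetical stand-in for the per-column footer statistics, and only the pushed `IsNotNull(col) AND col = value` pair from the plan above is modeled.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class RowGroupStats:
    """Footer statistics for one column in one row group (simplified model)."""
    min_value: Optional[str]  # None when every value in the group is null
    max_value: Optional[str]
    null_count: int
    num_rows: int


def can_skip(stats: RowGroupStats, value: str) -> bool:
    """True if no row in the group can satisfy IsNotNull(col) AND col = value."""
    if stats.null_count == stats.num_rows:
        return True  # all nulls: IsNotNull(col) rules out every row
    return value < stats.min_value or value > stats.max_value


def row_groups_to_read(groups: List[RowGroupStats], value: str) -> List[int]:
    """Indexes of the row groups the reader still has to decode."""
    return [i for i, g in enumerate(groups) if not can_skip(g, value)]


groups = [
    RowGroupStats("add_to_cart", "click", 0, 1000),  # "purchase" > max
    RowGroupStats("click", "view", 0, 1000),         # min <= "purchase" <= max
    RowGroupStats(None, None, 500, 500),             # entirely null
    RowGroupStats("refund", "view", 10, 1000),       # "purchase" < min
]
print(row_groups_to_read(groups, "purchase"))  # → [1]
```

Three of the four row groups are never decompressed. Group 1 is read in full and its rows still pass through the Filter node, because "purchase" falling inside [click, view] only means the group might contain matches.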
HashAggregate: distributed aggregation. There are often two HashAggregate nodes: one below the Exchange (partial_sum, a partial aggregation within each partition before the shuffle) and one above it (sum, the final aggregation that merges those partial results after the shuffle). This is Spark's equivalent of SQL Server's local (partial) aggregation: aggregate as much as possible before the expensive redistribution, so fewer rows have to move.
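The partial/final split can be simulated in plain Python to show why it shrinks the shuffle. A sketch under stated assumptions: partitions are lists of `(user_id, amount)` tuples, the function names are hypothetical, and `zlib.crc32` stands in for the partitioning hash (Spark itself uses Murmur3).

```python
import zlib
from collections import defaultdict


def partial_sum(partition):
    """Lower HashAggregate (partial_sum): pre-aggregate inside one partition."""
    totals = defaultdict(float)
    for user_id, amount in partition:
        totals[user_id] += amount
    return dict(totals)


def exchange(partials, num_partitions):
    """Exchange hashpartitioning(user_id, n): route each key to one partition.

    zlib.crc32 is a deterministic stand-in; Spark itself uses a Murmur3 hash.
    """
    shuffled = [[] for _ in range(num_partitions)]
    for partial in partials:
        for user_id, subtotal in partial.items():
            target = zlib.crc32(user_id.encode("utf-8")) % num_partitions
            shuffled[target].append((user_id, subtotal))
    return shuffled


def final_sum(partition):
    """Upper HashAggregate (sum): merge the partial subtotals for each key."""
    totals = defaultdict(float)
    for user_id, subtotal in partition:
        totals[user_id] += subtotal
    return dict(totals)


def two_phase_sum(partitions, num_partitions=4):
    """Return (total per user, number of rows that crossed the shuffle)."""
    partials = [partial_sum(p) for p in partitions]
    rows_shuffled = sum(len(p) for p in partials)
    totals = {}
    for partition in exchange(partials, num_partitions):
        totals.update(final_sum(partition))  # a key never spans two partitions
    return totals, rows_shuffled


partitions = [
    [("u1", 10.0), ("u1", 5.0), ("u2", 3.0)],  # partition 0
    [("u1", 2.0), ("u2", 1.0), ("u2", 4.0)],   # partition 1
]
totals, shuffled = two_phase_sum(partitions)
print(sorted(totals.items()), shuffled)  # → [('u1', 17.0), ('u2', 8.0)] 4
```

Six input rows collapse to four partial rows before the exchange; with many purchases per user the saving grows proportionally.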
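Exchange: the shuffle. Exchange hashpartitioning(user_id#10, 200) means Spark is redistributing rows into 200 partitions by hashing the key, so every row for a given user_id lands in the same partition; 200 is the default value of spark.sql.shuffle.partitions. This is where the expensive I/O happens: each task writes its shuffle output to local disk, and downstream tasks fetch it over the network. Count the Exchange nodes in your plan to see how many shuffles your job triggers, but note that a BroadcastExchange is not a shuffle: it collects the small side of a join on the driver and ships a copy to every executor.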
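Counting shuffles by eye gets error-prone in long plans. A sketch that counts them in `explain()` text, assuming Spark's usual node names: `Exchange ...` for a shuffle, `BroadcastExchange ...` for a broadcast, and `ReusedExchange` for a reference to an exchange computed elsewhere (not counted). The function name is hypothetical. For an executed AQE plan, which prints both a final and an initial plan, run it on one section at a time to avoid double counting.

```python
import re

# Tree-drawing characters in front of an operator name.
_PREFIX = re.compile(r"^[ :|]*(?:\+- |:- )?")


def count_exchanges(plan):
    """Return (shuffle_exchanges, broadcast_exchanges) found in explain() text.

    ReusedExchange lines point at an exchange computed elsewhere in the
    plan, so they are not counted as new data movement.
    """
    shuffles = broadcasts = 0
    for line in plan.splitlines():
        node = _PREFIX.sub("", line)
        if node.startswith("BroadcastExchange"):
            broadcasts += 1
        elif node.startswith("Exchange"):
            shuffles += 1
    return shuffles, broadcasts
```

A sort-merge join of two tables that are not already partitioned on the join key typically shows two shuffle exchanges (one per side), while the broadcast version of the same join shows no shuffle and one BroadcastExchange.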
BroadcastHashJoin vs. SortMergeJoin
-- BroadcastHashJoin means the small side was broadcast (good -- no shuffle of large table)
*(2) BroadcastHashJoin [product_id#5], [product_id#22], Inner, BuildRight
-- SortMergeJoin usually means both sides were shuffled and sorted on the join key (expected for large-large joins)
*(4) SortMergeJoin [customer_id#1], [customer_id#31], Inner
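If you expected a BroadcastHashJoin but see a SortMergeJoin, check whether the broadcast side's estimated size exceeded spark.sql.autoBroadcastJoinThreshold (10 MB by default) or whether Spark couldn't estimate its size before planning. If you know the table is small, a broadcast hint (F.broadcast(small_df)) asks Spark to broadcast it regardless of the threshold.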
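The size check behind that choice can be modeled as a small decision function. This is a deliberately simplified sketch of the planner's behavior for an inner equi-join, not Spark's code: it ignores outer-join build-side rules, ShuffledHashJoin, and AQE. The function name and its `broadcast_hint` parameter (standing in for wrapping one side in `F.broadcast()`) are hypothetical; a size of `None` models "no usable estimate", which never fits under the threshold.

```python
from typing import Optional

# Default spark.sql.autoBroadcastJoinThreshold is 10 MB; -1 disables broadcasting.
DEFAULT_THRESHOLD = 10 * 1024 * 1024


def pick_equi_join(left_bytes: Optional[int], right_bytes: Optional[int],
                   threshold: int = DEFAULT_THRESHOLD,
                   broadcast_hint: Optional[str] = None) -> str:
    """Simplified model of Spark's strategy choice for an inner equi-join.

    left_bytes/right_bytes are the planner's size *estimates*, not actual
    sizes. broadcast_hint ("left"/"right") models F.broadcast() on that side.
    """
    if broadcast_hint in ("left", "right"):
        return "BroadcastHashJoin Build" + broadcast_hint.capitalize()

    def fits(size: Optional[int]) -> bool:
        return threshold >= 0 and size is not None and size <= threshold

    left_ok, right_ok = fits(left_bytes), fits(right_bytes)
    if left_ok and right_ok:
        side = "Right" if right_bytes <= left_bytes else "Left"  # smaller side
    elif right_ok:
        side = "Right"
    elif left_ok:
        side = "Left"
    else:
        return "SortMergeJoin"
    return "BroadcastHashJoin Build" + side
```

In real PySpark the hint looks like `big_df.join(F.broadcast(small_df), "product_id")` (both DataFrame names hypothetical), and the threshold is read or changed with `spark.conf.get` / `spark.conf.set("spark.sql.autoBroadcastJoinThreshold", ...)`. Because the decision uses estimates, a table that is small on disk can still miss the cutoff if Spark cannot estimate it.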
Whole Stage Code Generation
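The *(1) and *(2) prefixes in the plan are whole-stage codegen IDs, not the stage IDs shown on the Spark UI's Stages tab. The asterisk marks Whole Stage Code Generation (WSCG): Spark fused the operators that share an ID into one generated JVM function, which avoids per-row virtual dispatch between operators. Operators within the same WSCG stage run in a single tight loop of generated bytecode. Operators without an asterisk are not code-generated and run through Spark's ordinary iterator-based execution, one row at a time. An Exchange always ends one codegen stage, and the operators above it start another.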
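Grouping operators by their `*(n)` prefix makes the fused stages visible at a glance. A sketch with a hypothetical function name, assuming simple-mode `explain()` text. The sample plan is illustrative and follows the shape of recent Spark 3.x output, where a vectorized Parquet `FileScan` sits outside codegen beneath a `*(n) ColumnarToRow` node.

```python
import re

_CHILD = re.compile(r"^[ :|]*(?:\+- |:- )")
_CODEGEN = re.compile(r"^\*\((\d+)\)\s*")


def codegen_stages(plan):
    """Group operators by whole-stage codegen id; key None = not code-generated."""
    stages = {}
    seen_root = False
    for line in plan.splitlines():
        if not line.strip() or line.startswith("=="):
            continue
        match = _CHILD.match(line)
        if match:
            body = line[match.end():]
        elif not seen_root:
            body = line.strip()  # the root operator has no connector
        else:
            continue  # wrapped detail text, not an operator
        seen_root = True
        codegen = _CODEGEN.match(body)
        stage_id = int(codegen.group(1)) if codegen else None
        rest = body[codegen.end():] if codegen else body
        operator = re.split(r"[\s(\[]", rest, maxsplit=1)[0]
        stages.setdefault(stage_id, []).append(operator)
    return stages


SAMPLE = """== Physical Plan ==
*(2) HashAggregate(keys=[user_id#10], functions=[sum(amount#11)])
+- Exchange hashpartitioning(user_id#10, 200), true, [id=#42]
   +- *(1) HashAggregate(keys=[user_id#10], functions=[partial_sum(amount#11)])
      +- *(1) Project [user_id#10, amount#11]
         +- *(1) Filter (isnotnull(event_type#12) AND (event_type#12 = purchase))
            +- *(1) ColumnarToRow
               +- FileScan parquet [user_id#10,amount#11,event_type#12]
"""
print(codegen_stages(SAMPLE))
# → {2: ['HashAggregate'], None: ['Exchange', 'FileScan'], 1: ['HashAggregate', 'Project', 'Filter', 'ColumnarToRow']}
```

Stage 1 fuses the partial aggregation with the filter and projection below it, stage 2 is the final aggregation, and the Exchange between them belongs to neither.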
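Python UDFs break WSCG. Spark has to serialize rows out of the JVM, send them to a Python worker process, run the function, and deserialize the results back. In the plan this shows up as a BatchEvalPython node (ArrowEvalPython for pandas UDFs), which is not code-generated and splits the operators around it into separate codegen stages. This is why Python UDFs are so much slower than built-in Spark functions, and why the standard guidance is to use built-in functions (pyspark.sql.functions) whenever possible.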
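A quick way to audit a plan for Python round-trips is to look for those evaluation nodes. A sketch with a hypothetical function name; the sample plan is trimmed and illustrative, and `normalize_type` is a hypothetical Python UDF. With a built-in such as `F.upper` instead, the expression would stay inside the Project's codegen stage and no Python node would appear.

```python
import re

# Plan operators that hand rows to a Python worker process.
PYTHON_EVAL_NODES = ("BatchEvalPython", "ArrowEvalPython")
_PREFIX = re.compile(r"^[ :|]*(?:\+- |:- )?")
_CODEGEN = re.compile(r"^\*\(\d+\)\s*")


def python_udf_boundaries(plan):
    """Return the operator lines where rows leave the JVM for Python."""
    hits = []
    for line in plan.splitlines():
        body = _CODEGEN.sub("", _PREFIX.sub("", line))
        if body.startswith(PYTHON_EVAL_NODES):
            hits.append(body.strip())
    return hits


SAMPLE = """*(2) Project [user_id#10, pythonUDF0#30 AS clean_type#25]
+- BatchEvalPython [normalize_type(event_type#12)], [pythonUDF0#30]
   +- *(1) FileScan parquet [user_id#10,event_type#12]
"""
for node in python_udf_boundaries(SAMPLE):
    print(node)  # → BatchEvalPython [normalize_type(event_type#12)], [pythonUDF0#30]
```

Note how the Project above the UDF carries codegen ID 2 while the scan below it carries ID 1: the Python node sits between two separately compiled stages.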
What to Look For
When diagnosing slow jobs, focus the plan review on:
- Filter pushdown: Does your predicate appear in PushedFilters? If not, find out why. (Wrapping the filter column in a UDF blocks pushdown.)
- Number of Exchange nodes: Every Exchange except a BroadcastExchange is a shuffle, and more shuffles mean more cost. Can any be eliminated with a broadcast join or by pre-partitioning the data?
- Join strategy: BroadcastHashJoin is good; SortMergeJoin is expected for large-large joins; CartesianProduct (usually a missing join condition) is catastrophic.
- Output row count: In the Spark UI's SQL tab, each operator shows its number of output rows after execution. An unexpected row explosion (a join producing more rows than either input) usually indicates a missing or incorrect join predicate, or duplicate keys on a side you assumed was unique.
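Parts of this checklist can be automated against `explain()` text. A sketch with hypothetical function names: `pushed_filters` extracts each scan's pushed predicates (bracket-aware, so a filter like `In(region, [EU,US])` stays in one piece), `review_plan` lists join strategies and flags CartesianProduct and BroadcastNestedLoopJoin, which Spark falls back to when a join has no equi-join keys, and `row_explosion` applies the last check to row counts you copy from the SQL tab by hand. It only pattern-matches text, so treat its output as a prompt for a closer look, not a verdict.

```python
import re

_JOINS = re.compile(r"\b(BroadcastHashJoin|SortMergeJoin|ShuffledHashJoin|"
                    r"BroadcastNestedLoopJoin|CartesianProduct)\b")
# Strategies Spark falls back to when a join has no equi-join keys.
_NO_EQUI_KEYS = {"BroadcastNestedLoopJoin", "CartesianProduct"}


def _split_top_level(text):
    """Split on commas that are not nested inside (...) or [...]."""
    parts, depth, start = [], 0, 0
    for i, ch in enumerate(text):
        if ch in "([":
            depth += 1
        elif ch in ")]":
            depth -= 1
        elif ch == "," and depth == 0:
            parts.append(text[start:i].strip())
            start = i + 1
    tail = text[start:].strip()
    if tail:
        parts.append(tail)
    return parts


def pushed_filters(plan):
    """One list of pushed predicates per 'PushedFilters: [...]' in the plan."""
    found = []
    for match in re.finditer(r"PushedFilters: \[", plan):
        depth, i = 1, match.end()
        while i < len(plan) and depth:  # find the matching closing bracket
            if plan[i] == "[":
                depth += 1
            elif plan[i] == "]":
                depth -= 1
            i += 1
        end = i - 1 if depth == 0 else i
        found.append(_split_top_level(plan[match.end():end]))
    return found


def review_plan(plan):
    """Checklist summary: pushed filters, join strategies, joins to double-check."""
    joins = _JOINS.findall(plan)
    return {
        "pushed_filters": pushed_filters(plan),
        "joins": joins,
        "check_join_condition": [j for j in joins if j in _NO_EQUI_KEYS],
    }


def row_explosion(left_rows, right_rows, output_rows):
    """True if a join emitted more rows than either input (counts from the SQL tab)."""
    return output_rows > max(left_rows, right_rows)
```

An empty list for a scan (`PushedFilters: []`) is the cue to ask why that scan's predicate was not pushed down.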
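Databricks Runtime 7.3 LTS and above enable Adaptive Query Execution (AQE) by default, as does open-source Spark from 3.2 onward. AQE re-optimizes the rest of the plan at shuffle boundaries, using statistics from the stages that have already run. For example, it can convert a SortMergeJoin into a BroadcastHashJoin when one side of the join turns out, once its shuffle stage has finished, to be small enough to broadcast. With AQE on, explain() before execution shows the plan wrapped in an AdaptiveSparkPlan isFinalPlan=false node; after execution, the Spark UI SQL tab shows the final plan, including AQE's changes.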
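To see what AQE changed without opening the UI, compare the plan text before and after execution. In recent Spark 3.x releases, calling `explain()` again on a DataFrame after an action has run shows `AdaptiveSparkPlan isFinalPlan=true` with `== Final Plan ==` and `== Initial Plan ==` sections; `spark.conf.get("spark.sql.adaptive.enabled")` tells you whether AQE is on for the session. A sketch that summarizes the join strategy in each section, assuming that layout; `aqe_summary` is a hypothetical name and the sample plans in its usage are trimmed and illustrative.

```python
import re

_ROOT = re.compile(r"AdaptiveSparkPlan isFinalPlan=(true|false)")
_JOIN = re.compile(r"\b(BroadcastHashJoin|SortMergeJoin|ShuffledHashJoin)\b")


def _section(plan, title):
    """Text after '== <title> ==' up to the next '== ... ==' header or the end."""
    match = re.search(rf"== {title} ==(.*?)(?=== [A-Za-z ]+ ==|\Z)", plan, re.S)
    return match.group(1) if match else ""


def aqe_summary(plan):
    """Report AQE status and join strategies before/after re-optimization.

    Returns None when there is no AdaptiveSparkPlan node (AQE disabled or
    not applied to this query).
    """
    root = _ROOT.search(plan)
    if root is None:
        return None
    # Before execution there are no sections: the whole text is the initial plan.
    initial = _section(plan, "Initial Plan") or plan
    return {
        "is_final": root.group(1) == "true",
        "initial_joins": _JOIN.findall(initial),
        "final_joins": _JOIN.findall(_section(plan, "Final Plan")),
    }


AFTER = """AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(3) BroadcastHashJoin [customer_id#1], [customer_id#31], Inner, BuildRight
   :- ...
+- == Initial Plan ==
   SortMergeJoin [customer_id#1], [customer_id#31], Inner
   :- ...
"""
print(aqe_summary(AFTER))
# → {'is_final': True, 'initial_joins': ['SortMergeJoin'], 'final_joins': ['BroadcastHashJoin']}
```

A SortMergeJoin in the initial plan that became a BroadcastHashJoin in the final plan is exactly the runtime conversion described above.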