The cluster is running a groupBy job. 19 tasks finish in 30 seconds. One task runs for 8 minutes. The cluster is 95% idle while one executor processes an outsized partition. This is data skew, and it's one of the most common reasons Spark jobs are slow despite adequate cluster resources.
What Causes Skew
Skew happens when data is unevenly distributed across partitions. Common causes:
- Null keys in joins: Spark hashes join keys to determine which partition a row goes to. All nulls hash to the same value and land in the same partition. If 40% of rows have a null join key, 40% of the data lands in one partition and is processed by a single task.
- High-frequency values in groupBy keys: If you're grouping by status and 80% of rows have status = 'active', the 'active' group dominates one partition.
- Uneven partition sizes in input data: If your source Parquet files are uneven, reading them produces uneven partitions that propagate through the job.
- Skewed join key values: One customer_id that appears in 10 million rows while most appear in tens of rows.
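To make the effect of a hot key concrete, here is a minimal pure-Python simulation of hash partitioning. It is a sketch under stated assumptions, not Spark's actual partitioner: Spark uses Murmur3 hashing, while this example uses zlib.crc32 as a stand-in, and the row counts, key names, and 20-partition layout are made up for illustration.

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 20  # assumed for illustration


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Stand-in hash partitioner: the same key always maps to the same partition."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def rows_per_partition(keys, num_partitions: int = NUM_PARTITIONS) -> Counter:
    """Count how many rows each partition would receive after a shuffle on `keys`."""
    return Counter(partition_for(k, num_partitions) for k in keys)


# Hypothetical dataset: 80% of rows share the key 'active',
# the remaining 20% are spread over 1,000 distinct keys.
keys = ["active"] * 8000 + [f"status_{i % 1000}" for i in range(2000)]

counts = rows_per_partition(keys)
hot_partition, hot_rows = counts.most_common(1)[0]
print(f"partition {hot_partition} holds {hot_rows / len(keys):.0%} of the rows")
```

However many partitions you add, every 'active' row still maps to the same one, which is why raising the partition count alone does not fix this kind of skew.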
Identifying Skew in the Spark UI
Go to the Stages tab for a running job. Click into a slow stage. Click "Tasks." Sort by "Duration." If the median task time is 5 seconds and one task is 8 minutes, that's skew. Look at "Shuffle Read Size" per task — the slow task almost certainly processed dramatically more data than the others.
In the DAG view, a stage with very long runtime and normal-looking predecessor stages is a strong skew indicator. The preceding stage finished quickly because the shuffle write was distributed. The current stage is processing one oversized shuffle partition.
Solution 1: Filter Nulls Before Joining
from pyspark.sql import functions as F
# Join was slow because order_key has many nulls.
# Nulls don't join to anything, so filter them out before the join.
joined = orders_df.filter(F.col("order_key").isNotNull()).join(
    products_df, on="order_key", how="inner"
)
For left outer joins where you want to preserve null-key rows, handle them separately: filter non-null rows and join normally, filter null-key rows and add null columns for the right side, then union the results. More code, faster execution.
Solution 2: Salting for groupBy Skew
For heavily skewed groupBy keys, salting breaks the large group into multiple smaller partitions that can be processed in parallel:
from pyspark.sql import functions as F

# If 'category' has a dominant value causing skew:
# Step 1: Add a random salt (0-9) to the key
df_salted = df.withColumn(
    "salted_category",
    F.concat(F.col("category"), F.lit("_"), (F.rand() * 10).cast("int").cast("string")),
)
# Step 2: Aggregate with the salted key
df_partial = df_salted.groupBy("salted_category").agg(F.sum("amount").alias("partial_sum"))
# Step 3: Strip the salt and do a final aggregation.
# Remove only the trailing "_<salt>" so categories that contain "_" stay intact.
df_total = (
    df_partial.withColumn("category", F.regexp_replace("salted_category", r"_\d+$", ""))
    .groupBy("category")
    .agg(F.sum("partial_sum").alias("total_amount"))
)
The dominant category's data is now spread across 10 salted keys (salt 0-9), which Spark can place in different partitions. Those partitions are processed in parallel, then the partial sums are aggregated in a second, much lighter groupBy stage.
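A quick way to convince yourself that salting does not change the result is to simulate the two stages in plain Python. This is only an illustration with made-up data and hypothetical function names. It mirrors the logic of the two-stage aggregation, not Spark's execution.

```python
import random
from collections import defaultdict

NUM_SALTS = 10  # same 0-9 salt range as the salting example


def direct_sum(rows):
    """Single-stage aggregation: one group per category."""
    totals = defaultdict(int)
    for category, amount in rows:
        totals[category] += amount
    return dict(totals)


def salted_sum(rows, num_salts=NUM_SALTS, seed=0):
    """Two-stage aggregation: partial sums per (category, salt), then a final sum per category."""
    rng = random.Random(seed)
    partial = defaultdict(int)
    for category, amount in rows:
        partial[(category, rng.randrange(num_salts))] += amount

    final = defaultdict(int)
    for (category, _salt), partial_total in partial.items():
        final[category] += partial_total
    return dict(final), dict(partial)


# Hypothetical data in which 'electronics' dominates.
rows = [("electronics", 5)] * 9000 + [("books", 3)] * 600 + [("toys", 2)] * 400

final, partial = salted_sum(rows)
assert final == direct_sum(rows)  # salting leaves the totals unchanged
print(len(partial), "partial groups feed", len(final), "final groups")
```

This works because sum can be computed from partial sums. The same two-stage pattern applies to count, min, and max, but an aggregate such as an exact distinct count or a median cannot simply be re-aggregated from salted partial results.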
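Solution 3: Broadcast the Small Side of a Skewed Join
If one side of a skewed join is small enough to fit in memory on the driver and on every executor, broadcast it. A broadcast join sends a full copy of the small table to each executor, so the large side is joined in place without being shuffled by key, and skew in its key values no longer matters:
# If product_dim is small, broadcast it so large_events is never shuffled on the skewed product_id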
large_events.join(F.broadcast(product_dim), on="product_id", how="inner")
Solution 4: Adaptive Query Execution (AQE)
Databricks Runtime 7.3 LTS and above enable Adaptive Query Execution (AQE) by default, as does open-source Apache Spark from version 3.2.0. AQE includes a skew join optimization that detects oversized partitions at runtime and automatically splits them:
# AQE is enabled by default in recent Databricks runtimes
# These are the relevant settings:
spark.conf.get("spark.sql.adaptive.enabled") # true
spark.conf.get("spark.sql.adaptive.skewJoin.enabled") # true
spark.conf.get("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes") # default 256MB
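AQE inspects shuffle partition sizes at runtime. In a shuffle-based join (a sort-merge join in particular), it treats a partition as skewed when it is larger than the threshold and also larger than spark.sql.adaptive.skewJoin.skewedPartitionFactor (default 5) times the median partition size. It then splits that partition into smaller chunks, sized around spark.sql.adaptive.advisoryPartitionSizeInBytes, and joins the chunks in parallel. A 2GB partition sitting among 100MB partitions gets split. The same 2GB partition among 1GB partitions does not, because it is under 5x the median. This optimization applies to joins, not to groupBy aggregations, so it does not replace salting for aggregation skew. Understanding the manual approaches matters when AQE isn't enough or when you're diagnosing why it didn't trigger.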
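When diagnosing why AQE did not trigger, it helps to apply the documented skew test by hand. The Spark configuration docs describe a partition as skewed when it is larger than spark.sql.adaptive.skewJoin.skewedPartitionFactor (default 5) times the median partition size and also larger than skewedPartitionThresholdInBytes. The sketch below is a simplified model of that rule only. The function name and the partition sizes are hypothetical, and the real optimizer applies further conditions, such as the join type.

```python
from statistics import median

MB = 1024 * 1024


def skewed_partitions(sizes_bytes, factor=5.0, threshold_bytes=256 * MB):
    """Return indexes of partitions flagged by the documented rule:
    size > factor * median size AND size > threshold_bytes."""
    med = median(sizes_bytes)
    return [
        i
        for i, size in enumerate(sizes_bytes)
        if size > factor * med and size > threshold_bytes
    ]


# One 2GB partition among 60MB partitions: flagged.
print(skewed_partitions([60 * MB] * 19 + [2048 * MB]))    # [19]

# The same 2GB partition among 1.5GB partitions: under 5x the median, not flagged.
print(skewed_partitions([1500 * MB] * 19 + [2048 * MB]))  # []

# A 200MB partition among 1MB partitions: 200x the median but under 256MB, not flagged.
print(skewed_partitions([1 * MB] * 19 + [200 * MB]))      # []
```

The last two cases are typical reasons a visibly slow task is left alone: either every partition is large, or the outlier is small in absolute terms.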
The Metric to Watch
After fixing skew, the task duration distribution in the Spark UI should be much tighter, with the median and maximum close together. Compare the maximum against the median rather than the minimum, since a near-empty partition can make the minimum misleadingly small. A 200x ratio between median and max task time means skew. A 2-3x ratio is normal variance. The goal isn't perfect uniformity; it's eliminating the outlier tasks that extend the stage runtime.
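If you copy task durations out of the Spark UI, the check is a one-liner. This is a minimal sketch with a hypothetical helper name. It divides the slowest task by the median task, since the median is less sensitive to near-empty partitions than the minimum. The "before" numbers are the opening scenario, and the "after" numbers are invented for illustration.

```python
from statistics import median


def skew_ratio(task_durations_s):
    """Slowest task divided by the median task; values near 1 mean a balanced stage."""
    return max(task_durations_s) / median(task_durations_s)


before = [30] * 19 + [480]  # 19 tasks at 30 s, one task at 8 minutes
after = [28, 30, 31, 33, 35, 36, 38, 40, 41, 45]  # hypothetical durations after the fix

print(f"before: {skew_ratio(before):.1f}x, after: {skew_ratio(after):.1f}x")
# before: 16.0x, after: 1.3x
```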