Splitting the Monolith: How Three-Stage Streaming Cut Our DBU Costs by Half

This is the follow-up to the April post about the client ingesting 1,000+ Kafka topics on a single Databricks job that could not scale. If you have not read that one, the short version: 640 vCores, 85-95% CPU, still falling behind on 200,000 messages per minute. The fix was architectural, not computational.

The Three-Job Architecture for 1,000 Topics

The redesign split the monolith into three separate Databricks jobs:

Job 1: Topic fan-out and raw landing. A single Structured Streaming job subscribes to all 1,000+ topics. It does exactly one thing: write each message to a raw Delta table with the Kafka metadata and raw Avro bytes. No deserialization. No schema lookup. No merge.

# Job 1: land all topics into a single raw Delta table
from pyspark.sql.functions import col, current_timestamp

raw_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_brokers)
    .option("subscribePattern", r"plant\..*")   # regex: all plant.* topics
    .option("startingOffsets", "latest")
    .load()
)

raw_landed = raw_stream.select(
    col("topic"),
    col("partition").cast("int"),
    col("offset").cast("long"),
    col("timestamp").alias("kafka_ts"),
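    # Stored as a string because Job 2 parses it with from_json. If the payload
    # is binary Avro, drop the cast and keep the bytes as-is.
    col("value").cast("string").alias("raw_payload"),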
    current_timestamp().alias("arrived_at")
)

# One raw table for all 1,000+ topics
(
    raw_landed.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoints["raw"])
    .trigger(availableNow=True)
    .toTable("raw.all_topics")
)

This job ran on a 4-worker cluster. CPU utilization: 15-25%. It easily kept pace with the full 1,000-topic throughput because moving bytes requires almost no CPU. Consumer lag stayed near zero.

Job 2: Topic-grouped deserialization. A separate job reads from raw.all_topics, groups by topic name, applies the correct schema for each topic group, deserializes, validates, and writes to topic-specific bronze tables. This is where Schema Registry lookups happen — but once per topic per batch, not once per message.

# Job 2: deserialize by topic group
from pyspark.sql.functions import col, from_json

raw_df = (
    spark.readStream
    .format("delta")
    .table("raw.all_topics")
)

def deserialize_batch(batch_df, batch_id):
    # The batch is scanned once per topic, so cache it for the duration of the call
    batch_df.persist()
    try:
        # Group by topic and apply one schema per topic group
        topics = [row["topic"] for row in batch_df.select("topic").distinct().collect()]

        for topic in topics:
            schema = schema_registry.get_schema(topic)   # cached after first lookup
            topic_df = batch_df.filter(col("topic") == topic)
            parsed = topic_df.withColumn("parsed", from_json(col("raw_payload"), schema))

            # Caveat: depending on Spark version and parse mode, malformed JSON may yield
            # a struct of nulls instead of a null struct. Verify this split on your runtime.
            good = (
                parsed.filter(col("parsed").isNotNull())
                .select(col("parsed.*"), col("arrived_at"), col("kafka_ts"))
            )
            bad = (
                parsed.filter(col("parsed").isNull())
                .select(col("raw_payload"), col("topic"), col("offset"), col("arrived_at"))
            )

            table_name = f"bronze.{topic.replace('.', '_')}"
            good.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable(table_name)
            bad.write.format("delta").mode("append").saveAsTable("bronze.dead_letter")
    finally:
        batch_df.unpersist()

(
    raw_df.writeStream
    .foreachBatch(deserialize_batch)
    .option("checkpointLocation", checkpoints["bronze"])
    .trigger(availableNow=True)
    .start()
    .awaitTermination()
)
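The `schema_registry` object in the Job 2 code is not defined in the post. A minimal sketch of what "cached after first lookup" could mean is below. The class name `CachedSchemaLookup`, the injected `fetch` function, and the example topic and schema string are all hypothetical; the real client and its return type are not shown in the source.

```python
from typing import Callable, Dict


class CachedSchemaLookup:
    """Keeps one schema per topic in memory so the registry is hit once per topic."""

    def __init__(self, fetch: Callable[[str], str]) -> None:
        self._fetch = fetch              # hypothetical: wraps the real registry call
        self._cache: Dict[str, str] = {}
        self.remote_calls = 0            # counts how often the registry was actually hit

    def get_schema(self, topic: str) -> str:
        if topic not in self._cache:
            self.remote_calls += 1
            self._cache[topic] = self._fetch(topic)
        return self._cache[topic]


def fake_fetch(topic: str) -> str:
    # Stand-in for a network call; returns a made-up DDL schema string
    return "sensor_id STRING, reading DOUBLE"


schema_registry = CachedSchemaLookup(fake_fetch)
for _ in range(3):
    schema_registry.get_schema("plant.line1.temperature")

print(schema_registry.remote_calls)  # 1
```

This sketch never invalidates its cache. A pipeline whose schemas evolve would need some way to refresh an entry, for example on a parse failure or on a timer.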

This job ran on an 8-worker cluster. CPU utilization: 40-60% at peak. It handled deserialization for all 1,000 topics in a single batch pass, with schema lookups amortized per topic per batch rather than per message. The key optimization: instead of deserializing every message in a Kafka micro-batch individually, it grouped all messages by topic and applied the schema once per group. This reduced Schema Registry overhead by approximately 99.9%.
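To see where a figure like 99.9% can come from, here is a small worked calculation. The message rate and topic count are from this post; the five-minute batch window is an assumption chosen for illustration, and the function name `lookup_reduction` is made up.

```python
def lookup_reduction(messages_per_batch: int, topics_per_batch: int) -> float:
    """Fraction of schema lookups avoided when looking up once per topic, not per message."""
    return 1 - topics_per_batch / messages_per_batch


messages_per_minute = 200_000   # from the post
topics = 1_000                  # from the post
batch_minutes = 5               # ASSUMPTION: how much data one batch covers

messages_per_batch = messages_per_minute * batch_minutes
reduction = lookup_reduction(messages_per_batch, topics)

print(f"{messages_per_batch:,} lookups become {topics:,}: {reduction:.1%} fewer")
# 1,000,000 lookups become 1,000: 99.9% fewer
```

The ratio depends only on messages per topic per batch, so larger batches push the reduction closer to 100% and very small batches erode it.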

Job 3: Gold layer merge. The merge job read from the bronze tables and merged into the gold layer. This job ran on a 6-worker cluster, triggered every 30 minutes. Merge operations are inherently I/O-bound, and the 30-minute window gave each merge run enough data to amortize the shuffle cost effectively.
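The post does not show the merge job's code. As a rough sketch only, an upsert from one bronze table into a gold table with the Delta Lake Python API might look like the following. The helper names (`merge_condition`, `merge_bronze_into_gold`), the table names, and the key columns are assumptions for illustration, not the client's actual job.

```python
from typing import List


def merge_condition(keys: List[str], target: str = "t", source: str = "s") -> str:
    """Build the join predicate for a Delta MERGE from a list of key columns."""
    return " AND ".join(f"{target}.{k} = {source}.{k}" for k in keys)


def merge_bronze_into_gold(spark, bronze_table: str, gold_table: str, keys: List[str]) -> None:
    """Upsert one bronze table into its gold table (hypothetical names and keys)."""
    # Imported here so merge_condition stays usable without Spark installed
    from delta.tables import DeltaTable

    # MERGE fails when several source rows match one target row, so deduplicate
    # on the keys first. dropDuplicates keeps an arbitrary row per key; a real
    # job would pick the latest row explicitly.
    updates = spark.read.table(bronze_table).dropDuplicates(keys)

    (
        DeltaTable.forName(spark, gold_table).alias("t")
        .merge(updates.alias("s"), merge_condition(keys))
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )


print(merge_condition(["plant_id", "event_id"]))
# t.plant_id = s.plant_id AND t.event_id = s.event_id
```

This version reads the whole bronze table on every run. Selecting only rows that arrived since the previous merge is left out to keep the sketch short.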

The Results

| Metric | Monolithic Job | Three-Stage Architecture |
| --- | --- | --- |
| Workers (peak) | 20 | 4 + 8 + 6 = 18 |
| CPU utilization | 85-95% | 15-25% / 40-60% / 30-50% |
| Consumer lag | Rising, never caught up | Near zero on raw ingest job |
| Annual cluster cost | ~$100,000+ | ~$38,000 |
| Data freshness (gold) | 30-60 min (falling behind) | 30 min (stable, reliable) |

Three jobs instead of one. Slightly fewer total workers (18 vs. 20). Far lower CPU utilization on each job. Total cost less than half. Stable consumer lag instead of rising lag.

Why the Cost Fell Even With Three Jobs

The monolithic job needed large, continuously-running workers to handle the combined CPU load of deserialization plus merge. The three-job architecture uses smaller clusters tuned to each workload, running on schedules that match their actual data arrival patterns. The raw ingest cluster runs briefly every few minutes. The deserialize cluster runs for longer but less frequently. The merge cluster runs every 30 minutes and finishes in 10-12 minutes.

Total compute hours are lower because each cluster exists only when its workload is active. Total CPU efficiency is higher because each cluster is doing one thing it is sized for, not three things it is undersized for.
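A back-of-the-envelope model makes the scheduling effect concrete. Worker counts and the merge duty cycle (10-12 minutes of every 30) are from this post. The duty cycles for the raw ingest and deserialize clusters are assumptions picked purely for illustration, since the post only says "briefly every few minutes" and "longer but less frequently". The `Cluster` class is hypothetical.

```python
from dataclasses import dataclass

HOURS_PER_DAY = 24


@dataclass
class Cluster:
    name: str
    workers: int
    active_fraction: float  # share of the day the cluster is running

    def worker_hours_per_day(self) -> float:
        return self.workers * HOURS_PER_DAY * self.active_fraction


# The monolith ran continuously
monolith = Cluster("monolith", workers=20, active_fraction=1.0)

staged = [
    Cluster("raw ingest", workers=4, active_fraction=2 / 5),     # ASSUMED: 2 min of every 5
    Cluster("deserialize", workers=8, active_fraction=10 / 20),  # ASSUMED: 10 min of every 20
    Cluster("merge", workers=6, active_fraction=11 / 30),        # from the post: 10-12 min of every 30
]

staged_total = sum(c.worker_hours_per_day() for c in staged)
ratio = staged_total / monolith.worker_hours_per_day()

print(f"monolith: {monolith.worker_hours_per_day():.1f} worker-hours/day")
print(f"staged:   {staged_total:.1f} worker-hours/day ({ratio:.0%} of monolith)")
```

Worker-hours are not the same as cost, because the three clusters can use smaller instance types than the monolith did. The point of the sketch is only that duty cycle, not worker count, dominates the total.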

The principle generalizes: if your streaming job has unmanageable CPU load, the fix is rarely "add more workers." It is almost always "do less per message." The way to do less per message is to split the workload into stages that each do one thing. I am here to help if you are working through this for your own pipeline.
