Multi-Stage Kafka Pipelines on Databricks: Raw, Bronze, Gold as Separate Jobs

I have described the three-stage streaming architecture in a few posts now — raw ingest, deserialize, merge. This post is the implementation detail: what it actually looks like in Databricks notebooks and Workflows, with Delta Lake as the intermediate store between stages.

The Target Architecture

Three notebooks, three Databricks jobs, three Delta tables:

  • Job 1: Kafka → raw.sensor_readings (binary/string payload, Kafka metadata)
  • Job 2: raw.sensor_readings → bronze.sensor_readings (parsed, validated, dead letters)
  • Job 3: bronze.sensor_readings → gold.sensor_readings (merged, deduplicated, SCD-ready)
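
One way to keep the three jobs consistent is to describe the chain as data and check it. The sketch below is plain Python, not Spark code. The `Stage` class and `check_chain` helper are hypothetical names; the table names and checkpoint configuration keys are the ones used in the notebooks in this post, and `kafka:sensor_readings` is only a label for the topic.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Stage:
    name: str
    source: str
    target: str
    checkpoint_conf: str  # Spark conf key that holds the checkpoint path


STAGES = [
    Stage("raw_ingest", "kafka:sensor_readings",
          "raw.sensor_readings", "pipeline.checkpoint.raw"),
    Stage("deserialize", "raw.sensor_readings",
          "bronze.sensor_readings", "pipeline.checkpoint.bronze"),
    Stage("merge_gold", "bronze.sensor_readings",
          "gold.sensor_readings", "pipeline.checkpoint.gold"),
]


def check_chain(stages: List[Stage]) -> None:
    """Raise if a stage does not read the previous stage's target,
    or if two stages share a checkpoint key."""
    for prev, nxt in zip(stages, stages[1:]):
        if nxt.source != prev.target:
            raise ValueError(
                f"{nxt.name} reads {nxt.source}, expected {prev.target}")
    keys = [s.checkpoint_conf for s in stages]
    if len(set(keys)) != len(keys):
        raise ValueError("each stage needs its own checkpoint location")


check_chain(STAGES)  # passes silently for the chain above
```

The point of the check is the second rule as much as the first: each stage owns exactly one checkpoint, and no two stages may share it.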

Job 1: Kafka to Raw Delta

# Notebook: /Pipelines/Sensor/01_raw_ingest
from pyspark.sql.functions import col, current_timestamp

# Note: Databricks runtimes typically ship a shaded Kafka client. If the login
# module below cannot be found, the class name may need a "kafkashaded." prefix.
raw_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", spark.conf.get("pipeline.kafka.brokers"))
    .option("subscribe", "sensor_readings")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config",
            f'org.apache.kafka.common.security.plain.PlainLoginModule required '
            f'username="$ConnectionString" password="{dbutils.secrets.get("kafka", "connection_string")}";')
    .option("startingOffsets", "latest")   # only used on the first run; later runs resume from the checkpoint
    .option("failOnDataLoss", "false")
    .load()
)

raw_landed = raw_stream.select(
    col("topic"),
    col("partition").cast("int"),
    col("offset").cast("long"),
    col("timestamp").alias("kafka_ts"),
    col("value").cast("string").alias("raw_payload"),
    current_timestamp().alias("arrived_at")
)

(
    raw_landed.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", spark.conf.get("pipeline.checkpoint.raw"))
    .trigger(availableNow=True)
    .toTable("raw.sensor_readings")
    # Block until the availableNow run has drained, matching Jobs 2 and 3
    .awaitTermination()
)
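
The JAAS string in the reader is easier to review when it is built in one place. This is a minimal sketch, assuming the same PLAIN mechanism and the literal `$ConnectionString` username used above. `build_jaas_config` is a hypothetical helper, not part of Spark or Databricks, and the connection string in the usage line is a placeholder.

```python
LOGIN_MODULE = "org.apache.kafka.common.security.plain.PlainLoginModule"


def build_jaas_config(password: str, username: str = "$ConnectionString") -> str:
    """Return a JAAS config line for SASL/PLAIN.

    Double quotes or backslashes would break the quoted values, so this
    sketch rejects them rather than attempting to escape them.
    """
    for label, value in (("username", username), ("password", password)):
        if '"' in value or "\\" in value:
            raise ValueError(
                f"{label} must not contain double quotes or backslashes")
    return f'{LOGIN_MODULE} required username="{username}" password="{password}";'


# Placeholder secret; in the notebook this would come from dbutils.secrets.get(...)
jaas = build_jaas_config("Endpoint=sb://example/;SharedAccessKey=placeholder")
```

In the notebook, the result would be passed to `.option("kafka.sasl.jaas.config", jaas)`.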

Job 2: Raw to Bronze (Deserialize)

# Notebook: /Pipelines/Sensor/02_deserialize
from pyspark.sql.functions import col, from_json, current_timestamp, lit
from pyspark.sql.types import StructType, StringType, DoubleType, LongType

SENSOR_SCHEMA = (
    StructType()
    .add("sensor_id", StringType())
    .add("value", DoubleType())
    .add("unit", StringType())
    .add("ts_ms", LongType())
)

# Read new raw records since last checkpoint
# (checkpointLocation is a writeStream option, so it is set on the writer below, not here)
raw_stream = (
    spark.readStream
    .format("delta")
    .table("raw.sensor_readings")
)

def parse_and_route(batch_df, batch_id):
    parsed = batch_df.withColumn("parsed", from_json(col("raw_payload"), SENSOR_SCHEMA))

    # Good records: all required fields present and non-null
    good = parsed.filter(
        col("parsed.sensor_id").isNotNull() &
        col("parsed.value").isNotNull() &
        col("parsed.unit").isNotNull() &
        col("parsed.ts_ms").isNotNull() &
        (col("parsed.value").between(-100, 10000))
    ).select(
        col("parsed.sensor_id"),
        col("parsed.value"),
        col("parsed.unit"),
        col("parsed.ts_ms"),
        col("arrived_at"),
        col("topic"),
        col("partition"),
        col("offset")
    )

    # Bad records: anything that failed parsing or validation
    bad = parsed.filter(
        col("parsed.sensor_id").isNull() |
        col("parsed.value").isNull() |
        col("parsed.unit").isNull() |
        col("parsed.ts_ms").isNull() |
        ~(col("parsed.value").between(-100, 10000))
    ).select(
        col("raw_payload"),
        col("topic"),
        col("partition"),
        col("offset"),
        col("arrived_at"),
        current_timestamp().alias("failed_at"),
        lit(batch_id).cast("long").alias("batch_id")
    )

    good.write.format("delta").mode("append").saveAsTable("bronze.sensor_readings")
    bad.write.format("delta").mode("append").saveAsTable("bronze.sensor_readings_dead_letter")

(
    raw_stream.writeStream
    .foreachBatch(parse_and_route)
    .option("checkpointLocation", spark.conf.get("pipeline.checkpoint.bronze"))
    .trigger(availableNow=True)
    .start()
    .awaitTermination()
)
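
To make the routing rule concrete, here is a plain-Python model of the good/bad split applied to a few sample payloads. It is a sketch of the filter logic only: it uses the standard `json` module, not Spark, and `from_json` has its own type coercion and malformed-record handling that this does not reproduce. The function names and sample payloads are made up for illustration.

```python
import json
from typing import Any, Dict, List, Optional, Tuple

FIELDS = ("sensor_id", "value", "unit", "ts_ms")


def parse(raw_payload: str) -> Optional[Dict[str, Any]]:
    """Return the parsed object, or None if the payload is not a JSON object."""
    try:
        obj = json.loads(raw_payload)
    except (TypeError, ValueError):
        return None
    return obj if isinstance(obj, dict) else None


def is_good(parsed: Optional[Dict[str, Any]]) -> bool:
    """Mirror the notebook filter: all four fields non-null, value in [-100, 10000]."""
    if parsed is None:
        return False
    if any(parsed.get(name) is None for name in FIELDS):
        return False
    value = parsed["value"]
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return False
    return -100 <= value <= 10000  # inclusive on both ends, like between()


def route(payloads: List[str]) -> Tuple[List[Dict[str, Any]], List[str]]:
    """Split payloads into parsed good rows and raw dead-letter payloads."""
    good: List[Dict[str, Any]] = []
    dead_letter: List[str] = []
    for raw_payload in payloads:
        parsed = parse(raw_payload)
        if is_good(parsed):
            good.append(parsed)
        else:
            dead_letter.append(raw_payload)  # keep the original text for replay
    return good, dead_letter


samples = [
    '{"sensor_id": "s1", "value": 21.5, "unit": "C", "ts_ms": 1700000000000}',
    '{"sensor_id": "s2", "value": 20000, "unit": "C", "ts_ms": 1700000000001}',
    '{"sensor_id": "s3", "unit": "C", "ts_ms": 1700000000002}',
    'not json',
]
good, dead_letter = route(samples)
```

Only the first sample passes. The out-of-range reading, the record with no `value`, and the unparseable text all go to the dead-letter side with their raw payload intact, which is the property that makes later replay possible.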

Job 3: Bronze to Gold (Merge)

# Notebook: /Pipelines/Sensor/03_merge_gold
from delta.tables import DeltaTable
from pyspark.sql.functions import col, from_unixtime

bronze_stream = (
    spark.readStream
    .format("delta")
    .table("bronze.sensor_readings")
)

gold_table = DeltaTable.forName(spark, "gold.sensor_readings")

def merge_to_gold(batch_df, batch_id):
    # Deduplicate within the batch before merging
    deduped = batch_df.dropDuplicates(["sensor_id", "ts_ms"])

    gold_table.alias("gold").merge(
        deduped.alias("new"),
        "gold.sensor_id = new.sensor_id AND gold.ts_ms = new.ts_ms"
    ).whenMatchedUpdate(set={
        "value":      "new.value",
        "unit":       "new.unit",
        "updated_at": "new.arrived_at"
    }).whenNotMatchedInsert(values={
        "sensor_id":  "new.sensor_id",
        "value":      "new.value",
        "unit":       "new.unit",
        "ts_ms":      "new.ts_ms",
        "arrived_at": "new.arrived_at",
        "updated_at": "new.arrived_at"
    }).execute()

(
    bronze_stream.writeStream
    .foreachBatch(merge_to_gold)
    .option("checkpointLocation", spark.conf.get("pipeline.checkpoint.gold"))
    .trigger(availableNow=True)
    .start()
    .awaitTermination()
)
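
The merge semantics can be checked on paper with a small model. The sketch below is plain Python with a dict standing in for the gold table, keyed on (sensor_id, ts_ms); `merge_batch` and the sample rows are hypothetical. It keeps the first duplicate it sees, whereas Spark's dropDuplicates does not promise which row survives. The in-batch deduplication matters because a Delta MERGE raises an error when several source rows match the same target row.

```python
from typing import Dict, Iterable, Tuple

Key = Tuple[str, int]


def merge_batch(gold: Dict[Key, dict], batch: Iterable[dict]) -> Dict[Key, dict]:
    """Upsert a batch into gold, keyed on (sensor_id, ts_ms)."""
    # Deduplicate within the batch on the merge key (first row wins here).
    deduped: Dict[Key, dict] = {}
    for row in batch:
        deduped.setdefault((row["sensor_id"], row["ts_ms"]), row)

    for key, new in deduped.items():
        if key in gold:
            # whenMatchedUpdate: overwrite the measurement, keep arrived_at
            gold[key].update(
                value=new["value"], unit=new["unit"],
                updated_at=new["arrived_at"])
        else:
            # whenNotMatchedInsert: first time this key is seen
            gold[key] = {
                "sensor_id": new["sensor_id"], "value": new["value"],
                "unit": new["unit"], "ts_ms": new["ts_ms"],
                "arrived_at": new["arrived_at"],
                "updated_at": new["arrived_at"],
            }
    return gold


first = [
    {"sensor_id": "s1", "ts_ms": 1000, "value": 21.5, "unit": "C", "arrived_at": "t1"},
    {"sensor_id": "s1", "ts_ms": 1000, "value": 21.5, "unit": "C", "arrived_at": "t1"},
    {"sensor_id": "s2", "ts_ms": 1000, "value": 5.0, "unit": "C", "arrived_at": "t1"},
]
correction = [
    {"sensor_id": "s1", "ts_ms": 1000, "value": 22.0, "unit": "C", "arrived_at": "t2"},
]

gold = merge_batch({}, first)
gold = merge_batch(gold, correction)
```

After both batches the model holds two rows. The `s1` row carries the corrected value with its original `arrived_at` and a newer `updated_at`, and replaying either batch leaves the row count unchanged.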

Databricks Workflows Orchestration

Each notebook is a separate Databricks job, scheduled independently. Job 1 runs every 5 minutes. Job 2 runs every 15 minutes. Job 3 runs every 30 minutes. The intervals are tuned so each stage has time to complete before the next stage consumes its output.
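
As a rough illustration, the three schedules could be expressed as job settings like the ones below. This is a sketch under assumptions: the field names follow the Databricks Jobs API settings format as I understand it (Quartz cron in `schedule.quartz_cron_expression`), the job names and the `job_settings` helper are made up, and compute configuration is left out entirely. The notebook paths are the ones used in this post.

```python
def job_settings(name: str, notebook_path: str, every_minutes: int) -> dict:
    """Build a settings dict for one scheduled notebook job (compute omitted)."""
    if every_minutes <= 0 or 60 % every_minutes != 0:
        raise ValueError("every_minutes must divide 60 for an even schedule")
    return {
        "name": name,
        # One run at a time, so two runs never share a checkpoint concurrently
        "max_concurrent_runs": 1,
        "schedule": {
            # Quartz fields: second minute hour day-of-month month day-of-week
            "quartz_cron_expression": f"0 0/{every_minutes} * * * ?",
            "timezone_id": "UTC",
            "pause_status": "UNPAUSED",
        },
        "tasks": [
            {"task_key": name,
             "notebook_task": {"notebook_path": notebook_path}},
        ],
    }


INTERVALS = {"raw": 5, "bronze": 15, "gold": 30}

JOBS = [
    job_settings("sensor_01_raw_ingest", "/Pipelines/Sensor/01_raw_ingest", INTERVALS["raw"]),
    job_settings("sensor_02_deserialize", "/Pipelines/Sensor/02_deserialize", INTERVALS["bronze"]),
    job_settings("sensor_03_merge_gold", "/Pipelines/Sensor/03_merge_gold", INTERVALS["gold"]),
]

# Rough upper bound on how long a record can wait across all three stages,
# ignoring the run time of each job: it can just miss every schedule once.
worst_case_wait_minutes = sum(INTERVALS.values())
```

With these intervals a record can wait up to roughly 50 minutes before it reaches gold, plus however long the runs themselves take. That number is the one to compare against your latency requirement when choosing intervals.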

The dependencies are loose: each stage reads from the Delta table written by the previous stage, not from a job dependency chain. This means a failure in Job 2 does not block Job 1 from continuing to land raw data. When Job 2 recovers, its checkpoint records where it stopped, so the next scheduled run processes the backlog. Job 3 processes whenever the bronze table has new records. The system is self-healing without explicit retry configuration.
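
The recovery behavior can be simulated in a few lines. This is a toy model, not Spark: lists stand in for the Kafka topic and the Delta tables, and an integer stands in for the checkpoint, whereas real checkpoints track Kafka offsets and Delta table versions. `CheckpointedStage` is a hypothetical name.

```python
from typing import List


class CheckpointedStage:
    """Copies everything past its checkpoint from an append-only source."""

    def __init__(self, source: List[str], sink: List[str]) -> None:
        self.source = source
        self.sink = sink
        self.checkpoint = 0  # count of source records already committed

    def run(self, fail: bool = False) -> int:
        if fail:
            # A failed run commits nothing, so the checkpoint does not move
            raise RuntimeError("simulated stage failure")
        pending = self.source[self.checkpoint:]
        self.sink.extend(pending)
        self.checkpoint += len(pending)
        return len(pending)


kafka: List[str] = []
raw: List[str] = []
bronze: List[str] = []
job1 = CheckpointedStage(kafka, raw)
job2 = CheckpointedStage(raw, bronze)

processed_by_job2 = []
for tick in range(3):
    kafka.extend([f"r{tick}a", f"r{tick}b"])  # two new records per tick
    job1.run()                                # Job 1 keeps landing raw data
    try:
        # Job 2 fails on the first two ticks, then recovers
        processed_by_job2.append(job2.run(fail=(tick < 2)))
    except RuntimeError:
        processed_by_job2.append(0)
```

Job 1 lands all six records regardless of Job 2's failures, and Job 2's first successful run picks up the whole backlog in one pass. Nothing retried the failed runs; the next scheduled run did the work.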
