I have described the three-stage streaming architecture in a few posts now — raw ingest, deserialize, merge. This post is the implementation detail: what it actually looks like in Databricks notebooks and Workflows, with Delta Lake as the intermediate store between stages.
The Target Architecture
Three notebooks, three Databricks jobs, and three main Delta tables (plus a dead-letter table written by Job 2):
- Job 1: Kafka → raw.sensor_readings (Kafka payload cast to string, plus Kafka metadata)
- Job 2: raw.sensor_readings → bronze.sensor_readings (parsed and validated; failures go to bronze.sensor_readings_dead_letter)
- Job 3: bronze.sensor_readings → gold.sensor_readings (merged, deduplicated, SCD-ready)
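For reference, the row shape at each table can be summarized as plain Python dataclasses. This is only a sketch. The field names come from the columns the notebook code selects and writes, but the class names are hypothetical, and the Python types approximate the Spark types (for example, Spark timestamps appear here as `datetime`).

```python
from dataclasses import dataclass, fields
from datetime import datetime
from typing import Optional

# Hypothetical class names; field names mirror the columns used in the notebooks.

@dataclass
class RawReading:  # raw.sensor_readings
    topic: str
    partition: int
    offset: int
    kafka_ts: datetime
    raw_payload: Optional[str]  # Kafka value cast to string; None for a null (tombstone) value
    arrived_at: datetime

@dataclass
class BronzeReading:  # bronze.sensor_readings
    sensor_id: str
    value: float
    unit: str
    ts_ms: int
    arrived_at: datetime
    topic: str
    partition: int
    offset: int

@dataclass
class DeadLetter:  # bronze.sensor_readings_dead_letter
    raw_payload: Optional[str]
    topic: str
    partition: int
    offset: int
    arrived_at: datetime
    failed_at: datetime
    batch_id: int

@dataclass
class GoldReading:  # gold.sensor_readings (at least the columns the merge writes)
    sensor_id: str
    value: float
    unit: str
    ts_ms: int
    arrived_at: datetime
    updated_at: datetime

def column_names(cls) -> list[str]:
    """List a row type's fields in declaration order."""
    return [f.name for f in fields(cls)]
```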
Job 1: Kafka to Raw Delta
```python
# Notebook: /Pipelines/Sensor/01_raw_ingest
from pyspark.sql.functions import col, current_timestamp

# Databricks Runtime ships a shaded Kafka client, so the JAAS login module
# class needs the "kafkashaded." prefix.
connection_string = dbutils.secrets.get("kafka", "connection_string")
jaas_config = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
    f'username="$ConnectionString" password="{connection_string}";'
)

raw_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", spark.conf.get("pipeline.kafka.brokers"))
    .option("subscribe", "sensor_readings")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", jaas_config)
    # Only used on the very first run; later runs resume from the checkpoint.
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .load()
)

# Keep the payload as a string alongside its Kafka coordinates.
raw_landed = raw_stream.select(
    col("topic"),
    col("partition").cast("int"),
    col("offset").cast("long"),
    col("timestamp").alias("kafka_ts"),
    col("value").cast("string").alias("raw_payload"),
    current_timestamp().alias("arrived_at"),
)

# availableNow processes everything available at start, then stops;
# block until it finishes, as Jobs 2 and 3 do.
(
    raw_landed.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", spark.conf.get("pipeline.checkpoint.raw"))
    .trigger(availableNow=True)
    .toTable("raw.sensor_readings")
    .awaitTermination()
)
```
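All three notebooks read their settings from Spark conf keys: `pipeline.kafka.brokers`, `pipeline.checkpoint.raw`, `pipeline.checkpoint.bronze` and `pipeline.checkpoint.gold`. These would typically be set in each job's cluster configuration. Before the first run, check that every streaming query has its own checkpoint directory, because a Spark checkpoint belongs to exactly one query. The sketch below validates a plain dict of those settings. The function name, the broker address and the paths are all hypothetical.

```python
REQUIRED_KEYS = (
    "pipeline.kafka.brokers",
    "pipeline.checkpoint.raw",
    "pipeline.checkpoint.bronze",
    "pipeline.checkpoint.gold",
)
CHECKPOINT_KEYS = REQUIRED_KEYS[1:]

def validate_pipeline_conf(conf: dict[str, str]) -> list[str]:
    """Return a list of problems; an empty list means the settings look usable."""
    problems = [f"missing {key}" for key in REQUIRED_KEYS if not conf.get(key)]
    seen: dict[str, str] = {}  # normalized checkpoint path -> conf key that claimed it
    for key in CHECKPOINT_KEYS:
        path = conf.get(key)
        if not path:
            continue
        norm = path.rstrip("/")  # treat "/a/b" and "/a/b/" as the same directory
        if norm in seen:
            problems.append(f"{key} reuses the checkpoint of {seen[norm]}")
        else:
            seen[norm] = key
    return problems

# Hypothetical values: the gold checkpoint accidentally points at bronze's directory.
conf = {
    "pipeline.kafka.brokers": "broker.example.com:9093",
    "pipeline.checkpoint.raw": "/checkpoints/sensor/raw",
    "pipeline.checkpoint.bronze": "/checkpoints/sensor/bronze",
    "pipeline.checkpoint.gold": "/checkpoints/sensor/bronze/",
}
print(validate_pipeline_conf(conf))
# ['pipeline.checkpoint.gold reuses the checkpoint of pipeline.checkpoint.bronze']
```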
Job 2: Raw to Bronze (Deserialize)
```python
# Notebook: /Pipelines/Sensor/02_deserialize
from pyspark.sql.functions import col, from_json, current_timestamp, lit
from pyspark.sql.types import StructType, StringType, DoubleType, LongType

SENSOR_SCHEMA = (
    StructType()
    .add("sensor_id", StringType())
    .add("value", DoubleType())
    .add("unit", StringType())
    .add("ts_ms", LongType())
)

# Stable ID for idempotent Delta writes. Change it if the checkpoint is ever
# reset, because batch_id restarts at 0 under a new checkpoint.
TXN_APP_ID = "sensor_02_deserialize"

# Read new raw records incrementally. Progress is tracked by the writer's
# checkpoint below; checkpointLocation has no effect on a reader.
raw_stream = spark.readStream.format("delta").table("raw.sensor_readings")

def parse_and_route(batch_df, batch_id):
    # Malformed JSON parses to a null struct, so it fails every check below.
    parsed = batch_df.withColumn("parsed", from_json(col("raw_payload"), SENSOR_SCHEMA))
    parsed.persist()  # the batch is scanned twice, once per output table

    # Good records: all required fields present and value in range. The
    # predicate is never null (a null value already fails isNotNull), so
    # ~is_valid is its exact complement and each record lands in one table.
    is_valid = (
        col("parsed.sensor_id").isNotNull()
        & col("parsed.value").isNotNull()
        & col("parsed.unit").isNotNull()
        & col("parsed.ts_ms").isNotNull()
        & col("parsed.value").between(-100, 10000)
    )

    good = parsed.filter(is_valid).select(
        col("parsed.sensor_id"),
        col("parsed.value"),
        col("parsed.unit"),
        col("parsed.ts_ms"),
        col("arrived_at"),
        col("topic"),
        col("partition"),
        col("offset"),
    )

    # Bad records: keep the original payload for inspection and replay.
    bad = parsed.filter(~is_valid).select(
        col("raw_payload"),
        col("topic"),
        col("partition"),
        col("offset"),
        col("arrived_at"),
        current_timestamp().alias("failed_at"),
        lit(batch_id).cast("long").alias("batch_id"),
    )

    # foreachBatch is at-least-once. txnAppId/txnVersion let Delta skip a
    # write it already committed for this batch_id when a batch is retried.
    (good.write.format("delta").mode("append")
        .option("txnAppId", TXN_APP_ID).option("txnVersion", batch_id)
        .saveAsTable("bronze.sensor_readings"))
    (bad.write.format("delta").mode("append")
        .option("txnAppId", TXN_APP_ID).option("txnVersion", batch_id)
        .saveAsTable("bronze.sensor_readings_dead_letter"))

    parsed.unpersist()

(
    raw_stream.writeStream
    .foreachBatch(parse_and_route)
    .option("checkpointLocation", spark.conf.get("pipeline.checkpoint.bronze"))
    .trigger(availableNow=True)
    .start()
    .awaitTermination()
)
```
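To unit-test the routing rule outside Spark, here is a plain-Python sketch of the per-record decision `parse_and_route` makes. A payload is good only if it parses as a JSON object with all four fields present and the value inside [-100, 10000]. Everything else goes to the dead-letter list. The sketch approximates `from_json` and does not reproduce it exactly, because Spark's JSON type-coercion rules differ in detail. `parse_reading` and `route_payloads` are hypothetical helper names.

```python
import json
from typing import Any, Optional

VALUE_MIN, VALUE_MAX = -100, 10000  # same inclusive bounds as between(-100, 10000)

def parse_reading(raw_payload: Optional[str]) -> Optional[dict[str, Any]]:
    """Return the parsed reading, or None if the payload fails parsing or validation."""
    if raw_payload is None:  # e.g. a Kafka record with a null value
        return None
    try:
        obj = json.loads(raw_payload)
    except json.JSONDecodeError:
        return None
    if not isinstance(obj, dict):
        return None
    sensor_id, value, unit, ts_ms = (obj.get(k) for k in ("sensor_id", "value", "unit", "ts_ms"))
    # Required fields with roughly the schema's types (bool is excluded since it subclasses int).
    if not isinstance(sensor_id, str) or not isinstance(unit, str):
        return None
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return None
    if isinstance(ts_ms, bool) or not isinstance(ts_ms, int):
        return None
    if not VALUE_MIN <= value <= VALUE_MAX:
        return None
    return {"sensor_id": sensor_id, "value": float(value), "unit": unit, "ts_ms": ts_ms}

def route_payloads(payloads):
    """Split payloads into (good readings, dead-letter payloads)."""
    good, bad = [], []
    for payload in payloads:
        reading = parse_reading(payload)
        if reading is None:
            bad.append(payload)
        else:
            good.append(reading)
    return good, bad

good, bad = route_payloads([
    '{"sensor_id": "s-1", "value": 21.5, "unit": "C", "ts_ms": 1700000000000}',
    '{"sensor_id": "s-2", "value": 20000, "unit": "C", "ts_ms": 1700000000000}',  # out of range
    '{"sensor_id": "s-3", "unit": "C", "ts_ms": 1700000000000}',                  # missing value
    'not json',
    None,
])
print(len(good), len(bad))  # 1 4
```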
Job 3: Bronze to Gold (Merge)
```python
# Notebook: /Pipelines/Sensor/03_merge_gold
from delta.tables import DeltaTable

bronze_stream = spark.readStream.format("delta").table("bronze.sensor_readings")

# gold.sensor_readings must exist before the first run; forName fails otherwise.
gold_table = DeltaTable.forName(spark, "gold.sensor_readings")

def merge_to_gold(batch_df, batch_id):
    # Deduplicate within the batch before merging: MERGE fails when several
    # source rows match the same target row. dropDuplicates keeps an
    # arbitrary row per key.
    deduped = batch_df.dropDuplicates(["sensor_id", "ts_ms"])

    # Upsert keyed on (sensor_id, ts_ms), so replaying a batch updates rows
    # in place instead of inserting duplicates.
    (
        gold_table.alias("gold")
        .merge(
            deduped.alias("new"),
            "gold.sensor_id = new.sensor_id AND gold.ts_ms = new.ts_ms",
        )
        .whenMatchedUpdate(set={
            "value": "new.value",
            "unit": "new.unit",
            "updated_at": "new.arrived_at",
        })
        .whenNotMatchedInsert(values={
            "sensor_id": "new.sensor_id",
            "value": "new.value",
            "unit": "new.unit",
            "ts_ms": "new.ts_ms",
            "arrived_at": "new.arrived_at",
            "updated_at": "new.arrived_at",
        })
        .execute()
    )

(
    bronze_stream.writeStream
    .foreachBatch(merge_to_gold)
    .option("checkpointLocation", spark.conf.get("pipeline.checkpoint.gold"))
    .trigger(availableNow=True)
    .start()
    .awaitTermination()
)
```
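The merge is what makes Job 3 safe to rerun. Because the key is `(sensor_id, ts_ms)`, replaying a batch updates existing rows rather than appending copies. The in-memory sketch below imitates that upsert, with a dict standing in for the gold table. It illustrates the semantics only and is not how Delta implements MERGE. `merge_batch` is a hypothetical name. Like `dropDuplicates`, it keeps one row per key within a batch; here that is the last row seen, whereas Spark's choice is arbitrary.

```python
from typing import Any

Key = tuple[str, int]   # (sensor_id, ts_ms), the merge key
Row = dict[str, Any]

def merge_batch(gold: dict[Key, Row], batch: list[Row]) -> None:
    """Upsert bronze rows into an in-memory stand-in for gold.sensor_readings."""
    # One source row per key, as MERGE requires.
    deduped: dict[Key, Row] = {}
    for row in batch:
        deduped[(row["sensor_id"], row["ts_ms"])] = row

    for key, new in deduped.items():
        if key in gold:
            # whenMatchedUpdate: refresh value/unit and stamp updated_at.
            gold[key].update(value=new["value"], unit=new["unit"], updated_at=new["arrived_at"])
        else:
            # whenNotMatchedInsert: the first arrival sets both timestamps.
            gold[key] = {
                "sensor_id": new["sensor_id"],
                "value": new["value"],
                "unit": new["unit"],
                "ts_ms": new["ts_ms"],
                "arrived_at": new["arrived_at"],
                "updated_at": new["arrived_at"],
            }

# Integers stand in for timestamps to keep the example small.
gold: dict[Key, Row] = {}
batch = [
    {"sensor_id": "s-1", "value": 21.5, "unit": "C", "ts_ms": 1000, "arrived_at": 1},
    {"sensor_id": "s-1", "value": 21.5, "unit": "C", "ts_ms": 1000, "arrived_at": 1},  # duplicate delivery
    {"sensor_id": "s-2", "value": 19.0, "unit": "C", "ts_ms": 1000, "arrived_at": 1},
]
merge_batch(gold, batch)
merge_batch(gold, batch)  # replaying the same batch leaves the table unchanged
print(len(gold))  # 2
```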
Databricks Workflows Orchestration
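Each notebook is a separate Databricks job, scheduled independently: Job 1 runs every 5 minutes, Job 2 every 15 minutes, and Job 3 every 30 minutes. The staggered intervals let each stage finish and accumulate a worthwhile batch before the next stage consumes its output. Correctness does not depend on that timing, though. Delta commits are atomic, so a downstream stream only ever reads complete, committed versions of the upstream table.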
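Because the schedules are independent, the worst-case end-to-end latency follows from simple arithmetic. Each stage picks up whatever is available when its run starts, so a record that lands just after a run has started waits up to one full interval for the next run, plus that run's duration. Summed over the three stages, that is 5 + 15 + 30 = 50 minutes plus three run durations. The helper below is a sketch of that bound. The 2-minute run durations are placeholders, not measurements, and the bound assumes every run finishes before its next scheduled start, so no runs are skipped.

```python
def worst_case_latency_minutes(stages: list[tuple[float, float]]) -> float:
    """Upper bound on end-to-end latency for independently scheduled stages.

    Each stage is (schedule_interval, run_duration) in minutes. A record that
    just misses a run waits at most one interval, then one run duration.
    """
    return sum(interval + duration for interval, duration in stages)

# Intervals from the schedule above; the 2-minute durations are placeholders.
stages = [(5, 2), (15, 2), (30, 2)]
print(worst_case_latency_minutes(stages))  # 56
```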
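The dependencies are loose: each stage reads from the Delta table written by the previous stage, not from a job dependency chain. A failure in Job 2 therefore does not stop Job 1 from landing raw data. When Job 2 recovers, it resumes from its checkpoint and processes the backlog, and Job 3 picks up whatever new records have reached the bronze table since its last run. For transient failures, the system heals itself on the next scheduled run without explicit retry configuration. The one gap is Job 1 itself. If it is down for longer than the Kafka topic's retention period, the expired offsets are gone, and with failOnDataLoss set to false the query skips them instead of failing.

If you want to adapt this pattern to your specific topic set and latency requirements, I am happy to help.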
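The recovery behaviour follows from each stage tracking its own read position in the upstream table and advancing it only after its output is committed. The toy simulation below models this, using Python lists as tables and an integer index as the checkpoint. It is a sketch of the idea under those simplifying assumptions, not how Spark stores checkpoints, and all names in it are hypothetical. Because output is committed before progress is recorded, a crash between the two steps would re-read rows. That is why replays must be harmless downstream, as the merge in Job 3 is.

```python
from dataclasses import dataclass, field

@dataclass
class Stage:
    """Toy model of a stage that reads an upstream table (a list) incrementally."""
    name: str
    checkpoint: int = 0                          # index of the next upstream row to read
    output: list = field(default_factory=list)

    def run(self, upstream: list, fail: bool = False) -> None:
        end = len(upstream)                      # like availableNow: only rows present at start
        if fail:
            return                               # nothing committed, checkpoint unchanged
        self.output.extend(upstream[self.checkpoint:end])  # commit the output first...
        self.checkpoint = end                    # ...then record progress

raw: list = []             # stands in for raw.sensor_readings
bronze = Stage("bronze")   # stands in for Job 2

for tick in range(6):
    raw.extend([f"r{tick}a", f"r{tick}b"])     # Job 1 keeps landing data every tick
    bronze.run(raw, fail=tick in (1, 2, 3))   # Job 2 fails on ticks 1 to 3, then catches up

print(len(raw), len(bronze.output))  # 12 12
```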