The appeal of Structured Streaming in Databricks is that it uses the same DataFrame API you already know. The query that runs as a batch job on yesterday's data can run continuously on data as it arrives, with minimal changes. This is the architectural convergence that makes it worth learning even if you don't have a strict real-time requirement.
The Core Concept
Structured Streaming models a data stream as an unbounded table. New rows arrive and get appended to this table continuously. Your query runs on this table, producing output as new rows arrive. The output can be another Delta table, a Kafka topic, an in-memory table for dashboards, or console output for testing.
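The execution model is micro-batch by default: Spark checks the source for new data, processes any new rows as a small batch job, and commits the output. If you don't set a trigger, the next micro-batch starts as soon as the previous one finishes; a processing-time trigger starts batches on a fixed interval instead. This isn't record-at-a-time streaming. It's "near-real-time," with latency typically in the range of 1–30 seconds depending on trigger interval and batch size. For most analytical use cases, this is fine.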
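The unbounded-table model can be sketched in plain Python. This is a toy model, not Spark's implementation. A list stands in for the input table, and the hypothetical `IncrementalQuery` class processes only the rows appended since its previous trigger while keeping a running result. The sketch shows the property the streaming API is built around: after every trigger, the incremental result matches what the batch query would return over all rows received so far.

```python
from collections import Counter


def batch_query(rows):
    """Batch version: count purchase events per category over the given rows."""
    return Counter(r["category"] for r in rows if r["event_type"] == "purchase")


class IncrementalQuery:
    """Toy streaming version: each trigger processes only rows it hasn't seen yet."""

    def __init__(self):
        self.state = Counter()  # running result kept between triggers
        self.seen = 0           # number of input rows already processed

    def trigger(self, table):
        new_rows = table[self.seen:]
        self.state.update(batch_query(new_rows))  # Counter.update adds counts
        self.seen = len(table)
        return dict(self.state)


unbounded_table = []  # new rows are only ever appended to the end
stream_query = IncrementalQuery()

unbounded_table += [
    {"event_type": "purchase", "category": "books"},
    {"event_type": "view", "category": "toys"},
]
print(stream_query.trigger(unbounded_table))  # {'books': 1}

unbounded_table += [{"event_type": "purchase", "category": "toys"}]
print(stream_query.trigger(unbounded_table))  # {'books': 1, 'toys': 1}

# Same answer as re-running the batch query on everything received so far
print(stream_query.trigger(unbounded_table) == dict(batch_query(unbounded_table)))  # True
```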
Reading a Stream from a Delta Table
# Batch read (what you already know)
# <container> and <storage-account> are placeholders for your ADLS Gen2 location
df_batch = spark.read.format("delta").load("abfss://<container>@<storage-account>.dfs.core.windows.net/raw/events/")
# Streaming read from the same Delta table -- gets new rows as they arrive
df_stream = spark.readStream.format("delta").load("abfss://<container>@<storage-account>.dfs.core.windows.net/raw/events/")
print(df_stream.isStreaming)  # True
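The only change: read becomes readStream. The DataFrame schema and API are identical, and most transformations work unchanged. (A few operations aren't supported on streaming DataFrames, such as sorting a non-aggregated stream or calling actions like count() or show() directly.)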
from pyspark.sql import functions as F

processed = (df_stream
    .filter(F.col("event_type") == "purchase")
    .withColumn("event_hour", F.date_trunc("hour", F.col("event_timestamp")))
    .groupBy("event_hour", "product_category")
    .agg(F.sum("amount").alias("hourly_revenue")))
Writing a Stream to Delta
query = (processed.writeStream
    .format("delta")
    .option("checkpointLocation", "abfss://<container>@<storage-account>.dfs.core.windows.net/checkpoints/hourly_revenue/")
    .outputMode("complete")
    .trigger(processingTime="60 seconds")
    .start("abfss://<container>@<storage-account>.dfs.core.windows.net/tables/hourly_revenue/"))
# The stream runs until you stop it
query.awaitTermination() # blocks until the query fails or is stopped
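The checkpoint location is critical. It's where Spark records which source offsets each micro-batch covered, which batches have committed, and any aggregation state. If the stream job fails and restarts with the same checkpoint, Spark reads it to find out where it left off and continues from there. For Delta and other file sinks, Spark won't start the query without a checkpoint location (unless a default is set through spark.sql.streaming.checkpointLocation). Restarting with a new, empty checkpoint throws that progress away: by default, a Delta source then re-processes the table from the beginning.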
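Why the checkpoint matters can be sketched with a toy pure-Python model. It assumes a list as the source and a single JSON file (a hypothetical `checkpoint.json`) as the checkpoint. Spark's real checkpoint directory holds more than this: an offsets log written before each batch, a commits log written after it, and state-store files. The resume-from-offset idea is the same.

```python
import json
import os
import tempfile


def run_trigger(source_rows, checkpoint_file, sink):
    """Toy model: process only rows past the offset recorded in the checkpoint."""
    start = 0
    if checkpoint_file and os.path.exists(checkpoint_file):
        with open(checkpoint_file) as f:
            start = json.load(f)["next_offset"]

    end = len(source_rows)               # latest offset available for this batch
    sink.extend(source_rows[start:end])  # "commit" the output

    # Record progress only after the output has been written.
    if checkpoint_file:
        with open(checkpoint_file, "w") as f:
            json.dump({"next_offset": end}, f)
    return end - start                   # number of rows processed


with tempfile.TemporaryDirectory() as tmp:
    ckpt = os.path.join(tmp, "checkpoint.json")
    source, sink = ["a", "b"], []
    run_trigger(source, ckpt, sink)      # first run: processes a, b
    source.append("c")                   # new data arrives while the job is down
    run_trigger(source, ckpt, sink)      # restart with the same checkpoint: only c
    print(sink)                          # ['a', 'b', 'c']

    no_ckpt_sink = []
    run_trigger(source, None, no_ckpt_sink)
    run_trigger(source, None, no_ckpt_sink)  # no checkpoint: starts over each time
    print(no_ckpt_sink)                  # ['a', 'b', 'c', 'a', 'b', 'c']
```

In this model, a crash between writing the output and updating the checkpoint would replay the batch. Spark's Delta sink covers that case by recording the query's batch ID in the table's transaction log, so a replayed batch isn't written twice.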
Output Modes
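Streaming output has three modes, and this is the biggest conceptual shift from batch:
- append: Only new rows are written to the output, and each row is written once. Works for transformations that don't aggregate (filter, select, withColumn). An aggregation can use append mode only if it has a watermark and groups by the watermarked event-time column or a window on it; each window's result is then written once, after the watermark passes the end of the window. Other aggregations require complete or update mode.
- complete: The entire result table is rewritten on every trigger. Allowed only for aggregations. For large result sets this is expensive, because every trigger rewrites the full aggregated output.
- update: Only rows whose aggregated values changed since the last trigger are written. This is the most efficient mode for aggregations, but the Delta sink doesn't accept it directly (it supports only append and complete). To apply the changed rows to a Delta table, use foreachBatch with a MERGE.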
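# For a simple transformation (no aggregation): append
df_stream.filter(...).writeStream.outputMode("append").start(...)
# For aggregations on a result table you can overwrite: complete
df_stream.groupBy("category").agg(F.sum("amount").alias("total")).writeStream.outputMode("complete").start(...)
# For upserts into Delta (target_path is a placeholder): update + foreachBatch + MERGE; the Delta sink accepts only append/complete
from delta.tables import DeltaTable
def upsert_totals(batch_df, batch_id):
    (DeltaTable.forPath(spark, target_path).alias("t")
        .merge(batch_df.alias("s"), "t.category = s.category")
        .whenMatchedUpdateAll().whenNotMatchedInsertAll().execute())
(df_stream.groupBy("category").agg(F.sum("amount").alias("total"))
    .writeStream.outputMode("update").foreachBatch(upsert_totals).start())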
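The difference between complete and update mode shows up in a toy pure-Python model of a streaming `groupBy("category").agg(sum("amount"))` with no watermark. The `run_aggregation` function is hypothetical and stands in for Spark's engine, which keeps the running totals in a fault-tolerant state store rather than a dict. Append mode is rejected here, just as Spark rejects it for an aggregation without a watermark.

```python
from collections import defaultdict


def run_aggregation(batches, mode):
    """Toy model of a streaming sum per category; returns what each trigger emits."""
    if mode not in ("complete", "update"):
        raise ValueError("without a watermark, aggregations need complete or update mode")
    state = defaultdict(int)  # running totals kept across triggers
    outputs = []
    for batch in batches:     # one batch per trigger
        changed = set()
        for category, amount in batch:
            state[category] += amount
            changed.add(category)
        if mode == "complete":
            outputs.append(dict(state))                             # whole result table
        else:
            outputs.append({k: state[k] for k in sorted(changed)})  # changed rows only
    return outputs


batches = [[("books", 10), ("toys", 5)], [("books", 7)]]
print(run_aggregation(batches, "complete"))  # [{'books': 10, 'toys': 5}, {'books': 17, 'toys': 5}]
print(run_aggregation(batches, "update"))    # [{'books': 10, 'toys': 5}, {'books': 17}]
```

Both modes hold identical running totals. They differ only in how much of the result table each trigger emits, which is why update is cheaper when few keys change per batch.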
Watermarking for Late Data
Events arrive late. A purchase event from 11:55 PM might arrive at 12:05 AM because of network delay. If your stream aggregates by hour, late events for the 11 PM hour arrive after you've already output the 11 PM aggregate.
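Watermarking tells Spark how late is too late. Spark tracks the latest event timestamp it has seen so far and sets the watermark to that time minus a threshold. Once the watermark passes the end of a window, late events for that window can be dropped: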
df_with_watermark = (df_stream
    .withWatermark("event_timestamp", "10 minutes")
    .groupBy(
        F.window("event_timestamp", "1 hour"),
        "product_category"
    )
    .agg(F.sum("amount").alias("hourly_revenue")))
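With a 10-minute watermark, suppose the latest event Spark has seen when the 11:55 PM purchase arrives is stamped 12:05 AM. The watermark is then 11:55 PM. That hasn't yet passed the end of the 11 PM window (midnight), so the purchase is still counted in its original window. Once the watermark moves past a window's end, Spark finalizes that window and discards its state, and events for it that arrive later may be dropped. Spark guarantees that data within the threshold is never dropped, but data later than the threshold is not guaranteed to be dropped. This bounds the state Spark needs to maintain. Only append and update modes use the watermark to clean up state; complete mode must keep every aggregate.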
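Those rules can be sketched as a toy pure-Python model with event times in minutes, 60-minute tumbling windows, and a 10-minute threshold. Like Spark, it sets the watermark to the maximum event time seen so far minus the threshold and advances it only between batches. The hypothetical `process` function makes one simplifying assumption: it treats a window as closed once its end is at or before the watermark.

```python
from collections import defaultdict

WINDOW = 60  # minutes, like F.window("event_timestamp", "1 hour")
DELAY = 10   # minutes, like withWatermark("event_timestamp", "10 minutes")


def process(batches):
    """Toy model: each batch is a list of (event_minute, amount) pairs."""
    watermark = float("-inf")
    max_event_time = float("-inf")
    totals = defaultdict(int)  # window start -> summed amount
    dropped = []
    for batch in batches:
        for t, amount in batch:
            window_start = t - t % WINDOW
            if window_start + WINDOW <= watermark:
                dropped.append((t, amount))  # window already finalized
            else:
                totals[window_start] += amount
        if batch:
            # The watermark only moves forward, and only between batches.
            max_event_time = max(max_event_time, max(t for t, _ in batch))
            watermark = max_event_time - DELAY
    return dict(totals), dropped


batches = [
    [(50, 10)],           # watermark afterwards: 40
    [(65, 20), (55, 5)],  # 55 is late, but window [0, 60) is still open
    [(72, 7)],            # watermark afterwards: 62, past the end of [0, 60)
    [(58, 3)],            # too late: window [0, 60) was finalized
]
print(process(batches))  # ({0: 15, 60: 27}, [(58, 3)])
```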
The Trigger Options
# Process every 60 seconds
.trigger(processingTime="60 seconds")
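# Process everything available in a single batch and stop (scheduled "mini-batch" runs; deprecated in favor of availableNow)
.trigger(once=True)
# Process everything available, possibly in several batches, and stop (Spark 3.3+)
.trigger(availableNow=True)
# Start each micro-batch as soon as the previous one finishes (the default)
.trigger(processingTime="0 seconds")  # or just don't set trigger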
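trigger(once=True) is particularly useful as a bridge between batch and streaming: the stream reads all new data since the last checkpoint, processes it, and stops. You can run this on a Databricks job schedule (every 15 minutes, say) and get incremental processing without running a persistent streaming cluster. That costs less than always-on streaming and gives lower latency than daily batch. On Spark 3.3 and later, prefer trigger(availableNow=True). It has the same run-and-stop behavior but honors rate limits such as maxFilesPerTrigger, splitting a large backlog into several micro-batches. By contrast, once=True processes everything in a single batch and is deprecated.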
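The run-and-stop pattern can be sketched as a toy pure-Python model. It assumes a list as the source, an integer offset as the checkpoint, and a hypothetical `max_per_batch` standing in for a rate limit such as maxFilesPerTrigger. Each scheduled run snapshots how much data is available when it starts, processes up to that point, and stops. With no limit, the backlog goes through in one batch, as with once=True. With a limit, it is split into several batches, which is how Spark 3.3's availableNow=True trigger handles rate limits.

```python
def scheduled_run(source, committed, max_per_batch=None):
    """Toy model of one run-and-stop job; returns (batches, new committed offset)."""
    if max_per_batch is not None and max_per_batch < 1:
        raise ValueError("max_per_batch must be a positive integer or None")
    end = len(source)  # snapshot: data arriving after this waits for the next run
    batches = []
    while committed < end:
        size = end - committed if max_per_batch is None else max_per_batch
        stop = min(committed + size, end)
        batches.append(source[committed:stop])
        committed = stop  # the checkpoint advances after each micro-batch
    return batches, committed


source, offset = list(range(5)), 0
batches, offset = scheduled_run(source, offset, max_per_batch=2)
print(batches, offset)  # [[0, 1], [2, 3], [4]] 5

source += [5, 6]  # new data lands between scheduled runs
batches, offset = scheduled_run(source, offset, max_per_batch=2)
print(batches, offset)  # [[5, 6]] 7

print(scheduled_run(source, 0))  # ([[0, 1, 2, 3, 4, 5, 6]], 7)
```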