Auto Loader in Databricks: Incrementally Processing Files From Cloud Storage

The standard approach to incremental file ingestion is messy: track which files you've processed in a table, query the directory listing, compare, load only the new ones, update the tracker. It works, but it's boilerplate that every pipeline ends up reimplementing. Auto Loader is Databricks' answer: a file source for Structured Streaming that handles all of this automatically.

What Auto Loader Does

Auto Loader watches a cloud storage path for new files and processes them exactly once as they arrive. It maintains state in a checkpoint directory (like all Structured Streaming sources), so it knows which files have already been processed. On restart, it picks up from where it left off. No custom bookkeeping required.

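# Without Auto Loader: manual file tracking (assumes the tracker table stores names in a file_name column)
tracker = spark.read.format("delta").load("path/to/processed_tracker/")
processed_files = {row.file_name for row in tracker.select("file_name").collect()}
all_files = dbutils.fs.ls("abfss://<container>@<storage-account>.dfs.core.windows.net/raw/events/")
new_files = [f for f in all_files if f.name not in processed_files]
# ... process new_files, update tracker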

# With Auto Loader: built-in exactly-once file tracking
df_stream = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "abfss://<container>@<storage-account>.dfs.core.windows.net/schemas/events/")
    .load("abfss://<container>@<storage-account>.dfs.core.windows.net/raw/events/")
)
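The manual version has to do its own bookkeeping, and Auto Loader's checkpoint takes that over. That bookkeeping can be modeled in a few lines of plain Python. This is a toy, not Auto Loader's internals. `ToyFileCheckpoint` and `run_batch` are hypothetical names, and the "state" is just a JSON list of processed paths that is rewritten after each batch.

```python
import json
import os
import tempfile


class ToyFileCheckpoint:
    """Hypothetical model of file-level bookkeeping (not Auto Loader internals).

    Records which paths have been processed in a JSON file, so a new
    instance pointed at the same file resumes where the last one stopped.
    """

    def __init__(self, state_path):
        self.state_path = state_path
        self.seen = set()
        if os.path.exists(state_path):
            with open(state_path) as fh:
                self.seen = set(json.load(fh))

    def new_files(self, listing):
        # Only paths that were never committed, in a stable order.
        return sorted(p for p in listing if p not in self.seen)

    def commit(self, paths):
        # Write the full state to a temp file, then rename it over the old state.
        self.seen.update(paths)
        tmp_path = self.state_path + ".tmp"
        with open(tmp_path, "w") as fh:
            json.dump(sorted(self.seen), fh)
        os.replace(tmp_path, self.state_path)


def run_batch(checkpoint, listing, sink):
    """Process the not-yet-seen files from `listing`, then commit them."""
    batch = checkpoint.new_files(listing)
    sink.extend(batch)        # stand-in for actually loading the files
    checkpoint.commit(batch)  # record progress only after processing
    return batch


if __name__ == "__main__":
    state = os.path.join(tempfile.mkdtemp(), "state.json")
    loaded = []
    print(run_batch(ToyFileCheckpoint(state), ["a.json", "b.json"], loaded))  # ['a.json', 'b.json']
    # A fresh instance stands in for a restart: only c.json is new.
    print(run_batch(ToyFileCheckpoint(state), ["a.json", "b.json", "c.json"], loaded))  # ['c.json']
```

The toy commits only after a batch is processed. If it crashes between those two steps, the batch is replayed on restart, so on its own it delivers at-least-once. Structured Streaming gets exactly-once results by pairing its checkpoint with an idempotent sink such as Delta.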

Directory Listing vs. File Notification Mode

Auto Loader has two modes for discovering new files:

Directory listing (default): Periodically lists the target directory and compares against the internal state of already-processed files. Works with any cloud storage. Can be slow for directories with millions of files because the listing itself becomes expensive.

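File notification: Sets up cloud storage event notifications (Azure Event Grid with Azure Queue Storage, AWS SNS/SQS, Google Cloud Pub/Sub) and reads file-arrival events from the resulting queue. No directory listing is required. Latency is lower, and it scales to high-volume landing zones. It does require additional cloud infrastructure: Auto Loader can create the notification resources itself, but only with credentials that have permission to do so.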

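# File notification mode (Azure). For Auto Loader to create the Event Grid subscription and
# storage queue itself, it also needs service principal credentials. The "sp-client-id" and
# "sp-client-secret" secret names below are placeholders.
df_stream = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.useNotifications", "true")
    # Connection string for the storage account (account key or SAS)
    .option("cloudFiles.connectionString", dbutils.secrets.get("kv", "event-grid-connection"))
    .option("cloudFiles.resourceGroup", "my-resource-group")
    .option("cloudFiles.subscriptionId", dbutils.secrets.get("kv", "subscription-id"))
    .option("cloudFiles.tenantId", dbutils.secrets.get("kv", "tenant-id"))
    .option("cloudFiles.clientId", dbutils.secrets.get("kv", "sp-client-id"))
    .option("cloudFiles.clientSecret", dbutils.secrets.get("kv", "sp-client-secret"))
    .option("cloudFiles.schemaLocation", "abfss://<container>@<storage-account>.dfs.core.windows.net/schemas/events/")
    .load("abfss://<container>@<storage-account>.dfs.core.windows.net/raw/events/")
)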
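Here is a rough way to see why notification mode scales better. Listing-based discovery has to examine every path in the directory on each poll. Notification-based discovery only examines the events queued since the last poll. The plain-Python model below is illustrative only: the function names are hypothetical, and a `collections.deque` stands in for the cloud queue.

```python
from collections import deque


def discover_by_listing(directory_listing, processed):
    """Listing-style discovery: scan every path and diff against known state.

    Returns (new_paths, paths_examined); the cost grows with directory size.
    """
    new_paths = [p for p in directory_listing if p not in processed]
    return new_paths, len(directory_listing)


def discover_by_notification(event_queue, processed):
    """Notification-style discovery: drain queued file-created events.

    Returns (new_paths, events_examined); the cost grows with arrivals only.
    """
    new_paths, seen_now, examined = [], set(), 0
    while event_queue:
        path = event_queue.popleft()
        examined += 1
        # Queues may redeliver an event; skip paths already known or already seen.
        if path not in processed and path not in seen_now:
            seen_now.add(path)
            new_paths.append(path)
    return new_paths, examined


if __name__ == "__main__":
    processed = {f"raw/events/{i:07d}.json" for i in range(1_000_000)}
    arrivals = ["raw/events/new-1.json", "raw/events/new-2.json"]
    print(discover_by_listing(list(processed) + arrivals, processed)[1])  # 1000002
    print(discover_by_notification(deque(arrivals), processed)[1])        # 2
```

The dedupe step matters in the notification model because cloud queues generally deliver at least once. The same file event can therefore show up twice.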

Schema Inference and Evolution

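Auto Loader can infer the schema by sampling the first files it discovers, and it persists that schema to the schemaLocation. On subsequent runs, it reads the persisted schema rather than re-inferring it. For formats that don't encode data types, such as JSON and CSV, every column is inferred as a string unless you set cloudFiles.inferColumnTypes to true: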

df_stream = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaLocation", "abfss://<container>@<storage-account>.dfs.core.windows.net/schemas/events/")
    .load("abfss://<container>@<storage-account>.dfs.core.windows.net/raw/events/")
)
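The toy schema merger below makes the effect of `cloudFiles.inferColumnTypes` concrete on a sample of JSON records. It is a hypothetical sketch, not Auto Loader's inference algorithm. It only looks at top-level fields and uses Spark SQL type names loosely. Mixed integer and floating-point values widen to `double`, and any other conflict falls back to `string`. With `infer_types=False` it types every field as a string, which mirrors Auto Loader's behavior for JSON when the option is left off.

```python
import json


def _json_type(value):
    """Map a parsed JSON value to a Spark-SQL-style type name (toy rules)."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "bigint"
    if isinstance(value, float):
        return "double"
    return "string"  # strings, and in this toy also nested objects and arrays


def toy_infer_schema(json_lines, infer_types):
    """Merge top-level field types across sample JSON lines (hypothetical sketch)."""
    schema = {}
    for line in json_lines:
        for name, value in json.loads(line).items():
            if not infer_types:
                schema[name] = "string"  # option off: everything is a string
                continue
            if value is None:
                schema.setdefault(name, None)  # a null says nothing about the type
                continue
            inferred = _json_type(value)
            previous = schema.get(name)
            if previous is None or previous == inferred:
                schema[name] = inferred
            elif {previous, inferred} == {"bigint", "double"}:
                schema[name] = "double"  # widen mixed integers and floats
            else:
                schema[name] = "string"  # incompatible types fall back to string
    # Fields that were only ever null end up as strings.
    return {name: (kind or "string") for name, kind in schema.items()}


if __name__ == "__main__":
    sample = ['{"id": 1, "temp": 20, "ok": true}', '{"id": 2, "temp": 21.5, "ok": false}']
    print(toy_infer_schema(sample, infer_types=False))
    print(toy_infer_schema(sample, infer_types=True))
```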

When source files add new columns, Auto Loader can handle schema evolution with the cloudFiles.schemaEvolutionMode option:

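  • addNewColumns (default when no schema is provided): when new columns appear, the stream fails after adding them to the schema stored at schemaLocation, and restarting it picks up the evolved schema. Existing columns keep their types. Run the stream as a job with automatic retries so the restart happens on its own
  • rescue: the schema is never evolved and the stream does not fail; unexpected columns are captured in a special _rescued_data column (a JSON string) rather than being dropped
  • failOnNewColumns: strict mode for schema contracts you want to enforce; the stream fails and does not restart until you update the schema or remove the offending file
  • none: the schema is not evolved and new columns are ignored (the default when you supply a schema yourself)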
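The modes differ mainly in what happens to a record that carries an unexpected field. The toy below models `addNewColumns`, `rescue`, `failOnNewColumns`, and `none` for a single parsed record. `apply_evolution_mode`, `run_with_restart`, and `UnknownFieldError` are hypothetical names. The schema is just a list of column names, and raising the exception stands in for the stream failing.

```python
import json

MODES = {"addNewColumns", "rescue", "failOnNewColumns", "none"}


class UnknownFieldError(Exception):
    """Hypothetical stand-in for the stream failing on unexpected columns."""

    def __init__(self, new_columns, evolved_schema):
        super().__init__(f"new columns: {new_columns}")
        self.new_columns = new_columns
        self.evolved_schema = evolved_schema


def apply_evolution_mode(record, schema, mode):
    """Toy model of cloudFiles.schemaEvolutionMode for one parsed record.

    `schema` is a list of known column names. Returns (row, schema), or raises
    UnknownFieldError where the real stream would fail.
    """
    if mode not in MODES:
        raise ValueError(f"unknown mode: {mode}")
    unknown = [name for name in record if name not in schema]
    row = {name: record.get(name) for name in schema}
    if mode == "rescue":
        # Schema never changes; unexpected fields go into a JSON string column.
        row["_rescued_data"] = json.dumps({k: record[k] for k in unknown}) if unknown else None
        return row, schema
    if not unknown or mode == "none":
        return row, schema  # "none" silently ignores the new fields
    if mode == "addNewColumns":
        # Fail, but hand back the wider schema so a restart can use it.
        raise UnknownFieldError(unknown, schema + unknown)
    # failOnNewColumns: fail and keep the schema unchanged until someone intervenes.
    raise UnknownFieldError(unknown, list(schema))


def run_with_restart(record, schema, mode, max_restarts=1):
    """Retry after a failure with the latest schema, as a retrying job would."""
    for _ in range(max_restarts + 1):
        try:
            return apply_evolution_mode(record, schema, mode)
        except UnknownFieldError as err:
            last_error = err
            schema = err.evolved_schema
    raise last_error
```

The `addNewColumns` branch shows why that mode is usually paired with automatic retries. The first failure records the wider schema, and the restart processes the same record with it. Under `failOnNewColumns`, every retry fails the same way.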

The Standard Ingestion Pattern

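from pyspark.sql import functions as F

checkpoint = "abfss://<checkpoint-container>@<storage-account>.dfs.core.windows.net/events_ingestion/"
schema_loc = "abfss://<schema-container>@<storage-account>.dfs.core.windows.net/events/"
source_path = "abfss://<landing-container>@<storage-account>.dfs.core.windows.net/events/"
target_table = "raw.events"

query = (
    spark.readStream
         .format("cloudFiles")
         .option("cloudFiles.format", "json")
         .option("cloudFiles.schemaLocation", schema_loc)
         .option("cloudFiles.inferColumnTypes", "true")
         .load(source_path)
         .withColumn("_ingested_at", F.current_timestamp())
         # File metadata column; Databricks recommends it over input_file_name(),
         # which is deprecated on newer runtimes
         .withColumn("_source_file", F.col("_metadata.file_path"))
    .writeStream
         .format("delta")
         .option("checkpointLocation", checkpoint)
         .option("mergeSchema", "true")
         .trigger(processingTime="5 minutes")
         .outputMode("append")
         .toTable(target_table)  # DataStreamWriter exposes toTable(), not table()
)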

The _source_file column captures the full path of the file each row came from. That makes it invaluable for debugging bad data: you can trace a corrupt row back to the exact file that contained it.
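Once rows carry their source path, finding the files behind bad data is a group-and-count. The sketch below does this over plain Python dicts. `files_with_bad_rows` is a hypothetical helper. Treating a non-null `_rescued_data` value as the "bad row" signal is an assumption; Auto Loader adds that column by default when it infers the schema. Against the Delta table itself, the equivalent is a filter on `_rescued_data` followed by `groupBy("_source_file").count()`.

```python
from collections import Counter


def files_with_bad_rows(rows, is_bad):
    """Count bad rows per source file, worst offenders first.

    `rows` are dicts carrying a `_source_file` key; `is_bad` is a
    caller-supplied predicate (hypothetical) that flags a row as bad.
    """
    counts = Counter(row["_source_file"] for row in rows if is_bad(row))
    return counts.most_common()


if __name__ == "__main__":
    rows = [
        {"_source_file": "raw/events/a.json", "_rescued_data": None},
        {"_source_file": "raw/events/b.json", "_rescued_data": '{"temp": "n/a"}'},
    ]
    print(files_with_bad_rows(rows, lambda r: r["_rescued_data"] is not None))
```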

Running as a Job

Auto Loader streams run well as long-running Databricks Jobs (use an always-on job cluster with a modest instance type). Alternatively, use .trigger(availableNow=True), the replacement for the deprecated .trigger(once=True), to run the stream as a scheduled batch. It processes all new files since the last checkpoint and then stops, without requiring a permanently running cluster.
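The practical difference between `.trigger(once=True)` and its newer replacement `.trigger(availableNow=True)` is how a backlog gets split. This planner is a hypothetical toy, not Spark code, and `plan_batches` is an illustrative name. With `once`, everything pending lands in a single micro-batch and rate limits are ignored. With `availableNow`, the same backlog is cut into micro-batches of at most `max_files_per_trigger` files. The toy defaults that limit to 1000, the documented default for `cloudFiles.maxFilesPerTrigger`. In both cases the query stops once the backlog is drained.

```python
def plan_batches(pending_files, trigger, max_files_per_trigger=1000):
    """Toy model of how a run-to-completion trigger splits a backlog.

    "once": a single micro-batch containing everything (rate limits ignored).
    "availableNow": micro-batches of at most max_files_per_trigger files.
    Either way, the query stops after the returned batches are processed.
    """
    if trigger not in ("once", "availableNow"):
        raise ValueError(f"unsupported trigger: {trigger}")
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be at least 1")
    files = list(pending_files)
    if not files:
        return []
    if trigger == "once":
        return [files]
    step = max_files_per_trigger
    return [files[i:i + step] for i in range(0, len(files), step)]


if __name__ == "__main__":
    backlog = [f"raw/events/{i}.json" for i in range(2500)]
    print([len(b) for b in plan_batches(backlog, "once")])          # [2500]
    print([len(b) for b in plan_batches(backlog, "availableNow")])  # [1000, 1000, 500]
```

Bounded batches keep each micro-batch's memory use and commit size predictable. That is the main reason to prefer `availableNow` when a scheduled run may find a large backlog.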

For most enterprise data lake ingestion patterns — files landing in cloud storage from operational systems, IoT devices, or vendor exports — Auto Loader plus Delta is a significant simplification over hand-rolled file-tracking pipelines. Less code, better exactly-once guarantees, built-in schema handling.
