12 min read Part 5 of 10

Part 5: Batch & Stream Ingestion with Auto Loader

Ingesting files at scale using Databricks Auto Loader, Schema Inference, and Rescued Data.

#Databricks #Ingestion #Auto Loader #Structured Streaming

If you are building pipelines where files land in cloud storage (S3, ADLS, or GCS) at random intervals, you face a scaling challenge. Traditional batch jobs have to scan the entire bucket to see what’s new. As the bucket grows, your job gets slower and more expensive. Databricks solved this with Auto Loader.

What is Auto Loader?

Databricks Auto Loader is an optimized file source that processes new files as they land in cloud storage.

Instead of writing complex code to keep track of which files you have already processed, you write a standard Spark Structured Streaming query using the cloudFiles format. Auto Loader handles file discovery, checkpointing, and schema management automatically.
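To make the "complex code" concrete, here is a minimal sketch of the manual bookkeeping that Auto Loader replaces. It runs against a local folder rather than cloud storage, and the names `find_new_files`, `mark_processed`, and the JSON ledger file are hypothetical, chosen only for illustration.

```python
import json
from pathlib import Path


def find_new_files(landing_dir: Path, ledger_path: Path) -> list[Path]:
    """Return files in landing_dir that are not yet recorded in the ledger."""
    seen = set(json.loads(ledger_path.read_text())) if ledger_path.exists() else set()
    # Every run lists the whole directory, however few files are actually new.
    all_files = sorted(p for p in landing_dir.iterdir() if p.is_file())
    return [p for p in all_files if p.name not in seen]


def mark_processed(files: list[Path], ledger_path: Path) -> None:
    """Add the processed file names to the ledger so the next run skips them."""
    seen = set(json.loads(ledger_path.read_text())) if ledger_path.exists() else set()
    seen.update(p.name for p in files)
    ledger_path.write_text(json.dumps(sorted(seen)))
```

Even this toy version has to worry about when to update the ledger: if the job dies between processing and `mark_processed`, files get ingested twice. With Auto Loader, that state lives in the stream's checkpoint instead.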

[Image: "Distracted Boyfriend" meme. Data engineers looking at cloudFiles Auto Loader while holding hands with manual S3 file-polling cron scripts.]

  [ Files land in S3 ]
           │
           ▼
┌───────────────────────┐
│     Auto Loader       │  <-- Directory Listing or SQS File Notifications
└──────────┬────────────┘
           │
           ▼
┌───────────────────────┐
│   Bronze Delta Table  │  <-- Streamed in real-time or triggered batch
└───────────────────────┘

ELI5: What is Auto Loader? Think of the difference between walking down the hall to check an empty physical mail slot every 5 minutes vs. installing a smart sensor that pings your phone the exact millisecond a letter falls through the slot. See ELI5: Databricks Auto Loader for the full breakdown.


The Core Features: Schema Inference & Rescued Data

1. Schema Inference & Evolution

Normally, Spark Structured Streaming requires you to explicitly define the schema of files before reading. This is painful when schemas change.

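Auto Loader infers the schema by sampling the first files it discovers and stores the result in the schema location. For JSON and CSV, every column is inferred as a string unless you enable cloudFiles.inferColumnTypes. If a new column shows up in a file next week, Auto Loader detects it and records the updated schema. In the default evolution mode the stream then stops with an error and picks up the new column when it restarts, so production pipelines usually run as a job with automatic retries. You never have to edit the schema by hand.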
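As a rough sketch, the schema-related settings can be collected in one place. The option keys below (`cloudFiles.inferColumnTypes`, `cloudFiles.schemaEvolutionMode`, `cloudFiles.schemaHints`) are Auto Loader options; the helper names `schema_options` and `read_json_stream`, and the example column in the hint, are hypothetical.

```python
# Evolution modes accepted by cloudFiles.schemaEvolutionMode.
EVOLUTION_MODES = {"addNewColumns", "rescue", "failOnNewColumns", "none"}


def schema_options(schema_location, infer_types=True,
                   evolution_mode="addNewColumns", hints=None):
    """Build the Auto Loader options that control schema inference and evolution."""
    if evolution_mode not in EVOLUTION_MODES:
        raise ValueError(f"unknown schema evolution mode: {evolution_mode}")
    opts = {
        "cloudFiles.format": "json",
        "cloudFiles.schemaLocation": schema_location,
        # Without this, JSON columns are inferred as strings.
        "cloudFiles.inferColumnTypes": str(infer_types).lower(),
        "cloudFiles.schemaEvolutionMode": evolution_mode,
    }
    if hints:
        # Pin the types you already know, e.g. "user_id bigint" (hypothetical column).
        opts["cloudFiles.schemaHints"] = hints
    return opts


def read_json_stream(spark, source_path, options):
    """Start an Auto Loader read with the given options (requires a Databricks runtime)."""
    return spark.readStream.format("cloudFiles").options(**options).load(source_path)
```

Choosing `rescue` instead of `addNewColumns` keeps the schema fixed and routes unexpected columns into the rescued data column, which can suit pipelines where downstream consumers expect a stable set of columns.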

2. The Rescued Data Column (_rescued_data)

What happens if a file contains corrupted data, or a column that doesn’t match the schema?

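With a plain Spark file reader, the outcome depends on the parser mode: in the default permissive mode the mismatched value becomes NULL and extra columns are silently dropped, while in fail-fast mode the job throws an error.

Auto Loader handles this with a column named _rescued_data, which it adds to the schema automatically whenever it infers the schema. Values that do not fit the schema (type mismatches, unexpected extra columns, or column names that differ only in case) are written to _rescued_data as a serialized JSON string instead of being discarded.

Because those values are preserved, you can audit the _rescued_data column to build data quality alerts.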
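The idea can be sketched in plain Python. This is a simplified model of the behaviour, not Auto Loader's implementation: the `SCHEMA` dictionary, the `split_record` function, and the sample fields are all hypothetical, and the real column carries additional metadata.

```python
import json

# Hypothetical target schema: column name -> Python type standing in for a Spark type.
SCHEMA = {"user_id": int, "event": str}


def split_record(raw: dict, schema: dict = SCHEMA) -> dict:
    """Return a row with the schema columns plus a _rescued_data JSON string (or None)."""
    row, rescued = {}, {}
    for name, value in raw.items():
        if name in schema and isinstance(value, schema[name]):
            row[name] = value
        else:
            # Unknown column or wrong type: keep the value instead of dropping it.
            rescued[name] = value
    for name in schema:
        # Schema columns without a usable value are filled with None.
        row.setdefault(name, None)
    row["_rescued_data"] = json.dumps(rescued, sort_keys=True) if rescued else None
    return row
```

Calling `split_record({"user_id": "abc", "event": "login", "device": "ios"})` leaves `user_id` empty and moves both `"abc"` and the unexpected `device` field into `_rescued_data`. On the real Bronze table, an audit can start from a filter such as `df.filter("_rescued_data IS NOT NULL")`.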


Directory Listing vs. File Notifications

Auto Loader has two modes for detecting new files:

1. Directory Listing Mode (Default)

Auto Loader lists the input directory in your S3/ADLS/GCS bucket to find new files. It does not reprocess what it has already seen: ingested files are tracked in the stream's checkpoint, and the listing itself is optimized to avoid a full rescan where possible.

  • Pros: Needs no extra cloud infrastructure or permissions beyond read access to the storage location. Easy to set up.
  • Cons: As the number of files climbs past 10 million, listing the directory gets slow and expensive.

2. File Notification Mode

Auto Loader automatically provisions a queue and notification service (e.g. AWS SNS/SQS or Azure Event Grid) inside your cloud account. The moment a file lands, the cloud service pushes a notification to the queue, and Auto Loader reads it from there.

  • Pros: Scales to billions of files. Extremely low latency. No directory scanning tax.
  • Cons: Requires cloud permissions to create queues and SNS topics.
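The trade-off between the two modes can be illustrated with a toy model that counts how many objects each approach has to examine. Both classes are hypothetical stand-ins, and the listing model deliberately ignores the optimizations Auto Loader applies to real directory listings.

```python
from collections import deque


class ListingSource:
    """Toy model: every poll enumerates all objects, then filters out the seen ones."""

    def __init__(self, bucket):
        self.bucket = bucket  # shared list standing in for a storage prefix
        self.seen = set()
        self.objects_examined = 0

    def poll(self):
        new = []
        for name in self.bucket:
            self.objects_examined += 1
            if name not in self.seen:
                self.seen.add(name)
                new.append(name)
        return new


class NotificationSource:
    """Toy model: storage pushes one event per new object into a queue."""

    def __init__(self):
        self.queue = deque()
        self.objects_examined = 0

    def on_object_created(self, name):
        self.queue.append(name)

    def poll(self):
        new = list(self.queue)
        self.queue.clear()
        self.objects_examined += len(new)
        return new
```

In this model the listing source's work per poll grows with the total number of objects in the bucket, while the notification source's work grows only with the number of new arrivals.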

For cloud setup commands, check the Official Databricks Auto Loader Cloud Setup Guide.


Code Implementation: Auto Loader PySpark Stream

Let’s write a PySpark stream to ingest JSON files using Auto Loader.

# Configure S3 paths and checkpoint storage
source_path = "s3://my-company-bucket/raw/logs/"
target_table = "default.user_logs_bronze"
checkpoint_path = "s3://my-company-bucket/checkpoints/user_logs/"
schema_path = "s3://my-company-bucket/schemas/user_logs/"

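# Set up Auto Loader Stream
# rescuedDataColumn is a file-format reader option, so it takes no "cloudFiles." prefix.
# With schema inference the column is added by default; setting it here just makes the name explicit.
loader_stream = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.schemaLocation", schema_path) \
    .option("cloudFiles.useNotifications", "true") \
    .option("rescuedDataColumn", "_rescued_data") \
    .load(source_path)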

# Write the stream to a Delta Table
query = loader_stream.writeStream.format("delta") \
    .option("checkpointLocation", checkpoint_path) \
    .outputMode("append") \
    .trigger(availableNow=True) \
    .toTable(target_table)

Explaining the Options:

  • .format("cloudFiles"): Enables the Auto Loader engine.
  • .option("cloudFiles.schemaLocation", schema_path): Tells Auto Loader where to write its schema metadata files. If the schema evolves, the new schema is saved here.
  • .option("cloudFiles.useNotifications", "true"): Switches Auto Loader from directory listing to file notification mode. On AWS, it sets up and uses SNS/SQS to listen for new files.
  • .trigger(availableNow=True): This is a cost-saving superpower. It tells Spark: “Spin up, process all files that have arrived since the last run, and then shut down.” You get the benefits of streaming checkpoints but run it as a cheap batch job.
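If the same pipeline sometimes needs to run continuously, the write side can be wrapped in a small helper that switches triggers. This is a sketch under assumptions: `run_ingest` and its `continuous` flag are hypothetical, and the one-minute processing interval is an arbitrary example value.

```python
def run_ingest(stream_df, target_table, checkpoint_path, continuous=False):
    """Write an Auto Loader stream to a Delta table as a batch-style or always-on job."""
    writer = (
        stream_df.writeStream.format("delta")
        .option("checkpointLocation", checkpoint_path)
        .outputMode("append")
    )
    if continuous:
        # Always-on: start a micro-batch every minute and keep the cluster running.
        writer = writer.trigger(processingTime="1 minute")
    else:
        # Batch-style: drain everything that has arrived, then stop.
        writer = writer.trigger(availableNow=True)
    query = writer.toTable(target_table)
    if not continuous:
        # Block until the backlog is processed so a scheduled job exits cleanly.
        query.awaitTermination()
    return query
```

For example, `run_ingest(loader_stream, target_table, checkpoint_path)` would run the stream above as a scheduled batch. Because both modes share the same checkpoint location, switching between them does not reprocess files that were already ingested.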

Now that we are ingesting files into Delta tables, we need to run highly optimized SQL queries on them. In the next part, we’ll cover Spark SQL optimization and Catalyst details.