Part 10 of 10

Part 10: Orchestrating Workflows

Building production DAGs, conditional tasks, and passing state using Databricks Workflows.

#Databricks #Orchestration #Workflows #Scheduling

You’ve made it to the final part of the Databricks series! We’ve covered everything from storage formats and compute nodes to DLT pipelines, Unity Catalog security, and performance tuning. Now, it’s time to glue everything together. Let’s look at how to build reliable, multi-task orchestration workflows.

The Scheduler: Databricks Workflows

Databricks Workflows is a fully managed orchestrator built directly into the Databricks platform. It lets you define a job as a Directed Acyclic Graph (DAG) of tasks. Each task can be a notebook, a SQL query, a DLT pipeline, a Python script, or even a dbt project.

                  ┌─────────────────────────┐
                  │  Task 1: Ingest (DLT)   │
                  └────────────┬────────────┘
                               │
                               ▼
                  ┌─────────────────────────┐
                  │ Task 2: Clean (Notebook)│
                  └────────────┬────────────┘
                               │
            ┌──────────────────┴──────────────────┐
            ▼                                     ▼
┌─────────────────────────┐           ┌─────────────────────────┐
│ Task 3a: Metrics (SQL)  │           │  Task 3b: Train ML (Py) │
└─────────────────────────┘           └─────────────────────────┘
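The sketch below makes the diagram concrete. It writes the same DAG as a Python list shaped like a Jobs API 2.1 `tasks` array, where each task names its upstream tasks under `depends_on`. The task keys (`Ingest_Task`, `Clean_Task`, `Metrics_Task`, `Train_ML_Task`) are hypothetical. The task-type fields (pipeline, notebook, SQL) are omitted for brevity. The `execution_stages` helper is our own illustration, not a Databricks function. It groups tasks into waves using Kahn's topological sort, which shows why Tasks 3a and 3b are free to run in parallel once Task 2 finishes.

```python
from collections import defaultdict

# Hypothetical task keys mirroring the diagram; only dependency fields are shown.
# Shape follows a Jobs API 2.1 "tasks" list: upstream tasks go in "depends_on".
tasks = [
    {"task_key": "Ingest_Task"},                                                # Task 1 (DLT)
    {"task_key": "Clean_Task", "depends_on": [{"task_key": "Ingest_Task"}]},     # Task 2
    {"task_key": "Metrics_Task", "depends_on": [{"task_key": "Clean_Task"}]},    # Task 3a
    {"task_key": "Train_ML_Task", "depends_on": [{"task_key": "Clean_Task"}]},   # Task 3b
]


def execution_stages(tasks):
    """Group tasks into stages (Kahn's algorithm). Tasks in the same stage do not
    depend on each other, so they could run in parallel."""
    upstream = {t["task_key"]: {d["task_key"] for d in t.get("depends_on", [])} for t in tasks}
    downstream = defaultdict(set)
    for key, parents in upstream.items():
        for parent in parents:
            if parent not in upstream:
                raise ValueError(f"{key} depends on unknown task {parent}")
            downstream[parent].add(key)

    # Number of unfinished upstream tasks for each task.
    remaining = {key: len(parents) for key, parents in upstream.items()}
    ready = sorted(key for key, count in remaining.items() if count == 0)
    stages = []
    while ready:
        stages.append(ready)
        next_ready = []
        for key in ready:
            for child in downstream[key]:
                remaining[child] -= 1
                if remaining[child] == 0:
                    next_ready.append(child)
        ready = sorted(next_ready)

    # Any task never released must sit on a cycle, so the graph is not a DAG.
    if sum(len(stage) for stage in stages) != len(tasks):
        raise ValueError("Cycle detected: the task graph is not a DAG")
    return stages


for number, stage in enumerate(execution_stages(tasks), start=1):
    print(f"Stage {number}: {', '.join(stage)}")
```

Running it prints three stages. `Metrics_Task` and `Train_ML_Task` share the last stage, matching the fan-out in the diagram. Adding a dependency that points back upstream raises an error instead, which is the "Acyclic" in DAG.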

ELI5: What is Workflow Orchestration? Think of running a data pipeline like directing a Broadway play. Actors, lighting techs, and set builders can’t all run onto the stage at once. The director coordinates them: “Set builders assemble the stage first. When they finish, the actors walk out. If the lead actor gets sick, trigger the backup plan.” See ELI5: Workflow Orchestration for the breakdown.


The Economics of Workflows: Ephemeral Job Clusters

As we discussed in Part 2, the number one way to control costs in Databricks is to run your production workflows on Job Clusters.

When scheduling a Workflow:

  1. You configure a New Job Cluster definition for the job.
  2. The orchestrator provisions the cluster's virtual machines only when the first task that uses it starts.
  3. The tasks execute sequentially or in parallel, according to their dependencies.
  4. The moment the last task using the cluster finishes (or fails), the orchestrator terminates all of its VMs, stopping your cloud billing timer.

You pay only while the job cluster exists. That is roughly your tasks' run time plus cluster startup, instead of the cost of an all-purpose cluster sitting idle between runs.
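Steps 1 and 2 can be sketched as a job definition in the Jobs API 2.1 format. The cluster is declared once under `job_clusters`, and each task points at it through `job_cluster_key`. The job name, Spark version, node type, worker count, and notebook paths below are placeholder assumptions. Substitute values that are valid for your cloud and workspace. The `unknown_cluster_keys` check at the end is a hypothetical helper, not part of any Databricks API.

```python
import json

# Placeholder values: choose a runtime and node type that exist in your workspace/cloud.
job_spec = {
    "name": "nightly_lakehouse_pipeline",  # hypothetical job name
    "job_clusters": [
        {
            "job_cluster_key": "shared_job_cluster",
            "new_cluster": {
                "spark_version": "15.4.x-scala2.12",  # assumption: an LTS runtime string
                "node_type_id": "i3.xlarge",          # assumption: an AWS node type
                "num_workers": 2,
            },
        }
    ],
    "tasks": [
        {
            "task_key": "Clean_Task",
            "job_cluster_key": "shared_job_cluster",  # runs on the ephemeral cluster above
            "notebook_task": {"notebook_path": "/Workspace/pipelines/clean"},  # hypothetical path
        },
        {
            "task_key": "Train_ML_Task",
            "depends_on": [{"task_key": "Clean_Task"}],
            "job_cluster_key": "shared_job_cluster",  # reuses the same cluster, no second boot
            "notebook_task": {"notebook_path": "/Workspace/pipelines/train"},  # hypothetical path
        },
    ],
}


def unknown_cluster_keys(spec):
    """Return keys of tasks that reference a job_cluster_key not declared in job_clusters."""
    declared = {c["job_cluster_key"] for c in spec.get("job_clusters", [])}
    return [
        t["task_key"]
        for t in spec["tasks"]
        if "job_cluster_key" in t and t["job_cluster_key"] not in declared
    ]


print(json.dumps(job_spec, indent=2))
print("Tasks with undeclared clusters:", unknown_cluster_keys(job_spec))
```

A payload like this can be submitted to the `POST /api/2.1/jobs/create` endpoint. Sharing one job cluster across tasks avoids paying the startup cost once per task.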


Task Dependencies & Conditional Logic

In a multi-task workflow, you specify dependencies. For example, “Task B runs only when Task A succeeds.”

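Workflows also supports conditional execution. Each task has a Run if setting that decides which upstream outcomes trigger it:

  • Run if all succeeded (default): Standard dependency.
  • Run if at least one succeeded: Useful for backup paths.
  • Run if none failed: Runs as long as no upstream task failed, even if some were skipped.
  • Run if all done: Runs once every upstream task has finished, whatever the outcome.
  • Run if at least one failed: Trigger alerts when any upstream task fails.
  • Run if all failed: Trigger cleanup or alert pipelines.

To bypass steps dynamically, add an If/else condition task, which branches on a job parameter or task value.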
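The model below is a small, simplified sketch of how the Jobs API's `run_if` values (`ALL_SUCCESS`, `AT_LEAST_ONE_SUCCESS`, `NONE_FAILED`, `ALL_DONE`, `AT_LEAST_ONE_FAILED`, `ALL_FAILED`) differ. The `should_run` function and its three-state outcome model are our own illustration, not Databricks code. The real scheduler also deals with timeouts, cancellations, and other run states that are ignored here.

```python
from enum import Enum


class Outcome(Enum):
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    SKIPPED = "skipped"  # simplification: the upstream task did not execute


def should_run(run_if, upstream):
    """Simplified model: does a task run, given its run_if value and the final
    outcomes of its upstream tasks?"""
    succeeded = sum(o is Outcome.SUCCEEDED for o in upstream)
    failed = sum(o is Outcome.FAILED for o in upstream)
    executed = succeeded + failed
    rules = {
        "ALL_SUCCESS": succeeded == len(upstream),
        "AT_LEAST_ONE_SUCCESS": succeeded >= 1,
        "NONE_FAILED": failed == 0 and executed >= 1,
        "ALL_DONE": True,  # every upstream task has finished, whatever the outcome
        "AT_LEAST_ONE_FAILED": failed >= 1,
        "ALL_FAILED": failed == len(upstream),
    }
    if run_if not in rules:
        raise ValueError(f"Unknown run_if value: {run_if}")
    return rules[run_if]


# One upstream task succeeded and one failed.
mixed = [Outcome.SUCCEEDED, Outcome.FAILED]
for condition in ["ALL_SUCCESS", "AT_LEAST_ONE_SUCCESS", "AT_LEAST_ONE_FAILED", "ALL_DONE"]:
    print(condition, should_run(condition, mixed))
```

With one success and one failure upstream, `ALL_SUCCESS` is the only printed condition that evaluates to `False`. The other three would all let the downstream task run.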

You can also configure Retries on individual tasks: e.g. “If Task A fails due to a network timeout, retry up to 3 times, waiting 30 seconds between attempts.”
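In a job definition, that retry policy maps to task-level settings. The sketch shows the Jobs API 2.1 fields `max_retries`, `min_retry_interval_millis`, and `retry_on_timeout` on a hypothetical `Ingest_Task`. The notebook path and `timeout_seconds` value are placeholders. The helper at the end is our own back-of-the-envelope calculation, not a Databricks API.

```python
# Hypothetical task definition; only the retry-related fields matter here.
ingest_task = {
    "task_key": "Ingest_Task",
    "notebook_task": {"notebook_path": "/Workspace/pipelines/ingest"},  # placeholder path
    "max_retries": 3,                     # retry up to 3 times after the first failure
    "min_retry_interval_millis": 30_000,  # wait at least 30 seconds between attempts
    "retry_on_timeout": True,             # also retry when an attempt hits timeout_seconds
    "timeout_seconds": 1_800,             # placeholder: fail an attempt after 30 minutes
}


def max_attempts_and_min_wait(task):
    """Total attempts allowed and the minimum cumulative wait between them, in seconds.
    Assumes a non-negative max_retries (0, meaning no retries, when unset)."""
    retries = task.get("max_retries", 0)
    wait_s = retries * task.get("min_retry_interval_millis", 0) / 1000
    return 1 + retries, wait_s


attempts, wait_s = max_attempts_and_min_wait(ingest_task)
print(f"Up to {attempts} attempts, with at least {wait_s:.0f}s of waiting between them")
```

This prints `Up to 4 attempts, with at least 90s of waiting between them`: one original attempt plus three retries.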

Disaster Girl failed workflow POV: Watching your critical 24-step production workflow fail on task 23 at 3:15 AM because you never configured retries on a flaky third-party API call.


State Sharing: The Task Values API

A common requirement in data engineering is passing variables between tasks. For example, Task A processes files and calculates the number of rows written, and Task B needs that row count to decide whether to send a Slack alert.

You cannot use global Python variables. Each task runs in its own isolated execution context: a separate process, and sometimes an entirely different cluster.

To solve this, use the Task Values API.

Task A (Writes Value)

Inside your first notebook, use the dbutils.jobs.taskValues utility to register a value:

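# Rows processed by this task (hardcoded for illustration; in practice, e.g. df.count())
rows_written = 14520

# Save the value to the job context (the value must be JSON-serializable)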
dbutils.jobs.taskValues.set(
    key="row_count", 
    value=rows_written
)

Task B (Reads Value)

Inside your second notebook, retrieve the value saved by Task A (referencing Task A’s task name in the workflow):

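# Fetch value using task name: "Ingest_Task"
# debugValue is returned instead when this notebook runs interactively, outside a job
input_rows = dbutils.jobs.taskValues.get(
    taskKey="Ingest_Task",
    key="row_count",
    default=0,
    debugValue=0
)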

# Print and use the value
print(f"Received row count from Ingest Task: {input_rows}")

if input_rows == 0:
    print("Warning: No records were processed!")
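`dbutils` only exists inside Databricks, so you may want to exercise this hand-off logic in a plain Python unit test. `FakeTaskValues` is a hypothetical in-memory stand-in written for illustration, not part of any Databricks library. It mimics the `set`/`get` calls used above, returns `default` for a missing key, and rejects values that are not JSON-serializable. It does not reproduce every behavior of the real API, such as its limit on value size. `Check_Task` is a made-up name for Task B.

```python
import json


class FakeTaskValues:
    """Hypothetical in-memory stand-in for dbutils.jobs.taskValues (local tests only)."""

    def __init__(self):
        self._store = {}          # {task_key: {key: value}}
        self.current_task = None  # which task's code is "running" right now

    def set(self, key, value):
        json.dumps(value)  # task values must be JSON-serializable; fail early if not
        self._store.setdefault(self.current_task, {})[key] = value

    def get(self, taskKey, key, default=None, debugValue=None):
        # debugValue is accepted for signature compatibility but unused in this fake.
        if taskKey not in self._store:
            raise ValueError(f"No task values recorded for task {taskKey!r}")
        return self._store[taskKey].get(key, default)


def ingest(task_values):
    rows_written = 14520  # placeholder row count, as in Task A
    task_values.set(key="row_count", value=rows_written)


def check_rows(task_values):
    input_rows = task_values.get(taskKey="Ingest_Task", key="row_count", default=0)
    if input_rows == 0:
        return "Warning: No records were processed!"
    return f"OK: {input_rows} rows"


tv = FakeTaskValues()
tv.current_task = "Ingest_Task"
ingest(tv)
tv.current_task = "Check_Task"
print(check_rows(tv))
```

This prints `OK: 14520 rows`. Passing the task-values object in as a parameter, rather than calling `dbutils` directly, is what makes `ingest` and `check_rows` testable outside a workspace.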

For complete parameter options, read the Databricks Task Values Documentation.
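You can also branch on a task value without writing notebook code. The sketch below is again a Jobs API 2.1 style list with hypothetical task keys. It adds an If/else condition task (`condition_task`) that compares `{{tasks.Ingest_Task.values.row_count}}` against `"0"`. That string is a dynamic value reference to the value Task A set. A Slack-alert task then runs only on the `"true"` branch. The `Send_Slack_Alert` notebook path is a placeholder, and the field names should be checked against the Jobs API reference for your workspace.

```python
import json

# Hypothetical tasks; "Ingest_Task" is assumed to set the "row_count" task value.
tasks = [
    {
        "task_key": "Check_Row_Count",
        "depends_on": [{"task_key": "Ingest_Task"}],
        "condition_task": {
            "op": "EQUAL_TO",
            # Dynamic value reference, resolved by the scheduler at run time.
            "left": "{{tasks.Ingest_Task.values.row_count}}",
            "right": "0",
        },
    },
    {
        "task_key": "Send_Slack_Alert",
        # Runs only when the condition above evaluates to true (zero rows written).
        "depends_on": [{"task_key": "Check_Row_Count", "outcome": "true"}],
        "notebook_task": {"notebook_path": "/Workspace/alerts/slack_zero_rows"},  # placeholder
    },
]

print(json.dumps(tasks, indent=2))
```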


Series Conclusion

Congratulations on completing the “Databricks Lakehouse: From Zero to Hero” series!

You have come a long way: understanding storage architectures, configuring workspaces, modeling tables using the Medallion pattern, streaming files with Auto Loader, tuning query execution with AQE, building declarative DLT pipelines, securing assets with Unity Catalog, speeding up operations with Photon and Liquid Clustering, and orchestrating production jobs.

You now possess the tools to architect, build, and operate enterprise-level Lakehouse environments. Go forth, orchestrate pipelines, and build amazing data platforms!