Blog
airflowschedulingassettimetabledata-pipeline

Airflow 3 Scheduling & Assets — Data-Aware Pipelines

Compare cron, Timetable, and Asset-based scheduling, wire DAGs together with producer-consumer Assets, and tackle logical_date, catchup, and timezone pitfalls from a practical angle.

Data DynamicsJune 30, 202611 min read

Writing a good DAG and deciding "when to run it" are entirely different skills. Many teams start with @daily and then hit the wall of "I want the next DAG to run the moment the previous one finishes, but the timing doesn't line up." You schedule an aggregation DAG for 3 a.m. assuming the data lands at 2 a.m., but when the data shows up late at 4 a.m., you end up aggregating an empty table.

Airflow 3 offers a clear answer to this problem: schedule based on data, not on time.

This is Part 6 of the Airflow 3 in Practice series. Where the previous part, Part 5: Advanced DAG Techniques, covered practical DAG skills like parameters, error handling, and reruns, this part covers when and based on what that DAG should run. The next part continues with XCom & Passing Data.

What you will learn in this post

  • The differences between cron expressions, Timetables, and Asset-based scheduling, and how to choose between them
  • How to automatically connect producer and consumer DAGs with Assets (formerly Dataset)
  • The precise meaning of logical_date and data_interval_start/end (execution_date is gone)
  • The changed default for catchup and scheduler-managed backfill
  • How to avoid getting paged at dawn because of timezones

1. Three Ways to Set a Schedule

In Airflow 3, the value you can give to a DAG's schedule argument falls into roughly three categories. Getting a feel for which question each one answers makes the choice easy.

ApproachValue you setQuestion it answers
cron / preset"0 2 * * *", @daily, timedelta(hours=1)"When should it run?" (clock-based)
TimetableCronTriggerTimetable, custom Timetable object"It's clock-based, but the rules are complex"
Asset-basedschedule=[my_asset]"Run it when what gets updated?" (data-based)

cron is the most familiar, but rules like "9 a.m. Monday through Friday, except public holidays" are hard to express in a single cron line. This is where a Timetable becomes the flexible, higher-level tool for clock-based schedules. And when you want the arrival of data rather than the clock to be the trigger, Asset enters the picture.

from datetime import timedelta
from airflow.sdk import dag, task
from airflow.timetables.trigger import CronTriggerTimetable
 
# 1) cron string — the simplest
@dag(schedule="0 2 * * *", catchup=False)
def daily_cron():
    ...
 
# 2) preset / interval
@dag(schedule="@daily", catchup=False)
def daily_preset():
    ...
 
# 3) Timetable — finer control than cron (e.g., separating run time from the data interval)
@dag(
    schedule=CronTriggerTimetable("0 2 * * *", timezone="Asia/Seoul"),
    catchup=False,
)
def with_timetable():
    ...

Notice the imports come from airflow.sdk. In Airflow 3, the DAG-authoring API has been consolidated into the Task SDK (airflow.sdk).

If the clock is enough, use cron; if the rules are complex, use a Timetable; if data is the basis, use an Asset.

The following diagram shows how the time-based and Asset-based paths diverge.

Loading diagram…

2. Asset — the Heart of Data-Aware Scheduling

Airflow 3's Asset is a concept that extends the 2.x Dataset in both name and capability (the term "Dataset" is no longer used). An Asset is a label that points to a logical unit of data that a pipeline produces — a table, an S3 path, a file, and so on.

The core idea is simple. When one DAG "produces" a particular Asset and another DAG "consumes" that Asset, the consumer DAG is automatically triggered the moment production finishes. There's no need to align times. When the data is ready, the next step runs.

from airflow.sdk import dag, task, Asset
 
# Declare a logical unit of data as an Asset
sales_raw = Asset("s3://warehouse/sales/raw")
 
# (A) Producer DAG: list the Asset in outlets, and on success this Asset is marked "updated"
@dag(schedule="0 1 * * *", catchup=False)
def ingest_sales():
    @task(outlets=[sales_raw])
    def load():
        # ... load from the source ...
        return "done"
    load()
 
ingest_sales()
 
# (B) Consumer DAG: put the Asset in schedule, and it runs every time that Asset is updated
@dag(schedule=[sales_raw], catchup=False)
def aggregate_sales():
    @task
    def aggregate():
        # ... sales_raw was just updated, so aggregate with confidence ...
        ...
    aggregate()
 
aggregate_sales()

Here, aggregate_sales has no cron at all. It wakes up only when the load task in ingest_sales succeeds and updates sales_raw. If the data arrives late, the consumer DAG runs late too; if the data never comes, the consumer DAG never runs. The "aggregating an empty table" incident structurally disappears.

There is also a more concise @asset decorator that declares the function itself as the producer of an Asset.

from airflow.sdk import asset
 
# This function becomes a DAG that produces a single Asset
@asset(schedule="@daily")
def sales_curated():
    # The return value / side effect signifies an update to the sales_curated Asset
    ...

When you tie things together with Assets, multiple DAGs chain automatically like links. Below is the flow where a producer DAG updates an Asset and the scheduler picks up that signal to wake the consumer DAG.

Loading diagram…

You can also subscribe to multiple Assets at once. With schedule=[asset_a, asset_b], the consumer DAG is, by default, triggered only when both have been updated; more complex conditions are expressed with Asset logical expressions (e.g., asset_a & asset_b, asset_a | asset_b).

3. logical_date and data_interval — execution_date Is Gone

If you've used Airflow 2.x, you've probably been burned by the confusing name execution_date. It's called the "execution date," yet it points not to the actual run time but to the start of the data interval, which threw everyone for a loop. Airflow 3 has removed execution_date. Instead, it uses names with clear meanings.

VariableMeaning
logical_dateThe point in time this run logically represents. On a cron schedule, the reference point of the data interval
data_interval_startThe start of the data interval this run processes
data_interval_endThe end of the data interval (usually close to the time the actual trigger happens)
from airflow.sdk import dag, task
 
@dag(schedule="0 2 * * *", catchup=False)
def windowed():
    @task
    def process(**context):
        start = context["data_interval_start"]
        end = context["data_interval_end"]
        # Process "only the data in this interval" — the basis of idempotency
        print(f"Processing interval: {start} ~ {end}")
    process()
 
windowed()

Here's one of the most important changes. When a run is triggered by an Asset or manually, logical_date may be None. For a run that fires because data was updated, the very notion of "the clock-time point this run represents" is ambiguous. So your code should not assume that logical_date always exists.

Use data_interval_start/end for clock-based interval processing, and use logical_date only when you truly need "the representative time of this run" — and even then, keep the possibility of None in mind.

If you write your processing around the data interval, you get an idempotent pipeline that produces the same result no matter how many times you rerun the same interval. This is the foundation that makes backfill and reruns safe.

4. catchup and backfill — Changed Defaults in 3.x

The behavior that most often surprised new users in Airflow 2.x was catchup. If you set start_date in the past and turned on the DAG, runs were created en masse for every interval in between, and the scheduler would often go haywire.

In Airflow 3, the default for catchup has changed to False. That is, if you turn on a DAG with no special configuration, it doesn't backfill the past and instead runs from the next schedule onward. This is the behavior most teams actually want.

@dag(
    schedule="0 2 * * *",
    start_date=datetime(2025, 1, 1),  # even if it's in the past
    catchup=False,                     # 3.x default — does not backfill the past
)
def no_backlog():
    ...

So what about when you intentionally want to rerun past intervals (backfill)? There's a big change here too. In 2.x, the CLI (airflow dags backfill) created runs in a separate process, but in Airflow 3 the scheduler manages backfill. When you request "backfill this period" from the UI or REST API, the scheduler creates and manages runs for that interval through the same path as a normal schedule. That means backfill runs can be tracked and retried in the UI exactly like normal runs.

Loading diagram…

Keep day-to-day operations on catchup=False, and request a backfill via UI/API when you actually need to reprocess the past. It's a safe default that prevents the "the past floods in the moment you turn it on" incident.

5. The Timezone Pitfall

A classic reason for getting paged at dawn in scheduling is the timezone. You only need to keep a few things in mind.

  • Airflow internally stores and handles times based on UTC. It can deal with local timezones in the UI display or when interpreting cron, but storage is in UTC.
  • A cron string alone is ambiguous about "2 a.m. in which timezone." To make the timezone explicit, it's safest to specify timezone on CronTriggerTimetable.
from airflow.timetables.trigger import CronTriggerTimetable
 
# Pin it down clearly as 2 a.m. daily "in Seoul time"
@dag(
    schedule=CronTriggerTimetable("0 2 * * *", timezone="Asia/Seoul"),
    catchup=False,
)
def seoul_daily():
    ...
  • In timezones with DST (daylight saving time) (e.g., the US/Europe), a cron schedule behaves subtly on clock-change days, either skipping a run once or running twice. Korea (Asia/Seoul) has no DST and is relatively free of this problem, but if you handle global data, it's safest to explicitly align the timezone of the source system with the timezone of your Airflow schedule.
  • Inside task code, base your logic on the data_interval_start/end provided by the context (aware datetimes that carry timezone information) rather than a naive datetime like datetime.now().

6. What to Use When — A Summary

Finally, let's summarize the practical selection criteria.

SituationRecommendation
A fixed time daily/hourly, no external dependencycron string (@daily, 0 2 * * *)
Clock-based but with complex rules (holidays, business days)CronTriggerTimetable or a custom Timetable
Run the next step when the previous step's data is readyAsset (schedule=[asset])
Proceed only when several sources have all been updatedMultiple Asset subscriptions / Asset logical expressions
Runs only on external events, with no automatic scheduleschedule=None + manual/API trigger

The integrity of a data pipeline ultimately comes from the guarantee that "the next step runs only after the data it needs is ready." Airflow 3's Asset builds this guarantee from data events rather than from guessing cron times. Moving from operations where you woke at dawn just to align timing, to operations where the pipeline flows on its own as the data flows — that's the heart of this part.

In the next part, XCom & Passing Data, we'll cover how the tasks connected this way actually pass data between each other. For the official documentation, see Airflow Authoring & Scheduling.