The Right Way to Author Airflow 3 DAGs
TaskFlow API, dependencies & branching, Dynamic Task Mapping, and TaskGroups — how to write DAGs properly in Airflow 3.
This is Part 4 of the Airflow 3 in Practice series. In the previous part, Configuration & Optimization, we tuned the cluster; now we look at how to write the DAGs that run on top of it. The next part is Part 5: Advanced DAG Techniques.
A DAG (Directed Acyclic Graph) is a pipeline definition that spells out "what to run, and in what order" as code. Two DAGs can do exactly the same thing, yet depending on how they are written, one becomes easy-to-read code and the other becomes code no one dares to touch six months later. This article lays out the standard for "this is how you do it" on Airflow 3.
In Airflow 3, authoring a DAG starts from a single point: the imports. Everything comes from airflow.sdk, as in from airflow.sdk import ... Because workers no longer connect directly to the metadata DB but communicate through the Task SDK instead, the SDK import path is the canonical one.
Two ways to author: classic Operators vs the TaskFlow API
Let's first look at the same pipeline written both ways, side by side. We'll use the most common ETL: "extract → transform → load."
The classic Operator approach: you create tasks as objects and pass values around via XCom.

from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
import pendulum


def extract(**context):
    return {"rows": [1, 2, 3, 4]}


def transform(**context):
    ti = context["ti"]
    data = ti.xcom_pull(task_ids="extract")
    return [r * 10 for r in data["rows"]]


def load(**context):
    ti = context["ti"]
    rows = ti.xcom_pull(task_ids="transform")
    print(f"Load complete: {rows}")


with DAG(
    dag_id="etl_classic",
    start_date=pendulum.datetime(2026, 1, 1, tz="Asia/Seoul"),
    schedule="@daily",
    catchup=False,
) as dag:
    t_extract = PythonOperator(task_id="extract", python_callable=extract)
    t_transform = PythonOperator(task_id="transform", python_callable=transform)
    t_load = PythonOperator(task_id="load", python_callable=load)

    t_extract >> t_transform >> t_load

The TaskFlow API approach: decorate a function with @task and that function becomes the task. Pass a function's return value as the argument to the next function and the XCom wiring happens automatically.

from airflow.sdk import dag, task
import pendulum


@dag(
    schedule="@daily",
    start_date=pendulum.datetime(2026, 1, 1, tz="Asia/Seoul"),
    catchup=False,
)
def etl_taskflow():
    @task
    def extract():
        return {"rows": [1, 2, 3, 4]}

    @task
    def transform(payload: dict):
        return [r * 10 for r in payload["rows"]]

    @task
    def load(rows: list):
        print(f"Load complete: {rows}")

    load(transform(extract()))


etl_taskflow()

The difference is obvious at a glance. With TaskFlow you never call xcom_pull/xcom_push directly; instead the function return values and arguments themselves become the dependencies and the data hand-off. The single line load(transform(extract())) is what creates the extract >> transform >> load order.
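To see why a plain nested call is enough to define the order, here is a toy sketch of the idea. It is not Airflow's actual implementation: it assumes a hypothetical decorator that does not run the function body when called, but returns a placeholder and records an edge whenever a placeholder is passed in as an argument. Graph, Ref, and make_task_decorator are names invented for this illustration.

```python
from dataclasses import dataclass, field


@dataclass
class Graph:
    """Collects task names and the edges discovered between them."""
    tasks: list = field(default_factory=list)
    edges: list = field(default_factory=list)


@dataclass(frozen=True)
class Ref:
    """Placeholder standing in for a task's future return value."""
    producer: str


def make_task_decorator(graph: Graph):
    """Return a toy @task that records dependencies instead of running code."""
    def task(fn):
        def call(*args, **kwargs):
            graph.tasks.append(fn.__name__)
            # Every placeholder passed in marks its producer as an upstream task.
            for arg in (*args, *kwargs.values()):
                if isinstance(arg, Ref):
                    graph.edges.append((arg.producer, fn.__name__))
            return Ref(fn.__name__)
        return call
    return task


graph = Graph()
task = make_task_decorator(graph)


@task
def extract():
    return {"rows": [1, 2, 3, 4]}


@task
def transform(payload):
    return [r * 10 for r in payload["rows"]]


@task
def load(rows):
    print(f"Load complete: {rows}")


# Only the call structure is inspected; none of the function bodies run here.
load(transform(extract()))
print(graph.edges)
```

The bodies of extract, transform, and load never execute in this sketch. That mirrors the article's point: the nested call only describes the order, and the real work is deferred until the tasks run.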
| Aspect | Classic Operators | TaskFlow API |
|---|---|---|
| Dependency expression | Wired manually with >> | Inferred from function call relationships |
| Data passing | Explicit xcom_pull/push | Implicit via return values and arguments |
| Readability | Centered on objects and task IDs | Reads like plain Python functions |
| Best fit | Off-the-shelf Operators (Sensors, Providers, etc.) | Pure Python logic |
The two are not an either/or choice. Real-world DAGs usually mix them. Write pure Python logic with @task, handle the parts that touch external systems with off-the-shelf Operators, and stitch them together with >>.
How Operator, Sensor, and Hook relate
If you write everything by hand with only @task, you end up rewriting external-system integration code every time. Knowing the three building blocks Airflow provides means you don't have to.
- Operator: a unit that "performs one piece of work, once." Examples are PythonOperator, BashOperator, the S3...Operator that a Provider supplies, and so on.
- Sensor: a special kind of Operator whose job is to "wait until some condition becomes true," such as waiting for a file to arrive or an external job to finish. In Airflow 3, putting a Sensor in deferrable mode lets the Triggerer handle the polling, so it doesn't occupy a worker slot (for details see the Architecture part).
- Hook: a low-level wrapper that attaches to an external system. Operators and Sensors use Hooks internally, and a Hook reads Airflow's Connection (connection details) to establish the actual connection.
In short, a Hook handles "how to connect," while Operators and Sensors handle "what to do over that connection." External system integration itself is covered in depth in Part 8: External System Integration.
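The split of responsibilities can be sketched in plain Python. Everything below is hypothetical and exists only for illustration: CONNECTIONS stands in for Airflow's Connection store, InMemoryClient replaces a real external system, and WarehouseHook and InsertRowsOperator are invented classes that only borrow the get_conn and execute naming commonly seen in Airflow.

```python
# Stand-in for Airflow's Connection store: conn_id -> connection details.
CONNECTIONS = {"warehouse_default": {"host": "db.example.internal", "port": 5432}}


class InMemoryClient:
    """Fake client so the sketch runs without any external system."""
    def __init__(self, host, port):
        self.host, self.port = host, port
        self.rows = []

    def insert(self, row):
        self.rows.append(row)


class WarehouseHook:
    """Hook role: knows how to connect, given only a connection id."""
    def __init__(self, conn_id):
        self.conn_id = conn_id
        self._client = None

    def get_conn(self):
        # Connect lazily, and only once, from the stored connection details.
        if self._client is None:
            details = CONNECTIONS[self.conn_id]
            self._client = InMemoryClient(details["host"], details["port"])
        return self._client


class InsertRowsOperator:
    """Operator role: knows what to do over the connection a Hook provides."""
    def __init__(self, task_id, conn_id, rows):
        self.task_id, self.conn_id, self.rows = task_id, conn_id, rows

    def execute(self, hook=None):
        hook = hook or WarehouseHook(self.conn_id)
        client = hook.get_conn()
        for row in self.rows:
            client.insert(row)
        return len(self.rows)


hook = WarehouseHook("warehouse_default")
op = InsertRowsOperator("load_rows", "warehouse_default", [10, 20, 30])
inserted = op.execute(hook)
print(inserted, hook.get_conn().rows)
```

The operator never sees a host or port; it only names a connection id. Swapping the target system then means changing the connection details, not the task logic.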
Dependencies, trigger_rule, and branching
Dependencies are expressed with >> (downstream) and << (upstream). When you have many tasks, the chain/cross_downstream helpers are easier to read.

from airflow.sdk import chain

# Equivalent to a >> b >> c >> d
chain(a, b, c, d)

By default, a task runs only when all of its parents succeed (all_success). What changes this is the trigger_rule. For example, "run a cleanup task no matter what, whether the preceding steps succeed or fail" uses all_done.
| trigger_rule | Run condition |
|---|---|
| all_success (default) | All parents succeeded |
| all_done | All parents finished (regardless of success/failure); useful for cleanup tasks |
| one_success | At least one parent succeeded |
| none_failed_min_one_success | No parent failed and at least one succeeded; useful at branch join points |
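The run conditions in the table can be written down as a small decision function. This is a simplified model, not Airflow's scheduler logic: it looks only at the parents' final states, ignores timing (for instance, one_success does not wait for every parent in practice), and does not distinguish whether a task that does not run ends up skipped or upstream_failed. The function name would_run and the state sets are assumptions made for this sketch.

```python
FAILED_STATES = {"failed", "upstream_failed"}
FINISHED_STATES = {"success", "failed", "upstream_failed", "skipped"}


def would_run(trigger_rule: str, parent_states: list) -> bool:
    """Simplified model: decide from the parents' final states whether a task runs."""
    successes = sum(state == "success" for state in parent_states)
    failures = sum(state in FAILED_STATES for state in parent_states)
    if trigger_rule == "all_success":
        return successes == len(parent_states)
    if trigger_rule == "all_done":
        return all(state in FINISHED_STATES for state in parent_states)
    if trigger_rule == "one_success":
        return successes >= 1
    if trigger_rule == "none_failed_min_one_success":
        return failures == 0 and successes >= 1
    raise ValueError(f"rule not covered by this sketch: {trigger_rule}")


# A join point where one parent ran and the other was skipped.
one_skipped = ["success", "skipped"]
# A cleanup step where one upstream task failed.
one_failed = ["success", "failed"]

for rule in ("all_success", "all_done", "one_success", "none_failed_min_one_success"):
    print(rule, would_run(rule, one_skipped), would_run(rule, one_failed))
```

The one_skipped case is the interesting one: the default all_success does not run, while none_failed_min_one_success does, and the latter still refuses to run when a parent has actually failed.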
Branching is the pattern of "running only one of several branches depending on a condition." A function decorated with @task.branch returns the id of the downstream task to run, and the remaining branches are skipped.

from airflow.sdk import dag, task
import pendulum


@dag(
    schedule="@daily",
    start_date=pendulum.datetime(2026, 1, 1, tz="Asia/Seoul"),
    catchup=False,
)
def etl_with_branch():
    @task
    def extract():
        return {"rows": [1, 2, 3, 4]}

    @task.branch
    def check_quality(payload: dict):
        # If there are rows, take the load branch; if empty, take the notify branch
        return "load" if payload["rows"] else "notify_empty"

    @task
    def load():
        print("load")

    @task
    def notify_empty():
        print("No data: notifying")

    # Final task where the two branches join: adjust trigger_rule so it runs even if one side is skipped
    @task(trigger_rule="none_failed_min_one_success")
    def finalize():
        print("finalize")

    data = extract()
    branch = check_quality(data)
    branch >> [load(), notify_empty()] >> finalize()


etl_with_branch()

The key here is the trigger_rule at the join point. When branching skips one side, a join task that keeps the default all_success gets skipped too, because one of its parents was skipped. So you must switch it to none_failed_min_one_success for it to run correctly. The resulting flow is extract → check_quality → load or notify_empty → finalize.
Dynamic Task Mapping: fanning out at runtime
As you work with DAGs, you run into cases where "you can't know the number of items to process ahead of time." Today there might be 3 files, tomorrow 50. Dynamic Task Mapping is what automatically expands tasks at runtime to match the number of items. You use .expand().

from airflow.sdk import dag, task
import pendulum


@dag(
    schedule="@daily",
    start_date=pendulum.datetime(2026, 1, 1, tz="Asia/Seoul"),
    catchup=False,
)
def fanout_pipeline():
    @task
    def list_files() -> list[str]:
        # Decide the list of files to process at runtime (the count can vary each time)
        return ["a.csv", "b.csv", "c.csv"]

    @task
    def process(file_name: str) -> int:
        print(f"Processing: {file_name}")
        return len(file_name)  # Example: the result of processing each file (a fake value)

    @task
    def summarize(results: list[int]):
        # The return values of all mapped tasks arrive collected into a list
        print(f"Processed {len(results)} items, total {sum(results)}")

    files = list_files()
    counts = process.expand(file_name=files)  # process is expanded to match the number of files
    summarize(counts)  # Aggregate the results into a single task (fan-in)


fanout_pipeline()

The single line process.expand(file_name=files) creates as many process task instances as the length of the list produced upstream. Then summarize receives those results as a single list and aggregates them. This is the fan-out / fan-in pattern: one input task expands into N and then collapses back into one.
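Stripped of the scheduler, fan-out / fan-in is a map followed by a reduce. The plain-Python model below makes the instance count explicit. The helper plan_mapped_calls and the encoding argument are hypothetical; the helper also models two behaviors of the real API as I understand them, namely that constant arguments are shared by every instance (supplied with .partial() in Airflow) and that mapping over more than one argument expands to the cross product.

```python
from itertools import product


def plan_mapped_calls(constants: dict, mapped: dict) -> list:
    """Toy model: build one kwargs dict per mapped task instance.

    constants: arguments shared by every instance.
    mapped: list-valued arguments to expand over (cross product if several).
    """
    names = list(mapped)
    combos = product(*(mapped[name] for name in names))
    return [{**constants, **dict(zip(names, combo))} for combo in combos]


def process(file_name: str, encoding: str) -> int:
    return len(file_name)  # fake per-file result, as in the article's example


files = ["a.csv", "b.csv", "c.csv"]  # in a real DAG this is known only at runtime
calls = plan_mapped_calls({"encoding": "utf-8"}, {"file_name": files})

results = [process(**kwargs) for kwargs in calls]  # fan-out: one call per file
summary = {"count": len(results), "total": sum(results)}  # fan-in: one aggregate
print(summary)
```

An empty upstream list yields zero planned calls in this model, and two mapped arguments of lengths 2 and 3 yield 6, which is worth keeping in mind when estimating how many task instances a run will create.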
If the number of tasks mapped by .expand() explodes, it burdens the scheduler and the metadata DB. Control concurrent execution with the three concurrency layers from Part 3 (parallelism, max_active_tasks_per_dag, Pools).
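What a slot limit buys you can be seen in a small thread-based model. This is only an analogy for how a pool caps concurrency, not how Airflow implements Pools: SlotPool is a hypothetical class built on a semaphore, and the 20 files and 3 slots are made-up numbers.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class SlotPool:
    """Toy stand-in for a pool: at most `slots` tasks hold a slot at once."""
    def __init__(self, slots: int):
        self._sem = threading.Semaphore(slots)
        self._lock = threading.Lock()
        self.running = 0
        self.peak = 0

    def run(self, fn, *args):
        with self._sem:  # wait for a free slot
            with self._lock:
                self.running += 1
                self.peak = max(self.peak, self.running)
            try:
                return fn(*args)
            finally:
                with self._lock:
                    self.running -= 1


def process(file_name: str) -> int:
    time.sleep(0.01)  # pretend to do some work
    return len(file_name)


pool = SlotPool(slots=3)
files = [f"part-{i:02d}.csv" for i in range(20)]

# All 20 mapped instances are ready at once, but only 3 may run concurrently.
with ThreadPoolExecutor(max_workers=20) as executor:
    results = list(executor.map(lambda name: pool.run(process, name), files))

print(f"ran {len(results)} tasks, peak concurrency {pool.peak}")
```

Every instance still runs; the limit only stretches the work out over time so that the downstream systems and the workers are not hit by all instances at once.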
Structuring with TaskGroups (SubDAGs are gone)
When you have dozens of tasks, the graph stops being readable at a glance. A TaskGroup lets you bundle related tasks into a visual and logical group that can be collapsed.

from airflow.sdk import dag, task, task_group
import pendulum


@dag(
    schedule="@daily",
    start_date=pendulum.datetime(2026, 1, 1, tz="Asia/Seoul"),
    catchup=False,
)
def grouped_pipeline():
    @task
    def extract():
        return [1, 2, 3]

    @task_group(group_id="cleanse")
    def cleanse(data):
        @task
        def dedup(rows):
            return list(set(rows))

        @task
        def validate(rows):
            return rows

        return validate(dedup(data))

    @task
    def load(rows):
        print(f"Load: {rows}")

    load(cleanse(extract()))


grouped_pipeline()

In the UI the cleanse group shows up collapsed as a single box, and expanding it reveals the inner dedup and validate. Task ids inside the group get a group prefix, like cleanse.dedup.
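The prefixing rule is easy to get wrong when you refer to a grouped task by id, for example in a branch function's return value or in xcom_pull. The sketch below models it for a nested layout. The function flatten_task_ids and the dict layout are invented for illustration, and the model assumes the default behavior in which group ids are prefixed.

```python
def flatten_task_ids(node: dict, prefix: str = "") -> list:
    """List full task ids for a nested {"tasks": [...], "groups": {...}} layout."""
    ids = [f"{prefix}{task_id}" for task_id in node.get("tasks", [])]
    for group_id, child in node.get("groups", {}).items():
        # Each enclosing group contributes "<group_id>." to the ids inside it.
        ids += flatten_task_ids(child, prefix=f"{prefix}{group_id}.")
    return ids


# The grouped_pipeline layout: two top-level tasks and one group.
layout = {
    "tasks": ["extract", "load"],
    "groups": {"cleanse": {"tasks": ["dedup", "validate"]}},
}
print(flatten_task_ids(layout))
```

Under this model, a reference to the deduplication step has to use cleanse.dedup, not dedup, and a group nested inside another group stacks both prefixes.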
Important: SubDAGs have been removed in Airflow 3. Replace every pattern you used to bundle with a SubDAG with a TaskGroup instead. If you want to split DAGs into larger units, you're better off connecting cross-DAG dependencies in a data-aware way with Asset-based scheduling from Part 6.
DAG anti-patterns to avoid
DAG files are parsed periodically by the DAG processor (in Airflow 3, an independent process separate from the scheduler). The crucial point is that the top-level code of every DAG file runs again on every parse, not just once.

# Anti-pattern: doing heavy work at the top level runs it on every parse
import pandas as pd

df = pd.read_parquet("s3://big-bucket/huge.parquet")  # ❌ downloaded on every parse

# The right way: heavy work must go "inside" a task function
from airflow.sdk import task


@task
def load_data():
    import pandas as pd

    return pd.read_parquet("s3://big-bucket/huge.parquet")  # ✅ only when it runs

Common pitfalls you'll see:
- Heavy top-level code/imports: don't put DB queries, file downloads, or heavy library imports at the very top of a DAG file. Parsing slows down and the scheduler falls behind. Move heavy imports inside task functions.
- Non-deterministic DAGs: don't use datetime.now() or random numbers to make the DAG structure (number of tasks, ids) different every time. The same file should always produce the same graph, so that tracking and re-runs stay stable. If you need time-based logic, use logical_date or data_interval_start/end at the task's execution time, not in the structure (execution_date was removed in Airflow 3).
- Giant XComs: don't stuff a large DataFrame or a payload of tens of MB straight into a task's return value (XCom). By default XCom is stored in the metadata DB, so the DB balloons. The canonical approach is to keep large data in external storage and pass only the path or key through XCom. This topic is covered in earnest in Part 7: XCom & Data Passing.
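For the non-deterministic pitfall, the contrast can be shown without Airflow at all: what matters is whether the list of task ids depends on anything other than static configuration. In the sketch below, SOURCES and both builder functions are hypothetical, and the random-based builder exists only to demonstrate the anti-pattern.

```python
import random

# Assumption: the set of sources is static configuration kept next to the DAG file.
SOURCES = ["orders", "customers", "payments"]


def build_task_ids_deterministic(sources: list) -> list:
    """Structure depends only on static config, so every parse yields the same ids."""
    return [f"ingest_{name}" for name in sorted(sources)]


def build_task_ids_nondeterministic(sources: list) -> list:
    """Anti-pattern: random choices change the graph from parse to parse."""
    picked = random.sample(sources, k=random.randint(1, len(sources)))
    return [f"ingest_{name}_{random.randint(0, 9999)}" for name in picked]


# Two "parses", the second with the config listed in a different order.
first_parse = build_task_ids_deterministic(SOURCES)
second_parse = build_task_ids_deterministic(list(reversed(SOURCES)))
print(first_parse == second_parse, first_parse)
```

Sorting the configuration is a small extra guard: the ids stay stable even if the order of the config entries changes between parses.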
Avoiding just these three prevents a good share of operational incidents like "why is the scheduler slow," "why is the graph different every time," and "why is the metadata DB blowing up."
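The fix for the giant-XCom pitfall comes down to passing a reference instead of the payload. The sketch below shows that shape in plain Python. It assumes a local temporary directory as a stand-in for external storage, and extract and transform are ordinary functions here; in a DAG they would be @task functions and the returned string would be the XCom value. STORAGE_ROOT and the file naming scheme are made up for this example.

```python
import json
import tempfile
from pathlib import Path

# Assumption: a local temp directory stands in for external storage.
STORAGE_ROOT = Path(tempfile.mkdtemp(prefix="xcom_demo_"))


def extract(run_id: str) -> str:
    """Write the large payload to storage and return only its location."""
    rows = [{"id": i, "value": i * 10} for i in range(10_000)]
    path = STORAGE_ROOT / f"{run_id}_rows.json"
    path.write_text(json.dumps(rows))
    return str(path)  # this short string is all that would travel through XCom


def transform(rows_path: str) -> int:
    """Load the payload from storage using the reference received upstream."""
    rows = json.loads(Path(rows_path).read_text())
    return sum(row["value"] for row in rows)


reference = extract("2026-01-01")
total = transform(reference)
payload_bytes = Path(reference).stat().st_size
print(f"reference: {len(reference)} chars, payload on disk: {payload_bytes} bytes")
```

Deriving the file name from the run identifier keeps re-runs of the same interval pointing at the same location, which fits the article's advice on deterministic behavior.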
Wrap-up
- The starting point for an Airflow 3 DAG is from airflow.sdk import dag, task. Write pure Python with the TaskFlow API, external integrations with off-the-shelf Operators, and mix the two.
- Operators/Sensors handle "what to do," and Hooks handle "how to connect."
- Use @task.branch for branching and trigger_rule="none_failed_min_one_success" at the join point. When the item count is variable, use .expand() for runtime fan-out.
- For structuring, use TaskGroups (SubDAGs are removed). Avoid heavy top-level code, non-deterministic structure, and giant XComs.
In the next part, Part 5: Advanced DAG Techniques, we cover a level up of practical techniques: running scripts, passing parameters, error handling, calling other DAGs, executing PostgreSQL queries, re-running failures, and date-based logic. For the official documentation, see Airflow Authoring DAGs.