Airflow 3 in Practice, Part 5: Advanced DAG Techniques
Script execution, passing parameters, error handling, calling other DAGs, DAG options, rerunning failures, and the date semantics behind it all.
This is Part 5 of the Airflow 3 in Practice series. If the previous part, Part 4: DAG Authoring Done Right, gave you the basic skeleton of a DAG, this part collects the next-level techniques you will inevitably hit in real work. We walk through, step by step, how to run scripts, how to pass parameters, how to handle errors, how to call other DAGs, how to rerun failed executions, and the date semantics that underpins all of it. The next part continues with Part 6: Scheduling & Assets.
All code in this part is based on Airflow 3's Task SDK (airflow.sdk) and the standard operators that moved into apache-airflow-providers-standard in 3.x. The BashOperator, PythonOperator, and friends that lived under airflow.operators.* in 2.x have moved to airflow.providers.standard.operators.* in 3.x.
1. Running Scripts — Where and How
From "running a single Python function" to "running an arbitrary script in an isolated container," Airflow offers execution in several tiers. The key is the trade-off between isolation level and cost.
Let's look at the three most common cases in code.
```python
from airflow.sdk import dag, task
from airflow.providers.standard.operators.bash import BashOperator


@dag(schedule="@daily", catchup=False, tags=["example"])
def script_examples():
    # (1) Pure Python: runs directly inside the worker
    @task
    def transform(rows: int) -> int:
        return rows * 2

    # (2) Shell command: call an existing script/CLI as-is
    run_etl = BashOperator(
        task_id="run_etl",
        # Pass data_interval_start as an argument for idempotent execution (see section 8)
        bash_command="/opt/scripts/etl.sh --date {{ data_interval_start | ds }}",
    )

    # (3) Python with conflicting dependencies: run in a separate venv
    @task.virtualenv(requirements=["pandas==2.1.0"], system_site_packages=False)
    def heavy_pandas_job():
        import pandas as pd

        return pd.__version__

    transform(100) >> run_etl >> heavy_pandas_job()


script_examples()
```

When you concatenate user input or parameters into a shell command string, watch out for shell injection. If a value comes from outside, don't drop it in raw via Jinja; it's safer to validate it in a Python task first, or to pass arguments separately, such as the arguments list of KubernetesPodOperator.
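One way to apply this advice is to validate the value in Python and build the command as an argument list rather than a single shell string. The sketch below is a minimal illustration using only the standard library; the helper names (validate_date_arg, build_etl_argv) are hypothetical, and the script path is simply the one used in the BashOperator example.

```python
import re
import shlex

# Accept only strict YYYY-MM-DD values; anything else is rejected before it reaches a shell.
DATE_RE = re.compile(r"\d{4}-\d{2}-\d{2}")


def validate_date_arg(value: str) -> str:
    """Return the value unchanged if it looks like a date, otherwise raise."""
    if not DATE_RE.fullmatch(value):
        raise ValueError(f"unexpected date argument: {value!r}")
    return value


def build_etl_argv(date_value: str) -> list[str]:
    """Build an argv list: each element is passed as exactly one argument."""
    return ["/opt/scripts/etl.sh", "--date", validate_date_arg(date_value)]


argv = build_etl_argv("2026-06-01")
# shlex.join is used only to display the command in a copy-pasteable form.
print(shlex.join(argv))
```

Because the value is checked against a strict pattern and never spliced into a shell string, an input such as "2026-06-01; rm -rf /" is rejected instead of executed.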
When you need full isolation, use KubernetesPodOperator (which launches a single pod on a Kubernetes cluster to run the workload) or DockerOperator. They let you use arbitrary images and languages, which is liberating, but pod/container startup cost and latency are added, so using them for "one lightweight line of Python" is overkill.
2. Running PostgreSQL Queries from a DAG
A large share of data pipelines ultimately boils down to "firing a query at a DB." In Airflow 3 there are broadly two paths to run a PostgreSQL query, and the split comes down to whether you need the result.
Both approaches require provider packages: apache-airflow-providers-postgres (the Hook and driver) and apache-airflow-providers-common-sql (SQLExecuteQueryOperator). Don't hardcode connection details in code; keep them in a Connection (conn_id). The Secrets Backend for managing passwords safely is covered in Part 8: Integrating External Systems & Calling Sinks.
Declarative execution — SQLExecuteQueryOperator
The PostgresOperator used in Airflow 2.x has been replaced by SQLExecuteQueryOperator in 3.x (a unified operator shared across multiple databases). You just set conn_id to a PostgreSQL Connection.
```python
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

upsert_sales = SQLExecuteQueryOperator(
    task_id="upsert_sales",
    conn_id="postgres_default",
    # Bind with %(name)s placeholders + parameters (no string formatting -> prevents injection)
    sql="""
        DELETE FROM sales WHERE day = %(day)s;
        INSERT INTO sales (day, amount)
        SELECT %(day)s, sum(amount)
        FROM staging
        WHERE ts >= %(start)s AND ts < %(end)s;
    """,
    parameters={
        "day": "{{ data_interval_start | ds }}",
        "start": "{{ data_interval_start }}",
        "end": "{{ data_interval_end }}",
    },
)
```

The date semantics from section 8 applies here too: use data_interval to define the target window and delete first, then re-insert (DELETE then INSERT) so that the result is the same no matter how many times you rerun.
When the SQL grows long, pull it out into a separate .sql file, set template_searchpath on the DAG, and pass just the file name. SQL and Python are separated, which is easier to manage, and you can still use Jinja like {{ data_interval_start }} inside the file.
```python
@dag(schedule="@daily", catchup=False, template_searchpath="/opt/airflow/sql")
def sql_file_pipeline():
    SQLExecuteQueryOperator(
        task_id="load",
        conn_id="postgres_default",
        sql="load_sales.sql",  # /opt/airflow/sql/load_sales.sql (Jinja templating allowed)
    )


sql_file_pipeline()
```

Taking the result and processing it — PostgresHook
To take a query result into Python to validate or branch on it, use PostgresHook inside a task.
```python
from airflow.sdk import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.exceptions import AirflowFailException


@task
def assert_loaded(data_interval_start):
    hook = PostgresHook(postgres_conn_id="postgres_default")
    # Query safely with parameter binding
    (count,) = hook.get_first(
        "SELECT count(*) FROM sales WHERE day = %(day)s",
        parameters={"day": data_interval_start.date()},
    )
    if count == 0:
        raise AirflowFailException("Loaded 0 rows: check upstream")
    return count
```

PostgresHook provides methods such as get_records() (multiple rows), get_first() (the first row), get_pandas_df() (a DataFrame), run() (execute only), and copy_expert() (COPY) for bulk loading.

Remember just two things.
1. Never drop values into SQL with an f-string; bind them with parameters (prevents injection and quoting accidents).
2. Loading a large result into memory all at once with get_pandas_df() can kill the worker, so process large volumes inside the DB or stream them with COPY.
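The effect of binding can be seen without a PostgreSQL server. The sketch below uses the standard library's sqlite3 purely as a stand-in (an assumption for illustration: sqlite3 uses :name placeholders where the PostgreSQL driver uses %(name)s), and the table contents and the count_for_day helper are hypothetical.

```python
import sqlite3


def count_for_day(conn: sqlite3.Connection, day: str) -> int:
    """Count rows for one day, passing the value as a bound parameter."""
    row = conn.execute(
        "SELECT count(*) FROM sales WHERE day = :day", {"day": day}
    ).fetchone()
    return row[0]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (day TEXT, amount INTEGER)")
conn.executemany(
    "INSERT INTO sales (day, amount) VALUES (:day, :amount)",
    [{"day": "2026-06-01", "amount": 10}, {"day": "2026-06-02", "amount": 20}],
)

# A hostile value that tries to widen the WHERE clause.
hostile = "2026-06-01' OR '1'='1"

# Bound: the whole string is compared as data, so nothing matches.
bound_count = count_for_day(conn, hostile)

# For contrast only: string formatting lets the value rewrite the query.
unsafe_count = conn.execute(
    f"SELECT count(*) FROM sales WHERE day = '{hostile}'"
).fetchone()[0]

print(bound_count, unsafe_count)
```

With binding the hostile string matches no rows, while the formatted query matches every row in the table.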
3. Passing Parameters — Params, conf, Variable, and Templates
There are broadly three channels for injecting values into a DAG. Their purposes differ.
| Channel | Where defined | When decided | Main purpose |
|---|---|---|---|
| Params | DAG code (params=) | Default at authoring time, overridden at trigger time | "Input spec of this DAG" (type / validation / UI form) |
| DAG run conf | At trigger time (conf) | Per execution | One-off execution parameters (manual / API trigger) |
| Variable | Airflow global store | Anytime during operation | Environment values shared across multiple DAGs |
Params declare a DAG's input spec along with type and validation. Form fields are drawn automatically in the UI's "Trigger DAG w/ config" form.
```python
from airflow.sdk import dag, task, Param


@dag(
    schedule=None,  # manual / external trigger only
    catchup=False,
    params={
        "target_date": Param("2026-06-01", type="string", format="date"),
        "batch_size": Param(1000, type="integer", minimum=1, maximum=100000),
        "dry_run": Param(False, type="boolean"),
    },
    tags=["params"],
)
def parametrized_pipeline():
    @task
    def run(params: dict):
        # A task function can receive a context key (params) directly as an argument
        if params["dry_run"]:
            print(f"[DRY] {params['target_date']} / {params['batch_size']} rows")
            return
        print(f"Loading: {params['target_date']} / {params['batch_size']} rows")

    run()


parametrized_pipeline()
```

When triggering, you can override values with conf. From the CLI, airflow dags trigger parametrized_pipeline --conf '{"target_date":"2026-06-10","dry_run":true}'; from the REST API, send conf in the body (remote triggering in detail is in Part 9: REST API & Remote Schedule Changes).
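Conceptually, the values a task sees in params are the declared defaults with the trigger-time conf laid over them, checked against the declared constraints. The sketch below is a simplified standard-library stand-in for that behavior, not Airflow's implementation (Airflow validates Params against a JSON Schema); resolve_params and the constant names are hypothetical.

```python
from typing import Any

# Defaults and constraints mirroring the Params declared in the DAG example.
DEFAULTS: dict[str, Any] = {"target_date": "2026-06-01", "batch_size": 1000, "dry_run": False}
EXPECTED_TYPES: dict[str, type] = {"target_date": str, "batch_size": int, "dry_run": bool}
BATCH_SIZE_RANGE = (1, 100000)


def resolve_params(conf: dict[str, Any]) -> dict[str, Any]:
    """Overlay trigger-time conf on the defaults, then validate the result."""
    merged = {**DEFAULTS, **conf}
    for name, expected in EXPECTED_TYPES.items():
        value = merged[name]
        # bool is a subclass of int in Python, so reject it explicitly for integer fields.
        is_bool_as_int = expected is int and isinstance(value, bool)
        if not isinstance(value, expected) or is_bool_as_int:
            raise ValueError(f"{name} must be {expected.__name__}, got {value!r}")
    low, high = BATCH_SIZE_RANGE
    if not low <= merged["batch_size"] <= high:
        raise ValueError(f"batch_size must be between {low} and {high}")
    return merged


print(resolve_params({"target_date": "2026-06-10", "dry_run": True}))
```

Keys that the trigger does not supply keep their defaults, which is why declaring sensible defaults in params makes a DAG safe to trigger with an empty conf.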
In template-enabled fields (e.g., BashOperator.bash_command), you can pull values out with Jinja like {{ params.target_date }}, {{ dag_run.conf["target_date"] }}, or {{ var.value.my_key }}. If you want to receive them as Python objects (dict/list), set render_template_as_native_obj=True on the DAG and they render as native objects rather than strings.
Pitfall: Don't call Variable.get() at the top level of a DAG file. A DAG file is re-executed every time it's parsed (the cost is detailed in Part 3: Configuration & Optimization), so it would hammer the metadata DB endlessly. Do variable lookups at task execution time (inside the function) or in templates ({{ var.value.x }}).
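To make the cost difference concrete, here is a toy simulation with no Airflow dependency. CountingStore is a hypothetical stand-in for a database-backed variable store, and the two parse functions imitate a DAG file being parsed repeatedly; the numbers only illustrate the shape of the problem, not real parse frequencies.

```python
class CountingStore:
    """Hypothetical stand-in for a variable store that counts every lookup."""

    def __init__(self, values: dict[str, str]) -> None:
        self._values = dict(values)
        self.lookups = 0

    def get(self, key: str) -> str:
        self.lookups += 1
        return self._values[key]


store = CountingStore({"bucket": "raw-data"})


def parse_dag_file_eager():
    """Imitates a DAG file that reads the variable at top level (on every parse)."""
    bucket = store.get("bucket")

    def task() -> str:
        return bucket

    return task


def parse_dag_file_lazy():
    """Imitates a DAG file that reads the variable only when the task runs."""

    def task() -> str:
        return store.get("bucket")

    return task


# 100 parses, zero task runs: the eager version still performs 100 lookups.
for _ in range(100):
    parse_dag_file_eager()
eager_lookups = store.lookups

# 100 parses, one task run: the lazy version performs a single lookup.
store.lookups = 0
for _ in range(100):
    lazy_task = parse_dag_file_lazy()
lazy_task()
lazy_lookups = store.lookups

print(eager_lookups, lazy_lookups)
```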
4. Error Handling — Retries, Callbacks, and "Do Not Retry"
When a task fails, Airflow by default attempts retries and, once they're exhausted, confirms it as failed. Let's see that flow as a diagram first.
Retries and timeouts are usually applied DAG-wide via default_args, with only the tasks that need it adjusted individually.
```python
from datetime import timedelta

from airflow.sdk import dag, task
from airflow.exceptions import AirflowFailException, AirflowSkipException

default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=2),
    "retry_exponential_backoff": True,  # 2 min -> 4 min -> 8 min ...
    "max_retry_delay": timedelta(minutes=30),
    "execution_timeout": timedelta(hours=1),  # exceeding this time is treated as a failure
}


@dag(schedule="@hourly", catchup=False, default_args=default_args, tags=["errors"])
def error_handling():
    @task
    def call_external(params: dict):
        status = fetch_status()  # a hypothetical external call
        if status == 404:
            # retrying will never succeed -> fail immediately without retry
            raise AirflowFailException("Target resource not found (permanent error)")
        if status == 204:
            # nothing to process -> skip rather than fail
            raise AirflowSkipException("No data for this window")
        if status >= 500:
            # transient error -> leave it as a plain exception so it's retried
            raise RuntimeError("Transient 5xx, let's retry")

    call_external()


error_handling()
```

The key here is to distinguish the kind of exception.

- A plain exception (RuntimeError, etc.) → eligible for retry. Suitable for transient errors like a brief network blip.
- AirflowFailException → fail immediately without retry. Use it for "things that won't work no matter how often you try" (404, invalid input). It stops you from wasting time on pointless retries.
- AirflowSkipException → a skip, not a failure. It expresses "there's nothing to process this time" as part of the normal flow.
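The interaction between these exception kinds and the retry settings can be sketched as a small simulation. This is a simplified model with no Airflow dependency: PermanentError and NothingToDo are hypothetical stand-ins for AirflowFailException and AirflowSkipException, the doubling rule is an idealized version of exponential backoff (Airflow's actual delay calculation differs in detail), and no real sleeping happens.

```python
from typing import Callable


class PermanentError(Exception):
    """Stand-in for AirflowFailException: never retried."""


class NothingToDo(Exception):
    """Stand-in for AirflowSkipException: a skip, not a failure."""


def backoff_delays(retries: int, base_seconds: int, max_seconds: int) -> list[int]:
    """Idealized delay before each retry: base, 2x base, 4x base ... capped at max."""
    return [min(base_seconds * 2**i, max_seconds) for i in range(retries)]


def run_with_retries(
    fn: Callable[[], None], retries: int, base_seconds: int = 120, max_seconds: int = 1800
) -> tuple[str, list[int]]:
    """Return (final_state, delays_that_would_be_waited) without sleeping."""
    delays = backoff_delays(retries, base_seconds, max_seconds)
    waited: list[int] = []
    for attempt in range(retries + 1):
        try:
            fn()
            return "success", waited
        except PermanentError:
            return "failed", waited  # no retry for permanent errors
        except NothingToDo:
            return "skipped", waited  # normal flow, nothing to process
        except Exception:
            if attempt == retries:
                break  # retries exhausted
            waited.append(delays[attempt])
    return "failed", waited


def always_gone() -> None:
    raise PermanentError("404")


print(run_with_retries(always_gone, retries=3))
print(backoff_delays(6, 120, 1800))
```

In this model a permanent error ends the task on the first attempt with no waiting at all, whereas a transient error consumes the full backoff schedule before it is finally marked failed.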
To notify a person of a failure, use a callback. In Airflow 3 the SLA feature was removed, and deadline-based alerting is now handled by the Deadline concept (operational alerting is in Part 10: Monitoring & Operations).
```python
from airflow.sdk import task


def alert_on_failure(context):
    ti = context["task_instance"]
    # send_slack is a hypothetical notification helper
    send_slack(f"Failed: {ti.dag_id}.{ti.task_id} @ {context['logical_date']}")


@task(on_failure_callback=alert_on_failure, on_retry_callback=alert_on_failure)
def risky():
    ...
```

Finally, to make cleanup work "always run no matter which task fails," use trigger_rule (the full set of rules is in Part 4).
```python
from airflow.utils.trigger_rule import TriggerRule


@task(trigger_rule=TriggerRule.ALL_DONE)  # always runs whether upstream succeeded or failed
def cleanup():
    ...
```

5. Calling Other DAGs
Splitting a DAG into smaller pieces improves reuse and separation of responsibility. There are three ways to connect DAGs, and you pick by situation.
1. TriggerDagRunOperator — A triggers B directly. You pass parameters with conf, and with wait_for_completion=True you can wait until B finishes. To avoid holding a worker slot while waiting, delegate to the Triggerer with deferrable=True (the resource benefits are in Part 8: Integrating External Systems).
```python
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator

trigger_b = TriggerDagRunOperator(
    task_id="trigger_downstream",
    trigger_dag_id="downstream_dag",
    conf={"target_date": "{{ data_interval_start | ds }}"},
    wait_for_completion=True,
    deferrable=True,  # release the worker slot while waiting
    poke_interval=30,
    reset_dag_run=True,  # reset on re-triggering the same run_id (safe for rerun)
)
```

2. ExternalTaskSensor — conversely, B waits for A (or a specific task in A) to complete. Time alignment between the two DAGs matters; if their logical_date differs, you have to align them with execution_delta or execution_date_fn. In Airflow 3, logical_date can be None for manual and Asset triggers, which can make time alignment tricky, so prefer option 3 when possible.
3. Asset-based (recommended) — when A "produces" data (an Asset), B, which "consumes" that Asset, is triggered automatically. There's no need to align timestamps and coupling is low. The core of data-aware scheduling is covered in Part 6: Scheduling & Assets.
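For option 2, the time alignment can be pictured as plain date arithmetic: with execution_delta, the sensor looks for the upstream run whose logical date equals the downstream logical date minus the delta. The sketch below shows only that arithmetic; the function name and the two schedules (A at 00:00, B at 02:00) are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def upstream_logical_date(downstream_logical_date: datetime, execution_delta: timedelta) -> datetime:
    """Logical date of the upstream run that the downstream run should wait for."""
    return downstream_logical_date - execution_delta


# Hypothetical schedules: DAG A runs daily at 00:00 UTC, DAG B daily at 02:00 UTC.
b_logical_date = datetime(2026, 6, 1, 2, 0, tzinfo=timezone.utc)
a_logical_date = upstream_logical_date(b_logical_date, timedelta(hours=2))
print(a_logical_date.isoformat())
```

If the delta does not land exactly on a logical date that A actually has, the sensor has nothing to match, which is the fragility that makes the Asset-based option attractive.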
6. DAG Options at a Glance
Here are the frequently used DAG/task options gathered in one place. Use it to jog your memory of "oh right, that exists."
| Option | Level | Meaning |
|---|---|---|
| schedule | DAG | cron / @daily / timetable / Asset / None |
| start_date | DAG | The starting reference point for schedule calculation |
| catchup | DAG | Fill in past windows. Defaults to False in 3.x |
| max_active_runs | DAG | Upper bound on concurrent runs of this DAG |
| max_active_tasks | DAG | Upper bound on tasks running concurrently in this DAG |
| dagrun_timeout | DAG | A single run fails if it exceeds this time |
| default_args | DAG | Arguments applied commonly to all tasks (retries, etc.) |
| params | DAG | Input spec (type / validation / UI form) |
| tags | DAG | Tags for UI filtering |
| render_template_as_native_obj | DAG | Render templates as native objects rather than strings |
| depends_on_past | Task | The previous run of the same task must succeed before this run executes |
| wait_for_downstream | Task | Proceed only after the downstream of the previous run finishes |
| retries / retry_delay | Task | Number/interval of retries |
| execution_timeout | Task | Per-task time limit |
| pool / priority_weight | Task | Resource isolation / priority (Part 3) |
| trigger_rule | Task | Execution condition based on upstream state |
| on_failure_callback, etc. | Both | Callbacks on state transitions |
depends_on_past=True is powerful but dangerous. If one past run fails and gets stuck, subsequent runs stall one after another. Use it carefully only for pipelines that "depend cumulatively on previous results."
7. Rerunning Failed DAGs/Tasks
Half of operations is "rerunning the thing that broke." Airflow has several rerun tools with different characteristics.
- Clear (most common) — clearing the state of a failed task (and its downstream) makes the scheduler automatically re-queue and rerun it. Use the UI's "Clear," or the CLI airflow tasks clear <dag_id> -t <task_regex> -s <start> -e <end>. The important point is that the rerun runs with the same logical_date/data_interval (see section 8).
- Backfill — fills in past windows that haven't run yet. In Airflow 3, backfill has shifted to a scheduler-managed model: when you request it via the UI/REST/CLI, the scheduler creates and runs the runs for that window (the concept is in Part 6). For the exact commands and options, check the official docs for the version you're running.
- Mark success/failed — sets the state by force without actually running anything. Use it carefully only in exceptional cases like "I already handled this manually, so count it as success."
When clearing a DAG with depends_on_past=True, you also have to account for the state of the previous run. You need to unblock things in order, starting from the stuck point, so the chain of waiting releases.
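The "unblock in order" rule can be illustrated with a toy model of one task's history under depends_on_past=True. This is a deliberately simplified sketch with no Airflow dependency: it treats only "success" as satisfying the dependency, and the function names and run states are hypothetical.

```python
from typing import Optional

Run = tuple[str, str]  # (logical date, task state), in chronological order


def first_run_to_clear(runs: list[Run]) -> Optional[str]:
    """The earliest run that has not succeeded: the point where the chain is stuck."""
    for logical_date, state in runs:
        if state != "success":
            return logical_date
    return None


def waiting_runs(runs: list[Run]) -> list[str]:
    """Runs that cannot start because the run just before them has not succeeded."""
    waiting = []
    for (_, prev_state), (logical_date, state) in zip(runs, runs[1:]):
        if state != "success" and prev_state != "success":
            waiting.append(logical_date)
    return waiting


history = [
    ("2026-06-01", "success"),
    ("2026-06-02", "failed"),
    ("2026-06-03", "none"),
    ("2026-06-04", "none"),
]
print(first_run_to_clear(history), waiting_runs(history))
```

Clearing the June 2 task first lets it rerun; once it succeeds, June 3 becomes eligible, then June 4, so the later runs generally need no separate intervention.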
8. The Date Semantics of Execution and Rerun — The Single Most Important Thing
This is where beginners get confused the most. In Airflow, the "date being processed" and the "wall-clock time at which the code actually runs" are separate.
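The core concept is the data interval. If a daily DAG processes the 2026-06-01 window, that window is [2026-06-01 00:00, 2026-06-02 00:00), and the actual execution happens after the window ends (i.e., just after midnight on June 2). It's the natural behavior of batch processing: process yesterday's data after yesterday's data has fully accumulated. This describes interval-style scheduling (CronDataIntervalTimetable). In Airflow 3, cron-style schedules such as @daily may instead default to CronTriggerTimetable, where the data interval collapses to a single point at logical_date, so confirm the [scheduler] create_cron_data_intervals setting for your deployment before relying on a non-empty window.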
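The window arithmetic for the daily case can be written out directly. The sketch below assumes interval-style daily scheduling in UTC, as described in the paragraph above; daily_interval and in_window are hypothetical helper names, not Airflow functions.

```python
from datetime import date, datetime, time, timedelta, timezone


def daily_interval(day: date) -> tuple[datetime, datetime]:
    """Half-open window [start, end) covering one calendar day in UTC."""
    start = datetime.combine(day, time.min, tzinfo=timezone.utc)
    return start, start + timedelta(days=1)


def in_window(ts: datetime, start: datetime, end: datetime) -> bool:
    """True if ts belongs to the window; the end bound itself is excluded."""
    return start <= ts < end


start, end = daily_interval(date(2026, 6, 1))
# The run for this window can start only once the window has closed.
earliest_run = end
print(start.isoformat(), end.isoformat())
```

The half-open convention matters: a record stamped exactly at midnight on June 2 belongs to the June 2 window, so adjacent windows never overlap and never leave a gap.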
The main values and macros used in code are as follows.
| Value/Macro | Meaning |
|---|---|
| logical_date | The logical reference time of this run (replaces the old execution_date) |
| data_interval_start / data_interval_end | The start/end of the window to process |
| {{ ds }} / {{ ds_nodash }} | The date string 2026-06-01 / 20260601 |
| {{ ts }} | ISO timestamp |
| {{ dag_run.conf }} | The conf passed at trigger time |
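The string macros in the table are essentially formatted views of the run's logical date. The sketch below reproduces that formatting with the standard library for a run whose logical date is 2026-06-01 00:00 UTC; date_macros is a hypothetical helper, shown only to make the relationship visible.

```python
from datetime import datetime, timezone


def date_macros(logical_date: datetime) -> dict[str, str]:
    """String forms corresponding to the ds, ds_nodash, and ts macros."""
    return {
        "ds": logical_date.strftime("%Y-%m-%d"),
        "ds_nodash": logical_date.strftime("%Y%m%d"),
        "ts": logical_date.isoformat(),
    }


macros = date_macros(datetime(2026, 6, 1, tzinfo=timezone.utc))
print(macros)
```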
```python
@task
def load(data_interval_start, data_interval_end):
    # Decide the target by the 'window', not the wall-clock now() -> same result whenever you run it
    sql = """
        DELETE FROM sales WHERE day = %(day)s;
        INSERT INTO sales SELECT * FROM staging
        WHERE ts >= %(start)s AND ts < %(end)s;
    """
    # run_sql is a hypothetical helper that executes the SQL with bound parameters
    # (bound rather than formatted into the string, per the advice in section 2)
    run_sql(sql, parameters={
        "day": data_interval_start.date(),
        "start": data_interval_start,
        "end": data_interval_end,
    })
```

Why does this structure matter? Rerunning with Clear runs again with the same logical_date/data_interval. That is, rerunning the June 1 batch processes the June 1 window once more. So the idempotent pattern of deleting the window first and re-inserting (overwriting), as in the example above, is essential. If your code grabs "today" with datetime.now() or only INSERTs, duplicates pile up on every rerun and you end up processing the wrong date.
One-line principle: always define what to process by data_interval/logical_date, and make the result overwrite that window. Then the result is the same no matter how many times you rerun. This is the one and only condition that makes backfill and Clear safe.
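The principle can be checked end to end with a small in-memory database. The sketch below uses the standard library's sqlite3 as a stand-in for PostgreSQL (an assumption for illustration; placeholder syntax differs), with hypothetical staging rows, and runs the same window three times to imitate repeated Clear reruns.

```python
import sqlite3


def load_window(conn: sqlite3.Connection, start: str, end: str) -> None:
    """Overwrite the sales row for the window [start, end): delete, then re-insert."""
    day = start[:10]
    with conn:  # one transaction: the delete and the insert commit together
        conn.execute("DELETE FROM sales WHERE day = :day", {"day": day})
        conn.execute(
            "INSERT INTO sales (day, amount) "
            "SELECT :day, sum(amount) FROM staging WHERE ts >= :start AND ts < :end",
            {"day": day, "start": start, "end": end},
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE staging (ts TEXT, amount INTEGER)")
conn.execute("CREATE TABLE sales (day TEXT, amount INTEGER)")
conn.executemany(
    "INSERT INTO staging VALUES (?, ?)",
    [
        ("2026-06-01T03:00:00", 10),
        ("2026-06-01T21:00:00", 5),
        ("2026-06-02T01:00:00", 7),  # belongs to the next window
    ],
)

# Simulate the first run plus two reruns of the same window.
for _ in range(3):
    load_window(conn, "2026-06-01T00:00:00", "2026-06-02T00:00:00")

rows = conn.execute("SELECT day, amount FROM sales").fetchall()
print(rows)
```

After three runs the table still holds a single row for June 1. Dropping the DELETE would leave three identical rows, which is exactly the duplicate pile-up described above.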
Note that manual triggers and Asset-triggered runs have no fixed window, so logical_date can be None. If you really need time-based logic, use data_interval_* and prepare a branch for when they're absent (schedule/trigger behavior in detail is in Part 6: Scheduling & Assets).
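One possible shape for such a branch is sketched below, with no Airflow dependency. It assumes a convention that is not part of the original text: when the run carries no data interval, the caller must name the day explicitly through a target_date key in conf, and the task fails loudly otherwise. resolve_window is a hypothetical helper.

```python
from datetime import date, datetime, time, timedelta, timezone
from typing import Optional


def resolve_window(
    start: Optional[datetime], end: Optional[datetime], conf: dict
) -> tuple[datetime, datetime]:
    """Use the run's data interval when present, else an explicit target_date from conf."""
    if start is not None and end is not None:
        return start, end
    target = conf.get("target_date")  # hypothetical convention for manual runs
    if target is None:
        raise ValueError("no data interval and no target_date in conf")
    day_start = datetime.combine(date.fromisoformat(target), time.min, tzinfo=timezone.utc)
    return day_start, day_start + timedelta(days=1)


window = resolve_window(None, None, {"target_date": "2026-06-10"})
print(window[0].isoformat(), window[1].isoformat())
```

Failing when neither source is available is a deliberate choice in this sketch: silently falling back to the current time would reintroduce the wall-clock dependence that the idempotent pattern is meant to remove.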
Wrapping Up
In this part we went beyond making a DAG "run" to making it survive in operations. Choosing how to run scripts, the three parameter channels, error handling that distinguishes exceptions, connecting to other DAGs, a roundup of options, and the date semantics that make reruns safe — of these, the last one, idempotency and date semantics, is the foundation for every part that follows.
In the next part, Part 6: Scheduling & Assets, we go deep into when and based on what to run these DAGs, along two axes: time-based and data-aware (Asset). For the official docs, see Airflow DAGs and Params.