Airflow 3 실전 연재 5편: DAG 고급 테크닉
스크립트 실행·파라미터 전달·에러 처리·다른 DAG 호출·DAG 옵션·실패 재실행·날짜 기준까지, DAG 작성 한 단계 위의 실전 기술을 정리합니다.
이 글은 "Airflow 3 실전 연재"의 5편입니다. 직전 편 4편: DAG 작성의 정석에서 DAG의 기본 골격을 익혔다면, 이번 편은 실무에서 반드시 부딪히는 한 단계 위의 기술들을 모았습니다. 스크립트를 어떻게 실행할지, 파라미터를 어떻게 넘길지, 에러를 어떻게 다룰지, 다른 DAG를 어떻게 호출할지, 실패한 실행을 어떻게 다시 돌릴지, 그리고 그 모든 것의 바탕이 되는 날짜 기준까지 차근차근 풀어 갑니다. 다음 편은 6편: 스케줄링 & Asset으로 이어집니다.
이 편의 코드는 모두 Airflow 3의 Task SDK(
airflow.sdk)와, 3.x에서apache-airflow-providers-standard로 자리를 옮긴 표준 오퍼레이터를 기준으로 합니다. 2.x에서airflow.operators.*에 있던BashOperator·PythonOperator등은 3.x에서airflow.providers.standard.operators.*로 이동했습니다.
1. 스크립트 실행 — 어디서, 어떻게 돌릴 것인가
"파이썬 함수 하나 돌리기"부터 "격리된 컨테이너에서 임의 스크립트 돌리기"까지, Airflow는 실행 방식을 여러 단계로 제공합니다. 핵심은 격리 수준과 비용의 트레이드오프입니다.
가장 흔한 세 가지를 코드로 보겠습니다.
from airflow.sdk import dag, task
from airflow.providers.standard.operators.bash import BashOperator
@dag(schedule="@daily", catchup=False, tags=["example"])
def script_examples():
# (1) 순수 파이썬 — 워커 안에서 바로 실행
@task
def transform(rows: int) -> int:
return rows * 2
# (2) 셸 명령 — 기존 스크립트/CLI를 그대로 호출
run_etl = BashOperator(
task_id="run_etl",
# data_interval_start 를 인자로 넘겨 멱등 실행 (아래 8장 참고)
bash_command="/opt/scripts/etl.sh --date {{ data_interval_start | ds }}",
)
# (3) 의존성이 충돌하는 파이썬 — 별도 venv에서 실행
@task.virtualenv(requirements=["pandas==2.1.0"], system_site_packages=False)
def heavy_pandas_job():
import pandas as pd
return pd.__version__
transform(100) >> run_etl >> heavy_pandas_job()
script_examples()셸 명령에 사용자 입력이나 파라미터를 문자열로 이어 붙일 때는 셸 인젝션을 조심하세요. 값이 외부에서 온다면 Jinja로 그대로 박지 말고, 파이썬 태스크에서 검증한 뒤 넘기거나
KubernetesPodOperator의arguments리스트처럼 인자를 분리해 전달하는 편이 안전합니다.
완전한 격리가 필요하면 KubernetesPodOperator(쿠버네티스 클러스터에서 파드 하나를 띄워 실행)나 DockerOperator를 씁니다. 임의의 이미지·언어를 쓸 수 있어 자유롭지만, 파드/컨테이너 기동 비용과 지연이 붙으므로 "가벼운 파이썬 한 줄"에 쓰는 건 과합니다.
2. DAG에서 PostgreSQL 쿼리 실행하기
데이터 파이프라인의 상당수는 결국 "DB에 쿼리를 던지는 일"로 귀결됩니다. Airflow 3에서 PostgreSQL에 쿼리를 실행하는 길은 크게 두 가지이며, 결과가 필요한가로 갈립니다.
두 방법 모두 provider 패키지가 필요합니다:
apache-airflow-providers-postgres(Hook·드라이버)와apache-airflow-providers-common-sql(SQLExecuteQueryOperator). 접속 정보는 코드에 박지 말고 Connection(conn_id)에 둡니다. 비밀번호를 안전하게 관리하는 Secrets Backend는 8편: 외부 시스템 연동 & 싱크 호출에서 다룹니다.
선언적으로 실행 — SQLExecuteQueryOperator
Airflow 2.x에서 쓰던 PostgresOperator는 3.x에서 SQLExecuteQueryOperator로 대체되었습니다(여러 DB에 공통으로 쓰는 통합 오퍼레이터). conn_id만 PostgreSQL Connection으로 지정하면 됩니다.
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
upsert_sales = SQLExecuteQueryOperator(
task_id="upsert_sales",
conn_id="postgres_default",
# %(name)s 자리표시자 + parameters 로 바인딩 (문자열 포매팅 금지 → 인젝션 방지)
sql="""
DELETE FROM sales WHERE day = %(day)s;
INSERT INTO sales (day, amount)
SELECT %(day)s, sum(amount)
FROM staging
WHERE ts >= %(start)s AND ts < %(end)s;
""",
parameters={
"day": "{{ data_interval_start | ds }}",
"start": "{{ data_interval_start }}",
"end": "{{ data_interval_end }}",
},
)여기에도 8장의 날짜 기준이 그대로 적용됩니다 — data_interval로 대상 구간을 정하고 먼저 지우고 다시 넣어(DELETE→INSERT) 몇 번을 재실행해도 결과가 같게 만듭니다.
SQL이 길어지면 별도 .sql 파일로 빼고, DAG에 template_searchpath를 지정해 파일 이름만 넘깁니다. SQL과 파이썬이 분리돼 관리가 쉬워지고, 파일 안에서도 {{ data_interval_start }} 같은 Jinja를 그대로 쓸 수 있습니다.
@dag(schedule="@daily", catchup=False, template_searchpath="/opt/airflow/sql")
def sql_file_pipeline():
SQLExecuteQueryOperator(
task_id="load",
conn_id="postgres_default",
sql="load_sales.sql", # /opt/airflow/sql/load_sales.sql (Jinja 템플릿 가능)
)
sql_file_pipeline()결과를 받아 가공 — PostgresHook
쿼리 결과를 파이썬에서 받아 검증하거나 분기하려면 PostgresHook을 태스크 안에서 씁니다.
from airflow.sdk import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.exceptions import AirflowFailException
@task
def assert_loaded(data_interval_start):
hook = PostgresHook(postgres_conn_id="postgres_default")
# 파라미터 바인딩으로 안전하게 조회
(count,) = hook.get_first(
"SELECT count(*) FROM sales WHERE day = %(day)s",
parameters={"day": data_interval_start.date()},
)
if count == 0:
raise AirflowFailException("적재 결과가 0건 — 업스트림 점검 필요")
return countPostgresHook은 get_records()(여러 행), get_first()(첫 행), get_pandas_df()(DataFrame), run()(실행만), 대량 적재용 copy_expert()(COPY) 같은 메서드를 제공합니다.
두 가지만 기억하세요. ① 값은 절대 f-string으로 SQL에 박지 말고
parameters로 바인딩한다(인젝션·따옴표 사고 방지). ② 큰 결과를get_pandas_df()로 통째로 메모리에 올리면 워커가 죽을 수 있으니, 대량은 DB 안에서 처리하거나COPY로 스트리밍한다.
3. 파라미터 넘기기 — Params, conf, Variable, 그리고 템플릿
DAG에 값을 주입하는 통로는 크게 세 가지입니다. 용도가 다릅니다.
| 통로 | 정의 위치 | 언제 정해지나 | 주 용도 |
|---|---|---|---|
| Params | DAG 코드(params=) | DAG 작성 시 기본값, 트리거 시 덮어쓰기 | "이 DAG의 입력 스펙"(타입·검증·UI 폼) |
| DAG run conf | 트리거할 때(conf) | 실행마다 | 일회성 실행 파라미터(수동/API 트리거) |
| Variable | Airflow 전역 저장소 | 운영 중 언제든 | 여러 DAG가 공유하는 환경값 |
Params는 DAG의 입력 스펙을 타입·검증과 함께 선언합니다. UI의 "Trigger DAG w/ config" 폼에 자동으로 폼 필드가 그려집니다.
from airflow.sdk import dag, task, Param
@dag(
schedule=None, # 수동/외부 트리거 전용
catchup=False,
params={
"target_date": Param("2026-06-01", type="string", format="date"),
"batch_size": Param(1000, type="integer", minimum=1, maximum=100000),
"dry_run": Param(False, type="boolean"),
},
tags=["params"],
)
def parametrized_pipeline():
@task
def run(params: dict):
# 태스크 함수 인자로 컨텍스트 키(params)를 그대로 받을 수 있다
if params["dry_run"]:
print(f"[DRY] {params['target_date']} / {params['batch_size']}건")
return
print(f"적재: {params['target_date']} / {params['batch_size']}건")
run()
parametrized_pipeline()트리거할 때 conf로 값을 덮어쓸 수 있습니다. CLI라면 airflow dags trigger parametrized_pipeline --conf '{"target_date":"2026-06-10","dry_run":true}', REST API라면 본문에 conf를 실어 보냅니다(자세한 원격 트리거는 9편: REST API & 원격 스케줄 변경).
템플릿이 가능한 필드(예: BashOperator.bash_command)에서는 {{ params.target_date }}, {{ dag_run.conf["target_date"] }}, {{ var.value.my_key }}처럼 Jinja로 꺼낼 수 있습니다. 파이썬 객체(dict/list)로 받고 싶으면 DAG에 render_template_as_native_obj=True를 주면 문자열이 아니라 네이티브 객체로 렌더됩니다.
함정:
Variable.get()을 DAG 파일 **최상위(top-level)**에서 호출하지 마세요. DAG 파일은 파싱 때마다 반복 실행되므로(자세한 비용은 3편: 환경설정 & 최적화) 메타DB를 끊임없이 두드립니다. 변수 조회는 태스크 실행 시점(함수 안)이나 템플릿({{ var.value.x }})에서 하세요.
4. 에러 처리 — 재시도, 콜백, 그리고 "재시도하지 말 것"
태스크가 실패하면 Airflow는 기본적으로 재시도를 시도하고, 다 쓰면 failed로 확정합니다. 그 흐름을 먼저 그림으로 봅시다.
재시도와 타임아웃은 보통 default_args로 DAG 전체에 깔고, 필요한 태스크만 개별 조정합니다.
from datetime import timedelta
from airflow.sdk import dag, task
from airflow.exceptions import AirflowFailException, AirflowSkipException
default_args = {
"retries": 3,
"retry_delay": timedelta(minutes=2),
"retry_exponential_backoff": True, # 2분 → 4분 → 8분 …
"max_retry_delay": timedelta(minutes=30),
"execution_timeout": timedelta(hours=1), # 이 시간을 넘기면 실패 처리
}
@dag(schedule="@hourly", catchup=False, default_args=default_args, tags=["errors"])
def error_handling():
@task
def call_external(params: dict):
status = fetch_status() # 가상의 외부 호출
if status == 404:
# 다시 시도해도 절대 성공 못 함 → 재시도 없이 즉시 실패
raise AirflowFailException("대상 리소스가 없음(영구 오류)")
if status == 204:
# 처리할 게 없음 → 실패가 아니라 건너뜀
raise AirflowSkipException("이번 구간 데이터 없음")
if status >= 500:
# 일시 오류 → 평범한 예외로 두면 재시도 대상
raise RuntimeError("일시적 5xx, 재시도하자")
call_external()
error_handling()여기서 핵심은 예외의 종류를 구분하는 것입니다.
- 평범한 예외(
RuntimeError등) → 재시도 대상. 네트워크 순단 같은 일시 오류에 적합. AirflowFailException→ 재시도 없이 즉시 실패. "다시 해도 안 될 일"(404, 잘못된 입력)에 씁니다. 무의미한 재시도로 시간만 버리는 걸 막습니다.AirflowSkipException→ 실패가 아니라 건너뜀. "이번엔 처리할 게 없음"을 정상 흐름으로 표현합니다.
실패를 사람에게 알리려면 콜백을 씁니다. Airflow 3에서 SLA는 제거되었고, 마감 기준 경보는 Deadline 개념으로 대체되었습니다(운영 알림은 10편: 모니터링 & 운영).
def alert_on_failure(context):
ti = context["task_instance"]
send_slack(f"실패: {ti.dag_id}.{ti.task_id} @ {context['logical_date']}")
@task(on_failure_callback=alert_on_failure, on_retry_callback=alert_on_failure)
def risky():
...마지막으로, "어떤 태스크가 실패해도 정리 작업은 반드시 돌게" 하려면 trigger_rule을 활용합니다(규칙 전체는 4편 참고).
from airflow.utils.trigger_rule import TriggerRule
@task(trigger_rule=TriggerRule.ALL_DONE) # 앞이 성공이든 실패든 항상 실행
def cleanup():
...5. 다른 DAG 호출하기
DAG를 잘게 나누면 재사용·책임 분리가 좋아집니다. DAG 간 연결 방법은 세 가지가 있고, 상황에 따라 고릅니다.
① TriggerDagRunOperator — A가 B를 직접 트리거합니다. conf로 파라미터를 넘기고, wait_for_completion=True로 B가 끝날 때까지 기다릴 수 있습니다. 기다리는 동안 워커 슬롯을 점유하지 않으려면 deferrable=True로 Triggerer에 위임하세요(자원 이점은 8편: 외부 시스템 연동).
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
trigger_b = TriggerDagRunOperator(
task_id="trigger_downstream",
trigger_dag_id="downstream_dag",
conf={"target_date": "{{ data_interval_start | ds }}"},
wait_for_completion=True,
deferrable=True, # 대기 동안 워커 슬롯 반납
poke_interval=30,
reset_dag_run=True, # 같은 run_id 재트리거 시 초기화(재실행 안전)
)② ExternalTaskSensor — 반대로 B가 A(또는 A의 특정 태스크)의 완료를 기다립니다. 두 DAG의 시간 정렬이 중요한데, logical_date가 다르면 execution_delta나 execution_date_fn으로 맞춰야 합니다. Airflow 3에서는 수동·Asset 트리거 시 logical_date가 None일 수 있어 시간 정렬이 까다로워질 수 있으니, 가능하면 ③을 권합니다.
③ Asset 기반(권장) — A가 데이터(Asset)를 "생산"하면, 그 Asset을 "소비"하는 B가 자동으로 트리거됩니다. 시각을 맞출 필요가 없고 결합도 낮습니다. 데이터 인지 스케줄링의 본론은 6편: 스케줄링 & Asset에서 다룹니다.
6. DAG 옵션 총정리
자주 쓰는 DAG/태스크 옵션을 한자리에 모았습니다. "이게 있었지"를 떠올리는 용도로 보세요.
| 옵션 | 수준 | 의미 |
|---|---|---|
schedule | DAG | cron/@daily/타임테이블/Asset/None |
start_date | DAG | 스케줄 계산의 시작 기준점 |
catchup | DAG | 과거 구간 메우기. 3.x 기본 False |
max_active_runs | DAG | 이 DAG의 동시 실행(run) 수 상한 |
max_active_tasks | DAG | 이 DAG에서 동시에 도는 태스크 수 상한 |
dagrun_timeout | DAG | 한 회차가 이 시간을 넘기면 실패 |
default_args | DAG | 모든 태스크에 공통 적용할 인자(retries 등) |
params | DAG | 입력 스펙(타입·검증·UI 폼) |
tags | DAG | UI 필터링용 태그 |
render_template_as_native_obj | DAG | 템플릿을 문자열이 아닌 네이티브 객체로 |
depends_on_past | 태스크 | 같은 태스크의 직전 run이 성공해야 이번 run 실행 |
wait_for_downstream | 태스크 | 직전 run의 하류까지 끝나야 진행 |
retries / retry_delay | 태스크 | 재시도 횟수/간격 |
execution_timeout | 태스크 | 태스크 단위 제한 시간 |
pool / priority_weight | 태스크 | 자원 격리·우선순위(3편) |
trigger_rule | 태스크 | 상류 상태에 따른 실행 조건 |
on_failure_callback 등 | 둘 다 | 상태 전이 시 콜백 |
depends_on_past=True는 강력하지만 위험합니다. 과거 run 하나가 실패해 막히면 이후 run이 줄줄이 멈춥니다. "이전 결과에 누적 의존하는" 파이프라인에만 신중히 쓰세요.
7. 실패한 DAG/태스크 재실행
운영의 절반은 "터진 걸 다시 돌리는 일"입니다. Airflow에는 성격이 다른 재실행 도구가 여럿 있습니다.
- Clear(가장 흔함) — 실패한 태스크(와 그 하류)의 상태를 지우면 스케줄러가 자동으로 다시 큐에 넣어 재실행합니다. UI의 "Clear", 또는 CLI
airflow tasks clear <dag_id> -t <task_regex> -s <start> -e <end>. 중요한 건 재실행이 같은logical_date/data_interval로 돈다는 점입니다(8장 참고). - Backfill — 아직 돌지 않은 과거 구간을 채웁니다. Airflow 3에서는 백필이 스케줄러 관리 방식으로 바뀌어, UI/REST/CLI로 요청하면 스케줄러가 해당 구간의 run들을 만들어 실행합니다(개념은 6편). 정확한 명령·옵션은 운영 버전의 공식 문서를 확인하세요.
- Mark success/failed — 실제로 돌리지 않고 상태만 강제로 지정합니다. "이미 수동으로 처리했으니 성공으로 친다" 같은 예외 상황에만 신중히 씁니다.
depends_on_past=True인 DAG를 Clear할 때는 직전 run의 상태도 함께 고려해야 합니다. 막힌 지점부터 순서대로 풀어 줘야 줄줄이 대기가 풀립니다.
8. 실행·재실행의 날짜 기준 — 가장 중요한 한 가지
초보자가 가장 많이 헷갈리는 지점입니다. Airflow에서 "처리 대상 날짜"와 "실제로 코드가 도는 시각(벽시계)"은 별개입니다.
핵심 개념은 **데이터 구간(data interval)**입니다. 매일 도는 DAG가 2026-06-01 구간을 처리한다면, 그 구간은 [2026-06-01 00:00, 2026-06-02 00:00)이고, 실제 실행은 구간이 끝난 뒤(즉 6월 2일 0시 직후)에 일어납니다. 어제 데이터가 다 쌓인 다음에 어제 치를 처리하는, 배치의 자연스러운 동작입니다.
코드에서 쓰는 주요 값과 매크로는 다음과 같습니다.
| 값/매크로 | 의미 |
|---|---|
logical_date | 이 run의 논리적 기준 시각(과거 execution_date를 대체) |
data_interval_start / data_interval_end | 처리 대상 구간의 시작/끝 |
{{ ds }} / {{ ds_nodash }} | 날짜 문자열 2026-06-01 / 20260601 |
{{ ts }} | ISO 타임스탬프 |
{{ dag_run.conf }} | 트리거 시 넘긴 conf |
@task
def load(data_interval_start, data_interval_end):
# 벽시계 now()가 아니라 '구간'으로 대상을 정한다 → 언제 돌려도 결과 동일
sql = f"""
DELETE FROM sales WHERE day = '{data_interval_start.date()}';
INSERT INTO sales SELECT * FROM staging
WHERE ts >= '{data_interval_start}' AND ts < '{data_interval_end}';
"""
run_sql(sql)이 구조가 왜 중요할까요? Clear로 재실행하면 같은 logical_date·data_interval로 다시 돕니다. 즉 6월 1일 치를 재실행하면 6월 1일 구간을 한 번 더 처리합니다. 그래서 위 예시처럼 먼저 해당 구간을 지우고 다시 넣는(덮어쓰는) 멱등(idempotent) 패턴이 필수입니다. 만약 코드가 datetime.now()로 "오늘"을 잡거나 INSERT만 한다면, 재실행할 때마다 중복이 쌓이고 엉뚱한 날짜를 처리하게 됩니다.
한 줄 원칙: 처리 대상은 항상
data_interval/logical_date로 정하고, 결과는 그 구간을 덮어쓰도록 만든다. 그러면 몇 번을 재실행하든 결과가 같다. 이것이 백필·Clear가 안전해지는 유일한 조건이다.
참고로 수동 트리거나 Asset 트리거 run은 정해진 구간이 없어 logical_date가 None일 수 있습니다. 시간 기반 로직이 꼭 필요하면 data_interval_*를 쓰고, 없을 때의 분기를 준비해 두세요(자세한 스케줄·트리거 동작은 6편: 스케줄링 & Asset).
마무리
이번 편에서는 DAG를 "돌아가게" 만드는 것을 넘어, 운영에서 견디게 만드는 기술들을 봤습니다. 스크립트 실행 방식 고르기, 세 가지 파라미터 통로, 예외를 구분한 에러 처리, 다른 DAG와의 연결, 옵션 총정리, 그리고 재실행을 안전하게 만드는 날짜 기준까지 — 이 중 마지막 멱등성·날짜 기준은 이후 모든 편의 바탕이 됩니다.
다음 편 6편: 스케줄링 & Asset에서는 이 DAG들을 언제, 무엇을 기준으로 돌릴지를 시간 기반과 데이터 인지(Asset) 두 축으로 깊게 다룹니다. 공식 문서는 Airflow DAGs와 Params를 참고하세요.