Blog
airflowairflow3dagerror-handlingbackfill

Airflow 3 실전 연재 5편: DAG 고급 테크닉

스크립트 실행·파라미터 전달·에러 처리·다른 DAG 호출·DAG 옵션·실패 재실행·날짜 기준까지, DAG 작성 한 단계 위의 실전 기술을 정리합니다.

Data Dynamics2026年6月29日25 min read
This post is not yet translated. The original Korean version is shown below.

이 글은 "Airflow 3 실전 연재"의 5편입니다. 직전 편 4편: DAG 작성의 정석에서 DAG의 기본 골격을 익혔다면, 이번 편은 실무에서 반드시 부딪히는 한 단계 위의 기술들을 모았습니다. 스크립트를 어떻게 실행할지, 파라미터를 어떻게 넘길지, 에러를 어떻게 다룰지, 다른 DAG를 어떻게 호출할지, 실패한 실행을 어떻게 다시 돌릴지, 그리고 그 모든 것의 바탕이 되는 날짜 기준까지 차근차근 풀어 갑니다. 다음 편은 6편: 스케줄링 & Asset으로 이어집니다.

이 편의 코드는 모두 Airflow 3의 Task SDK(airflow.sdk)와, 3.x에서 apache-airflow-providers-standard로 자리를 옮긴 표준 오퍼레이터를 기준으로 합니다. 2.x에서 airflow.operators.*에 있던 BashOperator·PythonOperator 등은 3.x에서 airflow.providers.standard.operators.*로 이동했습니다.

1. 스크립트 실행 — 어디서, 어떻게 돌릴 것인가

"파이썬 함수 하나 돌리기"부터 "격리된 컨테이너에서 임의 스크립트 돌리기"까지, Airflow는 실행 방식을 여러 단계로 제공합니다. 핵심은 격리 수준과 비용의 트레이드오프입니다.

Loading diagram…

가장 흔한 세 가지를 코드로 보겠습니다.

from airflow.sdk import dag, task
from airflow.providers.standard.operators.bash import BashOperator
 
 
@dag(schedule="@daily", catchup=False, tags=["example"])
def script_examples():
 
    # (1) 순수 파이썬 — 워커 안에서 바로 실행
    @task
    def transform(rows: int) -> int:
        return rows * 2
 
    # (2) 셸 명령 — 기존 스크립트/CLI를 그대로 호출
    run_etl = BashOperator(
        task_id="run_etl",
        # data_interval_start 를 인자로 넘겨 멱등 실행 (아래 8장 참고)
        bash_command="/opt/scripts/etl.sh --date {{ data_interval_start | ds }}",
    )
 
    # (3) 의존성이 충돌하는 파이썬 — 별도 venv에서 실행
    @task.virtualenv(requirements=["pandas==2.1.0"], system_site_packages=False)
    def heavy_pandas_job():
        import pandas as pd
        return pd.__version__
 
    transform(100) >> run_etl >> heavy_pandas_job()
 
 
script_examples()

셸 명령에 사용자 입력이나 파라미터를 문자열로 이어 붙일 때는 셸 인젝션을 조심하세요. 값이 외부에서 온다면 Jinja로 그대로 박지 말고, 파이썬 태스크에서 검증한 뒤 넘기거나 KubernetesPodOperatorarguments 리스트처럼 인자를 분리해 전달하는 편이 안전합니다.

완전한 격리가 필요하면 KubernetesPodOperator(쿠버네티스 클러스터에서 파드 하나를 띄워 실행)나 DockerOperator를 씁니다. 임의의 이미지·언어를 쓸 수 있어 자유롭지만, 파드/컨테이너 기동 비용과 지연이 붙으므로 "가벼운 파이썬 한 줄"에 쓰는 건 과합니다.

2. DAG에서 PostgreSQL 쿼리 실행하기

데이터 파이프라인의 상당수는 결국 "DB에 쿼리를 던지는 일"로 귀결됩니다. Airflow 3에서 PostgreSQL에 쿼리를 실행하는 길은 크게 두 가지이며, 결과가 필요한가로 갈립니다.

Loading diagram…

두 방법 모두 provider 패키지가 필요합니다: apache-airflow-providers-postgres(Hook·드라이버)와 apache-airflow-providers-common-sql(SQLExecuteQueryOperator). 접속 정보는 코드에 박지 말고 Connection(conn_id)에 둡니다. 비밀번호를 안전하게 관리하는 Secrets Backend는 8편: 외부 시스템 연동 & 싱크 호출에서 다룹니다.

선언적으로 실행 — SQLExecuteQueryOperator

Airflow 2.x에서 쓰던 PostgresOperator는 3.x에서 SQLExecuteQueryOperator로 대체되었습니다(여러 DB에 공통으로 쓰는 통합 오퍼레이터). conn_id만 PostgreSQL Connection으로 지정하면 됩니다.

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
 
upsert_sales = SQLExecuteQueryOperator(
    task_id="upsert_sales",
    conn_id="postgres_default",
    # %(name)s 자리표시자 + parameters 로 바인딩 (문자열 포매팅 금지 → 인젝션 방지)
    sql="""
        DELETE FROM sales WHERE day = %(day)s;
        INSERT INTO sales (day, amount)
        SELECT %(day)s, sum(amount)
        FROM staging
        WHERE ts >= %(start)s AND ts < %(end)s;
    """,
    parameters={
        "day": "{{ data_interval_start | ds }}",
        "start": "{{ data_interval_start }}",
        "end": "{{ data_interval_end }}",
    },
)

여기에도 8장의 날짜 기준이 그대로 적용됩니다 — data_interval로 대상 구간을 정하고 먼저 지우고 다시 넣어(DELETE→INSERT) 몇 번을 재실행해도 결과가 같게 만듭니다.

SQL이 길어지면 별도 .sql 파일로 빼고, DAG에 template_searchpath를 지정해 파일 이름만 넘깁니다. SQL과 파이썬이 분리돼 관리가 쉬워지고, 파일 안에서도 {{ data_interval_start }} 같은 Jinja를 그대로 쓸 수 있습니다.

@dag(schedule="@daily", catchup=False, template_searchpath="/opt/airflow/sql")
def sql_file_pipeline():
    SQLExecuteQueryOperator(
        task_id="load",
        conn_id="postgres_default",
        sql="load_sales.sql",   # /opt/airflow/sql/load_sales.sql (Jinja 템플릿 가능)
    )
 
 
sql_file_pipeline()

결과를 받아 가공 — PostgresHook

쿼리 결과를 파이썬에서 받아 검증하거나 분기하려면 PostgresHook을 태스크 안에서 씁니다.

from airflow.sdk import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.exceptions import AirflowFailException
 
 
@task
def assert_loaded(data_interval_start):
    hook = PostgresHook(postgres_conn_id="postgres_default")
    # 파라미터 바인딩으로 안전하게 조회
    (count,) = hook.get_first(
        "SELECT count(*) FROM sales WHERE day = %(day)s",
        parameters={"day": data_interval_start.date()},
    )
    if count == 0:
        raise AirflowFailException("적재 결과가 0건 — 업스트림 점검 필요")
    return count

PostgresHookget_records()(여러 행), get_first()(첫 행), get_pandas_df()(DataFrame), run()(실행만), 대량 적재용 copy_expert()(COPY) 같은 메서드를 제공합니다.

두 가지만 기억하세요. ① 값은 절대 f-string으로 SQL에 박지 말고 parameters로 바인딩한다(인젝션·따옴표 사고 방지). ② 큰 결과를 get_pandas_df()로 통째로 메모리에 올리면 워커가 죽을 수 있으니, 대량은 DB 안에서 처리하거나 COPY로 스트리밍한다.

3. 파라미터 넘기기 — Params, conf, Variable, 그리고 템플릿

DAG에 값을 주입하는 통로는 크게 세 가지입니다. 용도가 다릅니다.

통로정의 위치언제 정해지나주 용도
ParamsDAG 코드(params=)DAG 작성 시 기본값, 트리거 시 덮어쓰기"이 DAG의 입력 스펙"(타입·검증·UI 폼)
DAG run conf트리거할 때(conf)실행마다일회성 실행 파라미터(수동/API 트리거)
VariableAirflow 전역 저장소운영 중 언제든여러 DAG가 공유하는 환경값
Loading diagram…

Params는 DAG의 입력 스펙을 타입·검증과 함께 선언합니다. UI의 "Trigger DAG w/ config" 폼에 자동으로 폼 필드가 그려집니다.

from airflow.sdk import dag, task, Param
 
 
@dag(
    schedule=None,            # 수동/외부 트리거 전용
    catchup=False,
    params={
        "target_date": Param("2026-06-01", type="string", format="date"),
        "batch_size": Param(1000, type="integer", minimum=1, maximum=100000),
        "dry_run": Param(False, type="boolean"),
    },
    tags=["params"],
)
def parametrized_pipeline():
 
    @task
    def run(params: dict):
        # 태스크 함수 인자로 컨텍스트 키(params)를 그대로 받을 수 있다
        if params["dry_run"]:
            print(f"[DRY] {params['target_date']} / {params['batch_size']}건")
            return
        print(f"적재: {params['target_date']} / {params['batch_size']}건")
 
    run()
 
 
parametrized_pipeline()

트리거할 때 conf로 값을 덮어쓸 수 있습니다. CLI라면 airflow dags trigger parametrized_pipeline --conf '{"target_date":"2026-06-10","dry_run":true}', REST API라면 본문에 conf를 실어 보냅니다(자세한 원격 트리거는 9편: REST API & 원격 스케줄 변경).

템플릿이 가능한 필드(예: BashOperator.bash_command)에서는 {{ params.target_date }}, {{ dag_run.conf["target_date"] }}, {{ var.value.my_key }}처럼 Jinja로 꺼낼 수 있습니다. 파이썬 객체(dict/list)로 받고 싶으면 DAG에 render_template_as_native_obj=True를 주면 문자열이 아니라 네이티브 객체로 렌더됩니다.

함정: Variable.get()을 DAG 파일 **최상위(top-level)**에서 호출하지 마세요. DAG 파일은 파싱 때마다 반복 실행되므로(자세한 비용은 3편: 환경설정 & 최적화) 메타DB를 끊임없이 두드립니다. 변수 조회는 태스크 실행 시점(함수 안)이나 템플릿({{ var.value.x }})에서 하세요.

4. 에러 처리 — 재시도, 콜백, 그리고 "재시도하지 말 것"

태스크가 실패하면 Airflow는 기본적으로 재시도를 시도하고, 다 쓰면 failed로 확정합니다. 그 흐름을 먼저 그림으로 봅시다.

Loading diagram…

재시도와 타임아웃은 보통 default_args로 DAG 전체에 깔고, 필요한 태스크만 개별 조정합니다.

from datetime import timedelta
from airflow.sdk import dag, task
from airflow.exceptions import AirflowFailException, AirflowSkipException
 
default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=2),
    "retry_exponential_backoff": True,        # 2분 → 4분 → 8분 …
    "max_retry_delay": timedelta(minutes=30),
    "execution_timeout": timedelta(hours=1),  # 이 시간을 넘기면 실패 처리
}
 
 
@dag(schedule="@hourly", catchup=False, default_args=default_args, tags=["errors"])
def error_handling():
 
    @task
    def call_external(params: dict):
        status = fetch_status()  # 가상의 외부 호출
        if status == 404:
            # 다시 시도해도 절대 성공 못 함 → 재시도 없이 즉시 실패
            raise AirflowFailException("대상 리소스가 없음(영구 오류)")
        if status == 204:
            # 처리할 게 없음 → 실패가 아니라 건너뜀
            raise AirflowSkipException("이번 구간 데이터 없음")
        if status >= 500:
            # 일시 오류 → 평범한 예외로 두면 재시도 대상
            raise RuntimeError("일시적 5xx, 재시도하자")
 
    call_external()
 
 
error_handling()

여기서 핵심은 예외의 종류를 구분하는 것입니다.

  • 평범한 예외(RuntimeError 등) → 재시도 대상. 네트워크 순단 같은 일시 오류에 적합.
  • AirflowFailException재시도 없이 즉시 실패. "다시 해도 안 될 일"(404, 잘못된 입력)에 씁니다. 무의미한 재시도로 시간만 버리는 걸 막습니다.
  • AirflowSkipException → 실패가 아니라 건너뜀. "이번엔 처리할 게 없음"을 정상 흐름으로 표현합니다.

실패를 사람에게 알리려면 콜백을 씁니다. Airflow 3에서 SLA는 제거되었고, 마감 기준 경보는 Deadline 개념으로 대체되었습니다(운영 알림은 10편: 모니터링 & 운영).

def alert_on_failure(context):
    ti = context["task_instance"]
    send_slack(f"실패: {ti.dag_id}.{ti.task_id} @ {context['logical_date']}")
 
@task(on_failure_callback=alert_on_failure, on_retry_callback=alert_on_failure)
def risky():
    ...

마지막으로, "어떤 태스크가 실패해도 정리 작업은 반드시 돌게" 하려면 trigger_rule을 활용합니다(규칙 전체는 4편 참고).

from airflow.utils.trigger_rule import TriggerRule
 
@task(trigger_rule=TriggerRule.ALL_DONE)   # 앞이 성공이든 실패든 항상 실행
def cleanup():
    ...

5. 다른 DAG 호출하기

DAG를 잘게 나누면 재사용·책임 분리가 좋아집니다. DAG 간 연결 방법은 세 가지가 있고, 상황에 따라 고릅니다.

Loading diagram…

① TriggerDagRunOperator — A가 B를 직접 트리거합니다. conf로 파라미터를 넘기고, wait_for_completion=True로 B가 끝날 때까지 기다릴 수 있습니다. 기다리는 동안 워커 슬롯을 점유하지 않으려면 deferrable=True로 Triggerer에 위임하세요(자원 이점은 8편: 외부 시스템 연동).

from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
 
trigger_b = TriggerDagRunOperator(
    task_id="trigger_downstream",
    trigger_dag_id="downstream_dag",
    conf={"target_date": "{{ data_interval_start | ds }}"},
    wait_for_completion=True,
    deferrable=True,            # 대기 동안 워커 슬롯 반납
    poke_interval=30,
    reset_dag_run=True,         # 같은 run_id 재트리거 시 초기화(재실행 안전)
)

② ExternalTaskSensor — 반대로 B가 A(또는 A의 특정 태스크)의 완료를 기다립니다. 두 DAG의 시간 정렬이 중요한데, logical_date가 다르면 execution_deltaexecution_date_fn으로 맞춰야 합니다. Airflow 3에서는 수동·Asset 트리거 시 logical_date가 None일 수 있어 시간 정렬이 까다로워질 수 있으니, 가능하면 ③을 권합니다.

③ Asset 기반(권장) — A가 데이터(Asset)를 "생산"하면, 그 Asset을 "소비"하는 B가 자동으로 트리거됩니다. 시각을 맞출 필요가 없고 결합도 낮습니다. 데이터 인지 스케줄링의 본론은 6편: 스케줄링 & Asset에서 다룹니다.

6. DAG 옵션 총정리

자주 쓰는 DAG/태스크 옵션을 한자리에 모았습니다. "이게 있었지"를 떠올리는 용도로 보세요.

옵션수준의미
scheduleDAGcron/@daily/타임테이블/Asset/None
start_dateDAG스케줄 계산의 시작 기준점
catchupDAG과거 구간 메우기. 3.x 기본 False
max_active_runsDAG이 DAG의 동시 실행(run) 수 상한
max_active_tasksDAG이 DAG에서 동시에 도는 태스크 수 상한
dagrun_timeoutDAG한 회차가 이 시간을 넘기면 실패
default_argsDAG모든 태스크에 공통 적용할 인자(retries 등)
paramsDAG입력 스펙(타입·검증·UI 폼)
tagsDAGUI 필터링용 태그
render_template_as_native_objDAG템플릿을 문자열이 아닌 네이티브 객체로
depends_on_past태스크같은 태스크의 직전 run이 성공해야 이번 run 실행
wait_for_downstream태스크직전 run의 하류까지 끝나야 진행
retries / retry_delay태스크재시도 횟수/간격
execution_timeout태스크태스크 단위 제한 시간
pool / priority_weight태스크자원 격리·우선순위(3편)
trigger_rule태스크상류 상태에 따른 실행 조건
on_failure_callback둘 다상태 전이 시 콜백

depends_on_past=True는 강력하지만 위험합니다. 과거 run 하나가 실패해 막히면 이후 run이 줄줄이 멈춥니다. "이전 결과에 누적 의존하는" 파이프라인에만 신중히 쓰세요.

7. 실패한 DAG/태스크 재실행

운영의 절반은 "터진 걸 다시 돌리는 일"입니다. Airflow에는 성격이 다른 재실행 도구가 여럿 있습니다.

Loading diagram…
  • Clear(가장 흔함) — 실패한 태스크(와 그 하류)의 상태를 지우면 스케줄러가 자동으로 다시 큐에 넣어 재실행합니다. UI의 "Clear", 또는 CLI airflow tasks clear <dag_id> -t <task_regex> -s <start> -e <end>. 중요한 건 재실행이 같은 logical_date/data_interval로 돈다는 점입니다(8장 참고).
  • Backfill — 아직 돌지 않은 과거 구간을 채웁니다. Airflow 3에서는 백필이 스케줄러 관리 방식으로 바뀌어, UI/REST/CLI로 요청하면 스케줄러가 해당 구간의 run들을 만들어 실행합니다(개념은 6편). 정확한 명령·옵션은 운영 버전의 공식 문서를 확인하세요.
  • Mark success/failed — 실제로 돌리지 않고 상태만 강제로 지정합니다. "이미 수동으로 처리했으니 성공으로 친다" 같은 예외 상황에만 신중히 씁니다.

depends_on_past=True인 DAG를 Clear할 때는 직전 run의 상태도 함께 고려해야 합니다. 막힌 지점부터 순서대로 풀어 줘야 줄줄이 대기가 풀립니다.

8. 실행·재실행의 날짜 기준 — 가장 중요한 한 가지

초보자가 가장 많이 헷갈리는 지점입니다. Airflow에서 "처리 대상 날짜"와 "실제로 코드가 도는 시각(벽시계)"은 별개입니다.

핵심 개념은 **데이터 구간(data interval)**입니다. 매일 도는 DAG가 2026-06-01 구간을 처리한다면, 그 구간은 [2026-06-01 00:00, 2026-06-02 00:00)이고, 실제 실행은 구간이 끝난 뒤(즉 6월 2일 0시 직후)에 일어납니다. 어제 데이터가 다 쌓인 다음에 어제 치를 처리하는, 배치의 자연스러운 동작입니다.

Loading diagram…

코드에서 쓰는 주요 값과 매크로는 다음과 같습니다.

값/매크로의미
logical_date이 run의 논리적 기준 시각(과거 execution_date를 대체)
data_interval_start / data_interval_end처리 대상 구간의 시작/끝
{{ ds }} / {{ ds_nodash }}날짜 문자열 2026-06-01 / 20260601
{{ ts }}ISO 타임스탬프
{{ dag_run.conf }}트리거 시 넘긴 conf
@task
def load(data_interval_start, data_interval_end):
    # 벽시계 now()가 아니라 '구간'으로 대상을 정한다 → 언제 돌려도 결과 동일
    sql = f"""
        DELETE FROM sales WHERE day = '{data_interval_start.date()}';
        INSERT INTO sales SELECT * FROM staging
        WHERE ts >= '{data_interval_start}' AND ts < '{data_interval_end}';
    """
    run_sql(sql)

이 구조가 왜 중요할까요? Clear로 재실행하면 같은 logical_date·data_interval로 다시 돕니다. 즉 6월 1일 치를 재실행하면 6월 1일 구간을 한 번 더 처리합니다. 그래서 위 예시처럼 먼저 해당 구간을 지우고 다시 넣는(덮어쓰는) 멱등(idempotent) 패턴이 필수입니다. 만약 코드가 datetime.now()로 "오늘"을 잡거나 INSERT만 한다면, 재실행할 때마다 중복이 쌓이고 엉뚱한 날짜를 처리하게 됩니다.

한 줄 원칙: 처리 대상은 항상 data_interval/logical_date로 정하고, 결과는 그 구간을 덮어쓰도록 만든다. 그러면 몇 번을 재실행하든 결과가 같다. 이것이 백필·Clear가 안전해지는 유일한 조건이다.

참고로 수동 트리거나 Asset 트리거 run은 정해진 구간이 없어 logical_dateNone일 수 있습니다. 시간 기반 로직이 꼭 필요하면 data_interval_*를 쓰고, 없을 때의 분기를 준비해 두세요(자세한 스케줄·트리거 동작은 6편: 스케줄링 & Asset).

마무리

이번 편에서는 DAG를 "돌아가게" 만드는 것을 넘어, 운영에서 견디게 만드는 기술들을 봤습니다. 스크립트 실행 방식 고르기, 세 가지 파라미터 통로, 예외를 구분한 에러 처리, 다른 DAG와의 연결, 옵션 총정리, 그리고 재실행을 안전하게 만드는 날짜 기준까지 — 이 중 마지막 멱등성·날짜 기준은 이후 모든 편의 바탕이 됩니다.

다음 편 6편: 스케줄링 & Asset에서는 이 DAG들을 언제, 무엇을 기준으로 돌릴지를 시간 기반과 데이터 인지(Asset) 두 축으로 깊게 다룹니다. 공식 문서는 Airflow DAGsParams를 참고하세요.