Airflow 3 REST API & 원격 스케줄 변경
JWT 인증부터 DAG 트리거·일시정지·완료 폴링까지, 외부 시스템에서 Airflow 3를 원격 제어하는 실전 패턴.
이 글은 Airflow 3 실전 연재의 9편입니다. 직전 편 외부 시스템 연동 & 싱크 호출에서는 Airflow 안에서 바깥 시스템을 호출하는 방향을 다뤘습니다. 이번에는 방향을 뒤집어서, 바깥 시스템이 Airflow를 원격으로 조종하는 이야기를 합니다. 다음 편은 모니터링 & 운영입니다.
왜 REST API가 다시 중요해졌나
Airflow 2 시절에는 두 종류의 REST API가 있었습니다. 하나는 불안정하고 인증도 빈약했던 /api/experimental, 다른 하나는 나중에 추가된 안정 API입니다. 사람들은 헷갈렸고, 종종 실험용 API를 운영에 써서 사고를 냈습니다.
Airflow 3에서는 이 혼란이 정리됐습니다. 구 experimental API는 완전히 제거됐고, API server(2.x의 webserver를 대체하는 컴포넌트)가 UI와 함께 버전이 명시된 안정 REST API를 제공합니다. 인증은 JWT 토큰 기반으로 통일됐습니다. 즉, "외부에서 Airflow를 제어한다"는 시나리오가 처음으로 1급 시민이 된 셈입니다.
외부 시스템이 Airflow를 호출하는 통로는 이제 단 하나, API server의 안정 REST API다.
이게 왜 실무에서 중요할까요. 데이터 파이프라인은 혼자 돌지 않습니다. 사내 포털의 "리포트 재생성" 버튼, 외부 SaaS의 webhook, 다른 오케스트레이터, 심지어 사람이 쓰는 Slack 봇까지 — 이 모두가 "특정 DAG를 지금 돌려줘"라고 말할 수 있어야 합니다. REST API가 그 입구입니다.
JWT 인증 흐름
Airflow 3의 API는 거의 모든 호출에 JWT(JSON Web Token) 를 요구합니다. 흐름은 단순합니다. 먼저 자격증명으로 토큰을 발급받고, 이후 모든 요청의 Authorization: Bearer <token> 헤더에 그 토큰을 실어 보냅니다. 토큰은 만료 시간이 있어서, 만료되면 다시 발급받습니다.
토큰을 받는 구체적인 엔드포인트 경로와 자격증명 방식은 배포에서 설정한 auth manager(기본 인증, OAuth, 외부 IdP 연동 등)에 따라 달라집니다. 그래서 여기서는 경로를 단정하지 않습니다. 실제 경로·페이로드는 운영 중인 버전의 공식 REST API 레퍼런스를 확인하세요. 핵심 원리는 어느 버전이든 동일합니다: 토큰을 먼저 받고, Bearer 헤더로 호출한다.
토큰 발급을 추상화한 클라이언트의 골격은 이렇습니다.
import requests
class AirflowClient:
def __init__(self, base_url: str, token: str):
self.base_url = base_url.rstrip("/")
# 모든 호출에 토큰을 공통 헤더로 부착
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
})
def get(self, path: str, **kw):
r = self.session.get(f"{self.base_url}{path}", timeout=30, **kw)
r.raise_for_status()
return r.json()
def post(self, path: str, **kw):
r = self.session.post(f"{self.base_url}{path}", timeout=30, **kw)
r.raise_for_status()
return r.json()토큰은 비밀번호와 같다. 코드·로그·URL 쿼리스트링에 박지 말고, 환경변수나 시크릿 매니저에서 주입하라.
외부에서 DAG 트리거하기
가장 흔한 시나리오는 "특정 DAG를 지금 한 번 실행"입니다. REST API에서는 해당 DAG의 DAG Run을 생성(POST .../dagRuns)하는 것으로 표현됩니다. 이때 두 가지를 함께 보낼 수 있습니다.
logical_date: 이 실행이 논리적으로 "어느 시점"의 데이터를 처리하는지. Airflow 3에서는execution_date가 제거되고logical_date로 통일됐습니다. 수동/외부 트리거에서는 생략하거나 None일 수 있고, 그 경우data_interval_start/end로 다룹니다.conf: 이번 실행에만 적용할 파라미터. DAG 코드 안에서dag_run.conf또는 Params로 꺼내 씁니다.
curl로 보면 형태가 직관적입니다(경로는 일반화한 예시이며, 정확한 path는 공식 문서 기준).
# DAG 트리거 (conf로 파라미터 주입)
curl -X POST "$AIRFLOW_URL/api/v2/dags/sales_report/dagRuns" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"logical_date": "2026-07-02T00:00:00Z",
"conf": { "region": "APAC", "dry_run": false }
}'DAG 코드 쪽에서 conf를 받는 방법은 두 가지입니다. 빠르게는 컨텍스트의 dag_run.conf에서 직접 꺼내고, 견고하게는 Params로 스키마와 기본값을 선언해 검증까지 받는 편이 좋습니다.
from airflow.sdk import dag, task
from airflow.models.param import Param
@dag(
schedule="0 6 * * *", # 스케줄 자체는 코드로 고정
catchup=False, # 3.x 기본값. 명시해 의도를 드러낸다
params={
"region": Param("KR", type="string"),
"dry_run": Param(False, type="boolean"),
},
)
def sales_report():
@task
def run(params: dict):
# 외부에서 넘긴 conf가 params로 들어온다
region = params["region"]
if params["dry_run"]:
print(f"[dry-run] {region} 리포트 생성 생략")
return
print(f"{region} 리포트 생성")
run()
sales_report()일시정지·재개와 "무엇을 원격으로 바꿀 것인가"
DAG를 원격으로 켜고 끄는 것도 API 한 번이면 됩니다. DAG 리소스의 is_paused 속성을 PATCH로 바꾸면 일시정지(pause)/재개(unpause)가 됩니다.
# DAG 일시정지
curl -X PATCH "$AIRFLOW_URL/api/v2/dags/sales_report" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{ "is_paused": true }'여기서 한 번 멈춰서 경계를 정리해야 합니다. 원격으로 "스케줄을 바꾼다"는 말은 두 가지로 갈립니다.
| 바꾸려는 것 | 어디서 바꾸나 | 이유 |
|---|---|---|
| 실행 주기(cron, asset 트리거 등) | 코드 (schedule=...) + Git | 스케줄은 파이프라인의 정의다. GitOps로 리뷰·이력 관리해야 한다 |
| 동작 파라미터(임계값, 대상 리전, 토글) | Variable 또는 트리거 conf/Params | 자주 바뀌고, 코드 배포 없이 즉시 반영하고 싶다 |
| 지금 한 번 멈춤/재개 | API의 is_paused PATCH | 운영 상황 대응(점검, 사고)용 일시 조치 |
스케줄 자체는 코드로, 동작 파라미터는 Variable로. 이 선을 지키면 "왜 어제 다르게 돌았지?"를 코드 이력으로 추적할 수 있다.
schedule을 Variable 값에 따라 동적으로 바꾸려는 유혹이 들 수 있는데, 이는 권하지 않습니다. DAG가 파싱될 때마다 스케줄이 달라지면 추적이 불가능해지고, Airflow 3의 DAG versioning이 주는 "이 실행은 어떤 버전으로 돌았나"라는 이점을 스스로 버리게 됩니다. 대신 cron 문자열은 코드에 고정하고, 그 안에서 "이번에 무엇을 할지"만 Variable/Params로 조정하세요.
Variable은 REST API로도 읽고 쓸 수 있어, 외부 컨트롤 플레인에서 토글을 관리하기 좋습니다. 다만 자주 읽히는 Variable을 DAG 최상단(파싱 시점)에서 가져오면 DAG processor에 부하를 주므로, 가능하면 태스크 실행 시점에 읽습니다.
동기 트리거 + 완료까지 폴링
외부 시스템 입장에서 흔한 요구는 "DAG를 돌리고, 끝날 때까지 기다렸다가 성공/실패를 받아오고 싶다"입니다. REST API는 본질적으로 비동기라(트리거하면 즉시 run id를 돌려줌), 동기처럼 쓰려면 트리거 → DAG Run 상태 폴링 → 종료 상태 확인의 패턴을 클라이언트가 구현해야 합니다.
폴링 클라이언트를 앞서 만든 AirflowClient 위에 얹어 보겠습니다. 핵심은 (1) 종료 상태(success/failed)를 명확히 구분하고, (2) 타임아웃을 둬서 무한 대기를 막고, (3) 폴링 간격에 약간의 backoff를 주는 것입니다.
import time
import uuid
TERMINAL = {"success", "failed"}
def trigger_and_wait(client, dag_id, conf=None,
poll_interval=5, timeout=1800):
# 1) 트리거 — run_id를 직접 지정하면 추적이 쉽다
run_id = f"ext__{uuid.uuid4().hex[:12]}"
client.post(
f"/api/v2/dags/{dag_id}/dagRuns",
json={"dag_run_id": run_id, "conf": conf or {}},
)
# 2) 종료 상태까지 폴링
deadline = time.time() + timeout
while time.time() < deadline:
run = client.get(f"/api/v2/dags/{dag_id}/dagRuns/{run_id}")
state = run.get("state")
if state in TERMINAL:
if state == "failed":
raise RuntimeError(f"DAG run {run_id} 실패")
return run # success
time.sleep(poll_interval)
raise TimeoutError(f"DAG run {run_id} 가 {timeout}s 안에 끝나지 않음")이 패턴을 쓸 때 현실적인 주의점이 있습니다. 폴링 간격을 너무 짧게(1초 미만) 잡으면 API server에 불필요한 부하를 주고, 길게 잡으면 응답이 늦어집니다. 보통 수 초~수십 초가 무난합니다. 그리고 호출하는 쪽에 timeout과 실패 처리(알림·재시도)를 반드시 넣으세요. 어떤 DAG는 몇 시간씩 돌 수 있고, 그동안 클라이언트가 멈춰 있으면 그쪽 시스템이 먼저 죽습니다. 더 긴 워크플로라면 폴링 대신 DAG 마지막 태스크가 외부로 webhook을 쏘는 방식(8편의 콜백 패턴)이 더 깔끔합니다.
보안: 토큰과 권한을 가볍게 보지 말 것
REST API를 열었다는 건 "코드 배포 없이 파이프라인을 움직일 수 있는 문"을 만들었다는 뜻입니다. 그 문은 반드시 잠가야 합니다.
- 토큰 관리: JWT는 만료가 있는 단기 토큰으로 쓰고, 시크릿 매니저에서 주입합니다. 절대 Git·로그·URL에 남기지 마세요. 외부 시스템마다 별도 계정/토큰을 발급해 누가 무엇을 했는지 감사 로그로 구분합니다.
- RBAC 최소권한: Airflow는 역할 기반 접근제어를 제공합니다. "리포트 재생성 버튼"용 계정에 관리자 권한을 주지 말고, 해당 DAG를 트리거할 수 있는 최소 권한만 부여하세요. 트리거만 필요한 클라이언트에 Variable 쓰기나 Connection 조회 권한까지 주면, 그 토큰이 새는 순간 전체가 노출됩니다.
- 전송 구간 보호: API server 앞에 TLS를 두고(HTTPS), 가능하면 사내망/VPN/IP 허용 목록으로 한 번 더 좁힙니다.
정리
Airflow 3의 REST API는 더 이상 곁다리 기능이 아니라, 외부 세계와 파이프라인을 잇는 정식 관문입니다. 정리하면 이렇습니다.
- 구 experimental API는 사라졌고, API server의 안정 REST API + JWT 하나로 통일됐다.
- 트리거는 DAG Run 생성, 파라미터는
conf/Params, 켜고 끄기는is_pausedPATCH. - 스케줄은 코드로, 동작 파라미터는 Variable로 — 이 경계가 추적성과 GitOps를 지킨다.
- 동기 실행이 필요하면 트리거 후 상태 폴링(타임아웃·실패 처리 필수), 긴 작업은 webhook이 낫다.
- 토큰·RBAC·TLS는 선택이 아니라 기본이다.
정확한 엔드포인트 경로와 페이로드 스키마는 운영 중인 버전의 공식 REST API 레퍼런스에서 확인하세요. 다음 편 모니터링 & 운영에서는 이렇게 돌린 실행들을 어떻게 관찰하고 알람을 걸지 다룹니다.