Airflow 3 ⑧ External System Integration & Synchronous Calls
Manage credentials safely with Connections and a Secrets Backend, handle retries, timeouts, and idempotency for synchronous HttpOperator calls, choose between Sensor poke vs reschedule, and wait for external work to finish with Deferrable Operators — all as practical patterns.
Pipelines don't run in isolation. They request settlements from a payment gateway, hand inference jobs to an in-house ML service, and wait for files to land in S3. Most Airflow tasks ultimately come down to "calling the outside world and waiting for the result." This post covers how to make that "call and wait" pattern safe and resource-efficient.
What you'll learn in this post
- How to manage credentials outside your code with Connections, Variables, and a Secrets Backend (Vault, AWS Secrets Manager)
- The retry, timeout, and idempotency pitfalls of synchronous calls to external REST endpoints via
HttpOperator/HttpHook- The difference between poke and reschedule modes when waiting for external state with a Sensor
- How to wait for long-running external work without occupying a worker slot, using a Deferrable Operator + Triggerer
- How to wire the "submit external job → poll status → done" pattern into a single DAG
This is Part 8 of the Airflow 3 in Practice series. The previous part, ⑦ XCom & Passing Data, covered small data exchanges between tasks; this time we send that data outside Airflow and bring results back. The next part, ⑨ REST API & Remote Schedule Changes, covers the opposite direction — calling Airflow itself from the outside.
1. What Is a "Synchronous Call"?
Let's align on terminology first. In this post, a synchronous call is a catch-all for the pattern of calling an external system and then waiting for its result or completion. Depending on intent, it splits into two branches.
- (a) Synchronous external API call — you hit a REST endpoint and get the result within that single request-response. It usually finishes within seconds. (e.g., currency lookup, settlement request)
- (b) Waiting for external work to complete — you submit a job to an external system, it doesn't finish immediately, and you poll until its status changes to done. It can take minutes to hours. (e.g., BigQuery jobs, Spark clusters, external ETL)
You solve (a) with an Operator/Hook and (b) with a Sensor or a Deferrable Operator. Either way, there's one shared premise — you must handle credentials safely. That's where we start.
2. Credentials: Connection / Variable / Secrets Backend
External calls need hosts, tokens, and keys. Airflow manages these in three ways.
| Concept | Purpose | Example |
|---|---|---|
| Connection | External system access info (host, port, login, password, extra JSON) | payment_api, s3_default |
| Variable | Runtime configuration values (constants, flags). Non-secret values | batch_size, target_env |
| Secrets Backend | Look up Connections/Variables from an external secret store | Vault, AWS Secrets Manager |
In code you reference only the Conn ID, and the actual values live outside the environment. The key is to never embed passwords in plaintext in the metadata DB.
from airflow.sdk import dag, task
from airflow.providers.http.hooks.http import HttpHook
@task
def call_external():
# Keep only the Conn ID in code; the actual host/token live in the Connection
hook = HttpHook(method="POST", http_conn_id="payment_api")
resp = hook.run(endpoint="/v1/settlements", json={"order_id": 42})
return resp.json()By default, Airflow looks up Connections/Variables in the order environment variables → Secrets Backend → metadata DB. Once you attach a Secrets Backend, Airflow asks Vault or Secrets Manager first when resolving a Conn ID.
You specify the Secrets Backend in the [secrets] section of airflow.cfg or via environment variables. Below is an AWS Secrets Manager example.
# airflow.cfg
[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}With this in place, the Conn ID payment_api is resolved from the airflow/connections/payment_api secret. Rotation and auditing are handled by the secret store, and Airflow reads the value only at the moment of use.
Never keep credentials in DAG code or as plaintext in the metadata DB. Keep only the Conn ID in code, and keep the values in the Secrets Backend.
3. Synchronous REST Calls: Retries · Timeouts · Idempotency
Now for pattern (a). Hitting an external REST endpoint once and getting a response looks simple, but it always carries three pitfalls of distributed environments.
Timeout — Prevent Infinite Waiting
HttpHook/HttpOperator accept a timeout argument. It prevents a worker from hanging forever when the external system doesn't respond. Without a timeout, one worker slot gets locked up like a zombie.
Retries — Absorb Transient Failures
Networks occasionally drop. Airflow's task-level retries/retry_delay retry transient failures. The principle is to apply them only to retryable failures like HTTP 5xx and timeouts (4xx is usually pointless to retry).
from datetime import timedelta
from airflow.providers.http.operators.http import HttpOperator
submit = HttpOperator(
task_id="submit_settlement",
http_conn_id="payment_api",
method="POST",
endpoint="/v1/settlements",
headers={"Idempotency-Key": "{{ run_id }}"}, # idempotency key
data='{"order_id": 42}',
# The three safety belts of synchronous calls
retries=3,
retry_delay=timedelta(seconds=30),
retry_exponential_backoff=True,
)Idempotency — Prevent the Cost of Retries
This is where things go wrong most often. As in the sequence diagram above, when the request arrives but only the response is lost, Airflow treats it as a failure and retries. Without idempotency, a payment happens twice.
The solution is to assign an idempotency key (Idempotency-Key) to each request and send the same key on retries. As in the example above, if you use {{ run_id }} (a value that stays fixed throughout the same task execution) as the key, the external server filters out duplicates. If the external API doesn't support idempotency keys, you need a step that checks "has this already been processed?" before the call.
Retries are safe only as a set with idempotency. If you have only one of the two, you get duplicate side effects.
4. Waiting for External State ①: Sensor poke vs reschedule
Now for pattern (b). If you need to wait "until a file lands in S3" or "until a row appears in an external DB," you use a Sensor. A Sensor periodically checks (pokes) until the condition becomes true. The question is what it occupies while waiting.
mode="poke"(default) — holds a worker slot and checks everypoke_interval. Simple to implement, but if you wait several hours, one slot stays tied up the entire time.mode="reschedule"— after checking, if the condition is false, it releases the task (returns the slot) and reschedules for the next check time. Since it frees the slot, it's better for long waits. However, it incurs the overhead of restarting the task each time, so it's unsuitable when the poke interval is short (seconds).
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
wait_file = S3KeySensor(
task_id="wait_for_input",
bucket_key="incoming/{{ ds }}/data.parquet",
bucket_name="my-bucket",
aws_conn_id="s3_default",
mode="reschedule", # long waits return the slot
poke_interval=300, # check every 5 minutes
timeout=60 * 60 * 6, # fail if it doesn't arrive within 6 hours
)Rule of thumb: use reschedule if the poke interval is on the order of minutes, and poke if you need to check frequently on the order of seconds. And in any mode, always set a timeout — to guard against the case where the external system never comes.
5. Waiting for External State ②: Deferrable Operator + Triggerer
reschedule also frees the slot, but it still has to wake the worker at the moment of checking and pays a restart cost each time. The better approach Airflow 3 recommends is a Deferrable Operator.
The core idea: when a task enters its "waiting" state, it defers itself and drops out of the worker entirely. The polling is performed by a separate component called the Triggerer using an asyncio event loop. A single Triggerer can monitor thousands of waits concurrently, using no worker slots at all. When the condition is met, the task wakes back up on a worker and continues to the next step.
Here's an at-a-glance comparison of the resource difference between a poke sensor and a deferrable.
If your waits are long and you're waiting on many external jobs at once, deferrable is overwhelmingly better. Many provider operators/sensors support a deferrable=True argument.
from airflow.providers.http.sensors.http import HttpSensor
wait_job = HttpSensor(
task_id="wait_external_job",
http_conn_id="batch_api",
endpoint="/v1/jobs/{{ ti.xcom_pull(task_ids='submit_job') }}/status",
response_check=lambda r: r.json().get("state") == "SUCCEEDED",
deferrable=True, # ← async wait via the Triggerer (returns the worker slot)
poke_interval=60,
timeout=60 * 60 * 4,
)Deferrable is not magic. It works only when a Triggerer process is running (for cluster setup, see ② Building a Cluster).
6. Putting It Together: Wiring "Submit → Poll → Done" into a Single DAG
Combining (a) and (b) gives you the most common practical pattern — submit a job externally (synchronous call), wait until it completes (deferrable wait), then harvest the result.
from airflow.sdk import dag, task
from datetime import datetime, timedelta
from airflow.providers.http.operators.http import HttpOperator
from airflow.providers.http.sensors.http import HttpSensor
@dag(schedule="@daily", start_date=datetime(2026, 7, 1), catchup=False)
def external_job_flow():
submit = HttpOperator(
task_id="submit_job",
http_conn_id="batch_api",
method="POST",
endpoint="/v1/jobs",
headers={"Idempotency-Key": "{{ run_id }}"},
data='{"date": "{{ ds }}"}',
response_filter=lambda r: r.json()["job_id"], # pass job_id via XCom
retries=3,
retry_delay=timedelta(seconds=30),
)
wait = HttpSensor(
task_id="wait_job",
http_conn_id="batch_api",
endpoint="/v1/jobs/{{ ti.xcom_pull(task_ids='submit_job') }}/status",
response_check=lambda r: r.json().get("state") == "SUCCEEDED",
deferrable=True,
poke_interval=60,
timeout=60 * 60 * 4,
)
@task
def fetch_result(job_id: str):
# Fetch the completed job's output and pass it downstream
return {"job_id": job_id, "ok": True}
submit >> wait >> fetch_result(submit.output)
external_job_flow()This single DAG contains every principle from this post: keep only the Conn ID in code, apply an idempotency key and retries to the submission, return the slot during long waits with deferrable, and set a timeout on every wait.
7. Wrap-up Checklist
Run through this list before integrating an external system.
- Keep credentials in Connection + Secrets Backend, not in code or as plaintext in the metadata DB
- Set a
timeouton synchronous calls (prevent infinite waiting) - Apply
retries/retry_delayonly to retryable failures (5xx, timeouts) - Always pair calls with side effects (payments, creation) with an idempotency key
- Use
pokefor short waits,rescheduleor deferrable for long waits - Use deferrable=True for long waits and many concurrent waits (confirm the Triggerer is running)
- Set a
timeouton every Sensor/wait to prevent permanent waiting
In the next part, ⑨ REST API & Remote Schedule Changes, we cover the opposite direction of this post — calling Airflow from the outside to trigger DAGs and change schedules.
The essence of external integration is "call and wait." What you occupy while waiting is what determines your cluster's resource efficiency.