PySpark Multi-Job Orchestration — Dependencies, Cache Reuse, and Retries
How to reliably operate pipelines made of multiple Spark jobs. We cover Spark actions and job boundaries, reusing shared results with cache/persist, dividing responsibilities with external orchestrators like Airflow, and designing for idempotency, checkpoints, and partial retries.
Real-world data pipelines never end with a single job. You clean the source, join against multiple dimensions, aggregate, and fan out to several marts. How you wire these stages (jobs) together determines the pipeline's reliability and cost. Wire them poorly and you end up recomputing the same data over and over, or restarting the entire pipeline from scratch when a single stage fails.
This post covers understanding job boundaries inside Spark, reusing shared results with cache, dividing responsibilities with an external orchestrator (Airflow and the like), and designing for idempotency and partial retries.
1. First — What Is a "Job" in Spark?
Let's get the terminology straight. Inside Spark, a Job is the unit of execution triggered by a single action.
```
Application (one spark-submit)
└─ Job (one per action: count, write, collect, ...)
   └─ Stage (split at shuffle boundaries)
      └─ Task (parallel per partition)
```

```python
df2 = df.filter(...).groupBy(...).agg(...)   # transformation, not a job (lazy)
df2.write.parquet("a")                       # action → Job 1
df2.count()                                  # action → Job 2 (recomputes df2!)
```

The key trap: every action re-executes the entire transformation chain. If you write df2 and then count it, df2 gets computed twice. This is the answer to "why is it reading the same thing twice?"
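To make the recomputation concrete without a cluster, here is a toy model in plain Python. It is not Spark's API: `LazyFrame` and its methods are hypothetical names that only mimic the split between lazy transformations and eager actions.

```python
class LazyFrame:
    """Toy stand-in for a DataFrame: transformations are recorded, actions replay them."""

    def __init__(self, source, steps=(), stats=None):
        self._source = source            # callable that "reads" the input rows
        self._steps = tuple(steps)       # recorded transformations (the lineage)
        self.stats = stats if stats is not None else {"reads": 0}

    def filter(self, predicate):
        # Transformation: only extends the lineage, nothing runs yet.
        return LazyFrame(self._source, self._steps + (predicate,), self.stats)

    def _compute(self):
        # Every action starts from the source and replays the whole chain.
        self.stats["reads"] += 1
        rows = self._source()
        for predicate in self._steps:
            rows = [r for r in rows if predicate(r)]
        return rows

    def count(self):      # action
        return len(self._compute())

    def collect(self):    # action
        return self._compute()


df = LazyFrame(lambda: [1, 2, 3, 4, 5, 6])
df2 = df.filter(lambda x: x % 2 == 0)   # no read happens here
n = df2.count()                          # first action: reads the source
rows = df2.collect()                     # second action: reads the source again
print(n, rows, df2.stats["reads"])       # 3 [2, 4, 6] 2
```

Two actions mean two full passes over the source, which is the behavior the write-then-count example runs into at cluster scale.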
2. cache / persist — Reusing Shared Results
If multiple actions use the same DataFrame, cache it to avoid recomputation.
```python
from pyspark import StorageLevel

shared = df.filter(...).join(dim, "key")   # intermediate result used in multiple places
shared.persist(StorageLevel.MEMORY_AND_DISK)

shared.write.parquet("out_a")              # Job 1 (computes + caches here)
shared.groupBy("k").count().write...       # Job 2 (reuses cache, no recomputation)
shared.filter("x>0").write...              # Job 3 (reuses cache)

shared.unpersist()                         # release immediately once done
```

| Principle | Why |
|---|---|
| Cache only what's used 2+ times | Caching a single-use result is pure waste |
| MEMORY_AND_DISK | Spills to disk when memory runs short (safe) |
| unpersist when done | Frees the Storage pool (prevents OOM) |
(For the problem of caching starving execution memory, see the separate post "Conquering PySpark Executor OOM".)
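One way to make "unpersist when done" hard to forget is to scope the cache with a context manager. The sketch below is not a Spark feature: `persisted` is a hypothetical helper name, and it relies only on `DataFrame.persist` and `DataFrame.unpersist`.

```python
from contextlib import contextmanager


@contextmanager
def persisted(df, storage_level=None):
    """Persist df for the duration of a with-block, then always release it.

    `persisted` is a hypothetical helper, not part of PySpark. It only calls
    DataFrame.persist() and DataFrame.unpersist().
    """
    # persist() is lazy: the cache is filled by the first action in the block.
    cached = df.persist() if storage_level is None else df.persist(storage_level)
    try:
        yield cached
    finally:
        # Runs even if an action inside the block raises.
        cached.unpersist()


# Usage, assuming `shared` is a DataFrame consumed by several actions
# ("out_a" and "out_b" are placeholder paths):
#
# with persisted(shared, StorageLevel.MEMORY_AND_DISK) as s:
#     s.write.parquet("out_a")
#     s.groupBy("k").count().write.parquet("out_b")
```

The `finally` clause is the point of the design: a failed write inside the block still releases the cached data.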
3. Checkpoints — Cutting Long Lineage
Cache keeps the lineage, but in iterative or complex pipelines the lineage grows so long that the planner slows down and recomputation after a failure gets expensive. checkpoint saves the result to disk and truncates the lineage.
```python
spark.sparkContext.setCheckpointDir("s3://bucket/checkpoints")

# Cut lineage after an iterative/complex step
result = complex_iterative_step(df)
result = result.checkpoint()   # save to disk + truncate lineage
```

| | cache | checkpoint |
|---|---|---|
| Storage | Memory/disk | Reliable storage |
| Lineage | Kept | Truncated |
| Use case | Reuse | Cutting long lineage / iterations |
For iterative algorithms (graphs, hierarchies), checkpoint is essential (see the separate posts "PySpark GraphFrames" and "Recursive and Hierarchical Data").
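In an iterative job, a common pattern is to checkpoint every few iterations rather than on every pass. A minimal sketch, assuming the checkpoint directory has already been set; `iterate_with_checkpoints`, `step`, and the default interval of 5 are illustrative choices, not values from this article.

```python
def iterate_with_checkpoints(df, step, iterations, every=5):
    """Apply `step` repeatedly, truncating lineage every `every` iterations.

    Hypothetical helper: `step` is any function DataFrame -> DataFrame.
    Assumes spark.sparkContext.setCheckpointDir(...) was called beforehand.
    """
    if every < 1:
        raise ValueError("every must be >= 1")
    for i in range(1, iterations + 1):
        df = step(df)
        if i % every == 0:
            # checkpoint() is eager by default: it materializes the data in the
            # checkpoint directory and returns a DataFrame with a short lineage.
            df = df.checkpoint()
    return df
```

The interval is a trade-off: checkpointing more often keeps plans short but writes more data to storage.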
4. Multiple Jobs in One Application vs. Multiple Applications
| Approach | Pros | Cons |
|---|---|---|
| One Application (multiple actions) | Shared cache, fast stage transitions | One stage failure affects all, long resource hold |
| Multiple Applications (one submit per stage) | Stage isolation, independent retries | Inter-stage data must go through storage |
Decision criteria:
- Tight coupling, heavy intermediate-result reuse → one Application
- Independent stages, separate retry units → multiple Applications (connected via storage)

In most cases, putting table boundaries (Iceberg/Delta) between stages and making each stage an independent job works better operationally (retries, monitoring).
5. External Orchestration — Dividing Responsibilities with Airflow and Friends
Spark optimizes "execution within a single job", but dependencies, scheduling, and retries across jobs belong to an external orchestrator (Airflow, Dagster, Argo, etc.). Don't mix the roles.
```
Orchestrator (Airflow)            Spark
- DAG dependencies (A→B→C)        - Distributed execution within a job
- Schedule (daily at 2 AM)        - Shuffle / join / aggregation optimization
- Retries, alerts, SLAs           - Partitions, memory
- Backfill triggers               - (inside a stage)
```

```python
# Airflow concept: each Spark job is a task, dependencies form a DAG
# bronze >> silver >> gold   (each a SparkSubmitOperator)
```

Anti-pattern: stuffing every stage into one giant Spark Application and controlling the flow with try/except. Per-stage retries and observability become painful. Leave dependencies and retries to the orchestrator and let Spark focus on executing each stage.
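To show what "dependencies and retries belong to the orchestrator" means mechanically, here is a toy runner in plain Python. It is not Airflow's API: `run_pipeline`, `make_stage`, and the stage callables are hypothetical, and a real orchestrator adds scheduling, alerting, and state on top of this.

```python
from graphlib import TopologicalSorter


def run_pipeline(dag, tasks, max_retries=2):
    """Run tasks in dependency order, retrying each failed task on its own.

    dag:   {task_name: set of upstream task names}
    tasks: {task_name: zero-argument callable}, e.g. one that calls spark-submit
    Returns the number of attempts used per task.
    """
    attempts = {}
    for name in TopologicalSorter(dag).static_order():
        for attempt in range(1, max_retries + 2):
            attempts[name] = attempt
            try:
                tasks[name]()
                break                       # success: move on to the next task
            except Exception:
                if attempt > max_retries:
                    raise                   # give up; downstream tasks never start
    return attempts


# Hypothetical stages: silver fails once, then succeeds on retry.
calls = {"bronze": 0, "silver": 0, "gold": 0}


def make_stage(name, fail_first=False):
    def stage():
        calls[name] += 1
        if fail_first and calls[name] == 1:
            raise RuntimeError(f"{name} failed")
    return stage


dag = {"bronze": set(), "silver": {"bronze"}, "gold": {"silver"}}
tasks = {
    "bronze": make_stage("bronze"),
    "silver": make_stage("silver", fail_first=True),
    "gold": make_stage("gold"),
}
result = run_pipeline(dag, tasks)
print(result)   # {'bronze': 1, 'silver': 2, 'gold': 1}
```

Only silver is attempted twice; bronze is not rerun, which is the behavior you lose when all stages live in one Application wrapped in try/except.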
6. Idempotency — The Prerequisite for Retries
For an orchestrator to safely retry a failed stage, each stage must be idempotent (rerunning produces the same result). If you load with append, a retry creates duplicates.
```python
# ❌ append: duplicates on retry
result.write.mode("append").save(table)

# ✅ idempotent: partition replacement or MERGE
(result.write.format("delta").mode("overwrite")
    .option("replaceWhere", f"dt = '{run_date}'").save(table))
```

(For idempotent backfills and partition replacement, see the separate post "PySpark Dynamic Partition Overwrite and Idempotent Backfills"; for MERGE, see "Large-Scale Deduplication and SCD Type 2".) If every stage is idempotent, rerunning only the failed stage is safe.
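The difference between the two load styles can be checked with a toy table in plain Python, where a dict keyed by partition date stands in for the target table. The function names and sample rows are made up for illustration.

```python
def append_rows(table, run_date, rows):
    """Append-style load: a retry adds the same rows again."""
    table.setdefault(run_date, []).extend(rows)


def replace_partition(table, run_date, rows):
    """Partition-replacement load: a retry overwrites the same partition."""
    table[run_date] = list(rows)


rows = [("u1", 10), ("u2", 20)]

appended, replaced = {}, {}
for _ in range(2):                       # first run + one retry
    append_rows(appended, "2024-01-01", rows)
    replace_partition(replaced, "2024-01-01", rows)

print(len(appended["2024-01-01"]))       # 4: duplicates after the retry
print(len(replaced["2024-01-01"]))       # 2: same result as a single run
```

Replacing the partition makes the number of attempts invisible in the output, which is what lets an orchestrator retry without coordination.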
7. Partial Retries — Table Boundaries Between Stages
The core design principle for long pipelines is to resume "from the point of failure, not from the beginning". Persisting stage boundaries as tables makes this possible.
```
Bronze ──persist──> Silver ──persist──> Gold
Each stage's output is persisted as a table → if Silver fails, don't redo Bronze, rerun only Silver
```

```python
# Each stage reads its input from a table and writes its output to a table
def silver_step(run_date):
    bronze = spark.read.table("bronze.events").where(f"dt='{run_date}'")
    cleaned = transform(bronze)
    write_idempotent("silver.events", cleaned, run_date)
```

If you hold intermediate results only in memory and process everything in a single job, a failure near the end forces a full recomputation. Put a persistent boundary between expensive stages.
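The `write_idempotent` helper is not defined in this post. One possible shape, assuming the target is a Delta table partitioned by a `dt` column and that the DataFrame only contains rows for the given date, is sketched below; the `partition_col` parameter and the date validation are additions for illustration.

```python
from datetime import date


def write_idempotent(table, df, run_date, partition_col="dt"):
    """Replace exactly one date partition of a Delta table.

    A sketch of what a helper with this name could look like; the original
    pipeline's implementation is not shown. Assumes `table` is a Delta table
    partitioned by `partition_col`, and `df` only contains rows for `run_date`.
    """
    # Parse the date so that only a valid ISO date can reach the predicate.
    day = date.fromisoformat(str(run_date)).isoformat()
    predicate = f"{partition_col} = '{day}'"
    (df.write.format("delta")
       .mode("overwrite")
       .option("replaceWhere", predicate)   # overwrite only the matching rows
       .saveAsTable(table))
    return predicate
```

Returning the predicate is a small convenience for logging which partition a run replaced. A MERGE-based variant would suit tables keyed by something other than a date partition.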
8. Concurrency and Resources — Multiple Jobs on One Cluster
When multiple jobs share a cluster, resource contention follows.
| Concern | Approach |
|---|---|
| Stage parallelism | Orchestrator runs independent stages in parallel |
| Resource isolation | Dynamic allocation, (YARN) queues / (K8s) namespaces & quotas |
| Scheduler | FAIR scheduler for concurrent jobs within Spark |
| Cost | Spot instances + FTE for heavy batch workloads |
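The FAIR scheduler row applies when several threads of one Application each trigger their own action. A minimal sketch of that pattern follows; `run_in_pools` is a hypothetical helper and the pool names are placeholders. It assumes the application runs in FAIR mode and that per-thread local properties behave as in recent PySpark versions, where pinned thread mode is the default.

```python
from concurrent.futures import ThreadPoolExecutor


def run_in_pools(spark, jobs, max_workers=4):
    """Run several Spark actions concurrently, each in a named scheduler pool.

    jobs: list of (pool_name, zero-argument callable that triggers an action).
    Hypothetical helper; pool names are placeholders. Assumes the application
    was started with spark.scheduler.mode=FAIR.
    """
    def run(job):
        pool, action = job
        # Local properties apply to jobs submitted from the current thread.
        spark.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
        return action()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(run, jobs))   # results in the order of `jobs`


# Usage, assuming df_a and df_b are independent DataFrames:
#
# counts = run_in_pools(spark, [
#     ("pool_a", lambda: df_a.count()),
#     ("pool_b", lambda: df_b.count()),
# ])
```

Setting the pool inside each worker call matters because executor threads are reused across jobs.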
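```python
from pyspark.sql import SparkSession

# FAIR is recommended when running multiple actions in parallel via threads in one Application.
# Set it before the SparkContext starts (here or via --conf); spark.conf.set at runtime is too late.
spark = SparkSession.builder.config("spark.scheduler.mode", "FAIR").getOrCreate()
```

9. Summary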
| Area | Key takeaway |
|---|---|
| Job boundaries | Every action recomputes — cache shared results |
| cache/checkpoint | cache for reuse, checkpoint for long lineage |
| Division of roles | Dependencies & retries to the orchestrator, execution to Spark |
| Idempotency | The prerequisite for retries — overwrite/MERGE |
| Partial retries | Persist stage boundaries as tables |
| Resources | Dynamic allocation, isolation, FAIR scheduler |
Multi-job pipelines come down to two separations. First, understand Spark's internal job boundaries and tame per-action recomputation with caching and checkpoints. Second, delegate inter-stage dependencies and retries to the orchestrator and let Spark focus on executing each stage. Add idempotent stages and persistent table boundaries on top — and when one stage fails, you rerun just that stage, giving you a robust, cost-efficient pipeline.
This article is based on Spark 3.5. If you need help designing multi-stage data pipelines or orchestration, feel free to reach out.
— Data Dynamics Engineering Team