A Deep Dive into the Trino Cost-Based Optimizer — Statistics, Join Reordering, and How to Read EXPLAIN
How Trino's cost-based optimizer (CBO) uses statistics to decide join order and distribution strategy, how to populate statistics with ANALYZE, and how to actually read EXPLAIN / EXPLAIN ANALYZE output to diagnose slow queries.
The same query can run tens of times faster or slower depending on join order and data distribution strategy. Trino's Cost-Based Optimizer (CBO) makes these decisions automatically based on statistics. The catch: if statistics are missing or inaccurate, the CBO produces bad plans.
This post covers what the CBO looks at when making decisions, how to populate statistics, and how to read execution plans with EXPLAIN to diagnose slow queries.
1. What the CBO Decides
The optimizer picks the lowest-cost plan among multiple execution plans that produce logically identical results. The CBO drives two key decisions.
| Decision | What it means | Impact |
|---|---|---|
| Join Reordering | The order in which multiple tables are joined | Intermediate result size → memory and time |
| Join Distribution | Broadcast vs. partitioned (hash redistribution) | Network shuffle cost |
Get either of these wrong and, instead of reducing the data early with the small, selective table, the query builds a massive intermediate result and blows up memory. The CBO bases both judgments on statistics.
2. The Statistics the CBO Uses
Trino leverages the following table/column statistics.
| Statistic | Meaning | Used for |
|---|---|---|
| row count | Number of rows in a table/partition | Join size estimation |
| NDV (distinct values) | Number of distinct values in a column | Join selectivity, group count estimation |
| min / max | Value range of a column | Range-predicate selectivity, pruning |
| null fraction | Fraction of NULL values | Selectivity correction |
| data size | Total size in bytes of a column's values | Memory and network cost |
Iceberg tables always carry data-file-level statistics (row count, min/max, null count) in their manifests, so basic pruning works without any statistics collection. However, core CBO statistics like NDV must be collected separately with ANALYZE to improve the quality of join-order optimization.
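To see the file-level statistics that Iceberg keeps in its manifests, one option is the `$files` metadata table exposed by the Trino Iceberg connector. The query below is a minimal sketch that reuses the `iceberg.analytics.events` table from this article; the `LIMIT` is only there to keep the output short.

```sql
-- File-level statistics stored in Iceberg manifests, read through the
-- connector's $files metadata table. The bounds and null counts are maps
-- keyed by Iceberg field ID, not by column name.
SELECT
    file_path,
    record_count,
    null_value_counts,
    lower_bounds,
    upper_bounds
FROM iceberg.analytics."events$files"
LIMIT 10;
```

Note that nothing in this output is a distinct-value count, which is why NDV still has to come from ANALYZE.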
3. ANALYZE — Collecting Statistics
-- Collect statistics for all columns
ANALYZE iceberg.analytics.events;
-- Specific columns only (focus on frequently joined/filtered columns)
ANALYZE iceberg.analytics.events
WITH (columns = ARRAY['user_id', 'event_type', 'event_time']);
Inspect the collected statistics with SHOW STATS.
SHOW STATS FOR iceberg.analytics.events;
-- You can also view estimated statistics for a filtered result
SHOW STATS FOR (
SELECT * FROM iceberg.analytics.events
WHERE event_time >= TIMESTAMP '2026-06-01 00:00:00 UTC'
);
The output shows per-column distinct_values_count, nulls_fraction, row_count, data_size, low_value, and high_value. If distinct_values_count is empty (NULL), the CBO falls back to guessing, so whether this value is populated is your first checkpoint.
Operational tip: For large tables, attach ANALYZE to the end of your ingestion pipeline or run it on a nightly schedule. When statistics go stale (data has grown but statistics still reflect old values), the CBO under- or over-estimates sizes and plans degrade.
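One way to spot stale statistics is to compare the stored NDV with a fresh approximation. The sketch below assumes `user_id` is a frequently joined column, as in the ANALYZE example above; how much drift justifies a refresh is a judgment call, not a Trino rule.

```sql
-- Stored statistics: look at distinct_values_count in the user_id row
SHOW STATS FOR iceberg.analytics.events;

-- Fresh approximate NDV computed from the current data
SELECT approx_distinct(user_id) AS current_ndv
FROM iceberg.analytics.events;

-- If the two numbers have drifted far apart, refresh the column statistics
ANALYZE iceberg.analytics.events
WITH (columns = ARRAY['user_id']);
```

The `approx_distinct` query scans the column, so on very large tables it may be cheaper to run it on a schedule than before every query.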
4. Join Distribution — Broadcast vs. Partitioned
When joining two tables, there are two strategies for placing the data on workers.
Broadcast Join (Replicated Join)
The small table (build side) is replicated to every worker, and the large table (probe side) is joined in place.
Large table (stays distributed) × Small table (replicated to all workers)
→ No shuffle of the large table. Best when the small table fits comfortably in each worker's memory
Partitioned Join (Hash-Redistribution Join)
Both tables are hash-redistributed on the join key so that matching keys land on the same worker.
Both sides shuffled on the join key → required when joining two large tables
| | Broadcast | Partitioned |
|---|---|---|
| Build side handling | Replicated to all workers | Redistributed by key |
| Best for | One side is small | Both sides are large |
| Risk | Memory blowup if build side is large | Shuffle network cost |
The policy is controlled via configuration.
# etc/config.properties
# AUTOMATIC: the CBO chooses based on statistics (recommended).
# Can also be forced to BROADCAST or PARTITIONED.
# Keep comments on their own lines: a trailing "# ..." after a value becomes part of the value.
join-distribution-type=AUTOMATIC
With AUTOMATIC, the CBO inspects the statistics and decides whether to broadcast the smaller side or go partitioned when both sides are large. Without statistics this judgment can misfire: a large table that should never be broadcast gets replicated to every worker and causes OOM.
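To check which strategy the CBO actually picked, the distributed plan can be inspected under session settings like the ones below. This is a sketch: the `100MB` threshold is an illustrative value, and the query is the events/users join used later in this article.

```sql
-- Let the CBO choose, but cap the estimated build-side size it may broadcast
SET SESSION join_distribution_type = 'AUTOMATIC';
SET SESSION join_max_broadcast_table_size = '100MB';

-- The join node in the distributed plan reports REPLICATED or PARTITIONED
EXPLAIN (TYPE DISTRIBUTED)
SELECT u.region_id, count(*)
FROM iceberg.analytics.events e
JOIN iceberg.analytics.users u ON e.user_id = u.id
GROUP BY u.region_id;
```

Because the threshold is compared against an estimated size, it only protects against oversized broadcasts when the statistics behind the estimate are reasonably accurate.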
5. Join Reordering
In multi-table joins, order determines performance. The core principle is to keep intermediate results as small as possible.
SELECT *
FROM fact_events e -- 1 billion rows
JOIN dim_users u ON e.user_id = u.id -- 1 million rows
JOIN dim_region r ON u.region_id = r.id -- 50 rows
WHERE r.country = 'KR';
Shrinking dim_region first with r.country = 'KR', joining that result to dim_users, and joining fact_events last keeps intermediate results small. Going in the opposite order carries all 1 billion rows through the first join and blows up memory. The CBO estimates the cost of each ordering using statistics (especially NDV and row count) and picks the cheapest one.
# Statistics-based reordering (recommended)
optimizer.join-reordering-strategy=AUTOMATIC
AUTOMATIC only works properly when statistics are available. Without them, it falls back to the conservative ELIMINATE_CROSS_JOINS strategy.
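A quick way to see what reordering buys you is to compare the plan with reordering switched off against the plan under AUTOMATIC. The sketch below uses the three-table query from this section; it assumes the tables resolve in the current catalog and schema.

```sql
-- Baseline: joins are planned in the order they are written
SET SESSION join_reordering_strategy = 'NONE';
EXPLAIN
SELECT *
FROM fact_events e
JOIN dim_users u ON e.user_id = u.id
JOIN dim_region r ON u.region_id = r.id
WHERE r.country = 'KR';

-- Statistics-based reordering
SET SESSION join_reordering_strategy = 'AUTOMATIC';
EXPLAIN
SELECT *
FROM fact_events e
JOIN dim_users u ON e.user_id = u.id
JOIN dim_region r ON u.region_id = r.id
WHERE r.country = 'KR';
```

If the two plans are identical and the estimates are unknown, missing statistics are the likely reason the optimizer had nothing to reorder with.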
6. How to Read EXPLAIN
6.1 EXPLAIN (Logical/Distributed Plan)
EXPLAIN
SELECT u.region_id, count(*)
FROM iceberg.analytics.events e
JOIN iceberg.analytics.users u ON e.user_id = u.id
GROUP BY u.region_id;
-- To also see the distribution strategy
EXPLAIN (TYPE DISTRIBUTED)
SELECT ...;
Key keywords to look for in the plan:
| Keyword | Meaning |
|---|---|
| ScanFilterProject | Table scan + filter/projection. Pushed-down predicates appear here |
| InnerJoin etc. | Join node. Annotated with (REPLICATED)=broadcast, (PARTITIONED)=hash join |
| Aggregate(PARTIAL/FINAL) | Partial aggregation → final aggregation (two-phase aggregation) |
| RemoteExchange | Data shuffle between workers (a network cost point) |
| Estimates | The CBO's row count, size, and cost estimates |
If estimates show up as ?, e.g. Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}, that's a signal that statistics are missing. In that case, run ANALYZE first.
6.2 EXPLAIN ANALYZE (After Actual Execution)
This actually executes the query and shows measured values per stage. It's the core diagnostic tool.
EXPLAIN ANALYZE
SELECT u.region_id, count(*)
FROM iceberg.analytics.events e
JOIN iceberg.analytics.users u ON e.user_id = u.id
GROUP BY u.region_id;
What to look for in the output:
- Actual vs. estimated row counts per operator: a large gap means inaccurate statistics. → Re-run ANALYZE.
- Time per operator (CPU/wall): the stage where time concentrates is the bottleneck.
- Input/Output rows: where the data explodes.
- Shuffle (Exchange) data volume: the network cost of partitioned joins.
Diagnosis priority #1: the gap between estimated and actual row counts. Every CBO decision flows from estimates, so if the estimates are wrong, the whole plan is suspect. A large gap points to a statistics problem; if the gap is small but the query is still slow, the problem more likely lies in data layout (partitioning, sorting, small files).
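When one operator shows a large gap, it can help to isolate the predicate behind it and compare estimate and reality directly. The sketch below assumes a hypothetical filter value, `'purchase'`, on the `event_type` column; substitute the predicate from your own slow query.

```sql
-- Estimated: read row_count from the summary row of the output
SHOW STATS FOR (
    SELECT * FROM iceberg.analytics.events
    WHERE event_type = 'purchase'  -- hypothetical value, for illustration only
);

-- Actual
SELECT count(*)
FROM iceberg.analytics.events
WHERE event_type = 'purchase';
```

If the two numbers differ by an order of magnitude, refreshing statistics on that column is a reasonable first step before touching the query itself.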
7. Pushdown and Dynamic Filtering — The CBO's Companions
Even when the CBO produces a good plan, it's useless if the scan stage can't reduce the data. Two mechanisms complement it.
- Predicate / Aggregate Pushdown: WHERE predicates or aggregations are pushed down to the connector (source), so filtering/aggregation happens at the source. Especially powerful with RDBMS connectors.
- Dynamic Filtering: a dynamic filter built from the join's build side results is applied to the probe side scan, reducing the files/rows read at runtime.
You can verify that dynamic filtering is active by checking EXPLAIN for dynamicFilters on the probe-side scan and dynamicFilterAssignments on the join node. Wrapping a column in a function in the WHERE clause (CAST(ts AS DATE), year(ts)) can prevent pushdown, so the rule is to keep the column bare on the left-hand side and move the computation to the constant side.
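The bare-column rule usually means turning a function call on the column into a range on the column itself. The sketch below assumes `event_time` is a timestamp with time zone stored in UTC, so that the one-day range matches the calendar-date comparison; with a different zone the bounds would need adjusting.

```sql
-- Function-wrapped column: the predicate may not reach the connector
SELECT count(*)
FROM iceberg.analytics.events
WHERE CAST(event_time AS DATE) = DATE '2026-06-01';

-- Bare column with an equivalent half-open range (assuming UTC values)
SELECT count(*)
FROM iceberg.analytics.events
WHERE event_time >= TIMESTAMP '2026-06-01 00:00:00 UTC'
  AND event_time <  TIMESTAMP '2026-06-02 00:00:00 UTC';
```

The half-open form (`>=` start, `<` next day) avoids off-by-one errors at the end of the day regardless of timestamp precision.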
8. When the CBO Misfires — Checklist
| Symptom | Cause | Remedy |
|---|---|---|
| Estimates in EXPLAIN show ? | No statistics | Collect with ANALYZE |
| Estimated rows ≪ actual | Stale statistics | Re-run ANALYZE |
| Large table gets broadcast, causing OOM | Size estimation failure | Refresh statistics; force join_distribution_type per session if needed |
| Inefficient join order | Reordering disabled / no statistics | optimizer.join-reordering-strategy=AUTOMATIC + statistics |
| Pushdown not happening | Function wrapping in WHERE | Keep the column bare on the left-hand side |
| Dynamic filter not kicking in | Build side too large / OUTER JOIN | Revisit the join structure |
9. Session-Level Fine-Tuning
When you want to treat a single query differently:
SET SESSION join_reordering_strategy = 'AUTOMATIC';
SET SESSION join_distribution_type = 'PARTITIONED'; -- force partitioned joins so nothing is broadcast
Keep the global defaults stable and adjust only outlier queries at the session level. This is the operationally safe approach.
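Session overrides stay in effect for the rest of the session, so it is worth undoing them once the outlier query has run. A minimal sketch of that pattern, using the events/users join from earlier in this article as the stand-in for the problem query:

```sql
-- Override for one problematic query
SET SESSION join_distribution_type = 'PARTITIONED';

SELECT u.region_id, count(*)
FROM iceberg.analytics.events e
JOIN iceberg.analytics.users u ON e.user_id = u.id
GROUP BY u.region_id;

-- Return to the configured default for everything that follows
RESET SESSION join_distribution_type;
```

Resetting explicitly matters most with pooled or long-lived connections, where a forgotten override can silently affect unrelated queries.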
10. Summary
| Topic | Key point |
|---|---|
| The CBO's fuel | Statistics (row count, NDV, min/max, null). Without them it guesses |
| Populating statistics | ANALYZE, regular schedules, at the end of ingestion pipelines |
| Join distribution | AUTOMATIC + accurate statistics for automatic broadcast/partitioned selection |
| Join order | Statistics-based reordering to minimize intermediate results |
| Diagnosis | Read the plan with EXPLAIN; check the estimate-vs-actual gap with EXPLAIN ANALYZE |
| Scan efficiency | Pushdown and dynamic filtering; no function wrapping |
Every CBO decision starts from statistical estimates. Half of Trino performance tuning is therefore "keeping statistics accurate," and the other half is "reading the gap between estimates and reality with EXPLAIN ANALYZE." Once these two become habits, you can fix slow queries based on evidence, not intuition.
This article is based on the Trino 440 series.
— Data Dynamics Engineering Team