Tags: trino, iceberg, nifi, lakehouse, streaming, data-platform

Real-Time Ingest into Trino Iceberg Tables with NiFi

A comparison of two patterns for real-time ingest into Trino-created Iceberg tables with NiFi (PutIceberg direct write vs. Trino JDBC), why Impala-style REFRESH is no longer needed thanks to Iceberg's snapshot model, and how to handle real-world pitfalls like small files, snapshot explosion, and commit conflicts.

Data Dynamics · May 23, 2026 · 16 min read

If you've ever loaded files into Impala + Hive with an external tool, the flow is familiar. You drop the files, run REFRESH table_name or INVALIDATE METADATA, and only then does the new data appear in queries. If you forget the step, users see yesterday's data. If you reach for INVALIDATE METADATA too often, catalogd load climbs across the whole cluster.

Move to Iceberg and this whole step disappears, because the snapshots are the metadata. As soon as the writer's atomic commit completes, every query that starts afterward sees the new data, with no separate REFRESH call.

This article lays out a practical design for real-time ingest from NiFi into Iceberg tables created in Trino. It compares two ingest paths: PutIceberg direct write, and writing via Trino JDBC. It then digs into the pitfalls operators actually hit behind the promise of "flexible without REFRESH": small files, snapshot explosion, concurrent commit conflicts, compaction, and expiration.


1. Why Iceberg Has No REFRESH

1.1 The Impala/Hive Model — the Catalog Knows the File List

In a traditional Hive table, "table" is split into two parts.

  • Hive Metastore — holds schema, partition locations, and file statistics as metadata.
  • Files on HDFS / S3 — the actual data files and directories.

For performance, Impala caches metastore information in catalogd. When an external tool (NiFi, Spark, Sqoop, etc.) drops new files, those files are on disk but catalogd doesn't know about them, so queries can't see them.

The remedies were REFRESH and INVALIDATE METADATA.

-- Reload metadata/file listing for a single partition
REFRESH events PARTITION (dt='2026-05-23');
 
-- Invalidate all metadata for a table (expensive)
INVALIDATE METADATA events;

REFRESH is an operator-side signal that says "rescan the metastore and the file system." Skip it or fire the wrong one and you get:

  • Data has been ingested, but BI doesn't see it.
  • Calling INVALIDATE METADATA too often pressures catalogd and pushes up cluster-wide query latency.
  • Races between the ingest pipeline and BI queries.

1.2 The Iceberg Model — Metadata as the Single Source of Truth

Iceberg solves the same problem differently. A table has the following structure.

catalog ──▶ metadata.json (current pointer)
                  │
                  └─▶ snapshot                ← the table state as of time t
                        ├─▶ manifest-list      ← manifests this snapshot references
                        │     ├─▶ manifest     ← list of data files + statistics
                        │     │     └─▶ data files (Parquet/ORC/Avro)
                        │     └─▶ ...
                        └─▶ ...

Two things matter most.

  • Every reader always goes through metadata.json. Files are not discovered through directory listing. The snapshot pointed to by metadata.json is "the table you see right now."
  • Writers create a new metadata.json and atomically swap it in the catalog. Once the swap completes, every reader from the next query onward sees the new snapshot.

[Hive/Impala model]                  [Iceberg model]

External writer ──▶ drop files       External writer ──▶ drop files
                                                     │
Operator ──▶ REFRESH                                 ▼
                │                              Write new metadata.json
                ▼                                    │
        catalogd cache refresh                       ▼
                │                              Atomic swap in catalog
                ▼                                    │
        Readers see new files                        ▼
                                              Readers see new snapshot
                                              (no REFRESH)

The writer's commit is the metadata update. There is no separate signal for the operator to fire. That's why "flexible without Impala's REFRESH" is a valid description.
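To make the commit model concrete, here is a toy, in-memory Python sketch of the idea. It has a catalog that holds one current pointer per table, and writers that commit by compare-and-swap and retry when another writer got there first. The names (InMemoryCatalog, append) are hypothetical. Snapshot IDs are sequential rather than random, and each snapshot simply lists its live files instead of pointing at manifests. Real catalogs implement the atomic swap differently per catalog type.

```python
import threading
from dataclasses import dataclass
from typing import Dict, Iterable, Optional, Tuple


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    parent_id: Optional[int]
    data_files: Tuple[str, ...]  # toy: every live file, not manifests


class InMemoryCatalog:
    """Toy catalog holding one 'current snapshot' pointer per table."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._current: Dict[str, Snapshot] = {}

    def load(self, table: str) -> Optional[Snapshot]:
        with self._lock:
            return self._current.get(table)

    def compare_and_swap(self, table: str, expected: Optional[Snapshot], new: Snapshot) -> bool:
        # Atomic: succeed only if nobody committed since `expected` was read.
        with self._lock:
            if self._current.get(table) is not expected:
                return False
            self._current[table] = new
            return True


def append(catalog: InMemoryCatalog, table: str, new_files: Iterable[str],
           max_retries: int = 4) -> Snapshot:
    """Optimistic append: build on the current snapshot, swap, retry on conflict."""
    files = tuple(new_files)
    for _ in range(max_retries + 1):
        base = catalog.load(table)
        candidate = Snapshot(
            snapshot_id=(base.snapshot_id + 1) if base else 1,
            parent_id=base.snapshot_id if base else None,
            data_files=(base.data_files if base else ()) + files,
        )
        if catalog.compare_and_swap(table, base, candidate):
            return candidate
        # Lost the race: loop to reload the new base and reapply the same files.
    raise RuntimeError(f"commit to {table} failed after {max_retries} retries")
```

A reader that calls load() sees either the old snapshot or the new one, never a half-written state. That is the property that makes a separate REFRESH step unnecessary.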

1.3 One Caveat — Per-Engine Catalog Cache

Strictly speaking, Iceberg engines also briefly cache catalogs/metadata. The Trino Iceberg connector defaults to iceberg.metadata-cache.enabled=true with a short TTL (default 5 minutes, configurable). It's invalidated in two ways.

  • TTL expiry — the next lookup automatically reads a fresh metadata.json.
  • Explicit invalidation: CALL iceberg.system.flush_metadata_cache(schema_name => 'sales', table_name => 'events')

For most ingest workloads the TTL is enough, and if you need immediate visibility you can disable the cache (iceberg.metadata-cache.enabled=false) or shorten the TTL. This is fundamentally different from a flow where the operator fires REFRESH on every ingest. The cache is a performance optimization, not a precondition for data visibility.
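The staleness bound is easy to see in a toy model. The Python sketch below is not Trino's cache implementation. It is a hypothetical TtlPointerCache that remembers which metadata.json a table pointed to and asks the catalog again once the entry is older than the TTL. A reader is therefore never more than one TTL behind, and an explicit invalidate() makes the next lookup fresh.

```python
import time
from typing import Callable, Dict, Tuple


class TtlPointerCache:
    """Toy engine-side cache of each table's current metadata.json location."""

    def __init__(self, ttl_s: float, fetch: Callable[[str], str],
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl_s = ttl_s
        self.fetch = fetch      # asks the catalog for the current pointer
        self.clock = clock
        self._entries: Dict[str, Tuple[float, str]] = {}

    def get(self, table: str) -> str:
        now = self.clock()
        entry = self._entries.get(table)
        if entry is not None and now - entry[0] < self.ttl_s:
            return entry[1]     # may be stale, but by less than ttl_s
        pointer = self.fetch(table)
        self._entries[table] = (now, pointer)
        return pointer

    def invalidate(self, table: str) -> None:
        """Explicit flush: the next get() goes back to the catalog."""
        self._entries.pop(table, None)
```

Shrinking ttl_s trades catalog load for freshness. With ttl_s set to zero, every get() becomes a catalog lookup, which is the model's equivalent of disabling the cache.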


2. Preconditions — Creating Iceberg Tables in Trino

Before talking about NiFi, it's worth pinning down how the Iceberg table was created in Trino — the ingest-path design depends on these decisions.

2.1 Catalog Configuration (e.g., REST Catalog)

etc/catalog/iceberg.properties:

connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.uri=https://catalog.internal:8181
iceberg.rest-catalog.warehouse=s3://lake/warehouse
iceberg.rest-catalog.security=OAUTH2
iceberg.rest-catalog.oauth2.credential=${ENV:CATALOG_CLIENT_ID}:${ENV:CATALOG_CLIENT_SECRET}
 
fs.native-s3.enabled=true
s3.region=ap-northeast-2
s3.endpoint=https://s3.ap-northeast-2.amazonaws.com
 
iceberg.file-format=PARQUET
iceberg.compression-codec=ZSTD

Hive Metastore and Glue are configured the same way. You set iceberg.catalog.type=hive_metastore or glue and add their own connection properties. REST Catalog is the recommended choice in multi-engine environments (see Apache Iceberg REST Catalog Server for details).

2.2 Table Creation

CREATE TABLE iceberg.sales.events (
    event_id      VARCHAR,
    user_id       BIGINT,
    event_type    VARCHAR,
    payload       VARCHAR,
    event_time    TIMESTAMP(6) WITH TIME ZONE,
    ingest_time   TIMESTAMP(6) WITH TIME ZONE
)
WITH (
    format = 'PARQUET',
    partitioning = ARRAY['day(event_time)'],
    format_version = 2,
    sorted_by = ARRAY['event_time']
);

Key decisions:

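  • format_version = 2: V2 is required for row-level deletes and the modern operations built on them, whether the writer is NiFi, Spark, Flink, or Trino itself. Even for simple append-only tables, V2 is effectively the standard.
  • Start partitioning with day(event_time): partitions that are too fine multiply the small-file count. Adjust by workload volume. Move to hour(event_time) only when daily partitions become very large, and add bucket(user_id, N) when a time partition needs to be split further.
  • sorted_by for time ordering: files Trino writes, including OPTIMIZE rewrites, are sorted by event_time. Their min/max statistics are therefore tight, so time-range predicates and dynamic filters can skip more files.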
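Because day(event_time) is computed in UTC, the partition an event lands in can differ from its local calendar date. This matters for a cluster in ap-northeast-2 (UTC+9). The Python sketch below mirrors the day and hour transforms as the Iceberg spec describes them: whole days or hours since 1970-01-01 00:00 UTC. The function names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def _since_epoch(ts: datetime) -> timedelta:
    if ts.tzinfo is None:
        raise ValueError("expected a timezone-aware timestamp (TIMESTAMP WITH TIME ZONE)")
    return ts.astimezone(timezone.utc) - EPOCH


def day_partition(ts: datetime) -> int:
    """Whole days since the epoch in UTC, like Iceberg's day() transform."""
    return _since_epoch(ts) // timedelta(days=1)


def hour_partition(ts: datetime) -> int:
    """Whole hours since the epoch in UTC, like Iceberg's hour() transform."""
    return _since_epoch(ts) // timedelta(hours=1)


if __name__ == "__main__":
    kst = timezone(timedelta(hours=9))
    morning_kst = datetime(2026, 5, 23, 8, 0, tzinfo=kst)   # 2026-05-22 23:00 UTC
    noon_utc = datetime(2026, 5, 23, 12, 0, tzinfo=timezone.utc)
    print(day_partition(morning_kst), day_partition(noon_utc))
```

An event stamped 08:00 KST on May 23 lands in the May 22 partition. Keep this in mind when writing OPTIMIZE WHERE clauses or partition-pruned queries in local time.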

From this point, NiFi just needs to look at the same catalog and ingest.


3. NiFi to Iceberg — Two Paths

There are two realistic options for loading an Iceberg table from NiFi.

[Option A] NiFi ──▶ PutIceberg ──▶ Iceberg catalog/storage (direct write)
[Option B] NiFi ──▶ PutDatabaseRecord ──▶ Trino JDBC ──▶ Iceberg

3.1 Option A — PutIceberg (Direct Write)

The PutIceberg processor, part of the official NiFi bundle since 1.19, calls the Iceberg library directly from NiFi to write data files and commit to the catalog. No other engine sits in between.

Pros

  • High throughput. NiFi workers write data files directly; only metadata commits hit the catalog. There's no intermediate SQL engine like Trino, so no row-transformation or round-trip cost.
  • Backpressure is intuitive. NiFi queues and thread pools directly determine ingest speed.
  • Failure replay is natural. Retries happen at the FlowFile level, and Iceberg's snapshot model means no partial commits.

Cons

  • Sensitive to catalog compatibility. NiFi PutIceberg has first-class support for Hive Catalog and Hadoop Catalog. REST Catalog support is expanding in NiFi 2.x; Glue/Nessie require extra dependencies/configuration. Check your NiFi version's support matrix first.
  • Iceberg/Hadoop deps and storage credentials must live on NiFi nodes. REST Catalog vended credentials can reduce the credential surface, but only if NiFi's IcebergCatalogService can handle vended credentials.
  • Schema evolution responsibility shifts to NiFi. When someone adds a column in Trino with ALTER TABLE, NiFi's record schema no longer matches the table, and the flow has to handle that gap.

3.2 Option B — Via Trino JDBC

Connect the PutDatabaseRecord processor to a DBCPConnectionPool configured with the Trino JDBC driver, and run INSERTs. The actual write to Iceberg is done by Trino workers.

Pros

  • Catalog-type agnostic. Trino holds the catalog, so NiFi only needs JDBC. Works identically for REST/Glue/Nessie/HMS.
  • Unified authorization model. Authentication/authorization, row/column masking, and audit logging are all decided in one place — Trino. The biggest operational win.
  • Robust against schema changes. Trino reads the latest metadata.json when planning the INSERT, so column additions or type changes are often absorbed without redeploying NiFi.

Cons

  • Lower throughput ceiling. Every row goes through the Trino coordinator and out to workers, adding overhead vs. direct write. For workloads beyond tens of thousands of rows per second, PutIceberg is often better.
  • Commits happen per INSERT. Sending small batches as separate INSERTs piles up snapshots quickly (see section 6).
  • The Trino cluster becomes a single point of failure. If Trino goes down, the ingest pipeline stops too.

3.3 What to Choose

Rough guidance:

Condition                                                  Recommended
Throughput-critical (10k+ rows/sec)                        Option A (PutIceberg)
Diverse catalogs (REST/Glue/Nessie)                        Option B (Trino JDBC)
Permissions/audit/masking controlled in Trino              Option B (Trino JDBC)
Hard to place storage/catalog credentials on NiFi nodes    Option B (Trino JDBC)
Simple ingest SLA (append-only, stable schema)             Option A (PutIceberg)
Ops team knows Trino well; NiFi as a simple gateway        Option B (Trino JDBC)

In practice, hybrid is common. High-volume fact tables go via Option A; PII tables with strict permissions and audit go via Option B.


4. Implementing Option A — Direct Write with PutIceberg

4.1 NiFi Controller Services

Set up three services first.

  1. HiveCatalogService (or RESTCatalogService — depending on NiFi version) — Iceberg catalog connection info.
  2. AWSCredentialsProviderService — S3 credentials. If using REST Catalog vended credentials, prefer a dynamic token-cache mode over static credentials.
  3. JsonTreeReader — Record schema for incoming FlowFiles.

Example HiveCatalogService

Catalog URI:           thrift://hms.internal:9083
Warehouse Location:    s3://lake/warehouse
Hadoop Configuration:  /opt/nifi/conf/core-site.xml,/opt/nifi/conf/hdfs-site.xml
Kerberos Credentials:  (KerberosControllerService reference if needed)

Put the S3A credential provider, S3 endpoint, and retry policy in the Hadoop Configuration files. Don't store credentials as plain text in NiFi properties. Keep them in sensitive parameters (for example, via a Parameter Provider) or in environment variables.

4.2 Ingest Flow Design

The most common shape is Kafka → NiFi → Iceberg.

ConsumeKafkaRecord_2_6 ──▶ UpdateRecord ──▶ PutIceberg
        │                       │                │
        │                       │                ├─ Success: end
        │                       │                └─ Failure: retry queue + DLQ
        │                       │
        └─ schema = JsonTreeReader (Avro schema text)
                             writer = JsonRecordSetWriter

Key points:

  • Use Record-oriented processors consistently. ConsumeKafkaRecord, UpdateRecord, and PutIceberg all share the Record abstraction — define the schema once.
  • Add ingest_time in UpdateRecord (Replacement Value Strategy: Literal Value) with one dynamic property:
    • Property name: /ingest_time
    • Value: ${now():format("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")}
  • PutIceberg settings:
    • Catalog Service: the HiveCatalogService created above
    • Catalog Namespace: sales
    • Table Name: events
    • File Format: PARQUET
    • Maximum File Size: 128 MB — small-file prevention (see section 6)
    • Unmatched Column Behavior: IGNORE_UNMATCHED_COLUMNS (tolerant of table-side column additions)
    • Number of Commit Retries: 4
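The effect of tolerating unmatched columns can be modeled in a few lines. This is a hypothetical Python sketch of the behavior the setting aims for, not PutIceberg's code. Each record is projected onto the table's current column list, and columns the record lacks (such as one just added with ALTER TABLE) come back as NULL. Dropping record fields that the table doesn't have is an assumption of this sketch.

```python
from typing import Any, Dict, List


def project_record(record: Dict[str, Any], table_columns: List[str],
                   ignore_unmatched: bool = True) -> Dict[str, Any]:
    """Map one incoming record onto the table's columns (toy model)."""
    missing = [c for c in table_columns if c not in record]
    if missing and not ignore_unmatched:
        # Strict mode: refuse records that don't cover every table column.
        raise ValueError(f"record is missing table columns: {missing}")
    # Columns absent from the record become None (NULL). Fields the table
    # doesn't know about are dropped because we iterate over table_columns.
    return {col: record.get(col) for col in table_columns}
```

In this model, a hypothetical region column added in Trino simply arrives as NULL until the NiFi record schema catches up. No flow redeploy is needed for the gap itself.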

4.3 Batch Size = Snapshot Frequency

This one rule is the heart of operating Option A.

Each call to PutIceberg (one FlowFile) commits one Iceberg snapshot. The canonical pattern is to bundle records with ConsumeKafkaRecord's Max Poll Records or with a MergeRecord processor before handing them to PutIceberg.

ConsumeKafkaRecord (Max Poll Records: 5000)
       │
       ▼
MergeRecord (Min Records: 5000, Max Records: 20000, Max Bin Age: 30 sec)
       │
       ▼
PutIceberg  ← commits every 30s, 5000–20000 records at a time

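At modest input rates this yields roughly 2 snapshots per minute per NiFi node, with data visibility within about 30 seconds. At higher rates a bin reaches Max Records before Max Bin Age, so commits come more often. Every node also keeps its own bins, so the table-level snapshot count grows with the node count. Depending on workload, you can shrink Max Bin Age to as low as 10 seconds.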
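The arithmetic behind those numbers can be written down. The Python sketch below is a simplified model with hypothetical names. It assumes records arrive at a steady rate, spread evenly across NiFi nodes. It also assumes each node keeps one MergeRecord bin, and that a bin is committed when it hits Max Records or Max Bin Age, whichever comes first.

```python
from dataclasses import dataclass

SECONDS_PER_DAY = 86_400


@dataclass(frozen=True)
class MergePlan:
    commit_interval_s: float
    records_per_commit: float
    snapshots_per_day: float


def merge_plan(total_rate_per_s: float, nodes: int,
               max_records: int, max_bin_age_s: float) -> MergePlan:
    """Estimate PutIceberg commit cadence for a MergeRecord -> PutIceberg flow."""
    if total_rate_per_s <= 0 or nodes <= 0:
        raise ValueError("rate and node count must be positive")
    per_node_rate = total_rate_per_s / nodes
    # A bin closes when it fills up or when it gets too old, whichever is first.
    interval = min(max_bin_age_s, max_records / per_node_rate)
    return MergePlan(
        commit_interval_s=interval,
        records_per_commit=per_node_rate * interval,
        # Every node commits its own bins, so snapshot counts add up.
        snapshots_per_day=nodes * SECONDS_PER_DAY / interval,
    )
```

Take merge_plan(200, 1, 20_000, 30) as an example. It gives a 30-second interval with 6,000 records per commit (2,880 snapshots a day). Ten times the rate fills a bin every 10 seconds and triples the snapshot count, which is the cue to raise Max Records.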


5. Implementing Option B — Via Trino JDBC

5.1 DBCPConnectionPool

Database Connection URL:
  jdbc:trino://trino-coord.internal:8443/iceberg/sales

Database Driver Class Name:
  io.trino.jdbc.TrinoDriver

Database Driver Location:
  /opt/nifi/lib/trino-jdbc-447.jar

Database User:           nifi-ingest
Password:                ${trino.password}

Properties:
  SSL=true
  SSLVerification=FULL
  externalAuthentication=false
  source=nifi-ingest
  sessionProperties=iceberg.target_max_file_size:128MB

Setting source lets you filter NiFi traffic in Trino's query history (the source column of system.runtime.queries). Grant a dedicated service account such as nifi-ingest the INSERT permission only.

5.2 PutDatabaseRecord Settings

Statement Type:               INSERT
Record Reader:                JsonTreeReader (or AvroReader)
Database Type:                Generic
Table Name:                   events
Schema Name:                  sales
Catalog Name:                 iceberg
Translate Field Names:        true
Unmatched Field Behavior:     Ignore Unmatched Fields
Unmatched Column Behavior:    Ignore Unmatched Columns
Maximum Batch Size:           1000

Trino JDBC implements the standard PreparedStatement.addBatch()/executeBatch() calls, so PutDatabaseRecord can use Maximum Batch Size. Around 1000 is a sane starting point that keeps throughput up without pressuring the coordinator.

5.3 An INSERT Creates a Snapshot

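The same principle from Option A applies in Option B: every INSERT statement Trino executes commits one Iceberg snapshot. Workers write the data files, and the coordinator swaps metadata.json in the catalog. Ideally, then, one PutDatabaseRecord call becomes one snapshot. Confirm this for your driver version by checking events$snapshots. If each row shows up as its own append snapshot, the batch is being executed statement by statement.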

So the MergeRecord pattern from Option A applies here too. Don't drip-feed FlowFiles — bundle them.


6. Real-Time Ingest Pitfalls — Small Files, Snapshot Explosion, Commit Conflicts

Behind "flexible without REFRESH" sits a cost that real-time ingest creates. Ignore it and query performance can drop 2–3× within days.

6.1 Small Files

Even in Iceberg, a committed data file is immutable. Writing a 1 MB Parquet file every 5 seconds yields about 17,000 small files a day per writer, and both manifest overhead and query planning time explode.
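The numbers in that example are simple to reproduce. The small Python helper below is hypothetical and meant for back-of-the-envelope sizing only. It counts the files that a day of continuous writes produces, and how many 128 MB files the same bytes would fit into after compaction.

```python
import math

SECONDS_PER_DAY = 86_400


def small_file_report(file_mb: int, interval_s: int, writers: int = 1,
                      target_mb: int = 128) -> tuple:
    """Return (files written per day, MB per day, files after compaction)."""
    files_per_day = writers * (SECONDS_PER_DAY // interval_s)
    mb_per_day = files_per_day * file_mb
    compacted_files = math.ceil(mb_per_day / target_mb)
    return files_per_day, mb_per_day, compacted_files


if __name__ == "__main__":
    print(small_file_report(file_mb=1, interval_s=5))
```

With one writer this prints (17280, 17280, 135). About 17,000 MB a day fits into 135 well-sized files instead of 17,280 small ones, and four writers quadruple every figure.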

The fix is to apply two solutions together.

  • Bundle at write time: use the MergeRecord pattern above. Accept a few seconds to tens of seconds of visibility latency, depending on throughput, so each commit lands files closer to 64–128 MB.
  • Compact after writing: periodically compact whatever small files remain.

ALTER TABLE iceberg.sales.events EXECUTE optimize;
 
-- Or only recent partitions (day 2026-05-23 onward)
ALTER TABLE iceberg.sales.events
EXECUTE optimize WHERE event_time >= TIMESTAMP '2026-05-23 00:00:00 UTC';

Common operational patterns: compact the previous hour's partition every hour, or compact the previous day's partition every morning. Schedule via NiFi ExecuteSQL or a separate Airflow DAG.

6.2 Snapshot Explosion

Committing every 30 seconds yields 2,880 snapshots per day — 86,400 per month. metadata.json grows huge, and the cost of each new snapshot commit goes up.

-- Expire snapshots older than 7 days
ALTER TABLE iceberg.sales.events
EXECUTE expire_snapshots(retention_threshold => '7d');
 
-- Remove files that no table metadata references (e.g., left behind by failed writes)
ALTER TABLE iceberg.sales.events
EXECUTE remove_orphan_files(retention_threshold => '7d');

If you don't need time travel beyond 7 days, running expire_snapshots once a day is standard. Be more conservative with remove_orphan_files. Keep its retention threshold well above the duration of any in-flight write, so files from ingest that hasn't committed yet are not deleted. Running it weekly is usually enough.
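Expiration bounds the snapshot count, but it doesn't make the count small, because the metadata still carries every snapshot inside the retention window. The quick Python calculation below uses a hypothetical helper and assumes a steady commit interval. It shows how many snapshots a given retention keeps live.

```python
SECONDS_PER_DAY = 86_400


def snapshots_retained(commit_interval_s: int, retention_days: int, writers: int = 1) -> int:
    """Snapshots alive at steady state under a given expire_snapshots retention."""
    return writers * retention_days * SECONDS_PER_DAY // commit_interval_s


if __name__ == "__main__":
    for days in (1, 7, 30):
        print(days, snapshots_retained(commit_interval_s=30, retention_days=days))
```

At a 30-second cadence, 7-day retention still keeps about 20,000 snapshots live, and 30 days would keep 86,400. Committing less often matters as much as expiring regularly.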

6.3 Concurrent Commit Conflicts

When multiple NiFi nodes commit to the same table at the same time, Iceberg's optimistic concurrency control (OCC) lets only one metadata swap win. The others fail their compare-and-swap and must retry against the new snapshot. For pure appends a retry just reapplies the new files, so PutIceberg's Number of Commit Retries (default 4) usually recovers. Frequent conflicts still erode throughput.

Mitigation patterns:

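  • Route the same partition to the same node in NiFi. Group records by partition column with PartitionRecord, then send each group to one node, for example through a load-balanced connection that uses the Partition by attribute strategy on the attribute PartitionRecord sets.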
  • Make batches larger with MergeRecord — reduces conflict frequency itself.
  • Cap writers per table — keep concurrent NiFi writers to the same table within 4–8.

6.4 Clocks / Watermarks / Late-Arriving Events

Both NiFi PutIceberg and Trino JDBC are most natural with append-only. If you need updates/upserts for late-arriving events, you have two choices.

  • Append + post-process MERGE — NiFi only appends; a separate Trino job cleans up with MERGE INTO target USING staging. Iceberg V2's row-level deletes power this.
  • Switch to Flink Iceberg Sink — if you really need streaming upsert semantics, Flink is more natural. NiFi's strength is orchestrating diverse sources/sinks, not ms-level streaming upsert.
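The append-then-MERGE cleanup can be reasoned about with a small model. The Python sketch below is hypothetical and works on in-memory dicts keyed by event_id rather than on tables. It first keeps only the newest staging row per key, because a SQL MERGE raises an error when one target row matches several source rows. It then applies the two branches a MERGE INTO target USING staging would typically have: insert when not matched, and update when matched and the staging row is newer.

```python
from typing import Any, Dict, Iterable, List

Row = Dict[str, Any]


def dedupe_latest(rows: Iterable[Row]) -> List[Row]:
    """Keep one row per event_id: the one with the greatest event_time."""
    best: Dict[str, Row] = {}
    for row in rows:
        key = row["event_id"]
        if key not in best or row["event_time"] > best[key]["event_time"]:
            best[key] = row
    return list(best.values())


def merge_into(target: Dict[str, Row], staging: Iterable[Row]) -> Dict[str, Row]:
    """Toy MERGE keyed on event_id; returns a new dict, target is not mutated."""
    merged = dict(target)
    for row in dedupe_latest(staging):
        current = merged.get(row["event_id"])
        if current is None:
            merged[row["event_id"]] = row          # WHEN NOT MATCHED THEN INSERT
        elif row["event_time"] > current["event_time"]:
            merged[row["event_id"]] = row          # WHEN MATCHED AND newer THEN UPDATE
    return merged
```

In SQL, the dedupe step usually becomes a row_number() filter over the staging table, placed inside the MERGE's USING clause.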

7. Operations — Monitoring and Backpressure

7.1 NiFi Side

  • Queue-depth alerts: if the queue after ConsumeKafka keeps growing, PutIceberg or Trino can't keep up. Alert when NiFi's 5-minute average queue depth crosses a threshold.
  • PutIceberg/PutDatabaseRecord failure counts — route transient commit failures to a retry queue, permanent failures to a DLQ.
  • Provenance events — per FlowFile, track ingest time, row count, and committed snapshot ID.

7.2 Trino / Iceberg Side

  • Table metadata queries:

-- Recent snapshot history
SELECT committed_at, snapshot_id, operation, summary
FROM iceberg.sales."events$snapshots"
ORDER BY committed_at DESC
LIMIT 50;
 
-- File statistics
SELECT
    count(*)              AS data_file_count,
    sum(record_count)     AS total_rows,
    sum(file_size_in_bytes) / 1048576.0 AS total_mb,  -- .0 avoids integer division
    avg(file_size_in_bytes) / 1048576.0 AS avg_mb
FROM iceberg.sales."events$files";
 
-- Size by partition
SELECT partition, file_count, record_count
FROM iceberg.sales."events$partitions"
ORDER BY record_count DESC
LIMIT 20;

Pin these three queries to a dashboard (Superset / Grafana) and your ingest pipeline's health is visible at a glance. An avg_mb drifting below 64 is an early sign that compaction is falling behind.

7.3 Example Alert Thresholds

Metric                                        Threshold            Action
ConsumeKafka queue depth (5-min avg)          > 50,000 FlowFiles   Add NiFi workers / tune Trino INSERT
Time since last commit in events$snapshots    > 5 min              Alert: ingest pipeline stopped
avg_mb in events$files                        < 32                 Shorten compaction cycle
Snapshots per day                             > 5,000              Increase MergeRecord batch size
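The threshold table translates directly into a check that a scheduler could run. This Python sketch is hypothetical. It assumes the four metrics have already been collected, for example the last commit time as max(committed_at) from events$snapshots and avg_mb from events$files, and it applies the thresholds from the table.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List


@dataclass(frozen=True)
class IngestMetrics:
    queue_depth_5m_avg: float      # FlowFiles queued after ConsumeKafka
    last_commit_at: datetime       # max(committed_at) from events$snapshots
    avg_file_mb: float             # avg_mb from events$files
    snapshots_last_day: int        # count of snapshots committed in 24h


def evaluate(m: IngestMetrics, now: datetime) -> List[str]:
    """Return the actions whose thresholds are crossed, in table order."""
    actions: List[str] = []
    if m.queue_depth_5m_avg > 50_000:
        actions.append("Add NiFi workers / tune Trino INSERT")
    if now - m.last_commit_at > timedelta(minutes=5):
        actions.append("Alert: ingest pipeline stopped")
    if m.avg_file_mb < 32:
        actions.append("Shorten compaction cycle")
    if m.snapshots_last_day > 5_000:
        actions.append("Increase MergeRecord batch size")
    return actions
```

Keep now and last_commit_at in the same time zone. Comparing an aware timestamp from Trino with a naive local clock raises a TypeError in Python rather than silently misfiring.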

8. Looking at the Data from Trino — Visibility Without REFRESH

Let's verify it. From Trino, query the data that NiFi committed 30 seconds ago.

-- Check newly ingested events
SELECT event_time, ingest_time, count(*)
FROM iceberg.sales.events
WHERE ingest_time >= current_timestamp - INTERVAL '1' MINUTE
GROUP BY 1, 2
ORDER BY 1 DESC;

This query always sees the latest snapshot. No REFRESH, no INVALIDATE METADATA, no explicit call is needed. Trino reads the metadata.json pointer from the catalog on every query and discovers the new snapshot.

If Trino's Iceberg metadata cache is on with a 5-minute TTL, you can have up to 5 minutes of delay. If immediate visibility after ingest matters, turn it off per session with SET SESSION iceberg.metadata_cache_enabled = false, or shorten the TTL in catalog config to 30s–1m.

Time travel works the same.

-- Table snapshot as of 5 minutes ago
SELECT count(*)
FROM iceberg.sales.events
FOR TIMESTAMP AS OF (current_timestamp - INTERVAL '5' MINUTE);
 
-- Query at a specific snapshot
SELECT count(*)
FROM iceberg.sales.events
FOR VERSION AS OF 7263921832639218321;

This is territory Impala + Hive couldn't reach. The writer's commit is the metadata, every snapshot has a time coordinate, so consistent visibility and retrospective analysis come together without REFRESH.
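Resolving FOR TIMESTAMP AS OF boils down to picking the snapshot that was current at that instant. The Python sketch below is a simplified, hypothetical model. It takes a time-sorted list of (committed_at, snapshot_id) pairs as a stand-in for the table's snapshot log, and returns the last snapshot committed at or before the requested time. If the timestamp predates the table it returns None, whereas a real engine reports an error.

```python
from bisect import bisect_right
from datetime import datetime, timezone
from typing import List, Optional, Tuple


def snapshot_as_of(log: List[Tuple[datetime, int]], ts: datetime) -> Optional[int]:
    """Snapshot that was current at `ts`; `log` is sorted by committed_at ascending."""
    commit_times = [committed_at for committed_at, _ in log]
    # bisect_right counts commits at or before ts; the last of them is current.
    i = bisect_right(commit_times, ts)
    return log[i - 1][1] if i > 0 else None


if __name__ == "__main__":
    t0 = datetime(2026, 5, 23, 12, 0, tzinfo=timezone.utc)
    history = [(t0, 101), (t0.replace(minute=1), 102), (t0.replace(minute=2), 103)]
    print(snapshot_as_of(history, t0.replace(minute=1, second=30)))
```

With the hypothetical snapshot IDs above, the example prints 102: a query at 12:01:30 sees the table as committed at 12:01.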


9. Permissions and Catalog Consistency — the Things Most Often Skipped

The most frequent operational problems are not code but permissions and catalog consistency.

  • Don't give NiFi service accounts overly broad permissions. PutIceberg may need CREATE TABLE / ALTER permissions on the catalog. To scope it tightly, have the catalog-side policy allow only writing data and committing new metadata (the metadata.json swap) on the target tables.
  • Regularly verify Trino, NiFi, and Spark see the same catalog. When Hive Metastore + Glue + REST Catalog all coexist, you get incidents where a table visible in one engine doesn't show up in another. Unifying on REST Catalog is recommended.
  • Consider vended credentials first. If you can avoid keeping persistent S3 credentials on NiFi nodes, that's better. Check per NiFi version whether the IcebergCatalogService supports a short-lived credential client config from REST Catalog.

10. Wrap-Up

  • In Iceberg, the writer's atomic commit is the metadata. The REFRESH / INVALIDATE METADATA that Impala + Hive needed disappears at the root. Trino's metadata cache is a performance optimization, not a visibility precondition.
  • NiFi → Iceberg has two paths. Throughput-first → PutIceberg direct write; permission/audit/catalog-diversity-first → Trino JDBC.
  • Batch size = snapshot frequency. Bundle with MergeRecord every 5–30 seconds; design as "one PutIceberg / PutDatabaseRecord call = one snapshot."
  • Operational cost lives in compaction, snapshot expiration, and orphan cleanup. Without scheduling OPTIMIZE, expire_snapshots, and remove_orphan_files, queries slow down within days.
  • Mitigate concurrent commit conflicts with PartitionRecord + writer-count caps. Iceberg OCC auto-retries, but frequent conflicts erode throughput.
  • Pin events$snapshots, events$files, events$partitions metatables to a dashboard. The ingest pipeline's health becomes visible through a few SQL queries.
  • Append-only is most natural. If upsert is needed, NiFi-append + nightly Trino MERGE, or consider switching to the Flink Iceberg Sink.

Iceberg cleans up the era when "operators had to refresh metadata by hand." As a result, ingestion tools like NiFi can focus on what they're good at (routing/transforming/retrying across diverse sources), while visibility, time travel, and consistency are delegated to the table format. This is what "flexible without Impala's REFRESH" actually looks like in operations.

— The Data Dynamics Team