
[Kafka DR 3] The Hard Part of DR — Offset Translation and Consumer Failover

A DR cluster is an independent log, so the same record carries a different offset there. This post dissects how MirrorMaker 2 bridges that gap with offset-syncs and checkpoints, how to read translated offsets via RemoteClusterUtils, and how sync.group.offsets fails consumers over with zero app changes.

Data Dynamics · June 15, 2026 · 15 min read

Replication is easy. The hard part is deciding where exactly the consumer should resume reading. Suppose a consumer that had read up to offset 1,000,000 on the primary cluster fails over to the DR cluster. Can it just resume at offset 1,000,000 there? Absolutely not. The DR cluster is not a byte-for-byte copy of the original; it is a separate log with its own, independently assigned offsets. That one fact creates the hardest problem in Kafka DR, and it is the only problem this installment covers: offset translation and consumer failover.

What you'll learn in this post

  • Why DR offsets differ from the primary (the independent-log problem)
  • How MirrorMaker 2 bridges the offset gap with offset-syncs and checkpoints
  • Reading translated offsets via RemoteClusterUtils.translateOffsets() / MirrorClient.remoteConsumerOffsets()
  • Failing consumer groups over with zero app changes using sync.group.offsets.enabled
  • Why translation is approximate and at-least-once — what that means for RPO and idempotency
  • Tuning checkpoint periodicity and the operational traps to avoid

This is Part 3 of the "Building Kafka DR" series. In Part 1 (DR architecture and RPO/RTO design) and Part 2 (replicating topics and data with MirrorMaker 2), we got as far as "streaming data into the DR cluster." This installment covers the final step — letting consumers actually pick up where they left off.


1. The Core Problem — A DR Cluster Is an Independent Log

Same record, different offset

A Kafka offset is a record's sequential position within a partition log. It is not a global ID; it is a local number that the partition's leader broker assigns sequentially from 0 as it appends records to its own log. The key insight: an offset is the record's position, not its identity.

When MirrorMaker 2 (MM2) replicates the orders topic from the primary cluster (alias primary) to the DR cluster (alias dr), a new topic named primary.orders appears on the DR side and records are re-produced into it. During that process, the offsets in the DR topic are freshly assigned by the DR broker. The original offsets are not carried over.


Why do they diverge? Several causes overlap.

| Cause | Explanation |
| --- | --- |
| Replication start time | If MM2 begins mid-topic, DR offsets start at 0 while the primary is already in the millions |
| Retention / compaction | If the head of the primary log is purged by retention before replication, the start offset differs |
| Gaps / retries | Producer retries, transaction markers, etc. can make the append counts differ slightly on each side |
| Topic recreation | Recreating the primary topic resets its offsets, but DR keeps incrementing |
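
To make the "position, not identity" point concrete, here is a toy sketch in plain Java (no Kafka involved). It assumes a log can be modeled as a list whose index plays the role of the offset; the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model: a "log" is a list, and a record's offset is its index in that list.
final class IndependentLogsDemo {

    // Copies every record from startOffset onward into a brand-new log.
    // The target log assigns its own offsets, starting again at 0.
    static List<String> mirrorFrom(List<String> sourceLog, int startOffset) {
        List<String> targetLog = new ArrayList<>();
        for (int i = startOffset; i < sourceLog.size(); i++) {
            targetLog.add(sourceLog.get(i)); // lands at the target's next free offset
        }
        return targetLog;
    }

    public static void main(String[] args) {
        List<String> primary = Arrays.asList("order-A", "order-B", "order-C", "order-D");
        List<String> dr = mirrorFrom(primary, 2); // replication began mid-topic

        System.out.println("primary offset of order-C: " + primary.indexOf("order-C"));
        System.out.println("DR offset of order-C:      " + dr.indexOf("order-C"));
    }
}
```

The same record sits at offset 2 on the toy primary and at offset 0 on the toy DR log, which is the "replication start time" row of the table in miniature.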

What actually goes wrong

A consumer group records how far it has read as a committed offset in __consumer_offsets. That value is a number relative to the primary cluster. If, during failover, you seek to that same number on DR:

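  • If the record's DR offset is smaller than its primary offset → you land past the correct position and skip records not yet processed (effectively data loss), or run off the end of the DR log entirely
  • If the record's DR offset is larger than its primary offset → you land before the correct position and re-read already-processed records (massive duplicate processing)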

Both are production incidents. That is why we need offset translation — a mapping that says "the record that was at offset X on the primary is at offset Y on DR," so that at failover X is rewritten to Y and the consumer resumes at the correct position.


2. How MM2 Solves It — offset-syncs and checkpoints

MM2 builds this mapping automatically. Two connectors and two internal topics cooperate.

MirrorSourceConnector → the mm2-offset-syncs topic

MirrorSourceConnector, which handles data replication, produces records to DR and, as it does so, records "primary offset ↔ DR offset" mapping pairs in a separate internal topic, mm2-offset-syncs.<target>.internal (commonly shortened to mm2-offset-syncs). It does not write a mapping for every record; it leaves an anchor mapping only every so often.

A record in this topic essentially means:

topic-partition  = primary.orders-0
upstreamOffset   = 1_000_000   (offset on the primary)
downstreamOffset =   742_318   (offset on DR)

That is, it plants an anchor stating "the record at primary offset 1,000,000 is at DR offset 742,318."

MirrorCheckpointConnector → the <source>.checkpoints.internal topic

MirrorCheckpointConnector goes one step further. This connector periodically reads how far the primary cluster's consumer groups have committed, uses the offset-syncs mappings to translate those into DR-relative offsets, and writes the result into the <source>.checkpoints.internal topic.

A single checkpoint record carries roughly this information:

| Field | Meaning |
| --- | --- |
| consumerGroupId | Which consumer group (e.g. order-processor) |
| topicPartition | Which topic-partition (DR-side name, primary.orders-0) |
| upstreamOffset | The offset that group committed on the primary |
| downstreamOffset | The translated DR offset (= the seek position at failover) |
| metadata | Additional info |

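Putting it together: MirrorSourceConnector copies records into DR's primary.orders and leaves anchors in the offset-syncs topic; MirrorCheckpointConnector combines those anchors with the offsets the primary's consumer groups have committed and writes the translated positions into <source>.checkpoints.internal.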

Configuration example

Adding checkpoint-related options to the MM2 configuration from Part 2 looks like this.

# mm2.properties (excerpt)
clusters = primary, dr
primary->dr.enabled = true
 
# Data replication (MirrorSourceConnector) — covered in Part 2
primary->dr.topics = orders, payments, .*
 
# Location of the offset-syncs topic (the default is source; target keeps it on dr)
primary->dr.offset-syncs.topic.location = target
 
# Enable checkpoints (consumer group offset translation)
primary->dr.emit.checkpoints.enabled = true
primary->dr.groups = order-processor, payment-.*
 
# Checkpoint emit interval — smaller is more accurate but heavier
primary->dr.emit.checkpoints.interval.seconds = 30
 
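# offset-syncs density: a new sync is emitted once a partition drifts this many records
# (MM2 has no time-based interval setting for offset-syncs; 100 is the default)
primary->dr.offset.lag.max = 100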

groups specifies the consumer groups to translate, as a comma-separated list of regexes. If omitted it defaults to .* (every group), so naming only the groups you intend to fail over to DR keeps the checkpoint work limited to what you actually need.
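
A quick way to sanity-check a groups value before deploying is to test group IDs against it locally. The sketch below is an approximation, not MM2's own filter code: it assumes each comma-separated entry is a regex that must match the whole group ID, and GroupFilterSketch and its methods are hypothetical names.

```java
import java.util.regex.Pattern;

// Local approximation of how a `groups` value selects consumer groups.
final class GroupFilterSketch {

    // Builds one pattern from a comma-separated list such as
    // "order-processor, payment-.*" by OR-ing the individual entries.
    static Pattern compile(String commaSeparated) {
        StringBuilder sb = new StringBuilder();
        for (String part : commaSeparated.split(",")) {
            String regex = part.trim();
            if (regex.isEmpty()) {
                continue; // ignore stray commas and blanks
            }
            if (sb.length() > 0) {
                sb.append('|');
            }
            sb.append("(?:").append(regex).append(')');
        }
        return Pattern.compile(sb.toString());
    }

    // True if the whole group ID matches at least one entry.
    static boolean isSelected(String groupId, String groups) {
        return compile(groups).matcher(groupId).matches();
    }

    public static void main(String[] args) {
        String groups = "order-processor, payment-.*";
        for (String id : new String[] {"order-processor", "payment-eu", "billing"}) {
            System.out.println(id + " -> " + isSelected(id, groups));
        }
    }
}
```

Because the match is against the whole ID, a group such as order-processor-v2 would not be selected by the entry order-processor; it would need order-processor.* instead.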

Walking through it with numbers

The abstract description only goes so far, so let's trace one translation cycle with concrete values. We'll look at a single partition, orders-0.

  1. On the primary, replication started at primary offset 250,000, and that record landed at offset 0 in DR's primary.orders-0. So the constant delta = 250,000.

  2. MirrorSourceConnector periodically leaves anchors.

     | upstreamOffset (primary) | downstreamOffset (DR) |
     | --- | --- |
     | 250,000 | 0 |
     | 300,000 | 50,000 |
     | 1,000,000 | 750,000 |

  3. The consumer group order-processor committed up to offset 1,000,000 on the primary.

  4. MirrorCheckpointConnector reads this commit and, using the nearest anchor (1,000,000 → 750,000), translates it to downstreamOffset = 750,000 and records it in the checkpoint.

  5. At failover, the consumer resumes from offset 750,000 in DR's primary.orders-0. Had it used the primary's 1,000,000 directly, it would have shot far past the end of the DR log.

A caveat: if the commit position is not an exact anchor but lands between two anchors, MM2 picks the smaller (earlier) anchor and maps at most slightly forward of it. In the example above, if the group had committed 320,000, MM2 would translate conservatively against the 300,000 → 50,000 anchor, even though with a constant delta of 250,000 that record actually sits at DR offset 70,000. The result is a bit of reprocessing (duplicates), which is precisely the at-least-once nature we discuss next (Section 4).
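
The lookup described above can be modeled in a few lines. This is a simplified sketch, not MM2's implementation: it assumes anchors live in a sorted map, and it assumes "slightly forward" means one past the anchor's DR offset (the exact rule depends on the Kafka version). All names are illustrative.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

// Simplified model of translating a committed primary offset against anchors.
final class AnchorTranslationSketch {
    // upstream (primary) offset -> downstream (DR) offset, sorted by upstream offset
    private final TreeMap<Long, Long> anchors = new TreeMap<>();

    void addAnchor(long upstreamOffset, long downstreamOffset) {
        anchors.put(upstreamOffset, downstreamOffset);
    }

    OptionalLong translate(long committedUpstreamOffset) {
        // Nearest anchor at or before the committed position
        Map.Entry<Long, Long> anchor = anchors.floorEntry(committedUpstreamOffset);
        if (anchor == null) {
            return OptionalLong.empty(); // commit predates every known anchor
        }
        if (anchor.getKey().longValue() == committedUpstreamOffset) {
            return OptionalLong.of(anchor.getValue()); // exact hit on an anchor
        }
        // Between anchors: stay conservative, one past the earlier anchor (assumption)
        return OptionalLong.of(anchor.getValue() + 1);
    }

    public static void main(String[] args) {
        AnchorTranslationSketch t = new AnchorTranslationSketch();
        t.addAnchor(250_000L, 0L);
        t.addAnchor(300_000L, 50_000L);
        t.addAnchor(1_000_000L, 750_000L);

        System.out.println(t.translate(1_000_000L)); // exact anchor
        System.out.println(t.translate(320_000L));   // between anchors
        System.out.println(t.translate(100L));       // before the first anchor
    }
}
```

With the walkthrough's anchors, a commit at 1,000,000 translates to 750,000, while a commit at 320,000 translates to 50,001 under this sketch's assumption.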


3. Reading Translated Offsets — RemoteClusterUtils and sync.group.offsets

Once the checkpoint topic is populated, there are two ways to consume those values at failover.

Option A — query directly with RemoteClusterUtils

RemoteClusterUtils (which uses MirrorClient under the hood) reads the checkpoint topic and computes "where this group should start reading on DR."

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Configuration that connects to the DR cluster
// (translateOffsets takes a Map<String, Object>, not java.util.Properties)
Map<String, Object> props = new HashMap<>();
props.put("bootstrap.servers", "dr-broker1:9092,dr-broker2:9092");

// Compute the DR-relative resume position for a group that came from "primary".
// The call declares InterruptedException and TimeoutException, so the
// enclosing method must handle or rethrow them.
Map<TopicPartition, OffsetAndMetadata> translated =
    RemoteClusterUtils.translateOffsets(
        props,
        "primary",          // source cluster alias (DR topic prefix)
        "order-processor",  // consumer group to translate
        Duration.ofSeconds(10));

// Keys in `translated` are DR topic-partitions (e.g. primary.orders-0),
// values are the offsets that group should seek/commit on DR.

Using MirrorClient directly gives finer control.

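import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.MirrorClient;

// Connection settings for the DR cluster, where the checkpoint topic lives
Map<String, Object> drProps =
    Map.of("bootstrap.servers", "dr-broker1:9092,dr-broker2:9092");

// MirrorClient is AutoCloseable, so try-with-resources releases its connections
try (MirrorClient client = new MirrorClient(drProps)) {
    // Map a primary topic name to its remote DR topic name
    String remoteTopic =
        client.replicationPolicy().formatRemoteTopic("primary", "orders"); // -> "primary.orders"

    // Query the group's translated offsets
    Map<TopicPartition, OffsetAndMetadata> offsets =
        client.remoteConsumerOffsets("order-processor", "primary", Duration.ofSeconds(10));
}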

In your failover script, commit these offsets into the DR cluster's __consumer_offsets, or feed them into seek() when the consumer starts.

Option B — auto-sync with sync.group.offsets (zero app changes)

Option A requires writing code. The smoother approach is to let MM2 write the translated offsets directly into the DR cluster's __consumer_offsets.

# Automatically write translated consumer group offsets into DR's __consumer_offsets
primary->dr.sync.group.offsets.enabled = true
primary->dr.sync.group.offsets.interval.seconds = 30

With this enabled, MirrorCheckpointConnector does not stop at producing checkpoints; it also pre-syncs committed offsets onto the DR cluster under the same group ID. As a result:

  • At failover you need no application changes at all. Just point the consumer's bootstrap.servers at DR and restart; the moment it connects with the same group.id, the translated position is already in DR's __consumer_offsets, and it resumes from there.
  • However, syncing happens only when that group is not active on DR. If the same group is already running on DR (e.g. active-active), MM2 will not overwrite it — to avoid clobbering the progress of a healthy, running consumer.

| Comparison | Option A (translateOffsets) | Option B (sync.group.offsets) |
| --- | --- | --- |
| App changes | Failover logic/script required | None (just change bootstrap) |
| Control | Fine-grained (per group/partition) | Automatic (bulk) |
| When applied | Explicit call at failover moment | Pre-synced periodically |
| Best for | Custom failover orchestration | Simple, standard active-passive DR |

For most active-passive DR, Option B carries the least operational burden.

The difference under active-passive vs active-active

Offset translation is cleanest when you assume a one-directional flow (active-passive). Consumers run only on the primary, DR is on standby, and DR consumers wake up only at the moment of failover. At that point the group on DR is "inactive," so sync.group.offsets can safely pre-fill its offsets.

Under active-active (bidirectional), the story gets complicated.

| Situation | Consideration |
| --- | --- |
| Same group.id runs on both sides | MM2 will not overwrite an active group's offsets → the sync is intentionally skipped |
| Bidirectionally replicated topics | primary.orders and dr.orders coexist; you must clearly design which one consumers read |
| Preventing cyclic replication | MM2's topic prefixes (primary., dr.) prevent infinite loops, but group offset sync is only trustworthy one-directionally |

If you run active-active, rather than relying on group offset sync it is more predictable to translate and commit explicitly via Option A at the failover moment. Bidirectional replication and consumer topology design are covered more deeply in a later installment (or in Part 1's topology section).

Sanity-checking against end offsets

Do not blindly trust the translation result. Just before failover you must compare it against the DR partition's end offset (log end offset) as a sanity check.

translated offset > DR end offset    → bad mapping (a future position). Seeking there yields OffsetOutOfRange.
                                     → clamp to the end offset or nearest valid anchor.
translated offset < DR start offset  → points at a range already purged by retention.
                                     → reset to earliest (be ready for heavy reprocessing).

When RemoteClusterUtils works normally these boundary violations are rare, but they can certainly occur under large replication lag or right after a topic is recreated. Putting clamp logic in your failover script adds one more layer against a production incident.
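
A minimal sketch of that clamp logic, assuming your failover script has already fetched the DR partition's start and end offsets (for example from the admin client). FailoverOffsetCheck and its members are hypothetical names, not part of MM2.

```java
// Validates a translated offset against the DR partition's valid range.
final class FailoverOffsetCheck {

    enum Verdict { IN_RANGE, CLAMPED_TO_END, RESET_TO_EARLIEST }

    // Classifies the translated offset; assumes drStartOffset <= drEndOffset.
    static Verdict classify(long translated, long drStartOffset, long drEndOffset) {
        if (translated > drEndOffset) {
            return Verdict.CLAMPED_TO_END;    // points at a position that does not exist yet
        }
        if (translated < drStartOffset) {
            return Verdict.RESET_TO_EARLIEST; // points at data retention already purged
        }
        return Verdict.IN_RANGE;
    }

    // Returns the offset the consumer should actually seek to.
    static long safeOffset(long translated, long drStartOffset, long drEndOffset) {
        switch (classify(translated, drStartOffset, drEndOffset)) {
            case CLAMPED_TO_END:
                return drEndOffset;
            case RESET_TO_EARLIEST:
                return drStartOffset;
            default:
                return translated;
        }
    }

    public static void main(String[] args) {
        // DR partition currently holds offsets [0, 800000)
        System.out.println(safeOffset(750_000L, 0L, 800_000L));   // in range
        System.out.println(safeOffset(1_000_000L, 0L, 800_000L)); // overshoot
        System.out.println(safeOffset(10L, 500L, 800_000L));      // already purged
    }
}
```

Logging the verdict alongside the final offset is worth the extra line in a real script: a CLAMPED_TO_END result silently skips nothing, but a RESET_TO_EARLIEST result means heavy reprocessing is about to start.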


4. What You Must Know — Translation Is Approximate and At-Least-Once

Here we need to be honest. Offset translation is not exact. And that is not a bug — it is inherent to the design.

Why it's approximate

offset-syncs records only periodic anchors, not every record (for load reasons). So translation is computed against the "nearest known anchor," and records between anchors are mapped to a slightly conservative (earlier) position. Checkpoints, likewise, are stamped only once per emit.checkpoints.interval.seconds, so any progress the primary consumer made after the last checkpoint is not reflected.

As a result, the translated position is likely to be slightly behind (in the past of) the consumer's true last-processed point. What that means:

After failover, some records will be reprocessed (duplicates). This is normal, and MM2's semantics are explicitly at-least-once — not exactly-once.
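
To get a feel for the size of that reprocessing, the Section 2 walkthrough numbers can be plugged into a small calculation. This sketch assumes the constant delta of 250,000 from that walkthrough and assumes the translation landed one past the 300,000 → 50,000 anchor (50,001); ReprocessingEstimate is an illustrative name.

```java
// Estimates how many records get re-read after failover in the simplified
// case where primary and DR offsets differ by a constant delta.
final class ReprocessingEstimate {

    static long reprocessedRecords(long committedUpstream,
                                   long constantDelta,
                                   long translatedDownstream) {
        // Where the consumer really was, expressed as a DR offset
        long trueDownstream = committedUpstream - constantDelta;
        // Everything between the translated and the true position is read again
        return Math.max(0L, trueDownstream - translatedDownstream);
    }

    public static void main(String[] args) {
        // Commit at primary offset 320,000, translated to DR offset 50,001
        long duplicates = reprocessedRecords(320_000L, 250_000L, 50_001L);
        System.out.println("records reprocessed: " + duplicates);
    }
}
```

Here the consumer's true DR position is 70,000, so resuming at 50,001 re-reads 19,999 records. Real topics rarely have a perfectly constant delta, so treat this as an order-of-magnitude check rather than a prediction.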

So consumers must be idempotent

To safely absorb post-failover duplicates, your downstream logic must be idempotent.

Bad (not idempotent):
  receive order event → "subtract" amount from balance
  → reprocessing the same event double-subtracts (incident)
 
Good (idempotent):
  receive order event → UPSERT keyed by idempotency key (order_id)
  → reprocessing the same event yields the same result

Typical idempotency techniques:

| Technique | Description |
| --- | --- |
| Idempotency key + UPSERT | Ignore/update duplicate INSERTs by business key |
| Processed-events table | Store processed event IDs and skip on reappearance |
| Conditional write | CAS-style "update only if current state is X" |
| Idempotent external API | Payments etc. support an idempotency-key header |
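
As one possible shape for the processed-events technique, here is a minimal in-memory sketch. It assumes order_id is the idempotency key and uses a HashSet and a HashMap as stand-ins for a processed-events table and an account store; in a real system both writes would have to commit atomically. The class is hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// In-memory stand-in for an idempotent order consumer.
final class IdempotentOrderHandler {
    private final Set<String> processedOrderIds = new HashSet<>();
    private final Map<String, Long> balanceByAccount = new HashMap<>();

    void deposit(String account, long amountCents) {
        balanceByAccount.merge(account, amountCents, Long::sum);
    }

    // Applies the order once. Returns false when the order ID was seen before,
    // in which case the balance is left untouched.
    boolean handle(String orderId, String account, long amountCents) {
        if (!processedOrderIds.add(orderId)) {
            return false; // duplicate delivery, e.g. re-read after failover
        }
        balanceByAccount.merge(account, -amountCents, Long::sum);
        return true;
    }

    long balance(String account) {
        return balanceByAccount.getOrDefault(account, 0L);
    }

    public static void main(String[] args) {
        IdempotentOrderHandler handler = new IdempotentOrderHandler();
        handler.deposit("acct-1", 10_000L);
        handler.handle("order-42", "acct-1", 2_500L);
        handler.handle("order-42", "acct-1", 2_500L); // replayed after failover
        System.out.println("balance: " + handler.balance("acct-1"));
    }
}
```

The replayed order-42 is skipped, so the balance ends at 7,500 rather than 5,000.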

Un-replicated data at failover = your RPO

Another truth: records that had not yet replicated to DR at the moment of failure cannot be rescued by translation. That volume is exactly your RPO (Recovery Point Objective).

RPO ≈ primary's latest produce position − position fully replicated to DR
    ≈ replication lag at the moment of failure × produce rate

If average replication lag is 2 seconds, worst case you can lose roughly 2 seconds' worth of data (this ties directly to the RPO design in Part 1). To shrink this loss you must shrink replication lag itself; to drive the loss to zero you have left asynchronous DR entirely and entered synchronous-replication territory.
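
Both forms of the estimate can be worked through in code. The sketch below is plain arithmetic under stated assumptions: the offsets and the 5,000 records/second produce rate are invented for illustration, and RpoEstimate is a hypothetical helper.

```java
// Back-of-the-envelope estimates of how much data is at risk at failover.
final class RpoEstimate {

    // Offset form: what the primary has accepted minus what DR has received.
    static long recordsAtRiskByOffsets(long primaryEndOffset, long lastReplicatedUpstreamOffset) {
        return Math.max(0L, primaryEndOffset - lastReplicatedUpstreamOffset);
    }

    // Rate form: replication lag at the moment of failure times the produce rate.
    static long recordsAtRiskByLag(double lagSeconds, double recordsPerSecond) {
        return Math.round(lagSeconds * recordsPerSecond);
    }

    public static void main(String[] args) {
        // Primary has appended up to 1,010,000; DR has everything up to 1,000,000
        System.out.println(recordsAtRiskByOffsets(1_010_000L, 1_000_000L));
        // 2 seconds of lag at an assumed 5,000 records/second
        System.out.println(recordsAtRiskByLag(2.0, 5_000.0));
    }
}
```

Both calls come out at 10,000 records for these made-up inputs. The useful habit is to express your RPO in records (or orders, or payments) as well as seconds, because that is the number the business will ask about.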

Tune accuracy via intervals

Translation accuracy (i.e. how much gets reprocessed) is controlled by the following settings.

| Setting | Effect | Trade-off |
| --- | --- | --- |
| emit.checkpoints.interval.seconds | Stamp checkpoints more often to keep translation fresh | Too small increases Connect cluster load and internal-topic traffic |
| sync.group.offsets.interval.seconds | Sync DR __consumer_offsets more often | Same: increased load |
| offset.lag.max (offset-syncs density) | A lower value gives denser anchors → finer translation granularity | More internal-topic writes |

A good starting point is 30 seconds. If your RPO/duplicate tolerance is tight, lower it toward 10s; if you have a very large number of topics and load is a concern, raise it to 60s and monitor. The core principle: denser checkpoints = more accurate failover = fewer duplicates, but higher load.


5. Operational Checklist

Verify the following before a failover rehearsal.

| Check | How to verify |
| --- | --- |
| Are checkpoint topics being populated? | Consume primary.checkpoints.internal on DR and inspect records |
| Are target groups covered by the groups regex? | emit.checkpoints logs / group presence in checkpoints |
| Are offset-syncs flowing? | Confirm mappings land in mm2-offset-syncs.<target>.internal |
| Is sync.group.offsets working? | Confirm the group's offsets exist in DR __consumer_offsets |
| Are translation results sane? | Compare translateOffsets() output against DR end offsets (no negatives/overshoot) |
| Consumer idempotency | Validate no side effects by processing the same event twice |
| Replication lag (RPO) | Dashboard the MirrorSourceConnector lag metric |

A common trap: if you enabled sync.group.offsets but no offsets show up on DR, nine times out of ten the group (1) was not matched by the groups regex, (2) never committed on the primary, or (3) is already active on DR so MM2 refused to overwrite.


Wrapping up

  • A DR cluster is not a copy of the primary — it is an independent log. The same record has a different offset, so seeking on DR with a primary offset causes either dropped records or massive duplicates.
  • MM2 automates "primary offset → DR offset" translation through the mm2-offset-syncs mappings left by MirrorSourceConnector and the <source>.checkpoints.internal records produced by MirrorCheckpointConnector.
  • Read the translated values directly with RemoteClusterUtils.translateOffsets(), or pre-write them into DR's __consumer_offsets with sync.group.offsets.enabled=true so you can fail over with zero app changes.
  • Translation is approximate and at-least-once. Some reprocessing after failover is normal, so consumers must be idempotent. Whatever has not yet replicated is exactly your RPO.
  • Balance accuracy against load with emit.checkpoints.interval.seconds and sync.group.offsets.interval.seconds (start at 30s).
  • In Part 4, we put all this translation and syncing to work in an actual failover/failback runbook — failure detection, traffic cutover, fallback procedure, and rehearsal scenarios — step by step.

— The Data Dynamics Engineering Team