Summary
The issue described is a classic symptom of duplicate processing in Flink and Beam pipelines, here caused by non-deterministic logic interacting badly with snapshotting. When a job restarts from a savepoint (snapshot), Flink restores its operator state, including the Kafka consumer offsets. However, if the application code contains non-deterministic operations (e.g., random UUIDs, current timestamps, or external calls inside the processing logic), the state captured by Flink’s Chandy-Lamport-style snapshot algorithm can be inconsistent with what the code would produce on replay. Upon restoration, the Beam `DoFn` lifecycle may re-process the same records, or the restored state may not align perfectly with the Kafka offsets, leading to the same messages being consumed again. The “two readers” in the logs typically indicate that Flink restored the job successfully and is running the source with two parallel reader instances; the duplicates come from the non-deterministic logic inside `@ProcessElement` or `@FinishBundle`, not from the restore itself.
Root Cause
The root cause is non-deterministic logic within the Beam DoFn interacting poorly with Flink’s checkpointing mechanism.
- Determinism Violation: Flink requires user-defined functions to be deterministic. Using functions like `UUID.randomUUID()`, `Instant.now()`, or `Math.random()` inside `@ProcessElement` breaks determinism. When a checkpoint is taken, Flink serializes the operator state; if the logic is non-deterministic, the state restored upon restart may behave differently or trigger different re-computation paths.
- State Backend Alignment: In Flink 1.19, the state backend (likely RocksDB) holds the offsets. The logs showing “Reader-0: first record offset” twice suggest one of two things:
  - The Kafka offsets were committed (despite `enable.auto.commit = false`, Beam may commit via Flink’s checkpoint committer) and the job restarted from the latest committed offset, but the startup logic re-read the previous batch.
  - Or, more likely: the snapshot captured inconsistent internal watermark state, causing the restored job to re-read window boundaries it had already processed.
- Source Buffering: The “two readers” log indicates parallel source splitting. If the snapshot contained buffered data in the `Source` operator (data read from Kafka but not yet processed by downstream operators), Flink re-emits that buffered data upon restoration to ensure no data loss, producing duplicates if the downstream logic is not idempotent.
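To make the replay point concrete, here is a minimal stand-alone sketch (plain Java, hypothetical class names, no Flink or Beam APIs) of a downstream operator that tolerates re-emitted records by deduplicating on a deterministic ID, the way keyed state would:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: a downstream operator that survives replayed
// input by keying every write on a deterministic ID.
public class DedupSketch {
    private final Set<String> seen = new HashSet<>();       // stands in for Flink keyed state
    private final Map<String, String> sink = new LinkedHashMap<>();

    static String deterministicId(String element) throws Exception {
        byte[] digest = MessageDigest.getInstance("SHA-256")
                .digest(element.getBytes(StandardCharsets.UTF_8));
        StringBuilder sb = new StringBuilder();
        for (byte b : digest) sb.append(String.format("%02x", b));
        return sb.toString();
    }

    void process(String element) throws Exception {
        String id = deterministicId(element);
        if (!seen.add(id)) return;   // replayed record: drop it, no duplicate output
        sink.put(id, element);
    }

    public static void main(String[] args) throws Exception {
        DedupSketch op = new DedupSketch();
        op.process("order-42");
        op.process("order-42");      // simulated replay after snapshot restore
        System.out.println(op.sink.size());  // 1
    }
}
```

With a random UUID instead of the hash, the second `process` call would pass the `seen` check and the sink would hold two rows for one logical record.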
Why This Happens in Real Systems
- Exactly-Once Semantics Trade-off: Beam on Flink aims for exactly-once processing, which Flink achieves with checkpoint barriers and snapshots. If the job fails or is manually stopped and restarted from a savepoint, the guarantee degrades to “at least once” for the records that were in flight at the time of the savepoint.
- Managed Services Complexity: On AWS (e.g., Kinesis Data Analytics for Flink or self-managed EMR), network latency can cause checkpoint timeouts. If a checkpoint completes after some data has been read but before it has been processed, the restart logic will re-read that data to be safe.
- Idempotency Assumption: Engineers often assume Kafka’s `enable.auto.commit = false` prevents duplicates. However, Beam runners on Flink commit offsets internally as part of the Flink checkpoint (to the Kafka consumer group), independent of the consumer config. If the snapshot saves the “I have read this” state, the consumer will resume from there.
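The checkpoint-coupled offset commit can be illustrated without Kafka at all. The following stand-alone simulation (hypothetical names, no Kafka or Flink APIs) shows why resuming from the last checkpointed offset replays any record read after that checkpoint:

```java
import java.util.ArrayList;
import java.util.List;

// Simulation: offsets are committed only when a checkpoint completes,
// so a crash after reading but before the next checkpoint means the
// in-between records are read again on restart.
public class CheckpointReplaySim {
    public static void main(String[] args) {
        List<String> topic = List.of("a", "b", "c", "d");
        List<String> processed = new ArrayList<>();

        long position = 0;

        // Read a, b; a checkpoint completes -> offset 2 is committed.
        for (; position < 2; position++) processed.add(topic.get((int) position));
        long committedOffset = position;

        // Read c; then the job crashes BEFORE the next checkpoint.
        processed.add(topic.get((int) position++));

        // Restart: resume from the committed offset, so "c" is read again.
        for (position = committedOffset; position < topic.size(); position++)
            processed.add(topic.get((int) position));

        System.out.println(processed);  // [a, b, c, c, d] -- "c" is a duplicate
    }
}
```

This is the “at least once on restore” behavior the bullets above describe: the consumer config never enters into it, because the commit point is the checkpoint, not the client’s background thread.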
Real-World Impact
- Data Integrity Failure: The primary impact is duplicate records in the downstream sink (e.g., Database, S3, Elasticsearch). This leads to financial discrepancies, inflated metrics, or duplicate entries in analytical datasets.
- State Bloat: If the duplicate processing causes the `DoFn` to write new state (e.g., to keyed state), the Flink RocksDB state size will grow indefinitely, eventually causing out-of-memory (OOM) errors or checkpoint failures and destabilizing the pipeline.
- Latency Spikes: Re-processing data increases processing time, causing the pipeline to fall behind real-time ingestion (consumer lag) and undermining the “real-time” value of the pipeline.
Example or Code
Here is an example of the deterministic vs. non-deterministic code pattern causing this issue. Flink serializes the `DoFn` instance; if `processElement` produces different output for the same input after a restart, downstream systems see what look like duplicate records.
Non-Deterministic (The Bug):
```java
@ProcessElement
public void processElement(ProcessContext c) {
  // BAD: generates a new UUID every time this element is processed.
  // If Flink replays this element during a snapshot restore, a NEW
  // UUID is generated, so the same input produces different output
  // and looks like a duplicate entry downstream.
  String id = UUID.randomUUID().toString();
  // BAD: using current wall-clock time makes the output time-dependent.
  long processingTime = Instant.now().toEpochMilli();
  c.output(KV.of(id, c.element() + "-" + processingTime));
}
```
Deterministic (The Fix):
```java
@ProcessElement
public void processElement(ProcessContext c) {
  // GOOD: deterministic ID derived from the input data.
  String id = Hashing.sha256()
      .hashString(c.element(), StandardCharsets.UTF_8)
      .toString();
  // GOOD: use the element's event timestamp, not system time.
  long processingTime = c.timestamp().getMillis();
  c.output(KV.of(id, c.element() + "-" + processingTime));
}
```
How Senior Engineers Fix It
Senior engineers address this by enforcing determinism and managing state expiration.
- Enforce Determinism: They audit the codebase for `UUID.randomUUID()`, `new Date()`, and `Math.random()`, replacing them with hashing functions (e.g., SHA-256) of the input data or deterministic lookups.
- Checkpoint Tuning: They tune `execution.checkpointing.min-pause` and `execution.checkpointing.timeout` to ensure snapshots are stable and don’t trigger frequent restarts.
- Idempotent Sinks: They ensure the final sink (e.g., a database writer) has a primary key or upsert mechanism. Even if Flink sends a duplicate, the database rejects or overwrites it.
- Disable Auto-Commit Explicitly: They keep `enable.auto.commit` false, but configure the `FlinkKafkaConsumer` to use Flink’s offset-commit mechanism (which is tied to checkpoints) rather than relying on the Kafka client’s background thread.
- State TTL: They set a time-to-live (TTL) on Flink keyed state to prevent old entries from growing the RocksDB directory indefinitely.
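As a sketch of the idempotent-sink idea, here is a plain-Java stand-in for a database table with a primary key (all names are hypothetical; a real sink would issue an upsert such as `INSERT ... ON CONFLICT DO UPDATE`):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: writes are upserts keyed by a stable primary
// key, so a replayed record overwrites its row instead of duplicating it.
public class UpsertSinkSketch {
    private final Map<String, String> table = new LinkedHashMap<>(); // stands in for a DB table

    void write(String primaryKey, String value) {
        table.put(primaryKey, value);   // upsert: a second delivery is harmless
    }

    public static void main(String[] args) {
        UpsertSinkSketch sink = new UpsertSinkSketch();
        sink.write("order-42", "v1");
        sink.write("order-42", "v1");   // duplicate delivery after restore
        System.out.println(sink.table.size());  // 1 row, not 2
    }
}
```

The primary key only works as a dedup key if it is deterministic, which is why the hashing fix above and the idempotent sink go together.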
Why Juniors Miss It
- Abstracted Frameworks: Juniors often use `ThreadLocalRandom` or `Instant.now()` because “it works locally.” They do not realize that in a distributed system, logic inside a `RichFlatMapFunction` or `DoFn` must be reproducible, because the framework may execute it more than once for the same element.
- Testing Gaps: Unit tests often run on single records and don’t exercise savepoint restoration. A junior engineer might test “process one record” and “restart from savepoint” separately, but fail to verify that the same record is not processed twice after the restart.
- Configuration Confusion: They see `enable.auto.commit = false` in the properties and assume it prevents all duplicates, not realizing that the Flink runner manages offset commits itself as part of checkpointing.