[GSoC 2026] Kafka Streams runner — WatermarkManager part 1: in-memory per-source-partition tracking #38957
First slice of the Kafka Streams runner WatermarkManager, decoupled from the Kafka wiring so it can be unit-tested in isolation. A stage's input watermark is min() over its upstream source partitions' committed watermarks. Tracking is keyed on source partitions rather than producer instances: the total source-partition count travels in-band with every report, and a partition is always owned by exactly one live instance (reassigned to a new owner on failure), so a killed instance never leaves the reader stuck and no instance-liveness tracking is needed. This was validated in a standalone Kafka Streams PoC before implementation. WatermarkManager holds at BoundedWindow.TIMESTAMP_MIN_VALUE until every source partition has reported, then emits min(); the emitted watermark is clamped to be non-decreasing, and a change in totalSourcePartitions re-opens the hold (the "revert" case, without an explicit epoch). Wiring into ExecutableStageProcessor (flush coupled to the offset commit, fan-out to all downstream partitions, real-Kafka integration tests) is a follow-up. Refs apache#18479
Summary of Changes
This pull request introduces the core in-memory logic for the WatermarkManager, a critical component for enabling stateful transforms in the Kafka Streams runner. By decoupling watermark tracking from Kafka-specific infrastructure, the implementation provides a robust, testable mechanism for calculating stage input watermarks based on source partition reports. This is the first of a multi-part effort to fully integrate watermark propagation into the runner.
Code Review
This pull request introduces the WatermarkManager class and its corresponding unit tests to track a fused stage's input watermark based on upstream source partitions. The reviewer identified a critical issue where a change in the total source partition count does not clear existing partition reports, which could lead to using stale watermarks and premature advancement. To resolve this, the reviewer suggested clearing the watermark map whenever the partition count changes to ensure fresh reports are collected.
    if (totalSourcePartitions != expectedSourcePartitionCount) {
      expectedSourcePartitionCount = totalSourcePartitions;
      // On a partition-count decrease, drop reports for partitions that no longer exist so
      // completeness and min() are computed over the current partition set only.
      committedWatermarkByPartition.keySet().removeIf(p -> p >= totalSourcePartitions);
    }
When the totalSourcePartitions count changes (e.g., during a repartitioning or scaling event), the mapping and distribution of partitions change.
Currently, if the partition count decreases (e.g., from 4 to 2), the code only prunes partitions with indices >= totalSourcePartitions (line 105). However, if partitions 0 and 1 had already reported watermarks in the previous epoch, isComplete() immediately returns true without waiting for fresh reports from the new epoch. The stage can then compute min() over stale watermarks from the old partition layout and advance its watermark too far.
To ensure correctness and adhere to the design goal of re-opening the hold until the new full set has reported, we should clear the committedWatermarkByPartition map whenever totalSourcePartitions changes. Since lastEmittedMillis protects the stage watermark from regressing, clearing the map is perfectly safe and guarantees we wait for a fresh report from every partition in the new layout.
Note: If you apply this change, you will also need to update the corresponding unit tests in WatermarkManagerTest.java (such as partitionCountIncreaseReopensHold and partitionCountDecreasePrunesStalePartitions) to ensure all partitions report after a partition count change.
Suggested change

    - if (totalSourcePartitions != expectedSourcePartitionCount) {
    -   expectedSourcePartitionCount = totalSourcePartitions;
    -   // On a partition-count decrease, drop reports for partitions that no longer exist so
    -   // completeness and min() are computed over the current partition set only.
    -   committedWatermarkByPartition.keySet().removeIf(p -> p >= totalSourcePartitions);
    - }
    + if (totalSourcePartitions != expectedSourcePartitionCount) {
    +   expectedSourcePartitionCount = totalSourcePartitions;
    +   // On a partition-count change, clear previous reports to ensure we wait for a fresh
    +   // report from every partition in the new layout, avoiding stale watermarks.
    +   committedWatermarkByPartition.clear();
    + }
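The difference between the two policies in this suggestion can be reproduced with a small stand-alone simulation. This is only a sketch under stated assumptions: `PartitionCountChangeDemo`, its `clearOnChange` flag, and the sample watermark values are hypothetical and not part of the PR, `Long.MIN_VALUE` stands in for `BoundedWindow.TIMESTAMP_MIN_VALUE`, and the assumption that partition 1 of the new layout reports a lower watermark (15) than its stale entry (20) is made up for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical simulation of prune-on-decrease vs. clear-on-change; not the PR's class. */
final class PartitionCountChangeDemo {
  // Stand-in for BoundedWindow.TIMESTAMP_MIN_VALUE (assumption, to avoid a Beam dependency).
  static final long HOLD_MILLIS = Long.MIN_VALUE;

  private final boolean clearOnChange;
  private final Map<Integer, Long> reports = new HashMap<>();
  private int expectedCount = -1;
  private long lastEmitted = HOLD_MILLIS;

  PartitionCountChangeDemo(boolean clearOnChange) {
    this.clearOnChange = clearOnChange;
  }

  long observe(int partition, long watermarkMillis, int totalPartitions) {
    if (totalPartitions != expectedCount) {
      expectedCount = totalPartitions;
      if (clearOnChange) {
        reports.clear(); // reviewer's suggestion: wait for a fresh report from every partition
      } else {
        reports.keySet().removeIf(p -> p >= totalPartitions); // prune only vanished partitions
      }
    }
    reports.put(partition, watermarkMillis);
    if (reports.size() == expectedCount) {
      // Complete: emit min() over all partitions, clamped to be non-decreasing.
      lastEmitted = Math.max(lastEmitted, Collections.min(reports.values()));
    }
    return lastEmitted;
  }

  /** Returns the emitted watermark after each of the two reports that follow a 4 -> 2 shrink. */
  static long[] run(boolean clearOnChange) {
    PartitionCountChangeDemo m = new PartitionCountChangeDemo(clearOnChange);
    for (int p = 0; p < 4; p++) {
      m.observe(p, 10L * (p + 1), 4); // old layout: watermarks 10, 20, 30, 40
    }
    long afterFirst = m.observe(0, 1000L, 2);  // first report in the new layout
    long afterSecond = m.observe(1, 15L, 2);   // hypothetical lower watermark for new partition 1
    return new long[] {afterFirst, afterSecond};
  }
}
```

With pruning, the stale entry for partition 1 makes the set look complete after a single new report, so the watermark moves to 20 and the clamp then keeps it there even though partition 1 of the new layout is only at 15. With clearing, the hold stays at the last emitted value until both partitions have reported.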
Assigning reviewers: R: @kennknowles added as fallback since no labels match configuration.
Summary
First slice of the WatermarkManager, the prerequisite for any stateful transform (GroupByKey etc.). Too large for one PR, so split; this part is the in-memory core, decoupled from the Kafka wiring so it can be unit-tested in isolation. Plan agreed with @je-ik on Slack.
Design (agreed with @je-ik)
A stage's input watermark is min() over its upstream source partitions' committed watermarks. Tracking is keyed on source partitions, not producer instances:
- the total source-partition count travels in-band with every report, so the reader always knows how many it's waiting for;
- a partition is always owned by exactly one live instance; on failure, partitions are reassigned and the new owner keeps reporting, so a killed instance never leaves the reader stuck (no instance liveness tracking, no describeConsumerGroups, no generationId needed).
Validated in a standalone Kafka Streams PoC before implementation.
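To illustrate why keying on source partitions avoids liveness tracking, here is a minimal sketch. The `Report` shape, its field names, and the `PartitionKeyedReader` class are assumptions made for this example and are not taken from the PR; the sketch also leaves out the non-decreasing clamp and partition-count changes to keep the focus on reassignment.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical reader that tracks watermarks per source partition, never per instance. */
final class PartitionKeyedReader {

  /** Hypothetical in-band report; the instance name is carried only for the demo. */
  static final class Report {
    final String reportingInstance;
    final int sourcePartition;
    final long committedWatermarkMillis;
    final int totalSourcePartitions;

    Report(String reportingInstance, int sourcePartition,
        long committedWatermarkMillis, int totalSourcePartitions) {
      this.reportingInstance = reportingInstance;
      this.sourcePartition = sourcePartition;
      this.committedWatermarkMillis = committedWatermarkMillis;
      this.totalSourcePartitions = totalSourcePartitions;
    }
  }

  private final Map<Integer, Long> byPartition = new HashMap<>();
  private int total = 0;

  void accept(Report r) {
    // The expected count arrives with every report, so no separate membership lookup is needed.
    total = r.totalSourcePartitions;
    // Keyed on the partition: a new owner simply overwrites the previous owner's entry.
    byPartition.put(r.sourcePartition, r.committedWatermarkMillis);
  }

  /** Empty until every source partition has reported, then min() over all partitions. */
  OptionalLong inputWatermark() {
    if (total == 0 || byPartition.size() < total) {
      return OptionalLong.empty();
    }
    return byPartition.values().stream().mapToLong(Long::longValue).min();
  }
}
```

If instance B owned partition 2 and is killed, the next report for partition 2 comes from whichever instance took it over and lands on the same key, so the reader never waits on B.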
Scope (this PR)
- WatermarkManager: observe(sourcePartition, committedWatermarkMillis, totalSourcePartitions); holds at BoundedWindow.TIMESTAMP_MIN_VALUE until every source partition has reported; then emits min(); output is clamped non-decreasing; a change in totalSourcePartitions re-opens the hold (the "revert" case, without an explicit epoch).
- partition-count increase/decrease, argument validation.
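The behavior listed in this scope can be sketched as follows. This is an illustrative approximation, not the PR's code: the class name `WatermarkManagerSketch` is hypothetical, `Long.MIN_VALUE` stands in for `BoundedWindow.TIMESTAMP_MIN_VALUE`, the partition-count handling follows the prune-on-decrease snippet quoted in the review, and returning the last emitted value while the hold is re-opened is an assumption about how the clamp and the hold interact.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the in-memory tracking described in this PR; not the actual class. */
final class WatermarkManagerSketch {
  // Stand-in for BoundedWindow.TIMESTAMP_MIN_VALUE (assumption, to avoid a Beam dependency).
  static final long HOLD_MILLIS = Long.MIN_VALUE;

  private final Map<Integer, Long> committedWatermarkByPartition = new HashMap<>();
  private int expectedSourcePartitionCount = -1;
  private long lastEmittedMillis = HOLD_MILLIS;

  /** Records one report and returns the stage input watermark after it. */
  long observe(int sourcePartition, long committedWatermarkMillis, int totalSourcePartitions) {
    if (totalSourcePartitions <= 0) {
      throw new IllegalArgumentException("totalSourcePartitions must be positive");
    }
    if (sourcePartition < 0 || sourcePartition >= totalSourcePartitions) {
      throw new IllegalArgumentException("sourcePartition out of range: " + sourcePartition);
    }
    if (totalSourcePartitions != expectedSourcePartitionCount) {
      // A changed count re-opens the hold; reports for vanished partitions are dropped.
      expectedSourcePartitionCount = totalSourcePartitions;
      committedWatermarkByPartition.keySet().removeIf(p -> p >= totalSourcePartitions);
    }
    committedWatermarkByPartition.put(sourcePartition, committedWatermarkMillis);

    if (committedWatermarkByPartition.size() < expectedSourcePartitionCount) {
      // Still waiting for some partition: keep whatever was emitted last (HOLD_MILLIS at first).
      return lastEmittedMillis;
    }
    long min = Collections.min(committedWatermarkByPartition.values());
    lastEmittedMillis = Math.max(lastEmittedMillis, min); // clamp to be non-decreasing
    return lastEmittedMillis;
  }
}
```

For example, with two partitions, `observe(0, 100, 2)` returns the hold value and `observe(1, 50, 2)` returns 50; a later `observe(1, 40, 2)` still returns 50 because of the clamp.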
Out of scope (later parts)
ExecutableStageProcessor: flush (sourcePartition, committedWatermark, totalSourcePartitions) atomically with the offset commit (EOS), fan out to all downstream partitions, consume
behavior; real-Kafka integration tests over the 5 scenarios (steady,
scale-out, clean scale-in, SIGKILL, partition reassignment).
state + timers land.
Testing
./gradlew :runners:kafka-streams:check green; 9 new unit tests.
Fixes #38955
Refs #18479