Skip to content

Conversation

@HuangZhenQiu
Copy link
Collaborator

@HuangZhenQiu HuangZhenQiu commented Jan 29, 2026

Describe the issue this Pull Request addresses

support watermark emitter from split info

closes #14424

Summary and Changelog

  1. add the watermark emit by looking up the base file path in HoodieRecordEmitter
  2. add test coverage to test logic change

Impact

none

Risk Level

none

Documentation Update

none

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@github-actions github-actions bot added the size:M PR with lines of changes in (100, 300] label Jan 29, 2026
Copy link
Member

@xushiyan xushiyan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some minor issues


private long extractWatermark(HoodieSourceSplit split) {
// CDC split will be handled later
ValidationUtils.checkArgument(split.getBasePath().isPresent(), "Split base path can't be null.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this check assumes no base path, while the comment implies it's CDC split. Should this check be a silent skip instead of an exception? Or should CDC splits be explicitly excluded upstream?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding explicit handling for CDC splits (even if just logging and skipping watermark emission)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't include CDC read in this release, shall we keep it as it is?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for streaming read, the split base path could also be null.

private long extractWatermark(HoodieSourceSplit split) {
// CDC split will be handled later
ValidationUtils.checkArgument(split.getBasePath().isPresent(), "Split base path can't be null.");
return Long.parseLong(FSUtils.getCommitTime(split.getBasePath().get()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HoodieSourceSplit always has latestCommit, shouldn't it be used directly?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The latestCommit in HoodieSourceSplit is the lastest commit when scans completed commits since the last scan time. The file level commit time is in a lower granularity and introduces a more accurate of watermark.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HuangZhenQiu i'm not sure i understand how this is scan-level latest commit. HoodieSourceSplit should be pointing to a file slice, so its latestCommit should be effectively the same as the latest commit from all files (base and log files) within the file slice

long newWatermark = extractWatermark(split);
if (newWatermark < watermark) {
LOG.info(
"Received a new split with lower watermark. Previous watermark = {}, current watermark = {}, previous split = {}, current split = {}",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't this info level log be too noisy? looks more like debug msg

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It actually shouldn't happen as we sorted split by the file commit instant time. Changed to warn.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why use the base instant time as the watermark instead of the latest commit in the split?

Copy link
Collaborator Author

@HuangZhenQiu HuangZhenQiu Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If so, all of the split discovered by the one time scan will emit the same watermark, right? But there could be multiple commits before the lastest commit are returned from scan. In this case, will not base instant time be more accurate?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

split is discovered by timeline commits, my guess is for each split, it reports the latest instant time and the Flink would finally figure out how to allgin them then send to downsteam?

BTW, if there is no base file in the split, the base instant time is the smallest instant from the log files.

Copy link
Collaborator Author

@HuangZhenQiu HuangZhenQiu Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the hint. How to find the instant from the log file? Is there a log file name pattern? If a split has both base file and log files, It should also be the the smallest instant from the log files right?

Copy link
Contributor

@danny0405 danny0405 Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should also be the the smallest instant from the log files right

does Flink has the contract that require split to return the min or max instant? if it is min, then it should be the smallest instant time of all files, if it is max, it should be the max instant time of all files.

We can always decode the instant time using FsUtils.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the split watermark is emitted after the split is processed, then it should be the max instant time of all files.

@xushiyan
Copy link
Member

xushiyan commented Jan 30, 2026

Pls also link issues, if this solves the issue, then use closing key words, if it's part of the issue, then just mention the it

@xushiyan xushiyan requested a review from cshuo January 30, 2026 05:01
@HuangZhenQiu HuangZhenQiu force-pushed the watermark-record-emitter branch 2 times, most recently from e0e73f1 to 2622251 Compare January 30, 2026 07:06
@github-actions github-actions bot added size:L PR with lines of changes in (300, 1000] and removed size:M PR with lines of changes in (100, 300] labels Jan 30, 2026
@HuangZhenQiu HuangZhenQiu force-pushed the watermark-record-emitter branch from 2622251 to 675b4b6 Compare January 30, 2026 07:28
}

output.collect(record.record());
split.updatePosition(record.fileOffset(), record.recordOffset());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't the watermark be only emitted after records processed? meaning emission happens after this line?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This watermark is for split finish not for per record. Thus, it should emit after a split is processed.

private long extractWatermark(HoodieSourceSplit split) {
// CDC split will be handled later
ValidationUtils.checkArgument(split.getBasePath().isPresent(), "Split base path can't be null.");
return Long.parseLong(FSUtils.getCommitTime(split.getBasePath().get()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HuangZhenQiu i'm not sure i understand how this is scan-level latest commit. HoodieSourceSplit should be pointing to a file slice, so its latestCommit should be effectively the same as the latest commit from all files (base and log files) within the file slice

}
}

return maxInstantTime;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HoodieSourceSplit should have maintain the order of log files which have the latest log file that can be accessed directly, if no log file, then just get base file's commit time as the latest. So this extraction can be O(1). But, as i commented on the other thread, shouldn't HoodieSourceSplit#latestCommit already be serving this info?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

latestCommit is not the last commit of the split. It is the latest commit when we fetch new commits from timeline. Thus, there could splits committed in different instant time have the same latestCommit time.

https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L409

@HuangZhenQiu HuangZhenQiu force-pushed the watermark-record-emitter branch from 675b4b6 to 152e5bf Compare January 31, 2026 03:34
@HuangZhenQiu HuangZhenQiu force-pushed the watermark-record-emitter branch from 152e5bf to a131124 Compare January 31, 2026 03:41
@hudi-bot
Copy link
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size:L PR with lines of changes in (300, 1000]

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Hudi Watermark Record Emitter

4 participants