
Conversation

Contributor

@gguptp gguptp commented Mar 25, 2025

Purpose of the change

Today Flink does not support distributed consistency of events from subtask (Task Manager) to coordinator (Job Manager) - https://issues.apache.org/jira/browse/FLINK-28639. As a result, we have a race condition that can cause a shard and its child shards to stop being processed after a job restart:

  • A checkpoint started
  • The enumerator took its checkpoint (the shard was assigned at this point)
  • The enumerator sent the checkpoint event to the reader
  • Before the reader took its checkpoint, a SplitsFinishedEvent arrived in the reader
  • The reader took its checkpoint
  • Just after the checkpoint completed, the job restarted

This can cause a shard lineage to be lost, because the shard is in ASSIGNED state in the enumerator but is not part of any Task Manager state.
This PR changes the behaviour by also checkpointing the finished-split events received between two checkpoints and replaying those events on restore.
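A rough, hypothetical sketch of the buffering-and-replay idea (class and method names below are made up for illustration and are not the actual connector code): the reader keeps the split IDs whose finished events were observed since the last checkpoint, includes them in its checkpointed state, and re-sends them to the enumerator on restore.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class FinishedSplitsBuffer {
    // Split IDs whose finished event was observed since the last successful checkpoint.
    private final List<String> finishedSinceLastCheckpoint = new ArrayList<>();

    // Called when the reader observes a finished split between two checkpoints.
    public void onSplitFinished(String splitId) {
        finishedSinceLastCheckpoint.add(splitId);
    }

    // Included in the reader's checkpointed state.
    public List<String> snapshotState() {
        return new ArrayList<>(finishedSinceLastCheckpoint);
    }

    // On restore, replay the buffered events to the enumerator so it can complete
    // child shard assignment for shards it still considers ASSIGNED.
    public void restoreAndReplay(List<String> restored, Consumer<String> sendFinishedEventToEnumerator) {
        finishedSinceLastCheckpoint.addAll(restored);
        restored.forEach(sendFinishedEventToEnumerator);
    }

    // Once the checkpoint that contained these events completes, drop them.
    public void onCheckpointComplete() {
        finishedSinceLastCheckpoint.clear();
    }
}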

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing

This change added tests and can be verified as follows:

I manually verified this by running the connector on a local Flink cluster that was restarted every 10 minutes. I saw no checkpoint discrepancies and no issues in the cluster.

I also added unit tests for the change.

Significant changes


  • Dependencies have been added or upgraded
  • Public API has been changed (Public API is any class annotated with @Public(Evolving))
  • Serializers have been changed
  • New feature has been introduced
    • If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)

@AHeise AHeise left a comment

Approach LGTM in general and I added some smaller nits.

When we discussed the approach with the TreeMap, I forgot that the state of a reader is just a list of splits. So now you use a special split to track the finished events, which is a plausible solution but also a bit awkward. (We should probably extend the interface to also allow reader-native state.)

So I provided an alternative solution where the information is layered directly inside the split, which makes the special split unnecessary. PTAL.
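A rough illustration of that layering idea (hypothetical names, not the actual split class): each split in the reader state carries a flag saying it finished after the last checkpoint, so the ordinary list-of-splits state suffices and no synthetic split is needed.

public class ShardSplitState {
    private final String splitId;
    private final boolean finishedSinceLastCheckpoint;

    public ShardSplitState(String splitId, boolean finishedSinceLastCheckpoint) {
        this.splitId = splitId;
        this.finishedSinceLastCheckpoint = finishedSinceLastCheckpoint;
    }

    public String splitId() {
        return splitId;
    }

    // On restore, splits carrying this flag are reported back to the enumerator as
    // finished instead of being read again.
    public boolean isFinishedSinceLastCheckpoint() {
        return finishedSinceLastCheckpoint;
    }
}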

@gguptp gguptp force-pushed the main branch 3 times, most recently from 05d00d9 to d086a00 on March 26, 2025 09:54
@gguptp gguptp force-pushed the main branch 4 times, most recently from 98cb457 to 567e5fa on March 26, 2025 12:57
@gguptp gguptp force-pushed the main branch 4 times, most recently from 60833a5 to f518db5 on March 26, 2025 13:50
@AHeise AHeise left a comment

LGTM with some small nits.

Contributor
@hlteoh37 hlteoh37 left a comment

LGTM. Added 1 nit

// to the subtask, but there might be SplitsFinishedEvent from that subtask.
// We will not do child shard assignment if that is the case since that might lead to child
// shards trying to get assigned before there being any readers.
if (splitsAssignment == null) {
Contributor

Why don't we make this a generic WARN if we don't find the finished split in the assigned splits, instead of only when no split is assigned to the subtask?


I'd not weaken consistency checks here for all events. I'd mark the events as recovered and then allow some leniency (not even WARN because this is expected).

Contributor Author

I made this an INFO for now; I'll make the consistency checks stronger in a separate PR, if that works?
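A minimal sketch of that lenient handling (hypothetical handler class, assuming an SLF4J logger; not the actual connector code): a finished-split event for a subtask with no current assignment is treated as an expected, replayed event and logged at INFO rather than failing a consistency check.

import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SplitsFinishedEventHandler {
    private static final Logger LOG = LoggerFactory.getLogger(SplitsFinishedEventHandler.class);

    void handleSplitsFinished(int subtaskId, List<String> assignedSplits, List<String> finishedSplitIds) {
        if (assignedSplits == null || assignedSplits.isEmpty()) {
            // Expected when a restore replays buffered finished-split events before any
            // splits have been (re)assigned to this subtask, so only log at INFO.
            LOG.info(
                    "No splits assigned to subtask {}; skipping child shard assignment for finished splits {}",
                    subtaskId,
                    finishedSplitIds);
            return;
        }
        // ... normal child shard assignment would follow here (omitted in this sketch).
    }
}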

Contributor

Good spot, Arvid!

@gguptp gguptp force-pushed the main branch 2 times, most recently from f2ec471 to 87fbe9b on March 26, 2025 14:40
…sue in DDB connector when sending split finished event from reader -> enumerator
Contributor
@leekeiabstraction leekeiabstraction left a comment

Thank you for the changes and the cleanup of finishedAfterCheckpoint!

@hlteoh37 hlteoh37 merged commit ca96d84 into apache:main Mar 26, 2025
7 of 9 checks passed