[FLINK-37416][BugFix][Connectors/DynamoDB] Fix state inconsistency issue in DDB connector when sending split finished event from reader -> enumerator #193
AHeise
left a comment
Approach LGTM in general and I added some smaller nits.
When we discussed the approach with the TreeMap, I forgot that the state of a reader is just a list of splits. So, now you used a special split to track the finished events, which is a plausible solution but also a bit awkward. (We should probably extend the interface to also allow reader-native state)
So, I provided an alternative solution where the information is layered directly inside the split and thus makes the special split unnecessary. PTAL.
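A minimal sketch of what carrying the finished marker inside the split itself could look like, so that the reader's checkpointed list of splits is enough to re-report finished splits after a restore. The class and field names (`ShardSplitSketch`, `finished`, `RestoredReaderState`) are hypothetical and do not mirror the connector's actual `DynamoDbStreamsShardSplit`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical split type; the field names are illustrative only.
final class ShardSplitSketch {
    final String shardId;
    final boolean finished;

    ShardSplitSketch(String shardId, boolean finished) {
        this.shardId = shardId;
        this.finished = finished;
    }

    // Splits are immutable here, so finishing a split yields a new instance.
    ShardSplitSketch markFinished() {
        return new ShardSplitSketch(shardId, true);
    }
}

// Hypothetical helper that sorts restored splits into "keep reading" and "re-report as finished".
final class RestoredReaderState {
    final List<String> toRead = new ArrayList<>();
    final List<String> finishedToReport = new ArrayList<>();

    static RestoredReaderState from(List<ShardSplitSketch> checkpointedSplits) {
        RestoredReaderState state = new RestoredReaderState();
        for (ShardSplitSketch split : checkpointedSplits) {
            (split.finished ? state.finishedToReport : state.toRead).add(split.shardId);
        }
        return state;
    }
}
```

With this shape no sentinel split is needed: the reader state stays a plain list of splits, and the finished ones are recognised by their own flag.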
05d00d9 to d086a00
98cb457 to 567e5fa
60833a5 to f518db5
AHeise
left a comment
LGTM with some small nits.
hlteoh37
left a comment
LGTM. Added 1 nit
// to the subtask, but there might be SplitsFinishedEvent from that subtask.
// We will not do child shard assignment if that is the case since that might lead to child
// shards trying to get assigned before there being any readers.
if (splitsAssignment == null) {
Why don't we make this a generic warning whenever we don't find the finished split in the assigned splits (instead of only when no split is assigned to the subtask)?
I'd not weaken consistency checks here for all events. I'd mark the events as recovered and then allow some leniency (not even WARN because this is expected).
Made this an info log for now. I will make the consistency checks stronger in a separate PR, if that works?
Good spot Arvid..!
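For illustration, the leniency discussed in this thread (keep the strict check for normal events, tolerate events marked as recovered) might be sketched as follows. The `recovered` flag, `FinishedEventSketch`, and `FinishedEventHandlerSketch` are hypothetical names for this sketch, not the connector's actual event or enumerator API.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical event; "recovered" means the reader re-sent it after a restore.
final class FinishedEventSketch {
    final String splitId;
    final boolean recovered;

    FinishedEventSketch(String splitId, boolean recovered) {
        this.splitId = splitId;
        this.recovered = recovered;
    }
}

// Hypothetical enumerator-side handler.
final class FinishedEventHandlerSketch {
    private final Set<String> assigned = new HashSet<>();

    void assign(String splitId) {
        assigned.add(splitId);
    }

    // Returns true if the event was applied, false if a recovered event was skipped.
    boolean handle(FinishedEventSketch event) {
        if (assigned.remove(event.splitId)) {
            return true;
        }
        if (event.recovered) {
            // Expected after a restore: the split may already be marked finished.
            return false;
        }
        // Non-recovered events keep the strict consistency check.
        throw new IllegalStateException("Finished split was not assigned: " + event.splitId);
    }
}
```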
f2ec471 to 87fbe9b
…sue in DDB connector when sending split finished event from reader -> enumerator
leekeiabstraction
left a comment
Thank you for the changes and the clean up on finishedAfterCheckpoint!
Purpose of the change
Today Flink does not support distributed consistency of events sent from a subtask (Task Manager) to the coordinator (Job Manager); see https://issues.apache.org/jira/browse/FLINK-28639. As a result, we have a race condition that can lead to a shard and its child shards no longer being processed after a job restart.
The shard lineage can get lost because a shard is in ASSIGNED state in the enumerator but is not part of any task manager's state.
This PR changes the behaviour by also checkpointing the split-finished events received between two checkpoints and replaying those events on restore.
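As a rough illustration of the checkpoint-and-replay idea described above, the sketch below records finished splits per checkpoint in a `TreeMap` and replays them against an enumerator-like view after a restore. All names (`FinishedSplitTracker`, `AssignedSplits`) are hypothetical, and the pruning policy in `onCheckpointComplete` is a simplifying assumption rather than the PR's actual behaviour.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical reader-side bookkeeping; names are illustrative, not the connector's classes.
class FinishedSplitTracker {
    // checkpoint id -> ids of splits that finished before that checkpoint was taken
    private final NavigableMap<Long, Set<String>> finishedByCheckpoint = new TreeMap<>();
    private final Set<String> finishedSinceLastSnapshot = new HashSet<>();

    void onSplitFinished(String splitId) {
        finishedSinceLastSnapshot.add(splitId);
    }

    // Returns every finished-split id that should be written into this checkpoint.
    List<String> snapshot(long checkpointId) {
        finishedByCheckpoint.put(checkpointId, new HashSet<>(finishedSinceLastSnapshot));
        finishedSinceLastSnapshot.clear();
        List<String> pending = new ArrayList<>();
        for (Set<String> ids : finishedByCheckpoint.values()) {
            pending.addAll(ids);
        }
        return pending;
    }

    // Assumed pruning policy: forget events recorded at or before a completed checkpoint.
    void onCheckpointComplete(long checkpointId) {
        finishedByCheckpoint.headMap(checkpointId, true).clear();
    }
}

// Hypothetical enumerator-side view: which splits are still considered assigned.
class AssignedSplits {
    private final Set<String> assigned = new HashSet<>();

    void assign(String splitId) {
        assigned.add(splitId);
    }

    // Idempotent, so a replayed event for an already-finished split is harmless.
    void onSplitFinished(String splitId) {
        assigned.remove(splitId);
    }

    boolean isAssigned(String splitId) {
        return assigned.contains(splitId);
    }
}
```

On restore, the reader would call `onSplitFinished` on the enumerator side for each id returned by the last `snapshot`, so a split that the enumerator still holds as ASSIGNED is released even if the original event was lost in the race.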
Verifying this change
Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
I manually verified this by running the connector in a local Flink cluster that was restarted every 10 minutes. I saw no checkpoint discrepancy and no issue in the cluster.
I also added unit tests for the change.
Significant changes
(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)
@Public(Evolving))