Skip to content

Commit 5154d21

Browse files
committed
[FLINK-37416][BugFix][Connectors/DynamoDB] Fix state inconsistency issue in DDB connector when sending split finished event from reader -> enumerator
1 parent 8b09b33 commit 5154d21

File tree

11 files changed

+369
-22
lines changed

11 files changed

+369
-22
lines changed

flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -139,13 +139,25 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
139139
/** When we mark a split as finished, we will only assign its child splits to the subtasks. */
140140
private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
141141
splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
142-
splitAssignment
143-
.get(subtaskId)
144-
.removeIf(
145-
split ->
146-
splitsFinishedEvent
147-
.getFinishedSplitIds()
148-
.contains(split.splitId()));
142+
143+
Set<DynamoDbStreamsShardSplit> splitsAssignment = splitAssignment.get(subtaskId);
144+
// during recovery, splitAssignment may return null since there might be no split assigned
145+
// to the subtask, but there might be SplitsFinishedEvent from that subtask.
146+
// We will not do child shard assignment if that is the case since that might lead to child
147+
// shards trying to get assigned before there being any readers.
148+
if (splitsAssignment == null) {
149+
LOG.info(
150+
"handleFinishedSplits called for subtask: {} which doesn't have any "
151+
+ "assigned splits right now. This might happen due to job restarts. "
152+
+ "Child shard discovery might be delayed until we have enough readers. "
153+
+ "Finished split ids: {}",
154+
subtaskId,
155+
splitsFinishedEvent.getFinishedSplitIds());
156+
return;
157+
}
158+
159+
splitsAssignment.removeIf(
160+
split -> splitsFinishedEvent.getFinishedSplitIds().contains(split.splitId()));
149161
assignChildSplits(splitsFinishedEvent.getFinishedSplitIds());
150162
}
151163

flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/assigner/UniformShardAssigner.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ public int assign(DynamoDbStreamsShardSplit split, Context context) {
5454

5555
Preconditions.checkArgument(
5656
selectedSubtask != -1,
57-
"Expected at least one registered reader. Unable to assign split.");
57+
"Expected at least one registered reader. Unable to assign split with id: %s.",
58+
split.splitId());
5859
return selectedSubtask;
5960
}
6061
}

flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,13 @@ private boolean verifyParentIsEitherFinishedOrCleanedUp(DynamoDbStreamsShardSpli
333333
|| isFinished(split.getParentShardId());
334334
}
335335

336-
private boolean isFinished(String splitId) {
336+
/**
337+
* Provides information about whether a split is finished or not.
338+
*
339+
* @param splitId the id of the split to check
340+
* @return boolean value indicating if split is finished
341+
*/
342+
public boolean isFinished(String splitId) {
337343
return finishedSplits.contains(splitId);
338344
}
339345
}

flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,14 @@
3333
import org.slf4j.LoggerFactory;
3434
import software.amazon.awssdk.services.dynamodb.model.Record;
3535

36+
import java.util.ArrayList;
37+
import java.util.Collections;
3638
import java.util.HashSet;
3739
import java.util.List;
3840
import java.util.Map;
41+
import java.util.NavigableMap;
42+
import java.util.Set;
43+
import java.util.TreeMap;
3944

4045
/**
4146
* Coordinates the reading from assigned splits. Runs on the TaskManager.
@@ -49,6 +54,8 @@ public class DynamoDbStreamsSourceReader<T>
4954

5055
private static final Logger LOG = LoggerFactory.getLogger(DynamoDbStreamsSourceReader.class);
5156
private final Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap;
57+
private final NavigableMap<Long, Set<DynamoDbStreamsShardSplit>> splitFinishedEvents;
58+
private long currentCheckpointId;
5259

5360
public DynamoDbStreamsSourceReader(
5461
SingleThreadFetcherManager<Record, DynamoDbStreamsShardSplit> splitFetcherManager,
@@ -58,10 +65,35 @@ public DynamoDbStreamsSourceReader(
5865
Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap) {
5966
super(splitFetcherManager, recordEmitter, config, context);
6067
this.shardMetricGroupMap = shardMetricGroupMap;
68+
this.splitFinishedEvents = new TreeMap<>();
69+
this.currentCheckpointId = Long.MIN_VALUE;
6170
}
6271

72+
/**
73+
* We store the finished splits in a map keyed by the checkpoint id.
74+
*
75+
* @param finishedSplitIds
76+
*/
6377
@Override
6478
protected void onSplitFinished(Map<String, DynamoDbStreamsShardSplitState> finishedSplitIds) {
79+
if (finishedSplitIds.isEmpty()) {
80+
return;
81+
}
82+
83+
splitFinishedEvents.computeIfAbsent(currentCheckpointId, k -> new HashSet<>());
84+
finishedSplitIds.values().stream()
85+
.map(
86+
finishedSplit ->
87+
new DynamoDbStreamsShardSplit(
88+
finishedSplit.getStreamArn(),
89+
finishedSplit.getShardId(),
90+
finishedSplit.getNextStartingPosition(),
91+
finishedSplit
92+
.getDynamoDbStreamsShardSplit()
93+
.getParentShardId(),
94+
true))
95+
.forEach(split -> splitFinishedEvents.get(currentCheckpointId).add(split));
96+
6597
context.sendSourceEventToCoordinator(
6698
new SplitsFinishedEvent(new HashSet<>(finishedSplitIds.keySet())));
6799
finishedSplitIds.keySet().forEach(this::unregisterShardMetricGroup);
@@ -80,8 +112,55 @@ protected DynamoDbStreamsShardSplit toSplitType(
80112

81113
@Override
82114
public void addSplits(List<DynamoDbStreamsShardSplit> splits) {
83-
splits.forEach(this::registerShardMetricGroup);
84-
super.addSplits(splits);
115+
List<DynamoDbStreamsShardSplit> dynamoDbStreamsShardSplits = new ArrayList<>();
116+
for (DynamoDbStreamsShardSplit split : splits) {
117+
if (split.isFinished()) {
118+
// Replay the finished split event.
119+
// We don't need to reload the split finished events in buffer back
120+
// since if the next checkpoint completes, these would just be removed from the
121+
// buffer. If the next checkpoint doesn't complete,
122+
// we would go back to the previous checkpointed
123+
// state which will again replay these split finished events.
124+
context.sendSourceEventToCoordinator(
125+
new SplitsFinishedEvent(Collections.singleton(split.splitId())));
126+
} else {
127+
dynamoDbStreamsShardSplits.add(split);
128+
}
129+
}
130+
dynamoDbStreamsShardSplits.forEach(this::registerShardMetricGroup);
131+
super.addSplits(dynamoDbStreamsShardSplits);
132+
}
133+
134+
/**
135+
* At snapshot, we also store the pending finished split ids in the current checkpoint so that
136+
* in case we have to restore the reader from state, we also send the finished split ids
137+
* otherwise we run a risk of data loss during restarts of the source because of the
138+
* SplitsFinishedEvent going missing.
139+
*
140+
* @param checkpointId
141+
* @return
142+
*/
143+
@Override
144+
public List<DynamoDbStreamsShardSplit> snapshotState(long checkpointId) {
145+
this.currentCheckpointId = checkpointId;
146+
List<DynamoDbStreamsShardSplit> splits = new ArrayList<>(super.snapshotState(checkpointId));
147+
148+
if (!splitFinishedEvents.isEmpty()) {
149+
// Add all finished splits to the snapshot
150+
splitFinishedEvents.values().forEach(splits::addAll);
151+
}
152+
return splits;
153+
}
154+
155+
/**
156+
* During notifyCheckpointComplete, we should clean up the state of finished splits that are
157+
* less than or equal to the checkpoint id.
158+
*
159+
* @param checkpointId
160+
*/
161+
@Override
162+
public void notifyCheckpointComplete(long checkpointId) {
163+
splitFinishedEvents.headMap(checkpointId, true).clear();
85164
}
86165

87166
@Override

flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,22 @@ public final class DynamoDbStreamsShardSplit implements SourceSplit {
4040
private final String shardId;
4141
private final StartingPosition startingPosition;
4242
private final String parentShardId;
43+
private final boolean isFinished;
4344

4445
public DynamoDbStreamsShardSplit(
4546
String streamArn,
4647
String shardId,
4748
StartingPosition startingPosition,
4849
String parentShardId) {
50+
this(streamArn, shardId, startingPosition, parentShardId, false);
51+
}
52+
53+
public DynamoDbStreamsShardSplit(
54+
String streamArn,
55+
String shardId,
56+
StartingPosition startingPosition,
57+
String parentShardId,
58+
boolean isFinished) {
4959
checkNotNull(streamArn, "streamArn cannot be null");
5060
checkNotNull(shardId, "shardId cannot be null");
5161
checkNotNull(startingPosition, "startingPosition cannot be null");
@@ -54,6 +64,7 @@ public DynamoDbStreamsShardSplit(
5464
this.shardId = shardId;
5565
this.startingPosition = startingPosition;
5666
this.parentShardId = parentShardId;
67+
this.isFinished = isFinished;
5768
}
5869

5970
@Override
@@ -77,6 +88,10 @@ public String getParentShardId() {
7788
return parentShardId;
7889
}
7990

91+
public boolean isFinished() {
92+
return isFinished;
93+
}
94+
8095
@Override
8196
public String toString() {
8297
return "DynamoDbStreamsShardSplit{"
@@ -90,7 +105,10 @@ public String toString() {
90105
+ startingPosition
91106
+ ", parentShardId=["
92107
+ parentShardId
93-
+ '}';
108+
+ "]"
109+
+ ", isFinished="
110+
+ isFinished
111+
+ "}";
94112
}
95113

96114
@Override
@@ -105,11 +123,12 @@ public boolean equals(Object o) {
105123
return Objects.equals(streamArn, that.streamArn)
106124
&& Objects.equals(shardId, that.shardId)
107125
&& Objects.equals(startingPosition, that.startingPosition)
108-
&& Objects.equals(parentShardId, that.parentShardId);
126+
&& Objects.equals(parentShardId, that.parentShardId)
127+
&& Objects.equals(isFinished, that.isFinished);
109128
}
110129

111130
@Override
112131
public int hashCode() {
113-
return Objects.hash(streamArn, shardId, startingPosition, parentShardId);
132+
return Objects.hash(streamArn, shardId, startingPosition, parentShardId, isFinished);
114133
}
115134
}

flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializer.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
import java.io.DataInputStream;
3030
import java.io.DataOutputStream;
3131
import java.io.IOException;
32+
import java.util.Arrays;
33+
import java.util.HashSet;
34+
import java.util.Set;
3235

3336
/**
3437
* Serializes and deserializes the {@link DynamoDbStreamsShardSplit}. This class needs to handle
@@ -38,7 +41,8 @@
3841
public class DynamoDbStreamsShardSplitSerializer
3942
implements SimpleVersionedSerializer<DynamoDbStreamsShardSplit> {
4043

41-
private static final int CURRENT_VERSION = 0;
44+
private static final Set<Integer> COMPATIBLE_VERSIONS = new HashSet<>(Arrays.asList(0, 1));
45+
private static final int CURRENT_VERSION = 1;
4246

4347
@Override
4448
public int getVersion() {
@@ -69,6 +73,7 @@ public byte[] serialize(DynamoDbStreamsShardSplit split) throws IOException {
6973
out.writeBoolean(true);
7074
out.writeUTF(split.getParentShardId());
7175
}
76+
out.writeBoolean(split.isFinished());
7277

7378
out.flush();
7479
return baos.toByteArray();
@@ -80,7 +85,7 @@ public DynamoDbStreamsShardSplit deserialize(int version, byte[] serialized)
8085
throws IOException {
8186
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
8287
DataInputStream in = new DataInputStream(bais)) {
83-
if (version != getVersion()) {
88+
if (!COMPATIBLE_VERSIONS.contains(version)) {
8489
throw new VersionMismatchException(
8590
"Trying to deserialize DynamoDbStreamsShardSplit serialized with unsupported version "
8691
+ version
@@ -105,11 +110,18 @@ public DynamoDbStreamsShardSplit deserialize(int version, byte[] serialized)
105110
if (hasParentShardId) {
106111
parentShardId = in.readUTF();
107112
}
113+
114+
boolean isFinished = false;
115+
if (version > 0) {
116+
isFinished = in.readBoolean();
117+
}
118+
108119
return new DynamoDbStreamsShardSplit(
109120
streamArn,
110121
shardId,
111122
new StartingPosition(shardIteratorType, startingMarker),
112-
parentShardId);
123+
parentShardId,
124+
isFinished);
113125
}
114126
}
115127
}

flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorStateSerializerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,8 @@ void testDeserializeWithWrongVersionSplitSerializer() throws Exception {
127127
.isThrownBy(() -> serializer.deserialize(serializer.getVersion(), serialized))
128128
.withMessageContaining(
129129
"Trying to deserialize DynamoDbStreamsShardSplit serialized with unsupported version")
130-
.withMessageContaining(String.valueOf(serializer.getVersion()))
131-
.withMessageContaining(String.valueOf(wrongVersionStateSerializer.getVersion()));
130+
.withMessageContaining(String.valueOf(splitSerializer.getVersion()))
131+
.withMessageContaining(String.valueOf(wrongVersionSplitSerializer.getVersion()));
132132
}
133133

134134
@Test

flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/assigner/UniformShardAssignerTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,9 @@ void testNoRegisteredReaders() {
106106
assertThatExceptionOfType(IllegalArgumentException.class)
107107
.isThrownBy(() -> assigner.assign(split, assignerContext))
108108
.withMessageContaining(
109-
"Expected at least one registered reader. Unable to assign split.");
109+
String.format(
110+
"Expected at least one registered reader. Unable to assign split with id: %s.",
111+
split.splitId()));
110112
}
111113

112114
private void createReaderWithAssignedSplits(

0 commit comments

Comments
 (0)