Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,25 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
/** When we mark a split as finished, we will only assign its child splits to the subtasks. */
private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
splitAssignment
.get(subtaskId)
.removeIf(
split ->
splitsFinishedEvent
.getFinishedSplitIds()
.contains(split.splitId()));

Set<DynamoDbStreamsShardSplit> splitsAssignment = splitAssignment.get(subtaskId);
// During recovery, splitAssignment may return null for this subtask: no split may have been
// assigned to it yet, even though a SplitsFinishedEvent from that subtask has arrived.
// In that case we skip child shard assignment, since it could otherwise try to assign
// child shards before any readers are available.
if (splitsAssignment == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we make this a generic WARN whenever the finished split is not found among the assigned splits (instead of only when no split is assigned to the subtask)?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd not weaken consistency checks here for all events. I'd mark the events as recovered and then allow some leniency (not even WARN because this is expected).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made this an INFO log for now; I will make the consistency checks stronger in a separate PR, if that works?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good spot Arvid..!

LOG.info(
"handleFinishedSplits called for subtask: {} which doesnt have any "
+ "assigned splits right now. This might happen due to job restarts. "
+ "Child shard discovery might be delayed until we have enough readers."
+ "Finished split ids: {}",
subtaskId,
splitsFinishedEvent.getFinishedSplitIds());
return;
}

splitsAssignment.removeIf(
split -> splitsFinishedEvent.getFinishedSplitIds().contains(split.splitId()));
assignChildSplits(splitsFinishedEvent.getFinishedSplitIds());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public int assign(DynamoDbStreamsShardSplit split, Context context) {

Preconditions.checkArgument(
selectedSubtask != -1,
"Expected at least one registered reader. Unable to assign split.");
"Expected at least one registered reader. Unable to assign split with id: %s.",
split.splitId());
return selectedSubtask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,13 @@ private boolean verifyParentIsEitherFinishedOrCleanedUp(DynamoDbStreamsShardSpli
|| isFinished(split.getParentShardId());
}

private boolean isFinished(String splitId) {
/**
* Indicates whether the given split id is among the finished splits.
*
* @param splitId id of the split to look up
* @return {@code true} if the id is contained in the finished splits, {@code false} otherwise
*/
public boolean isFinished(String splitId) {
return finishedSplits.contains(splitId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,14 @@
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.model.Record;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

/**
* Coordinates the reading from assigned splits. Runs on the TaskManager.
Expand All @@ -49,6 +54,8 @@ public class DynamoDbStreamsSourceReader<T>

private static final Logger LOG = LoggerFactory.getLogger(DynamoDbStreamsSourceReader.class);
private final Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap;
private final NavigableMap<Long, Set<DynamoDbStreamsShardSplit>> splitFinishedEvents;
private long currentCheckpointId;

public DynamoDbStreamsSourceReader(
SingleThreadFetcherManager<Record, DynamoDbStreamsShardSplit> splitFetcherManager,
Expand All @@ -58,10 +65,35 @@ public DynamoDbStreamsSourceReader(
Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap) {
super(splitFetcherManager, recordEmitter, config, context);
this.shardMetricGroupMap = shardMetricGroupMap;
this.splitFinishedEvents = new TreeMap<>();
this.currentCheckpointId = Long.MIN_VALUE;
}

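/**
* Stores the finished splits in a map keyed by the current checkpoint id and sends a
* SplitsFinishedEvent with their ids to the coordinator.
*
* @param finishedSplitIds states of the splits that have finished, keyed by split id
*/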
@Override
protected void onSplitFinished(Map<String, DynamoDbStreamsShardSplitState> finishedSplitIds) {
if (finishedSplitIds.isEmpty()) {
return;
}

splitFinishedEvents.computeIfAbsent(currentCheckpointId, k -> new HashSet<>());
finishedSplitIds.values().stream()
.map(
finishedSplit ->
new DynamoDbStreamsShardSplit(
finishedSplit.getStreamArn(),
finishedSplit.getShardId(),
finishedSplit.getNextStartingPosition(),
finishedSplit
.getDynamoDbStreamsShardSplit()
.getParentShardId(),
true))
.forEach(split -> splitFinishedEvents.get(currentCheckpointId).add(split));

context.sendSourceEventToCoordinator(
new SplitsFinishedEvent(new HashSet<>(finishedSplitIds.keySet())));
finishedSplitIds.keySet().forEach(this::unregisterShardMetricGroup);
Expand All @@ -80,8 +112,55 @@ protected DynamoDbStreamsShardSplit toSplitType(

@Override
public void addSplits(List<DynamoDbStreamsShardSplit> splits) {
splits.forEach(this::registerShardMetricGroup);
super.addSplits(splits);
List<DynamoDbStreamsShardSplit> dynamoDbStreamsShardSplits = new ArrayList<>();
for (DynamoDbStreamsShardSplit split : splits) {
if (split.isFinished()) {
// Replay the finished split event.
// There is no need to put these finished splits back into the buffer:
// if the next checkpoint completes, they would just be removed from the
// buffer anyway. If the next checkpoint doesn't complete,
// we go back to the previous checkpointed state,
// which replays these split finished events again.
context.sendSourceEventToCoordinator(
new SplitsFinishedEvent(Collections.singleton(split.splitId())));
} else {
dynamoDbStreamsShardSplits.add(split);
}
}
dynamoDbStreamsShardSplits.forEach(this::registerShardMetricGroup);
super.addSplits(dynamoDbStreamsShardSplits);
}

/**
 * Snapshots the reader state for the given checkpoint.
 *
 * <p>In addition to the splits returned by the parent implementation, every finished split that
 * is still buffered in {@code splitFinishedEvents} is appended to the snapshot. This way, if the
 * reader has to be restored from this state, the finished split ids can be sent again;
 * otherwise a {@code SplitsFinishedEvent} could go missing across a restart of the source,
 * which risks data loss.
 *
 * <p>The checkpoint id is also recorded in {@code currentCheckpointId}.
 *
 * @param checkpointId id of the checkpoint being taken
 * @return the splits reported by the parent reader plus all buffered finished splits
 */
@Override
public List<DynamoDbStreamsShardSplit> snapshotState(long checkpointId) {
    currentCheckpointId = checkpointId;

    List<DynamoDbStreamsShardSplit> snapshot = new ArrayList<>(super.snapshotState(checkpointId));
    // Iterating an empty map is a no-op, so no emptiness guard is needed.
    for (Set<DynamoDbStreamsShardSplit> finishedSplits : splitFinishedEvents.values()) {
        snapshot.addAll(finishedSplits);
    }
    return snapshot;
}

/**
 * Drops buffered finished splits once a checkpoint has completed.
 *
 * <p>Every entry of {@code splitFinishedEvents} whose key is less than or equal to the completed
 * checkpoint id is removed.
 *
 * <p>NOTE(review): {@code onSplitFinished} files splits under {@code currentCheckpointId}, which
 * {@code snapshotState} sets to the id of the snapshot just taken, so the entry keyed by
 * {@code checkpointId} holds splits that finished after that snapshot was taken. The inclusive
 * bound discards those as well; confirm this is intended.
 *
 * @param checkpointId id of the checkpoint that completed
 */
@Override
public void notifyCheckpointComplete(long checkpointId) {
    NavigableMap<Long, Set<DynamoDbStreamsShardSplit>> upToCheckpoint =
            splitFinishedEvents.headMap(checkpointId, true);
    // headMap returns a view backed by the map, so clearing it removes the entries themselves.
    upToCheckpoint.clear();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,22 @@ public final class DynamoDbStreamsShardSplit implements SourceSplit {
private final String shardId;
private final StartingPosition startingPosition;
private final String parentShardId;
private final boolean isFinished;

/**
 * Creates a split that is not marked as finished.
 *
 * <p>Delegates to the five-argument constructor with {@code isFinished} set to {@code false};
 * all other arguments are forwarded unchanged.
 *
 * @param streamArn value forwarded as the split's stream ARN
 * @param shardId value forwarded as the split's shard id
 * @param startingPosition value forwarded as the split's starting position
 * @param parentShardId value forwarded as the split's parent shard id
 */
public DynamoDbStreamsShardSplit(
        String streamArn,
        String shardId,
        StartingPosition startingPosition,
        String parentShardId) {
    this(streamArn, shardId, startingPosition, parentShardId, false);
}

public DynamoDbStreamsShardSplit(
String streamArn,
String shardId,
StartingPosition startingPosition,
String parentShardId,
boolean isFinished) {
checkNotNull(streamArn, "streamArn cannot be null");
checkNotNull(shardId, "shardId cannot be null");
checkNotNull(startingPosition, "startingPosition cannot be null");
Expand All @@ -54,6 +64,7 @@ public DynamoDbStreamsShardSplit(
this.shardId = shardId;
this.startingPosition = startingPosition;
this.parentShardId = parentShardId;
this.isFinished = isFinished;
}

@Override
Expand All @@ -77,6 +88,10 @@ public String getParentShardId() {
return parentShardId;
}

/**
 * Returns the finished flag this split was created with.
 *
 * @return {@code true} if the split was constructed as finished, {@code false} otherwise
 */
public boolean isFinished() {
    return this.isFinished;
}

@Override
public String toString() {
return "DynamoDbStreamsShardSplit{"
Expand All @@ -90,7 +105,10 @@ public String toString() {
+ startingPosition
+ ", parentShardId=["
+ parentShardId
+ '}';
+ "]"
+ ", isFinished="
+ isFinished
+ "}";
}

@Override
Expand All @@ -105,11 +123,12 @@ public boolean equals(Object o) {
return Objects.equals(streamArn, that.streamArn)
&& Objects.equals(shardId, that.shardId)
&& Objects.equals(startingPosition, that.startingPosition)
&& Objects.equals(parentShardId, that.parentShardId);
&& Objects.equals(parentShardId, that.parentShardId)
&& Objects.equals(isFinished, that.isFinished);
}

@Override
public int hashCode() {
return Objects.hash(streamArn, shardId, startingPosition, parentShardId);
return Objects.hash(streamArn, shardId, startingPosition, parentShardId, isFinished);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
* Serializes and deserializes the {@link DynamoDbStreamsShardSplit}. This class needs to handle
Expand All @@ -38,7 +41,8 @@
public class DynamoDbStreamsShardSplitSerializer
implements SimpleVersionedSerializer<DynamoDbStreamsShardSplit> {

private static final int CURRENT_VERSION = 0;
private static final Set<Integer> COMPATIBLE_VERSIONS = new HashSet<>(Arrays.asList(0, 1));
private static final int CURRENT_VERSION = 1;

@Override
public int getVersion() {
Expand Down Expand Up @@ -69,6 +73,7 @@ public byte[] serialize(DynamoDbStreamsShardSplit split) throws IOException {
out.writeBoolean(true);
out.writeUTF(split.getParentShardId());
}
out.writeBoolean(split.isFinished());

out.flush();
return baos.toByteArray();
Expand All @@ -80,7 +85,7 @@ public DynamoDbStreamsShardSplit deserialize(int version, byte[] serialized)
throws IOException {
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
DataInputStream in = new DataInputStream(bais)) {
if (version != getVersion()) {
if (!COMPATIBLE_VERSIONS.contains(version)) {
throw new VersionMismatchException(
"Trying to deserialize DynamoDbStreamsShardSplit serialized with unsupported version "
+ version
Expand All @@ -105,11 +110,18 @@ public DynamoDbStreamsShardSplit deserialize(int version, byte[] serialized)
if (hasParentShardId) {
parentShardId = in.readUTF();
}

boolean isFinished = false;
if (version > 0) {
isFinished = in.readBoolean();
}

return new DynamoDbStreamsShardSplit(
streamArn,
shardId,
new StartingPosition(shardIteratorType, startingMarker),
parentShardId);
parentShardId,
isFinished);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ void testDeserializeWithWrongVersionSplitSerializer() throws Exception {
.isThrownBy(() -> serializer.deserialize(serializer.getVersion(), serialized))
.withMessageContaining(
"Trying to deserialize DynamoDbStreamsShardSplit serialized with unsupported version")
.withMessageContaining(String.valueOf(serializer.getVersion()))
.withMessageContaining(String.valueOf(wrongVersionStateSerializer.getVersion()));
.withMessageContaining(String.valueOf(splitSerializer.getVersion()))
.withMessageContaining(String.valueOf(wrongVersionSplitSerializer.getVersion()));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ void testNoRegisteredReaders() {
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> assigner.assign(split, assignerContext))
.withMessageContaining(
"Expected at least one registered reader. Unable to assign split.");
String.format(
"Expected at least one registered reader. Unable to assign split with id: %s.",
split.splitId()));
}

private void createReaderWithAssignedSplits(
Expand Down
Loading
Loading