Skip to content

Commit 3b705f4

Browse files
committed
[FLINK-22494][runtime] Adds PossibleInconsistentStateException handling to CheckpointCoordinator
1 parent 5d1e54e commit 3b705f4

File tree

4 files changed

+107
-2
lines changed

4 files changed

+107
-2
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
3838
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
3939
import org.apache.flink.runtime.operators.coordination.OperatorInfo;
40+
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
4041
import org.apache.flink.runtime.state.CheckpointStorage;
4142
import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
4243
import org.apache.flink.runtime.state.CheckpointStorageLocation;
@@ -1208,8 +1209,16 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
12081209
completedCheckpointStore.addCheckpoint(
12091210
completedCheckpoint, checkpointsCleaner, this::scheduleTriggerRequest);
12101211
} catch (Exception exception) {
1211-
// we failed to store the completed checkpoint. Let's clean up
1212-
checkpointsCleaner.cleanCheckpointOnFailedStoring(completedCheckpoint, executor);
1212+
if (exception instanceof PossibleInconsistentStateException) {
1213+
LOG.warn(
1214+
"An error occurred while writing checkpoint {} to the underlying metadata store. Flink was not able to determine whether the metadata was successfully persisted. The corresponding state located at '{}' won't be discarded and needs to be cleaned up manually.",
1215+
completedCheckpoint.getCheckpointID(),
1216+
completedCheckpoint.getExternalPointer());
1217+
} else {
1218+
// we failed to store the completed checkpoint. Let's clean up
1219+
checkpointsCleaner.cleanCheckpointOnFailedStoring(
1220+
completedCheckpoint, executor);
1221+
}
12131222

12141223
sendAbortedMessages(
12151224
pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(),

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.flink.api.common.JobStatus;
2222
import org.apache.flink.api.java.tuple.Tuple2;
23+
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
2324
import org.apache.flink.runtime.persistence.ResourceVersion;
2425
import org.apache.flink.runtime.persistence.StateHandleStore;
2526
import org.apache.flink.runtime.state.RetrievableStateHandle;
@@ -212,6 +213,9 @@ public void recover() throws Exception {
212213
* older ones.
213214
*
214215
* @param checkpoint Completed checkpoint to add.
216+
* @throws PossibleInconsistentStateException if adding the checkpoint failed and leaving the
217+
* system in an possibly inconsistent state, i.e. it's uncertain whether the checkpoint
218+
* metadata was fully written to the underlying systems or not.
215219
*/
216220
@Override
217221
public void addCheckpoint(

flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,27 +23,36 @@
2323
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
2424
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
2525
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
26+
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
2627
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
2728
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
2829
import org.apache.flink.runtime.jobgraph.JobVertexID;
2930
import org.apache.flink.runtime.jobgraph.OperatorID;
3031
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
32+
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
3133
import org.apache.flink.runtime.state.InputChannelStateHandle;
3234
import org.apache.flink.runtime.state.KeyedStateHandle;
3335
import org.apache.flink.runtime.state.OperatorStateHandle;
3436
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
3537
import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
3638
import org.apache.flink.runtime.state.StreamStateHandle;
39+
import org.apache.flink.util.FlinkRuntimeException;
3740
import org.apache.flink.util.TestLogger;
3841
import org.apache.flink.util.function.TriConsumerWithException;
3942

43+
import org.junit.Rule;
4044
import org.junit.Test;
45+
import org.junit.rules.TemporaryFolder;
4146

4247
import java.util.Collections;
4348
import java.util.List;
49+
import java.util.concurrent.Executor;
50+
import java.util.concurrent.atomic.AtomicInteger;
4451

52+
import static org.hamcrest.CoreMatchers.is;
4553
import static org.junit.Assert.assertEquals;
4654
import static org.junit.Assert.assertFalse;
55+
import static org.junit.Assert.assertThat;
4756
import static org.junit.Assert.assertTrue;
4857
import static org.junit.Assert.fail;
4958
import static org.mockito.Mockito.mock;
@@ -54,6 +63,8 @@
5463
/** Tests for failure of checkpoint coordinator. */
5564
public class CheckpointCoordinatorFailureTest extends TestLogger {
5665

66+
@Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
67+
5768
/**
5869
* Tests that a failure while storing a completed checkpoint in the completed checkpoint store
5970
* will properly fail the originating pending checkpoint and clean upt the completed checkpoint.
@@ -166,6 +177,81 @@ public void testFailingCompletedCheckpointStoreAdd() throws Exception {
166177
.discardState();
167178
}
168179

180+
@Test
181+
public void testCleanupForGenericFailure() throws Exception {
182+
testStoringFailureHandling(new FlinkRuntimeException("Expected exception"), 1);
183+
}
184+
185+
@Test
186+
public void testCleanupOmissionForPossibleInconsistentStateException() throws Exception {
187+
testStoringFailureHandling(new PossibleInconsistentStateException(), 0);
188+
}
189+
190+
private void testStoringFailureHandling(Exception failure, int expectedCleanupCalls)
191+
throws Exception {
192+
final JobVertexID jobVertexID1 = new JobVertexID();
193+
194+
final ExecutionGraph graph =
195+
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
196+
.addJobVertex(jobVertexID1)
197+
.build();
198+
199+
final ExecutionVertex vertex = graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
200+
final ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
201+
202+
final StandaloneCheckpointIDCounter checkpointIDCounter =
203+
new StandaloneCheckpointIDCounter();
204+
205+
final ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor =
206+
new ManuallyTriggeredScheduledExecutor();
207+
208+
final CompletedCheckpointStore completedCheckpointStore =
209+
new FailingCompletedCheckpointStore(
210+
(checkpoint, ignoredCleaner, ignoredPostCleanCallback) -> {
211+
throw failure;
212+
});
213+
214+
final AtomicInteger cleanupCallCount = new AtomicInteger(0);
215+
final CheckpointCoordinator checkpointCoordinator =
216+
new CheckpointCoordinatorBuilder()
217+
.setExecutionGraph(graph)
218+
.setCheckpointIDCounter(checkpointIDCounter)
219+
.setCheckpointsCleaner(
220+
new CheckpointsCleaner() {
221+
222+
private static final long serialVersionUID =
223+
2029876992397573325L;
224+
225+
@Override
226+
public void cleanCheckpointOnFailedStoring(
227+
CompletedCheckpoint completedCheckpoint,
228+
Executor executor) {
229+
cleanupCallCount.incrementAndGet();
230+
super.cleanCheckpointOnFailedStoring(
231+
completedCheckpoint, executor);
232+
}
233+
})
234+
.setCompletedCheckpointStore(completedCheckpointStore)
235+
.setTimer(manuallyTriggeredScheduledExecutor)
236+
.build();
237+
checkpointCoordinator.triggerSavepoint(tmpFolder.newFolder().getAbsolutePath());
238+
manuallyTriggeredScheduledExecutor.triggerAll();
239+
240+
try {
241+
checkpointCoordinator.receiveAcknowledgeMessage(
242+
new AcknowledgeCheckpoint(
243+
graph.getJobID(), attemptId, checkpointIDCounter.getLast()),
244+
"unknown location");
245+
fail("CheckpointException should have been thrown.");
246+
} catch (CheckpointException e) {
247+
assertThat(
248+
e.getCheckpointFailureReason(),
249+
is(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE));
250+
}
251+
252+
assertThat(cleanupCallCount.get(), is(expectedCleanupCalls));
253+
}
254+
169255
private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore {
170256

171257
private final TriConsumerWithException<

flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -695,6 +695,12 @@ public CheckpointCoordinatorBuilder setCoordinatorsToCheckpoint(
695695
return this;
696696
}
697697

698+
public CheckpointCoordinatorBuilder setCheckpointsCleaner(
699+
CheckpointsCleaner checkpointsCleaner) {
700+
this.checkpointsCleaner = checkpointsCleaner;
701+
return this;
702+
}
703+
698704
public CheckpointCoordinatorBuilder setCheckpointIDCounter(
699705
CheckpointIDCounter checkpointIDCounter) {
700706
this.checkpointIDCounter = checkpointIDCounter;

0 commit comments

Comments
 (0)