Skip to content

Commit 1399332

Browse files
committed
[FLINK-22494][runtime] Adds PossibleInconsistentStateException handling to CheckpointCoordinator
1 parent 3d8ea07 commit 1399332

File tree

4 files changed

+115
-4
lines changed

4 files changed

+115
-4
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
3838
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
3939
import org.apache.flink.runtime.operators.coordination.OperatorInfo;
40+
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
4041
import org.apache.flink.runtime.state.CheckpointStorage;
4142
import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
4243
import org.apache.flink.runtime.state.CheckpointStorageLocation;
@@ -1208,8 +1209,16 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
12081209
completedCheckpointStore.addCheckpoint(
12091210
completedCheckpoint, checkpointsCleaner, this::scheduleTriggerRequest);
12101211
} catch (Exception exception) {
1211-
// we failed to store the completed checkpoint. Let's clean up
1212-
checkpointsCleaner.cleanCheckpointOnFailedStoring(completedCheckpoint, executor);
1212+
if (exception instanceof PossibleInconsistentStateException) {
1213+
LOG.warn(
1214+
"An error occurred while writing checkpoint {} to the underlying metadata store. Flink was not able to determine whether the metadata was successfully persisted. The corresponding state located at '{}' won't be discarded and needs to be cleaned up manually.",
1215+
completedCheckpoint.getCheckpointID(),
1216+
completedCheckpoint.getExternalPointer());
1217+
} else {
1218+
// we failed to store the completed checkpoint. Let's clean up
1219+
checkpointsCleaner.cleanCheckpointOnFailedStoring(
1220+
completedCheckpoint, executor);
1221+
}
12131222

12141223
sendAbortedMessages(
12151224
pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(),

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.flink.api.common.JobStatus;
2222
import org.apache.flink.api.java.tuple.Tuple2;
23+
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
2324
import org.apache.flink.runtime.persistence.ResourceVersion;
2425
import org.apache.flink.runtime.persistence.StateHandleStore;
2526
import org.apache.flink.runtime.state.RetrievableStateHandle;
@@ -212,6 +213,9 @@ public void recover() throws Exception {
212213
* older ones.
213214
*
214215
* @param checkpoint Completed checkpoint to add.
216+
* @throws PossibleInconsistentStateException if adding the checkpoint failed, leaving the
217+
* system in a possibly inconsistent state, i.e. it's uncertain whether the checkpoint
218+
* metadata was fully written to the underlying systems or not.
215219
*/
216220
@Override
217221
public void addCheckpoint(

flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java

Lines changed: 94 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,26 +23,35 @@
2323
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
2424
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
2525
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
26+
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
2627
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
2728
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
2829
import org.apache.flink.runtime.jobgraph.JobVertexID;
2930
import org.apache.flink.runtime.jobgraph.OperatorID;
3031
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
32+
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
3133
import org.apache.flink.runtime.state.InputChannelStateHandle;
3234
import org.apache.flink.runtime.state.KeyedStateHandle;
3335
import org.apache.flink.runtime.state.OperatorStateHandle;
3436
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
3537
import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
3638
import org.apache.flink.runtime.state.StreamStateHandle;
39+
import org.apache.flink.util.FlinkRuntimeException;
3740
import org.apache.flink.util.TestLogger;
3841

42+
import org.junit.Rule;
3943
import org.junit.Test;
44+
import org.junit.rules.TemporaryFolder;
4045

4146
import java.util.Collections;
4247
import java.util.List;
48+
import java.util.concurrent.Executor;
49+
import java.util.concurrent.atomic.AtomicInteger;
4350

51+
import static org.hamcrest.CoreMatchers.is;
4452
import static org.junit.Assert.assertEquals;
4553
import static org.junit.Assert.assertFalse;
54+
import static org.junit.Assert.assertThat;
4655
import static org.junit.Assert.assertTrue;
4756
import static org.junit.Assert.fail;
4857
import static org.mockito.Mockito.mock;
@@ -53,6 +62,8 @@
5362
/** Tests for failure of checkpoint coordinator. */
5463
public class CheckpointCoordinatorFailureTest extends TestLogger {
5564

65+
@Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
66+
5667
/**
5768
* Tests that a failure while storing a completed checkpoint in the completed checkpoint store
5869
* will properly fail the originating pending checkpoint and clean up the completed checkpoint.
@@ -75,7 +86,10 @@ public void testFailingCompletedCheckpointStoreAdd() throws Exception {
7586
CheckpointCoordinator coord =
7687
new CheckpointCoordinatorBuilder()
7788
.setExecutionGraph(testGraph)
78-
.setCompletedCheckpointStore(new FailingCompletedCheckpointStore())
89+
.setCompletedCheckpointStore(
90+
new FailingCompletedCheckpointStore(
91+
new Exception(
92+
"The failing completed checkpoint store failed again... :-(")))
7993
.setTimer(manuallyTriggeredScheduledExecutor)
8094
.build();
8195

@@ -160,8 +174,86 @@ public void testFailingCompletedCheckpointStoreAdd() throws Exception {
160174
.discardState();
161175
}
162176

177+
@Test
178+
public void testCleanupForGenericFailure() throws Exception {
179+
testStoringFailureHandling(new FlinkRuntimeException("Expected exception"), 1);
180+
}
181+
182+
@Test
183+
public void testCleanupOmissionForPossibleInconsistentStateException() throws Exception {
184+
testStoringFailureHandling(new PossibleInconsistentStateException(), 0);
185+
}
186+
187+
private void testStoringFailureHandling(Exception failure, int expectedCleanupCalls)
188+
throws Exception {
189+
final JobVertexID jobVertexID1 = new JobVertexID();
190+
191+
final ExecutionGraph graph =
192+
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
193+
.addJobVertex(jobVertexID1)
194+
.build();
195+
196+
final ExecutionVertex vertex = graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
197+
final ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
198+
199+
final StandaloneCheckpointIDCounter checkpointIDCounter =
200+
new StandaloneCheckpointIDCounter();
201+
202+
final ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor =
203+
new ManuallyTriggeredScheduledExecutor();
204+
205+
final CompletedCheckpointStore completedCheckpointStore =
206+
new FailingCompletedCheckpointStore(failure);
207+
208+
final AtomicInteger cleanupCallCount = new AtomicInteger(0);
209+
final CheckpointCoordinator checkpointCoordinator =
210+
new CheckpointCoordinatorBuilder()
211+
.setExecutionGraph(graph)
212+
.setCheckpointIDCounter(checkpointIDCounter)
213+
.setCheckpointsCleaner(
214+
new CheckpointsCleaner() {
215+
216+
private static final long serialVersionUID =
217+
2029876992397573325L;
218+
219+
@Override
220+
public void cleanCheckpointOnFailedStoring(
221+
CompletedCheckpoint completedCheckpoint,
222+
Executor executor) {
223+
cleanupCallCount.incrementAndGet();
224+
super.cleanCheckpointOnFailedStoring(
225+
completedCheckpoint, executor);
226+
}
227+
})
228+
.setCompletedCheckpointStore(completedCheckpointStore)
229+
.setTimer(manuallyTriggeredScheduledExecutor)
230+
.build();
231+
checkpointCoordinator.triggerSavepoint(tmpFolder.newFolder().getAbsolutePath());
232+
manuallyTriggeredScheduledExecutor.triggerAll();
233+
234+
try {
235+
checkpointCoordinator.receiveAcknowledgeMessage(
236+
new AcknowledgeCheckpoint(
237+
graph.getJobID(), attemptId, checkpointIDCounter.getLast()),
238+
"unknown location");
239+
fail("CheckpointException should have been thrown.");
240+
} catch (CheckpointException e) {
241+
assertThat(
242+
e.getCheckpointFailureReason(),
243+
is(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE));
244+
}
245+
246+
assertThat(cleanupCallCount.get(), is(expectedCleanupCalls));
247+
}
248+
163249
private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore {
164250

251+
private final Exception addCheckpointFailure;
252+
253+
public FailingCompletedCheckpointStore(Exception addCheckpointFailure) {
254+
this.addCheckpointFailure = addCheckpointFailure;
255+
}
256+
165257
@Override
166258
public void recover() throws Exception {
167259
throw new UnsupportedOperationException("Not implemented.");
@@ -173,7 +265,7 @@ public void addCheckpoint(
173265
CheckpointsCleaner checkpointsCleaner,
174266
Runnable postCleanup)
175267
throws Exception {
176-
throw new Exception("The failing completed checkpoint store failed again... :-(");
268+
throw addCheckpointFailure;
177269
}
178270

179271
@Override

flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -695,6 +695,12 @@ public CheckpointCoordinatorBuilder setCoordinatorsToCheckpoint(
695695
return this;
696696
}
697697

698+
public CheckpointCoordinatorBuilder setCheckpointsCleaner(
699+
CheckpointsCleaner checkpointsCleaner) {
700+
this.checkpointsCleaner = checkpointsCleaner;
701+
return this;
702+
}
703+
698704
public CheckpointCoordinatorBuilder setCheckpointIDCounter(
699705
CheckpointIDCounter checkpointIDCounter) {
700706
this.checkpointIDCounter = checkpointIDCounter;

0 commit comments

Comments
 (0)