Skip to content

Commit a0bfff8

Browse files
committed
[FLINK-22494][runtime] Adds PossibleInconsistentStateException handling to CheckpointCoordinator
1 parent 5eeed9d commit a0bfff8

File tree

4 files changed

+111
-4
lines changed

4 files changed

+111
-4
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
3939
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
4040
import org.apache.flink.runtime.operators.coordination.OperatorInfo;
41+
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
4142
import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
4243
import org.apache.flink.runtime.state.CheckpointStorageLocation;
4344
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
@@ -1210,8 +1211,16 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
12101211
completedCheckpointStore.addCheckpoint(
12111212
completedCheckpoint, checkpointsCleaner, this::scheduleTriggerRequest);
12121213
} catch (Exception exception) {
1213-
// we failed to store the completed checkpoint. Let's clean up
1214-
checkpointsCleaner.cleanCheckpointOnFailedStoring(completedCheckpoint, executor);
1214+
if (exception instanceof PossibleInconsistentStateException) {
1215+
LOG.warn(
1216+
"An error occurred while writing checkpoint {} to the underlying metadata store. Flink was not able to determine whether the metadata was successfully persisted. The corresponding state located at '{}' won't be discarded and needs to be cleaned up manually.",
1217+
completedCheckpoint.getCheckpointID(),
1218+
completedCheckpoint.getExternalPointer());
1219+
} else {
1220+
// we failed to store the completed checkpoint. Let's clean up
1221+
checkpointsCleaner.cleanCheckpointOnFailedStoring(
1222+
completedCheckpoint, executor);
1223+
}
12151224

12161225
sendAbortedMessages(checkpointId, pendingCheckpoint.getCheckpointTimestamp());
12171226
throw new CheckpointException(

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.flink.api.common.JobStatus;
2222
import org.apache.flink.api.java.tuple.Tuple2;
23+
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
2324
import org.apache.flink.runtime.persistence.ResourceVersion;
2425
import org.apache.flink.runtime.persistence.StateHandleStore;
2526
import org.apache.flink.runtime.state.RetrievableStateHandle;
@@ -212,6 +213,9 @@ public void recover() throws Exception {
212213
* older ones.
213214
*
214215
* @param checkpoint Completed checkpoint to add.
216+
* @throws PossibleInconsistentStateException if adding the checkpoint failed and leaving the
217+
* system in an possibly inconsistent state, i.e. it's uncertain whether the checkpoint
218+
* metadata was fully written to the underlying systems or not.
215219
*/
216220
@Override
217221
public void addCheckpoint(

flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java

Lines changed: 90 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,21 +28,29 @@
2828
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
2929
import org.apache.flink.runtime.jobgraph.OperatorID;
3030
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
31+
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
3132
import org.apache.flink.runtime.state.InputChannelStateHandle;
3233
import org.apache.flink.runtime.state.KeyedStateHandle;
3334
import org.apache.flink.runtime.state.OperatorStateHandle;
3435
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
3536
import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
3637
import org.apache.flink.runtime.state.StreamStateHandle;
38+
import org.apache.flink.util.FlinkRuntimeException;
3739
import org.apache.flink.util.TestLogger;
3840

41+
import org.junit.Rule;
3942
import org.junit.Test;
43+
import org.junit.rules.TemporaryFolder;
4044

4145
import java.util.Collections;
4246
import java.util.List;
47+
import java.util.concurrent.Executor;
48+
import java.util.concurrent.atomic.AtomicInteger;
4349

50+
import static org.hamcrest.CoreMatchers.is;
4451
import static org.junit.Assert.assertEquals;
4552
import static org.junit.Assert.assertFalse;
53+
import static org.junit.Assert.assertThat;
4654
import static org.junit.Assert.assertTrue;
4755
import static org.junit.Assert.fail;
4856
import static org.mockito.Mockito.mock;
@@ -53,6 +61,8 @@
5361
/** Tests for failure of checkpoint coordinator. */
5462
public class CheckpointCoordinatorFailureTest extends TestLogger {
5563

64+
@Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
65+
5666
/**
5767
* Tests that a failure while storing a completed checkpoint in the completed checkpoint store
5868
* will properly fail the originating pending checkpoint and clean upt the completed checkpoint.
@@ -73,7 +83,10 @@ public void testFailingCompletedCheckpointStoreAdd() throws Exception {
7383
new CheckpointCoordinatorBuilder()
7484
.setJobId(jid)
7585
.setTasks(new ExecutionVertex[] {vertex})
76-
.setCompletedCheckpointStore(new FailingCompletedCheckpointStore())
86+
.setCompletedCheckpointStore(
87+
new FailingCompletedCheckpointStore(
88+
new Exception(
89+
"The failing completed checkpoint store failed again... :-(")))
7790
.setTimer(manuallyTriggeredScheduledExecutor)
7891
.build();
7992

@@ -158,8 +171,83 @@ public void testFailingCompletedCheckpointStoreAdd() throws Exception {
158171
.discardState();
159172
}
160173

174+
@Test
175+
public void testCleanupForGenericFailure() throws Exception {
176+
testStoringFailureHandling(new FlinkRuntimeException("Expected exception"), 1);
177+
}
178+
179+
@Test
180+
public void testCleanupOmissionForPossibleInconsistentStateException() throws Exception {
181+
testStoringFailureHandling(new PossibleInconsistentStateException(), 0);
182+
}
183+
184+
private void testStoringFailureHandling(Exception failure, int expectedCleanupCalls)
185+
throws Exception {
186+
final JobID jobId = new JobID();
187+
188+
final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
189+
final ExecutionVertex vertex =
190+
CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptId);
191+
192+
final StandaloneCheckpointIDCounter checkpointIDCounter =
193+
new StandaloneCheckpointIDCounter();
194+
195+
final ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor =
196+
new ManuallyTriggeredScheduledExecutor();
197+
198+
final CompletedCheckpointStore completedCheckpointStore =
199+
new FailingCompletedCheckpointStore(failure);
200+
201+
final AtomicInteger cleanupCallCount = new AtomicInteger(0);
202+
final CheckpointCoordinator checkpointCoordinator =
203+
new CheckpointCoordinatorBuilder()
204+
.setJobId(jobId)
205+
.setTasks(new ExecutionVertex[] {vertex})
206+
.setCheckpointIDCounter(checkpointIDCounter)
207+
.setCheckpointsCleaner(
208+
new CheckpointsCleaner() {
209+
210+
private static final long serialVersionUID =
211+
2029876992397573325L;
212+
213+
@Override
214+
public void cleanCheckpointOnFailedStoring(
215+
CompletedCheckpoint completedCheckpoint,
216+
Executor executor) {
217+
cleanupCallCount.incrementAndGet();
218+
super.cleanCheckpointOnFailedStoring(
219+
completedCheckpoint, executor);
220+
}
221+
})
222+
.setCompletedCheckpointStore(completedCheckpointStore)
223+
.setTimer(manuallyTriggeredScheduledExecutor)
224+
.build();
225+
checkpointCoordinator.triggerSavepoint(tmpFolder.newFolder().getAbsolutePath());
226+
manuallyTriggeredScheduledExecutor.triggerAll();
227+
228+
try {
229+
checkpointCoordinator.receiveAcknowledgeMessage(
230+
new AcknowledgeCheckpoint(
231+
jobId, executionAttemptId, checkpointIDCounter.getLast()),
232+
"unknown location");
233+
fail("CheckpointException should have been thrown.");
234+
} catch (CheckpointException e) {
235+
assertThat(
236+
e.getCheckpointFailureReason(),
237+
is(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE));
238+
}
239+
240+
assertThat(cleanupCallCount.get(), is(expectedCleanupCalls));
241+
}
242+
161243
private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore {
162244

245+
private final Exception addCheckpointFailure;
246+
247+
public FailingCompletedCheckpointStore(Exception addCheckpointFailure) {
248+
this.addCheckpointFailure = addCheckpointFailure;
249+
}
250+
163251
@Override
164252
public void recover() throws Exception {
165253
throw new UnsupportedOperationException("Not implemented.");
@@ -171,7 +259,7 @@ public void addCheckpoint(
171259
CheckpointsCleaner checkpointsCleaner,
172260
Runnable postCleanup)
173261
throws Exception {
174-
throw new Exception("The failing completed checkpoint store failed again... :-(");
262+
throw addCheckpointFailure;
175263
}
176264

177265
@Override

flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -706,6 +706,12 @@ public CheckpointCoordinatorBuilder setCoordinatorsToCheckpoint(
706706
return this;
707707
}
708708

709+
public CheckpointCoordinatorBuilder setCheckpointsCleaner(
710+
CheckpointsCleaner checkpointsCleaner) {
711+
this.checkpointsCleaner = checkpointsCleaner;
712+
return this;
713+
}
714+
709715
public CheckpointCoordinatorBuilder setCheckpointIDCounter(
710716
CheckpointIDCounter checkpointIDCounter) {
711717
this.checkpointIDCounter = checkpointIDCounter;

0 commit comments

Comments
 (0)