Skip to content

Commit a6c13e7

Browse files
XComptillrohrmann
authored andcommitted
[FLINK-22494][runtime] Refactors CheckpointsCleaner to handle also discardOnFailedStoring
1 parent b5c49ef commit a6c13e7

File tree

2 files changed

+36
-25
lines changed

2 files changed

+36
-25
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1209,20 +1209,7 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
12091209
completedCheckpoint, checkpointsCleaner, this::scheduleTriggerRequest);
12101210
} catch (Exception exception) {
12111211
// we failed to store the completed checkpoint. Let's clean up
1212-
executor.execute(
1213-
new Runnable() {
1214-
@Override
1215-
public void run() {
1216-
try {
1217-
completedCheckpoint.discardOnFailedStoring();
1218-
} catch (Throwable t) {
1219-
LOG.warn(
1220-
"Could not properly discard completed checkpoint {}.",
1221-
completedCheckpoint.getCheckpointID(),
1222-
t);
1223-
}
1224-
}
1225-
});
1212+
checkpointsCleaner.cleanCheckpointOnFailedStoring(completedCheckpoint, executor);
12261213

12271214
sendAbortedMessages(
12281215
pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(),

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.flink.runtime.checkpoint;
2020

21+
import org.apache.flink.util.function.RunnableWithException;
22+
2123
import org.slf4j.Logger;
2224
import org.slf4j.LoggerFactory;
2325

@@ -34,6 +36,7 @@
3436
@ThreadSafe
3537
public class CheckpointsCleaner implements Serializable {
3638
private static final Logger LOG = LoggerFactory.getLogger(CheckpointsCleaner.class);
39+
private static final long serialVersionUID = 2545865801947537790L;
3740

3841
private final AtomicInteger numberOfCheckpointsToClean;
3942

@@ -50,23 +53,44 @@ public void cleanCheckpoint(
5053
boolean shouldDiscard,
5154
Runnable postCleanAction,
5255
Executor executor) {
56+
cleanup(
57+
checkpoint,
58+
() -> {
59+
if (shouldDiscard) {
60+
checkpoint.discard();
61+
}
62+
},
63+
postCleanAction,
64+
executor);
65+
}
66+
67+
public void cleanCheckpointOnFailedStoring(
68+
CompletedCheckpoint completedCheckpoint, Executor executor) {
69+
cleanup(
70+
completedCheckpoint,
71+
completedCheckpoint::discardOnFailedStoring,
72+
() -> {},
73+
executor);
74+
}
75+
76+
private void cleanup(
77+
Checkpoint checkpoint,
78+
RunnableWithException cleanupAction,
79+
Runnable postCleanupAction,
80+
Executor executor) {
5381
numberOfCheckpointsToClean.incrementAndGet();
5482
executor.execute(
5583
() -> {
5684
try {
57-
if (shouldDiscard) {
58-
try {
59-
checkpoint.discard();
60-
} catch (Exception e) {
61-
LOG.warn(
62-
"Could not discard completed checkpoint {}.",
63-
checkpoint.getCheckpointID(),
64-
e);
65-
}
66-
}
85+
cleanupAction.run();
86+
} catch (Exception e) {
87+
LOG.warn(
88+
"Could not properly discard completed checkpoint {}.",
89+
checkpoint.getCheckpointID(),
90+
e);
6791
} finally {
6892
numberOfCheckpointsToClean.decrementAndGet();
69-
postCleanAction.run();
93+
postCleanupAction.run();
7094
}
7195
});
7296
}

0 commit comments

Comments
 (0)