Skip to content

Commit 6be9668

Browse files
committed
[FLINK-22431] Add information when and why the AdaptiveScheduler restarts or fails jobs
This commit adds info log statements to tell the user when and why it restarts or fails a job. This closes #15736.
1 parent fb31e28 commit 6be9668

File tree

5 files changed

+17
-14
lines changed

5 files changed

+17
-14
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1058,7 +1058,7 @@ public Executing.FailureResult howToHandleFailure(Throwable failure) {
10581058
restartBackoffTimeStrategy.notifyFailure(failure);
10591059
if (restartBackoffTimeStrategy.canRestart()) {
10601060
return Executing.FailureResult.canRestart(
1061-
Duration.ofMillis(restartBackoffTimeStrategy.getBackoffTime()));
1061+
failure, Duration.ofMillis(restartBackoffTimeStrategy.getBackoffTime()));
10621062
} else {
10631063
return Executing.FailureResult.canNotRestart(
10641064
new JobException(

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,14 @@ private void handleAnyFailure(Throwable cause) {
8585
final FailureResult failureResult = context.howToHandleFailure(cause);
8686

8787
if (failureResult.canRestart()) {
88+
getLogger().info("Restarting job.", failureResult.getFailureCause());
8889
context.goToRestarting(
8990
getExecutionGraph(),
9091
getExecutionGraphHandler(),
9192
getOperatorCoordinatorHandler(),
9293
failureResult.getBackoffTime());
9394
} else {
95+
getLogger().info("Failing job.", failureResult.getFailureCause());
9496
context.goToFailing(
9597
getExecutionGraph(),
9698
getExecutionGraphHandler(),
@@ -281,9 +283,9 @@ CompletableFuture<String> goToStopWithSavepoint(
281283
static final class FailureResult {
282284
@Nullable private final Duration backoffTime;
283285

284-
@Nullable private final Throwable failureCause;
286+
private final Throwable failureCause;
285287

286-
private FailureResult(@Nullable Duration backoffTime, @Nullable Throwable failureCause) {
288+
private FailureResult(Throwable failureCause, @Nullable Duration backoffTime) {
287289
this.backoffTime = backoffTime;
288290
this.failureCause = failureCause;
289291
}
@@ -299,20 +301,18 @@ Duration getBackoffTime() {
299301
}
300302

301303
Throwable getFailureCause() {
302-
Preconditions.checkState(
303-
failureCause != null,
304-
"Failure result must not be restartable to return a failure cause.");
305304
return failureCause;
306305
}
307306

308307
/**
309308
* Creates a FailureResult which allows to restart the job.
310309
*
310+
* @param failureCause failureCause for restarting the job
311311
* @param backoffTime backoffTime to wait before restarting the job
312312
* @return FailureResult which allows to restart the job
313313
*/
314-
static FailureResult canRestart(Duration backoffTime) {
315-
return new FailureResult(backoffTime, null);
314+
static FailureResult canRestart(Throwable failureCause, Duration backoffTime) {
315+
return new FailureResult(failureCause, backoffTime);
316316
}
317317

318318
/**
@@ -322,7 +322,7 @@ static FailureResult canRestart(Duration backoffTime) {
322322
* @return FailureResult which does not allow to restart the job
323323
*/
324324
static FailureResult canNotRestart(Throwable failureCause) {
325-
return new FailureResult(null, failureCause);
325+
return new FailureResult(failureCause, null);
326326
}
327327
}
328328

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,12 +179,14 @@ private void handleAnyFailure(Throwable cause) {
179179
final Executing.FailureResult failureResult = context.howToHandleFailure(cause);
180180

181181
if (failureResult.canRestart()) {
182+
getLogger().info("Restarting job.", failureResult.getFailureCause());
182183
context.goToRestarting(
183184
getExecutionGraph(),
184185
getExecutionGraphHandler(),
185186
getOperatorCoordinatorHandler(),
186187
failureResult.getBackoffTime());
187188
} else {
189+
getLogger().info("Failing job.", failureResult.getFailureCause());
188190
context.goToFailing(
189191
getExecutionGraph(),
190192
getExecutionGraphHandler(),

flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ public void testRecoverableGlobalFailureTransitionsToRestarting() throws Excepti
140140
ctx.setExpectRestarting(
141141
(restartingArguments ->
142142
assertThat(restartingArguments.getBackoffTime(), is(duration))));
143-
ctx.setHowToHandleFailure((t) -> Executing.FailureResult.canRestart(duration));
143+
ctx.setHowToHandleFailure((t) -> Executing.FailureResult.canRestart(t, duration));
144144
exec.handleGlobalFailure(new RuntimeException("Recoverable error"));
145145
}
146146
}
@@ -234,7 +234,8 @@ public void testFailureReportedViaUpdateTaskExecutionStateCausesRestart() throws
234234
new ExecutingStateBuilder()
235235
.setExecutionGraph(returnsFailedStateExecutionGraph)
236236
.build(ctx);
237-
ctx.setHowToHandleFailure((ign) -> Executing.FailureResult.canRestart(Duration.ZERO));
237+
ctx.setHowToHandleFailure(
238+
(throwable) -> Executing.FailureResult.canRestart(throwable, Duration.ZERO));
238239
ctx.setExpectRestarting(assertNonNull());
239240

240241
exec.updateTaskExecutionState(createFailingStateTransition());

flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ public void testRestartOnGlobalFailureIfRestartConfigured() throws Exception {
174174
StopWithSavepoint sws = createStopWithSavepoint(ctx);
175175
ctx.setStopWithSavepoint(sws);
176176
ctx.setHowToHandleFailure(
177-
(ignore) -> Executing.FailureResult.canRestart(Duration.ZERO));
177+
(throwable) -> Executing.FailureResult.canRestart(throwable, Duration.ZERO));
178178

179179
ctx.setExpectRestarting(assertNonNull());
180180

@@ -229,7 +229,7 @@ public void testRestartingOnUpdateTaskExecutionStateWithRestart() throws Excepti
229229
createStopWithSavepoint(ctx, new StateTrackingMockExecutionGraph());
230230
ctx.setStopWithSavepoint(sws);
231231
ctx.setHowToHandleFailure(
232-
(ignore) -> Executing.FailureResult.canRestart(Duration.ZERO));
232+
(throwable) -> Executing.FailureResult.canRestart(throwable, Duration.ZERO));
233233

234234
ctx.setExpectRestarting(assertNonNull());
235235

@@ -277,7 +277,7 @@ public void testRestartOnTaskFailureAfterSavepointCompletion() throws Exception
277277
ctx.setStopWithSavepoint(sws);
278278

279279
ctx.setHowToHandleFailure(
280-
(ignore) -> Executing.FailureResult.canRestart(Duration.ZERO));
280+
(throwable) -> Executing.FailureResult.canRestart(throwable, Duration.ZERO));
281281

282282
ctx.setExpectRestarting(assertNonNull());
283283

0 commit comments

Comments
 (0)