From 41f2daa81345559942352164185d991b5478f172 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 16 Apr 2021 10:41:44 +0200 Subject: [PATCH] [hotfix] Do not use ExecutorService.submit since it can swallow exceptions This commit changes the KubernetesLeaderElector to use ExecutorService.execute instead of submit which ensures that potential exceptions are forwarded to the fatal uncaught exeception handler. --- .../resources/KubernetesLeaderElector.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java index 82a0bf9cfb641..0b4d94122ee2f 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java @@ -53,6 +53,8 @@ public class KubernetesLeaderElector { @VisibleForTesting public static final String LEADER_ANNOTATION_KEY = "control-plane.alpha.kubernetes.io/leader"; + private final Object lock = new Object(); + private final ExecutorService executorService = Executors.newSingleThreadExecutor( new ExecutorThreadFactory("KubernetesLeaderElector-ExecutorService")); @@ -92,11 +94,20 @@ public KubernetesLeaderElector( } public void run() { - executorService.submit(internalLeaderElector::run); + synchronized (lock) { + if (executorService.isShutdown()) { + LOG.debug( + "Ignoring KubernetesLeaderElector.run call because the leader elector has already been shut down."); + } else { + executorService.execute(internalLeaderElector::run); + } + } } public void stop() { - executorService.shutdownNow(); + synchronized (lock) { + executorService.shutdownNow(); + } } public static boolean hasLeadership(KubernetesConfigMap configMap, String lockIdentity) {