diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java index d225275e3f674..68caf9f387e8a 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java @@ -23,12 +23,12 @@ import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector; +import org.apache.flink.runtime.persistence.PossibleInconsistentStateException; import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper; import org.apache.flink.runtime.persistence.StateHandleStore; import org.apache.flink.runtime.persistence.StringResourceVersion; import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +48,8 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.apache.flink.runtime.util.StateHandleStoreUtils.deserialize; +import static org.apache.flink.runtime.util.StateHandleStoreUtils.serializeOrDiscard; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -114,21 +116,28 @@ public KubernetesStateHandleStore( * @param key Key in ConfigMap * @param state State to be added * @throws AlreadyExistException if the name already exists + * @throws PossibleInconsistentStateException if the write-to-Kubernetes operation failed. This + * indicates that it's not clear whether the new state was successfully written to + * Kubernetes or not. No state was discarded. Proper error handling has to be applied on the + * caller's side. * @throws Exception if persisting state or writing state handle failed */ @Override - public RetrievableStateHandle addAndLock(String key, T state) throws Exception { + public RetrievableStateHandle addAndLock(String key, T state) + throws PossibleInconsistentStateException, Exception { checkNotNull(key, "Key in ConfigMap."); checkNotNull(state, "State."); final RetrievableStateHandle storeHandle = storage.store(state); - boolean success = false; + final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle); + // initialize flag to serve the failure case + boolean discardState = true; try { - final byte[] serializedStoreHandle = InstantiationUtil.serializeObject(storeHandle); - success = - kubeClient + // a successful operation will result in the state not being discarded + discardState = + !kubeClient .checkAndUpdateConfigMap( configMapName, c -> { @@ -151,14 +160,20 @@ public RetrievableStateHandle addAndLock(String key, T state) throws Exceptio .get(); return storeHandle; } catch (Exception ex) { + final Optional possibleInconsistentStateException = + ExceptionUtils.findThrowable(ex, PossibleInconsistentStateException.class); + if (possibleInconsistentStateException.isPresent()) { + // it's unclear whether the state handle metadata was written to the ConfigMap - + // hence, we don't discard the data + discardState = false; + throw possibleInconsistentStateException.get(); + } + throw ExceptionUtils.findThrowable(ex, AlreadyExistException.class) .orElseThrow(() -> ex); } finally { - if (!success) { - // Cleanup the state handle if it was not written to ConfigMap. - if (storeHandle != null) { - storeHandle.discardState(); - } + if (discardState) { + storeHandle.discardState(); } } } @@ -173,6 +188,9 @@ public RetrievableStateHandle addAndLock(String key, T state) throws Exceptio * @param resourceVersion resource version when checking existence via {@link #exists}. * @param state State to be added * @throws NotExistException if the name does not exist + * @throws PossibleInconsistentStateException if a failure occurred during the update operation. + * It's unclear whether the operation actually succeeded or not. No state was discarded. The + * method's caller should handle this case properly. * @throws Exception if persisting state or writing state handle failed */ @Override @@ -185,11 +203,13 @@ public void replace(String key, StringResourceVersion resourceVersion, T state) final RetrievableStateHandle newStateHandle = storage.store(state); - boolean success = false; + final byte[] serializedStateHandle = serializeOrDiscard(newStateHandle); + // initialize flags to serve the failure case + boolean discardOldState = false; + boolean discardNewState = true; try { - final byte[] serializedStoreHandle = InstantiationUtil.serializeObject(newStateHandle); - success = + boolean success = kubeClient .checkAndUpdateConfigMap( configMapName, @@ -202,7 +222,7 @@ public void replace(String key, StringResourceVersion resourceVersion, T state) .put( key, encodeStateHandle( - serializedStoreHandle)); + serializedStateHandle)); } else { throw new CompletionException( getKeyNotExistException(key)); @@ -212,14 +232,29 @@ public void replace(String key, StringResourceVersion resourceVersion, T state) return Optional.empty(); }) .get(); + + // swap subject for deletion in case of success + discardOldState = success; + discardNewState = !success; } catch (Exception ex) { + final Optional possibleInconsistentStateException = + ExceptionUtils.findThrowable(ex, PossibleInconsistentStateException.class); + if (possibleInconsistentStateException.isPresent()) { + // it's unclear whether the state handle metadata was written to the ConfigMap - + // hence, we don't discard any data + discardNewState = false; + throw possibleInconsistentStateException.get(); + } + throw ExceptionUtils.findThrowable(ex, NotExistException.class).orElseThrow(() -> ex); } finally { - if (success) { - oldStateHandle.discardState(); - } else { + if (discardNewState) { newStateHandle.discardState(); } + + if (discardOldState) { + oldStateHandle.discardState(); + } } } @@ -476,8 +511,7 @@ private RetrievableStateHandle deserializeObject(String content) throws IOExc final byte[] data = Base64.getDecoder().decode(content); try { - return InstantiationUtil.deserializeObject( - data, Thread.currentThread().getContextClassLoader()); + return deserialize(data); } catch (IOException | ClassNotFoundException e) { throw new IOException( "Failed to deserialize state handle from ConfigMap data " + content + '.', e); diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java index 210de450b4b9a..49f1d0154892b 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java @@ -33,6 +33,7 @@ import org.apache.flink.kubernetes.utils.Constants; import org.apache.flink.kubernetes.utils.KubernetesUtils; import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.persistence.PossibleInconsistentStateException; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.FlinkRuntimeException; @@ -290,13 +291,17 @@ public CompletableFuture checkAndUpdateConfigMap( Throwable throwable) { LOG.debug( - "Failed to update ConfigMap {} with data {} because of concurrent " - + "modifications. Trying again.", + "Failed to update ConfigMap {} with data {}. Trying again.", configMap .getName(), configMap .getData()); - throw throwable; + // the + // client + // implementation does not expose the different kind of error causes to a degree that we could do a more fine-grained error handling here + throw new CompletionException( + new PossibleInconsistentStateException( + throwable)); } return true; }) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java index 7eb485bcfcdf5..88ecfe189bdf2 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java @@ -24,6 +24,7 @@ import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch; +import org.apache.flink.runtime.persistence.PossibleInconsistentStateException; import java.io.File; import java.util.List; @@ -151,7 +152,12 @@ KubernetesLeaderElector createLeaderElector( * one. If the returned optional is empty, we will not do the update. * @return Return the ConfigMap update future. The boolean result indicates whether the * ConfigMap is updated. The returned future will be completed exceptionally if the - * ConfigMap does not exist. + * ConfigMap does not exist. A failure during the update operation will result in the future + * failing with a {@link PossibleInconsistentStateException} indicating that no clear + * decision can be made on whether the update was successful or not. The {@code + * PossibleInconsistentStateException} not being present indicates that the failure happened + * before writing the updated ConfigMap to Kubernetes. For the latter case, it can be + * assumed that the ConfigMap was not updated. */ CompletableFuture checkAndUpdateConfigMap( String configMapName, diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java index b00c56d0b1d60..f66d62ca70a4a 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java @@ -31,10 +31,16 @@ import io.fabric8.kubernetes.api.model.ServicePortBuilder; import io.fabric8.kubernetes.api.model.ServiceStatus; import io.fabric8.kubernetes.api.model.ServiceStatusBuilder; +import io.fabric8.mockwebserver.dsl.DelayPathable; +import io.fabric8.mockwebserver.dsl.HttpMethodable; +import io.fabric8.mockwebserver.dsl.MockServerExpectation; +import io.fabric8.mockwebserver.dsl.ReturnOrWebsocketable; +import io.fabric8.mockwebserver.dsl.TimesOnceableOrHttpHeaderable; import javax.annotation.Nullable; import java.util.Collections; +import java.util.function.Function; /** * Base class for {@link KubernetesClusterDescriptorTest} and {@link @@ -53,14 +59,35 @@ protected void mockExpectedServiceFromServerSide(Service expectedService) { } protected void mockCreateConfigMapAlreadyExisting(ConfigMap configMap) { - final String path = String.format("/api/v1/namespaces/%s/configmaps", NAMESPACE); + final String path = + String.format( + "/api/%s/namespaces/%s/configmaps", + configMap.getApiVersion(), configMap.getMetadata().getNamespace()); server.expect().post().withPath(path).andReturn(500, configMap).always(); } + protected void mockGetConfigMapFailed(ConfigMap configMap) { + mockConfigMapRequest(configMap, HttpMethodable::get); + } + protected void mockReplaceConfigMapFailed(ConfigMap configMap) { - final String name = configMap.getMetadata().getName(); - final String path = String.format("/api/v1/namespaces/%s/configmaps/%s", NAMESPACE, name); - server.expect().put().withPath(path).andReturn(500, configMap).always(); + mockConfigMapRequest(configMap, HttpMethodable::put); + } + + private void mockConfigMapRequest( + ConfigMap configMap, + Function< + MockServerExpectation, + DelayPathable< + ReturnOrWebsocketable>>> + methodTypeSetter) { + final String path = + String.format( + "/api/%s/namespaces/%s/configmaps/%s", + configMap.getApiVersion(), + configMap.getMetadata().getNamespace(), + configMap.getMetadata().getName()); + methodTypeSetter.apply(server.expect()).withPath(path).andReturn(500, configMap).always(); } protected Service buildExternalServiceWithLoadBalancer( diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/MixedKubernetesServer.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/MixedKubernetesServer.java index acdf697c04e8d..c574563261003 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/MixedKubernetesServer.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/MixedKubernetesServer.java @@ -50,7 +50,7 @@ public MixedKubernetesServer(boolean https, boolean crudMode) { } public void before() { - HashMap> response = new HashMap<>(); + final HashMap> response = new HashMap<>(); mock = crudMode ? new KubernetesMockServer( @@ -77,6 +77,10 @@ public RecordedRequest takeRequest(long timeout, TimeUnit unit) throws Exception return mockWebServer.takeRequest(timeout, unit); } + public int getRequestCount() { + return mockWebServer.getRequestCount(); + } + public MockServerExpectation expect() { return mock.expect(); } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreITCase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreITCase.java index 009dc9815bcb1..75da66c890514 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreITCase.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreITCase.java @@ -68,8 +68,8 @@ public void testMultipleKubernetesStateHandleStores() throws Exception { new TestingLeaderCallbackHandler[leaderNum]; @SuppressWarnings("unchecked") - final KubernetesStateHandleStore[] stateHandleStores = - new KubernetesStateHandleStore[leaderNum]; + final KubernetesStateHandleStore[] + stateHandleStores = new KubernetesStateHandleStore[leaderNum]; try { for (int i = 0; i < leaderNum; i++) { @@ -103,17 +103,19 @@ public void testMultipleKubernetesStateHandleStores() throws Exception { if (leaderCallbackHandlers[i].getLockIdentity().equals(lockIdentity)) { expectedState = (long) i; } - stateHandleStores[i].addAndLock(KEY, (long) i); + stateHandleStores[i].addAndLock( + KEY, new TestingLongStateHandleHelper.LongStateHandle(i)); } // Only the leader could add successfully assertThat(expectedState, is(notNullValue())); assertThat(stateHandleStores[0].getAllAndLock().size(), is(1)); assertThat( - stateHandleStores[0].getAllAndLock().get(0).f0.retrieveState(), + stateHandleStores[0].getAllAndLock().get(0).f0.retrieveState().getValue(), is(expectedState)); assertThat(stateHandleStores[0].getAllAndLock().get(0).f1, is(KEY)); } finally { + TestingLongStateHandleHelper.clearGlobalState(); // Cleanup the resources for (int i = 0; i < leaderNum; i++) { if (leaderElectors[i] != null) { diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java index 339291d8e793f..b1da85b9ec884 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java @@ -22,10 +22,11 @@ import org.apache.flink.core.testutils.FlinkMatchers; import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.persistence.PossibleInconsistentStateException; import org.apache.flink.runtime.persistence.StateHandleStore; import org.apache.flink.runtime.persistence.StringResourceVersion; import org.apache.flink.runtime.persistence.TestingLongStateHandleHelper; -import org.apache.flink.runtime.persistence.TestingLongStateHandleHelper.LongRetrievableStateHandle; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.function.FunctionUtils; @@ -36,6 +37,7 @@ import java.util.Comparator; import java.util.List; import java.util.function.Predicate; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.is; @@ -49,14 +51,16 @@ public class KubernetesStateHandleStoreTest extends KubernetesHighAvailabilityTe private static final String PREFIX = "test-prefix-"; private final String key = PREFIX + JobID.generate(); private final Predicate filter = k -> k.startsWith(PREFIX); - private final Long state = 12345L; + private final TestingLongStateHandleHelper.LongStateHandle state = + new TestingLongStateHandleHelper.LongStateHandle(12345L); - private TestingLongStateHandleHelper longStateStorage; + private final TestingLongStateHandleHelper longStateStorage = + new TestingLongStateHandleHelper(); @Before public void setup() { super.setup(); - longStateStorage = new TestingLongStateHandleHelper(); + TestingLongStateHandleHelper.clearGlobalState(); } @Test @@ -67,13 +71,15 @@ public void testAdd() throws Exception { () -> { leaderCallbackGrantLeadership(); - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); store.addAndLock(key, state); assertThat(store.getAllAndLock().size(), is(1)); assertThat(store.getAndLock(key).retrieveState(), is(state)); @@ -92,13 +98,15 @@ public void testAddAlreadyExistingKey() throws Exception { getLeaderConfigMap().getData().put(key, "existing data"); - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); try { store.addAndLock(key, state); @@ -110,31 +118,69 @@ public void testAddAlreadyExistingKey() throws Exception { key, LEADER_CONFIGMAP_NAME); assertThat(ex, FlinkMatchers.containsMessage(msg)); } - assertThat(longStateStorage.getStateHandles().size(), is(1)); + assertThat(TestingLongStateHandleHelper.getGlobalStorageSize(), is(1)); assertThat( - longStateStorage - .getStateHandles() - .get(0) - .getNumberOfDiscardCalls(), + TestingLongStateHandleHelper + .getDiscardCallCountForStateHandleByIndex(0), is(1)); }); } }; } + @Test + public void testAddWithPossiblyInconsistentStateHandling() throws Exception { + new Context() { + { + runTest( + () -> { + leaderCallbackGrantLeadership(); + + final FlinkKubeClient anotherFlinkKubeClient = + createFlinkKubeClientBuilder() + .setCheckAndUpdateConfigMapFunction( + (configMapName, function) -> + FutureUtils.completedExceptionally( + new PossibleInconsistentStateException())) + .build(); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + anotherFlinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); + + try { + store.addAndLock(key, state); + fail("PossibleInconsistentStateException should have been thrown."); + } catch (PossibleInconsistentStateException ex) { + // PossibleInconsistentStateException is expected + } + assertThat(TestingLongStateHandleHelper.getGlobalStorageSize(), is(1)); + assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(0)); + }); + } + }; + } + @Test public void testAddFailedWhenConfigMapNotExistAndDiscardState() throws Exception { new Context() { { runTest( () -> { - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); try { store.addAndLock(key, state); @@ -146,12 +192,10 @@ public void testAddFailedWhenConfigMapNotExistAndDiscardState() throws Exception LEADER_CONFIGMAP_NAME); assertThat(ex, FlinkMatchers.containsMessage(msg)); } - assertThat(longStateStorage.getStateHandles().size(), is(1)); + assertThat(TestingLongStateHandleHelper.getGlobalStorageSize(), is(1)); assertThat( - longStateStorage - .getStateHandles() - .get(0) - .getNumberOfDiscardCalls(), + TestingLongStateHandleHelper + .getDiscardCallCountForStateHandleByIndex(0), is(1)); }); } @@ -166,17 +210,20 @@ public void testReplace() throws Exception { () -> { leaderCallbackGrantLeadership(); - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); store.addAndLock(key, state); - final Long newState = 23456L; + final TestingLongStateHandleHelper.LongStateHandle newState = + new TestingLongStateHandleHelper.LongStateHandle(23456L); final StringResourceVersion resourceVersion = store.exists(key); store.replace(key, resourceVersion, newState); @@ -195,14 +242,17 @@ public void testReplaceWithKeyNotExist() throws Exception { () -> { leaderCallbackGrantLeadership(); - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); - final Long newState = 23456L; + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); + final TestingLongStateHandleHelper.LongStateHandle newState = + new TestingLongStateHandleHelper.LongStateHandle(23456L); try { assertThat( @@ -230,14 +280,17 @@ public void testReplaceWithNoLeadershipAndDiscardState() throws Exception { () -> { leaderCallbackGrantLeadership(); - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); - final Long newState = 23456L; + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); + final TestingLongStateHandleHelper.LongStateHandle newState = + new TestingLongStateHandleHelper.LongStateHandle(23456L); store.addAndLock(key, state); // Lost leadership @@ -253,18 +306,14 @@ public void testReplaceWithNoLeadershipAndDiscardState() throws Exception { // The state do not change assertThat(store.getAndLock(key).retrieveState(), is(state)); - assertThat(longStateStorage.getStateHandles().size(), is(2)); + assertThat(TestingLongStateHandleHelper.getGlobalStorageSize(), is(2)); assertThat( - longStateStorage - .getStateHandles() - .get(0) - .getNumberOfDiscardCalls(), + TestingLongStateHandleHelper + .getDiscardCallCountForStateHandleByIndex(0), is(0)); assertThat( - longStateStorage - .getStateHandles() - .get(1) - .getNumberOfDiscardCalls(), + TestingLongStateHandleHelper + .getDiscardCallCountForStateHandleByIndex(1), is(1)); }); } @@ -280,13 +329,15 @@ public void testReplaceFailedAndDiscardState() throws Exception { () -> { leaderCallbackGrantLeadership(); - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); store.addAndLock(key, state); final FlinkKubeClient anotherFlinkKubeClient = @@ -296,15 +347,18 @@ public void testReplaceFailedAndDiscardState() throws Exception { throw updateException; }) .build(); - final KubernetesStateHandleStore anotherStore = - new KubernetesStateHandleStore<>( - anotherFlinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); - - final Long newState = 23456L; + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + anotherStore = + new KubernetesStateHandleStore<>( + anotherFlinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); + + final TestingLongStateHandleHelper.LongStateHandle newState = + new TestingLongStateHandleHelper.LongStateHandle(23456L); final StringResourceVersion resourceVersion = anotherStore.exists(key); assertThat(resourceVersion.isExisting(), is(true)); try { @@ -318,24 +372,89 @@ public void testReplaceFailedAndDiscardState() throws Exception { // The state do not change assertThat(anotherStore.getAndLock(key).retrieveState(), is(state)); - assertThat(longStateStorage.getStateHandles().size(), is(2)); + assertThat(TestingLongStateHandleHelper.getGlobalStorageSize(), is(2)); assertThat( - longStateStorage - .getStateHandles() - .get(0) - .getNumberOfDiscardCalls(), + TestingLongStateHandleHelper + .getDiscardCallCountForStateHandleByIndex(0), is(0)); assertThat( - longStateStorage - .getStateHandles() - .get(1) - .getNumberOfDiscardCalls(), + TestingLongStateHandleHelper + .getDiscardCallCountForStateHandleByIndex(1), is(1)); }); } }; } + @Test + public void testReplaceFailedWithPossiblyInconsistentState() throws Exception { + final PossibleInconsistentStateException updateException = + new PossibleInconsistentStateException(); + new Context() { + { + runTest( + () -> { + leaderCallbackGrantLeadership(); + + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); + store.addAndLock(key, state); + + final FlinkKubeClient anotherFlinkKubeClient = + createFlinkKubeClientBuilder() + .setCheckAndUpdateConfigMapFunction( + (configMapName, function) -> + FutureUtils.completedExceptionally( + updateException)) + .build(); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + anotherStore = + new KubernetesStateHandleStore<>( + anotherFlinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); + + final StringResourceVersion resourceVersion = anotherStore.exists(key); + assertThat(resourceVersion.isExisting(), is(true)); + try { + anotherStore.replace( + key, + resourceVersion, + new TestingLongStateHandleHelper.LongStateHandle(23456L)); + fail( + "An exception having a PossibleInconsistentStateException as its cause should have been thrown."); + } catch (Exception ex) { + assertThat(ex, is(updateException)); + } + assertThat(anotherStore.getAllAndLock().size(), is(1)); + // The state does not change + assertThat(anotherStore.getAndLock(key).retrieveState(), is(state)); + + assertThat(TestingLongStateHandleHelper.getGlobalStorageSize(), is(2)); + // no state was discarded + assertThat( + TestingLongStateHandleHelper + .getDiscardCallCountForStateHandleByIndex(0), + is(0)); + assertThat( + TestingLongStateHandleHelper + .getDiscardCallCountForStateHandleByIndex(1), + is(0)); + }); + } + }; + } + @Test public void testGetAndExist() throws Exception { new Context() { @@ -344,13 +463,15 @@ public void testGetAndExist() throws Exception { () -> { leaderCallbackGrantLeadership(); - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); store.addAndLock(key, state); assertThat( store.exists(key), @@ -369,13 +490,15 @@ public void testGetNonExistingKey() throws Exception { () -> { leaderCallbackGrantLeadership(); - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); final String nonExistingKey = "non-existing-key"; store.addAndLock(key, state); assertThat( @@ -404,26 +527,37 @@ public void testGetAll() throws Exception { () -> { leaderCallbackGrantLeadership(); - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); final List expected = Arrays.asList(3L, 2L, 1L); for (Long each : expected) { - store.addAndLock(key + each, each); + store.addAndLock( + key + each, + new TestingLongStateHandleHelper.LongStateHandle(each)); } - final Long[] actual = + final TestingLongStateHandleHelper.LongStateHandle[] actual = store.getAllAndLock().stream() .map( FunctionUtils.uncheckedFunction( e -> e.f0.retrieveState())) - .toArray(Long[]::new); + .toArray( + TestingLongStateHandleHelper.LongStateHandle[] + ::new); assertThat( - Arrays.asList(actual), containsInAnyOrder(expected.toArray())); + Arrays.stream(actual) + .map( + TestingLongStateHandleHelper.LongStateHandle + ::getValue) + .collect(Collectors.toList()), + containsInAnyOrder(expected.toArray())); }); } }; @@ -437,13 +571,15 @@ public void testGetAllHandles() throws Exception { () -> { leaderCallbackGrantLeadership(); - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); final List expected = Arrays.asList(key + 3, key + 2, key + 1); for (String each : expected) { @@ -465,24 +601,23 @@ public void testRemove() throws Exception { runTest( () -> { leaderCallbackGrantLeadership(); - LongRetrievableStateHandle.clearNumberOfGlobalDiscardCalls(); - - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); store.addAndLock(key, state); assertThat(store.getAllAndLock().size(), is(1)); assertThat(store.releaseAndTryRemove(key), is(true)); assertThat(store.getAllAndLock().size(), is(0)); // State should also be discarded. - assertThat( - LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls(), - is(1)); + assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(1)); }); } }; @@ -495,15 +630,16 @@ public void testRemoveFailedShouldNotDiscardState() throws Exception { runTest( () -> { leaderCallbackGrantLeadership(); - LongRetrievableStateHandle.clearNumberOfGlobalDiscardCalls(); - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); store.addAndLock(key, state); // Lost leadership @@ -516,9 +652,7 @@ public void testRemoveFailedShouldNotDiscardState() throws Exception { assertThat(store.releaseAndTryRemove(key), is(false)); assertThat(store.getAllAndLock().size(), is(1)); - assertThat( - LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls(), - is(0)); + assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(0)); }); } }; @@ -531,23 +665,24 @@ public void testRemoveAllHandlesAndDiscardState() throws Exception { runTest( () -> { leaderCallbackGrantLeadership(); - LongRetrievableStateHandle.clearNumberOfGlobalDiscardCalls(); - - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); store.addAndLock(key, state); - store.addAndLock(key + "1", 2L); + store.addAndLock( + key + "1", + new TestingLongStateHandleHelper.LongStateHandle(2L)); assertThat(store.getAllAndLock().size(), is(2)); store.releaseAndTryRemoveAll(); assertThat(store.getAllAndLock().size(), is(0)); - assertThat( - LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls(), - is(2)); + assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(2)); }); } }; @@ -560,26 +695,27 @@ public void testRemoveAllHandles() throws Exception { runTest( () -> { leaderCallbackGrantLeadership(); - LongRetrievableStateHandle.clearNumberOfGlobalDiscardCalls(); final String anotherKey = "key-not-with-prefix"; getLeaderConfigMap().getData().put(anotherKey, "value"); - final KubernetesStateHandleStore store = - new KubernetesStateHandleStore<>( - flinkKubeClient, - LEADER_CONFIGMAP_NAME, - longStateStorage, - filter, - LOCK_IDENTITY); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); store.addAndLock(key, state); - store.addAndLock(key + "1", 2L); + store.addAndLock( + key + "1", + new TestingLongStateHandleHelper.LongStateHandle(2L)); assertThat(store.getAllAndLock().size(), is(2)); store.clearEntries(); assertThat(store.getAllAndLock().size(), is(0)); - assertThat( - LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls(), - is(0)); + assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(0)); // Should only remove the key with specified prefix. assertThat( diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java index 22c63127ae261..6ccffe09f05d1 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java @@ -36,8 +36,10 @@ import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; import org.apache.flink.kubernetes.kubeclient.resources.NoOpWatchCallbackHandler; +import org.apache.flink.runtime.persistence.PossibleInconsistentStateException; import org.apache.flink.runtime.rest.HttpMethodWrapper; import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.util.ExceptionUtils; import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.ConfigMapBuilder; @@ -435,6 +437,44 @@ public void testCheckAndUpdateConfigMapWhenConfigMapNotExist() { } } + @Test + public void testCheckAndUpdateConfigMapWhenGetConfigMapFailed() throws Exception { + final int configuredRetries = + flinkConfig.getInteger( + KubernetesConfigOptions.KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES); + final KubernetesConfigMap configMap = buildTestingConfigMap(); + this.flinkKubeClient.createConfigMap(configMap).get(); + + mockGetConfigMapFailed(configMap.getInternalResource()); + + final int initialRequestCount = server.getRequestCount(); + try { + this.flinkKubeClient + .checkAndUpdateConfigMap( + TESTING_CONFIG_MAP_NAME, + c -> { + throw new AssertionError( + "The replace operation should have never been triggered."); + }) + .get(); + fail( + "checkAndUpdateConfigMap should fail without a PossibleInconsistentStateException being the cause when number of retries has been exhausted."); + } catch (Exception ex) { + assertThat( + ex, + FlinkMatchers.containsMessage( + "Could not complete the " + + "operation. Number of retries has been exhausted.")); + final int actualRetryCount = server.getRequestCount() - initialRequestCount; + assertThat(actualRetryCount, is(configuredRetries + 1)); + assertThat( + "An error while retrieving the ConfigMap should not cause a PossibleInconsistentStateException.", + ExceptionUtils.findThrowable(ex, PossibleInconsistentStateException.class) + .isPresent(), + is(false)); + } + } + @Test public void testCheckAndUpdateConfigMapWhenReplaceConfigMapFailed() throws Exception { final int configuredRetries = @@ -456,7 +496,7 @@ public void testCheckAndUpdateConfigMapWhenReplaceConfigMapFailed() throws Excep }) .get(); fail( - "CheckAndUpdateConfigMap should fail with exception when number of retries has been exhausted."); + "checkAndUpdateConfigMap should fail due to a PossibleInconsistentStateException when number of retries has been exhausted."); } catch (Exception ex) { assertThat( ex, @@ -464,6 +504,12 @@ public void testCheckAndUpdateConfigMapWhenReplaceConfigMapFailed() throws Excep "Could not complete the " + "operation. Number of retries has been exhausted.")); assertThat(retries.get(), is(configuredRetries + 1)); + + assertThat( + "An error while replacing the ConfigMap should cause an PossibleInconsistentStateException.", + ExceptionUtils.findThrowable(ex, PossibleInconsistentStateException.class) + .isPresent(), + is(true)); } } @@ -504,6 +550,7 @@ private KubernetesConfigMap buildTestingConfigMap() { .withNewMetadata() .withName(TESTING_CONFIG_MAP_NAME) .withLabels(TESTING_LABELS) + .withNamespace(NAMESPACE) .endMetadata() .withData(data) .build()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index db1509e30fdf1..b31dd8f3c5844 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorInfo; +import org.apache.flink.runtime.persistence.PossibleInconsistentStateException; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView; import org.apache.flink.runtime.state.CheckpointStorageLocation; @@ -1208,21 +1209,16 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) completedCheckpointStore.addCheckpoint( completedCheckpoint, checkpointsCleaner, this::scheduleTriggerRequest); } catch (Exception exception) { - // we failed to store the completed checkpoint. Let's clean up - executor.execute( - new Runnable() { - @Override - public void run() { - try { - completedCheckpoint.discardOnFailedStoring(); - } catch (Throwable t) { - LOG.warn( - "Could not properly discard completed checkpoint {}.", - completedCheckpoint.getCheckpointID(), - t); - } - } - }); + if (exception instanceof PossibleInconsistentStateException) { + LOG.warn( + "An error occurred while writing checkpoint {} to the underlying metadata store. Flink was not able to determine whether the metadata was successfully persisted. The corresponding state located at '{}' won't be discarded and needs to be cleaned up manually.", + completedCheckpoint.getCheckpointID(), + completedCheckpoint.getExternalPointer()); + } else { + // we failed to store the completed checkpoint. Let's clean up + checkpointsCleaner.cleanCheckpointOnFailedStoring( + completedCheckpoint, executor); + } sendAbortedMessages( pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java index ab222e7f2d0bb..5591107b8a958 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.util.function.RunnableWithException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +36,7 @@ @ThreadSafe public class CheckpointsCleaner implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(CheckpointsCleaner.class); + private static final long serialVersionUID = 2545865801947537790L; private final AtomicInteger numberOfCheckpointsToClean; @@ -50,23 +53,44 @@ public void cleanCheckpoint( boolean shouldDiscard, Runnable postCleanAction, Executor executor) { + cleanup( + checkpoint, + () -> { + if (shouldDiscard) { + checkpoint.discard(); + } + }, + postCleanAction, + executor); + } + + public void cleanCheckpointOnFailedStoring( + CompletedCheckpoint completedCheckpoint, Executor executor) { + cleanup( + completedCheckpoint, + completedCheckpoint::discardOnFailedStoring, + () -> {}, + executor); + } + + private void cleanup( + Checkpoint checkpoint, + RunnableWithException cleanupAction, + Runnable postCleanupAction, + Executor executor) { numberOfCheckpointsToClean.incrementAndGet(); executor.execute( () -> { try { - if (shouldDiscard) { - try { - checkpoint.discard(); - } catch (Exception e) { - LOG.warn( - "Could not discard completed checkpoint {}.", - checkpoint.getCheckpointID(), - e); - } - } + cleanupAction.run(); + } catch (Exception e) { + LOG.warn( + "Could not properly discard completed checkpoint {}.", + checkpoint.getCheckpointID(), + e); } finally { numberOfCheckpointsToClean.decrementAndGet(); - postCleanAction.run(); + postCleanupAction.run(); } }); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java index 3490b8e008954..8bc32060d2557 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.persistence.PossibleInconsistentStateException; import org.apache.flink.runtime.persistence.ResourceVersion; import org.apache.flink.runtime.persistence.StateHandleStore; import org.apache.flink.runtime.state.RetrievableStateHandle; @@ -161,6 +162,9 @@ public void recover() throws Exception { * older ones. * * @param checkpoint Completed checkpoint to add. + * @throws PossibleInconsistentStateException if adding the checkpoint failed and leaving the + * system in an possibly inconsistent state, i.e. it's uncertain whether the checkpoint + * metadata was fully written to the underlying systems or not. */ @Override public void addCheckpoint( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/PossibleInconsistentStateException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/PossibleInconsistentStateException.java new file mode 100644 index 0000000000000..364c012e9f1f1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/PossibleInconsistentStateException.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.persistence; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.FlinkException; + +/** + * {@code PossibleInconsistentStateException} represents errors that might have lead to an + * inconsistent state within the HA resources. + */ +public class PossibleInconsistentStateException extends FlinkException { + + private static final long serialVersionUID = 364105635349022882L; + + @VisibleForTesting + public PossibleInconsistentStateException() { + super("The system might be in an inconsistent state."); + } + + public PossibleInconsistentStateException(String message, Throwable cause) { + super(message, cause); + } + + public PossibleInconsistentStateException(Throwable cause) { + super(cause); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java index cf5d8dc3c512b..dff5d084f9683 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java @@ -50,9 +50,14 @@ public interface StateHandleStore addAndLock(String name, T state) throws Exception; + RetrievableStateHandle addAndLock(String name, T state) + throws PossibleInconsistentStateException, Exception; /** * Replaces a state handle in the distributed coordination system and discards the old state @@ -64,9 +69,13 @@ public interface StateHandleStore The type of data handled by the deserialized {@code RetrievableStateHandle}. + * @return The {@code RetrievableStateHandle} instance. + * @throws IOException Any of the usual Input/Output related exceptions. + * @throws ClassNotFoundException If the data couldn't be deserialized into a {@code + * RetrievableStateHandle} referring to the expected type {@code }. + */ + public static T deserialize(byte[] data) + throws IOException, ClassNotFoundException { + return InstantiationUtil.deserializeObject( + data, Thread.currentThread().getContextClassLoader()); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java index 378d49bb61b7c..a7b4c418698c7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java @@ -18,13 +18,14 @@ package org.apache.flink.runtime.zookeeper; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.persistence.IntegerResourceVersion; +import org.apache.flink.runtime.persistence.PossibleInconsistentStateException; import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper; import org.apache.flink.runtime.persistence.StateHandleStore; import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.InstantiationUtil; import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; import org.apache.flink.shaded.curator4.org.apache.curator.utils.ZKPaths; @@ -41,8 +42,12 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.UUID; +import static org.apache.flink.runtime.util.StateHandleStoreUtils.deserialize; +import static org.apache.flink.runtime.util.StateHandleStoreUtils.serializeOrDiscard; +import static org.apache.flink.shaded.guava18.com.google.common.collect.Sets.newHashSet; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -82,6 +87,19 @@ public class ZooKeeperStateHandleStore private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class); + @VisibleForTesting + static final Set> PRE_COMMIT_EXCEPTIONS = + newHashSet( + KeeperException.NodeExistsException.class, + KeeperException.BadArgumentsException.class, + KeeperException.NoNodeException.class, + KeeperException.NoAuthException.class, + KeeperException.BadVersionException.class, + KeeperException.AuthFailedException.class, + KeeperException.InvalidACLException.class, + KeeperException.SessionMovedException.class, + KeeperException.NotReadOnlyException.class); + /** Curator ZooKeeper client. */ private final CuratorFramework client; @@ -126,10 +144,14 @@ public ZooKeeperStateHandleStore( * @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet) * @param state State to be added * @return The Created {@link RetrievableStateHandle}. + * @throws PossibleInconsistentStateException if the write-to-ZooKeeper operation failed. This + * indicates that it's not clear whether the new state was successfully written to ZooKeeper + * or not. Proper error handling has to be applied on the caller's side. * @throws Exception If a ZooKeeper or state handle operation fails */ @Override - public RetrievableStateHandle addAndLock(String pathInZooKeeper, T state) throws Exception { + public RetrievableStateHandle addAndLock(String pathInZooKeeper, T state) + throws PossibleInconsistentStateException, Exception { checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); checkNotNull(state, "State"); @@ -137,45 +159,51 @@ public RetrievableStateHandle addAndLock(String pathInZooKeeper, T state) thr RetrievableStateHandle storeHandle = storage.store(state); - boolean success = false; + byte[] serializedStoreHandle = serializeOrDiscard(storeHandle); try { - // Serialize the state handle. This writes the state to the backend. - byte[] serializedStoreHandle = InstantiationUtil.serializeObject(storeHandle); - - // Write state handle (not the actual state) to ZooKeeper. This is expected to be - // smaller than the state itself. This level of indirection makes sure that data in - // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but - // the state can be larger. - // Create the lock node in a transaction with the actual state node. That way we can - // prevent - // race conditions with a concurrent delete operation. - client.inTransaction() - .create() - .withMode(CreateMode.PERSISTENT) - .forPath(path, serializedStoreHandle) - .and() - .create() - .withMode(CreateMode.EPHEMERAL) - .forPath(getLockPath(path)) - .and() - .commit(); - - success = true; + writeStoreHandleTransactionally(path, serializedStoreHandle); return storeHandle; - } catch (KeeperException.NodeExistsException e) { - // We wrap the exception here so that it could be caught in DefaultJobGraphStore - throw new AlreadyExistException("ZooKeeper node " + path + " already exists.", e); - } finally { - if (!success) { - // Cleanup the state handle if it was not written to ZooKeeper. - if (storeHandle != null) { - storeHandle.discardState(); - } + } catch (Exception e) { + if (indicatesPossiblyInconsistentState(e)) { + throw new PossibleInconsistentStateException(e); } + + // in any other failure case: discard the state + storeHandle.discardState(); + + // We wrap the exception here so that it could be caught in DefaultJobGraphStore + throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class) + .map( + nee -> + new AlreadyExistException( + "ZooKeeper node " + path + " already exists.", nee)) + .orElseThrow(() -> e); } } + // this method is provided for the sole purpose of easier testing + @VisibleForTesting + protected void writeStoreHandleTransactionally(String path, byte[] serializedStoreHandle) + throws Exception { + // Write state handle (not the actual state) to ZooKeeper. This is expected to be + // smaller than the state itself. This level of indirection makes sure that data in + // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but + // the state can be larger. + // Create the lock node in a transaction with the actual state node. That way we can + // prevent race conditions with a concurrent delete operation. + client.inTransaction() + .create() + .withMode(CreateMode.PERSISTENT) + .forPath(path, serializedStoreHandle) + .and() + .create() + .withMode(CreateMode.EPHEMERAL) + .forPath(getLockPath(path)) + .and() + .commit(); + } + /** * Replaces a state handle in ZooKeeper and discards the old state handle. * @@ -196,29 +224,55 @@ public void replace(String pathInZooKeeper, IntegerResourceVersion expectedVersi RetrievableStateHandle newStateHandle = storage.store(state); - boolean success = false; + final byte[] serializedStateHandle = serializeOrDiscard(newStateHandle); + // initialize flags to serve the failure case + boolean discardOldState = false; + boolean discardNewState = true; try { - // Serialize the new state handle. This writes the state to the backend. - byte[] serializedStateHandle = InstantiationUtil.serializeObject(newStateHandle); + setStateHandle(path, serializedStateHandle, expectedVersion.getValue()); + + // swap subject for deletion in case of success + discardOldState = true; + discardNewState = false; + } catch (Exception e) { + if (indicatesPossiblyInconsistentState(e)) { + // it's unclear whether the state handle metadata was written to ZooKeeper - + // hence, we don't discard any data + discardNewState = false; + throw new PossibleInconsistentStateException(e); + } - // Replace state handle in ZooKeeper. - client.setData() - .withVersion(expectedVersion.getValue()) - .forPath(path, serializedStateHandle); - success = true; - } catch (KeeperException.NoNodeException e) { // We wrap the exception here so that it could be caught in DefaultJobGraphStore - throw new NotExistException("ZooKeeper node " + path + " does not exist.", e); + throw ExceptionUtils.findThrowable(e, KeeperException.NoNodeException.class) + .map( + nnee -> + new NotExistException( + "ZooKeeper node " + path + " does not exist.", nnee)) + .orElseThrow(() -> e); } finally { - if (success) { + if (discardOldState) { oldStateHandle.discardState(); - } else { + } + + if (discardNewState) { newStateHandle.discardState(); } } } + // this method is provided for the sole purpose of easier testing + @VisibleForTesting + protected void setStateHandle(String path, byte[] serializedStateHandle, int expectedVersion) + throws Exception { + // Replace state handle in ZooKeeper. + client.setData().withVersion(expectedVersion).forPath(path, serializedStateHandle); + } + + private boolean indicatesPossiblyInconsistentState(Exception e) { + return !PRE_COMMIT_EXCEPTIONS.contains(e.getClass()); + } + /** * Returns the version of the node if it exists or -1 if it doesn't. * @@ -518,9 +572,7 @@ private RetrievableStateHandle get(String pathInZooKeeper, boolean lock) thro try { byte[] data = client.getData().forPath(path); - RetrievableStateHandle retrievableStateHandle = - InstantiationUtil.deserializeObject( - data, Thread.currentThread().getContextClassLoader()); + RetrievableStateHandle retrievableStateHandle = deserialize(data); success = true; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index 5fac8e06ac83d..6445fe989f36c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -23,26 +23,35 @@ import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo; import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.persistence.PossibleInconsistentStateException; import org.apache.flink.runtime.state.InputChannelStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.OperatorStreamStateHandle; import org.apache.flink.runtime.state.ResultSubpartitionStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.TestLogger; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.util.Collections; import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; +import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -53,6 +62,8 @@ /** Tests for failure of checkpoint coordinator. */ public class CheckpointCoordinatorFailureTest extends TestLogger { + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + /** * Tests that a failure while storing a completed checkpoint in the completed checkpoint store * will properly fail the originating pending checkpoint and clean upt the completed checkpoint. @@ -75,7 +86,10 @@ public void testFailingCompletedCheckpointStoreAdd() throws Exception { CheckpointCoordinator coord = new CheckpointCoordinatorBuilder() .setExecutionGraph(testGraph) - .setCompletedCheckpointStore(new FailingCompletedCheckpointStore()) + .setCompletedCheckpointStore( + new FailingCompletedCheckpointStore( + new Exception( + "The failing completed checkpoint store failed again... :-("))) .setTimer(manuallyTriggeredScheduledExecutor) .build(); @@ -160,8 +174,86 @@ public void testFailingCompletedCheckpointStoreAdd() throws Exception { .discardState(); } + @Test + public void testCleanupForGenericFailure() throws Exception { + testStoringFailureHandling(new FlinkRuntimeException("Expected exception"), 1); + } + + @Test + public void testCleanupOmissionForPossibleInconsistentStateException() throws Exception { + testStoringFailureHandling(new PossibleInconsistentStateException(), 0); + } + + private void testStoringFailureHandling(Exception failure, int expectedCleanupCalls) + throws Exception { + final JobVertexID jobVertexID1 = new JobVertexID(); + + final ExecutionGraph graph = + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(jobVertexID1) + .build(); + + final ExecutionVertex vertex = graph.getJobVertex(jobVertexID1).getTaskVertices()[0]; + final ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId(); + + final StandaloneCheckpointIDCounter checkpointIDCounter = + new StandaloneCheckpointIDCounter(); + + final ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor = + new ManuallyTriggeredScheduledExecutor(); + + final CompletedCheckpointStore completedCheckpointStore = + new FailingCompletedCheckpointStore(failure); + + final AtomicInteger cleanupCallCount = new AtomicInteger(0); + final CheckpointCoordinator checkpointCoordinator = + new CheckpointCoordinatorBuilder() + .setExecutionGraph(graph) + .setCheckpointIDCounter(checkpointIDCounter) + .setCheckpointsCleaner( + new CheckpointsCleaner() { + + private static final long serialVersionUID = + 2029876992397573325L; + + @Override + public void cleanCheckpointOnFailedStoring( + CompletedCheckpoint completedCheckpoint, + Executor executor) { + cleanupCallCount.incrementAndGet(); + super.cleanCheckpointOnFailedStoring( + completedCheckpoint, executor); + } + }) + .setCompletedCheckpointStore(completedCheckpointStore) + .setTimer(manuallyTriggeredScheduledExecutor) + .build(); + checkpointCoordinator.triggerSavepoint(tmpFolder.newFolder().getAbsolutePath()); + manuallyTriggeredScheduledExecutor.triggerAll(); + + try { + checkpointCoordinator.receiveAcknowledgeMessage( + new AcknowledgeCheckpoint( + graph.getJobID(), attemptId, checkpointIDCounter.getLast()), + "unknown location"); + fail("CheckpointException should have been thrown."); + } catch (CheckpointException e) { + assertThat( + e.getCheckpointFailureReason(), + is(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE)); + } + + assertThat(cleanupCallCount.get(), is(expectedCleanupCalls)); + } + private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore { + private final Exception addCheckpointFailure; + + public FailingCompletedCheckpointStore(Exception addCheckpointFailure) { + this.addCheckpointFailure = addCheckpointFailure; + } + @Override public void recover() throws Exception { throw new UnsupportedOperationException("Not implemented."); @@ -173,7 +265,7 @@ public void addCheckpoint( CheckpointsCleaner checkpointsCleaner, Runnable postCleanup) throws Exception { - throw new Exception("The failing completed checkpoint store failed again... :-("); + throw addCheckpointFailure; } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java index 479717d163286..0693ff2e8885a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java @@ -695,6 +695,12 @@ public CheckpointCoordinatorBuilder setCoordinatorsToCheckpoint( return this; } + public CheckpointCoordinatorBuilder setCheckpointsCleaner( + CheckpointsCleaner checkpointsCleaner) { + this.checkpointsCleaner = checkpointsCleaner; + return this; + } + public CheckpointCoordinatorBuilder setCheckpointIDCounter( CheckpointIDCounter checkpointIDCounter) { this.checkpointIDCounter = checkpointIDCounter; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/persistence/TestingLongStateHandleHelper.java b/flink-runtime/src/test/java/org/apache/flink/runtime/persistence/TestingLongStateHandleHelper.java index 4010391c1e249..fd387b2488127 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/persistence/TestingLongStateHandleHelper.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/persistence/TestingLongStateHandleHelper.java @@ -19,72 +19,122 @@ package org.apache.flink.runtime.persistence; import org.apache.flink.runtime.state.RetrievableStateHandle; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.util.AbstractID; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.StringJoiner; /** * Testing implementation for {@link RetrievableStateStorageHelper} and {@link * RetrievableStateHandle} with type {@link Long}. */ -public class TestingLongStateHandleHelper implements RetrievableStateStorageHelper { +public class TestingLongStateHandleHelper + implements RetrievableStateStorageHelper { - private final List stateHandles = new ArrayList<>(); + private static final List STATE_STORAGE = new ArrayList<>(); @Override - public RetrievableStateHandle store(Long state) { - final LongRetrievableStateHandle stateHandle = new LongRetrievableStateHandle(state); - stateHandles.add(stateHandle); + public RetrievableStateHandle store(LongStateHandle state) { + final int pos = STATE_STORAGE.size(); + STATE_STORAGE.add(state); - return stateHandle; + return new LongRetrievableStateHandle(pos); } - public List getStateHandles() { - return stateHandles; + public static LongStateHandle createState(long value) { + return new LongStateHandle(value); } - /** Testing {@link RetrievableStateStorageHelper} implementation with {@link Long}. */ - public static class LongRetrievableStateHandle implements RetrievableStateHandle { + public static long getStateHandleValueByIndex(int index) { + return STATE_STORAGE.get(index).getValue(); + } - private static final long serialVersionUID = -3555329254423838912L; + public static int getDiscardCallCountForStateHandleByIndex(int index) { + return STATE_STORAGE.get(index).getNumberOfDiscardCalls(); + } - private static AtomicInteger numberOfGlobalDiscardCalls = new AtomicInteger(0); + public static int getGlobalStorageSize() { + return STATE_STORAGE.size(); + } + + public static void clearGlobalState() { + STATE_STORAGE.clear(); + } + + public static int getGlobalDiscardCount() { + return STATE_STORAGE.stream().mapToInt(LongStateHandle::getNumberOfDiscardCalls).sum(); + } - private final Long state; + /** + * {@code LongStateHandle} implements {@link StateObject} to monitor the {@link + * StateObject#discardState()} calls. + */ + public static class LongStateHandle implements StateObject { + + private static final long serialVersionUID = -5752042587113549855L; + + private final Long value; private int numberOfDiscardCalls = 0; - public LongRetrievableStateHandle(Long state) { - this.state = state; + public LongStateHandle(long value) { + this.value = value; } - @Override - public Long retrieveState() { - return state; + public long getValue() { + return value; } @Override public void discardState() { - numberOfGlobalDiscardCalls.incrementAndGet(); numberOfDiscardCalls++; } + public int getNumberOfDiscardCalls() { + return numberOfDiscardCalls; + } + @Override public long getStateSize() { - return 0; + return 8L; } - public int getNumberOfDiscardCalls() { - return numberOfDiscardCalls; + @Override + public String toString() { + return new StringJoiner(", ", LongStateHandle.class.getSimpleName() + "[", "]") + .add("value=" + value) + .add("numberOfDiscardCalls=" + numberOfDiscardCalls) + .toString(); } + } + + /** Testing {@link RetrievableStateStorageHelper} implementation with {@link Long}. */ + public static class LongRetrievableStateHandle + implements RetrievableStateHandle { - public static int getNumberOfGlobalDiscardCalls() { - return numberOfGlobalDiscardCalls.get(); + private static final long serialVersionUID = -3555329254423838912L; + + private final int stateReference; + + public LongRetrievableStateHandle(int stateReference) { + this.stateReference = stateReference; } - public static void clearNumberOfGlobalDiscardCalls() { - numberOfGlobalDiscardCalls.set(0); + @Override + public LongStateHandle retrieveState() { + return STATE_STORAGE.get(stateReference); + } + + @Override + public void discardState() { + STATE_STORAGE.get(stateReference).discardState(); + } + + @Override + public long getStateSize() { + return AbstractID.SIZE; } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/StateHandleStoreUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/StateHandleStoreUtilsTest.java new file mode 100644 index 0000000000000..eb79ade5b6f7f --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/StateHandleStoreUtilsTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.runtime.persistence.TestingLongStateHandleHelper; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.RunnableWithException; + +import org.junit.Test; + +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * {@code StateHandleStoreUtilsTest} tests the utility classes collected in {@link + * StateHandleStoreUtils}. + */ +public class StateHandleStoreUtilsTest extends TestLogger { + + @Test + public void testSerializationAndDeserialization() throws Exception { + final TestingLongStateHandleHelper.LongStateHandle original = + new TestingLongStateHandleHelper.LongStateHandle(42L); + byte[] serializedData = StateHandleStoreUtils.serializeOrDiscard(original); + + final TestingLongStateHandleHelper.LongStateHandle deserializedInstance = + StateHandleStoreUtils.deserialize(serializedData); + assertThat(deserializedInstance.getStateSize(), is(original.getStateSize())); + assertThat(deserializedInstance.getValue(), is(original.getValue())); + } + + @Test + public void testSerializeOrDiscardFailureHandling() throws Exception { + final AtomicBoolean discardCalled = new AtomicBoolean(false); + final StateObject original = + new FailingSerializationStateObject(() -> discardCalled.set(true)); + + try { + StateHandleStoreUtils.serializeOrDiscard(original); + fail("An IOException is expected to be thrown."); + } catch (IOException e) { + // IOException is expected + } + + assertThat(discardCalled.get(), is(true)); + } + + @Test + public void testSerializationOrDiscardWithDiscardFailure() throws Exception { + final Exception discardException = + new IllegalStateException( + "Expected IllegalStateException that should be suppressed."); + final StateObject original = + new FailingSerializationStateObject( + () -> { + throw discardException; + }); + + try { + StateHandleStoreUtils.serializeOrDiscard(original); + fail("An IOException is expected to be thrown."); + } catch (IOException e) { + // IOException is expected + assertThat(e.getSuppressed().length, is(1)); + assertThat(e.getSuppressed()[0], is(discardException)); + } + } + + private static class FailingSerializationStateObject implements StateObject { + + private static final long serialVersionUID = 6382458109061973983L; + private final RunnableWithException discardStateRunnable; + + public FailingSerializationStateObject(RunnableWithException discardStateRunnable) { + this.discardStateRunnable = discardStateRunnable; + } + + private void writeObject(ObjectOutputStream outputStream) throws IOException { + throw new IOException("Expected IOException to test serialization error."); + } + + @Override + public void discardState() throws Exception { + discardStateRunnable.run(); + } + + @Override + public long getStateSize() { + return 0; + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java index c89e9d5acb3fd..0acaa80560a56 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java @@ -22,17 +22,20 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.persistence.IntegerResourceVersion; +import org.apache.flink.runtime.persistence.PossibleInconsistentStateException; import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper; +import org.apache.flink.runtime.persistence.StateHandleStore; import org.apache.flink.runtime.persistence.TestingLongStateHandleHelper; -import org.apache.flink.runtime.persistence.TestingLongStateHandleHelper.LongRetrievableStateHandle; import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; +import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException; import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.Stat; +import org.hamcrest.core.IsInstanceOf; import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; @@ -82,26 +85,27 @@ public static void tearDown() throws Exception { @Before public void cleanUp() throws Exception { ZOOKEEPER.deleteAll(); + TestingLongStateHandleHelper.clearGlobalState(); } /** Tests add operation with lock. */ @Test public void testAddAndLock() throws Exception { final TestingLongStateHandleHelper longStateStorage = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore store = + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), longStateStorage); // Config final String pathInZooKeeper = "/testAdd"; - final Long state = 1239712317L; + final long state = 1239712317L; // Test - store.addAndLock(pathInZooKeeper, state); + store.addAndLock(pathInZooKeeper, new TestingLongStateHandleHelper.LongStateHandle(state)); // Verify // State handle created assertEquals(1, store.getAllAndLock().size()); - assertEquals(state, store.getAndLock(pathInZooKeeper).retrieveState()); + assertEquals(state, store.getAndLock(pathInZooKeeper).retrieveState().getValue()); // Path created and is persistent Stat stat = ZOOKEEPER.getClient().checkExists().forPath(pathInZooKeeper); @@ -121,64 +125,134 @@ public void testAddAndLock() throws Exception { // Data is equal @SuppressWarnings("unchecked") - Long actual = - ((RetrievableStateHandle) + final long actual = + ((RetrievableStateHandle) InstantiationUtil.deserializeObject( ZOOKEEPER.getClient().getData().forPath(pathInZooKeeper), ClassLoader.getSystemClassLoader())) - .retrieveState(); + .retrieveState() + .getValue(); assertEquals(state, actual); } - /** Tests that an existing path throws an Exception. */ - @Test(expected = Exception.class) - public void testAddAlreadyExistingPath() throws Exception { + /** + * Tests that the created state handle is not discarded if ZooKeeper create fails with an + * generic exception. + */ + @Test + public void testFailingAddWithPossiblyInconsistentState() throws Exception { final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore store = - new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateHandleProvider); + CuratorFramework client = spy(ZOOKEEPER.getClient()); + when(client.inTransaction().create()) + .thenThrow(new RuntimeException("Expected test Exception.")); + + ZooKeeperStateHandleStore store = + new ZooKeeperStateHandleStore<>(client, stateHandleProvider); - ZOOKEEPER.getClient().create().forPath("/testAddAlreadyExistingPath"); + // Config + final String pathInZooKeeper = "/testAddDiscardStateHandleAfterFailure"; + final long state = 81282227L; - store.addAndLock("/testAddAlreadyExistingPath", 1L); + try { + // Test + store.addAndLock( + pathInZooKeeper, new TestingLongStateHandleHelper.LongStateHandle(state)); + fail("PossibleInconsistentStateException should have been thrown."); + } catch (PossibleInconsistentStateException ignored) { + // PossibleInconsistentStateException expected + } - // writing to the state storage should have succeeded - assertEquals(1, stateHandleProvider.getStateHandles()); + // State handle created and not discarded + assertEquals(1, TestingLongStateHandleHelper.getGlobalStorageSize()); + assertEquals(state, TestingLongStateHandleHelper.getStateHandleValueByIndex(0)); + assertEquals(0, TestingLongStateHandleHelper.getDiscardCallCountForStateHandleByIndex(0)); + } - // the created state handle should have been cleaned up if the add operation failed - assertEquals(1, stateHandleProvider.getStateHandles().get(0).getNumberOfDiscardCalls()); + @Test + public void testAddFailureHandlingForNodeExistsException() { + testFailingAddWithStateDiscardTriggeredFor( + new KeeperException.NodeExistsException(), + StateHandleStore.AlreadyExistException.class); } - /** Tests that the created state handle is discarded if ZooKeeper create fails. */ @Test - public void testAddDiscardStateHandleAfterFailure() throws Exception { - // Setup - final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper(); + public void testAddFailureHandlingForBadArgumentsException() { + testFailingAddWithStateDiscardTriggeredFor(new KeeperException.BadArgumentsException()); + } - CuratorFramework client = spy(ZOOKEEPER.getClient()); - when(client.inTransaction().create()) - .thenThrow(new RuntimeException("Expected test Exception.")); + @Test + public void testAddFailureHandlingForNoNodeException() { + testFailingAddWithStateDiscardTriggeredFor(new KeeperException.NoNodeException()); + } - ZooKeeperStateHandleStore store = - new ZooKeeperStateHandleStore<>(client, stateHandleProvider); + @Test + public void testAddFailureHandlingForNoAuthException() { + testFailingAddWithStateDiscardTriggeredFor(new KeeperException.NoAuthException()); + } + + @Test + public void testAddFailureHandlingForBadVersionException() { + testFailingAddWithStateDiscardTriggeredFor(new KeeperException.BadVersionException()); + } + + @Test + public void testAddFailureHandlingForAuthFailedException() { + testFailingAddWithStateDiscardTriggeredFor(new KeeperException.AuthFailedException()); + } + + @Test + public void testAddFailureHandlingForInvalidACLException() { + testFailingAddWithStateDiscardTriggeredFor(new KeeperException.InvalidACLException()); + } + + @Test + public void testAddFailureHandlingForSessionMovedException() { + testFailingAddWithStateDiscardTriggeredFor(new KeeperException.SessionMovedException()); + } + + @Test + public void testAddFailureHandlingForNotReadOnlyException() { + testFailingAddWithStateDiscardTriggeredFor(new KeeperException.NotReadOnlyException()); + } + + private static void testFailingAddWithStateDiscardTriggeredFor(Exception actualException) { + testFailingAddWithStateDiscardTriggeredFor(actualException, actualException.getClass()); + } + + private static void testFailingAddWithStateDiscardTriggeredFor( + Exception actualException, Class expectedException) { + final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper(); + + ZooKeeperStateHandleStore store = + new ZooKeeperStateHandleStore( + ZOOKEEPER.getClient(), stateHandleProvider) { + @Override + protected void writeStoreHandleTransactionally( + String path, byte[] serializedStoreHandle) throws Exception { + throw actualException; + } + }; // Config - final String pathInZooKeeper = "/testAddDiscardStateHandleAfterFailure"; - final Long state = 81282227L; + final String pathInZooKeeper = + "/testAddDiscardStateHandleAfterFailure-" + expectedException.getSimpleName(); + final long state = 81282227L; try { // Test - store.addAndLock(pathInZooKeeper, state); - fail("Did not throw expected exception"); - } catch (Exception ignored) { + store.addAndLock( + pathInZooKeeper, new TestingLongStateHandleHelper.LongStateHandle(state)); + fail(expectedException.getSimpleName() + " should have been thrown."); + } catch (Exception ex) { + assertThat(ex, IsInstanceOf.instanceOf(expectedException)); } - // Verify // State handle created and discarded - assertEquals(1, stateHandleProvider.getStateHandles().size()); - assertEquals(state, stateHandleProvider.getStateHandles().get(0).retrieveState()); - assertEquals(1, stateHandleProvider.getStateHandles().get(0).getNumberOfDiscardCalls()); + assertEquals(1, TestingLongStateHandleHelper.getGlobalStorageSize()); + assertEquals(state, TestingLongStateHandleHelper.getStateHandleValueByIndex(0)); + assertEquals(1, TestingLongStateHandleHelper.getDiscardCallCountForStateHandleByIndex(0)); } /** Tests that a state handle is replaced. */ @@ -187,23 +261,27 @@ public void testReplace() throws Exception { // Setup final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore store = + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateHandleProvider); // Config final String pathInZooKeeper = "/testReplace"; - final Long initialState = 30968470898L; - final Long replaceState = 88383776661L; + final long initialState = 30968470898L; + final long replaceState = 88383776661L; // Test - store.addAndLock(pathInZooKeeper, initialState); - store.replace(pathInZooKeeper, IntegerResourceVersion.valueOf(0), replaceState); + store.addAndLock( + pathInZooKeeper, new TestingLongStateHandleHelper.LongStateHandle(initialState)); + store.replace( + pathInZooKeeper, + IntegerResourceVersion.valueOf(0), + new TestingLongStateHandleHelper.LongStateHandle(replaceState)); // Verify // State handles created - assertEquals(2, stateHandleProvider.getStateHandles().size()); - assertEquals(initialState, stateHandleProvider.getStateHandles().get(0).retrieveState()); - assertEquals(replaceState, stateHandleProvider.getStateHandles().get(1).retrieveState()); + assertEquals(2, TestingLongStateHandleHelper.getGlobalStorageSize()); + assertEquals(initialState, TestingLongStateHandleHelper.getStateHandleValueByIndex(0)); + assertEquals(replaceState, TestingLongStateHandleHelper.getStateHandleValueByIndex(1)); // Path created and is persistent Stat stat = ZOOKEEPER.getClient().checkExists().forPath(pathInZooKeeper); @@ -212,12 +290,13 @@ public void testReplace() throws Exception { // Data is equal @SuppressWarnings("unchecked") - Long actual = - ((RetrievableStateHandle) + final long actual = + ((RetrievableStateHandle) InstantiationUtil.deserializeObject( ZOOKEEPER.getClient().getData().forPath(pathInZooKeeper), ClassLoader.getSystemClassLoader())) - .retrieveState(); + .retrieveState() + .getValue(); assertEquals(replaceState, actual); } @@ -225,12 +304,16 @@ public void testReplace() throws Exception { /** Tests that a non existing path throws an Exception. */ @Test(expected = Exception.class) public void testReplaceNonExistingPath() throws Exception { - final RetrievableStateStorageHelper stateStorage = new TestingLongStateHandleHelper(); + final RetrievableStateStorageHelper + stateStorage = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore store = + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateStorage); - store.replace("/testReplaceNonExistingPath", IntegerResourceVersion.valueOf(0), 1L); + store.replace( + "/testReplaceNonExistingPath", + IntegerResourceVersion.valueOf(0), + new TestingLongStateHandleHelper.LongStateHandle(1L)); } /** Tests that the replace state handle is discarded if ZooKeeper setData fails. */ @@ -242,38 +325,151 @@ public void testReplaceDiscardStateHandleAfterFailure() throws Exception { CuratorFramework client = spy(ZOOKEEPER.getClient()); when(client.setData()).thenThrow(new RuntimeException("Expected test Exception.")); - ZooKeeperStateHandleStore store = + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>(client, stateHandleProvider); // Config final String pathInZooKeeper = "/testReplaceDiscardStateHandleAfterFailure"; - final Long initialState = 30968470898L; - final Long replaceState = 88383776661L; + final long initialState = 30968470898L; + final long replaceState = 88383776661L; // Test - store.addAndLock(pathInZooKeeper, initialState); + store.addAndLock( + pathInZooKeeper, new TestingLongStateHandleHelper.LongStateHandle(initialState)); try { - store.replace(pathInZooKeeper, IntegerResourceVersion.valueOf(0), replaceState); + store.replace( + pathInZooKeeper, + IntegerResourceVersion.valueOf(0), + new TestingLongStateHandleHelper.LongStateHandle(replaceState)); fail("Did not throw expected exception"); } catch (Exception ignored) { } // Verify // State handle created and discarded - assertEquals(2, stateHandleProvider.getStateHandles().size()); - assertEquals(initialState, stateHandleProvider.getStateHandles().get(0).retrieveState()); - assertEquals(replaceState, stateHandleProvider.getStateHandles().get(1).retrieveState()); - assertEquals(1, stateHandleProvider.getStateHandles().get(1).getNumberOfDiscardCalls()); + assertEquals(2, TestingLongStateHandleHelper.getGlobalStorageSize()); + assertEquals(initialState, TestingLongStateHandleHelper.getStateHandleValueByIndex(0)); + assertEquals(replaceState, TestingLongStateHandleHelper.getStateHandleValueByIndex(1)); + assertThat(TestingLongStateHandleHelper.getDiscardCallCountForStateHandleByIndex(0), is(0)); + assertThat(TestingLongStateHandleHelper.getDiscardCallCountForStateHandleByIndex(1), is(0)); + + // Initial value + @SuppressWarnings("unchecked") + final long actual = + ((RetrievableStateHandle) + InstantiationUtil.deserializeObject( + ZOOKEEPER.getClient().getData().forPath(pathInZooKeeper), + ClassLoader.getSystemClassLoader())) + .retrieveState() + .getValue(); + + assertEquals(initialState, actual); + } + + @Test + public void testDiscardAfterReplaceFailureWithNoNodeException() throws Exception { + testDiscardAfterReplaceFailureWith( + new KeeperException.NoNodeException(), StateHandleStore.NotExistException.class); + } + + @Test + public void testDiscardAfterReplaceFailureWithNodeExistsException() throws Exception { + testDiscardAfterReplaceFailureWith(new KeeperException.NodeExistsException()); + } + + @Test + public void testDiscardAfterReplaceFailureWithBadArgumentsException() throws Exception { + testDiscardAfterReplaceFailureWith(new KeeperException.BadArgumentsException()); + } + + @Test + public void testDiscardAfterReplaceFailureWithNoAuthException() throws Exception { + testDiscardAfterReplaceFailureWith(new KeeperException.NoAuthException()); + } + + @Test + public void testDiscardAfterReplaceFailureWithBadVersionException() throws Exception { + testDiscardAfterReplaceFailureWith(new KeeperException.BadVersionException()); + } + + @Test + public void testDiscardAfterReplaceFailureWithAuthFailedException() throws Exception { + testDiscardAfterReplaceFailureWith(new KeeperException.AuthFailedException()); + } + + @Test + public void testDiscardAfterReplaceFailureWithInvalidACLException() throws Exception { + testDiscardAfterReplaceFailureWith(new KeeperException.InvalidACLException()); + } + + @Test + public void testDiscardAfterReplaceFailureWithSessionMovedException() throws Exception { + testDiscardAfterReplaceFailureWith(new KeeperException.SessionMovedException()); + } + + @Test + public void testDiscardAfterReplaceFailureWithNotReadOnlyException() throws Exception { + testDiscardAfterReplaceFailureWith(new KeeperException.NotReadOnlyException()); + } + + private static void testDiscardAfterReplaceFailureWith(Exception actualException) + throws Exception { + testDiscardAfterReplaceFailureWith(actualException, actualException.getClass()); + } + + private static void testDiscardAfterReplaceFailureWith( + Exception actualException, Class expectedException) + throws Exception { + final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper(); + + ZooKeeperStateHandleStore store = + new ZooKeeperStateHandleStore( + ZOOKEEPER.getClient(), stateHandleProvider) { + @Override + protected void setStateHandle( + String path, byte[] serializedStateHandle, int expectedVersion) + throws Exception { + throw actualException; + } + }; + + // Config + final String pathInZooKeeper = + "/testReplaceDiscardStateHandleAfterFailure-" + expectedException.getSimpleName(); + final long initialState = 30968470898L; + final long replaceState = 88383776661L; + + // Test + store.addAndLock( + pathInZooKeeper, new TestingLongStateHandleHelper.LongStateHandle(initialState)); + + try { + store.replace( + pathInZooKeeper, + IntegerResourceVersion.valueOf(0), + new TestingLongStateHandleHelper.LongStateHandle(replaceState)); + fail("Did not throw expected exception"); + } catch (Throwable t) { + assertThat(t, IsInstanceOf.instanceOf(expectedException)); + } + + // State handle created and discarded + assertEquals(2, TestingLongStateHandleHelper.getGlobalStorageSize()); + assertEquals(initialState, TestingLongStateHandleHelper.getStateHandleValueByIndex(0)); + assertEquals(replaceState, TestingLongStateHandleHelper.getStateHandleValueByIndex(1)); + assertThat(TestingLongStateHandleHelper.getDiscardCallCountForStateHandleByIndex(0), is(0)); + assertThat(TestingLongStateHandleHelper.getDiscardCallCountForStateHandleByIndex(1), is(1)); // Initial value @SuppressWarnings("unchecked") - Long actual = - ((RetrievableStateHandle) + final long actual = + ((RetrievableStateHandle) InstantiationUtil.deserializeObject( ZOOKEEPER.getClient().getData().forPath(pathInZooKeeper), ClassLoader.getSystemClassLoader())) - .retrieveState(); + .retrieveState() + .getValue(); assertEquals(initialState, actual); } @@ -284,21 +480,22 @@ public void testGetAndExists() throws Exception { // Setup final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore store = + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateHandleProvider); // Config final String pathInZooKeeper = "/testGetAndExists"; - final Long state = 311222268470898L; + final long state = 311222268470898L; // Test assertThat(store.exists(pathInZooKeeper).isExisting(), is(false)); - store.addAndLock(pathInZooKeeper, state); - RetrievableStateHandle actual = store.getAndLock(pathInZooKeeper); + store.addAndLock(pathInZooKeeper, new TestingLongStateHandleHelper.LongStateHandle(state)); + RetrievableStateHandle actual = + store.getAndLock(pathInZooKeeper); // Verify - assertEquals(state, actual.retrieveState()); + assertEquals(state, actual.retrieveState().getValue()); assertTrue(store.exists(pathInZooKeeper).getValue() >= 0); } @@ -307,7 +504,7 @@ public void testGetAndExists() throws Exception { public void testGetNonExistingPath() throws Exception { final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore store = + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateHandleProvider); store.getAndLock("/testGetNonExistingPath"); @@ -319,7 +516,7 @@ public void testGetAll() throws Exception { // Setup final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore store = + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateHandleProvider); // Config @@ -333,11 +530,13 @@ public void testGetAll() throws Exception { // Test for (long val : expected) { - store.addAndLock(pathInZooKeeper + val, val); + store.addAndLock( + pathInZooKeeper + val, new TestingLongStateHandleHelper.LongStateHandle(val)); } - for (Tuple2, String> val : store.getAllAndLock()) { - assertTrue(expected.remove(val.f0.retrieveState())); + for (Tuple2, String> + val : store.getAllAndLock()) { + assertTrue(expected.remove(val.f0.retrieveState().getValue())); } assertEquals(0, expected.size()); } @@ -348,7 +547,7 @@ public void testGetAllSortedByName() throws Exception { // Setup final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore store = + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateHandleProvider); // Config @@ -359,10 +558,12 @@ public void testGetAllSortedByName() throws Exception { // Test for (long val : expected) { final String pathInZooKeeper = String.format("%s%016d", basePath, val); - store.addAndLock(pathInZooKeeper, val); + store.addAndLock( + pathInZooKeeper, new TestingLongStateHandleHelper.LongStateHandle(val)); } - List, String>> actual = store.getAllAndLock(); + List, String>> + actual = store.getAllAndLock(); assertEquals(expected.length, actual.size()); // bring the elements in sort order @@ -370,7 +571,7 @@ public void testGetAllSortedByName() throws Exception { Collections.sort(actual, Comparator.comparing(o -> o.f1)); for (int i = 0; i < expected.length; i++) { - assertEquals(expected[i], actual.get(i).f0.retrieveState()); + assertEquals(expected[i], (Long) actual.get(i).f0.retrieveState().getValue()); } } @@ -380,17 +581,16 @@ public void testRemove() throws Exception { // Setup final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore store = + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateHandleProvider); // Config final String pathInZooKeeper = "/testRemove"; - final Long state = 27255442L; - store.addAndLock(pathInZooKeeper, state); + store.addAndLock( + pathInZooKeeper, new TestingLongStateHandleHelper.LongStateHandle(27255442L)); - final int numberOfGlobalDiscardCalls = - LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls(); + final int numberOfGlobalDiscardCalls = TestingLongStateHandleHelper.getGlobalDiscardCount(); // Test store.releaseAndTryRemove(pathInZooKeeper); @@ -399,7 +599,7 @@ public void testRemove() throws Exception { assertEquals(0, ZOOKEEPER.getClient().getChildren().forPath("/").size()); assertEquals( numberOfGlobalDiscardCalls + 1, - LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls()); + TestingLongStateHandleHelper.getGlobalDiscardCount()); } /** Tests that all state handles are correctly discarded. */ @@ -408,7 +608,7 @@ public void testReleaseAndTryRemoveAll() throws Exception { // Setup final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore store = + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateHandleProvider); // Config @@ -422,7 +622,8 @@ public void testReleaseAndTryRemoveAll() throws Exception { // Test for (long val : expected) { - store.addAndLock(pathInZooKeeper + val, val); + store.addAndLock( + pathInZooKeeper + val, new TestingLongStateHandleHelper.LongStateHandle(val)); } store.releaseAndTryRemoveAll(); @@ -439,7 +640,7 @@ public void testReleaseAndTryRemoveAll() throws Exception { public void testCorruptedData() throws Exception { final TestingLongStateHandleHelper stateStorage = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore store = + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateStorage); final Collection input = new HashSet<>(); @@ -448,21 +649,23 @@ public void testCorruptedData() throws Exception { input.add(3L); for (Long aLong : input) { - store.addAndLock("/" + aLong, aLong); + store.addAndLock("/" + aLong, new TestingLongStateHandleHelper.LongStateHandle(aLong)); } // corrupt one of the entries ZOOKEEPER.getClient().setData().forPath("/" + 2, new byte[2]); - List, String>> allEntries = store.getAllAndLock(); + List, String>> + allEntries = store.getAllAndLock(); Collection expected = new HashSet<>(input); expected.remove(2L); Collection actual = new HashSet<>(expected.size()); - for (Tuple2, String> entry : allEntries) { - actual.add(entry.f0.retrieveState()); + for (Tuple2, String> + entry : allEntries) { + actual.add(entry.f0.retrieveState().getValue()); } assertEquals(expected, actual); @@ -478,23 +681,24 @@ public void testCorruptedData() throws Exception { public void testConcurrentDeleteOperation() throws Exception { final TestingLongStateHandleHelper longStateStorage = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore zkStore1 = + ZooKeeperStateHandleStore zkStore1 = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), longStateStorage); - ZooKeeperStateHandleStore zkStore2 = + ZooKeeperStateHandleStore zkStore2 = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), longStateStorage); final String statePath = "/state"; - zkStore1.addAndLock(statePath, 42L); - RetrievableStateHandle stateHandle = zkStore2.getAndLock(statePath); + zkStore1.addAndLock(statePath, new TestingLongStateHandleHelper.LongStateHandle(42L)); + RetrievableStateHandle stateHandle = + zkStore2.getAndLock(statePath); // this should not remove the referenced node because we are still holding a state handle // reference via zkStore2 zkStore1.releaseAndTryRemove(statePath); // sanity check - assertEquals(42L, (long) stateHandle.retrieveState()); + assertEquals(42L, stateHandle.retrieveState().getValue()); Stat nodeStat = ZOOKEEPER.getClient().checkExists().forPath(statePath); @@ -521,15 +725,15 @@ public void testConcurrentDeleteOperation() throws Exception { public void testLockCleanupWhenGetAndLockFails() throws Exception { final TestingLongStateHandleHelper longStateStorage = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore zkStore1 = + ZooKeeperStateHandleStore zkStore1 = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), longStateStorage); - ZooKeeperStateHandleStore zkStore2 = + ZooKeeperStateHandleStore zkStore2 = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), longStateStorage); final String path = "/state"; - zkStore1.addAndLock(path, 42L); + zkStore1.addAndLock(path, new TestingLongStateHandleHelper.LongStateHandle(42L)); final byte[] corruptedData = {1, 2}; @@ -581,12 +785,12 @@ public void testLockCleanupWhenClientTimesOut() throws Exception { try (CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration); CuratorFramework client2 = ZooKeeperUtils.startCuratorFramework(configuration)) { - ZooKeeperStateHandleStore zkStore = + ZooKeeperStateHandleStore zkStore = new ZooKeeperStateHandleStore<>(client, longStateStorage); final String path = "/state"; - zkStore.addAndLock(path, 42L); + zkStore.addAndLock(path, new TestingLongStateHandleHelper.LongStateHandle(42L)); // this should delete all ephemeral nodes client.close(); @@ -612,12 +816,12 @@ public void testLockCleanupWhenClientTimesOut() throws Exception { public void testRelease() throws Exception { final TestingLongStateHandleHelper longStateStorage = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore zkStore = + ZooKeeperStateHandleStore zkStore = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), longStateStorage); final String path = "/state"; - zkStore.addAndLock(path, 42L); + zkStore.addAndLock(path, new TestingLongStateHandleHelper.LongStateHandle(42L)); final String lockPath = zkStore.getLockPath(path); @@ -648,13 +852,13 @@ public void testRelease() throws Exception { public void testReleaseAll() throws Exception { final TestingLongStateHandleHelper longStateStorage = new TestingLongStateHandleHelper(); - ZooKeeperStateHandleStore zkStore = + ZooKeeperStateHandleStore zkStore = new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), longStateStorage); final Collection paths = Arrays.asList("/state1", "/state2", "/state3"); for (String path : paths) { - zkStore.addAndLock(path, 42L); + zkStore.addAndLock(path, new TestingLongStateHandleHelper.LongStateHandle(42L)); } for (String path : paths) { @@ -680,12 +884,12 @@ public void testReleaseAll() throws Exception { @Test public void testRemoveAllHandlesShouldRemoveAllPaths() throws Exception { - final ZooKeeperStateHandleStore zkStore = + final ZooKeeperStateHandleStore zkStore = new ZooKeeperStateHandleStore<>( ZooKeeperUtils.useNamespaceAndEnsurePath(ZOOKEEPER.getClient(), "/path"), new TestingLongStateHandleHelper()); - zkStore.addAndLock("/state", 1L); + zkStore.addAndLock("/state", new TestingLongStateHandleHelper.LongStateHandle(1L)); zkStore.clearEntries(); assertThat(zkStore.getAllHandles(), is(empty()));