Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.persistence.StringResourceVersion;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.InstantiationUtil;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,6 +48,8 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.apache.flink.runtime.util.StateHandleStoreUtils.deserialize;
import static org.apache.flink.runtime.util.StateHandleStoreUtils.serializeOrDiscard;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -114,21 +116,28 @@ public KubernetesStateHandleStore(
* @param key Key in ConfigMap
* @param state State to be added
* @throws AlreadyExistException if the name already exists
* @throws PossibleInconsistentStateException if the write-to-Kubernetes operation failed. This
* indicates that it's not clear whether the new state was successfully written to
* Kubernetes or not. No state was discarded. Proper error handling has to be applied on the
* caller's side.
* @throws Exception if persisting state or writing state handle failed
*/
@Override
public RetrievableStateHandle<T> addAndLock(String key, T state) throws Exception {
public RetrievableStateHandle<T> addAndLock(String key, T state)
throws PossibleInconsistentStateException, Exception {
checkNotNull(key, "Key in ConfigMap.");
checkNotNull(state, "State.");

final RetrievableStateHandle<T> storeHandle = storage.store(state);

boolean success = false;
final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);

// initialize flag to serve the failure case
boolean discardState = true;
try {
final byte[] serializedStoreHandle = InstantiationUtil.serializeObject(storeHandle);
success =
kubeClient
// a successful operation will result in the state not being discarded
discardState =
!kubeClient
.checkAndUpdateConfigMap(
configMapName,
c -> {
Expand All @@ -151,14 +160,20 @@ public RetrievableStateHandle<T> addAndLock(String key, T state) throws Exceptio
.get();
return storeHandle;
} catch (Exception ex) {
final Optional<PossibleInconsistentStateException> possibleInconsistentStateException =
ExceptionUtils.findThrowable(ex, PossibleInconsistentStateException.class);
if (possibleInconsistentStateException.isPresent()) {
// it's unclear whether the state handle metadata was written to the ConfigMap -
// hence, we don't discard the data
discardState = false;
throw possibleInconsistentStateException.get();
}

throw ExceptionUtils.findThrowable(ex, AlreadyExistException.class)
.orElseThrow(() -> ex);
} finally {
if (!success) {
// Cleanup the state handle if it was not written to ConfigMap.
if (storeHandle != null) {
storeHandle.discardState();
}
if (discardState) {
storeHandle.discardState();
}
}
}
Expand All @@ -173,6 +188,9 @@ public RetrievableStateHandle<T> addAndLock(String key, T state) throws Exceptio
* @param resourceVersion resource version when checking existence via {@link #exists}.
* @param state State to be added
* @throws NotExistException if the name does not exist
* @throws PossibleInconsistentStateException if a failure occurred during the update operation.
* It's unclear whether the operation actually succeeded or not. No state was discarded. The
* method's caller should handle this case properly.
* @throws Exception if persisting state or writing state handle failed
*/
@Override
Expand All @@ -185,11 +203,13 @@ public void replace(String key, StringResourceVersion resourceVersion, T state)

final RetrievableStateHandle<T> newStateHandle = storage.store(state);

boolean success = false;
final byte[] serializedStateHandle = serializeOrDiscard(newStateHandle);

// initialize flags to serve the failure case
boolean discardOldState = false;
boolean discardNewState = true;
try {
final byte[] serializedStoreHandle = InstantiationUtil.serializeObject(newStateHandle);
success =
boolean success =
kubeClient
.checkAndUpdateConfigMap(
configMapName,
Expand All @@ -202,7 +222,7 @@ public void replace(String key, StringResourceVersion resourceVersion, T state)
.put(
key,
encodeStateHandle(
serializedStoreHandle));
serializedStateHandle));
} else {
throw new CompletionException(
getKeyNotExistException(key));
Expand All @@ -212,14 +232,29 @@ public void replace(String key, StringResourceVersion resourceVersion, T state)
return Optional.empty();
})
.get();

// swap subject for deletion in case of success
discardOldState = success;
discardNewState = !success;
} catch (Exception ex) {
final Optional<PossibleInconsistentStateException> possibleInconsistentStateException =
ExceptionUtils.findThrowable(ex, PossibleInconsistentStateException.class);
if (possibleInconsistentStateException.isPresent()) {
// it's unclear whether the state handle metadata was written to the ConfigMap -
// hence, we don't discard any data
discardNewState = false;
throw possibleInconsistentStateException.get();
}

throw ExceptionUtils.findThrowable(ex, NotExistException.class).orElseThrow(() -> ex);
} finally {
if (success) {
oldStateHandle.discardState();
} else {
if (discardNewState) {
newStateHandle.discardState();
}

if (discardOldState) {
oldStateHandle.discardState();
}
}
}

Expand Down Expand Up @@ -476,8 +511,7 @@ private RetrievableStateHandle<T> deserializeObject(String content) throws IOExc
final byte[] data = Base64.getDecoder().decode(content);

try {
return InstantiationUtil.deserializeObject(
data, Thread.currentThread().getContextClassLoader());
return deserialize(data);
} catch (IOException | ClassNotFoundException e) {
throw new IOException(
"Failed to deserialize state handle from ConfigMap data " + content + '.', e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkRuntimeException;
Expand Down Expand Up @@ -290,13 +291,17 @@ public CompletableFuture<Boolean> checkAndUpdateConfigMap(
Throwable
throwable) {
LOG.debug(
"Failed to update ConfigMap {} with data {} because of concurrent "
+ "modifications. Trying again.",
"Failed to update ConfigMap {} with data {}. Trying again.",
configMap
.getName(),
configMap
.getData());
throw throwable;
// the
// client
// implementation does not expose the different kind of error causes to a degree that we could do a more fine-grained error handling here
throw new CompletionException(
new PossibleInconsistentStateException(
throwable));
}
return true;
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;

import java.io.File;
import java.util.List;
Expand Down Expand Up @@ -151,7 +152,12 @@ KubernetesLeaderElector createLeaderElector(
* one. If the returned optional is empty, we will not do the update.
* @return Return the ConfigMap update future. The boolean result indicates whether the
* ConfigMap is updated. The returned future will be completed exceptionally if the
* ConfigMap does not exist.
* ConfigMap does not exist. A failure during the update operation will result in the future
* failing with a {@link PossibleInconsistentStateException} indicating that no clear
* decision can be made on whether the update was successful or not. The {@code
* PossibleInconsistentStateException} not being present indicates that the failure happened
* before writing the updated ConfigMap to Kubernetes. For the latter case, it can be
* assumed that the ConfigMap was not updated.
*/
CompletableFuture<Boolean> checkAndUpdateConfigMap(
String configMapName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,16 @@
import io.fabric8.kubernetes.api.model.ServicePortBuilder;
import io.fabric8.kubernetes.api.model.ServiceStatus;
import io.fabric8.kubernetes.api.model.ServiceStatusBuilder;
import io.fabric8.mockwebserver.dsl.DelayPathable;
import io.fabric8.mockwebserver.dsl.HttpMethodable;
import io.fabric8.mockwebserver.dsl.MockServerExpectation;
import io.fabric8.mockwebserver.dsl.ReturnOrWebsocketable;
import io.fabric8.mockwebserver.dsl.TimesOnceableOrHttpHeaderable;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.function.Function;

/**
* Base class for {@link KubernetesClusterDescriptorTest} and {@link
Expand All @@ -53,14 +59,35 @@ protected void mockExpectedServiceFromServerSide(Service expectedService) {
}

protected void mockCreateConfigMapAlreadyExisting(ConfigMap configMap) {
final String path = String.format("/api/v1/namespaces/%s/configmaps", NAMESPACE);
final String path =
String.format(
"/api/%s/namespaces/%s/configmaps",
configMap.getApiVersion(), configMap.getMetadata().getNamespace());
server.expect().post().withPath(path).andReturn(500, configMap).always();
}

protected void mockGetConfigMapFailed(ConfigMap configMap) {
mockConfigMapRequest(configMap, HttpMethodable::get);
}

protected void mockReplaceConfigMapFailed(ConfigMap configMap) {
final String name = configMap.getMetadata().getName();
final String path = String.format("/api/v1/namespaces/%s/configmaps/%s", NAMESPACE, name);
server.expect().put().withPath(path).andReturn(500, configMap).always();
mockConfigMapRequest(configMap, HttpMethodable::put);
}

private void mockConfigMapRequest(
ConfigMap configMap,
Function<
MockServerExpectation,
DelayPathable<
ReturnOrWebsocketable<TimesOnceableOrHttpHeaderable<Void>>>>
methodTypeSetter) {
final String path =
String.format(
"/api/%s/namespaces/%s/configmaps/%s",
configMap.getApiVersion(),
configMap.getMetadata().getNamespace(),
configMap.getMetadata().getName());
methodTypeSetter.apply(server.expect()).withPath(path).andReturn(500, configMap).always();
}

protected Service buildExternalServiceWithLoadBalancer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public MixedKubernetesServer(boolean https, boolean crudMode) {
}

public void before() {
HashMap<ServerRequest, Queue<ServerResponse>> response = new HashMap<>();
final HashMap<ServerRequest, Queue<ServerResponse>> response = new HashMap<>();
mock =
crudMode
? new KubernetesMockServer(
Expand All @@ -77,6 +77,10 @@ public RecordedRequest takeRequest(long timeout, TimeUnit unit) throws Exception
return mockWebServer.takeRequest(timeout, unit);
}

public int getRequestCount() {
return mockWebServer.getRequestCount();
}

public MockServerExpectation expect() {
return mock.expect();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public void testMultipleKubernetesStateHandleStores() throws Exception {
new TestingLeaderCallbackHandler[leaderNum];

@SuppressWarnings("unchecked")
final KubernetesStateHandleStore<Long>[] stateHandleStores =
new KubernetesStateHandleStore[leaderNum];
final KubernetesStateHandleStore<TestingLongStateHandleHelper.LongStateHandle>[]
Copy link
Contributor Author

@XComp XComp May 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Kubernetes-related ITCases are currently not running. I created FLINK-20564 to cover this. I haven't had the chance to run the ITCase locally, yet. I checked that the test passes locally including my changes.

stateHandleStores = new KubernetesStateHandleStore[leaderNum];

try {
for (int i = 0; i < leaderNum; i++) {
Expand Down Expand Up @@ -103,17 +103,19 @@ public void testMultipleKubernetesStateHandleStores() throws Exception {
if (leaderCallbackHandlers[i].getLockIdentity().equals(lockIdentity)) {
expectedState = (long) i;
}
stateHandleStores[i].addAndLock(KEY, (long) i);
stateHandleStores[i].addAndLock(
KEY, new TestingLongStateHandleHelper.LongStateHandle(i));
}

// Only the leader could add successfully
assertThat(expectedState, is(notNullValue()));
assertThat(stateHandleStores[0].getAllAndLock().size(), is(1));
assertThat(
stateHandleStores[0].getAllAndLock().get(0).f0.retrieveState(),
stateHandleStores[0].getAllAndLock().get(0).f0.retrieveState().getValue(),
is(expectedState));
assertThat(stateHandleStores[0].getAllAndLock().get(0).f1, is(KEY));
} finally {
TestingLongStateHandleHelper.clearGlobalState();
// Cleanup the resources
for (int i = 0; i < leaderNum; i++) {
if (leaderElectors[i] != null) {
Expand Down
Loading