Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.persistence.StringResourceVersion;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.InstantiationUtil;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,6 +48,8 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.apache.flink.runtime.util.StateHandleStoreUtils.deserialize;
import static org.apache.flink.runtime.util.StateHandleStoreUtils.serializeOrDiscard;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -114,21 +116,28 @@ public KubernetesStateHandleStore(
* @param key Key in ConfigMap
* @param state State to be added
* @throws AlreadyExistException if the name already exists
* @throws PossibleInconsistentStateException if the write-to-Kubernetes operation failed. This
* indicates that it's not clear whether the new state was successfully written to
* Kubernetes or not. No state was discarded. Proper error handling has to be applied on the
* caller's side.
* @throws Exception if persisting state or writing state handle failed
*/
@Override
public RetrievableStateHandle<T> addAndLock(String key, T state) throws Exception {
public RetrievableStateHandle<T> addAndLock(String key, T state)
throws PossibleInconsistentStateException, Exception {
checkNotNull(key, "Key in ConfigMap.");
checkNotNull(state, "State.");

final RetrievableStateHandle<T> storeHandle = storage.store(state);

boolean success = false;
final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);

// initialize flag to serve the failure case
boolean discardState = true;
try {
final byte[] serializedStoreHandle = InstantiationUtil.serializeObject(storeHandle);
success =
kubeClient
// a successful operation will result in the state not being discarded
discardState =
!kubeClient
.checkAndUpdateConfigMap(
configMapName,
c -> {
Expand All @@ -151,14 +160,20 @@ public RetrievableStateHandle<T> addAndLock(String key, T state) throws Exceptio
.get();
return storeHandle;
} catch (Exception ex) {
final Optional<PossibleInconsistentStateException> possibleInconsistentStateException =
ExceptionUtils.findThrowable(ex, PossibleInconsistentStateException.class);
if (possibleInconsistentStateException.isPresent()) {
// it's unclear whether the state handle metadata was written to the ConfigMap -
// hence, we don't discard the data
discardState = false;
throw possibleInconsistentStateException.get();
}

throw ExceptionUtils.findThrowable(ex, AlreadyExistException.class)
.orElseThrow(() -> ex);
} finally {
if (!success) {
// Cleanup the state handle if it was not written to ConfigMap.
if (storeHandle != null) {
storeHandle.discardState();
}
if (discardState) {
storeHandle.discardState();
}
}
}
Expand All @@ -173,6 +188,9 @@ public RetrievableStateHandle<T> addAndLock(String key, T state) throws Exceptio
* @param resourceVersion resource version when checking existence via {@link #exists}.
* @param state State to be added
* @throws NotExistException if the name does not exist
* @throws PossibleInconsistentStateException if a failure occurred during the update operation.
* It's unclear whether the operation actually succeeded or not. No state was discarded. The
* method's caller should handle this case properly.
* @throws Exception if persisting state or writing state handle failed
*/
@Override
Expand All @@ -185,11 +203,13 @@ public void replace(String key, StringResourceVersion resourceVersion, T state)

final RetrievableStateHandle<T> newStateHandle = storage.store(state);

boolean success = false;
final byte[] serializedStateHandle = serializeOrDiscard(newStateHandle);

// initialize flags to serve the failure case
boolean discardOldState = false;
boolean discardNewState = true;
try {
final byte[] serializedStoreHandle = InstantiationUtil.serializeObject(newStateHandle);
success =
boolean success =
kubeClient
.checkAndUpdateConfigMap(
configMapName,
Expand All @@ -202,7 +222,7 @@ public void replace(String key, StringResourceVersion resourceVersion, T state)
.put(
key,
encodeStateHandle(
serializedStoreHandle));
serializedStateHandle));
} else {
throw new CompletionException(
getKeyNotExistException(key));
Expand All @@ -212,14 +232,29 @@ public void replace(String key, StringResourceVersion resourceVersion, T state)
return Optional.empty();
})
.get();

// swap subject for deletion in case of success
discardOldState = success;
discardNewState = !success;
} catch (Exception ex) {
final Optional<PossibleInconsistentStateException> possibleInconsistentStateException =
ExceptionUtils.findThrowable(ex, PossibleInconsistentStateException.class);
if (possibleInconsistentStateException.isPresent()) {
// it's unclear whether the state handle metadata was written to the ConfigMap -
// hence, we don't discard any data
discardNewState = false;
throw possibleInconsistentStateException.get();
}

throw ExceptionUtils.findThrowable(ex, NotExistException.class).orElseThrow(() -> ex);
} finally {
if (success) {
oldStateHandle.discardState();
} else {
if (discardNewState) {
newStateHandle.discardState();
}

if (discardOldState) {
oldStateHandle.discardState();
}
}
}

Expand Down Expand Up @@ -476,8 +511,7 @@ private RetrievableStateHandle<T> deserializeObject(String content) throws IOExc
final byte[] data = Base64.getDecoder().decode(content);

try {
return InstantiationUtil.deserializeObject(
data, Thread.currentThread().getContextClassLoader());
return deserialize(data);
} catch (IOException | ClassNotFoundException e) {
throw new IOException(
"Failed to deserialize state handle from ConfigMap data " + content + '.', e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkRuntimeException;
Expand Down Expand Up @@ -290,13 +291,17 @@ public CompletableFuture<Boolean> checkAndUpdateConfigMap(
Throwable
throwable) {
LOG.debug(
"Failed to update ConfigMap {} with data {} because of concurrent "
+ "modifications. Trying again.",
"Failed to update ConfigMap {} with data {}. Trying again.",
configMap
.getName(),
configMap
.getData());
throw throwable;
// the client implementation does not expose the different kinds of error causes
// to a degree that would allow more fine-grained error handling here
throw new CompletionException(
new PossibleInconsistentStateException(
throwable));
}
return true;
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;

import java.io.File;
import java.util.List;
Expand Down Expand Up @@ -151,7 +152,12 @@ KubernetesLeaderElector createLeaderElector(
* one. If the returned optional is empty, we will not do the update.
* @return Return the ConfigMap update future. The boolean result indicates whether the
* ConfigMap is updated. The returned future will be completed exceptionally if the
* ConfigMap does not exist.
* ConfigMap does not exist. A failure during the update operation will result in the future
* failing with a {@link PossibleInconsistentStateException} indicating that no clear
* decision can be made on whether the update was successful or not. The {@code
* PossibleInconsistentStateException} not being present indicates that the failure happened
* before writing the updated ConfigMap to Kubernetes. For the latter case, it can be
* assumed that the ConfigMap was not updated.
*/
CompletableFuture<Boolean> checkAndUpdateConfigMap(
String configMapName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,16 @@
import io.fabric8.kubernetes.api.model.ServicePortBuilder;
import io.fabric8.kubernetes.api.model.ServiceStatus;
import io.fabric8.kubernetes.api.model.ServiceStatusBuilder;
import io.fabric8.mockwebserver.dsl.DelayPathable;
import io.fabric8.mockwebserver.dsl.HttpMethodable;
import io.fabric8.mockwebserver.dsl.MockServerExpectation;
import io.fabric8.mockwebserver.dsl.ReturnOrWebsocketable;
import io.fabric8.mockwebserver.dsl.TimesOnceableOrHttpHeaderable;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.function.Function;

/**
* Base class for {@link KubernetesClusterDescriptorTest} and {@link
Expand All @@ -53,14 +59,35 @@ protected void mockExpectedServiceFromServerSide(Service expectedService) {
}

/**
 * Registers a mock-server expectation so that every POST to the ConfigMap collection endpoint
 * is answered with status 500 and the passed ConfigMap.
 *
 * <p>The request path is built from the ConfigMap's own API version and namespace.
 *
 * @param configMap ConfigMap providing the API version and namespace for the mocked path; it
 *     is also handed to {@code andReturn} together with the 500 status
 */
protected void mockCreateConfigMapAlreadyExisting(ConfigMap configMap) {
    // Single declaration of the path: derived from the ConfigMap itself instead of the
    // hard-coded "v1"/NAMESPACE variant.
    final String path =
            String.format(
                    "/api/%s/namespaces/%s/configmaps",
                    configMap.getApiVersion(), configMap.getMetadata().getNamespace());
    server.expect().post().withPath(path).andReturn(500, configMap).always();
}

/**
 * Makes the mock server answer GET requests for the given ConfigMap with a failure response.
 *
 * @param configMap ConfigMap whose resource path should fail on GET
 */
protected void mockGetConfigMapFailed(ConfigMap configMap) {
    // Select the HTTP method on the expectation; path and response are set by the helper.
    mockConfigMapRequest(configMap, expectation -> expectation.get());
}

/**
 * Makes the mock server answer PUT requests for the given ConfigMap with a failure response.
 *
 * <p>The path and the response are set up by {@code mockConfigMapRequest}; this method only
 * selects the HTTP method, mirroring {@code mockGetConfigMapFailed}.
 *
 * @param configMap ConfigMap whose resource path should fail on PUT
 */
protected void mockReplaceConfigMapFailed(ConfigMap configMap) {
    // Register the expectation once through the shared helper rather than additionally
    // registering a second one for a hard-coded NAMESPACE path.
    mockConfigMapRequest(configMap, HttpMethodable::put);
}

/**
 * Registers a mock-server expectation that always answers requests to the given ConfigMap's
 * resource path with status 500 and the ConfigMap itself.
 *
 * @param configMap ConfigMap providing API version, namespace and name for the request path
 * @param methodTypeSetter selects the HTTP method on a fresh server expectation
 */
private void mockConfigMapRequest(
        ConfigMap configMap,
        Function<
                        MockServerExpectation,
                        DelayPathable<
                                ReturnOrWebsocketable<TimesOnceableOrHttpHeaderable<Void>>>>
                methodTypeSetter) {
    final String apiVersion = configMap.getApiVersion();
    final String namespace = configMap.getMetadata().getNamespace();
    final String name = configMap.getMetadata().getName();
    final String requestPath =
            String.format("/api/%s/namespaces/%s/configmaps/%s", apiVersion, namespace, name);

    // Let the caller choose the HTTP method, then attach the path and the failing response.
    final DelayPathable<ReturnOrWebsocketable<TimesOnceableOrHttpHeaderable<Void>>> request =
            methodTypeSetter.apply(server.expect());
    request.withPath(requestPath).andReturn(500, configMap).always();
}

protected Service buildExternalServiceWithLoadBalancer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public MixedKubernetesServer(boolean https, boolean crudMode) {
}

public void before() {
HashMap<ServerRequest, Queue<ServerResponse>> response = new HashMap<>();
final HashMap<ServerRequest, Queue<ServerResponse>> response = new HashMap<>();
mock =
crudMode
? new KubernetesMockServer(
Expand All @@ -77,6 +77,10 @@ public RecordedRequest takeRequest(long timeout, TimeUnit unit) throws Exception
return mockWebServer.takeRequest(timeout, unit);
}

/**
 * Returns the request count reported by the wrapped mock web server.
 *
 * @return value of {@code mockWebServer.getRequestCount()}
 */
public int getRequestCount() {
    final int requestCount = mockWebServer.getRequestCount();
    return requestCount;
}

/**
 * Starts a new expectation on the wrapped Kubernetes mock server.
 *
 * @return the expectation object obtained from {@code mock.expect()}
 */
public MockServerExpectation expect() {
    final MockServerExpectation expectation = mock.expect();
    return expectation;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public void testMultipleKubernetesStateHandleStores() throws Exception {
new TestingLeaderCallbackHandler[leaderNum];

@SuppressWarnings("unchecked")
final KubernetesStateHandleStore<Long>[] stateHandleStores =
new KubernetesStateHandleStore[leaderNum];
final KubernetesStateHandleStore<TestingLongStateHandleHelper.LongStateHandle>[]
stateHandleStores = new KubernetesStateHandleStore[leaderNum];

try {
for (int i = 0; i < leaderNum; i++) {
Expand Down Expand Up @@ -103,17 +103,19 @@ public void testMultipleKubernetesStateHandleStores() throws Exception {
if (leaderCallbackHandlers[i].getLockIdentity().equals(lockIdentity)) {
expectedState = (long) i;
}
stateHandleStores[i].addAndLock(KEY, (long) i);
stateHandleStores[i].addAndLock(
KEY, new TestingLongStateHandleHelper.LongStateHandle(i));
}

// Only the leader could add successfully
assertThat(expectedState, is(notNullValue()));
assertThat(stateHandleStores[0].getAllAndLock().size(), is(1));
assertThat(
stateHandleStores[0].getAllAndLock().get(0).f0.retrieveState(),
stateHandleStores[0].getAllAndLock().get(0).f0.retrieveState().getValue(),
is(expectedState));
assertThat(stateHandleStores[0].getAllAndLock().get(0).f1, is(KEY));
} finally {
TestingLongStateHandleHelper.clearGlobalState();
// Cleanup the resources
for (int i = 0; i < leaderNum; i++) {
if (leaderElectors[i] != null) {
Expand Down
Loading