diff --git a/api/src/test/java/io/grpc/LoadBalancerRegistryTest.java b/api/src/test/java/io/grpc/LoadBalancerRegistryTest.java index 5b348b7adab..a30d26a78f6 100644 --- a/api/src/test/java/io/grpc/LoadBalancerRegistryTest.java +++ b/api/src/test/java/io/grpc/LoadBalancerRegistryTest.java @@ -40,7 +40,7 @@ public void getClassesViaHardcoded_classesPresent() throws Exception { @Test public void stockProviders() { LoadBalancerRegistry defaultRegistry = LoadBalancerRegistry.getDefaultRegistry(); - assertThat(defaultRegistry.providers()).hasSize(3); + assertThat(defaultRegistry.providers()).hasSize(4); LoadBalancerProvider pickFirst = defaultRegistry.getProvider("pick_first"); assertThat(pickFirst).isInstanceOf(PickFirstLoadBalancerProvider.class); @@ -56,6 +56,11 @@ public void stockProviders() { assertThat(outlierDetection.getClass().getName()).isEqualTo( "io.grpc.util.OutlierDetectionLoadBalancerProvider"); assertThat(roundRobin.getPriority()).isEqualTo(5); + + LoadBalancerProvider randomSubsetting = defaultRegistry.getProvider("random_subsetting"); + assertThat(randomSubsetting.getClass().getName()).isEqualTo( + "io.grpc.util.RandomSubsettingLoadBalancerProvider"); + assertThat(randomSubsetting.getPriority()).isEqualTo(5); } @Test diff --git a/util/build.gradle b/util/build.gradle index 6fbd6925c00..846b110b106 100644 --- a/util/build.gradle +++ b/util/build.gradle @@ -58,6 +58,7 @@ animalsniffer { tasks.named("javadoc").configure { exclude 'io/grpc/util/MultiChildLoadBalancer.java' exclude 'io/grpc/util/OutlierDetectionLoadBalancer*' + exclude 'io/grpc/util/RandomSubsettingLoadBalancer*' exclude 'io/grpc/util/RoundRobinLoadBalancer*' } diff --git a/util/src/main/java/io/grpc/util/RandomSubsettingLoadBalancer.java b/util/src/main/java/io/grpc/util/RandomSubsettingLoadBalancer.java new file mode 100644 index 00000000000..748dabde80d --- /dev/null +++ b/util/src/main/java/io/grpc/util/RandomSubsettingLoadBalancer.java @@ -0,0 +1,157 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.util; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.hash.HashCode; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; +import io.grpc.Status; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Random; + + +/** + * Wraps a child {@code LoadBalancer}, separating the total set of backends into smaller subsets for + * the child balancer to balance across. + * + *

This implements random subsetting gRFC: + * https://https://github.com/grpc/proposal/blob/master/A68-random-subsetting.md + */ +final class RandomSubsettingLoadBalancer extends LoadBalancer { + private final GracefulSwitchLoadBalancer switchLb; + private final HashFunction hashFunc; + + public RandomSubsettingLoadBalancer(Helper helper) { + switchLb = new GracefulSwitchLoadBalancer(checkNotNull(helper, "helper")); + int seed = new Random().nextInt(); + hashFunc = Hashing.murmur3_128(seed); + } + + @Override + public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { + RandomSubsettingLoadBalancerConfig config = + (RandomSubsettingLoadBalancerConfig) + resolvedAddresses.getLoadBalancingPolicyConfig(); + + ResolvedAddresses subsetAddresses = filterEndpoints(resolvedAddresses, config.subsetSize); + + return switchLb.acceptResolvedAddresses( + subsetAddresses.toBuilder() + .setLoadBalancingPolicyConfig(config.childConfig) + .build()); + } + + // implements the subsetting algorithm, as described in A68: + // https://github.com/grpc/proposal/pull/423 + private ResolvedAddresses filterEndpoints(ResolvedAddresses resolvedAddresses, int subsetSize) { + if (subsetSize >= resolvedAddresses.getAddresses().size()) { + return resolvedAddresses; + } + + ArrayList endpointWithHashList = + new ArrayList<>(resolvedAddresses.getAddresses().size()); + + for (EquivalentAddressGroup addressGroup : resolvedAddresses.getAddresses()) { + endpointWithHashList.add( + new EndpointWithHash( + addressGroup, + hashFunc.hashString( + addressGroup.getAddresses().get(0).toString(), + StandardCharsets.UTF_8))); + } + + Collections.sort(endpointWithHashList, new HashAddressComparator()); + + ArrayList addressGroups = new ArrayList<>(subsetSize); + + for (int idx = 0; idx < subsetSize; ++idx) { + addressGroups.add(endpointWithHashList.get(idx).addressGroup); + } + + return resolvedAddresses.toBuilder().setAddresses(addressGroups).build(); + } + + @Override + public void handleNameResolutionError(Status error) { + switchLb.handleNameResolutionError(error); + } + + @Override + public void shutdown() { + switchLb.shutdown(); + } + + private static final class EndpointWithHash { + public final EquivalentAddressGroup addressGroup; + public final HashCode hashCode; + + public EndpointWithHash(EquivalentAddressGroup addressGroup, HashCode hashCode) { + this.addressGroup = addressGroup; + this.hashCode = hashCode; + } + } + + private static final class HashAddressComparator implements Comparator { + @Override + public int compare(EndpointWithHash lhs, EndpointWithHash rhs) { + return Long.compare(lhs.hashCode.asLong(), rhs.hashCode.asLong()); + } + } + + public static final class RandomSubsettingLoadBalancerConfig { + public final int subsetSize; + public final Object childConfig; + + private RandomSubsettingLoadBalancerConfig(int subsetSize, Object childConfig) { + this.subsetSize = subsetSize; + this.childConfig = childConfig; + } + + public static class Builder { + int subsetSize; + Object childConfig; + + public Builder setSubsetSize(long subsetSize) { + checkArgument(subsetSize > 0L, "Subset size must be greater than 0"); + // clamping subset size to Integer.MAX_VALUE due to collection indexing limitations in JVM + long subsetSizeClamped = Math.min(subsetSize, (long) Integer.MAX_VALUE); + // safe narrowing cast due to clamping + this.subsetSize = (int) subsetSizeClamped; + return this; + } + + public Builder setChildConfig(Object childConfig) { + this.childConfig = checkNotNull(childConfig, "childConfig"); + return this; + } + + public RandomSubsettingLoadBalancerConfig build() { + return new RandomSubsettingLoadBalancerConfig( + subsetSize, + checkNotNull(childConfig, "childConfig")); + } + } + } +} diff --git a/util/src/main/java/io/grpc/util/RandomSubsettingLoadBalancerProvider.java b/util/src/main/java/io/grpc/util/RandomSubsettingLoadBalancerProvider.java new file mode 100644 index 00000000000..54680823803 --- /dev/null +++ b/util/src/main/java/io/grpc/util/RandomSubsettingLoadBalancerProvider.java @@ -0,0 +1,86 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.util; + +import io.grpc.Internal; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancerProvider; +import io.grpc.NameResolver.ConfigOrError; +import io.grpc.Status; +import io.grpc.internal.JsonUtil; +import java.util.Map; + +@Internal +public final class RandomSubsettingLoadBalancerProvider extends LoadBalancerProvider { + private static final String POLICY_NAME = "random_subsetting"; + + @Override + public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) { + return new RandomSubsettingLoadBalancer(helper); + } + + @Override + public boolean isAvailable() { + return true; + } + + @Override + public int getPriority() { + return 5; + } + + @Override + public String getPolicyName() { + return POLICY_NAME; + } + + @Override + public ConfigOrError parseLoadBalancingPolicyConfig(Map rawConfig) { + try { + return parseLoadBalancingPolicyConfigInternal(rawConfig); + } catch (RuntimeException e) { + return ConfigOrError.fromError( + Status.UNAVAILABLE + .withCause(e) + .withDescription("Failed parsing configuration for " + getPolicyName())); + } + } + + private ConfigOrError parseLoadBalancingPolicyConfigInternal(Map rawConfig) { + Long subsetSize = JsonUtil.getNumberAsLong(rawConfig, "subsetSize"); + if (subsetSize == null) { + return ConfigOrError.fromError( + Status.INTERNAL.withDescription( + "Subset size missing in " + getPolicyName() + ", LB policy config=" + rawConfig)); + } + + ConfigOrError childConfig = GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig( + JsonUtil.getListOfObjects(rawConfig, "childPolicy")); + if (childConfig.getError() != null) { + return ConfigOrError.fromError(Status.INTERNAL + .withDescription( + "Failed to parse child in " + getPolicyName() + ", LB policy config=" + rawConfig) + .withCause(childConfig.getError().asRuntimeException())); + } + + return ConfigOrError.fromConfig( + new RandomSubsettingLoadBalancer.RandomSubsettingLoadBalancerConfig.Builder() + .setSubsetSize(subsetSize) + .setChildConfig(childConfig.getConfig()) + .build()); + } +} diff --git a/util/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider b/util/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider index 1fdd69cb00b..d973a6f6728 100644 --- a/util/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider +++ b/util/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider @@ -1,2 +1,3 @@ io.grpc.util.SecretRoundRobinLoadBalancerProvider$Provider io.grpc.util.OutlierDetectionLoadBalancerProvider +io.grpc.util.RandomSubsettingLoadBalancerProvider diff --git a/util/src/test/java/io/grpc/util/RandomSubsettingLoadBalancerProviderTest.java b/util/src/test/java/io/grpc/util/RandomSubsettingLoadBalancerProviderTest.java new file mode 100644 index 00000000000..830ad9723d8 --- /dev/null +++ b/util/src/test/java/io/grpc/util/RandomSubsettingLoadBalancerProviderTest.java @@ -0,0 +1,133 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.util; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +import io.grpc.InternalServiceProviders; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancerProvider; +import io.grpc.NameResolver.ConfigOrError; +import io.grpc.Status; +import io.grpc.internal.JsonParser; +import io.grpc.util.RandomSubsettingLoadBalancer.RandomSubsettingLoadBalancerConfig; +import java.io.IOException; +import java.util.Map; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class RandomSubsettingLoadBalancerProviderTest { + private final RandomSubsettingLoadBalancerProvider provider = + new RandomSubsettingLoadBalancerProvider(); + + @Test + public void registered() { + for (LoadBalancerProvider current : + InternalServiceProviders.getCandidatesViaServiceLoader( + LoadBalancerProvider.class, getClass().getClassLoader())) { + if (current instanceof RandomSubsettingLoadBalancerProvider) { + return; + } + } + fail("RandomSubsettingLoadBalancerProvider not registered"); + } + + @Test + public void providesLoadBalancer() { + Helper helper = mock(Helper.class); + assertThat(provider.newLoadBalancer(helper)) + .isInstanceOf(RandomSubsettingLoadBalancer.class); + } + + @Test + public void parseConfigRequiresSubsetSize() throws IOException { + String emptyConfig = "{}"; + + ConfigOrError configOrError = + provider.parseLoadBalancingPolicyConfig(parseJsonObject(emptyConfig)); + assertThat(configOrError.getError()).isNotNull(); + assertThat(configOrError.getError().toString()) + .isEqualTo( + Status.INTERNAL + .withDescription("Subset size missing in random_subsetting, LB policy config={}") + .toString()); + } + + @Test + public void parseConfigReturnsErrorWhenChildPolicyMissing() throws IOException { + String missingChildPolicyConfig = "{\"subsetSize\": 3}"; + + ConfigOrError configOrError = + provider.parseLoadBalancingPolicyConfig(parseJsonObject(missingChildPolicyConfig)); + assertThat(configOrError.getError()).isNotNull(); + + Status error = configOrError.getError(); + assertThat(error.getCode()).isEqualTo(Status.Code.INTERNAL); + assertThat(error.getDescription()).isEqualTo( + "Failed to parse child in random_subsetting" + + ", LB policy config={subsetSize=3.0}"); + assertThat(error.getCause().getMessage()).isEqualTo("INTERNAL: No child LB config specified"); + } + + @Test + public void parseConfigReturnsErrorWhenChildPolicyInvalid() throws IOException { + String invalidChildPolicyConfig = + "{" + + "\"subsetSize\": 3, " + + "\"childPolicy\" : [{\"random_policy\" : {}}]" + + "}"; + + ConfigOrError configOrError = + provider.parseLoadBalancingPolicyConfig(parseJsonObject(invalidChildPolicyConfig)); + assertThat(configOrError.getError()).isNotNull(); + + Status error = configOrError.getError(); + assertThat(error.getCode()).isEqualTo(Status.Code.INTERNAL); + assertThat(error.getDescription()).isEqualTo( + "Failed to parse child in random_subsetting, LB policy config=" + + "{subsetSize=3.0, childPolicy=[{random_policy={}}]}"); + assertThat(error.getCause().getMessage()).contains( + "INTERNAL: None of [random_policy] specified by Service Config are available."); + } + + @Test + public void parseValidConfig() throws IOException { + String validConfig = + "{" + + "\"subsetSize\": 3, " + + "\"childPolicy\" : [{\"round_robin\" : {}}]" + + "}"; + ConfigOrError configOrError = + provider.parseLoadBalancingPolicyConfig(parseJsonObject(validConfig)); + assertThat(configOrError.getConfig()).isNotNull(); + + RandomSubsettingLoadBalancerConfig actualConfig = + (RandomSubsettingLoadBalancerConfig) configOrError.getConfig(); + assertThat(GracefulSwitchLoadBalancerAccessor.getChildProvider( + actualConfig.childConfig).getPolicyName()).isEqualTo("round_robin"); + assertThat(actualConfig.subsetSize).isEqualTo(3); + } + + @SuppressWarnings("unchecked") + private static Map parseJsonObject(String json) throws IOException { + return (Map) JsonParser.parse(json); + } +} diff --git a/util/src/test/java/io/grpc/util/RandomSubsettingLoadBalancerTest.java b/util/src/test/java/io/grpc/util/RandomSubsettingLoadBalancerTest.java new file mode 100644 index 00000000000..7e889379e0f --- /dev/null +++ b/util/src/test/java/io/grpc/util/RandomSubsettingLoadBalancerTest.java @@ -0,0 +1,327 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.util; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import io.grpc.ConnectivityState; +import io.grpc.ConnectivityStateInfo; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.CreateSubchannelArgs; +import io.grpc.LoadBalancer.ResolvedAddresses; +import io.grpc.LoadBalancer.Subchannel; +import io.grpc.LoadBalancer.SubchannelStateListener; +import io.grpc.LoadBalancerProvider; +import io.grpc.Status; +import io.grpc.internal.TestUtils; +import io.grpc.util.RandomSubsettingLoadBalancer.RandomSubsettingLoadBalancerConfig; +import java.net.SocketAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.mockito.stubbing.Answer; + +public class RandomSubsettingLoadBalancerTest { + @Rule + public final MockitoRule mockitoRule = MockitoJUnit.rule(); + + @Mock + private LoadBalancer.Helper mockHelper; + @Mock + private LoadBalancer mockChildLb; + @Mock + private SocketAddress mockSocketAddress; + + @Captor + private ArgumentCaptor resolvedAddrCaptor; + + private BackendDetails backendDetails; + + private RandomSubsettingLoadBalancer loadBalancer; + + private final LoadBalancerProvider mockChildLbProvider = + new TestUtils.StandardLoadBalancerProvider("foo_policy") { + @Override + public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) { + return mockChildLb; + } + }; + + private final LoadBalancerProvider roundRobinLbProvider = + new TestUtils.StandardLoadBalancerProvider("round_robin") { + @Override + public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) { + return new RoundRobinLoadBalancer(helper); + } + }; + + private Object newChildConfig(LoadBalancerProvider provider, Object config) { + return GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(provider, config); + } + + private RandomSubsettingLoadBalancerConfig createRandomSubsettingLbConfig( + int subsetSize, LoadBalancerProvider childLbProvider, Object childConfig) { + return new RandomSubsettingLoadBalancer.RandomSubsettingLoadBalancerConfig.Builder() + .setSubsetSize(subsetSize) + .setChildConfig(newChildConfig(childLbProvider, childConfig)) + .build(); + } + + private BackendDetails setupBackends(int backendCount) { + List servers = Lists.newArrayList(); + Map, Subchannel> subchannels = Maps.newLinkedHashMap(); + + for (int i = 0; i < backendCount; i++) { + SocketAddress addr = new FakeSocketAddress("server" + i); + EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(addr); + servers.add(addressGroup); + Subchannel subchannel = mock(Subchannel.class); + subchannels.put(Arrays.asList(addressGroup), subchannel); + } + + return new BackendDetails(servers, subchannels); + } + + @Before + public void setUp() { + loadBalancer = new RandomSubsettingLoadBalancer(mockHelper); + + int backendSize = 5; + backendDetails = setupBackends(backendSize); + } + + @Test + public void handleNameResolutionError() { + int subsetSize = 2; + Object childConfig = "someConfig"; + + RandomSubsettingLoadBalancerConfig config = createRandomSubsettingLbConfig( + subsetSize, mockChildLbProvider, childConfig); + + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of(new EquivalentAddressGroup(mockSocketAddress))) + .setLoadBalancingPolicyConfig(config) + .build()); + + loadBalancer.handleNameResolutionError(Status.DEADLINE_EXCEEDED); + verify(mockChildLb).handleNameResolutionError(Status.DEADLINE_EXCEEDED); + } + + @Test + public void shutdown() { + int subsetSize = 2; + Object childConfig = "someConfig"; + + RandomSubsettingLoadBalancerConfig config = createRandomSubsettingLbConfig( + subsetSize, mockChildLbProvider, childConfig); + + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of(new EquivalentAddressGroup(mockSocketAddress))) + .setLoadBalancingPolicyConfig(config) + .build()); + + loadBalancer.shutdown(); + verify(mockChildLb).shutdown(); + } + + @Test + public void acceptResolvedAddresses_mockedChildLbPolicy() { + int subsetSize = 3; + Object childConfig = "someConfig"; + + RandomSubsettingLoadBalancerConfig config = createRandomSubsettingLbConfig( + subsetSize, mockChildLbProvider, childConfig); + + ResolvedAddresses resolvedAddresses = + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.copyOf(backendDetails.servers)) + .setLoadBalancingPolicyConfig(config) + .build(); + + loadBalancer.acceptResolvedAddresses(resolvedAddresses); + + verify(mockChildLb).acceptResolvedAddresses(resolvedAddrCaptor.capture()); + assertThat(resolvedAddrCaptor.getValue().getAddresses().size()).isEqualTo(subsetSize); + assertThat(resolvedAddrCaptor.getValue().getLoadBalancingPolicyConfig()).isEqualTo(childConfig); + } + + @Test + public void acceptResolvedAddresses_roundRobinChildLbPolicy() { + int subsetSize = 3; + Object childConfig = null; + + RandomSubsettingLoadBalancerConfig config = createRandomSubsettingLbConfig( + subsetSize, roundRobinLbProvider, childConfig); + + ResolvedAddresses resolvedAddresses = + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.copyOf(backendDetails.servers)) + .setLoadBalancingPolicyConfig(config) + .build(); + + loadBalancer.acceptResolvedAddresses(resolvedAddresses); + + int insubset = 0; + for (Subchannel subchannel : backendDetails.subchannels.values()) { + LoadBalancer.SubchannelStateListener ssl = + backendDetails.subchannelStateListeners.get(subchannel); + if (ssl != null) { // it might be null if it's not in the subset. + insubset += 1; + ssl.onSubchannelState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); + } + } + + assertThat(insubset).isEqualTo(subsetSize); + } + + // verifies: https://github.com/grpc/proposal/blob/master/A68_graphics/subsetting100-100-5.png + @Test + public void backendsCanBeDistributedEvenly_subsetting100_100_5() { + verifyConnectionsByServer(100, 100, 5, 15); + } + + // verifies https://github.com/grpc/proposal/blob/master/A68_graphics/subsetting100-100-25.png + @Test + public void backendsCanBeDistributedEvenly_subsetting100_100_25() { + verifyConnectionsByServer(100, 100, 25, 40); + } + + // verifies: https://github.com/grpc/proposal/blob/master/A68_graphics/subsetting100-10-5.png + @Test + public void backendsCanBeDistributedEvenly_subsetting100_10_5() { + verifyConnectionsByServer(100, 10, 5, 70); + } + + // verifies: https://github.com/grpc/proposal/blob/master/A68_graphics/subsetting500-10-5.png + @Test + public void backendsCanBeDistributedEvenly_subsetting500_10_5() { + verifyConnectionsByServer(500, 10, 5, 600); + } + + // verifies: https://github.com/grpc/proposal/blob/master/A68_graphics/subsetting2000-10-5.png + @Test + public void backendsCanBeDistributedEvenly_subsetting2000_100_5() { + verifyConnectionsByServer(2000, 10, 5, 1200); + } + + public void verifyConnectionsByServer( + int clientsCount, int serversCount, int subsetSize, int expectedMaxConnections) { + backendDetails = setupBackends(serversCount); + Object childConfig = "someConfig"; + + List configs = Lists.newArrayList(); + for (int i = 0; i < clientsCount; i++) { + configs.add(createRandomSubsettingLbConfig(subsetSize, mockChildLbProvider, childConfig)); + } + + Map connectionsByServer = Maps.newLinkedHashMap(); + + for (RandomSubsettingLoadBalancerConfig config : configs) { + ResolvedAddresses resolvedAddresses = + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.copyOf(backendDetails.servers)) + .setLoadBalancingPolicyConfig(config) + .build(); + + loadBalancer = new RandomSubsettingLoadBalancer(mockHelper); + loadBalancer.acceptResolvedAddresses(resolvedAddresses); + + verify(mockChildLb, atLeastOnce()).acceptResolvedAddresses(resolvedAddrCaptor.capture()); + // Verify ChildLB is only getting subsetSize ResolvedAddresses each time + assertThat(resolvedAddrCaptor.getValue().getAddresses().size()).isEqualTo(config.subsetSize); + + for (EquivalentAddressGroup eag : resolvedAddrCaptor.getValue().getAddresses()) { + for (SocketAddress addr : eag.getAddresses()) { + Integer prev = connectionsByServer.getOrDefault(addr, 0); + connectionsByServer.put(addr, prev + 1); + } + } + } + + int maxConnections = Collections.max(connectionsByServer.values()); + + assertThat(maxConnections).isAtMost(expectedMaxConnections); + } + + private class BackendDetails { + private final List servers; + private final Map, Subchannel> subchannels; + private final Map subchannelStateListeners; + + BackendDetails(List servers, + Map, Subchannel> subchannels) { + this.servers = servers; + this.subchannels = subchannels; + this.subchannelStateListeners = Maps.newLinkedHashMap(); + + when(mockHelper.createSubchannel(any(LoadBalancer.CreateSubchannelArgs.class))).then( + new Answer() { + @Override + public Subchannel answer(InvocationOnMock invocation) throws Throwable { + CreateSubchannelArgs args = (CreateSubchannelArgs) invocation.getArguments()[0]; + final Subchannel subchannel = backendDetails.subchannels.get(args.getAddresses()); + when(subchannel.getAllAddresses()).thenReturn(args.getAddresses()); + when(subchannel.getAttributes()).thenReturn(args.getAttributes()); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + subchannelStateListeners.put(subchannel, + (SubchannelStateListener) invocation.getArguments()[0]); + return null; + } + }).when(subchannel).start(any(SubchannelStateListener.class)); + return subchannel; + } + }); + } + } + + private static class FakeSocketAddress extends SocketAddress { + final String name; + + FakeSocketAddress(String name) { + this.name = name; + } + + @Override + public String toString() { + return "FakeSocketAddress-" + name; + } + } +} diff --git a/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java b/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java index e08ea0fab43..c934eb5843e 100644 --- a/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java +++ b/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java @@ -32,6 +32,7 @@ import io.envoyproxy.envoy.extensions.load_balancing_policies.client_side_weighted_round_robin.v3.ClientSideWeightedRoundRobin; import io.envoyproxy.envoy.extensions.load_balancing_policies.least_request.v3.LeastRequest; import io.envoyproxy.envoy.extensions.load_balancing_policies.pick_first.v3.PickFirst; +import io.envoyproxy.envoy.extensions.load_balancing_policies.random_subsetting.v3.RandomSubsetting; import io.envoyproxy.envoy.extensions.load_balancing_policies.ring_hash.v3.RingHash; import io.envoyproxy.envoy.extensions.load_balancing_policies.round_robin.v3.RoundRobin; import io.envoyproxy.envoy.extensions.load_balancing_policies.wrr_locality.v3.WrrLocality; @@ -92,6 +93,9 @@ class LoadBalancerConfigFactory { static final String ERROR_UTILIZATION_PENALTY = "errorUtilizationPenalty"; + static final String RANDOM_SUBSETTING_FIELD_NAME = "random_subsetting"; + static final String SUBSET_SIZE = "subsetSize"; + /** * Factory method for creating a new {link LoadBalancerConfigConverter} for a given xDS {@link * Cluster}. @@ -200,6 +204,20 @@ class LoadBalancerConfigFactory { return ImmutableMap.of(PICK_FIRST_FIELD_NAME, configBuilder.buildOrThrow()); } + /** + * Builds a service config JSON object for the random_subsetting load balancer config based on the + * given config values. + */ + private static ImmutableMap buildRandomSubsettingConfig( + RandomSubsetting randomSubsetting) { + return ImmutableMap.of( + RANDOM_SUBSETTING_FIELD_NAME, + ImmutableMap.of( + SUBSET_SIZE, randomSubsetting.getSubsetSize(), + CHILD_POLICY_FIELD, randomSubsetting.getChildPolicy() + )); + } + /** * Responsible for converting from a {@code envoy.config.cluster.v3.LoadBalancingPolicy} proto * message to a gRPC service config format. @@ -236,6 +254,9 @@ static class LoadBalancingPolicyConverter { typedConfig.unpack(ClientSideWeightedRoundRobin.class)); } else if (typedConfig.is(PickFirst.class)) { serviceConfig = convertPickFirstConfig(typedConfig.unpack(PickFirst.class)); + } else if (typedConfig.is(RandomSubsetting.class)) { + serviceConfig = convertRandomSubsettingConfig( + typedConfig.unpack(RandomSubsetting.class)); } else if (typedConfig.is(com.github.xds.type.v3.TypedStruct.class)) { serviceConfig = convertCustomConfig( typedConfig.unpack(com.github.xds.type.v3.TypedStruct.class)); @@ -324,6 +345,14 @@ static class LoadBalancingPolicyConverter { return buildPickFirstConfig(pickFirst.getShuffleAddressList()); } + /** + * "Converts" a random_subsetting configuration to service config format. + */ + private static ImmutableMap convertRandomSubsettingConfig( + RandomSubsetting randomSubsetting) { + return buildRandomSubsettingConfig(randomSubsetting); + } + /** * Converts a least_request {@link Any} configuration to service config format. */ diff --git a/xds/third_party/envoy/src/main/proto/envoy/extensions/load_balancing_policies/random_subsetting/v3/random_subsetting.proto b/xds/third_party/envoy/src/main/proto/envoy/extensions/load_balancing_policies/random_subsetting/v3/random_subsetting.proto new file mode 100644 index 00000000000..0690397f770 --- /dev/null +++ b/xds/third_party/envoy/src/main/proto/envoy/extensions/load_balancing_policies/random_subsetting/v3/random_subsetting.proto @@ -0,0 +1,33 @@ +syntax = "proto3"; + +package envoy.extensions.load_balancing_policies.random_subsetting.v3; + +import "envoy/config/cluster/v3/cluster.proto"; + +import "google/protobuf/wrappers.proto"; + +import "udpa/annotations/status.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.load_balancing_policies.random_subsetting.v3"; +option java_outer_classname = "RandomSubsettingProto"; +option java_multiple_files = true; +option go_package = "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/random_subsetting/v3;random_subsettingv3"; +option (udpa.annotations.file_status).package_version_status = ACTIVE; + +// [#protodoc-title: Random Subsetting Load Balancing Policy] +// [#extension: envoy.load_balancing_policies.random_subsetting] + +message RandomSubsetting { + // subset_size indicates how many backends every client will be connected to. + // The value must be greater than 0. + google.protobuf.UInt32Value subset_size = 1 [ + (validate.rules).uint32 = {gt: 0} + ]; + + // The config for the child policy. + // The value is required. + config.cluster.v3.LoadBalancingPolicy child_policy = 2 [ + (validate.rules).message = {required: true} + ]; +}