diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/pool/AdvancedShardAwarenessIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/pool/AdvancedShardAwarenessIT.java index aeda4a02c7b..2c3f03aed5d 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/pool/AdvancedShardAwarenessIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/pool/AdvancedShardAwarenessIT.java @@ -51,7 +51,7 @@ public class AdvancedShardAwarenessIT { @ClassRule public static final CustomCcmRule CCM_RULE = - CustomCcmRule.builder().withNodes(2).withJvmArgs("--smp=3").build(); + CustomCcmRule.builder().withNodes(1).withJvmArgs("--smp=3").build(); public static ch.qos.logback.classic.Logger channelPoolLogger = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(ChannelPool.class); @@ -100,33 +100,22 @@ public void stopCapturingLogs() { public void should_initialize_all_channels(boolean reuseAddress) { int expectedChannelsPerNode = 6; // Divisible by smp String node1 = CCM_RULE.getCcmBridge().getNodeIpAddress(1); - String node2 = CCM_RULE.getCcmBridge().getNodeIpAddress(2); Pattern reconnectionPattern1 = Pattern.compile(".*" + Pattern.quote(node1) + ".*Scheduling next reconnection in.*"); - Pattern reconnectionPattern2 = - Pattern.compile(".*" + Pattern.quote(node2) + ".*Scheduling next reconnection in.*"); - Set forbiddenOccurences = - ImmutableSet.of(shardMismatchPattern, reconnectionPattern1, reconnectionPattern2); + Set forbiddenOccurences = ImmutableSet.of(shardMismatchPattern, reconnectionPattern1); Map expectedOccurences = ImmutableMap.of( Pattern.compile( - ".*" - + Pattern.quote(node1) - + ":19042.*Reconnection attempt complete, 6/6 channels.*"), - 1, - Pattern.compile( - ".*" - + Pattern.quote(node2) - + ":19042.*Reconnection attempt complete, 6/6 channels.*"), - 1); + ".*" + + Pattern.quote(node1) + + ":19042.*Reconnection attempt complete, 6/6 channels.*"), + 1); DriverConfigLoader loader = SessionUtils.configLoaderBuilder() .withBoolean(DefaultDriverOption.SOCKET_REUSE_ADDRESS, reuseAddress) .withBoolean(DefaultDriverOption.CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED, true) .withInt(DefaultDriverOption.ADVANCED_SHARD_AWARENESS_PORT_LOW, 10000) .withInt(DefaultDriverOption.ADVANCED_SHARD_AWARENESS_PORT_HIGH, 60000) - // Due to rounding up the connections per shard this will result in 6 connections per - // node .withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, expectedChannelsPerNode) .build(); try (CqlSession session = @@ -149,13 +138,13 @@ public void should_initialize_all_channels(boolean reuseAddress) { @Test public void should_see_mismatched_shard() { - int expectedChannelsPerNode = 66; // Divisible by smp + int expectedChannelsPerNode = 33; // Divisible by smp DriverConfigLoader loader = SessionUtils.configLoaderBuilder() .withBoolean(DefaultDriverOption.CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED, true) .withInt(DefaultDriverOption.ADVANCED_SHARD_AWARENESS_PORT_LOW, 10000) .withInt(DefaultDriverOption.ADVANCED_SHARD_AWARENESS_PORT_HIGH, 60000) - .withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, 66) + .withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, expectedChannelsPerNode) .build(); try (CqlSession session = CqlSession.builder() @@ -176,13 +165,13 @@ public void should_see_mismatched_shard() { // There is no need to run this as a test, but it serves as a comparison @SuppressWarnings("unused") public void should_struggle_to_fill_pools() { - int expectedChannelsPerNode = 66; // Divisible by smp + int expectedChannelsPerNode = 33; // Divisible by smp DriverConfigLoader loader = SessionUtils.configLoaderBuilder() .withBoolean(DefaultDriverOption.CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED, false) - .withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, 66) - .withDuration(DefaultDriverOption.RECONNECTION_BASE_DELAY, Duration.ofMillis(200)) - .withDuration(DefaultDriverOption.RECONNECTION_MAX_DELAY, Duration.ofMillis(4000)) + .withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, expectedChannelsPerNode) + .withDuration(DefaultDriverOption.RECONNECTION_BASE_DELAY, Duration.ofMillis(300)) + .withDuration(DefaultDriverOption.RECONNECTION_MAX_DELAY, Duration.ofMillis(3200)) .build(); CqlSessionBuilder builder = CqlSession.builder() @@ -210,13 +199,13 @@ public void should_struggle_to_fill_pools() { @Test public void should_not_struggle_to_fill_pools() { - int expectedChannelsPerNode = 66; + int expectedChannelsPerNode = 33; DriverConfigLoader loader = SessionUtils.configLoaderBuilder() .withBoolean(DefaultDriverOption.CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED, true) .withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, expectedChannelsPerNode) - .withDuration(DefaultDriverOption.RECONNECTION_BASE_DELAY, Duration.ofMillis(10)) - .withDuration(DefaultDriverOption.RECONNECTION_MAX_DELAY, Duration.ofMillis(20)) + .withDuration(DefaultDriverOption.RECONNECTION_BASE_DELAY, Duration.ofMillis(300)) + .withDuration(DefaultDriverOption.RECONNECTION_MAX_DELAY, Duration.ofMillis(3200)) .build(); CqlSessionBuilder builder = CqlSession.builder() @@ -239,29 +228,24 @@ public void should_not_struggle_to_fill_pools() { .until(() -> areAllPoolsFullyInitialized(allSessions, expectedChannelsPerNode)); int tolerance = 2; // Sometimes socket ends up already in use String node1 = CCM_RULE.getCcmBridge().getNodeIpAddress(1); - String node2 = CCM_RULE.getCcmBridge().getNodeIpAddress(2); Pattern reconnectionPattern1 = Pattern.compile(".*" + Pattern.quote(node1) + ".*Scheduling next reconnection in.*"); - Pattern reconnectionPattern2 = - Pattern.compile(".*" + Pattern.quote(node2) + ".*Scheduling next reconnection in.*"); Map expectedOccurences = ImmutableMap.of( Pattern.compile( - ".*" - + Pattern.quote(node1) - + ":19042.*Reconnection attempt complete, 66/66 channels.*"), - 1 * sessions, - Pattern.compile( - ".*" - + Pattern.quote(node2) - + ":19042.*Reconnection attempt complete, 66/66 channels.*"), - 1 * sessions); + ".*" + + Pattern.quote(node1) + + ":19042.*Reconnection attempt complete, " + + expectedChannelsPerNode + + "/" + + expectedChannelsPerNode + + " channels.*"), + sessions); List logsCopy = ImmutableList.copyOf(appender.list); expectedOccurences.forEach( (pattern, times) -> assertMatchesAtLeast(pattern, times, logsCopy)); assertNoLogMatches(shardMismatchPattern, logsCopy); assertMatchesAtMost(reconnectionPattern1, tolerance, logsCopy); - assertMatchesAtMost(reconnectionPattern2, tolerance, logsCopy); } }