Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ public RedisFuture<Void> ssubscribe(K... channels) {
@Override
@SuppressWarnings("unchecked")
public RedisFuture<Void> sunsubscribe(K... channels) {
// Mark these channels as intentionally unsubscribed to prevent auto-resubscription
StatefulRedisPubSubConnection<K, V> connection = getStatefulConnection();
if (connection instanceof StatefulRedisPubSubConnectionImpl) {
StatefulRedisPubSubConnectionImpl<K, V> impl = (StatefulRedisPubSubConnectionImpl<K, V>) connection;
for (K channel : channels) {
impl.markIntentionalUnsubscribe(channel);
}
}
return (RedisFuture<Void>) dispatch(commandBuilder.sunsubscribe(channels));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,14 @@ public Mono<Void> ssubscribe(K... shardChannels) {

@Override
public Mono<Void> sunsubscribe(K... shardChannels) {
// Mark these channels as intentionally unsubscribed to prevent auto-resubscription
StatefulRedisPubSubConnection<K, V> connection = getStatefulConnection();
if (connection instanceof StatefulRedisPubSubConnectionImpl) {
StatefulRedisPubSubConnectionImpl<K, V> impl = (StatefulRedisPubSubConnectionImpl<K, V>) connection;
for (K channel : shardChannels) {
impl.markIntentionalUnsubscribe(channel);
}
}
return createFlux(() -> commandBuilder.sunsubscribe(shardChannels)).then();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import io.lettuce.core.RedisChannelWriter;
import io.lettuce.core.RedisCommandExecutionException;
Expand Down Expand Up @@ -53,6 +55,8 @@ public class StatefulRedisPubSubConnectionImpl<K, V> extends StatefulRedisConnec

private final PubSubEndpoint<K, V> endpoint;

private final ShardedPubSubAutoResubscribeListener autoResubscribeListener;

/**
* Initialize a new connection.
*
Expand All @@ -67,6 +71,10 @@ public StatefulRedisPubSubConnectionImpl(PubSubEndpoint<K, V> endpoint, RedisCha
super(writer, endpoint, codec, timeout, DEFAULT_JSON_PARSER);
this.endpoint = endpoint;
endpoint.setConnectionState(getConnectionState());

// Add internal listener for auto-resubscription on sunsubscribe events
this.autoResubscribeListener = new ShardedPubSubAutoResubscribeListener();
endpoint.addListener(autoResubscribeListener);
}

/**
Expand Down Expand Up @@ -163,4 +171,48 @@ public void activated() {
}
}

/**
* Internal listener that handles automatic resubscription for sharded pub/sub channels when they are unsubscribed due to
* slot rebalancing.
*/
private class ShardedPubSubAutoResubscribeListener extends RedisPubSubAdapter<K, V> {

private final Set<K> intentionalUnsubscriptions = ConcurrentHashMap.newKeySet();

@Override
public void sunsubscribed(K shardChannel, long count) {
if (intentionalUnsubscriptions.remove(shardChannel)) {
return;
}

if (shardChannel != null) {
InternalLoggerFactory.getInstance(getClass()).debug(
"Triggering auto-resubscribe to generate MovedRedirectionEvent for shard channel: {}", shardChannel);
RedisFuture<Void> resubscribeResult = async().ssubscribe(shardChannel);
resubscribeResult.exceptionally(throwable -> {
InternalLoggerFactory.getInstance(getClass()).debug(
"Auto-resubscribe triggered cluster redirection for shard channel {}: {}", shardChannel,
throwable.getMessage());
return null;
});
}
}

/**
* Mark a channel as intentionally unsubscribed to prevent auto-resubscription
*/
public void markIntentionalUnsubscribe(K shardChannel) {
intentionalUnsubscriptions.add(shardChannel);
}

}

/**
* Mark a channel as intentionally unsubscribed to prevent auto-resubscription. This method is called by
* RedisPubSubAsyncCommandsImpl when sunsubscribe is explicitly called.
*/
public void markIntentionalUnsubscribe(K shardChannel) {
autoResubscribeListener.markIntentionalUnsubscribe(shardChannel);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -611,4 +611,68 @@ void echoAllowedInSubscriptionState() {
pubsub.unsubscribe(channel);
}

@Test
void autoResubscribeOnShardChannelUnsubscribed() throws Exception {
final BlockingQueue<String> subscribedChannels = LettuceFactories.newBlockingQueue();
final BlockingQueue<String> unsubscribedChannels = LettuceFactories.newBlockingQueue();

RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {

@Override
public void ssubscribed(String channel, long count) {
subscribedChannels.add(channel);
}

@Override
public void sunsubscribed(String channel, long count) {
unsubscribedChannels.add(channel);
}

};

pubsub.getStatefulConnection().addListener(listener);
pubsub.ssubscribe(shardChannel);

assertThat(subscribedChannels.take()).isEqualTo(shardChannel);

pubsub.sunsubscribe(shardChannel);

assertThat(unsubscribedChannels.take()).isEqualTo(shardChannel);
assertThat(subscribedChannels.poll(50, TimeUnit.MILLISECONDS)).isNull();

pubsub.getStatefulConnection().removeListener(listener);
}

@Test
void noAutoResubscribeOnIntentionalUnsubscribe() throws Exception {
final BlockingQueue<String> subscribedChannels = LettuceFactories.newBlockingQueue();
final BlockingQueue<String> unsubscribedChannels = LettuceFactories.newBlockingQueue();

RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {

@Override
public void ssubscribed(String channel, long count) {
subscribedChannels.add(channel);
}

@Override
public void sunsubscribed(String channel, long count) {
unsubscribedChannels.add(channel);
}

};

pubsub.getStatefulConnection().addListener(listener);
pubsub.ssubscribe(shardChannel);

assertThat(subscribedChannels.take()).isEqualTo(shardChannel);

pubsub.sunsubscribe(shardChannel);
assertThat(unsubscribedChannels.take()).isEqualTo(shardChannel);

assertThat(subscribedChannels.poll(50, TimeUnit.MILLISECONDS)).isNull();

pubsub.getStatefulConnection().removeListener(listener);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import static io.lettuce.TestTags.UNIT_TEST;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.*;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
Expand Down Expand Up @@ -122,4 +124,36 @@ void resubscribeChannelAndPatternAndShardChanelSubscription() {
assertInstanceOf(AsyncCommand.class, subscriptions.get(1));
}

@Test
void autoResubscribeListenerIsRegistered() {
connection.markIntentionalUnsubscribe("test-channel");
assertTrue(true);
}

@Test
void intentionalUnsubscribeBypassesAutoResubscribe() throws Exception {
connection.markIntentionalUnsubscribe("test-channel");

RedisPubSubListener<String, String> autoResubscribeListener = getAutoResubscribeListener(connection);

autoResubscribeListener.sunsubscribed("test-channel", 0);
verify(mockedWriter, never()).write(any(io.lettuce.core.protocol.RedisCommand.class));
}

@Test
void unintentionalUnsubscribeTriggersAutoResubscribe() throws Exception {
RedisPubSubListener<String, String> autoResubscribeListener = getAutoResubscribeListener(connection);

autoResubscribeListener.sunsubscribed("test-channel", 0);

verify(mockedWriter, times(1)).write(any(io.lettuce.core.protocol.RedisCommand.class));
}

@SuppressWarnings("unchecked")
private RedisPubSubListener<String, String> getAutoResubscribeListener(
StatefulRedisPubSubConnectionImpl<String, String> connection) throws Exception {
Field listenerField = StatefulRedisPubSubConnectionImpl.class.getDeclaredField("autoResubscribeListener");
listenerField.setAccessible(true);
return (RedisPubSubListener<String, String>) listenerField.get(connection);
}
}