getTrigger() {
+			return sink.asMono();
+		}
+	}
 }
diff --git a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java
index 99dabda101..f8eca5e292 100644
--- a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java
+++ b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java
@@ -28,6 +28,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 import org.springframework.beans.factory.BeanNameAware;
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.InitializingBean;
@@ -41,6 +42,7 @@
 import org.springframework.data.redis.connection.RedisConnection;
 import org.springframework.data.redis.connection.RedisConnectionFactory;
 import org.springframework.data.redis.connection.Subscription;
+import org.springframework.data.redis.connection.SubscriptionListener;
 import org.springframework.data.redis.connection.util.ByteArrayWrapper;
 import org.springframework.data.redis.serializer.RedisSerializer;
 import org.springframework.data.redis.serializer.StringRedisSerializer;
@@ -62,13 +64,16 @@
  * configured).
  * 
  * Adding and removing listeners at the same time has undefined results. It is strongly recommended to synchronize/order
- * these methods accordingly.
+ * these methods accordingly. {@link MessageListener Listeners} that wish to receive subscription/unsubscription
+ * callbacks in response to subscribe/unsubscribe commands can implement {@link SubscriptionListener}.
  *
  * @author Costin Leau
  * @author Jennifer Hickey
  * @author Way Joke
  * @author Thomas Darimont
  * @author Mark Paluch
+ * @see MessageListener
+ * @see SubscriptionListener
  */
 public class RedisMessageListenerContainer implements InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle {
 
@@ -133,6 +138,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
 
 	private long maxSubscriptionRegistrationWaitingTime = DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME;
 
+	@Override
 	public void afterPropertiesSet() {
 		if (taskExecutor == null) {
 			manageExecutor = true;
@@ -159,6 +165,7 @@ protected TaskExecutor createDefaultTaskExecutor() {
 		return new SimpleAsyncTaskExecutor(threadNamePrefix);
 	}
 
+	@Override
 	public void destroy() throws Exception {
 		initialized = false;
 
@@ -175,24 +182,29 @@ public void destroy() throws Exception {
 		}
 	}
 
+	@Override
 	public boolean isAutoStartup() {
 		return true;
 	}
 
+	@Override
 	public void stop(Runnable callback) {
 		stop();
 		callback.run();
 	}
 
+	@Override
 	public int getPhase() {
 		// start the latest
 		return Integer.MAX_VALUE;
 	}
 
+	@Override
 	public boolean isRunning() {
 		return running;
 	}
 
+	@Override
 	public void start() {
 		if (!running) {
 			running = true;
@@ -219,6 +231,7 @@ public void start() {
 		}
 	}
 
+	@Override
 	public void stop() {
 		if (isRunning()) {
 			running = false;
@@ -312,6 +325,7 @@ public void setConnectionFactory(RedisConnectionFactory connectionFactory) {
 		this.connectionFactory = connectionFactory;
 	}
 
+	@Override
 	public void setBeanName(String name) {
 		this.beanName = name;
 	}
@@ -958,7 +972,7 @@ void unsubscribePattern(byte[]... patterns) {
 	 *
 	 * @author Costin Leau
 	 */
-	private class DispatchMessageListener implements MessageListener {
+	private class DispatchMessageListener implements MessageListener, SubscriptionListener {
 
 		@Override
 		public void onMessage(Message message, @Nullable byte[] pattern) {
@@ -977,6 +991,56 @@ public void onMessage(Message message, @Nullable byte[] pattern) {
 				dispatchMessage(listeners, message, pattern);
 			}
 		}
+
+		@Override
+		public void onChannelSubscribed(byte[] channel, long count) {
+			dispatchSubscriptionNotification(
+					channelMapping.getOrDefault(new ByteArrayWrapper(channel), Collections.emptyList()), channel, count,
+					SubscriptionListener::onChannelSubscribed);
+		}
+
+		@Override
+		public void onChannelUnsubscribed(byte[] channel, long count) {
+			dispatchSubscriptionNotification(
+					channelMapping.getOrDefault(new ByteArrayWrapper(channel), Collections.emptyList()), channel, count,
+					SubscriptionListener::onChannelUnsubscribed);
+		}
+
+		@Override
+		public void onPatternSubscribed(byte[] pattern, long count) {
+			dispatchSubscriptionNotification(
+					patternMapping.getOrDefault(new ByteArrayWrapper(pattern), Collections.emptyList()), pattern, count,
+					SubscriptionListener::onPatternSubscribed);
+		}
+
+		@Override
+		public void onPatternUnsubscribed(byte[] pattern, long count) {
+			dispatchSubscriptionNotification(
+					patternMapping.getOrDefault(new ByteArrayWrapper(pattern), Collections.emptyList()), pattern, count,
+					SubscriptionListener::onPatternUnsubscribed);
+		}
+	}
+
+	private void dispatchSubscriptionNotification(Collection listeners, byte[] pattern, long count,
+			SubscriptionConsumer listenerConsumer) {
+
+		if (!CollectionUtils.isEmpty(listeners)) {
+			byte[] source = pattern.clone();
+
+			for (MessageListener messageListener : listeners) {
+				if (messageListener instanceof SubscriptionListener) {
+					taskExecutor.execute(() -> listenerConsumer.accept((SubscriptionListener) messageListener, source, count));
+				}
+			}
+		}
+	}
+
+	/**
+	 * Represents an operation that accepts three input arguments {@link SubscriptionListener},
+	 * {@code channel or pattern}, and {@code count} and returns no result.
+	 */
+	interface SubscriptionConsumer {
+		void accept(SubscriptionListener listener, byte[] channelOrPattern, long count);
 	}
 
 	private void dispatchMessage(Collection listeners, Message message, @Nullable byte[] pattern) {
diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/JedisSubscriptionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/JedisSubscriptionUnitTests.java
index 27e9b71c4e..beec82b75e 100644
--- a/src/test/java/org/springframework/data/redis/connection/jedis/JedisSubscriptionUnitTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/jedis/JedisSubscriptionUnitTests.java
@@ -39,7 +39,7 @@
 @ExtendWith(MockitoExtension.class)
 class JedisSubscriptionUnitTests {
 
-	@Mock BinaryJedisPubSub jedisPubSub;
+	@Mock JedisMessageListener jedisPubSub;
 
 	@Mock MessageListener listener;
 
diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveSubscriptionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveSubscriptionUnitTests.java
index 1ee50d8a0e..8efb4d30c5 100644
--- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveSubscriptionUnitTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveSubscriptionUnitTests.java
@@ -21,6 +21,7 @@
 import static org.springframework.data.redis.util.ByteUtils.*;
 
 import io.lettuce.core.RedisConnectionException;
+import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
 import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
 import reactor.core.Disposable;
 import reactor.core.publisher.DirectProcessor;
@@ -40,6 +41,7 @@
 import org.springframework.data.redis.RedisSystemException;
 import org.springframework.data.redis.connection.ReactiveSubscription.Message;
 import org.springframework.data.redis.connection.ReactiveSubscription.PatternMessage;
+import org.springframework.data.redis.connection.SubscriptionListener;
 
 /**
  * Unit tests for {@link LettuceReactiveSubscription}.
@@ -52,11 +54,14 @@ class LettuceReactiveSubscriptionUnitTests {
 
 	private LettuceReactiveSubscription subscription;
 
+	@Mock StatefulRedisPubSubConnection connectionMock;
 	@Mock RedisPubSubReactiveCommands commandsMock;
 
 	@BeforeEach
 	void before() {
-		subscription = new LettuceReactiveSubscription(commandsMock, e -> new RedisSystemException(e.getMessage(), e));
+		when(connectionMock.reactive()).thenReturn(commandsMock);
+		subscription = new LettuceReactiveSubscription(mock(SubscriptionListener.class), connectionMock,
+				e -> new RedisSystemException(e.getMessage(), e));
 	}
 
 	@Test // DATAREDIS-612
diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceSubscriptionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceSubscriptionUnitTests.java
index 51a5131244..124cc9024b 100644
--- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceSubscriptionUnitTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceSubscriptionUnitTests.java
@@ -18,10 +18,13 @@
 import static org.assertj.core.api.Assertions.*;
 import static org.mockito.Mockito.*;
 
+import io.lettuce.core.RedisFuture;
 import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
+import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
 import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
 
 import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -42,7 +45,9 @@ class LettuceSubscriptionUnitTests {
 
 	private StatefulRedisPubSubConnection pubsub;
 
-	private RedisPubSubCommands asyncCommands;
+	private RedisPubSubCommands syncCommands;
+
+	private RedisPubSubAsyncCommands asyncCommands;
 
 	private LettuceConnectionProvider connectionProvider;
 
@@ -51,10 +56,12 @@ class LettuceSubscriptionUnitTests {
 	void setUp() {
 
 		pubsub = mock(StatefulRedisPubSubConnection.class);
-		asyncCommands = mock(RedisPubSubCommands.class);
+		syncCommands = mock(RedisPubSubCommands.class);
+		asyncCommands = mock(RedisPubSubAsyncCommands.class);
 		connectionProvider = mock(LettuceConnectionProvider.class);
 
-		when(pubsub.sync()).thenReturn(asyncCommands);
+		when(pubsub.sync()).thenReturn(syncCommands);
+		when(pubsub.async()).thenReturn(asyncCommands);
 		subscription = new LettuceSubscription(mock(MessageListener.class), pubsub, connectionProvider);
 	}
 
@@ -64,8 +71,8 @@ void testUnsubscribeAllAndClose() {
 		subscription.subscribe(new byte[][] { "a".getBytes() });
 		subscription.unsubscribe();
 
-		verify(asyncCommands).unsubscribe();
-		verify(asyncCommands, never()).punsubscribe();
+		verify(syncCommands).unsubscribe();
+		verify(syncCommands, never()).punsubscribe();
 		verify(connectionProvider).release(pubsub);
 		verify(pubsub).removeListener(any(LettuceMessageListener.class));
 
@@ -81,8 +88,8 @@ void testUnsubscribeAllChannelsWithPatterns() {
 		subscription.pSubscribe(new byte[][] { "s*".getBytes() });
 		subscription.unsubscribe();
 
-		verify(asyncCommands).unsubscribe();
-		verify(asyncCommands, never()).punsubscribe();
+		verify(syncCommands).unsubscribe();
+		verify(syncCommands, never()).punsubscribe();
 
 		assertThat(subscription.isAlive()).isTrue();
 		assertThat(subscription.getChannels()).isEmpty();
@@ -100,9 +107,9 @@ void testUnsubscribeChannelAndClose() {
 		subscription.subscribe(channel);
 		subscription.unsubscribe(channel);
 
-		verify(asyncCommands).unsubscribe(channel);
-		verify(asyncCommands, never()).unsubscribe();
-		verify(asyncCommands, never()).punsubscribe();
+		verify(syncCommands).unsubscribe(channel);
+		verify(syncCommands, never()).unsubscribe();
+		verify(syncCommands, never()).punsubscribe();
 		verify(connectionProvider).release(pubsub);
 		verify(pubsub).removeListener(any(LettuceMessageListener.class));
 
@@ -119,9 +126,9 @@ void testUnsubscribeChannelSomeLeft() {
 		subscription.subscribe(channels);
 		subscription.unsubscribe(new byte[][] { "a".getBytes() });
 
-		verify(asyncCommands).unsubscribe(new byte[][] { "a".getBytes() });
-		verify(asyncCommands, never()).unsubscribe();
-		verify(asyncCommands, never()).punsubscribe();
+		verify(syncCommands).unsubscribe(new byte[][] { "a".getBytes() });
+		verify(syncCommands, never()).unsubscribe();
+		verify(syncCommands, never()).punsubscribe();
 
 		assertThat(subscription.isAlive()).isTrue();
 
@@ -140,9 +147,9 @@ void testUnsubscribeChannelWithPatterns() {
 		subscription.pSubscribe(new byte[][] { "s*".getBytes() });
 		subscription.unsubscribe(channel);
 
-		verify(asyncCommands).unsubscribe(channel);
-		verify(asyncCommands, never()).unsubscribe();
-		verify(asyncCommands, never()).punsubscribe();
+		verify(syncCommands).unsubscribe(channel);
+		verify(syncCommands, never()).unsubscribe();
+		verify(syncCommands, never()).punsubscribe();
 
 		assertThat(subscription.isAlive()).isTrue();
 		assertThat(subscription.getChannels()).isEmpty();
@@ -161,9 +168,9 @@ void testUnsubscribeChannelWithPatternsSomeLeft() {
 		subscription.pSubscribe(new byte[][] { "s*".getBytes() });
 		subscription.unsubscribe(channel);
 
-		verify(asyncCommands).unsubscribe(channel);
-		verify(asyncCommands, never()).unsubscribe();
-		verify(asyncCommands, never()).punsubscribe();
+		verify(syncCommands).unsubscribe(channel);
+		verify(syncCommands, never()).unsubscribe();
+		verify(syncCommands, never()).punsubscribe();
 		assertThat(subscription.isAlive()).isTrue();
 
 		Collection channels = subscription.getChannels();
@@ -181,8 +188,8 @@ void testUnsubscribeAllNoChannels() {
 		subscription.pSubscribe(new byte[][] { "s*".getBytes() });
 		subscription.unsubscribe();
 
-		verify(asyncCommands, never()).unsubscribe();
-		verify(asyncCommands, never()).punsubscribe();
+		verify(syncCommands, never()).unsubscribe();
+		verify(syncCommands, never()).punsubscribe();
 
 		assertThat(subscription.isAlive()).isTrue();
 		assertThat(subscription.getChannels()).isEmpty();
@@ -204,8 +211,8 @@ void testUnsubscribeNotAlive() {
 		assertThat(subscription.isAlive()).isFalse();
 
 		subscription.unsubscribe();
-		verify(asyncCommands).unsubscribe();
-		verify(asyncCommands, never()).punsubscribe();
+		verify(syncCommands).unsubscribe();
+		verify(syncCommands, never()).punsubscribe();
 	}
 
 	@Test
@@ -226,8 +233,8 @@ void testPUnsubscribeAllAndClose() {
 		subscription.pSubscribe(new byte[][] { "a*".getBytes() });
 		subscription.pUnsubscribe();
 
-		verify(asyncCommands, never()).unsubscribe();
-		verify(asyncCommands).punsubscribe();
+		verify(syncCommands, never()).unsubscribe();
+		verify(syncCommands).punsubscribe();
 		verify(connectionProvider).release(pubsub);
 		verify(pubsub).removeListener(any(LettuceMessageListener.class));
 
@@ -243,8 +250,8 @@ void testPUnsubscribeAllPatternsWithChannels() {
 		subscription.pSubscribe(new byte[][] { "s*".getBytes() });
 		subscription.pUnsubscribe();
 
-		verify(asyncCommands, never()).unsubscribe();
-		verify(asyncCommands).punsubscribe();
+		verify(syncCommands, never()).unsubscribe();
+		verify(syncCommands).punsubscribe();
 
 		assertThat(subscription.isAlive()).isTrue();
 		assertThat(subscription.getPatterns()).isEmpty();
@@ -262,9 +269,9 @@ void testPUnsubscribeAndClose() {
 		subscription.pSubscribe(pattern);
 		subscription.pUnsubscribe(pattern);
 
-		verify(asyncCommands, never()).unsubscribe();
-		verify(asyncCommands, never()).punsubscribe();
-		verify(asyncCommands).punsubscribe(pattern);
+		verify(syncCommands, never()).unsubscribe();
+		verify(syncCommands, never()).punsubscribe();
+		verify(syncCommands).punsubscribe(pattern);
 		verify(connectionProvider).release(pubsub);
 		verify(pubsub).removeListener(any(LettuceMessageListener.class));
 
@@ -280,9 +287,9 @@ void testPUnsubscribePatternSomeLeft() {
 		subscription.pSubscribe(patterns);
 		subscription.pUnsubscribe(new byte[][] { "a*".getBytes() });
 
-		verify(asyncCommands).punsubscribe(new byte[][] { "a*".getBytes() });
-		verify(asyncCommands, never()).unsubscribe();
-		verify(asyncCommands, never()).punsubscribe();
+		verify(syncCommands).punsubscribe(new byte[][] { "a*".getBytes() });
+		verify(syncCommands, never()).unsubscribe();
+		verify(syncCommands, never()).punsubscribe();
 
 		assertThat(subscription.isAlive()).isTrue();
 
@@ -301,9 +308,9 @@ void testPUnsubscribePatternWithChannels() {
 		subscription.pSubscribe(pattern);
 		subscription.pUnsubscribe(pattern);
 
-		verify(asyncCommands).punsubscribe(pattern);
-		verify(asyncCommands, never()).unsubscribe();
-		verify(asyncCommands, never()).punsubscribe();
+		verify(syncCommands).punsubscribe(pattern);
+		verify(syncCommands, never()).unsubscribe();
+		verify(syncCommands, never()).punsubscribe();
 
 		assertThat(subscription.isAlive()).isTrue();
 		assertThat(subscription.getPatterns()).isEmpty();
@@ -322,9 +329,9 @@ void testUnsubscribePatternWithChannelsSomeLeft() {
 		subscription.subscribe(new byte[][] { "a".getBytes() });
 		subscription.pUnsubscribe(pattern);
 
-		verify(asyncCommands, never()).unsubscribe();
-		verify(asyncCommands, never()).punsubscribe();
-		verify(asyncCommands).punsubscribe(pattern);
+		verify(syncCommands, never()).unsubscribe();
+		verify(syncCommands, never()).punsubscribe();
+		verify(syncCommands).punsubscribe(pattern);
 
 		assertThat(subscription.isAlive()).isTrue();
 
@@ -343,8 +350,8 @@ void testPUnsubscribeAllNoPatterns() {
 		subscription.subscribe(new byte[][] { "s".getBytes() });
 		subscription.pUnsubscribe();
 
-		verify(asyncCommands, never()).unsubscribe();
-		verify(asyncCommands, never()).punsubscribe();
+		verify(syncCommands, never()).unsubscribe();
+		verify(syncCommands, never()).punsubscribe();
 		assertThat(subscription.isAlive()).isTrue();
 		assertThat(subscription.getPatterns()).isEmpty();
 
@@ -365,8 +372,8 @@ void testPUnsubscribeNotAlive() {
 
 		verify(connectionProvider).release(pubsub);
 		verify(pubsub).removeListener(any(LettuceMessageListener.class));
-		verify(asyncCommands).unsubscribe();
-		verify(asyncCommands, never()).punsubscribe();
+		verify(syncCommands).unsubscribe();
+		verify(syncCommands, never()).punsubscribe();
 	}
 
 	@Test
@@ -386,27 +393,41 @@ void testDoCloseNotSubscribed() {
 
 		subscription.doClose();
 
-		verify(asyncCommands, never()).unsubscribe();
-		verify(asyncCommands, never()).punsubscribe();
+		verify(syncCommands, never()).unsubscribe();
+		verify(syncCommands, never()).punsubscribe();
 	}
 
 	@Test
 	void testDoCloseSubscribedChannels() {
 
+		RedisFuture future = mock(RedisFuture.class);
+		when(future.toCompletableFuture()).thenReturn(CompletableFuture.completedFuture(null));
+
+		when(asyncCommands.unsubscribe()).thenReturn(future);
+		when(asyncCommands.ping()).thenReturn((RedisFuture) future);
+
 		subscription.subscribe(new byte[][] { "a".getBytes() });
 		subscription.doClose();
 
+		verify(asyncCommands).ping();
 		verify(asyncCommands).unsubscribe();
-		verify(asyncCommands, never()).punsubscribe();
+		verifyNoMoreInteractions(asyncCommands);
 	}
 
 	@Test
 	void testDoCloseSubscribedPatterns() {
 
+		RedisFuture future = mock(RedisFuture.class);
+		when(future.toCompletableFuture()).thenReturn(CompletableFuture.completedFuture(null));
+
+		when(asyncCommands.punsubscribe()).thenReturn(future);
+		when(asyncCommands.ping()).thenReturn((RedisFuture) future);
+
 		subscription.pSubscribe(new byte[][] { "a*".getBytes() });
 		subscription.doClose();
 
-		verify(asyncCommands, never()).unsubscribe();
+		verify(asyncCommands).ping();
 		verify(asyncCommands).punsubscribe();
+		verifyNoMoreInteractions(asyncCommands);
 	}
 }
diff --git a/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateIntegrationTests.java
index 6b3cc932fe..d38205bc11 100644
--- a/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateIntegrationTests.java
@@ -19,6 +19,7 @@
 import static org.assertj.core.api.Assumptions.*;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
 import java.time.Duration;
@@ -27,6 +28,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Function;
 
 import org.junit.jupiter.api.BeforeEach;
 
@@ -441,7 +443,29 @@ void listenToChannelShouldReceiveChannelMessagesCorrectly() throws InterruptedEx
 
 		redisTemplate.listenToChannel(channel).as(StepVerifier::create) //
 				.thenAwait(Duration.ofMillis(500)) // just make sure we the subscription completed
-				.then(() -> redisTemplate.convertAndSend(channel, message).block()) //
+				.then(() -> redisTemplate.convertAndSend(channel, message).subscribe()) //
+				.assertNext(received -> {
+
+					assertThat(received).isInstanceOf(ChannelMessage.class);
+					assertThat(received.getMessage()).isEqualTo(message);
+					assertThat(received.getChannel()).isEqualTo(channel);
+				}) //
+				.thenAwait(Duration.ofMillis(10)) //
+				.thenCancel() //
+				.verify(Duration.ofSeconds(3));
+	}
+
+	@ParameterizedRedisTest // GH-1622
+	@EnabledIfLongRunningTest
+	void listenToLaterChannelShouldReceiveChannelMessagesCorrectly() {
+
+		String channel = "my-channel";
+
+		V message = valueFactory.instance();
+
+		redisTemplate.listenToChannelLater(channel) //
+				.doOnNext(it -> redisTemplate.convertAndSend(channel, message).subscribe()).flatMapMany(Function.identity()) //
+				.as(StepVerifier::create) //
 				.assertNext(received -> {
 
 					assertThat(received).isInstanceOf(ChannelMessage.class);
@@ -455,7 +479,7 @@ void listenToChannelShouldReceiveChannelMessagesCorrectly() throws InterruptedEx
 
 	@ParameterizedRedisTest // DATAREDIS-612
 	@EnabledIfLongRunningTest
-	void listenToChannelPatternShouldReceiveChannelMessagesCorrectly() throws InterruptedException {
+	void listenToPatternShouldReceiveChannelMessagesCorrectly() {
 
 		String channel = "my-channel";
 		String pattern = "my-*";
@@ -466,7 +490,32 @@ void listenToChannelPatternShouldReceiveChannelMessagesCorrectly() throws Interr
 
 		stream.as(StepVerifier::create) //
 				.thenAwait(Duration.ofMillis(500)) // just make sure we the subscription completed
-				.then(() -> redisTemplate.convertAndSend(channel, message).block()) //
+				.then(() -> redisTemplate.convertAndSend(channel, message).subscribe()) //
+				.assertNext(received -> {
+
+					assertThat(received).isInstanceOf(PatternMessage.class);
+					assertThat(received.getMessage()).isEqualTo(message);
+					assertThat(received.getChannel()).isEqualTo(channel);
+					assertThat(((PatternMessage) received).getPattern()).isEqualTo(pattern);
+				}) //
+				.thenCancel() //
+				.verify(Duration.ofSeconds(3));
+	}
+
+	@ParameterizedRedisTest // GH-1622
+	@EnabledIfLongRunningTest
+	void listenToPatternLaterShouldReceiveChannelMessagesCorrectly() {
+
+		String channel = "my-channel";
+		String pattern = "my-*";
+
+		V message = valueFactory.instance();
+
+		Mono>> stream = redisTemplate.listenToPatternLater(pattern);
+
+		stream.doOnNext(it -> redisTemplate.convertAndSend(channel, message).subscribe()) //
+				.flatMapMany(Function.identity()) //
+				.as(StepVerifier::create) //
 				.assertNext(received -> {
 
 					assertThat(received).isInstanceOf(PatternMessage.class);
diff --git a/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateUnitTests.java b/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateUnitTests.java
index c3ae3d4e98..70ed2d3d90 100644
--- a/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateUnitTests.java
+++ b/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateUnitTests.java
@@ -71,7 +71,7 @@ void listenToShouldSubscribeToChannel() {
 
 		when(connectionMock.pubSubCommands()).thenReturn(pubSubCommands);
 		when(pubSubCommands.subscribe(any())).thenReturn(Mono.empty());
-		when(pubSubCommands.createSubscription()).thenReturn(Mono.just(subscription));
+		when(pubSubCommands.createSubscription(any())).thenReturn(Mono.just(subscription));
 		when(subscription.receive()).thenReturn(Flux.create(sink -> {}));
 
 		ReactiveRedisTemplate template = new ReactiveRedisTemplate<>(connectionFactoryMock,
diff --git a/src/test/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainerIntegrationTests.java b/src/test/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainerIntegrationTests.java
index 56ea8bebec..141aeef41f 100644
--- a/src/test/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainerIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainerIntegrationTests.java
@@ -20,21 +20,30 @@
 import reactor.core.Disposable;
 import reactor.test.StepVerifier;
 
+import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.data.redis.connection.ReactiveRedisConnection;
 import org.springframework.data.redis.connection.ReactiveSubscription;
 import org.springframework.data.redis.connection.ReactiveSubscription.ChannelMessage;
 import org.springframework.data.redis.connection.ReactiveSubscription.PatternMessage;
 import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.connection.SubscriptionListener;
 import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
 import org.springframework.data.redis.core.ReactiveRedisTemplate;
 import org.springframework.data.redis.serializer.RedisSerializationContext;
@@ -56,6 +65,7 @@ public class ReactiveRedisMessageListenerContainerIntegrationTests {
 
 	private final LettuceConnectionFactory connectionFactory;
 	private @Nullable RedisConnection connection;
+	private @Nullable ReactiveRedisConnection reactiveConnection;
 
 	/**
 	 * @param connectionFactory
@@ -73,6 +83,7 @@ public static Collection