getTrigger() {
+ return sink.asMono();
+ }
+ }
}
diff --git a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java
index 99dabda101..f8eca5e292 100644
--- a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java
+++ b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java
@@ -28,6 +28,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
@@ -41,6 +42,7 @@
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.Subscription;
+import org.springframework.data.redis.connection.SubscriptionListener;
import org.springframework.data.redis.connection.util.ByteArrayWrapper;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@@ -62,13 +64,16 @@
* configured).
*
* Adding and removing listeners at the same time has undefined results. It is strongly recommended to synchronize/order
- * these methods accordingly.
+ * these methods accordingly. {@link MessageListener Listeners} that wish to receive subscription/unsubscription
+ * callbacks in response to subscribe/unsubscribe commands can implement {@link SubscriptionListener}.
*
* @author Costin Leau
* @author Jennifer Hickey
* @author Way Joke
* @author Thomas Darimont
* @author Mark Paluch
+ * @see MessageListener
+ * @see SubscriptionListener
*/
public class RedisMessageListenerContainer implements InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle {
@@ -133,6 +138,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
private long maxSubscriptionRegistrationWaitingTime = DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME;
+ @Override
public void afterPropertiesSet() {
if (taskExecutor == null) {
manageExecutor = true;
@@ -159,6 +165,7 @@ protected TaskExecutor createDefaultTaskExecutor() {
return new SimpleAsyncTaskExecutor(threadNamePrefix);
}
+ @Override
public void destroy() throws Exception {
initialized = false;
@@ -175,24 +182,29 @@ public void destroy() throws Exception {
}
}
+ @Override
public boolean isAutoStartup() {
return true;
}
+ @Override
public void stop(Runnable callback) {
stop();
callback.run();
}
+ @Override
public int getPhase() {
// start the latest
return Integer.MAX_VALUE;
}
+ @Override
public boolean isRunning() {
return running;
}
+ @Override
public void start() {
if (!running) {
running = true;
@@ -219,6 +231,7 @@ public void start() {
}
}
+ @Override
public void stop() {
if (isRunning()) {
running = false;
@@ -312,6 +325,7 @@ public void setConnectionFactory(RedisConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
+ @Override
public void setBeanName(String name) {
this.beanName = name;
}
@@ -958,7 +972,7 @@ void unsubscribePattern(byte[]... patterns) {
*
* @author Costin Leau
*/
- private class DispatchMessageListener implements MessageListener {
+ private class DispatchMessageListener implements MessageListener, SubscriptionListener {
@Override
public void onMessage(Message message, @Nullable byte[] pattern) {
@@ -977,6 +991,56 @@ public void onMessage(Message message, @Nullable byte[] pattern) {
dispatchMessage(listeners, message, pattern);
}
}
+
+ @Override
+ public void onChannelSubscribed(byte[] channel, long count) {
+ dispatchSubscriptionNotification(
+ channelMapping.getOrDefault(new ByteArrayWrapper(channel), Collections.emptyList()), channel, count,
+ SubscriptionListener::onChannelSubscribed);
+ }
+
+ @Override
+ public void onChannelUnsubscribed(byte[] channel, long count) {
+ dispatchSubscriptionNotification(
+ channelMapping.getOrDefault(new ByteArrayWrapper(channel), Collections.emptyList()), channel, count,
+ SubscriptionListener::onChannelUnsubscribed);
+ }
+
+ @Override
+ public void onPatternSubscribed(byte[] pattern, long count) {
+ dispatchSubscriptionNotification(
+ patternMapping.getOrDefault(new ByteArrayWrapper(pattern), Collections.emptyList()), pattern, count,
+ SubscriptionListener::onPatternSubscribed);
+ }
+
+ @Override
+ public void onPatternUnsubscribed(byte[] pattern, long count) {
+ dispatchSubscriptionNotification(
+ patternMapping.getOrDefault(new ByteArrayWrapper(pattern), Collections.emptyList()), pattern, count,
+ SubscriptionListener::onPatternUnsubscribed);
+ }
+ }
+
+ private void dispatchSubscriptionNotification(Collection listeners, byte[] pattern, long count,
+ SubscriptionConsumer listenerConsumer) {
+
+ if (!CollectionUtils.isEmpty(listeners)) {
+ byte[] source = pattern.clone();
+
+ for (MessageListener messageListener : listeners) {
+ if (messageListener instanceof SubscriptionListener) {
+ taskExecutor.execute(() -> listenerConsumer.accept((SubscriptionListener) messageListener, source, count));
+ }
+ }
+ }
+ }
+
+ /**
+ * Represents an operation that accepts three input arguments {@link SubscriptionListener},
+ * {@code channel or pattern}, and {@code count} and returns no result.
+ */
+ interface SubscriptionConsumer {
+ void accept(SubscriptionListener listener, byte[] channelOrPattern, long count);
}
private void dispatchMessage(Collection listeners, Message message, @Nullable byte[] pattern) {
diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/JedisSubscriptionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/JedisSubscriptionUnitTests.java
index 27e9b71c4e..beec82b75e 100644
--- a/src/test/java/org/springframework/data/redis/connection/jedis/JedisSubscriptionUnitTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/jedis/JedisSubscriptionUnitTests.java
@@ -39,7 +39,7 @@
@ExtendWith(MockitoExtension.class)
class JedisSubscriptionUnitTests {
- @Mock BinaryJedisPubSub jedisPubSub;
+ @Mock JedisMessageListener jedisPubSub;
@Mock MessageListener listener;
diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveSubscriptionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveSubscriptionUnitTests.java
index 1ee50d8a0e..8efb4d30c5 100644
--- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveSubscriptionUnitTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveSubscriptionUnitTests.java
@@ -21,6 +21,7 @@
import static org.springframework.data.redis.util.ByteUtils.*;
import io.lettuce.core.RedisConnectionException;
+import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import reactor.core.Disposable;
import reactor.core.publisher.DirectProcessor;
@@ -40,6 +41,7 @@
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.ReactiveSubscription.Message;
import org.springframework.data.redis.connection.ReactiveSubscription.PatternMessage;
+import org.springframework.data.redis.connection.SubscriptionListener;
/**
* Unit tests for {@link LettuceReactiveSubscription}.
@@ -52,11 +54,14 @@ class LettuceReactiveSubscriptionUnitTests {
private LettuceReactiveSubscription subscription;
+ @Mock StatefulRedisPubSubConnection connectionMock;
@Mock RedisPubSubReactiveCommands commandsMock;
@BeforeEach
void before() {
- subscription = new LettuceReactiveSubscription(commandsMock, e -> new RedisSystemException(e.getMessage(), e));
+ when(connectionMock.reactive()).thenReturn(commandsMock);
+ subscription = new LettuceReactiveSubscription(mock(SubscriptionListener.class), connectionMock,
+ e -> new RedisSystemException(e.getMessage(), e));
}
@Test // DATAREDIS-612
diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceSubscriptionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceSubscriptionUnitTests.java
index 51a5131244..124cc9024b 100644
--- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceSubscriptionUnitTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceSubscriptionUnitTests.java
@@ -18,10 +18,13 @@
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
+import io.lettuce.core.RedisFuture;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
+import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -42,7 +45,9 @@ class LettuceSubscriptionUnitTests {
private StatefulRedisPubSubConnection pubsub;
- private RedisPubSubCommands asyncCommands;
+ private RedisPubSubCommands syncCommands;
+
+ private RedisPubSubAsyncCommands asyncCommands;
private LettuceConnectionProvider connectionProvider;
@@ -51,10 +56,12 @@ class LettuceSubscriptionUnitTests {
void setUp() {
pubsub = mock(StatefulRedisPubSubConnection.class);
- asyncCommands = mock(RedisPubSubCommands.class);
+ syncCommands = mock(RedisPubSubCommands.class);
+ asyncCommands = mock(RedisPubSubAsyncCommands.class);
connectionProvider = mock(LettuceConnectionProvider.class);
- when(pubsub.sync()).thenReturn(asyncCommands);
+ when(pubsub.sync()).thenReturn(syncCommands);
+ when(pubsub.async()).thenReturn(asyncCommands);
subscription = new LettuceSubscription(mock(MessageListener.class), pubsub, connectionProvider);
}
@@ -64,8 +71,8 @@ void testUnsubscribeAllAndClose() {
subscription.subscribe(new byte[][] { "a".getBytes() });
subscription.unsubscribe();
- verify(asyncCommands).unsubscribe();
- verify(asyncCommands, never()).punsubscribe();
+ verify(syncCommands).unsubscribe();
+ verify(syncCommands, never()).punsubscribe();
verify(connectionProvider).release(pubsub);
verify(pubsub).removeListener(any(LettuceMessageListener.class));
@@ -81,8 +88,8 @@ void testUnsubscribeAllChannelsWithPatterns() {
subscription.pSubscribe(new byte[][] { "s*".getBytes() });
subscription.unsubscribe();
- verify(asyncCommands).unsubscribe();
- verify(asyncCommands, never()).punsubscribe();
+ verify(syncCommands).unsubscribe();
+ verify(syncCommands, never()).punsubscribe();
assertThat(subscription.isAlive()).isTrue();
assertThat(subscription.getChannels()).isEmpty();
@@ -100,9 +107,9 @@ void testUnsubscribeChannelAndClose() {
subscription.subscribe(channel);
subscription.unsubscribe(channel);
- verify(asyncCommands).unsubscribe(channel);
- verify(asyncCommands, never()).unsubscribe();
- verify(asyncCommands, never()).punsubscribe();
+ verify(syncCommands).unsubscribe(channel);
+ verify(syncCommands, never()).unsubscribe();
+ verify(syncCommands, never()).punsubscribe();
verify(connectionProvider).release(pubsub);
verify(pubsub).removeListener(any(LettuceMessageListener.class));
@@ -119,9 +126,9 @@ void testUnsubscribeChannelSomeLeft() {
subscription.subscribe(channels);
subscription.unsubscribe(new byte[][] { "a".getBytes() });
- verify(asyncCommands).unsubscribe(new byte[][] { "a".getBytes() });
- verify(asyncCommands, never()).unsubscribe();
- verify(asyncCommands, never()).punsubscribe();
+ verify(syncCommands).unsubscribe(new byte[][] { "a".getBytes() });
+ verify(syncCommands, never()).unsubscribe();
+ verify(syncCommands, never()).punsubscribe();
assertThat(subscription.isAlive()).isTrue();
@@ -140,9 +147,9 @@ void testUnsubscribeChannelWithPatterns() {
subscription.pSubscribe(new byte[][] { "s*".getBytes() });
subscription.unsubscribe(channel);
- verify(asyncCommands).unsubscribe(channel);
- verify(asyncCommands, never()).unsubscribe();
- verify(asyncCommands, never()).punsubscribe();
+ verify(syncCommands).unsubscribe(channel);
+ verify(syncCommands, never()).unsubscribe();
+ verify(syncCommands, never()).punsubscribe();
assertThat(subscription.isAlive()).isTrue();
assertThat(subscription.getChannels()).isEmpty();
@@ -161,9 +168,9 @@ void testUnsubscribeChannelWithPatternsSomeLeft() {
subscription.pSubscribe(new byte[][] { "s*".getBytes() });
subscription.unsubscribe(channel);
- verify(asyncCommands).unsubscribe(channel);
- verify(asyncCommands, never()).unsubscribe();
- verify(asyncCommands, never()).punsubscribe();
+ verify(syncCommands).unsubscribe(channel);
+ verify(syncCommands, never()).unsubscribe();
+ verify(syncCommands, never()).punsubscribe();
assertThat(subscription.isAlive()).isTrue();
Collection channels = subscription.getChannels();
@@ -181,8 +188,8 @@ void testUnsubscribeAllNoChannels() {
subscription.pSubscribe(new byte[][] { "s*".getBytes() });
subscription.unsubscribe();
- verify(asyncCommands, never()).unsubscribe();
- verify(asyncCommands, never()).punsubscribe();
+ verify(syncCommands, never()).unsubscribe();
+ verify(syncCommands, never()).punsubscribe();
assertThat(subscription.isAlive()).isTrue();
assertThat(subscription.getChannels()).isEmpty();
@@ -204,8 +211,8 @@ void testUnsubscribeNotAlive() {
assertThat(subscription.isAlive()).isFalse();
subscription.unsubscribe();
- verify(asyncCommands).unsubscribe();
- verify(asyncCommands, never()).punsubscribe();
+ verify(syncCommands).unsubscribe();
+ verify(syncCommands, never()).punsubscribe();
}
@Test
@@ -226,8 +233,8 @@ void testPUnsubscribeAllAndClose() {
subscription.pSubscribe(new byte[][] { "a*".getBytes() });
subscription.pUnsubscribe();
- verify(asyncCommands, never()).unsubscribe();
- verify(asyncCommands).punsubscribe();
+ verify(syncCommands, never()).unsubscribe();
+ verify(syncCommands).punsubscribe();
verify(connectionProvider).release(pubsub);
verify(pubsub).removeListener(any(LettuceMessageListener.class));
@@ -243,8 +250,8 @@ void testPUnsubscribeAllPatternsWithChannels() {
subscription.pSubscribe(new byte[][] { "s*".getBytes() });
subscription.pUnsubscribe();
- verify(asyncCommands, never()).unsubscribe();
- verify(asyncCommands).punsubscribe();
+ verify(syncCommands, never()).unsubscribe();
+ verify(syncCommands).punsubscribe();
assertThat(subscription.isAlive()).isTrue();
assertThat(subscription.getPatterns()).isEmpty();
@@ -262,9 +269,9 @@ void testPUnsubscribeAndClose() {
subscription.pSubscribe(pattern);
subscription.pUnsubscribe(pattern);
- verify(asyncCommands, never()).unsubscribe();
- verify(asyncCommands, never()).punsubscribe();
- verify(asyncCommands).punsubscribe(pattern);
+ verify(syncCommands, never()).unsubscribe();
+ verify(syncCommands, never()).punsubscribe();
+ verify(syncCommands).punsubscribe(pattern);
verify(connectionProvider).release(pubsub);
verify(pubsub).removeListener(any(LettuceMessageListener.class));
@@ -280,9 +287,9 @@ void testPUnsubscribePatternSomeLeft() {
subscription.pSubscribe(patterns);
subscription.pUnsubscribe(new byte[][] { "a*".getBytes() });
- verify(asyncCommands).punsubscribe(new byte[][] { "a*".getBytes() });
- verify(asyncCommands, never()).unsubscribe();
- verify(asyncCommands, never()).punsubscribe();
+ verify(syncCommands).punsubscribe(new byte[][] { "a*".getBytes() });
+ verify(syncCommands, never()).unsubscribe();
+ verify(syncCommands, never()).punsubscribe();
assertThat(subscription.isAlive()).isTrue();
@@ -301,9 +308,9 @@ void testPUnsubscribePatternWithChannels() {
subscription.pSubscribe(pattern);
subscription.pUnsubscribe(pattern);
- verify(asyncCommands).punsubscribe(pattern);
- verify(asyncCommands, never()).unsubscribe();
- verify(asyncCommands, never()).punsubscribe();
+ verify(syncCommands).punsubscribe(pattern);
+ verify(syncCommands, never()).unsubscribe();
+ verify(syncCommands, never()).punsubscribe();
assertThat(subscription.isAlive()).isTrue();
assertThat(subscription.getPatterns()).isEmpty();
@@ -322,9 +329,9 @@ void testUnsubscribePatternWithChannelsSomeLeft() {
subscription.subscribe(new byte[][] { "a".getBytes() });
subscription.pUnsubscribe(pattern);
- verify(asyncCommands, never()).unsubscribe();
- verify(asyncCommands, never()).punsubscribe();
- verify(asyncCommands).punsubscribe(pattern);
+ verify(syncCommands, never()).unsubscribe();
+ verify(syncCommands, never()).punsubscribe();
+ verify(syncCommands).punsubscribe(pattern);
assertThat(subscription.isAlive()).isTrue();
@@ -343,8 +350,8 @@ void testPUnsubscribeAllNoPatterns() {
subscription.subscribe(new byte[][] { "s".getBytes() });
subscription.pUnsubscribe();
- verify(asyncCommands, never()).unsubscribe();
- verify(asyncCommands, never()).punsubscribe();
+ verify(syncCommands, never()).unsubscribe();
+ verify(syncCommands, never()).punsubscribe();
assertThat(subscription.isAlive()).isTrue();
assertThat(subscription.getPatterns()).isEmpty();
@@ -365,8 +372,8 @@ void testPUnsubscribeNotAlive() {
verify(connectionProvider).release(pubsub);
verify(pubsub).removeListener(any(LettuceMessageListener.class));
- verify(asyncCommands).unsubscribe();
- verify(asyncCommands, never()).punsubscribe();
+ verify(syncCommands).unsubscribe();
+ verify(syncCommands, never()).punsubscribe();
}
@Test
@@ -386,27 +393,41 @@ void testDoCloseNotSubscribed() {
subscription.doClose();
- verify(asyncCommands, never()).unsubscribe();
- verify(asyncCommands, never()).punsubscribe();
+ verify(syncCommands, never()).unsubscribe();
+ verify(syncCommands, never()).punsubscribe();
}
@Test
void testDoCloseSubscribedChannels() {
+ RedisFuture future = mock(RedisFuture.class);
+ when(future.toCompletableFuture()).thenReturn(CompletableFuture.completedFuture(null));
+
+ when(asyncCommands.unsubscribe()).thenReturn(future);
+ when(asyncCommands.ping()).thenReturn((RedisFuture) future);
+
subscription.subscribe(new byte[][] { "a".getBytes() });
subscription.doClose();
+ verify(asyncCommands).ping();
verify(asyncCommands).unsubscribe();
- verify(asyncCommands, never()).punsubscribe();
+ verifyNoMoreInteractions(asyncCommands);
}
@Test
void testDoCloseSubscribedPatterns() {
+ RedisFuture future = mock(RedisFuture.class);
+ when(future.toCompletableFuture()).thenReturn(CompletableFuture.completedFuture(null));
+
+ when(asyncCommands.punsubscribe()).thenReturn(future);
+ when(asyncCommands.ping()).thenReturn((RedisFuture) future);
+
subscription.pSubscribe(new byte[][] { "a*".getBytes() });
subscription.doClose();
- verify(asyncCommands, never()).unsubscribe();
+ verify(asyncCommands).ping();
verify(asyncCommands).punsubscribe();
+ verifyNoMoreInteractions(asyncCommands);
}
}
diff --git a/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateIntegrationTests.java
index 6b3cc932fe..d38205bc11 100644
--- a/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateIntegrationTests.java
@@ -19,6 +19,7 @@
import static org.assertj.core.api.Assumptions.*;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.time.Duration;
@@ -27,6 +28,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.function.Function;
import org.junit.jupiter.api.BeforeEach;
@@ -441,7 +443,29 @@ void listenToChannelShouldReceiveChannelMessagesCorrectly() throws InterruptedEx
redisTemplate.listenToChannel(channel).as(StepVerifier::create) //
.thenAwait(Duration.ofMillis(500)) // just make sure we the subscription completed
- .then(() -> redisTemplate.convertAndSend(channel, message).block()) //
+ .then(() -> redisTemplate.convertAndSend(channel, message).subscribe()) //
+ .assertNext(received -> {
+
+ assertThat(received).isInstanceOf(ChannelMessage.class);
+ assertThat(received.getMessage()).isEqualTo(message);
+ assertThat(received.getChannel()).isEqualTo(channel);
+ }) //
+ .thenAwait(Duration.ofMillis(10)) //
+ .thenCancel() //
+ .verify(Duration.ofSeconds(3));
+ }
+
+ @ParameterizedRedisTest // GH-1622
+ @EnabledIfLongRunningTest
+ void listenToLaterChannelShouldReceiveChannelMessagesCorrectly() {
+
+ String channel = "my-channel";
+
+ V message = valueFactory.instance();
+
+ redisTemplate.listenToChannelLater(channel) //
+ .doOnNext(it -> redisTemplate.convertAndSend(channel, message).subscribe()).flatMapMany(Function.identity()) //
+ .as(StepVerifier::create) //
.assertNext(received -> {
assertThat(received).isInstanceOf(ChannelMessage.class);
@@ -455,7 +479,7 @@ void listenToChannelShouldReceiveChannelMessagesCorrectly() throws InterruptedEx
@ParameterizedRedisTest // DATAREDIS-612
@EnabledIfLongRunningTest
- void listenToChannelPatternShouldReceiveChannelMessagesCorrectly() throws InterruptedException {
+ void listenToPatternShouldReceiveChannelMessagesCorrectly() {
String channel = "my-channel";
String pattern = "my-*";
@@ -466,7 +490,32 @@ void listenToChannelPatternShouldReceiveChannelMessagesCorrectly() throws Interr
stream.as(StepVerifier::create) //
.thenAwait(Duration.ofMillis(500)) // just make sure we the subscription completed
- .then(() -> redisTemplate.convertAndSend(channel, message).block()) //
+ .then(() -> redisTemplate.convertAndSend(channel, message).subscribe()) //
+ .assertNext(received -> {
+
+ assertThat(received).isInstanceOf(PatternMessage.class);
+ assertThat(received.getMessage()).isEqualTo(message);
+ assertThat(received.getChannel()).isEqualTo(channel);
+ assertThat(((PatternMessage) received).getPattern()).isEqualTo(pattern);
+ }) //
+ .thenCancel() //
+ .verify(Duration.ofSeconds(3));
+ }
+
+ @ParameterizedRedisTest // GH-1622
+ @EnabledIfLongRunningTest
+ void listenToPatternLaterShouldReceiveChannelMessagesCorrectly() {
+
+ String channel = "my-channel";
+ String pattern = "my-*";
+
+ V message = valueFactory.instance();
+
+ Mono>> stream = redisTemplate.listenToPatternLater(pattern);
+
+ stream.doOnNext(it -> redisTemplate.convertAndSend(channel, message).subscribe()) //
+ .flatMapMany(Function.identity()) //
+ .as(StepVerifier::create) //
.assertNext(received -> {
assertThat(received).isInstanceOf(PatternMessage.class);
diff --git a/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateUnitTests.java b/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateUnitTests.java
index c3ae3d4e98..70ed2d3d90 100644
--- a/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateUnitTests.java
+++ b/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateUnitTests.java
@@ -71,7 +71,7 @@ void listenToShouldSubscribeToChannel() {
when(connectionMock.pubSubCommands()).thenReturn(pubSubCommands);
when(pubSubCommands.subscribe(any())).thenReturn(Mono.empty());
- when(pubSubCommands.createSubscription()).thenReturn(Mono.just(subscription));
+ when(pubSubCommands.createSubscription(any())).thenReturn(Mono.just(subscription));
when(subscription.receive()).thenReturn(Flux.create(sink -> {}));
ReactiveRedisTemplate template = new ReactiveRedisTemplate<>(connectionFactoryMock,
diff --git a/src/test/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainerIntegrationTests.java b/src/test/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainerIntegrationTests.java
index 56ea8bebec..141aeef41f 100644
--- a/src/test/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainerIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainerIntegrationTests.java
@@ -20,21 +20,30 @@
import reactor.core.Disposable;
import reactor.test.StepVerifier;
+import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
+import java.util.Collections;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import java.util.function.Supplier;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveSubscription;
import org.springframework.data.redis.connection.ReactiveSubscription.ChannelMessage;
import org.springframework.data.redis.connection.ReactiveSubscription.PatternMessage;
import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.connection.SubscriptionListener;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializationContext;
@@ -56,6 +65,7 @@ public class ReactiveRedisMessageListenerContainerIntegrationTests {
private final LettuceConnectionFactory connectionFactory;
private @Nullable RedisConnection connection;
+ private @Nullable ReactiveRedisConnection reactiveConnection;
/**
* @param connectionFactory
@@ -73,6 +83,7 @@ public static Collection