diff --git a/pom.xml b/pom.xml index 5528f0768e..88d6d41d4d 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-redis - 2.6.0-SNAPSHOT + 2.6.0-GH-1622-SNAPSHOT Spring Data Redis diff --git a/src/main/asciidoc/new-features.adoc b/src/main/asciidoc/new-features.adoc index fcd9748c43..53d43c50b3 100644 --- a/src/main/asciidoc/new-features.adoc +++ b/src/main/asciidoc/new-features.adoc @@ -3,6 +3,11 @@ This section briefly covers items that are new and noteworthy in the latest releases. +[[new-in-2.6.0]] +== New in Spring Data Redis 2.6 + +* Support for `SubscriptionListener` when using `MessageListener` for subscription confirmation callbacks. `ReactiveRedisMessageListenerContainer` and `ReactiveRedisOperations` provide `receiveLater(…)` and `listenToLater(…)` methods to await until Redis acknowledges the subscription. + [[new-in-2.5.0]] == New in Spring Data Redis 2.5 diff --git a/src/main/asciidoc/reference/reactive-messaging.adoc b/src/main/asciidoc/reference/reactive-messaging.adoc index fa7af9728e..a8ec1c54c4 100644 --- a/src/main/asciidoc/reference/reactive-messaging.adoc +++ b/src/main/asciidoc/reference/reactive-messaging.adoc @@ -57,6 +57,21 @@ ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListen Flux> stream = container.receive(ChannelTopic.of("my-channel")); ---- +To await and ensure proper subscription, you can use the `receiveLater` method that returns a `Mono>`. +The resulting `Mono` completes with an inner publisher as a result of completing the subscription to the given topics. By intercepting `onNext` signals, you can synchronize server-side subscriptions. + +[source,java] +---- +ReactiveRedisConnectionFactory factory = … +ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory); + +Mono>> stream = container.receiveLater(ChannelTopic.of("my-channel")); + +stream.doOnNext(inner -> // notification hook when Redis subscriptions are synchronized with the server) + .flatMapMany(Function.identity()) + .…; +---- + [[redis:reactive:pubsub:subscribe:template]] === Subscribing via template API diff --git a/src/main/asciidoc/reference/redis-messaging.adoc b/src/main/asciidoc/reference/redis-messaging.adoc index e9b96e6f12..86da25dfa3 100644 --- a/src/main/asciidoc/reference/redis-messaging.adoc +++ b/src/main/asciidoc/reference/redis-messaging.adoc @@ -47,6 +47,8 @@ Due to its blocking nature, low-level subscription is not attractive, as it requ `RedisMessageListenerContainer` acts as a message listener container. It is used to receive messages from a Redis channel and drive the `MessageListener` instances that are injected into it. The listener container is responsible for all threading of message reception and dispatches into the listener for processing. A message listener container is the intermediary between an MDP and a messaging provider and takes care of registering to receive messages, resource acquisition and release, exception conversion, and the like. This lets you as an application developer write the (possibly complex) business logic associated with receiving a message (and reacting to it) and delegates boilerplate Redis infrastructure concerns to the framework. + A `MessageListener` can additionally implement `SubscriptionListener` to receive notifications upon subscription/unsubscribe confirmation. Listening to subscription notifications can be useful when synchronizing invocations. + Furthermore, to minimize the application footprint, `RedisMessageListenerContainer` lets one connection and one thread be shared by multiple listeners even though they do not share a subscription. Thus, no matter how many listeners or channels an application tracks, the runtime cost remains the same throughout its lifetime. Moreover, the container allows runtime configuration changes so that you can add or remove listeners while an application is running without the need for a restart. Additionally, the container uses a lazy subscription approach, using a `RedisConnection` only when needed. If all the listeners are unsubscribed, cleanup is automatically performed, and the thread is released. To help with the asynchronous nature of messages, the container requires a `java.util.concurrent.Executor` (or Spring's `TaskExecutor`) for dispatching the messages. Depending on the load, the number of listeners, or the runtime environment, you should change or tweak the executor to better serve your needs. In particular, in managed environments (such as app servers), it is highly recommended to pick a proper `TaskExecutor` to take advantage of its runtime. diff --git a/src/main/java/org/springframework/data/redis/connection/MessageListener.java b/src/main/java/org/springframework/data/redis/connection/MessageListener.java index ebc5575436..069b42b1ed 100644 --- a/src/main/java/org/springframework/data/redis/connection/MessageListener.java +++ b/src/main/java/org/springframework/data/redis/connection/MessageListener.java @@ -18,11 +18,14 @@ import org.springframework.lang.Nullable; /** - * Listener of messages published in Redis. + * Listener of messages published in Redis. A MessageListener can implement {@link SubscriptionListener} to receive + * notifications for subscription states. * * @author Costin Leau * @author Christoph Strobl + * @see SubscriptionListener */ +@FunctionalInterface public interface MessageListener { /** diff --git a/src/main/java/org/springframework/data/redis/connection/ReactivePubSubCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactivePubSubCommands.java index bf2b860172..38ce433d99 100644 --- a/src/main/java/org/springframework/data/redis/connection/ReactivePubSubCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/ReactivePubSubCommands.java @@ -37,7 +37,18 @@ public interface ReactivePubSubCommands { * * @return the subscription. */ - Mono createSubscription(); + default Mono createSubscription() { + return createSubscription(SubscriptionListener.EMPTY); + } + + /** + * Creates a subscription for this connection. Connections can have multiple {@link ReactiveSubscription}s. + * + * @param subscriptionListener the subscription listener to listen for subscription confirmations. + * @return the subscription. + * @since 2.6 + */ + Mono createSubscription(SubscriptionListener subscriptionListener); /** * Publishes the given {@code message} to the given {@code channel}. diff --git a/src/main/java/org/springframework/data/redis/connection/SubscriptionListener.java b/src/main/java/org/springframework/data/redis/connection/SubscriptionListener.java new file mode 100644 index 0000000000..1f5f616a92 --- /dev/null +++ b/src/main/java/org/springframework/data/redis/connection/SubscriptionListener.java @@ -0,0 +1,66 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection; + +/** + * Listener for subscription notifications. + *

+ * Subscription notifications are reported by Redis as confirmation for subscribe and unsubscribe operations for + * channels and patterns. + * + * @author Mark Paluch + * @since 2.6 + */ +public interface SubscriptionListener { + + /** + * Empty {@link SubscriptionListener}. + */ + SubscriptionListener EMPTY = new SubscriptionListener() {}; + + /** + * Notification when Redis has confirmed a channel subscription. + * + * @param channel name of the channel. + * @param count subscriber count. + */ + default void onChannelSubscribed(byte[] channel, long count) {} + + /** + * Notification when Redis has confirmed a channel un-subscription. + * + * @param channel name of the channel. + * @param count subscriber count. + */ + default void onChannelUnsubscribed(byte[] channel, long count) {} + + /** + * Notification when Redis has confirmed a pattern subscription. + * + * @param pattern the pattern. + * @param count subscriber count. + */ + default void onPatternSubscribed(byte[] pattern, long count) {} + + /** + * Notification when Redis has confirmed a pattern un-subscription. + * + * @param pattern the pattern. + * @param count subscriber count. + */ + default void onPatternUnsubscribed(byte[] pattern, long count) {} + +} diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterConnection.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterConnection.java index 6bd6fcbfa7..cde11d9924 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterConnection.java @@ -450,7 +450,7 @@ public void subscribe(MessageListener listener, byte[]... channels) { "Connection already subscribed; use the connection Subscription to cancel or add new channels"); } try { - BinaryJedisPubSub jedisPubSub = new JedisMessageListener(listener); + JedisMessageListener jedisPubSub = new JedisMessageListener(listener); subscription = new JedisSubscription(listener, jedisPubSub, channels, null); cluster.subscribe(jedisPubSub, channels); } catch (Exception ex) { @@ -466,7 +466,7 @@ public void pSubscribe(MessageListener listener, byte[]... patterns) { "Connection already subscribed; use the connection Subscription to cancel or add new channels"); } try { - BinaryJedisPubSub jedisPubSub = new JedisMessageListener(listener); + JedisMessageListener jedisPubSub = new JedisMessageListener(listener); subscription = new JedisSubscription(listener, jedisPubSub, null, patterns); cluster.psubscribe(jedisPubSub, patterns); } catch (Exception ex) { diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java index 1524bff2dd..9a85639158 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java @@ -715,7 +715,7 @@ public void pSubscribe(MessageListener listener, byte[]... patterns) { doWithJedis(it -> { - BinaryJedisPubSub jedisPubSub = new JedisMessageListener(listener); + JedisMessageListener jedisPubSub = new JedisMessageListener(listener); subscription = new JedisSubscription(listener, jedisPubSub, null, patterns); it.psubscribe(jedisPubSub, patterns); @@ -740,7 +740,7 @@ public void subscribe(MessageListener listener, byte[]... channels) { doWithJedis(it -> { - BinaryJedisPubSub jedisPubSub = new JedisMessageListener(listener); + JedisMessageListener jedisPubSub = new JedisMessageListener(listener); subscription = new JedisSubscription(listener, jedisPubSub, channels, null); it.subscribe(jedisPubSub, channels); diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisMessageListener.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisMessageListener.java index 6d8e06cf2f..114d967494 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisMessageListener.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisMessageListener.java @@ -15,24 +15,29 @@ */ package org.springframework.data.redis.connection.jedis; +import redis.clients.jedis.BinaryJedisPubSub; + import org.springframework.data.redis.connection.DefaultMessage; import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.connection.SubscriptionListener; import org.springframework.util.Assert; -import redis.clients.jedis.BinaryJedisPubSub; - /** * MessageListener adapter on top of Jedis. * * @author Costin Leau + * @author Mark Paluch */ class JedisMessageListener extends BinaryJedisPubSub { private final MessageListener listener; + private final SubscriptionListener subscriptionListener; JedisMessageListener(MessageListener listener) { - Assert.notNull(listener, "message listener is required"); + Assert.notNull(listener, "MessageListener is required"); this.listener = listener; + this.subscriptionListener = listener instanceof SubscriptionListener ? (SubscriptionListener) listener + : SubscriptionListener.EMPTY; } public void onMessage(byte[] channel, byte[] message) { @@ -44,18 +49,18 @@ public void onPMessage(byte[] pattern, byte[] channel, byte[] message) { } public void onPSubscribe(byte[] pattern, int subscribedChannels) { - // no-op + subscriptionListener.onPatternSubscribed(pattern, subscribedChannels); } public void onPUnsubscribe(byte[] pattern, int subscribedChannels) { - // no-op + subscriptionListener.onPatternUnsubscribed(pattern, subscribedChannels); } public void onSubscribe(byte[] channel, int subscribedChannels) { - // no-op + subscriptionListener.onChannelSubscribed(channel, subscribedChannels); } public void onUnsubscribe(byte[] channel, int subscribedChannels) { - // no-op + subscriptionListener.onChannelUnsubscribed(channel, subscribedChannels); } } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisSubscription.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisSubscription.java index 915c4dba42..5358b9f95b 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisSubscription.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisSubscription.java @@ -30,7 +30,7 @@ class JedisSubscription extends AbstractSubscription { private final BinaryJedisPubSub jedisPubSub; - JedisSubscription(MessageListener listener, BinaryJedisPubSub jedisPubSub, @Nullable byte[][] channels, + JedisSubscription(MessageListener listener, JedisMessageListener jedisPubSub, @Nullable byte[][] channels, @Nullable byte[][] patterns) { super(listener, channels, patterns); this.jedisPubSub = jedisPubSub; diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceByteBufferPubSubListenerWrapper.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceByteBufferPubSubListenerWrapper.java new file mode 100644 index 0000000000..03814c0461 --- /dev/null +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceByteBufferPubSubListenerWrapper.java @@ -0,0 +1,112 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection.lettuce; + +import io.lettuce.core.pubsub.RedisPubSubListener; + +import java.nio.ByteBuffer; + +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +/** + * Wrapper around {@link RedisPubSubListener} that converts {@link ByteBuffer} into {@code byte[]}. + * + * @author Mark Paluch + * @since 2.6 + */ +class LettuceByteBufferPubSubListenerWrapper implements RedisPubSubListener { + + private final RedisPubSubListener delegate; + + LettuceByteBufferPubSubListenerWrapper(RedisPubSubListener delegate) { + + Assert.notNull(delegate, "RedisPubSubListener must not be null!"); + + this.delegate = delegate; + } + + /* + * (non-Javadoc) + * @see io.lettuce.core.pubsub.RedisPubSubListener#message(java.lang.Object, java.lang.Object) + */ + public void message(ByteBuffer channel, ByteBuffer message) { + delegate.message(getBytes(channel), getBytes(message)); + } + + /* + * (non-Javadoc) + * @see io.lettuce.core.pubsub.RedisPubSubListener#message(java.lang.Object, java.lang.Object, java.lang.Object) + */ + public void message(ByteBuffer pattern, ByteBuffer channel, ByteBuffer message) { + delegate.message(getBytes(channel), getBytes(message), getBytes(pattern)); + } + + /* + * (non-Javadoc) + * @see io.lettuce.core.pubsub.RedisPubSubListener#subscribed(java.lang.Object, long) + */ + public void subscribed(ByteBuffer channel, long count) { + delegate.subscribed(getBytes(channel), count); + } + + /* + * (non-Javadoc) + * @see io.lettuce.core.pubsub.RedisPubSubListener#psubscribed(java.lang.Object, long) + */ + public void psubscribed(ByteBuffer pattern, long count) { + delegate.psubscribed(getBytes(pattern), count); + } + + /* + * (non-Javadoc) + * @see io.lettuce.core.pubsub.RedisPubSubListener#unsubscribed(java.lang.Object, long) + */ + public void unsubscribed(ByteBuffer channel, long count) { + delegate.unsubscribed(getBytes(channel), count); + } + + /* + * (non-Javadoc) + * @see io.lettuce.core.pubsub.RedisPubSubListener#punsubscribed(java.lang.Object, long) + */ + public void punsubscribed(ByteBuffer pattern, long count) { + delegate.punsubscribed(getBytes(pattern), count); + } + + /** + * Extract a byte array from {@link ByteBuffer} without consuming it. + * + * @param byteBuffer must not be {@literal null}. + * @return + */ + private static byte[] getBytes(@Nullable ByteBuffer byteBuffer) { + + if (byteBuffer == null) { + return new byte[0]; + } + + if (byteBuffer.hasArray()) { + return byteBuffer.array(); + } + + byteBuffer.mark(); + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + byteBuffer.reset(); + return bytes; + } +} diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceMessageListener.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceMessageListener.java index dafe89c22f..65bee4a656 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceMessageListener.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceMessageListener.java @@ -19,20 +19,27 @@ import org.springframework.data.redis.connection.DefaultMessage; import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.connection.SubscriptionListener; import org.springframework.util.Assert; /** * MessageListener wrapper around Lettuce {@link RedisPubSubListener}. * * @author Costin Leau + * @author Mark Paluch */ class LettuceMessageListener implements RedisPubSubListener { private final MessageListener listener; + private final SubscriptionListener subscriptionListener; + + LettuceMessageListener(MessageListener listener, SubscriptionListener subscriptionListener) { - LettuceMessageListener(MessageListener listener) { Assert.notNull(listener, "MessageListener must not be null!"); + Assert.notNull(subscriptionListener, "SubscriptionListener must not be null!"); + this.listener = listener; + this.subscriptionListener = subscriptionListener; } /* @@ -55,23 +62,31 @@ public void message(byte[] pattern, byte[] channel, byte[] message) { * (non-Javadoc) * @see io.lettuce.core.pubsub.RedisPubSubListener#subscribed(java.lang.Object, long) */ - public void subscribed(byte[] channel, long count) {} + public void subscribed(byte[] channel, long count) { + subscriptionListener.onChannelSubscribed(channel, count); + } /* * (non-Javadoc) * @see io.lettuce.core.pubsub.RedisPubSubListener#psubscribed(java.lang.Object, long) */ - public void psubscribed(byte[] pattern, long count) {} + public void psubscribed(byte[] pattern, long count) { + subscriptionListener.onPatternSubscribed(pattern, count); + } /* * (non-Javadoc) * @see io.lettuce.core.pubsub.RedisPubSubListener#unsubscribed(java.lang.Object, long) */ - public void unsubscribed(byte[] channel, long count) {} + public void unsubscribed(byte[] channel, long count) { + subscriptionListener.onChannelUnsubscribed(channel, count); + } /* * (non-Javadoc) * @see io.lettuce.core.pubsub.RedisPubSubListener#punsubscribed(java.lang.Object, long) */ - public void punsubscribed(byte[] pattern, long count) {} + public void punsubscribed(byte[] pattern, long count) { + subscriptionListener.onPatternUnsubscribed(pattern, count); + } } diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactivePubSubCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactivePubSubCommands.java index 45b8106c95..e10c53d72b 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactivePubSubCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactivePubSubCommands.java @@ -23,9 +23,11 @@ import java.util.function.Function; import org.reactivestreams.Publisher; + import org.springframework.data.redis.connection.ReactivePubSubCommands; import org.springframework.data.redis.connection.ReactiveSubscription; import org.springframework.data.redis.connection.ReactiveSubscription.ChannelMessage; +import org.springframework.data.redis.connection.SubscriptionListener; import org.springframework.util.Assert; /** @@ -43,13 +45,13 @@ class LettuceReactivePubSubCommands implements ReactivePubSubCommands { /* * (non-Javadoc) - * @see org.springframework.data.redis.connection.ReactivePubSubCommands#createSubscription() + * @see org.springframework.data.redis.connection.ReactivePubSubCommands#createSubscription(org.springframework.data.redis.connection.SubscriptionListener) */ @Override - public Mono createSubscription() { + public Mono createSubscription(SubscriptionListener listener) { return connection.getPubSubConnection() - .map(pubSubConnection -> new LettuceReactiveSubscription(pubSubConnection.reactive(), + .map(pubSubConnection -> new LettuceReactiveSubscription(listener, pubSubConnection, connection.translateException())); } diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveSubscription.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveSubscription.java index d4dc560384..12c782138c 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveSubscription.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveSubscription.java @@ -15,6 +15,7 @@ */ package org.springframework.data.redis.connection.lettuce; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands; import reactor.core.Disposable; import reactor.core.publisher.ConnectableFlux; @@ -33,6 +34,7 @@ import java.util.function.Supplier; import org.springframework.data.redis.connection.ReactiveSubscription; +import org.springframework.data.redis.connection.SubscriptionListener; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; @@ -46,15 +48,23 @@ */ class LettuceReactiveSubscription implements ReactiveSubscription { + private final LettuceByteBufferPubSubListenerWrapper listener; + private final StatefulRedisPubSubConnection connection; private final RedisPubSubReactiveCommands commands; private final State patternState; private final State channelState; - LettuceReactiveSubscription(RedisPubSubReactiveCommands commands, + LettuceReactiveSubscription(SubscriptionListener subscriptionListener, + StatefulRedisPubSubConnection connection, Function exceptionTranslator) { - this.commands = commands; + this.listener = new LettuceByteBufferPubSubListenerWrapper( + new LettuceMessageListener((messages, pattern) -> {}, subscriptionListener)); + this.connection = connection; + this.commands = connection.reactive(); + connection.addListener(listener); + this.patternState = new State(exceptionTranslator); this.channelState = new State(exceptionTranslator); } @@ -176,7 +186,12 @@ public Mono cancel() { channelState.terminate(); patternState.terminate(); - return Mono.empty(); + + // this is to ensure completion of the futures and result processing. Since we're unsubscribing first, we expect + // that we receive pub/sub confirmations before the PING response. + return commands.ping().then(Mono.fromRunnable(() -> { + connection.removeListener(listener); + })); })); } diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java index c2fd9472b1..b8dfd37279 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java @@ -16,9 +16,15 @@ package org.springframework.data.redis.connection.lettuce; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; +import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.connection.SubscriptionListener; import org.springframework.data.redis.connection.util.AbstractSubscription; /** @@ -37,6 +43,7 @@ public class LettuceSubscription extends AbstractSubscription { private final LettuceMessageListener listener; private final LettuceConnectionProvider connectionProvider; private final RedisPubSubCommands pubsub; + private final RedisPubSubAsyncCommands pubSubAsync; /** * Creates a new {@link LettuceSubscription} given {@link MessageListener}, {@link StatefulRedisPubSubConnection}, and @@ -52,9 +59,11 @@ protected LettuceSubscription(MessageListener listener, super(listener); this.connection = pubsubConnection; - this.listener = new LettuceMessageListener(listener); + this.listener = new LettuceMessageListener(listener, + listener instanceof SubscriptionListener ? (SubscriptionListener) listener : SubscriptionListener.EMPTY); this.connectionProvider = connectionProvider; this.pubsub = connection.sync(); + this.pubSubAsync = connection.async(); this.connection.addListener(this.listener); } @@ -70,15 +79,29 @@ protected StatefulRedisPubSubConnection getNativeConnection() { @Override protected void doClose() { + List> futures = new ArrayList<>(); + if (!getChannels().isEmpty()) { - doUnsubscribe(true); + futures.add(pubSubAsync.unsubscribe().toCompletableFuture()); } if (!getPatterns().isEmpty()) { - doPUnsubscribe(true); + futures.add(pubSubAsync.punsubscribe().toCompletableFuture()); + } + + if (!futures.isEmpty()) { + + // this is to ensure completion of the futures and result processing. Since we're unsubscribing first, we expect + // that we receive pub/sub confirmations before the PING response. + futures.add(pubSubAsync.ping().toCompletableFuture()); + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((v, t) -> { + connection.removeListener(listener); + }); + } else { + connection.removeListener(listener); } - connection.removeListener(this.listener); connectionProvider.release(connection); } diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveRedisOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveRedisOperations.java index 8e1dae0682..b2b7ada822 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveRedisOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveRedisOperations.java @@ -116,6 +116,49 @@ default Flux> listenToPattern(String... patterns) { */ Flux> listenTo(Topic... topics); + /** + * Subscribe to the given Redis {@code channels} and emit {@link Message messages} received for those. The + * {@link Mono} completes once the {@link Topic topic} subscriptions are registered. + * + * @param channels must not be {@literal null}. + * @return a hot sequence of {@link Message messages}. + * @since 2.6 + * @see org.springframework.data.redis.listener.ReactiveRedisMessageListenerContainer#receiveLater(ChannelTopic...) + */ + default Mono>> listenToChannelLater(String... channels) { + + Assert.notNull(channels, "Channels must not be null!"); + + return listenToLater(Arrays.stream(channels).map(ChannelTopic::of).toArray(ChannelTopic[]::new)); + } + + /** + * Subscribe to the Redis channels matching the given {@code pattern} and emit {@link Message messages} received for + * those. The {@link Mono} completes once the {@link Topic topic} subscriptions are registered. + * + * @param patterns must not be {@literal null}. + * @return a hot sequence of {@link Message messages}. + * @since 2.6 + * @see org.springframework.data.redis.listener.ReactiveRedisMessageListenerContainer#receiveLater(PatternTopic...) + */ + default Mono>> listenToPatternLater(String... patterns) { + + Assert.notNull(patterns, "Patterns must not be null!"); + return listenToLater(Arrays.stream(patterns).map(PatternTopic::of).toArray(PatternTopic[]::new)); + } + + /** + * Subscribe to the Redis channels for the given {@link Topic topics} and emit {@link Message messages} received for + * those. The {@link Mono} completes once the {@link Topic topic} subscriptions are registered. + * + * @param topics must not be {@literal null}. + * @return a hot sequence of {@link Message messages}. + * @since 2.6 + * @see org.springframework.data.redis.listener.ReactiveRedisMessageListenerContainer#receiveLater(Iterable, + * RedisSerializationContext.SerializationPair, RedisSerializationContext.SerializationPair) + */ + Mono>> listenToLater(Topic... topics); + // ------------------------------------------------------------------------- // Methods dealing with Redis Keys // ------------------------------------------------------------------------- diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java b/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java index 202998fdd1..c9cf241c04 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java @@ -237,6 +237,21 @@ public Flux> listenTo(Topic... topics) { .doFinally((signalType) -> container.destroyLater().subscribe()); } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.ReactiveRedisOperations#listenToLater(org.springframework.data.redis.listener.Topic[]) + */ + @Override + @SuppressWarnings({ "unchecked", "rawtypes" }) + public Mono>> listenToLater(Topic... topics) { + + ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(getConnectionFactory()); + + return (Mono) container.receiveLater(Arrays.asList(topics), getSerializationContext().getStringSerializationPair(), + getSerializationContext().getValueSerializationPair()) // + .doFinally((signalType) -> container.destroyLater().subscribe()); + } + // ------------------------------------------------------------------------- // Methods dealing with Redis keys // ------------------------------------------------------------------------- diff --git a/src/main/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainer.java index 78a6d0fdca..c6d574f9f4 100644 --- a/src/main/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainer.java @@ -17,16 +17,18 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Sinks; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -40,9 +42,12 @@ import org.springframework.data.redis.connection.ReactiveSubscription.ChannelMessage; import org.springframework.data.redis.connection.ReactiveSubscription.Message; import org.springframework.data.redis.connection.ReactiveSubscription.PatternMessage; +import org.springframework.data.redis.connection.SubscriptionListener; +import org.springframework.data.redis.connection.util.ByteArrayWrapper; import org.springframework.data.redis.serializer.RedisElementReader; import org.springframework.data.redis.serializer.RedisSerializationContext.SerializationPair; import org.springframework.data.redis.serializer.RedisSerializer; +import org.springframework.data.redis.util.ByteUtils; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; @@ -105,7 +110,7 @@ private Mono doDestroy() { return Mono.empty(); } - ReactiveRedisConnection connection = this.connection; + ReactiveRedisConnection connection = getRequiredConnection(); Flux terminationSignals = null; while (!subscriptions.isEmpty()) { @@ -135,7 +140,7 @@ private Mono doDestroy() { public Collection getActiveSubscriptions() { return subscriptions.entrySet().stream().filter(entry -> entry.getValue().hasRegistration()) - .map(entry -> entry.getKey()).collect(Collectors.toList()); + .map(Map.Entry::getKey).collect(Collectors.toList()); } /** @@ -156,6 +161,28 @@ public Flux> receive(ChannelTopic... channelTopics) { return receive(Arrays.asList(channelTopics), stringSerializationPair, stringSerializationPair); } + /** + * Subscribe to one or more {@link ChannelTopic}s and receive a stream of {@link ChannelMessage} once the returned + * {@link Mono} completes. Messages and channel names are treated as {@link String}. The message stream subscribes + * lazily to the Redis channels and unsubscribes if the inner {@link org.reactivestreams.Subscription} is + * {@link org.reactivestreams.Subscription#cancel() cancelled}. + *

+ * The returned {@link Mono} completes once the connection has been subscribed to the given {@link Topic topics}. Note + * that cancelling the returned {@link Mono} can leave the connection in a subscribed state. + * + * @param channelTopics the channels to subscribe. + * @return the message stream. + * @throws InvalidDataAccessApiUsageException if {@code patternTopics} is empty. + * @since 2.6 + */ + public Mono>> receiveLater(ChannelTopic... channelTopics) { + + Assert.notNull(channelTopics, "ChannelTopics must not be null!"); + Assert.noNullElements(channelTopics, "ChannelTopics must not contain null elements!"); + + return receiveLater(Arrays.asList(channelTopics), stringSerializationPair, stringSerializationPair); + } + /** * Subscribe to one or more {@link PatternTopic}s and receive a stream of {@link PatternMessage}. Messages, pattern, * and channel names are treated as {@link String}. The message stream subscribes lazily to the Redis channels and @@ -178,21 +205,89 @@ public Flux> receive(PatternTopic... patt } /** - * Subscribe to one or more {@link Topic}s and receive a stream of {@link ChannelMessage} The stream may contain + * Subscribe to one or more {@link PatternTopic}s and receive a stream of {@link PatternMessage} once the returned + * {@link Mono} completes. Messages, pattern, and channel names are treated as {@link String}. The message stream + * subscribes lazily to the Redis channels and unsubscribes if the inner {@link org.reactivestreams.Subscription} is + * {@link org.reactivestreams.Subscription#cancel() cancelled}. + *

+ * The returned {@link Mono} completes once the connection has been subscribed to the given {@link Topic topics}. Note + * that cancelling the returned {@link Mono} can leave the connection in a subscribed state. + * + * @param patternTopics the channels to subscribe. + * @return the message stream. + * @throws InvalidDataAccessApiUsageException if {@code patternTopics} is empty. + * @since 2.6 + */ + @SuppressWarnings("unchecked") + public Mono>> receiveLater(PatternTopic... patternTopics) { + + Assert.notNull(patternTopics, "PatternTopic must not be null!"); + Assert.noNullElements(patternTopics, "PatternTopic must not contain null elements!"); + + return receiveLater(Arrays.asList(patternTopics), stringSerializationPair, stringSerializationPair) + .map(it -> it.map(m -> (PatternMessage) m)); + } + + /** + * Subscribe to one or more {@link Topic}s and receive a stream of {@link ChannelMessage}. The stream may contain * {@link PatternMessage} if subscribed to patterns. Messages, and channel names are serialized/deserialized using the * given {@code channelSerializer} and {@code messageSerializer}. The message stream subscribes lazily to the Redis * channels and unsubscribes if the {@link org.reactivestreams.Subscription} is * {@link org.reactivestreams.Subscription#cancel() cancelled}. * - * @param topics the channels to subscribe. + * @param topics the channels/patterns to subscribe. + * @param subscriptionListener listener to receive subscription/unsubscription notifications. + * @return the message stream. + * @throws InvalidDataAccessApiUsageException if {@code patternTopics} is empty. + * @see #receive(Iterable, SerializationPair, SerializationPair) + * @since 2.6 + */ + public Flux> receive(Iterable topics, + SubscriptionListener subscriptionListener) { + return receive(topics, stringSerializationPair, stringSerializationPair, subscriptionListener); + } + + /** + * Subscribe to one or more {@link Topic}s and receive a stream of {@link ChannelMessage}. The stream may contain + * {@link PatternMessage} if subscribed to patterns. Messages, and channel names are serialized/deserialized using the + * given {@code channelSerializer} and {@code messageSerializer}. The message stream subscribes lazily to the Redis + * channels and unsubscribes if the {@link org.reactivestreams.Subscription} is + * {@link org.reactivestreams.Subscription#cancel() cancelled}. + * + * @param topics the channels/patterns to subscribe. * @return the message stream. * @see #receive(Iterable, SerializationPair, SerializationPair) * @throws InvalidDataAccessApiUsageException if {@code topics} is empty. */ public Flux> receive(Iterable topics, SerializationPair channelSerializer, SerializationPair messageSerializer) { + return receive(topics, channelSerializer, messageSerializer, SubscriptionListener.EMPTY); + } + + /** + * Subscribe to one or more {@link Topic}s and receive a stream of {@link ChannelMessage}. The stream may contain + * {@link PatternMessage} if subscribed to patterns. Messages, and channel names are serialized/deserialized using the + * given {@code channelSerializer} and {@code messageSerializer}. The message stream subscribes lazily to the Redis + * channels and unsubscribes if the {@link org.reactivestreams.Subscription} is + * {@link org.reactivestreams.Subscription#cancel() cancelled}. {@link SubscriptionListener} is notified upon + * subscription/unsubscription and can be used for synchronization. + * + * @param topics the channels to subscribe. + * @param channelSerializer serialization pair to decode the channel/pattern name. + * @param messageSerializer serialization pair to decode the message body. + * @param subscriptionListener listener to receive subscription/unsubscription notifications. + * @return the message stream. + * @see #receive(Iterable, SerializationPair, SerializationPair) + * @throws InvalidDataAccessApiUsageException if {@code topics} is empty. + * @since 2.6 + */ + public Flux> receive(Iterable topics, SerializationPair channelSerializer, + SerializationPair messageSerializer, SubscriptionListener subscriptionListener) { Assert.notNull(topics, "Topics must not be null!"); + Assert.notNull(channelSerializer, "Channel serializer must not be null!"); + Assert.notNull(messageSerializer, "Message serializer must not be null!"); + Assert.notNull(subscriptionListener, "SubscriptionListener must not be null!"); verifyConnection(); @@ -203,7 +298,8 @@ public Flux> receive(Iterable topics, Seri throw new InvalidDataAccessApiUsageException("No channels or patterns to subscribe to."); } - return doReceive(channelSerializer, messageSerializer, connection.pubSubCommands().createSubscription(), patterns, + return doReceive(channelSerializer, messageSerializer, + getRequiredConnection().pubSubCommands().createSubscription(subscriptionListener), patterns, channels); } @@ -215,7 +311,7 @@ private Flux> doReceive(SerializationPair channelSeriali Mono subscribe = subscribe(patterns, channels, it); - MonoProcessor> terminalProcessor = MonoProcessor.create(); + Sinks.One> terminalSink = Sinks.one(); return it.receive().mergeWith(subscribe.then(Mono.defer(() -> { getSubscribers(it).registered(); @@ -226,15 +322,77 @@ private Flux> doReceive(SerializationPair channelSeriali Subscribers subscribers = getSubscribers(it); if (subscribers.unregister()) { subscriptions.remove(it); - it.unsubscribe().subscribe(v -> terminalProcessor.onComplete(), terminalProcessor::onError); + it.cancel().subscribe(v -> terminalSink.tryEmitEmpty(), terminalSink::tryEmitError); } - }).mergeWith(terminalProcessor); + }).mergeWith(terminalSink.asMono()); }); return messageStream .map(message -> readMessage(channelSerializer.getReader(), messageSerializer.getReader(), message)); } + /** + * Subscribe to one or more {@link Topic}s and receive a stream of {@link ChannelMessage}. The returned {@link Mono} + * completes once the connection has been subscribed to the given {@link Topic topics}. Note that cancelling the + * returned {@link Mono} can leave the connection in a subscribed state. + * + * @param topics the channels to subscribe. + * @param channelSerializer serialization pair to decode the channel/pattern name. + * @param messageSerializer serialization pair to decode the message body. + * @return the message stream. + * @throws InvalidDataAccessApiUsageException if {@code topics} is empty. + * @since 2.6 + */ + public Mono>> receiveLater(Iterable topics, + SerializationPair channelSerializer, SerializationPair messageSerializer) { + + Assert.notNull(topics, "Topics must not be null!"); + Assert.notNull(channelSerializer, "Channel serializer must not be null!"); + Assert.notNull(messageSerializer, "Message serializer must not be null!"); + + verifyConnection(); + + ByteBuffer[] patterns = getTargets(topics, PatternTopic.class); + ByteBuffer[] channels = getTargets(topics, ChannelTopic.class); + + if (ObjectUtils.isEmpty(patterns) && ObjectUtils.isEmpty(channels)) { + throw new InvalidDataAccessApiUsageException("No channels or patterns to subscribe to."); + } + + return Mono.defer(() -> { + + SubscriptionReadyListener readyListener = SubscriptionReadyListener.create(topics, stringSerializationPair); + + return doReceiveLater(channelSerializer, messageSerializer, + getRequiredConnection().pubSubCommands().createSubscription(readyListener), patterns, channels) + .delayUntil(it -> readyListener.getTrigger()); + }); + } + + private Mono>> doReceiveLater(SerializationPair channelSerializer, + SerializationPair messageSerializer, Mono subscription, ByteBuffer[] patterns, + ByteBuffer[] channels) { + + return subscription.flatMap(it -> { + + Mono subscribe = subscribe(patterns, channels, it).doOnSuccess(v -> getSubscribers(it).registered()); + + Sinks.One> terminalSink = Sinks.one(); + + Flux> receiver = it.receive().doOnCancel(() -> { + + Subscribers subscribers = getSubscribers(it); + if (subscribers.unregister()) { + subscriptions.remove(it); + it.cancel().subscribe(v -> terminalSink.tryEmitEmpty(), terminalSink::tryEmitError); + } + }).mergeWith(terminalSink.asMono()) + .map(message -> readMessage(channelSerializer.getReader(), messageSerializer.getReader(), message)); + + return subscribe.then(Mono.just(receiver)); + }); + } + private static Mono subscribe(ByteBuffer[] patterns, ByteBuffer[] channels, ReactiveSubscription it) { Assert.isTrue(!ObjectUtils.isEmpty(channels) || !ObjectUtils.isEmpty(patterns), @@ -257,7 +415,7 @@ private static Mono subscribe(ByteBuffer[] patterns, ByteBuffer[] channels } } - return subscribe; + return subscribe == null ? Mono.empty() : subscribe; } private boolean isActive() { @@ -284,13 +442,12 @@ private ByteBuffer[] getTargets(Iterable topics, Class class .toArray(ByteBuffer[]::new); } - @SuppressWarnings("unchecked") private Message readMessage(RedisElementReader channelSerializer, RedisElementReader messageSerializer, Message message) { if (message instanceof PatternMessage) { - PatternMessage patternMessage = (PatternMessage) message; + PatternMessage patternMessage = (PatternMessage) message; String pattern = read(stringSerializationPair.getReader(), patternMessage.getPattern()); C channel = read(channelSerializer, patternMessage.getChannel()); @@ -305,6 +462,17 @@ private Message readMessage(RedisElementReader channelSerializer return new ChannelMessage<>(channel, body); } + private ReactiveRedisConnection getRequiredConnection() { + + ReactiveRedisConnection connection = this.connection; + + if (connection == null) { + throw new IllegalStateException("Connection no longer available"); + } + + return connection; + } + private static C read(RedisElementReader reader, ByteBuffer buffer) { try { @@ -362,4 +530,54 @@ boolean unregister() { return false; } } + + static class SubscriptionReadyListener extends AtomicBoolean implements SubscriptionListener { + + private final Set toSubscribe; + private final Sinks.Empty sink = Sinks.empty(); + + private SubscriptionReadyListener(Set topics) { + this.toSubscribe = topics; + } + + public static SubscriptionReadyListener create(Iterable topics, + SerializationPair serializationPair) { + + Set wrappers = new HashSet<>(); + + for (Topic topic : topics) { + wrappers.add(new ByteArrayWrapper(ByteUtils.getBytes(serializationPair.getWriter().write(topic.getTopic())))); + } + + return new SubscriptionReadyListener(wrappers); + } + + @Override + public void onChannelSubscribed(byte[] channel, long count) { + removeRemaining(channel); + } + + @Override + public void onPatternSubscribed(byte[] pattern, long count) { + removeRemaining(pattern); + } + + private void removeRemaining(byte[] channel) { + + boolean done; + + synchronized (toSubscribe) { + toSubscribe.remove(new ByteArrayWrapper(channel)); + done = toSubscribe.isEmpty(); + } + + if (done && compareAndSet(false, true)) { + sink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST); + } + } + + public Mono getTrigger() { + return sink.asMono(); + } + } } diff --git a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java index 99dabda101..f8eca5e292 100644 --- a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java @@ -28,6 +28,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.springframework.beans.factory.BeanNameAware; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; @@ -41,6 +42,7 @@ import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.Subscription; +import org.springframework.data.redis.connection.SubscriptionListener; import org.springframework.data.redis.connection.util.ByteArrayWrapper; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; @@ -62,13 +64,16 @@ * configured). *

* Adding and removing listeners at the same time has undefined results. It is strongly recommended to synchronize/order - * these methods accordingly. + * these methods accordingly. {@link MessageListener Listeners} that wish to receive subscription/unsubscription + * callbacks in response to subscribe/unsubscribe commands can implement {@link SubscriptionListener}. * * @author Costin Leau * @author Jennifer Hickey * @author Way Joke * @author Thomas Darimont * @author Mark Paluch + * @see MessageListener + * @see SubscriptionListener */ public class RedisMessageListenerContainer implements InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle { @@ -133,6 +138,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab private long maxSubscriptionRegistrationWaitingTime = DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME; + @Override public void afterPropertiesSet() { if (taskExecutor == null) { manageExecutor = true; @@ -159,6 +165,7 @@ protected TaskExecutor createDefaultTaskExecutor() { return new SimpleAsyncTaskExecutor(threadNamePrefix); } + @Override public void destroy() throws Exception { initialized = false; @@ -175,24 +182,29 @@ public void destroy() throws Exception { } } + @Override public boolean isAutoStartup() { return true; } + @Override public void stop(Runnable callback) { stop(); callback.run(); } + @Override public int getPhase() { // start the latest return Integer.MAX_VALUE; } + @Override public boolean isRunning() { return running; } + @Override public void start() { if (!running) { running = true; @@ -219,6 +231,7 @@ public void start() { } } + @Override public void stop() { if (isRunning()) { running = false; @@ -312,6 +325,7 @@ public void setConnectionFactory(RedisConnectionFactory connectionFactory) { this.connectionFactory = connectionFactory; } + @Override public void setBeanName(String name) { this.beanName = name; } @@ -958,7 +972,7 @@ void unsubscribePattern(byte[]... patterns) { * * @author Costin Leau */ - private class DispatchMessageListener implements MessageListener { + private class DispatchMessageListener implements MessageListener, SubscriptionListener { @Override public void onMessage(Message message, @Nullable byte[] pattern) { @@ -977,6 +991,56 @@ public void onMessage(Message message, @Nullable byte[] pattern) { dispatchMessage(listeners, message, pattern); } } + + @Override + public void onChannelSubscribed(byte[] channel, long count) { + dispatchSubscriptionNotification( + channelMapping.getOrDefault(new ByteArrayWrapper(channel), Collections.emptyList()), channel, count, + SubscriptionListener::onChannelSubscribed); + } + + @Override + public void onChannelUnsubscribed(byte[] channel, long count) { + dispatchSubscriptionNotification( + channelMapping.getOrDefault(new ByteArrayWrapper(channel), Collections.emptyList()), channel, count, + SubscriptionListener::onChannelUnsubscribed); + } + + @Override + public void onPatternSubscribed(byte[] pattern, long count) { + dispatchSubscriptionNotification( + patternMapping.getOrDefault(new ByteArrayWrapper(pattern), Collections.emptyList()), pattern, count, + SubscriptionListener::onPatternSubscribed); + } + + @Override + public void onPatternUnsubscribed(byte[] pattern, long count) { + dispatchSubscriptionNotification( + patternMapping.getOrDefault(new ByteArrayWrapper(pattern), Collections.emptyList()), pattern, count, + SubscriptionListener::onPatternUnsubscribed); + } + } + + private void dispatchSubscriptionNotification(Collection listeners, byte[] pattern, long count, + SubscriptionConsumer listenerConsumer) { + + if (!CollectionUtils.isEmpty(listeners)) { + byte[] source = pattern.clone(); + + for (MessageListener messageListener : listeners) { + if (messageListener instanceof SubscriptionListener) { + taskExecutor.execute(() -> listenerConsumer.accept((SubscriptionListener) messageListener, source, count)); + } + } + } + } + + /** + * Represents an operation that accepts three input arguments {@link SubscriptionListener}, + * {@code channel or pattern}, and {@code count} and returns no result. + */ + interface SubscriptionConsumer { + void accept(SubscriptionListener listener, byte[] channelOrPattern, long count); } private void dispatchMessage(Collection listeners, Message message, @Nullable byte[] pattern) { diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/JedisSubscriptionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/JedisSubscriptionUnitTests.java index 27e9b71c4e..beec82b75e 100644 --- a/src/test/java/org/springframework/data/redis/connection/jedis/JedisSubscriptionUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/jedis/JedisSubscriptionUnitTests.java @@ -39,7 +39,7 @@ @ExtendWith(MockitoExtension.class) class JedisSubscriptionUnitTests { - @Mock BinaryJedisPubSub jedisPubSub; + @Mock JedisMessageListener jedisPubSub; @Mock MessageListener listener; diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveSubscriptionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveSubscriptionUnitTests.java index 1ee50d8a0e..8efb4d30c5 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveSubscriptionUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveSubscriptionUnitTests.java @@ -21,6 +21,7 @@ import static org.springframework.data.redis.util.ByteUtils.*; import io.lettuce.core.RedisConnectionException; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands; import reactor.core.Disposable; import reactor.core.publisher.DirectProcessor; @@ -40,6 +41,7 @@ import org.springframework.data.redis.RedisSystemException; import org.springframework.data.redis.connection.ReactiveSubscription.Message; import org.springframework.data.redis.connection.ReactiveSubscription.PatternMessage; +import org.springframework.data.redis.connection.SubscriptionListener; /** * Unit tests for {@link LettuceReactiveSubscription}. @@ -52,11 +54,14 @@ class LettuceReactiveSubscriptionUnitTests { private LettuceReactiveSubscription subscription; + @Mock StatefulRedisPubSubConnection connectionMock; @Mock RedisPubSubReactiveCommands commandsMock; @BeforeEach void before() { - subscription = new LettuceReactiveSubscription(commandsMock, e -> new RedisSystemException(e.getMessage(), e)); + when(connectionMock.reactive()).thenReturn(commandsMock); + subscription = new LettuceReactiveSubscription(mock(SubscriptionListener.class), connectionMock, + e -> new RedisSystemException(e.getMessage(), e)); } @Test // DATAREDIS-612 diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceSubscriptionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceSubscriptionUnitTests.java index 51a5131244..124cc9024b 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceSubscriptionUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceSubscriptionUnitTests.java @@ -18,10 +18,13 @@ import static org.assertj.core.api.Assertions.*; import static org.mockito.Mockito.*; +import io.lettuce.core.RedisFuture; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; +import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands; import java.util.Collection; +import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -42,7 +45,9 @@ class LettuceSubscriptionUnitTests { private StatefulRedisPubSubConnection pubsub; - private RedisPubSubCommands asyncCommands; + private RedisPubSubCommands syncCommands; + + private RedisPubSubAsyncCommands asyncCommands; private LettuceConnectionProvider connectionProvider; @@ -51,10 +56,12 @@ class LettuceSubscriptionUnitTests { void setUp() { pubsub = mock(StatefulRedisPubSubConnection.class); - asyncCommands = mock(RedisPubSubCommands.class); + syncCommands = mock(RedisPubSubCommands.class); + asyncCommands = mock(RedisPubSubAsyncCommands.class); connectionProvider = mock(LettuceConnectionProvider.class); - when(pubsub.sync()).thenReturn(asyncCommands); + when(pubsub.sync()).thenReturn(syncCommands); + when(pubsub.async()).thenReturn(asyncCommands); subscription = new LettuceSubscription(mock(MessageListener.class), pubsub, connectionProvider); } @@ -64,8 +71,8 @@ void testUnsubscribeAllAndClose() { subscription.subscribe(new byte[][] { "a".getBytes() }); subscription.unsubscribe(); - verify(asyncCommands).unsubscribe(); - verify(asyncCommands, never()).punsubscribe(); + verify(syncCommands).unsubscribe(); + verify(syncCommands, never()).punsubscribe(); verify(connectionProvider).release(pubsub); verify(pubsub).removeListener(any(LettuceMessageListener.class)); @@ -81,8 +88,8 @@ void testUnsubscribeAllChannelsWithPatterns() { subscription.pSubscribe(new byte[][] { "s*".getBytes() }); subscription.unsubscribe(); - verify(asyncCommands).unsubscribe(); - verify(asyncCommands, never()).punsubscribe(); + verify(syncCommands).unsubscribe(); + verify(syncCommands, never()).punsubscribe(); assertThat(subscription.isAlive()).isTrue(); assertThat(subscription.getChannels()).isEmpty(); @@ -100,9 +107,9 @@ void testUnsubscribeChannelAndClose() { subscription.subscribe(channel); subscription.unsubscribe(channel); - verify(asyncCommands).unsubscribe(channel); - verify(asyncCommands, never()).unsubscribe(); - verify(asyncCommands, never()).punsubscribe(); + verify(syncCommands).unsubscribe(channel); + verify(syncCommands, never()).unsubscribe(); + verify(syncCommands, never()).punsubscribe(); verify(connectionProvider).release(pubsub); verify(pubsub).removeListener(any(LettuceMessageListener.class)); @@ -119,9 +126,9 @@ void testUnsubscribeChannelSomeLeft() { subscription.subscribe(channels); subscription.unsubscribe(new byte[][] { "a".getBytes() }); - verify(asyncCommands).unsubscribe(new byte[][] { "a".getBytes() }); - verify(asyncCommands, never()).unsubscribe(); - verify(asyncCommands, never()).punsubscribe(); + verify(syncCommands).unsubscribe(new byte[][] { "a".getBytes() }); + verify(syncCommands, never()).unsubscribe(); + verify(syncCommands, never()).punsubscribe(); assertThat(subscription.isAlive()).isTrue(); @@ -140,9 +147,9 @@ void testUnsubscribeChannelWithPatterns() { subscription.pSubscribe(new byte[][] { "s*".getBytes() }); subscription.unsubscribe(channel); - verify(asyncCommands).unsubscribe(channel); - verify(asyncCommands, never()).unsubscribe(); - verify(asyncCommands, never()).punsubscribe(); + verify(syncCommands).unsubscribe(channel); + verify(syncCommands, never()).unsubscribe(); + verify(syncCommands, never()).punsubscribe(); assertThat(subscription.isAlive()).isTrue(); assertThat(subscription.getChannels()).isEmpty(); @@ -161,9 +168,9 @@ void testUnsubscribeChannelWithPatternsSomeLeft() { subscription.pSubscribe(new byte[][] { "s*".getBytes() }); subscription.unsubscribe(channel); - verify(asyncCommands).unsubscribe(channel); - verify(asyncCommands, never()).unsubscribe(); - verify(asyncCommands, never()).punsubscribe(); + verify(syncCommands).unsubscribe(channel); + verify(syncCommands, never()).unsubscribe(); + verify(syncCommands, never()).punsubscribe(); assertThat(subscription.isAlive()).isTrue(); Collection channels = subscription.getChannels(); @@ -181,8 +188,8 @@ void testUnsubscribeAllNoChannels() { subscription.pSubscribe(new byte[][] { "s*".getBytes() }); subscription.unsubscribe(); - verify(asyncCommands, never()).unsubscribe(); - verify(asyncCommands, never()).punsubscribe(); + verify(syncCommands, never()).unsubscribe(); + verify(syncCommands, never()).punsubscribe(); assertThat(subscription.isAlive()).isTrue(); assertThat(subscription.getChannels()).isEmpty(); @@ -204,8 +211,8 @@ void testUnsubscribeNotAlive() { assertThat(subscription.isAlive()).isFalse(); subscription.unsubscribe(); - verify(asyncCommands).unsubscribe(); - verify(asyncCommands, never()).punsubscribe(); + verify(syncCommands).unsubscribe(); + verify(syncCommands, never()).punsubscribe(); } @Test @@ -226,8 +233,8 @@ void testPUnsubscribeAllAndClose() { subscription.pSubscribe(new byte[][] { "a*".getBytes() }); subscription.pUnsubscribe(); - verify(asyncCommands, never()).unsubscribe(); - verify(asyncCommands).punsubscribe(); + verify(syncCommands, never()).unsubscribe(); + verify(syncCommands).punsubscribe(); verify(connectionProvider).release(pubsub); verify(pubsub).removeListener(any(LettuceMessageListener.class)); @@ -243,8 +250,8 @@ void testPUnsubscribeAllPatternsWithChannels() { subscription.pSubscribe(new byte[][] { "s*".getBytes() }); subscription.pUnsubscribe(); - verify(asyncCommands, never()).unsubscribe(); - verify(asyncCommands).punsubscribe(); + verify(syncCommands, never()).unsubscribe(); + verify(syncCommands).punsubscribe(); assertThat(subscription.isAlive()).isTrue(); assertThat(subscription.getPatterns()).isEmpty(); @@ -262,9 +269,9 @@ void testPUnsubscribeAndClose() { subscription.pSubscribe(pattern); subscription.pUnsubscribe(pattern); - verify(asyncCommands, never()).unsubscribe(); - verify(asyncCommands, never()).punsubscribe(); - verify(asyncCommands).punsubscribe(pattern); + verify(syncCommands, never()).unsubscribe(); + verify(syncCommands, never()).punsubscribe(); + verify(syncCommands).punsubscribe(pattern); verify(connectionProvider).release(pubsub); verify(pubsub).removeListener(any(LettuceMessageListener.class)); @@ -280,9 +287,9 @@ void testPUnsubscribePatternSomeLeft() { subscription.pSubscribe(patterns); subscription.pUnsubscribe(new byte[][] { "a*".getBytes() }); - verify(asyncCommands).punsubscribe(new byte[][] { "a*".getBytes() }); - verify(asyncCommands, never()).unsubscribe(); - verify(asyncCommands, never()).punsubscribe(); + verify(syncCommands).punsubscribe(new byte[][] { "a*".getBytes() }); + verify(syncCommands, never()).unsubscribe(); + verify(syncCommands, never()).punsubscribe(); assertThat(subscription.isAlive()).isTrue(); @@ -301,9 +308,9 @@ void testPUnsubscribePatternWithChannels() { subscription.pSubscribe(pattern); subscription.pUnsubscribe(pattern); - verify(asyncCommands).punsubscribe(pattern); - verify(asyncCommands, never()).unsubscribe(); - verify(asyncCommands, never()).punsubscribe(); + verify(syncCommands).punsubscribe(pattern); + verify(syncCommands, never()).unsubscribe(); + verify(syncCommands, never()).punsubscribe(); assertThat(subscription.isAlive()).isTrue(); assertThat(subscription.getPatterns()).isEmpty(); @@ -322,9 +329,9 @@ void testUnsubscribePatternWithChannelsSomeLeft() { subscription.subscribe(new byte[][] { "a".getBytes() }); subscription.pUnsubscribe(pattern); - verify(asyncCommands, never()).unsubscribe(); - verify(asyncCommands, never()).punsubscribe(); - verify(asyncCommands).punsubscribe(pattern); + verify(syncCommands, never()).unsubscribe(); + verify(syncCommands, never()).punsubscribe(); + verify(syncCommands).punsubscribe(pattern); assertThat(subscription.isAlive()).isTrue(); @@ -343,8 +350,8 @@ void testPUnsubscribeAllNoPatterns() { subscription.subscribe(new byte[][] { "s".getBytes() }); subscription.pUnsubscribe(); - verify(asyncCommands, never()).unsubscribe(); - verify(asyncCommands, never()).punsubscribe(); + verify(syncCommands, never()).unsubscribe(); + verify(syncCommands, never()).punsubscribe(); assertThat(subscription.isAlive()).isTrue(); assertThat(subscription.getPatterns()).isEmpty(); @@ -365,8 +372,8 @@ void testPUnsubscribeNotAlive() { verify(connectionProvider).release(pubsub); verify(pubsub).removeListener(any(LettuceMessageListener.class)); - verify(asyncCommands).unsubscribe(); - verify(asyncCommands, never()).punsubscribe(); + verify(syncCommands).unsubscribe(); + verify(syncCommands, never()).punsubscribe(); } @Test @@ -386,27 +393,41 @@ void testDoCloseNotSubscribed() { subscription.doClose(); - verify(asyncCommands, never()).unsubscribe(); - verify(asyncCommands, never()).punsubscribe(); + verify(syncCommands, never()).unsubscribe(); + verify(syncCommands, never()).punsubscribe(); } @Test void testDoCloseSubscribedChannels() { + RedisFuture future = mock(RedisFuture.class); + when(future.toCompletableFuture()).thenReturn(CompletableFuture.completedFuture(null)); + + when(asyncCommands.unsubscribe()).thenReturn(future); + when(asyncCommands.ping()).thenReturn((RedisFuture) future); + subscription.subscribe(new byte[][] { "a".getBytes() }); subscription.doClose(); + verify(asyncCommands).ping(); verify(asyncCommands).unsubscribe(); - verify(asyncCommands, never()).punsubscribe(); + verifyNoMoreInteractions(asyncCommands); } @Test void testDoCloseSubscribedPatterns() { + RedisFuture future = mock(RedisFuture.class); + when(future.toCompletableFuture()).thenReturn(CompletableFuture.completedFuture(null)); + + when(asyncCommands.punsubscribe()).thenReturn(future); + when(asyncCommands.ping()).thenReturn((RedisFuture) future); + subscription.pSubscribe(new byte[][] { "a*".getBytes() }); subscription.doClose(); - verify(asyncCommands, never()).unsubscribe(); + verify(asyncCommands).ping(); verify(asyncCommands).punsubscribe(); + verifyNoMoreInteractions(asyncCommands); } } diff --git a/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateIntegrationTests.java index 6b3cc932fe..d38205bc11 100644 --- a/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateIntegrationTests.java @@ -19,6 +19,7 @@ import static org.assertj.core.api.Assumptions.*; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import java.time.Duration; @@ -27,6 +28,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.function.Function; import org.junit.jupiter.api.BeforeEach; @@ -441,7 +443,29 @@ void listenToChannelShouldReceiveChannelMessagesCorrectly() throws InterruptedEx redisTemplate.listenToChannel(channel).as(StepVerifier::create) // .thenAwait(Duration.ofMillis(500)) // just make sure we the subscription completed - .then(() -> redisTemplate.convertAndSend(channel, message).block()) // + .then(() -> redisTemplate.convertAndSend(channel, message).subscribe()) // + .assertNext(received -> { + + assertThat(received).isInstanceOf(ChannelMessage.class); + assertThat(received.getMessage()).isEqualTo(message); + assertThat(received.getChannel()).isEqualTo(channel); + }) // + .thenAwait(Duration.ofMillis(10)) // + .thenCancel() // + .verify(Duration.ofSeconds(3)); + } + + @ParameterizedRedisTest // GH-1622 + @EnabledIfLongRunningTest + void listenToLaterChannelShouldReceiveChannelMessagesCorrectly() { + + String channel = "my-channel"; + + V message = valueFactory.instance(); + + redisTemplate.listenToChannelLater(channel) // + .doOnNext(it -> redisTemplate.convertAndSend(channel, message).subscribe()).flatMapMany(Function.identity()) // + .as(StepVerifier::create) // .assertNext(received -> { assertThat(received).isInstanceOf(ChannelMessage.class); @@ -455,7 +479,7 @@ void listenToChannelShouldReceiveChannelMessagesCorrectly() throws InterruptedEx @ParameterizedRedisTest // DATAREDIS-612 @EnabledIfLongRunningTest - void listenToChannelPatternShouldReceiveChannelMessagesCorrectly() throws InterruptedException { + void listenToPatternShouldReceiveChannelMessagesCorrectly() { String channel = "my-channel"; String pattern = "my-*"; @@ -466,7 +490,32 @@ void listenToChannelPatternShouldReceiveChannelMessagesCorrectly() throws Interr stream.as(StepVerifier::create) // .thenAwait(Duration.ofMillis(500)) // just make sure we the subscription completed - .then(() -> redisTemplate.convertAndSend(channel, message).block()) // + .then(() -> redisTemplate.convertAndSend(channel, message).subscribe()) // + .assertNext(received -> { + + assertThat(received).isInstanceOf(PatternMessage.class); + assertThat(received.getMessage()).isEqualTo(message); + assertThat(received.getChannel()).isEqualTo(channel); + assertThat(((PatternMessage) received).getPattern()).isEqualTo(pattern); + }) // + .thenCancel() // + .verify(Duration.ofSeconds(3)); + } + + @ParameterizedRedisTest // GH-1622 + @EnabledIfLongRunningTest + void listenToPatternLaterShouldReceiveChannelMessagesCorrectly() { + + String channel = "my-channel"; + String pattern = "my-*"; + + V message = valueFactory.instance(); + + Mono>> stream = redisTemplate.listenToPatternLater(pattern); + + stream.doOnNext(it -> redisTemplate.convertAndSend(channel, message).subscribe()) // + .flatMapMany(Function.identity()) // + .as(StepVerifier::create) // .assertNext(received -> { assertThat(received).isInstanceOf(PatternMessage.class); diff --git a/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateUnitTests.java b/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateUnitTests.java index c3ae3d4e98..70ed2d3d90 100644 --- a/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateUnitTests.java +++ b/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateUnitTests.java @@ -71,7 +71,7 @@ void listenToShouldSubscribeToChannel() { when(connectionMock.pubSubCommands()).thenReturn(pubSubCommands); when(pubSubCommands.subscribe(any())).thenReturn(Mono.empty()); - when(pubSubCommands.createSubscription()).thenReturn(Mono.just(subscription)); + when(pubSubCommands.createSubscription(any())).thenReturn(Mono.just(subscription)); when(subscription.receive()).thenReturn(Flux.create(sink -> {})); ReactiveRedisTemplate template = new ReactiveRedisTemplate<>(connectionFactoryMock, diff --git a/src/test/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainerIntegrationTests.java b/src/test/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainerIntegrationTests.java index 56ea8bebec..141aeef41f 100644 --- a/src/test/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainerIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainerIntegrationTests.java @@ -20,21 +20,30 @@ import reactor.core.Disposable; import reactor.test.StepVerifier; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.Collection; +import java.util.Collections; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.function.Supplier; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.connection.ReactiveRedisConnection; import org.springframework.data.redis.connection.ReactiveSubscription; import org.springframework.data.redis.connection.ReactiveSubscription.ChannelMessage; import org.springframework.data.redis.connection.ReactiveSubscription.PatternMessage; import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.connection.SubscriptionListener; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.core.ReactiveRedisTemplate; import org.springframework.data.redis.serializer.RedisSerializationContext; @@ -56,6 +65,7 @@ public class ReactiveRedisMessageListenerContainerIntegrationTests { private final LettuceConnectionFactory connectionFactory; private @Nullable RedisConnection connection; + private @Nullable ReactiveRedisConnection reactiveConnection; /** * @param connectionFactory @@ -73,6 +83,7 @@ public static Collection testParams() { @BeforeEach void before() { connection = connectionFactory.getConnection(); + reactiveConnection = connectionFactory.getReactiveConnection(); } @AfterEach @@ -81,16 +92,64 @@ void tearDown() { if (connection != null) { connection.close(); } + + if (reactiveConnection != null) { + reactiveConnection.close(); + } } - @ParameterizedRedisTest // DATAREDIS-612 + @ParameterizedRedisTest // DATAREDIS-612, GH-1622 void shouldReceiveChannelMessages() { ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(connectionFactory); - container.receive(ChannelTopic.of(CHANNEL1)).as(StepVerifier::create) // + container.receiveLater(ChannelTopic.of(CHANNEL1)) // + .doOnNext(it -> doPublish(CHANNEL1.getBytes(), MESSAGE.getBytes())) // + .flatMapMany(Function.identity()) // + .as(StepVerifier::create) // + .assertNext(c -> { + + assertThat(c.getChannel()).isEqualTo(CHANNEL1); + assertThat(c.getMessage()).isEqualTo(MESSAGE); + }) // + .thenCancel().verify(); + + container.destroy(); + } + + @ParameterizedRedisTest // GH-1622 + void receiveChannelShouldNotifySubscriptionListener() throws Exception { + + ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(connectionFactory); + + AtomicReference onSubscribe = new AtomicReference<>(); + AtomicReference onUnsubscribe = new AtomicReference<>(); + CompletableFuture subscribe = new CompletableFuture<>(); + CompletableFuture unsubscribe = new CompletableFuture<>(); + + CompositeListener listener = new CompositeListener() { + @Override + public void onMessage(Message message, @Nullable byte[] pattern) { + + } + + @Override + public void onChannelSubscribed(byte[] channel, long count) { + onSubscribe.set(new String(channel)); + subscribe.complete(null); + } + + @Override + public void onChannelUnsubscribed(byte[] channel, long count) { + onUnsubscribe.set(new String(channel)); + unsubscribe.complete(null); + } + }; + + container.receive(Collections.singletonList(ChannelTopic.of(CHANNEL1)), listener) // + .as(StepVerifier::create) // .then(awaitSubscription(container::getActiveSubscriptions)) - .then(() -> connection.publish(CHANNEL1.getBytes(), MESSAGE.getBytes())) // + .then(() -> doPublish(CHANNEL1.getBytes(), MESSAGE.getBytes())) // .assertNext(c -> { assertThat(c.getChannel()).isEqualTo(CHANNEL1); @@ -98,17 +157,67 @@ void shouldReceiveChannelMessages() { }) // .thenCancel().verify(); + unsubscribe.get(10, TimeUnit.SECONDS); + + assertThat(onSubscribe).hasValue(CHANNEL1); + assertThat(onUnsubscribe).hasValue(CHANNEL1); + container.destroy(); } - @ParameterizedRedisTest // DATAREDIS-612 + @ParameterizedRedisTest // DATAREDIS-612, GH-1622 void shouldReceivePatternMessages() { ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(connectionFactory); - container.receive(PatternTopic.of(PATTERN1)).as(StepVerifier::create) // + container.receiveLater(PatternTopic.of(PATTERN1)) // + .doOnNext(it -> doPublish(CHANNEL1.getBytes(), MESSAGE.getBytes())).flatMapMany(Function.identity()) // + .as(StepVerifier::create) // + .assertNext(c -> { + + assertThat(c.getPattern()).isEqualTo(PATTERN1); + assertThat(c.getChannel()).isEqualTo(CHANNEL1); + assertThat(c.getMessage()).isEqualTo(MESSAGE); + }) // + .thenCancel().verify(); + + container.destroy(); + } + + @ParameterizedRedisTest // GH-1622 + void receivePatternShouldNotifySubscriptionListener() throws Exception { + + ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(connectionFactory); + + AtomicReference onPsubscribe = new AtomicReference<>(); + AtomicReference onPunsubscribe = new AtomicReference<>(); + CompletableFuture psubscribe = new CompletableFuture<>(); + CompletableFuture punsubscribe = new CompletableFuture<>(); + + CompositeListener listener = new CompositeListener() { + @Override + public void onMessage(Message message, @Nullable byte[] pattern) { + + } + + @Override + public void onPatternSubscribed(byte[] pattern, long count) { + onPsubscribe.set(new String(pattern)); + psubscribe.complete(null); + } + + @Override + public void onPatternUnsubscribed(byte[] pattern, long count) { + onPunsubscribe.set(new String(pattern)); + punsubscribe.complete(null); + } + }; + + container.receive(Collections.singletonList(PatternTopic.of(PATTERN1)), listener) // + .cast(PatternMessage.class) // + .as(StepVerifier::create) // .then(awaitSubscription(container::getActiveSubscriptions)) - .then(() -> connection.publish(CHANNEL1.getBytes(), MESSAGE.getBytes())) // + .then(() -> doPublish(CHANNEL1.getBytes(), MESSAGE.getBytes())) // .assertNext(c -> { assertThat(c.getPattern()).isEqualTo(PATTERN1); @@ -117,22 +226,30 @@ void shouldReceivePatternMessages() { }) // .thenCancel().verify(); + punsubscribe.get(10, TimeUnit.SECONDS); + + assertThat(onPsubscribe).hasValue(PATTERN1); + assertThat(onPunsubscribe).hasValue(PATTERN1); + container.destroy(); } - @ParameterizedRedisTest // DATAREDIS-612 - void shouldPublishAndReceiveMessage() throws InterruptedException { + @ParameterizedRedisTest // DATAREDIS-612, GH-1622 + void shouldPublishAndReceiveMessage() throws Exception { ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(connectionFactory); ReactiveRedisTemplate template = new ReactiveRedisTemplate<>(connectionFactory, RedisSerializationContext.string()); BlockingQueue> messages = new LinkedBlockingDeque<>(); - Disposable subscription = container.receive(PatternTopic.of(PATTERN1)).doOnNext(messages::add).subscribe(); + CompletableFuture subscribed = new CompletableFuture<>(); + Disposable subscription = container.receiveLater(PatternTopic.of(PATTERN1)) + .doOnNext(it -> subscribed.complete(null)).flatMapMany(Function.identity()).doOnNext(messages::add).subscribe(); - StepVerifier.create(template.convertAndSend(CHANNEL1, MESSAGE), 0) // - .then(awaitSubscription(container::getActiveSubscriptions)) // - .thenRequest(1).expectNextCount(1) // + subscribed.get(5, TimeUnit.SECONDS); + + template.convertAndSend(CHANNEL1, MESSAGE).as(StepVerifier::create) // + .expectNextCount(1) // .verifyComplete(); PatternMessage message = messages.poll(1, TimeUnit.SECONDS); @@ -154,7 +271,7 @@ void listenToChannelShouldReceiveChannelMessagesCorrectly() throws InterruptedEx template.listenToChannel(CHANNEL1).as(StepVerifier::create) // .thenAwait(Duration.ofMillis(100)) // just make sure we the subscription completed - .then(() -> connection.publish(CHANNEL1.getBytes(), MESSAGE.getBytes())) // + .then(() -> doPublish(CHANNEL1.getBytes(), MESSAGE.getBytes())) // .assertNext(message -> { assertThat(message).isInstanceOf(ChannelMessage.class); @@ -173,7 +290,7 @@ void listenToPatternShouldReceiveMessagesCorrectly() { template.listenToPattern(PATTERN1).as(StepVerifier::create) // .thenAwait(Duration.ofMillis(100)) // just make sure we the subscription completed - .then(() -> connection.publish(CHANNEL1.getBytes(), MESSAGE.getBytes())) // + .then(() -> doPublish(CHANNEL1.getBytes(), MESSAGE.getBytes())) // .assertNext(message -> { assertThat(message).isInstanceOf(PatternMessage.class); @@ -185,10 +302,18 @@ void listenToPatternShouldReceiveMessagesCorrectly() { .verify(); } + private void doPublish(byte[] channel, byte[] message) { + reactiveConnection.pubSubCommands().publish(ByteBuffer.wrap(channel), ByteBuffer.wrap(message)).subscribe(); + } + private static Runnable awaitSubscription(Supplier> activeSubscriptions) { return () -> { Awaitility.await().until(() -> !activeSubscriptions.get().isEmpty()); }; } + + interface CompositeListener extends MessageListener, SubscriptionListener { + + } } diff --git a/src/test/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainerUnitTests.java b/src/test/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainerUnitTests.java index 96f0bbebb0..e98053f715 100644 --- a/src/test/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainerUnitTests.java +++ b/src/test/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainerUnitTests.java @@ -67,7 +67,7 @@ void before() { when(connectionFactoryMock.getReactiveConnection()).thenReturn(connectionMock); when(connectionMock.pubSubCommands()).thenReturn(commandsMock); when(connectionMock.closeLater()).thenReturn(Mono.empty()); - when(commandsMock.createSubscription()).thenReturn(Mono.just(subscriptionMock)); + when(commandsMock.createSubscription(any())).thenReturn(Mono.just(subscriptionMock)); when(subscriptionMock.subscribe(any())).thenReturn(Mono.empty()); when(subscriptionMock.pSubscribe(any())).thenReturn(Mono.empty()); when(subscriptionMock.unsubscribe()).thenReturn(Mono.empty()); @@ -182,7 +182,7 @@ void shouldRegisterSubscription() { assertThat(container.getActiveSubscriptions()).isEmpty(); } - @Test // DATAREDIS-612 + @Test // DATAREDIS-612, GH-1622 void shouldRegisterSubscriptionMultipleSubscribers() { reset(subscriptionMock); @@ -203,11 +203,11 @@ void shouldRegisterSubscriptionMultipleSubscribers() { second.dispose(); - verify(subscriptionMock).unsubscribe(); + verify(subscriptionMock).cancel(); assertThat(container.getActiveSubscriptions()).isEmpty(); } - @Test // DATAREDIS-612 + @Test // DATAREDIS-612, GH-1622 void shouldUnsubscribeOnCancel() { when(subscriptionMock.receive()).thenReturn(DirectProcessor.create()); @@ -221,7 +221,7 @@ void shouldUnsubscribeOnCancel() { }).thenCancel().verify(); - verify(subscriptionMock).unsubscribe(); + verify(subscriptionMock).cancel(); } @Test // DATAREDIS-612 diff --git a/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerIntegrationTests.java b/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerIntegrationTests.java index 1d0b83f86c..535424be3f 100644 --- a/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerIntegrationTests.java @@ -16,87 +16,202 @@ package org.springframework.data.redis.listener; import static org.assertj.core.api.Assertions.*; -import static org.mockito.Mockito.*; -import java.util.concurrent.Executor; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.springframework.core.task.SyncTaskExecutor; -import org.springframework.data.redis.SettingsUtils; -import org.springframework.data.redis.connection.RedisStandaloneConfiguration; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.SubscriptionListener; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; -import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; +import org.springframework.data.redis.connection.jedis.extension.JedisConnectionFactoryExtension; +import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; +import org.springframework.data.redis.connection.lettuce.extension.LettuceConnectionFactoryExtension; +import org.springframework.data.redis.test.extension.RedisStanalone; +import org.springframework.data.redis.test.extension.parametrized.MethodSource; +import org.springframework.data.redis.test.extension.parametrized.ParameterizedRedisTest; +import org.springframework.lang.Nullable; /** * Integration tests for {@link RedisMessageListenerContainer}. * * @author Mark Paluch - * @author Christoph Strobl */ +@MethodSource("testParams") class RedisMessageListenerContainerIntegrationTests { - private final Object handler = new Object() { - - @SuppressWarnings("unused") - public void handleMessage(Object message) {} - }; - - private final MessageListenerAdapter adapter = new MessageListenerAdapter(handler); - - private JedisConnectionFactory connectionFactory; + private RedisConnectionFactory connectionFactory; private RedisMessageListenerContainer container; - private Executor executorMock; + public RedisMessageListenerContainerIntegrationTests(RedisConnectionFactory connectionFactory) { + this.connectionFactory = connectionFactory; + } @BeforeEach void setUp() { - executorMock = mock(Executor.class); - - RedisStandaloneConfiguration configuration = new RedisStandaloneConfiguration(); - configuration.setPort(SettingsUtils.getPort()); - configuration.setHostName(SettingsUtils.getHost()); - configuration.setDatabase(2); - - connectionFactory = new JedisConnectionFactory(configuration); - connectionFactory.afterPropertiesSet(); - container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setBeanName("container"); - container.setTaskExecutor(new SyncTaskExecutor()); - container.setSubscriptionExecutor(executorMock); container.afterPropertiesSet(); } + public static Collection testParams() { + + // Jedis + JedisConnectionFactory jedisConnFactory = JedisConnectionFactoryExtension + .getConnectionFactory(RedisStanalone.class); + + // Lettuce + LettuceConnectionFactory lettuceConnFactory = LettuceConnectionFactoryExtension + .getConnectionFactory(RedisStanalone.class); + + return Arrays.asList(new Object[][] { { jedisConnFactory }, { lettuceConnFactory } }); + } + @AfterEach void tearDown() throws Exception { + container.destroy(); + } + + @ParameterizedRedisTest + void notifiesChannelSubscriptionState() throws Exception { + + AtomicReference onSubscribe = new AtomicReference<>(); + AtomicReference onUnsubscribe = new AtomicReference<>(); + CompletableFuture subscribe = new CompletableFuture<>(); + CompletableFuture unsubscribe = new CompletableFuture<>(); + + CompositeListener listener = new CompositeListener() { + @Override + public void onMessage(Message message, @Nullable byte[] pattern) { + + } + + @Override + public void onChannelSubscribed(byte[] channel, long count) { + onSubscribe.set(new String(channel)); + subscribe.complete(null); + } + + @Override + public void onChannelUnsubscribed(byte[] channel, long count) { + onUnsubscribe.set(new String(channel)); + unsubscribe.complete(null); + } + }; + + container.addMessageListener(listener, new ChannelTopic("a")); + container.start(); + + subscribe.get(10, TimeUnit.SECONDS); container.destroy(); - connectionFactory.destroy(); + + unsubscribe.get(10, TimeUnit.SECONDS); + + assertThat(onSubscribe).hasValue("a"); + assertThat(onUnsubscribe).hasValue("a"); } - @Test // DATAREDIS-415 - void interruptAtStart() { + @ParameterizedRedisTest + void notifiesPatternSubscriptionState() throws Exception { + + AtomicReference onPsubscribe = new AtomicReference<>(); + AtomicReference onPunsubscribe = new AtomicReference<>(); + CompletableFuture psubscribe = new CompletableFuture<>(); + CompletableFuture punsubscribe = new CompletableFuture<>(); + + CompositeListener listener = new CompositeListener() { + @Override + public void onMessage(Message message, @Nullable byte[] pattern) { - final Thread main = Thread.currentThread(); + } - // interrupt thread once Executor.execute is called - doAnswer(invocationOnMock -> { + @Override + public void onPatternSubscribed(byte[] pattern, long count) { + onPsubscribe.set(new String(pattern)); + psubscribe.complete(null); + } - main.interrupt(); - return null; - }).when(executorMock).execute(any(Runnable.class)); + @Override + public void onPatternUnsubscribed(byte[] pattern, long count) { + onPunsubscribe.set(new String(pattern)); + punsubscribe.complete(null); + } + }; - container.addMessageListener(adapter, new ChannelTopic("a")); + container.addMessageListener(listener, new PatternTopic("a")); container.start(); - // reset the interrupted flag to not destroy the teardown - assertThat(Thread.interrupted()).isTrue(); + psubscribe.get(10, TimeUnit.SECONDS); + + container.destroy(); + + punsubscribe.get(10, TimeUnit.SECONDS); + + assertThat(onPsubscribe).hasValue("a"); + assertThat(onPunsubscribe).hasValue("a"); + } + + @ParameterizedRedisTest + void repeatedSubscribeShouldNotifyOnlyOnce() throws Exception { + + AtomicInteger subscriptions1 = new AtomicInteger(); + AtomicInteger subscriptions2 = new AtomicInteger(); + CountDownLatch received = new CountDownLatch(2); + + CompositeListener listener1 = new CompositeListener() { + @Override + public void onMessage(Message message, @Nullable byte[] pattern) { + received.countDown(); + } + + @Override + public void onPatternSubscribed(byte[] pattern, long count) { + subscriptions1.incrementAndGet(); + } + }; + + CompositeListener listener2 = new CompositeListener() { + @Override + public void onMessage(Message message, @Nullable byte[] pattern) { + received.countDown(); + } + + @Override + public void onPatternSubscribed(byte[] pattern, long count) { + subscriptions2.incrementAndGet(); + } + }; + + container.addMessageListener(listener1, new PatternTopic("a")); + container.addMessageListener(listener2, new PatternTopic("a")); + + container.start(); + + try (RedisConnection connection = connectionFactory.getConnection()) { + connection.publish("a".getBytes(), "hello".getBytes()); + } + + received.await(2, TimeUnit.SECONDS); + container.destroy(); + + assertThat(subscriptions1).hasValue(1); + assertThat(subscriptions2).hasValue(1); + } + + interface CompositeListener extends MessageListener, SubscriptionListener { - assertThat(container.isRunning()).isFalse(); } } diff --git a/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerInterruptIntegrationTests.java b/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerInterruptIntegrationTests.java new file mode 100644 index 0000000000..ae347e1ba1 --- /dev/null +++ b/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerInterruptIntegrationTests.java @@ -0,0 +1,103 @@ +/* + * Copyright 2016-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.listener; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.Executor; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.springframework.core.task.SyncTaskExecutor; +import org.springframework.data.redis.SettingsUtils; +import org.springframework.data.redis.connection.RedisStandaloneConfiguration; +import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; +import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; +import org.springframework.data.redis.test.extension.parametrized.MethodSource; + +/** + * Integration tests for {@link RedisMessageListenerContainer}. + * + * @author Mark Paluch + * @author Christoph Strobl + */ +class RedisMessageListenerContainerInterruptIntegrationTests { + + private final Object handler = new Object() { + + @SuppressWarnings("unused") + public void handleMessage(Object message) {} + }; + + private final MessageListenerAdapter adapter = new MessageListenerAdapter(handler); + + private JedisConnectionFactory connectionFactory; + private RedisMessageListenerContainer container; + + private Executor executorMock; + + @BeforeEach + void setUp() { + + executorMock = mock(Executor.class); + + RedisStandaloneConfiguration configuration = new RedisStandaloneConfiguration(); + configuration.setPort(SettingsUtils.getPort()); + configuration.setHostName(SettingsUtils.getHost()); + configuration.setDatabase(2); + + connectionFactory = new JedisConnectionFactory(configuration); + connectionFactory.afterPropertiesSet(); + + container = new RedisMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + container.setBeanName("container"); + container.setTaskExecutor(new SyncTaskExecutor()); + container.setSubscriptionExecutor(executorMock); + container.afterPropertiesSet(); + } + + @AfterEach + void tearDown() throws Exception { + + container.destroy(); + connectionFactory.destroy(); + } + + @Test // DATAREDIS-415 + void interruptAtStart() { + + final Thread main = Thread.currentThread(); + + // interrupt thread once Executor.execute is called + doAnswer(invocationOnMock -> { + + main.interrupt(); + return null; + }).when(executorMock).execute(any(Runnable.class)); + + container.addMessageListener(adapter, new ChannelTopic("a")); + container.start(); + + // reset the interrupted flag to not destroy the teardown + assertThat(Thread.interrupted()).isTrue(); + + assertThat(container.isRunning()).isFalse(); + } +}