Skip to content

Add support for SubscriptionListener using Pub/Sub and introduce ReactiveRedisMessageListenerContainer.receiveLater(…) #2052

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.6.0-SNAPSHOT</version>
<version>2.6.0-GH-1622-SNAPSHOT</version>

<name>Spring Data Redis</name>

Expand Down
5 changes: 5 additions & 0 deletions src/main/asciidoc/new-features.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@

This section briefly covers items that are new and noteworthy in the latest releases.

[[new-in-2.6.0]]
== New in Spring Data Redis 2.6

* Support for `SubscriptionListener` when using `MessageListener` for subscription confirmation callbacks. `ReactiveRedisMessageListenerContainer` and `ReactiveRedisOperations` provide `receiveLater(…)` and `listenToLater(…)` methods to await until Redis acknowledges the subscription.

[[new-in-2.5.0]]
== New in Spring Data Redis 2.5

Expand Down
15 changes: 15 additions & 0 deletions src/main/asciidoc/reference/reactive-messaging.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,21 @@ ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListen
Flux<ChannelMessage<String, String>> stream = container.receive(ChannelTopic.of("my-channel"));
----

To await and ensure proper subscription, you can use the `receiveLater` method that returns a `Mono<Flux<ChannelMessage>>`.
The resulting `Mono` completes with an inner publisher as a result of completing the subscription to the given topics. By intercepting `onNext` signals, you can synchronize server-side subscriptions.

[source,java]
----
ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);

Mono<Flux<ChannelMessage<String, String>>> stream = container.receiveLater(ChannelTopic.of("my-channel"));

stream.doOnNext(inner -> // notification hook when Redis subscriptions are synchronized with the server)
.flatMapMany(Function.identity())
.…;
----

[[redis:reactive:pubsub:subscribe:template]]
=== Subscribing via template API

Expand Down
2 changes: 2 additions & 0 deletions src/main/asciidoc/reference/redis-messaging.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ Due to its blocking nature, low-level subscription is not attractive, as it requ

`RedisMessageListenerContainer` acts as a message listener container. It is used to receive messages from a Redis channel and drive the `MessageListener` instances that are injected into it. The listener container is responsible for all threading of message reception and dispatches into the listener for processing. A message listener container is the intermediary between an MDP and a messaging provider and takes care of registering to receive messages, resource acquisition and release, exception conversion, and the like. This lets you as an application developer write the (possibly complex) business logic associated with receiving a message (and reacting to it) and delegates boilerplate Redis infrastructure concerns to the framework.

A `MessageListener` can additionally implement `SubscriptionListener` to receive notifications upon subscription/unsubscribe confirmation. Listening to subscription notifications can be useful when synchronizing invocations.

Furthermore, to minimize the application footprint, `RedisMessageListenerContainer` lets one connection and one thread be shared by multiple listeners even though they do not share a subscription. Thus, no matter how many listeners or channels an application tracks, the runtime cost remains the same throughout its lifetime. Moreover, the container allows runtime configuration changes so that you can add or remove listeners while an application is running without the need for a restart. Additionally, the container uses a lazy subscription approach, using a `RedisConnection` only when needed. If all the listeners are unsubscribed, cleanup is automatically performed, and the thread is released.

To help with the asynchronous nature of messages, the container requires a `java.util.concurrent.Executor` (or Spring's `TaskExecutor`) for dispatching the messages. Depending on the load, the number of listeners, or the runtime environment, you should change or tweak the executor to better serve your needs. In particular, in managed environments (such as app servers), it is highly recommended to pick a proper `TaskExecutor` to take advantage of its runtime.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
import org.springframework.lang.Nullable;

/**
* Listener of messages published in Redis.
* Listener of messages published in Redis. A MessageListener can implement {@link SubscriptionListener} to receive
* notifications for subscription states.
*
* @author Costin Leau
* @author Christoph Strobl
* @see SubscriptionListener
*/
@FunctionalInterface
public interface MessageListener {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,18 @@ public interface ReactivePubSubCommands {
*
* @return the subscription.
*/
Mono<ReactiveSubscription> createSubscription();
default Mono<ReactiveSubscription> createSubscription() {
return createSubscription(SubscriptionListener.EMPTY);
}

/**
* Creates a subscription for this connection. Connections can have multiple {@link ReactiveSubscription}s.
*
* @param subscriptionListener the subscription listener to listen for subscription confirmations.
* @return the subscription.
* @since 2.6
*/
Mono<ReactiveSubscription> createSubscription(SubscriptionListener subscriptionListener);

/**
* Publishes the given {@code message} to the given {@code channel}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.connection;

/**
* Listener for subscription notifications.
* <p/>
* Subscription notifications are reported by Redis as confirmation for subscribe and unsubscribe operations for
* channels and patterns.
*
* @author Mark Paluch
* @since 2.6
*/
public interface SubscriptionListener {

/**
* Empty {@link SubscriptionListener}.
*/
SubscriptionListener EMPTY = new SubscriptionListener() {};

/**
* Notification when Redis has confirmed a channel subscription.
*
* @param channel name of the channel.
* @param count subscriber count.
*/
default void onChannelSubscribed(byte[] channel, long count) {}

/**
* Notification when Redis has confirmed a channel un-subscription.
*
* @param channel name of the channel.
* @param count subscriber count.
*/
default void onChannelUnsubscribed(byte[] channel, long count) {}

/**
* Notification when Redis has confirmed a pattern subscription.
*
* @param pattern the pattern.
* @param count subscriber count.
*/
default void onPatternSubscribed(byte[] pattern, long count) {}

/**
* Notification when Redis has confirmed a pattern un-subscription.
*
* @param pattern the pattern.
* @param count subscriber count.
*/
default void onPatternUnsubscribed(byte[] pattern, long count) {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ public void subscribe(MessageListener listener, byte[]... channels) {
"Connection already subscribed; use the connection Subscription to cancel or add new channels");
}
try {
BinaryJedisPubSub jedisPubSub = new JedisMessageListener(listener);
JedisMessageListener jedisPubSub = new JedisMessageListener(listener);
subscription = new JedisSubscription(listener, jedisPubSub, channels, null);
cluster.subscribe(jedisPubSub, channels);
} catch (Exception ex) {
Expand All @@ -466,7 +466,7 @@ public void pSubscribe(MessageListener listener, byte[]... patterns) {
"Connection already subscribed; use the connection Subscription to cancel or add new channels");
}
try {
BinaryJedisPubSub jedisPubSub = new JedisMessageListener(listener);
JedisMessageListener jedisPubSub = new JedisMessageListener(listener);
subscription = new JedisSubscription(listener, jedisPubSub, null, patterns);
cluster.psubscribe(jedisPubSub, patterns);
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ public void pSubscribe(MessageListener listener, byte[]... patterns) {

doWithJedis(it -> {

BinaryJedisPubSub jedisPubSub = new JedisMessageListener(listener);
JedisMessageListener jedisPubSub = new JedisMessageListener(listener);

subscription = new JedisSubscription(listener, jedisPubSub, null, patterns);
it.psubscribe(jedisPubSub, patterns);
Expand All @@ -740,7 +740,7 @@ public void subscribe(MessageListener listener, byte[]... channels) {

doWithJedis(it -> {

BinaryJedisPubSub jedisPubSub = new JedisMessageListener(listener);
JedisMessageListener jedisPubSub = new JedisMessageListener(listener);

subscription = new JedisSubscription(listener, jedisPubSub, channels, null);
it.subscribe(jedisPubSub, channels);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,29 @@
*/
package org.springframework.data.redis.connection.jedis;

import redis.clients.jedis.BinaryJedisPubSub;

import org.springframework.data.redis.connection.DefaultMessage;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.SubscriptionListener;
import org.springframework.util.Assert;

import redis.clients.jedis.BinaryJedisPubSub;

/**
* MessageListener adapter on top of Jedis.
*
* @author Costin Leau
* @author Mark Paluch
*/
class JedisMessageListener extends BinaryJedisPubSub {

private final MessageListener listener;
private final SubscriptionListener subscriptionListener;

JedisMessageListener(MessageListener listener) {
Assert.notNull(listener, "message listener is required");
Assert.notNull(listener, "MessageListener is required");
this.listener = listener;
this.subscriptionListener = listener instanceof SubscriptionListener ? (SubscriptionListener) listener
: SubscriptionListener.EMPTY;
}

public void onMessage(byte[] channel, byte[] message) {
Expand All @@ -44,18 +49,18 @@ public void onPMessage(byte[] pattern, byte[] channel, byte[] message) {
}

public void onPSubscribe(byte[] pattern, int subscribedChannels) {
// no-op
subscriptionListener.onPatternSubscribed(pattern, subscribedChannels);
}

public void onPUnsubscribe(byte[] pattern, int subscribedChannels) {
// no-op
subscriptionListener.onPatternUnsubscribed(pattern, subscribedChannels);
}

public void onSubscribe(byte[] channel, int subscribedChannels) {
// no-op
subscriptionListener.onChannelSubscribed(channel, subscribedChannels);
}

public void onUnsubscribe(byte[] channel, int subscribedChannels) {
// no-op
subscriptionListener.onChannelUnsubscribed(channel, subscribedChannels);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class JedisSubscription extends AbstractSubscription {

private final BinaryJedisPubSub jedisPubSub;

JedisSubscription(MessageListener listener, BinaryJedisPubSub jedisPubSub, @Nullable byte[][] channels,
JedisSubscription(MessageListener listener, JedisMessageListener jedisPubSub, @Nullable byte[][] channels,
@Nullable byte[][] patterns) {
super(listener, channels, patterns);
this.jedisPubSub = jedisPubSub;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.connection.lettuce;

import io.lettuce.core.pubsub.RedisPubSubListener;

import java.nio.ByteBuffer;

import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/**
* Wrapper around {@link RedisPubSubListener} that converts {@link ByteBuffer} into {@code byte[]}.
*
* @author Mark Paluch
* @since 2.6
*/
class LettuceByteBufferPubSubListenerWrapper implements RedisPubSubListener<ByteBuffer, ByteBuffer> {

private final RedisPubSubListener<byte[], byte[]> delegate;

LettuceByteBufferPubSubListenerWrapper(RedisPubSubListener<byte[], byte[]> delegate) {

Assert.notNull(delegate, "RedisPubSubListener must not be null!");

this.delegate = delegate;
}

/*
* (non-Javadoc)
* @see io.lettuce.core.pubsub.RedisPubSubListener#message(java.lang.Object, java.lang.Object)
*/
public void message(ByteBuffer channel, ByteBuffer message) {
delegate.message(getBytes(channel), getBytes(message));
}

/*
* (non-Javadoc)
* @see io.lettuce.core.pubsub.RedisPubSubListener#message(java.lang.Object, java.lang.Object, java.lang.Object)
*/
public void message(ByteBuffer pattern, ByteBuffer channel, ByteBuffer message) {
delegate.message(getBytes(channel), getBytes(message), getBytes(pattern));
}

/*
* (non-Javadoc)
* @see io.lettuce.core.pubsub.RedisPubSubListener#subscribed(java.lang.Object, long)
*/
public void subscribed(ByteBuffer channel, long count) {
delegate.subscribed(getBytes(channel), count);
}

/*
* (non-Javadoc)
* @see io.lettuce.core.pubsub.RedisPubSubListener#psubscribed(java.lang.Object, long)
*/
public void psubscribed(ByteBuffer pattern, long count) {
delegate.psubscribed(getBytes(pattern), count);
}

/*
* (non-Javadoc)
* @see io.lettuce.core.pubsub.RedisPubSubListener#unsubscribed(java.lang.Object, long)
*/
public void unsubscribed(ByteBuffer channel, long count) {
delegate.unsubscribed(getBytes(channel), count);
}

/*
* (non-Javadoc)
* @see io.lettuce.core.pubsub.RedisPubSubListener#punsubscribed(java.lang.Object, long)
*/
public void punsubscribed(ByteBuffer pattern, long count) {
delegate.punsubscribed(getBytes(pattern), count);
}

/**
* Extract a byte array from {@link ByteBuffer} without consuming it.
*
* @param byteBuffer must not be {@literal null}.
* @return
*/
private static byte[] getBytes(@Nullable ByteBuffer byteBuffer) {

if (byteBuffer == null) {
return new byte[0];
}

if (byteBuffer.hasArray()) {
return byteBuffer.array();
}

byteBuffer.mark();
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
byteBuffer.reset();
return bytes;
}
}
Loading