Skip to content

Commit 58e7aa5

Browse files
mp911dechristophstrobl
authored andcommitted
Add support for SubscriptionListener using ReactiveRedisMessageListenerContainer.
Original Pull Request: #2052
1 parent 0f1c598 commit 58e7aa5

11 files changed

+323
-22
lines changed

src/main/java/org/springframework/data/redis/connection/ReactivePubSubCommands.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,18 @@ public interface ReactivePubSubCommands {
3737
*
3838
* @return the subscription.
3939
*/
40-
Mono<ReactiveSubscription> createSubscription();
40+
default Mono<ReactiveSubscription> createSubscription() {
41+
return createSubscription(SubscriptionListener.EMPTY);
42+
}
43+
44+
/**
45+
* Creates a subscription for this connection. Connections can have multiple {@link ReactiveSubscription}s.
46+
*
47+
* @param subscriptionListener the subscription listener to listen for subscription confirmations.
48+
* @return the subscription.
49+
* @since 2.6
50+
*/
51+
Mono<ReactiveSubscription> createSubscription(SubscriptionListener subscriptionListener);
4152

4253
/**
4354
* Publishes the given {@code message} to the given {@code channel}.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.redis.connection.lettuce;
17+
18+
import io.lettuce.core.pubsub.RedisPubSubListener;
19+
20+
import java.nio.ByteBuffer;
21+
22+
import org.springframework.lang.Nullable;
23+
import org.springframework.util.Assert;
24+
25+
/**
26+
* Wrapper around {@link RedisPubSubListener} that converts {@link ByteBuffer} into {@code byte[]}.
27+
*
28+
* @author Mark Paluch
29+
* @since 2.6
30+
*/
31+
class LettuceByteBufferPubSubListenerWrapper implements RedisPubSubListener<ByteBuffer, ByteBuffer> {
32+
33+
private final RedisPubSubListener<byte[], byte[]> delegate;
34+
35+
LettuceByteBufferPubSubListenerWrapper(RedisPubSubListener<byte[], byte[]> delegate) {
36+
37+
Assert.notNull(delegate, "RedisPubSubListener must not be null!");
38+
39+
this.delegate = delegate;
40+
}
41+
42+
/*
43+
* (non-Javadoc)
44+
* @see io.lettuce.core.pubsub.RedisPubSubListener#message(java.lang.Object, java.lang.Object)
45+
*/
46+
public void message(ByteBuffer channel, ByteBuffer message) {
47+
delegate.message(getBytes(channel), getBytes(message));
48+
}
49+
50+
/*
51+
* (non-Javadoc)
52+
* @see io.lettuce.core.pubsub.RedisPubSubListener#message(java.lang.Object, java.lang.Object, java.lang.Object)
53+
*/
54+
public void message(ByteBuffer pattern, ByteBuffer channel, ByteBuffer message) {
55+
delegate.message(getBytes(channel), getBytes(message), getBytes(pattern));
56+
}
57+
58+
/*
59+
* (non-Javadoc)
60+
* @see io.lettuce.core.pubsub.RedisPubSubListener#subscribed(java.lang.Object, long)
61+
*/
62+
public void subscribed(ByteBuffer channel, long count) {
63+
delegate.subscribed(getBytes(channel), count);
64+
}
65+
66+
/*
67+
* (non-Javadoc)
68+
* @see io.lettuce.core.pubsub.RedisPubSubListener#psubscribed(java.lang.Object, long)
69+
*/
70+
public void psubscribed(ByteBuffer pattern, long count) {
71+
delegate.psubscribed(getBytes(pattern), count);
72+
}
73+
74+
/*
75+
* (non-Javadoc)
76+
* @see io.lettuce.core.pubsub.RedisPubSubListener#unsubscribed(java.lang.Object, long)
77+
*/
78+
public void unsubscribed(ByteBuffer channel, long count) {
79+
delegate.unsubscribed(getBytes(channel), count);
80+
}
81+
82+
/*
83+
* (non-Javadoc)
84+
* @see io.lettuce.core.pubsub.RedisPubSubListener#punsubscribed(java.lang.Object, long)
85+
*/
86+
public void punsubscribed(ByteBuffer pattern, long count) {
87+
delegate.punsubscribed(getBytes(pattern), count);
88+
}
89+
90+
/**
91+
* Extract a byte array from {@link ByteBuffer} without consuming it.
92+
*
93+
* @param byteBuffer must not be {@literal null}.
94+
* @return
95+
*/
96+
private static byte[] getBytes(@Nullable ByteBuffer byteBuffer) {
97+
98+
if (byteBuffer == null) {
99+
return new byte[0];
100+
}
101+
102+
if (byteBuffer.hasArray()) {
103+
return byteBuffer.array();
104+
}
105+
106+
byteBuffer.mark();
107+
byte[] bytes = new byte[byteBuffer.remaining()];
108+
byteBuffer.get(bytes);
109+
byteBuffer.reset();
110+
return bytes;
111+
}
112+
}

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceMessageListener.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,13 @@ class LettuceMessageListener implements RedisPubSubListener<byte[], byte[]> {
3333
private final MessageListener listener;
3434
private final SubscriptionListener subscriptionListener;
3535

36-
LettuceMessageListener(MessageListener listener) {
36+
LettuceMessageListener(MessageListener listener, SubscriptionListener subscriptionListener) {
37+
3738
Assert.notNull(listener, "MessageListener must not be null!");
39+
Assert.notNull(subscriptionListener, "SubscriptionListener must not be null!");
3840

3941
this.listener = listener;
40-
this.subscriptionListener = listener instanceof SubscriptionListener ? (SubscriptionListener) listener
41-
: SubscriptionListener.EMPTY;
42+
this.subscriptionListener = subscriptionListener;
4243
}
4344

4445
/*

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactivePubSubCommands.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@
2323
import java.util.function.Function;
2424

2525
import org.reactivestreams.Publisher;
26+
2627
import org.springframework.data.redis.connection.ReactivePubSubCommands;
2728
import org.springframework.data.redis.connection.ReactiveSubscription;
2829
import org.springframework.data.redis.connection.ReactiveSubscription.ChannelMessage;
30+
import org.springframework.data.redis.connection.SubscriptionListener;
2931
import org.springframework.util.Assert;
3032

3133
/**
@@ -43,13 +45,13 @@ class LettuceReactivePubSubCommands implements ReactivePubSubCommands {
4345

4446
/*
4547
* (non-Javadoc)
46-
* @see org.springframework.data.redis.connection.ReactivePubSubCommands#createSubscription()
48+
* @see org.springframework.data.redis.connection.ReactivePubSubCommands#createSubscription(org.springframework.data.redis.connection.SubscriptionListener)
4749
*/
4850
@Override
49-
public Mono<ReactiveSubscription> createSubscription() {
51+
public Mono<ReactiveSubscription> createSubscription(SubscriptionListener listener) {
5052

5153
return connection.getPubSubConnection()
52-
.map(pubSubConnection -> new LettuceReactiveSubscription(pubSubConnection.reactive(),
54+
.map(pubSubConnection -> new LettuceReactiveSubscription(listener, pubSubConnection,
5355
connection.translateException()));
5456
}
5557

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveSubscription.java

+18-3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package org.springframework.data.redis.connection.lettuce;
1717

18+
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
1819
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
1920
import reactor.core.Disposable;
2021
import reactor.core.publisher.ConnectableFlux;
@@ -33,6 +34,7 @@
3334
import java.util.function.Supplier;
3435

3536
import org.springframework.data.redis.connection.ReactiveSubscription;
37+
import org.springframework.data.redis.connection.SubscriptionListener;
3638
import org.springframework.lang.Nullable;
3739
import org.springframework.util.Assert;
3840
import org.springframework.util.ObjectUtils;
@@ -46,15 +48,23 @@
4648
*/
4749
class LettuceReactiveSubscription implements ReactiveSubscription {
4850

51+
private final LettuceByteBufferPubSubListenerWrapper listener;
52+
private final StatefulRedisPubSubConnection<ByteBuffer, ByteBuffer> connection;
4953
private final RedisPubSubReactiveCommands<ByteBuffer, ByteBuffer> commands;
5054

5155
private final State patternState;
5256
private final State channelState;
5357

54-
LettuceReactiveSubscription(RedisPubSubReactiveCommands<ByteBuffer, ByteBuffer> commands,
58+
LettuceReactiveSubscription(SubscriptionListener subscriptionListener,
59+
StatefulRedisPubSubConnection<ByteBuffer, ByteBuffer> connection,
5560
Function<Throwable, Throwable> exceptionTranslator) {
5661

57-
this.commands = commands;
62+
this.listener = new LettuceByteBufferPubSubListenerWrapper(
63+
new LettuceMessageListener((messages, pattern) -> {}, subscriptionListener));
64+
this.connection = connection;
65+
this.commands = connection.reactive();
66+
connection.addListener(listener);
67+
5868
this.patternState = new State(exceptionTranslator);
5969
this.channelState = new State(exceptionTranslator);
6070
}
@@ -176,7 +186,12 @@ public Mono<Void> cancel() {
176186

177187
channelState.terminate();
178188
patternState.terminate();
179-
return Mono.empty();
189+
190+
// this is to ensure completion of the futures and result processing. Since we're unsubscribing first, we expect
191+
// that we receive pub/sub confirmations before the PING response.
192+
return commands.ping().then(Mono.fromRunnable(() -> {
193+
connection.removeListener(listener);
194+
}));
180195
}));
181196
}
182197

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.CompletableFuture;
2525

2626
import org.springframework.data.redis.connection.MessageListener;
27+
import org.springframework.data.redis.connection.SubscriptionListener;
2728
import org.springframework.data.redis.connection.util.AbstractSubscription;
2829

2930
/**
@@ -58,7 +59,8 @@ protected LettuceSubscription(MessageListener listener,
5859
super(listener);
5960

6061
this.connection = pubsubConnection;
61-
this.listener = new LettuceMessageListener(listener);
62+
this.listener = new LettuceMessageListener(listener,
63+
listener instanceof SubscriptionListener ? (SubscriptionListener) listener : SubscriptionListener.EMPTY);
6264
this.connectionProvider = connectionProvider;
6365
this.pubsub = connection.sync();
6466
this.pubSubAsync = connection.async();

src/main/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainer.java

+50-4
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.springframework.data.redis.connection.ReactiveSubscription.ChannelMessage;
4141
import org.springframework.data.redis.connection.ReactiveSubscription.Message;
4242
import org.springframework.data.redis.connection.ReactiveSubscription.PatternMessage;
43+
import org.springframework.data.redis.connection.SubscriptionListener;
4344
import org.springframework.data.redis.serializer.RedisElementReader;
4445
import org.springframework.data.redis.serializer.RedisSerializationContext.SerializationPair;
4546
import org.springframework.data.redis.serializer.RedisSerializer;
@@ -178,21 +179,65 @@ public Flux<PatternMessage<String, String, String>> receive(PatternTopic... patt
178179
}
179180

180181
/**
181-
* Subscribe to one or more {@link Topic}s and receive a stream of {@link ChannelMessage} The stream may contain
182+
* Subscribe to one or more {@link Topic}s and receive a stream of {@link ChannelMessage}. The stream may contain
182183
* {@link PatternMessage} if subscribed to patterns. Messages, and channel names are serialized/deserialized using the
183184
* given {@code channelSerializer} and {@code messageSerializer}. The message stream subscribes lazily to the Redis
184185
* channels and unsubscribes if the {@link org.reactivestreams.Subscription} is
185186
* {@link org.reactivestreams.Subscription#cancel() cancelled}.
186187
*
187-
* @param topics the channels to subscribe.
188+
* @param topics the channels/patterns to subscribe.
189+
* @param subscriptionListener listener to receive subscription/unsubscription notifications.
190+
* @return the message stream.
191+
* @throws InvalidDataAccessApiUsageException if {@code patternTopics} is empty.
192+
* @see #receive(Iterable, SerializationPair, SerializationPair)
193+
* @since 2.6
194+
*/
195+
public Flux<Message<String, String>> receive(Iterable<? extends Topic> topics,
196+
SubscriptionListener subscriptionListener) {
197+
return receive(topics, stringSerializationPair, stringSerializationPair, subscriptionListener);
198+
}
199+
200+
/**
201+
* Subscribe to one or more {@link Topic}s and receive a stream of {@link ChannelMessage}. The stream may contain
202+
* {@link PatternMessage} if subscribed to patterns. Messages, and channel names are serialized/deserialized using the
203+
* given {@code channelSerializer} and {@code messageSerializer}. The message stream subscribes lazily to the Redis
204+
* channels and unsubscribes if the {@link org.reactivestreams.Subscription} is
205+
* {@link org.reactivestreams.Subscription#cancel() cancelled}.
206+
*
207+
* @param topics the channels/patterns to subscribe.
188208
* @return the message stream.
189209
* @see #receive(Iterable, SerializationPair, SerializationPair)
190210
* @throws InvalidDataAccessApiUsageException if {@code topics} is empty.
191211
*/
192212
public <C, B> Flux<Message<C, B>> receive(Iterable<? extends Topic> topics, SerializationPair<C> channelSerializer,
193213
SerializationPair<B> messageSerializer) {
214+
return receive(topics, channelSerializer, messageSerializer, SubscriptionListener.EMPTY);
215+
}
216+
217+
/**
218+
* Subscribe to one or more {@link Topic}s and receive a stream of {@link ChannelMessage}. The stream may contain
219+
* {@link PatternMessage} if subscribed to patterns. Messages, and channel names are serialized/deserialized using the
220+
* given {@code channelSerializer} and {@code messageSerializer}. The message stream subscribes lazily to the Redis
221+
* channels and unsubscribes if the {@link org.reactivestreams.Subscription} is
222+
* {@link org.reactivestreams.Subscription#cancel() cancelled}. {@link SubscriptionListener} is notified upon
223+
* subscription/unsubscription and can be used for synchronization.
224+
*
225+
* @param topics the channels to subscribe.
226+
* @param channelSerializer
227+
* @param messageSerializer
228+
* @param subscriptionListener listener to receive subscription/unsubscription notifications.
229+
* @return the message stream.
230+
* @see #receive(Iterable, SerializationPair, SerializationPair)
231+
* @throws InvalidDataAccessApiUsageException if {@code topics} is empty.
232+
* @since 2.6
233+
*/
234+
public <C, B> Flux<Message<C, B>> receive(Iterable<? extends Topic> topics, SerializationPair<C> channelSerializer,
235+
SerializationPair<B> messageSerializer, SubscriptionListener subscriptionListener) {
194236

195237
Assert.notNull(topics, "Topics must not be null!");
238+
Assert.notNull(channelSerializer, "Channel serializer must not be null!");
239+
Assert.notNull(messageSerializer, "Message serializer must not be null!");
240+
Assert.notNull(subscriptionListener, "SubscriptionListener must not be null!");
196241

197242
verifyConnection();
198243

@@ -203,7 +248,8 @@ public <C, B> Flux<Message<C, B>> receive(Iterable<? extends Topic> topics, Seri
203248
throw new InvalidDataAccessApiUsageException("No channels or patterns to subscribe to.");
204249
}
205250

206-
return doReceive(channelSerializer, messageSerializer, connection.pubSubCommands().createSubscription(), patterns,
251+
return doReceive(channelSerializer, messageSerializer,
252+
connection.pubSubCommands().createSubscription(subscriptionListener), patterns,
207253
channels);
208254
}
209255

@@ -226,7 +272,7 @@ private <C, B> Flux<Message<C, B>> doReceive(SerializationPair<C> channelSeriali
226272
Subscribers subscribers = getSubscribers(it);
227273
if (subscribers.unregister()) {
228274
subscriptions.remove(it);
229-
it.unsubscribe().subscribe(v -> terminalProcessor.onComplete(), terminalProcessor::onError);
275+
it.cancel().subscribe(v -> terminalProcessor.onComplete(), terminalProcessor::onError);
230276
}
231277
}).mergeWith(terminalProcessor);
232278
});

src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveSubscriptionUnitTests.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.springframework.data.redis.util.ByteUtils.*;
2222

2323
import io.lettuce.core.RedisConnectionException;
24+
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
2425
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
2526
import reactor.core.Disposable;
2627
import reactor.core.publisher.DirectProcessor;
@@ -40,6 +41,7 @@
4041
import org.springframework.data.redis.RedisSystemException;
4142
import org.springframework.data.redis.connection.ReactiveSubscription.Message;
4243
import org.springframework.data.redis.connection.ReactiveSubscription.PatternMessage;
44+
import org.springframework.data.redis.connection.SubscriptionListener;
4345

4446
/**
4547
* Unit tests for {@link LettuceReactiveSubscription}.
@@ -52,11 +54,14 @@ class LettuceReactiveSubscriptionUnitTests {
5254

5355
private LettuceReactiveSubscription subscription;
5456

57+
@Mock StatefulRedisPubSubConnection<ByteBuffer, ByteBuffer> connectionMock;
5558
@Mock RedisPubSubReactiveCommands<ByteBuffer, ByteBuffer> commandsMock;
5659

5760
@BeforeEach
5861
void before() {
59-
subscription = new LettuceReactiveSubscription(commandsMock, e -> new RedisSystemException(e.getMessage(), e));
62+
when(connectionMock.reactive()).thenReturn(commandsMock);
63+
subscription = new LettuceReactiveSubscription(mock(SubscriptionListener.class), connectionMock,
64+
e -> new RedisSystemException(e.getMessage(), e));
6065
}
6166

6267
@Test // DATAREDIS-612

src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateUnitTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ void listenToShouldSubscribeToChannel() {
7171

7272
when(connectionMock.pubSubCommands()).thenReturn(pubSubCommands);
7373
when(pubSubCommands.subscribe(any())).thenReturn(Mono.empty());
74-
when(pubSubCommands.createSubscription()).thenReturn(Mono.just(subscription));
74+
when(pubSubCommands.createSubscription(any())).thenReturn(Mono.just(subscription));
7575
when(subscription.receive()).thenReturn(Flux.create(sink -> {}));
7676

7777
ReactiveRedisTemplate<String, String> template = new ReactiveRedisTemplate<>(connectionFactoryMock,

0 commit comments

Comments
 (0)