Skip to content

Commit 9480851

Browse files
mp911dechristophstrobl
authored andcommitted
Add ReactiveRedisOperations.listenToLater(…) to await subscriptions.
Original Pull Request: #2052
1 parent fe51d78 commit 9480851

File tree

4 files changed

+111
-4
lines changed

4 files changed

+111
-4
lines changed

src/main/java/org/springframework/data/redis/core/ReactiveRedisOperations.java

+43
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,49 @@ default Flux<? extends Message<String, V>> listenToPattern(String... patterns) {
116116
*/
117117
Flux<? extends Message<String, V>> listenTo(Topic... topics);
118118

119+
/**
120+
* Subscribe to the given Redis {@code channels} and emit {@link Message messages} received for those. The
121+
* {@link Mono} completes once the {@link Topic topic} subscriptions are registered.
122+
*
123+
* @param channels must not be {@literal null}.
124+
* @return a hot sequence of {@link Message messages}.
125+
* @since 2.6
126+
* @see org.springframework.data.redis.listener.ReactiveRedisMessageListenerContainer#receiveLater(ChannelTopic...)
127+
*/
128+
default Mono<Flux<? extends Message<String, V>>> listenToChannelLater(String... channels) {
129+
130+
Assert.notNull(channels, "Channels must not be null!");
131+
132+
return listenToLater(Arrays.stream(channels).map(ChannelTopic::of).toArray(ChannelTopic[]::new));
133+
}
134+
135+
/**
136+
* Subscribe to the Redis channels matching the given {@code pattern} and emit {@link Message messages} received for
137+
* those. The {@link Mono} completes once the {@link Topic topic} subscriptions are registered.
138+
*
139+
* @param patterns must not be {@literal null}.
140+
* @return a hot sequence of {@link Message messages}.
141+
* @since 2.6
142+
* @see org.springframework.data.redis.listener.ReactiveRedisMessageListenerContainer#receiveLater(PatternTopic...)
143+
*/
144+
default Mono<Flux<? extends Message<String, V>>> listenToPatternLater(String... patterns) {
145+
146+
Assert.notNull(patterns, "Patterns must not be null!");
147+
return listenToLater(Arrays.stream(patterns).map(PatternTopic::of).toArray(PatternTopic[]::new));
148+
}
149+
150+
/**
151+
* Subscribe to the Redis channels for the given {@link Topic topics} and emit {@link Message messages} received for
152+
* those. The {@link Mono} completes once the {@link Topic topic} subscriptions are registered.
153+
*
154+
* @param topics must not be {@literal null}.
155+
* @return a hot sequence of {@link Message messages}.
156+
* @since 2.6
157+
* @see org.springframework.data.redis.listener.ReactiveRedisMessageListenerContainer#receiveLater(Iterable,
158+
* RedisSerializationContext.SerializationPair, RedisSerializationContext.SerializationPair)
159+
*/
160+
Mono<Flux<? extends Message<String, V>>> listenToLater(Topic... topics);
161+
119162
// -------------------------------------------------------------------------
120163
// Methods dealing with Redis Keys
121164
// -------------------------------------------------------------------------

src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java

+15
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,21 @@ public Flux<? extends Message<String, V>> listenTo(Topic... topics) {
237237
.doFinally((signalType) -> container.destroyLater().subscribe());
238238
}
239239

240+
/*
241+
* (non-Javadoc)
242+
* @see org.springframework.data.redis.core.ReactiveRedisOperations#listenToLater(org.springframework.data.redis.listener.Topic[])
243+
*/
244+
@Override
245+
@SuppressWarnings({ "unchecked", "rawtypes" })
246+
public Mono<Flux<? extends Message<String, V>>> listenToLater(Topic... topics) {
247+
248+
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(getConnectionFactory());
249+
250+
return (Mono) container.receiveLater(Arrays.asList(topics), getSerializationContext().getStringSerializationPair(),
251+
getSerializationContext().getValueSerializationPair()) //
252+
.doFinally((signalType) -> container.destroyLater().subscribe());
253+
}
254+
240255
// -------------------------------------------------------------------------
241256
// Methods dealing with Redis keys
242257
// -------------------------------------------------------------------------

src/main/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ private <C, B> Flux<Message<C, B>> doReceive(SerializationPair<C> channelSeriali
343343
* @throws InvalidDataAccessApiUsageException if {@code topics} is empty.
344344
* @since 2.6
345345
*/
346-
private <C, B> Mono<Flux<Message<C, B>>> receiveLater(Iterable<? extends Topic> topics,
346+
public <C, B> Mono<Flux<Message<C, B>>> receiveLater(Iterable<? extends Topic> topics,
347347
SerializationPair<C> channelSerializer, SerializationPair<B> messageSerializer) {
348348

349349
Assert.notNull(topics, "Topics must not be null!");

src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateIntegrationTests.java

+52-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.assertj.core.api.Assumptions.*;
2020

2121
import reactor.core.publisher.Flux;
22+
import reactor.core.publisher.Mono;
2223
import reactor.test.StepVerifier;
2324

2425
import java.time.Duration;
@@ -27,6 +28,7 @@
2728
import java.util.Collections;
2829
import java.util.HashMap;
2930
import java.util.Map;
31+
import java.util.function.Function;
3032

3133
import org.junit.jupiter.api.BeforeEach;
3234

@@ -441,7 +443,29 @@ void listenToChannelShouldReceiveChannelMessagesCorrectly() throws InterruptedEx
441443

442444
redisTemplate.listenToChannel(channel).as(StepVerifier::create) //
443445
.thenAwait(Duration.ofMillis(500)) // just make sure we the subscription completed
444-
.then(() -> redisTemplate.convertAndSend(channel, message).block()) //
446+
.then(() -> redisTemplate.convertAndSend(channel, message).subscribe()) //
447+
.assertNext(received -> {
448+
449+
assertThat(received).isInstanceOf(ChannelMessage.class);
450+
assertThat(received.getMessage()).isEqualTo(message);
451+
assertThat(received.getChannel()).isEqualTo(channel);
452+
}) //
453+
.thenAwait(Duration.ofMillis(10)) //
454+
.thenCancel() //
455+
.verify(Duration.ofSeconds(3));
456+
}
457+
458+
@ParameterizedRedisTest // GH-1622
459+
@EnabledIfLongRunningTest
460+
void listenToLaterChannelShouldReceiveChannelMessagesCorrectly() {
461+
462+
String channel = "my-channel";
463+
464+
V message = valueFactory.instance();
465+
466+
redisTemplate.listenToChannelLater(channel) //
467+
.doOnNext(it -> redisTemplate.convertAndSend(channel, message).subscribe()).flatMapMany(Function.identity()) //
468+
.as(StepVerifier::create) //
445469
.assertNext(received -> {
446470

447471
assertThat(received).isInstanceOf(ChannelMessage.class);
@@ -455,7 +479,7 @@ void listenToChannelShouldReceiveChannelMessagesCorrectly() throws InterruptedEx
455479

456480
@ParameterizedRedisTest // DATAREDIS-612
457481
@EnabledIfLongRunningTest
458-
void listenToChannelPatternShouldReceiveChannelMessagesCorrectly() throws InterruptedException {
482+
void listenToPatternShouldReceiveChannelMessagesCorrectly() {
459483

460484
String channel = "my-channel";
461485
String pattern = "my-*";
@@ -466,7 +490,32 @@ void listenToChannelPatternShouldReceiveChannelMessagesCorrectly() throws Interr
466490

467491
stream.as(StepVerifier::create) //
468492
.thenAwait(Duration.ofMillis(500)) // just make sure we the subscription completed
469-
.then(() -> redisTemplate.convertAndSend(channel, message).block()) //
493+
.then(() -> redisTemplate.convertAndSend(channel, message).subscribe()) //
494+
.assertNext(received -> {
495+
496+
assertThat(received).isInstanceOf(PatternMessage.class);
497+
assertThat(received.getMessage()).isEqualTo(message);
498+
assertThat(received.getChannel()).isEqualTo(channel);
499+
assertThat(((PatternMessage) received).getPattern()).isEqualTo(pattern);
500+
}) //
501+
.thenCancel() //
502+
.verify(Duration.ofSeconds(3));
503+
}
504+
505+
@ParameterizedRedisTest // GH-1622
506+
@EnabledIfLongRunningTest
507+
void listenToPatternLaterShouldReceiveChannelMessagesCorrectly() {
508+
509+
String channel = "my-channel";
510+
String pattern = "my-*";
511+
512+
V message = valueFactory.instance();
513+
514+
Mono<Flux<? extends Message<String, V>>> stream = redisTemplate.listenToPatternLater(pattern);
515+
516+
stream.doOnNext(it -> redisTemplate.convertAndSend(channel, message).subscribe()) //
517+
.flatMapMany(Function.identity()) //
518+
.as(StepVerifier::create) //
470519
.assertNext(received -> {
471520

472521
assertThat(received).isInstanceOf(PatternMessage.class);

0 commit comments

Comments
 (0)