Skip to content

Commit c881597

Browse files
committed
Update RedisOperations#convertAndSend to return number of clients receiving the message.
Currently, `RedisOperations#convertAndSend` is void event though the underlying `RedisPubSubCommands#publish` returns a `Long` that represents the number of clients receiving the published message (see https://redis.io/commands/publish). This commit updates `RedisOperations#convertAndSend` to return the number of clients that received the message. Closes spring-projects#2209
1 parent be1663c commit c881597

File tree

6 files changed

+54
-54
lines changed

6 files changed

+54
-54
lines changed

src/main/java/org/springframework/data/redis/core/RedisOperations.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2011-2021 the original author or authors.
2+
* Copyright 2011-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -45,6 +45,7 @@
4545
* @author Mark Paluch
4646
* @author ihaohong
4747
* @author Todd Merrill
48+
* @author Vedran Pavic
4849
*/
4950
public interface RedisOperations<K, V> {
5051

@@ -580,9 +581,11 @@ default void restore(K key, byte[] value, long timeToLive, TimeUnit unit) {
580581
*
581582
* @param destination the channel to publish to, must not be {@literal null}.
582583
* @param message message to publish
584+
* @return the number of clients that received the message
583585
* @see <a href="https://redis.io/commands/publish">Redis Documentation: PUBLISH</a>
584586
*/
585-
void convertAndSend(String destination, Object message);
587+
@Nullable
588+
Long convertAndSend(String destination, Object message);
586589

587590
// -------------------------------------------------------------------------
588591
// Methods to obtain specific operations interface objects.

src/main/java/org/springframework/data/redis/core/RedisTemplate.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2011-2021 the original author or authors.
2+
* Copyright 2011-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -83,6 +83,7 @@
8383
* @author Mark Paluch
8484
* @author Denis Zavedeev
8585
* @author ihaohong
86+
* @author Vedran Pavic
8687
* @param <K> the Redis key type against which the template works (usually a String)
8788
* @param <V> the Redis value type against which the template works
8889
* @see StringRedisTemplate
@@ -837,17 +838,14 @@ public Boolean expireAt(K key, final Date date) {
837838
* @see org.springframework.data.redis.core.RedisOperations#convertAndSend(java.lang.String, java.lang.Object)
838839
*/
839840
@Override
840-
public void convertAndSend(String channel, Object message) {
841+
public Long convertAndSend(String channel, Object message) {
841842

842843
Assert.hasText(channel, "a non-empty channel is required");
843844

844845
byte[] rawChannel = rawString(channel);
845846
byte[] rawMessage = rawValue(message);
846847

847-
execute(connection -> {
848-
connection.publish(rawChannel, rawMessage);
849-
return null;
850-
}, true);
848+
return execute(connection -> connection.publish(rawChannel, rawMessage), true);
851849
}
852850

853851
//

src/test/java/org/springframework/data/redis/config/NamespaceIntegrationTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2011-2021 the original author or authors.
2+
* Copyright 2011-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -43,8 +43,8 @@ void testSanityTest() throws Exception {
4343
}
4444

4545
@Test
46-
void testWithMessages() throws Exception {
47-
template.convertAndSend("x1", "[X]test");
48-
template.convertAndSend("z1", "[Z]test");
46+
void testWithMessages() {
47+
assertThat(template.convertAndSend("x1", "[X]test")).isEqualTo(1L);
48+
assertThat(template.convertAndSend("z1", "[Z]test")).isEqualTo(1L);
4949
}
5050
}

src/test/java/org/springframework/data/redis/core/RedisTemplateIntegrationTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2021 the original author or authors.
2+
* Copyright 2013-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -797,10 +797,10 @@ public List<Object> execute(RedisOperations operations) throws DataAccessExcepti
797797
}
798798

799799
@ParameterizedRedisTest
800-
public void testConvertAndSend() {
800+
void testConvertAndSend() {
801801
V value1 = valueFactory.instance();
802802
// Make sure basic message sent without Exception on serialization
803-
redisTemplate.convertAndSend("Channel", value1);
803+
assertThat(redisTemplate.convertAndSend("Channel", value1)).isEqualTo(0L);
804804
}
805805

806806
@ParameterizedRedisTest

src/test/java/org/springframework/data/redis/listener/PubSubResubscribeTests.java

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2011-2021 the original author or authors.
2+
* Copyright 2011-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -15,6 +15,7 @@
1515
*/
1616
package org.springframework.data.redis.listener;
1717

18+
import static org.assertj.core.api.Assertions.*;
1819
import static org.awaitility.Awaitility.*;
1920
import static org.junit.Assume.*;
2021

@@ -54,6 +55,7 @@
5455
* @author Jennifer Hickey
5556
* @author Christoph Strobl
5657
* @author Mark Paluch
58+
* @author Vedran Pavic
5759
*/
5860
@MethodSource("testParams")
5961
@EnabledIfLongRunningTest
@@ -130,7 +132,7 @@ void tearDown() {
130132

131133
@ParameterizedRedisTest
132134
@EnabledIfLongRunningTest
133-
void testContainerPatternResubscribe() throws Exception {
135+
void testContainerPatternResubscribe() {
134136

135137
String payload1 = "do";
136138
String payload2 = "re mi";
@@ -147,22 +149,22 @@ void testContainerPatternResubscribe() throws Exception {
147149
container.addMessageListener(anotherListener, new PatternTopic(PATTERN));
148150

149151
// Wait for async subscription tasks to setup
150-
Thread.sleep(400);
151-
152-
// test no messages are sent just to patterns
153-
template.convertAndSend(CHANNEL, payload1);
154-
template.convertAndSend(ANOTHER_CHANNEL, payload2);
152+
await().atMost(Duration.ofMillis(600)).untilAsserted(() -> {
153+
// test no messages are sent just to patterns
154+
assertThat(template.convertAndSend(CHANNEL, payload1)).isEqualTo(1L);
155+
assertThat(template.convertAndSend(ANOTHER_CHANNEL, payload2)).isEqualTo(1L);
156+
});
155157

156158
await().atMost(Duration.ofSeconds(2)).until(() -> bag2.contains(payload1) && bag2.contains(payload2));
157159

158160
// bind original listener on another channel
159161
container.addMessageListener(adapter, new ChannelTopic(ANOTHER_CHANNEL));
160162

161163
// Wait for async subscription tasks to setup
162-
Thread.sleep(400);
163-
164-
template.convertAndSend(CHANNEL, payload1);
165-
template.convertAndSend(ANOTHER_CHANNEL, payload2);
164+
await().atMost(Duration.ofMillis(400)).untilAsserted(() -> {
165+
assertThat(template.convertAndSend(CHANNEL, payload1)).isEqualTo(1L);
166+
assertThat(template.convertAndSend(ANOTHER_CHANNEL, payload2)).isEqualTo(2L);
167+
});
166168

167169
await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains(payload2));
168170

@@ -171,7 +173,7 @@ void testContainerPatternResubscribe() throws Exception {
171173
}
172174

173175
@ParameterizedRedisTest
174-
void testContainerChannelResubscribe() throws Exception {
176+
void testContainerChannelResubscribe() {
175177

176178
String payload1 = "do";
177179
String payload2 = "re mi";
@@ -187,27 +189,25 @@ void testContainerChannelResubscribe() throws Exception {
187189

188190
// timing: There's currently no other way to synchronize
189191
// than to hope the subscribe/unsubscribe are executed within the time.
190-
Thread.sleep(400);
191-
192-
// Listener removed from channel
193-
template.convertAndSend(CHANNEL, payload1);
194-
template.convertAndSend(CHANNEL, payload2);
195-
196-
// Listener receives messages on another channel
197-
template.convertAndSend(ANOTHER_CHANNEL, anotherPayload1);
198-
template.convertAndSend(ANOTHER_CHANNEL, anotherPayload2);
192+
await().atMost(Duration.ofMillis(400)).untilAsserted(() -> {
193+
// Listener removed from channel
194+
assertThat(template.convertAndSend(CHANNEL, payload1)).isEqualTo(0L);
195+
assertThat(template.convertAndSend(CHANNEL, payload2)).isEqualTo(0L);
196+
197+
// Listener receives messages on another channel
198+
assertThat(template.convertAndSend(ANOTHER_CHANNEL, anotherPayload1)).isEqualTo(1L);
199+
assertThat(template.convertAndSend(ANOTHER_CHANNEL, anotherPayload2)).isEqualTo(1L);
200+
});
199201

200202
await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains(anotherPayload1) && bag.contains(anotherPayload2));
201203
}
202204

203205
/**
204206
* Validates the behavior of {@link RedisMessageListenerContainer} when it needs to spin up a thread executing its
205207
* PatternSubscriptionTask
206-
*
207-
* @throws Exception
208208
*/
209209
@ParameterizedRedisTest
210-
void testInitializeContainerWithMultipleTopicsIncludingPattern() throws Exception {
210+
void testInitializeContainerWithMultipleTopicsIncludingPattern() {
211211

212212
assumeFalse(isClusterAware(template.getConnectionFactory()));
213213

@@ -223,10 +223,10 @@ void testInitializeContainerWithMultipleTopicsIncludingPattern() throws Exceptio
223223

224224
// timing: There's currently no other way to synchronize
225225
// than to hope the subscribe/unsubscribe are executed within the time.
226-
Thread.sleep(250);
227-
228-
template.convertAndSend("somechannel", "HELLO");
229-
template.convertAndSend(uniqueChannel, "WORLD");
226+
await().atMost(Duration.ofMillis(250)).untilAsserted(() -> {
227+
assertThat(template.convertAndSend("somechannel", "HELLO")).isEqualTo(1L);
228+
assertThat(template.convertAndSend(uniqueChannel, "WORLD")).isEqualTo(1L);
229+
});
230230

231231
await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains("HELLO") && bag.contains("WORLD"));
232232
}

src/test/java/org/springframework/data/redis/listener/PubSubTests.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2011-2021 the original author or authors.
2+
* Copyright 2011-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -50,6 +50,7 @@
5050
* @author Costin Leau
5151
* @author Jennifer Hickey
5252
* @author Mark Paluch
53+
* @author Vedran Pavic
5354
*/
5455
@MethodSource("testParams")
5556
public class PubSubTests<T> {
@@ -125,14 +126,13 @@ T getT() {
125126
return factory.instance();
126127
}
127128

128-
@SuppressWarnings("unchecked")
129129
@ParameterizedRedisTest
130-
void testContainerSubscribe() throws Exception {
130+
void testContainerSubscribe() {
131131
T payload1 = getT();
132132
T payload2 = getT();
133133

134-
template.convertAndSend(CHANNEL, payload1);
135-
template.convertAndSend(CHANNEL, payload2);
134+
assertThat(template.convertAndSend(CHANNEL, payload1)).isEqualTo(1L);
135+
assertThat(template.convertAndSend(CHANNEL, payload2)).isEqualTo(1L);
136136

137137
await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains(payload1) && bag.contains(payload2));
138138
}
@@ -141,7 +141,7 @@ void testContainerSubscribe() throws Exception {
141141
void testMessageBatch() throws Exception {
142142
int COUNT = 10;
143143
for (int i = 0; i < COUNT; i++) {
144-
template.convertAndSend(CHANNEL, getT());
144+
assertThat(template.convertAndSend(CHANNEL, getT())).isEqualTo(1L);
145145
}
146146

147147
for (int i = 0; i < COUNT; i++) {
@@ -156,8 +156,8 @@ void testContainerUnsubscribe() throws Exception {
156156
T payload2 = getT();
157157

158158
container.removeMessageListener(adapter, new ChannelTopic(CHANNEL));
159-
template.convertAndSend(CHANNEL, payload1);
160-
template.convertAndSend(CHANNEL, payload2);
159+
assertThat(template.convertAndSend(CHANNEL, payload1)).isEqualTo(1L);
160+
assertThat(template.convertAndSend(CHANNEL, payload2)).isEqualTo(1L);
161161

162162
assertThat(bag.poll(200, TimeUnit.MILLISECONDS)).isNull();
163163
}
@@ -170,9 +170,8 @@ void testStartNoListeners() {
170170
container.start();
171171
}
172172

173-
@SuppressWarnings("unchecked")
174173
@ParameterizedRedisTest // DATAREDIS-251
175-
void testStartListenersToNoSpecificChannelTest() throws InterruptedException {
174+
void testStartListenersToNoSpecificChannelTest() {
176175

177176
assumeThat(isClusterAware(template.getConnectionFactory())).isFalse();
178177
assumeThat(ConnectionUtils.isJedis(template.getConnectionFactory())).isTrue();
@@ -186,7 +185,7 @@ void testStartListenersToNoSpecificChannelTest() throws InterruptedException {
186185

187186
T payload = getT();
188187

189-
template.convertAndSend(CHANNEL, payload);
188+
assertThat(template.convertAndSend(CHANNEL, payload)).isEqualTo(1L);
190189

191190
await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains(payload));
192191
}

0 commit comments

Comments
 (0)