Skip to content

Commit 178d44c

Browse files
vpavicmp911de
authored andcommitted
Update RedisOperations#convertAndSend to return number of clients receiving the message.
Currently, `RedisOperations#convertAndSend` is void event though the underlying `RedisPubSubCommands#publish` returns a `Long` that represents the number of clients receiving the published message (see https://redis.io/commands/publish). This commit updates `RedisOperations#convertAndSend` to return the number of clients that received the message. Closes #2209 Original pull request: #2225.
1 parent d2cae75 commit 178d44c

File tree

7 files changed

+50
-45
lines changed

7 files changed

+50
-45
lines changed

src/main/asciidoc/reference/redis-messaging.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ byte[] msg = ...
2424
byte[] channel = ...
2525
con.publish(msg, channel); // send message through RedisTemplate
2626
RedisTemplate template = ...
27-
template.convertAndSend("hello!", "world");
27+
Long numberOfClients = template.convertAndSend("hello!", "world");
2828
----
2929

3030
[[redis:pubsub:subscribe]]

src/main/java/org/springframework/data/redis/core/RedisOperations.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
* @author ihaohong
4747
* @author Todd Merrill
4848
* @author Chen Li
49+
* @author Vedran Pavic
4950
*/
5051
public interface RedisOperations<K, V> {
5152

@@ -591,9 +592,11 @@ default void restore(K key, byte[] value, long timeToLive, TimeUnit unit) {
591592
*
592593
* @param destination the channel to publish to, must not be {@literal null}.
593594
* @param message message to publish
595+
* @return the number of clients that received the message
594596
* @see <a href="https://redis.io/commands/publish">Redis Documentation: PUBLISH</a>
595597
*/
596-
void convertAndSend(String destination, Object message);
598+
@Nullable
599+
Long convertAndSend(String destination, Object message);
597600

598601
// -------------------------------------------------------------------------
599602
// Methods to obtain specific operations interface objects.

src/main/java/org/springframework/data/redis/core/RedisTemplate.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
* @author Denis Zavedeev
8787
* @author ihaohong
8888
* @author Chen Li
89+
* @author Vedran Pavic
8990
* @param <K> the Redis key type against which the template works (usually a String)
9091
* @param <V> the Redis value type against which the template works
9192
* @see StringRedisTemplate
@@ -942,14 +943,14 @@ public void replicaOfNoOne() {
942943
}
943944

944945
@Override
945-
public void convertAndSend(String channel, Object message) {
946+
public Long convertAndSend(String channel, Object message) {
946947

947948
Assert.hasText(channel, "a non-empty channel is required");
948949

949950
byte[] rawChannel = rawString(channel);
950951
byte[] rawMessage = rawValue(message);
951952

952-
executeWithoutResult(connection -> connection.publish(rawChannel, rawMessage));
953+
return execute(connection -> connection.publish(rawChannel, rawMessage), true);
953954
}
954955

955956
private void executeWithoutResult(Consumer<RedisConnection> action) {

src/test/java/org/springframework/data/redis/config/NamespaceIntegrationTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
/**
2828
* @author Costin Leau
2929
* @author Mark Paluch
30+
* @author Vedran Pavic
3031
*/
3132
@SpringJUnitConfig(locations = "namespace.xml")
3233
class NamespaceIntegrationTests {
@@ -43,8 +44,8 @@ void testSanityTest() throws Exception {
4344
}
4445

4546
@Test
46-
void testWithMessages() throws Exception {
47-
template.convertAndSend("x1", "[X]test");
48-
template.convertAndSend("z1", "[Z]test");
47+
void testWithMessages() {
48+
assertThat(template.convertAndSend("x1", "[X]test")).isEqualTo(1L);
49+
assertThat(template.convertAndSend("z1", "[Z]test")).isEqualTo(1L);
4950
}
5051
}

src/test/java/org/springframework/data/redis/core/RedisTemplateIntegrationTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
* @author ihaohong
6464
* @author Hendrik Duerkop
6565
* @author Chen Li
66+
* @author Vedran Pavic
6667
*/
6768
@MethodSource("testParams")
6869
public class RedisTemplateIntegrationTests<K, V> {
@@ -815,10 +816,10 @@ public List<Object> execute(RedisOperations operations) throws DataAccessExcepti
815816
}
816817

817818
@ParameterizedRedisTest
818-
public void testConvertAndSend() {
819+
void testConvertAndSend() {
819820
V value1 = valueFactory.instance();
820821
// Make sure basic message sent without Exception on serialization
821-
redisTemplate.convertAndSend("Channel", value1);
822+
assertThat(redisTemplate.convertAndSend("Channel", value1)).isEqualTo(0L);
822823
}
823824

824825
@ParameterizedRedisTest

src/test/java/org/springframework/data/redis/listener/PubSubResubscribeTests.java

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package org.springframework.data.redis.listener;
1717

18+
import static org.assertj.core.api.Assertions.*;
1819
import static org.awaitility.Awaitility.*;
1920
import static org.junit.Assume.*;
2021

@@ -53,6 +54,7 @@
5354
* @author Jennifer Hickey
5455
* @author Christoph Strobl
5556
* @author Mark Paluch
57+
* @author Vedran Pavic
5658
*/
5759
@MethodSource("testParams")
5860
@EnabledIfLongRunningTest
@@ -126,7 +128,7 @@ void tearDown() {
126128

127129
@ParameterizedRedisTest
128130
@EnabledIfLongRunningTest
129-
void testContainerPatternResubscribe() throws Exception {
131+
void testContainerPatternResubscribe() {
130132

131133
String payload1 = "do";
132134
String payload2 = "re mi";
@@ -143,22 +145,22 @@ void testContainerPatternResubscribe() throws Exception {
143145
container.addMessageListener(anotherListener, new PatternTopic(PATTERN));
144146

145147
// Wait for async subscription tasks to setup
146-
Thread.sleep(400);
147-
148-
// test no messages are sent just to patterns
149-
template.convertAndSend(CHANNEL, payload1);
150-
template.convertAndSend(ANOTHER_CHANNEL, payload2);
148+
await().atMost(Duration.ofMillis(600)).untilAsserted(() -> {
149+
// test no messages are sent just to patterns
150+
assertThat(template.convertAndSend(CHANNEL, payload1)).isEqualTo(1L);
151+
assertThat(template.convertAndSend(ANOTHER_CHANNEL, payload2)).isEqualTo(1L);
152+
});
151153

152154
await().atMost(Duration.ofSeconds(2)).until(() -> bag2.contains(payload1) && bag2.contains(payload2));
153155

154156
// bind original listener on another channel
155157
container.addMessageListener(adapter, new ChannelTopic(ANOTHER_CHANNEL));
156158

157159
// Wait for async subscription tasks to setup
158-
Thread.sleep(400);
159-
160-
template.convertAndSend(CHANNEL, payload1);
161-
template.convertAndSend(ANOTHER_CHANNEL, payload2);
160+
await().atMost(Duration.ofMillis(400)).untilAsserted(() -> {
161+
assertThat(template.convertAndSend(CHANNEL, payload1)).isEqualTo(1L);
162+
assertThat(template.convertAndSend(ANOTHER_CHANNEL, payload2)).isEqualTo(2L);
163+
});
162164

163165
await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains(payload2));
164166

@@ -167,7 +169,7 @@ void testContainerPatternResubscribe() throws Exception {
167169
}
168170

169171
@ParameterizedRedisTest
170-
void testContainerChannelResubscribe() throws Exception {
172+
void testContainerChannelResubscribe() {
171173

172174
String payload1 = "do";
173175
String payload2 = "re mi";
@@ -183,27 +185,25 @@ void testContainerChannelResubscribe() throws Exception {
183185

184186
// timing: There's currently no other way to synchronize
185187
// than to hope the subscribe/unsubscribe are executed within the time.
186-
Thread.sleep(400);
188+
await().atMost(Duration.ofMillis(400)).untilAsserted(() -> {
189+
// Listener removed from channel
190+
assertThat(template.convertAndSend(CHANNEL, payload1)).isEqualTo(0L);
191+
assertThat(template.convertAndSend(CHANNEL, payload2)).isEqualTo(0L);
187192

188-
// Listener removed from channel
189-
template.convertAndSend(CHANNEL, payload1);
190-
template.convertAndSend(CHANNEL, payload2);
191-
192-
// Listener receives messages on another channel
193-
template.convertAndSend(ANOTHER_CHANNEL, anotherPayload1);
194-
template.convertAndSend(ANOTHER_CHANNEL, anotherPayload2);
193+
// Listener receives messages on another channel
194+
assertThat(template.convertAndSend(ANOTHER_CHANNEL, anotherPayload1)).isEqualTo(1L);
195+
assertThat(template.convertAndSend(ANOTHER_CHANNEL, anotherPayload2)).isEqualTo(1L);
196+
});
195197

196198
await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains(anotherPayload1) && bag.contains(anotherPayload2));
197199
}
198200

199201
/**
200202
* Validates the behavior of {@link RedisMessageListenerContainer} when it needs to spin up a thread executing its
201203
* PatternSubscriptionTask
202-
*
203-
* @throws Exception
204204
*/
205205
@ParameterizedRedisTest
206-
void testInitializeContainerWithMultipleTopicsIncludingPattern() throws Exception {
206+
void testInitializeContainerWithMultipleTopicsIncludingPattern() {
207207

208208
assumeFalse(isClusterAware(template.getConnectionFactory()));
209209

@@ -217,10 +217,10 @@ void testInitializeContainerWithMultipleTopicsIncludingPattern() throws Exceptio
217217

218218
// timing: There's currently no other way to synchronize
219219
// than to hope the subscribe/unsubscribe are executed within the time.
220-
Thread.sleep(250);
221-
222-
template.convertAndSend("somechannel", "HELLO");
223-
template.convertAndSend(uniqueChannel, "WORLD");
220+
await().atMost(Duration.ofMillis(250)).untilAsserted(() -> {
221+
assertThat(template.convertAndSend("somechannel", "HELLO")).isEqualTo(1L);
222+
assertThat(template.convertAndSend(uniqueChannel, "WORLD")).isEqualTo(1L);
223+
});
224224

225225
await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains("HELLO") && bag.contains("WORLD"));
226226
}

src/test/java/org/springframework/data/redis/listener/PubSubTests.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
* @author Costin Leau
5050
* @author Jennifer Hickey
5151
* @author Mark Paluch
52+
* @author Vedran Pavic
5253
*/
5354
@MethodSource("testParams")
5455
public class PubSubTests<T> {
@@ -124,14 +125,13 @@ T getT() {
124125
return factory.instance();
125126
}
126127

127-
@SuppressWarnings("unchecked")
128128
@ParameterizedRedisTest
129-
void testContainerSubscribe() throws Exception {
129+
void testContainerSubscribe() {
130130
T payload1 = getT();
131131
T payload2 = getT();
132132

133-
template.convertAndSend(CHANNEL, payload1);
134-
template.convertAndSend(CHANNEL, payload2);
133+
assertThat(template.convertAndSend(CHANNEL, payload1)).isEqualTo(1L);
134+
assertThat(template.convertAndSend(CHANNEL, payload2)).isEqualTo(1L);
135135

136136
await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains(payload1) && bag.contains(payload2));
137137
}
@@ -140,7 +140,7 @@ void testContainerSubscribe() throws Exception {
140140
void testMessageBatch() throws Exception {
141141
int COUNT = 10;
142142
for (int i = 0; i < COUNT; i++) {
143-
template.convertAndSend(CHANNEL, getT());
143+
assertThat(template.convertAndSend(CHANNEL, getT())).isEqualTo(1L);
144144
}
145145

146146
for (int i = 0; i < COUNT; i++) {
@@ -155,8 +155,8 @@ void testContainerUnsubscribe() throws Exception {
155155
T payload2 = getT();
156156

157157
container.removeMessageListener(adapter, new ChannelTopic(CHANNEL));
158-
template.convertAndSend(CHANNEL, payload1);
159-
template.convertAndSend(CHANNEL, payload2);
158+
assertThat(template.convertAndSend(CHANNEL, payload1)).isEqualTo(1L);
159+
assertThat(template.convertAndSend(CHANNEL, payload2)).isEqualTo(1L);
160160

161161
assertThat(bag.poll(200, TimeUnit.MILLISECONDS)).isNull();
162162
}
@@ -169,9 +169,8 @@ void testStartNoListeners() {
169169
container.start();
170170
}
171171

172-
@SuppressWarnings("unchecked")
173172
@ParameterizedRedisTest // DATAREDIS-251, GH-964
174-
void testStartListenersToNoSpecificChannelTest() throws InterruptedException {
173+
void testStartListenersToNoSpecificChannelTest() {
175174

176175
assumeThat(isClusterAware(template.getConnectionFactory())).isFalse();
177176

@@ -181,7 +180,7 @@ void testStartListenersToNoSpecificChannelTest() throws InterruptedException {
181180

182181
T payload = getT();
183182

184-
template.convertAndSend(CHANNEL, payload);
183+
assertThat(template.convertAndSend(CHANNEL, payload)).isEqualTo(1L);
185184

186185
await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains(payload));
187186
}

0 commit comments

Comments
 (0)