Skip to content

Update RedisOperations#convertAndSend to return number of clients receiving the message. #2225

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/asciidoc/reference/redis-messaging.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ byte[] msg = ...
byte[] channel = ...
con.publish(msg, channel); // send message through RedisTemplate
RedisTemplate template = ...
template.convertAndSend("hello!", "world");
Long numberOfClients = template.convertAndSend("hello!", "world");
----

[[redis:pubsub:subscribe]]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2021 the original author or authors.
* Copyright 2011-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,6 +45,7 @@
* @author Mark Paluch
* @author ihaohong
* @author Todd Merrill
* @author Vedran Pavic
*/
public interface RedisOperations<K, V> {

Expand Down Expand Up @@ -580,9 +581,11 @@ default void restore(K key, byte[] value, long timeToLive, TimeUnit unit) {
*
* @param destination the channel to publish to, must not be {@literal null}.
* @param message message to publish
* @return the number of clients that received the message
* @see <a href="https://redis.io/commands/publish">Redis Documentation: PUBLISH</a>
*/
void convertAndSend(String destination, Object message);
@Nullable
Long convertAndSend(String destination, Object message);

// -------------------------------------------------------------------------
// Methods to obtain specific operations interface objects.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2021 the original author or authors.
* Copyright 2011-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -83,6 +83,7 @@
* @author Mark Paluch
* @author Denis Zavedeev
* @author ihaohong
* @author Vedran Pavic
* @param <K> the Redis key type against which the template works (usually a String)
* @param <V> the Redis value type against which the template works
* @see StringRedisTemplate
Expand Down Expand Up @@ -837,17 +838,14 @@ public Boolean expireAt(K key, final Date date) {
* @see org.springframework.data.redis.core.RedisOperations#convertAndSend(java.lang.String, java.lang.Object)
*/
@Override
public void convertAndSend(String channel, Object message) {
public Long convertAndSend(String channel, Object message) {

Assert.hasText(channel, "a non-empty channel is required");

byte[] rawChannel = rawString(channel);
byte[] rawMessage = rawValue(message);

execute(connection -> {
connection.publish(rawChannel, rawMessage);
return null;
}, true);
return execute(connection -> connection.publish(rawChannel, rawMessage), true);
}

//
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2021 the original author or authors.
* Copyright 2011-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,6 +27,7 @@
/**
* @author Costin Leau
* @author Mark Paluch
* @author Vedran Pavic
*/
@SpringJUnitConfig(locations = "namespace.xml")
class NamespaceIntegrationTests {
Expand All @@ -43,8 +44,8 @@ void testSanityTest() throws Exception {
}

@Test
void testWithMessages() throws Exception {
template.convertAndSend("x1", "[X]test");
template.convertAndSend("z1", "[Z]test");
void testWithMessages() {
assertThat(template.convertAndSend("x1", "[X]test")).isEqualTo(1L);
assertThat(template.convertAndSend("z1", "[Z]test")).isEqualTo(1L);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2021 the original author or authors.
* Copyright 2013-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,6 +61,7 @@
* @author Duobiao Ou
* @author Mark Paluch
* @author ihaohong
* @author Vedran Pavic
*/
@MethodSource("testParams")
public class RedisTemplateIntegrationTests<K, V> {
Expand Down Expand Up @@ -797,10 +798,10 @@ public List<Object> execute(RedisOperations operations) throws DataAccessExcepti
}

@ParameterizedRedisTest
public void testConvertAndSend() {
void testConvertAndSend() {
V value1 = valueFactory.instance();
// Make sure basic message sent without Exception on serialization
redisTemplate.convertAndSend("Channel", value1);
assertThat(redisTemplate.convertAndSend("Channel", value1)).isEqualTo(0L);
}

@ParameterizedRedisTest
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2021 the original author or authors.
* Copyright 2011-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,7 @@
*/
package org.springframework.data.redis.listener;

import static org.assertj.core.api.Assertions.*;
import static org.awaitility.Awaitility.*;
import static org.junit.Assume.*;

Expand Down Expand Up @@ -54,6 +55,7 @@
* @author Jennifer Hickey
* @author Christoph Strobl
* @author Mark Paluch
* @author Vedran Pavic
*/
@MethodSource("testParams")
@EnabledIfLongRunningTest
Expand Down Expand Up @@ -130,7 +132,7 @@ void tearDown() {

@ParameterizedRedisTest
@EnabledIfLongRunningTest
void testContainerPatternResubscribe() throws Exception {
void testContainerPatternResubscribe() {

String payload1 = "do";
String payload2 = "re mi";
Expand All @@ -147,22 +149,22 @@ void testContainerPatternResubscribe() throws Exception {
container.addMessageListener(anotherListener, new PatternTopic(PATTERN));

// Wait for async subscription tasks to setup
Thread.sleep(400);

// test no messages are sent just to patterns
template.convertAndSend(CHANNEL, payload1);
template.convertAndSend(ANOTHER_CHANNEL, payload2);
await().atMost(Duration.ofMillis(600)).untilAsserted(() -> {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I utilized Awaitility in use cases like this as it's more handy vs having to resort to Thread#sleep. Note that for this particular set of assertions I had to increase the grace period from 400 to 600 ms as the test was flaky, on my machine at least.

// test no messages are sent just to patterns
assertThat(template.convertAndSend(CHANNEL, payload1)).isEqualTo(1L);
assertThat(template.convertAndSend(ANOTHER_CHANNEL, payload2)).isEqualTo(1L);
});

await().atMost(Duration.ofSeconds(2)).until(() -> bag2.contains(payload1) && bag2.contains(payload2));

// bind original listener on another channel
container.addMessageListener(adapter, new ChannelTopic(ANOTHER_CHANNEL));

// Wait for async subscription tasks to setup
Thread.sleep(400);

template.convertAndSend(CHANNEL, payload1);
template.convertAndSend(ANOTHER_CHANNEL, payload2);
await().atMost(Duration.ofMillis(400)).untilAsserted(() -> {
assertThat(template.convertAndSend(CHANNEL, payload1)).isEqualTo(1L);
assertThat(template.convertAndSend(ANOTHER_CHANNEL, payload2)).isEqualTo(2L);
});

await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains(payload2));

Expand All @@ -171,7 +173,7 @@ void testContainerPatternResubscribe() throws Exception {
}

@ParameterizedRedisTest
void testContainerChannelResubscribe() throws Exception {
void testContainerChannelResubscribe() {

String payload1 = "do";
String payload2 = "re mi";
Expand All @@ -187,27 +189,25 @@ void testContainerChannelResubscribe() throws Exception {

// timing: There's currently no other way to synchronize
// than to hope the subscribe/unsubscribe are executed within the time.
Thread.sleep(400);

// Listener removed from channel
template.convertAndSend(CHANNEL, payload1);
template.convertAndSend(CHANNEL, payload2);

// Listener receives messages on another channel
template.convertAndSend(ANOTHER_CHANNEL, anotherPayload1);
template.convertAndSend(ANOTHER_CHANNEL, anotherPayload2);
await().atMost(Duration.ofMillis(400)).untilAsserted(() -> {
// Listener removed from channel
assertThat(template.convertAndSend(CHANNEL, payload1)).isEqualTo(0L);
assertThat(template.convertAndSend(CHANNEL, payload2)).isEqualTo(0L);

// Listener receives messages on another channel
assertThat(template.convertAndSend(ANOTHER_CHANNEL, anotherPayload1)).isEqualTo(1L);
assertThat(template.convertAndSend(ANOTHER_CHANNEL, anotherPayload2)).isEqualTo(1L);
});

await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains(anotherPayload1) && bag.contains(anotherPayload2));
}

/**
* Validates the behavior of {@link RedisMessageListenerContainer} when it needs to spin up a thread executing its
* PatternSubscriptionTask
*
* @throws Exception
*/
@ParameterizedRedisTest
void testInitializeContainerWithMultipleTopicsIncludingPattern() throws Exception {
void testInitializeContainerWithMultipleTopicsIncludingPattern() {

assumeFalse(isClusterAware(template.getConnectionFactory()));

Expand All @@ -223,10 +223,10 @@ void testInitializeContainerWithMultipleTopicsIncludingPattern() throws Exceptio

// timing: There's currently no other way to synchronize
// than to hope the subscribe/unsubscribe are executed within the time.
Thread.sleep(250);

template.convertAndSend("somechannel", "HELLO");
template.convertAndSend(uniqueChannel, "WORLD");
await().atMost(Duration.ofMillis(250)).untilAsserted(() -> {
assertThat(template.convertAndSend("somechannel", "HELLO")).isEqualTo(1L);
assertThat(template.convertAndSend(uniqueChannel, "WORLD")).isEqualTo(1L);
});

await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains("HELLO") && bag.contains("WORLD"));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2021 the original author or authors.
* Copyright 2011-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,6 +50,7 @@
* @author Costin Leau
* @author Jennifer Hickey
* @author Mark Paluch
* @author Vedran Pavic
*/
@MethodSource("testParams")
public class PubSubTests<T> {
Expand Down Expand Up @@ -125,14 +126,13 @@ T getT() {
return factory.instance();
}

@SuppressWarnings("unchecked")
@ParameterizedRedisTest
void testContainerSubscribe() throws Exception {
void testContainerSubscribe() {
T payload1 = getT();
T payload2 = getT();

template.convertAndSend(CHANNEL, payload1);
template.convertAndSend(CHANNEL, payload2);
assertThat(template.convertAndSend(CHANNEL, payload1)).isEqualTo(1L);
assertThat(template.convertAndSend(CHANNEL, payload2)).isEqualTo(1L);

await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains(payload1) && bag.contains(payload2));
}
Expand All @@ -141,7 +141,7 @@ void testContainerSubscribe() throws Exception {
void testMessageBatch() throws Exception {
int COUNT = 10;
for (int i = 0; i < COUNT; i++) {
template.convertAndSend(CHANNEL, getT());
assertThat(template.convertAndSend(CHANNEL, getT())).isEqualTo(1L);
}

for (int i = 0; i < COUNT; i++) {
Expand All @@ -156,8 +156,8 @@ void testContainerUnsubscribe() throws Exception {
T payload2 = getT();

container.removeMessageListener(adapter, new ChannelTopic(CHANNEL));
template.convertAndSend(CHANNEL, payload1);
template.convertAndSend(CHANNEL, payload2);
assertThat(template.convertAndSend(CHANNEL, payload1)).isEqualTo(1L);
assertThat(template.convertAndSend(CHANNEL, payload2)).isEqualTo(1L);

assertThat(bag.poll(200, TimeUnit.MILLISECONDS)).isNull();
}
Expand All @@ -170,9 +170,8 @@ void testStartNoListeners() {
container.start();
}

@SuppressWarnings("unchecked")
@ParameterizedRedisTest // DATAREDIS-251
void testStartListenersToNoSpecificChannelTest() throws InterruptedException {
void testStartListenersToNoSpecificChannelTest() {

assumeThat(isClusterAware(template.getConnectionFactory())).isFalse();
assumeThat(ConnectionUtils.isJedis(template.getConnectionFactory())).isTrue();
Expand All @@ -186,7 +185,7 @@ void testStartListenersToNoSpecificChannelTest() throws InterruptedException {

T payload = getT();

template.convertAndSend(CHANNEL, payload);
assertThat(template.convertAndSend(CHANNEL, payload)).isEqualTo(1L);

await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains(payload));
}
Expand Down