diff --git a/src/main/asciidoc/reference/redis-messaging.adoc b/src/main/asciidoc/reference/redis-messaging.adoc index 86da25dfa3..179cda78f3 100644 --- a/src/main/asciidoc/reference/redis-messaging.adoc +++ b/src/main/asciidoc/reference/redis-messaging.adoc @@ -24,7 +24,7 @@ byte[] msg = ... byte[] channel = ... con.publish(msg, channel); // send message through RedisTemplate RedisTemplate template = ... -template.convertAndSend("hello!", "world"); +Long numberOfClients = template.convertAndSend("hello!", "world"); ---- [[redis:pubsub:subscribe]] diff --git a/src/main/java/org/springframework/data/redis/core/RedisOperations.java b/src/main/java/org/springframework/data/redis/core/RedisOperations.java index d4404435d7..b330184c4c 100644 --- a/src/main/java/org/springframework/data/redis/core/RedisOperations.java +++ b/src/main/java/org/springframework/data/redis/core/RedisOperations.java @@ -1,5 +1,5 @@ /* - * Copyright 2011-2021 the original author or authors. + * Copyright 2011-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,6 +45,7 @@ * @author Mark Paluch * @author ihaohong * @author Todd Merrill + * @author Vedran Pavic */ public interface RedisOperations { @@ -580,9 +581,11 @@ default void restore(K key, byte[] value, long timeToLive, TimeUnit unit) { * * @param destination the channel to publish to, must not be {@literal null}. * @param message message to publish + * @return the number of clients that received the message * @see Redis Documentation: PUBLISH */ - void convertAndSend(String destination, Object message); + @Nullable + Long convertAndSend(String destination, Object message); // ------------------------------------------------------------------------- // Methods to obtain specific operations interface objects. diff --git a/src/main/java/org/springframework/data/redis/core/RedisTemplate.java b/src/main/java/org/springframework/data/redis/core/RedisTemplate.java index 9075f96933..b11c959064 100644 --- a/src/main/java/org/springframework/data/redis/core/RedisTemplate.java +++ b/src/main/java/org/springframework/data/redis/core/RedisTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2011-2021 the original author or authors. + * Copyright 2011-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -83,6 +83,7 @@ * @author Mark Paluch * @author Denis Zavedeev * @author ihaohong + * @author Vedran Pavic * @param the Redis key type against which the template works (usually a String) * @param the Redis value type against which the template works * @see StringRedisTemplate @@ -837,17 +838,14 @@ public Boolean expireAt(K key, final Date date) { * @see org.springframework.data.redis.core.RedisOperations#convertAndSend(java.lang.String, java.lang.Object) */ @Override - public void convertAndSend(String channel, Object message) { + public Long convertAndSend(String channel, Object message) { Assert.hasText(channel, "a non-empty channel is required"); byte[] rawChannel = rawString(channel); byte[] rawMessage = rawValue(message); - execute(connection -> { - connection.publish(rawChannel, rawMessage); - return null; - }, true); + return execute(connection -> connection.publish(rawChannel, rawMessage), true); } // diff --git a/src/test/java/org/springframework/data/redis/config/NamespaceIntegrationTests.java b/src/test/java/org/springframework/data/redis/config/NamespaceIntegrationTests.java index 3c98cb7e74..be11bcc2ef 100644 --- a/src/test/java/org/springframework/data/redis/config/NamespaceIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/config/NamespaceIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2011-2021 the original author or authors. + * Copyright 2011-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ /** * @author Costin Leau * @author Mark Paluch + * @author Vedran Pavic */ @SpringJUnitConfig(locations = "namespace.xml") class NamespaceIntegrationTests { @@ -43,8 +44,8 @@ void testSanityTest() throws Exception { } @Test - void testWithMessages() throws Exception { - template.convertAndSend("x1", "[X]test"); - template.convertAndSend("z1", "[Z]test"); + void testWithMessages() { + assertThat(template.convertAndSend("x1", "[X]test")).isEqualTo(1L); + assertThat(template.convertAndSend("z1", "[Z]test")).isEqualTo(1L); } } diff --git a/src/test/java/org/springframework/data/redis/core/RedisTemplateIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/RedisTemplateIntegrationTests.java index b6344e2744..34dfd36950 100644 --- a/src/test/java/org/springframework/data/redis/core/RedisTemplateIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/core/RedisTemplateIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2021 the original author or authors. + * Copyright 2013-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -61,6 +61,7 @@ * @author Duobiao Ou * @author Mark Paluch * @author ihaohong + * @author Vedran Pavic */ @MethodSource("testParams") public class RedisTemplateIntegrationTests { @@ -797,10 +798,10 @@ public List execute(RedisOperations operations) throws DataAccessExcepti } @ParameterizedRedisTest - public void testConvertAndSend() { + void testConvertAndSend() { V value1 = valueFactory.instance(); // Make sure basic message sent without Exception on serialization - redisTemplate.convertAndSend("Channel", value1); + assertThat(redisTemplate.convertAndSend("Channel", value1)).isEqualTo(0L); } @ParameterizedRedisTest diff --git a/src/test/java/org/springframework/data/redis/listener/PubSubResubscribeTests.java b/src/test/java/org/springframework/data/redis/listener/PubSubResubscribeTests.java index 76d12a5187..efa33be291 100644 --- a/src/test/java/org/springframework/data/redis/listener/PubSubResubscribeTests.java +++ b/src/test/java/org/springframework/data/redis/listener/PubSubResubscribeTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2011-2021 the original author or authors. + * Copyright 2011-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,6 +15,7 @@ */ package org.springframework.data.redis.listener; +import static org.assertj.core.api.Assertions.*; import static org.awaitility.Awaitility.*; import static org.junit.Assume.*; @@ -54,6 +55,7 @@ * @author Jennifer Hickey * @author Christoph Strobl * @author Mark Paluch + * @author Vedran Pavic */ @MethodSource("testParams") @EnabledIfLongRunningTest @@ -130,7 +132,7 @@ void tearDown() { @ParameterizedRedisTest @EnabledIfLongRunningTest - void testContainerPatternResubscribe() throws Exception { + void testContainerPatternResubscribe() { String payload1 = "do"; String payload2 = "re mi"; @@ -147,11 +149,11 @@ void testContainerPatternResubscribe() throws Exception { container.addMessageListener(anotherListener, new PatternTopic(PATTERN)); // Wait for async subscription tasks to setup - Thread.sleep(400); - - // test no messages are sent just to patterns - template.convertAndSend(CHANNEL, payload1); - template.convertAndSend(ANOTHER_CHANNEL, payload2); + await().atMost(Duration.ofMillis(600)).untilAsserted(() -> { + // test no messages are sent just to patterns + assertThat(template.convertAndSend(CHANNEL, payload1)).isEqualTo(1L); + assertThat(template.convertAndSend(ANOTHER_CHANNEL, payload2)).isEqualTo(1L); + }); await().atMost(Duration.ofSeconds(2)).until(() -> bag2.contains(payload1) && bag2.contains(payload2)); @@ -159,10 +161,10 @@ void testContainerPatternResubscribe() throws Exception { container.addMessageListener(adapter, new ChannelTopic(ANOTHER_CHANNEL)); // Wait for async subscription tasks to setup - Thread.sleep(400); - - template.convertAndSend(CHANNEL, payload1); - template.convertAndSend(ANOTHER_CHANNEL, payload2); + await().atMost(Duration.ofMillis(400)).untilAsserted(() -> { + assertThat(template.convertAndSend(CHANNEL, payload1)).isEqualTo(1L); + assertThat(template.convertAndSend(ANOTHER_CHANNEL, payload2)).isEqualTo(2L); + }); await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains(payload2)); @@ -171,7 +173,7 @@ void testContainerPatternResubscribe() throws Exception { } @ParameterizedRedisTest - void testContainerChannelResubscribe() throws Exception { + void testContainerChannelResubscribe() { String payload1 = "do"; String payload2 = "re mi"; @@ -187,15 +189,15 @@ void testContainerChannelResubscribe() throws Exception { // timing: There's currently no other way to synchronize // than to hope the subscribe/unsubscribe are executed within the time. - Thread.sleep(400); - - // Listener removed from channel - template.convertAndSend(CHANNEL, payload1); - template.convertAndSend(CHANNEL, payload2); - - // Listener receives messages on another channel - template.convertAndSend(ANOTHER_CHANNEL, anotherPayload1); - template.convertAndSend(ANOTHER_CHANNEL, anotherPayload2); + await().atMost(Duration.ofMillis(400)).untilAsserted(() -> { + // Listener removed from channel + assertThat(template.convertAndSend(CHANNEL, payload1)).isEqualTo(0L); + assertThat(template.convertAndSend(CHANNEL, payload2)).isEqualTo(0L); + + // Listener receives messages on another channel + assertThat(template.convertAndSend(ANOTHER_CHANNEL, anotherPayload1)).isEqualTo(1L); + assertThat(template.convertAndSend(ANOTHER_CHANNEL, anotherPayload2)).isEqualTo(1L); + }); await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains(anotherPayload1) && bag.contains(anotherPayload2)); } @@ -203,11 +205,9 @@ void testContainerChannelResubscribe() throws Exception { /** * Validates the behavior of {@link RedisMessageListenerContainer} when it needs to spin up a thread executing its * PatternSubscriptionTask - * - * @throws Exception */ @ParameterizedRedisTest - void testInitializeContainerWithMultipleTopicsIncludingPattern() throws Exception { + void testInitializeContainerWithMultipleTopicsIncludingPattern() { assumeFalse(isClusterAware(template.getConnectionFactory())); @@ -223,10 +223,10 @@ void testInitializeContainerWithMultipleTopicsIncludingPattern() throws Exceptio // timing: There's currently no other way to synchronize // than to hope the subscribe/unsubscribe are executed within the time. - Thread.sleep(250); - - template.convertAndSend("somechannel", "HELLO"); - template.convertAndSend(uniqueChannel, "WORLD"); + await().atMost(Duration.ofMillis(250)).untilAsserted(() -> { + assertThat(template.convertAndSend("somechannel", "HELLO")).isEqualTo(1L); + assertThat(template.convertAndSend(uniqueChannel, "WORLD")).isEqualTo(1L); + }); await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains("HELLO") && bag.contains("WORLD")); } diff --git a/src/test/java/org/springframework/data/redis/listener/PubSubTests.java b/src/test/java/org/springframework/data/redis/listener/PubSubTests.java index 5e397af918..dd03109a21 100644 --- a/src/test/java/org/springframework/data/redis/listener/PubSubTests.java +++ b/src/test/java/org/springframework/data/redis/listener/PubSubTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2011-2021 the original author or authors. + * Copyright 2011-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,6 +50,7 @@ * @author Costin Leau * @author Jennifer Hickey * @author Mark Paluch + * @author Vedran Pavic */ @MethodSource("testParams") public class PubSubTests { @@ -125,14 +126,13 @@ T getT() { return factory.instance(); } - @SuppressWarnings("unchecked") @ParameterizedRedisTest - void testContainerSubscribe() throws Exception { + void testContainerSubscribe() { T payload1 = getT(); T payload2 = getT(); - template.convertAndSend(CHANNEL, payload1); - template.convertAndSend(CHANNEL, payload2); + assertThat(template.convertAndSend(CHANNEL, payload1)).isEqualTo(1L); + assertThat(template.convertAndSend(CHANNEL, payload2)).isEqualTo(1L); await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains(payload1) && bag.contains(payload2)); } @@ -141,7 +141,7 @@ void testContainerSubscribe() throws Exception { void testMessageBatch() throws Exception { int COUNT = 10; for (int i = 0; i < COUNT; i++) { - template.convertAndSend(CHANNEL, getT()); + assertThat(template.convertAndSend(CHANNEL, getT())).isEqualTo(1L); } for (int i = 0; i < COUNT; i++) { @@ -156,8 +156,8 @@ void testContainerUnsubscribe() throws Exception { T payload2 = getT(); container.removeMessageListener(adapter, new ChannelTopic(CHANNEL)); - template.convertAndSend(CHANNEL, payload1); - template.convertAndSend(CHANNEL, payload2); + assertThat(template.convertAndSend(CHANNEL, payload1)).isEqualTo(1L); + assertThat(template.convertAndSend(CHANNEL, payload2)).isEqualTo(1L); assertThat(bag.poll(200, TimeUnit.MILLISECONDS)).isNull(); } @@ -170,9 +170,8 @@ void testStartNoListeners() { container.start(); } - @SuppressWarnings("unchecked") @ParameterizedRedisTest // DATAREDIS-251 - void testStartListenersToNoSpecificChannelTest() throws InterruptedException { + void testStartListenersToNoSpecificChannelTest() { assumeThat(isClusterAware(template.getConnectionFactory())).isFalse(); assumeThat(ConnectionUtils.isJedis(template.getConnectionFactory())).isTrue(); @@ -186,7 +185,7 @@ void testStartListenersToNoSpecificChannelTest() throws InterruptedException { T payload = getT(); - template.convertAndSend(CHANNEL, payload); + assertThat(template.convertAndSend(CHANNEL, payload)).isEqualTo(1L); await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains(payload)); }