From 7c54dd5b3190e71df5f3b43444d702f0117a6752 Mon Sep 17 00:00:00 2001 From: John Blum Date: Thu, 7 Sep 2023 16:47:24 -0700 Subject: [PATCH 1/2] Prepare issue branch for 2692. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 1f6ddd552b..e62c49dede 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-redis - 3.2.0-SNAPSHOT + 3.2.0-GH-2692-SNAPSHOT Spring Data Redis Spring Data module for Redis From d96693ca0b3867c2255996c5ccbb25a6ab8534cb Mon Sep 17 00:00:00 2001 From: John Blum Date: Thu, 7 Sep 2023 16:58:33 -0700 Subject: [PATCH 2/2] Adds support to lPop or rPop N elements from a Redis List in ReactiveListOperations. Closes #2692 --- .../core/DefaultReactiveListOperations.java | 25 ++++++- .../redis/core/ReactiveListOperations.java | 21 ++++++ .../core/ReactiveListOperationsExtensions.kt | 18 +++++ ...eactiveListOperationsIntegrationTests.java | 73 ++++++++++++++++++- ...activeListOperationsExtensionsUnitTests.kt | 38 +++++++++- 5 files changed, 165 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java index 231c188d27..430daec5f9 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java @@ -39,6 +39,7 @@ * * @author Mark Paluch * @author Christoph Strobl + * @author John Blum * @since 2.0 */ class DefaultReactiveListOperations implements ReactiveListOperations { @@ -244,13 +245,21 @@ public Mono leftPop(K key, Duration timeout) { Assert.notNull(key, "Key must not be null"); Assert.notNull(timeout, "Duration must not be null"); - Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second"); + Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second"); return createMono(listCommands -> listCommands.blPop(Collections.singletonList(rawKey(key)), timeout) .map(popResult -> readValue(popResult.getValue()))); } + @Override + public Flux leftPop(K key, long count) { + + Assert.notNull(key, "Key must not be null"); + + return createFlux(listCommands -> listCommands.lPop(rawKey(key), count).map(this::readValue)); + } + @Override public Mono rightPop(K key) { @@ -264,13 +273,21 @@ public Mono rightPop(K key, Duration timeout) { Assert.notNull(key, "Key must not be null"); Assert.notNull(timeout, "Duration must not be null"); - Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second"); + Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second"); return createMono(listCommands -> listCommands.brPop(Collections.singletonList(rawKey(key)), timeout) .map(popResult -> readValue(popResult.getValue()))); } + @Override + public Flux rightPop(K key, long count) { + + Assert.notNull(key, "Key must not be null"); + + return createFlux(listCommands -> listCommands.rPop(rawKey(key), count).map(this::readValue)); + } + @Override public Mono rightPopAndLeftPush(K sourceKey, K destinationKey) { @@ -287,7 +304,7 @@ public Mono rightPopAndLeftPush(K sourceKey, K destinationKey, Duration timeo Assert.notNull(sourceKey, "Source key must not be null"); Assert.notNull(destinationKey, "Destination key must not be null"); Assert.notNull(timeout, "Duration must not be null"); - Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second"); + Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second"); return createMono(listCommands -> listCommands.bRPopLPush(rawKey(sourceKey), rawKey(destinationKey), timeout).map(this::readValue)); @@ -315,7 +332,7 @@ private Flux createFlux(Function> func return template.doCreateFlux(connection -> function.apply(connection.listCommands())); } - private boolean isZeroOrGreater1Second(Duration timeout) { + private boolean isZeroOrGreaterOneSecond(Duration timeout) { return timeout.isZero() || timeout.getNano() % TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS) == 0; } diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveListOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveListOperations.java index e77f413a21..6a13c75ccf 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveListOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveListOperations.java @@ -32,6 +32,7 @@ * * @author Mark Paluch * @author Christoph Strobl + * @author John Blum * @see Redis Documentation: List Commands * @since 2.0 */ @@ -325,6 +326,16 @@ default Mono move(MoveFrom from, MoveTo to, Duration timeout) { */ Mono leftPop(K key, Duration timeout); + /** + * Removes {@link Long count} elements from the left-side of the Redis list stored at key. + * + * @param key {@link K Key} referring to the list stored in Redis; must not be {@literal null}. + * @param count {@link Long count} of the number of elements to remove from the left-side of the Redis list. + * @return a {@link Flux} containing the elements removed from the Redis list. + * @since 3.2 + */ + Flux leftPop(K key, long count); + /** * Removes and returns last element in list stored at {@code key}. * @@ -347,6 +358,16 @@ default Mono move(MoveFrom from, MoveTo to, Duration timeout) { */ Mono rightPop(K key, Duration timeout); + /** + * Removes {@link Long count} elements from the right-side of the Redis list stored at key. + * + * @param key {@link K Key} referring to the list stored in Redis; must not be {@literal null}. + * @param count {@link Long count} of the number of elements to remove from the right-side of the Redis list. + * @return a {@link Flux} containing the elements removed from the Redis list. + * @since 3.2 + */ + Flux rightPop(K key, long count); + /** * Remove the last element from list at {@code sourceKey}, append it to {@code destinationKey} and return its value. * diff --git a/src/main/kotlin/org/springframework/data/redis/core/ReactiveListOperationsExtensions.kt b/src/main/kotlin/org/springframework/data/redis/core/ReactiveListOperationsExtensions.kt index 1f09b2df14..e0c71ef1f4 100644 --- a/src/main/kotlin/org/springframework/data/redis/core/ReactiveListOperationsExtensions.kt +++ b/src/main/kotlin/org/springframework/data/redis/core/ReactiveListOperationsExtensions.kt @@ -183,6 +183,15 @@ suspend fun ReactiveListOperations.leftPopAndAwait(key: suspend fun ReactiveListOperations.leftPopAndAwait(key: K, timeout: Duration): V? = leftPop(key, timeout).awaitFirstOrNull() +/** + * Coroutines variant of [ReactiveListOperations.leftPop] with count. + * + * @author John Blum + * @since 3.2 + */ +fun ReactiveListOperations.leftPopAsFlow(key: K, count :Long): Flow = + leftPop(key, count).asFlow() + /** * Coroutines variant of [ReactiveListOperations.rightPop]. * @@ -201,6 +210,15 @@ suspend fun ReactiveListOperations.rightPopAndAwait(key suspend fun ReactiveListOperations.rightPopAndAwait(key: K, timeout: Duration): V? = rightPop(key, timeout).awaitFirstOrNull() +/** + * Coroutines variant of [ReactiveListOperations.rightPop] with count. + * + * @author John Blum + * @since 3.2 + */ +fun ReactiveListOperations.rightPopAsFlow(key: K, count:Long): Flow = + rightPop(key, count).asFlow() + /** * Coroutines variant of [ReactiveListOperations.rightPopAndLeftPush]. * diff --git a/src/test/java/org/springframework/data/redis/core/DefaultReactiveListOperationsIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/DefaultReactiveListOperationsIntegrationTests.java index 3b2cdf6dc1..181d46d1b8 100644 --- a/src/test/java/org/springframework/data/redis/core/DefaultReactiveListOperationsIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/core/DefaultReactiveListOperationsIntegrationTests.java @@ -15,10 +15,8 @@ */ package org.springframework.data.redis.core; -import static org.assertj.core.api.Assertions.*; -import static org.assertj.core.api.Assumptions.*; - -import reactor.test.StepVerifier; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.assertj.core.api.Assumptions.assumeThat; import java.time.Duration; import java.util.Collection; @@ -35,11 +33,14 @@ import org.springframework.data.redis.test.extension.parametrized.MethodSource; import org.springframework.data.redis.test.extension.parametrized.ParameterizedRedisTest; +import reactor.test.StepVerifier; + /** * Integration tests for {@link DefaultReactiveListOperations}. * * @author Mark Paluch * @author Christoph Strobl + * @author John Blum */ @MethodSource("testParams") @SuppressWarnings("unchecked") @@ -458,6 +459,38 @@ void leftPop() { listOperations.leftPop(key).as(StepVerifier::create).expectNext(value2).verifyComplete(); } + @ParameterizedRedisTest // GH-2692 + @SuppressWarnings("all") + void leftPopWithNullKey() { + + assertThatIllegalArgumentException() + .isThrownBy(() -> this.listOperations.leftPop(null, 100L)) + .withMessage("Key must not be null") + .withNoCause(); + } + + @ParameterizedRedisTest // GH-2692 + void leftPopWithCount() { + + assumeThat(this.valueFactory).isInstanceOf(ByteBufferObjectFactory.class); + + K key = keyFactory.instance(); + V value1 = valueFactory.instance(); + V value2 = valueFactory.instance(); + V value3 = valueFactory.instance(); + + listOperations.leftPushAll(key, value1, value2, value3) + .as(StepVerifier::create) + .expectNext(3L) + .verifyComplete(); + + listOperations.leftPop(key, 2) + .as(StepVerifier::create) + .expectNext(value3) + .expectNext(value2) + .verifyComplete(); + } + @ParameterizedRedisTest // DATAREDIS-602 void rightPop() { @@ -472,6 +505,38 @@ void rightPop() { listOperations.rightPop(key).as(StepVerifier::create).expectNext(value2).verifyComplete(); } + @ParameterizedRedisTest // GH-2692 + @SuppressWarnings("all") + void rightPopWithNullKey() { + + assertThatIllegalArgumentException() + .isThrownBy(() -> this.listOperations.rightPop(null, 100L)) + .withMessage("Key must not be null") + .withNoCause(); + } + + @ParameterizedRedisTest // GH-2692 + void rightPopWithCount() { + + assumeThat(this.valueFactory).isInstanceOf(ByteBufferObjectFactory.class); + + K key = keyFactory.instance(); + V value1 = valueFactory.instance(); + V value2 = valueFactory.instance(); + V value3 = valueFactory.instance(); + + listOperations.rightPushAll(key, value3, value2, value1) + .as(StepVerifier::create) + .expectNext(3L) + .verifyComplete(); + + listOperations.rightPop(key, 2) + .as(StepVerifier::create) + .expectNext(value1) + .expectNext(value2) + .verifyComplete(); + } + @ParameterizedRedisTest // DATAREDIS-602 void leftPopWithTimeout() { diff --git a/src/test/kotlin/org/springframework/data/redis/core/ReactiveListOperationsExtensionsUnitTests.kt b/src/test/kotlin/org/springframework/data/redis/core/ReactiveListOperationsExtensionsUnitTests.kt index c09d49b8fc..f7ce4346b9 100644 --- a/src/test/kotlin/org/springframework/data/redis/core/ReactiveListOperationsExtensionsUnitTests.kt +++ b/src/test/kotlin/org/springframework/data/redis/core/ReactiveListOperationsExtensionsUnitTests.kt @@ -32,6 +32,7 @@ import java.time.Duration * @author Mark Paluch * @author Sebastien Deleuze * @author Wonwoo Lee + * @author John Blum */ class ReactiveListOperationsExtensionsUnitTests { @@ -290,11 +291,27 @@ class ReactiveListOperationsExtensionsUnitTests { } } + @Test // GH-2692 + fun leftPopWithCount() { + + val operations = mockk>() + + every { operations.leftPop(any(), any()) } returns Flux.just("foo", "bar", "baz") + + runBlocking { + assertThat(operations.leftPopAsFlow("TestKey", 3L).toList()).containsExactly("foo", "bar", "baz") + } + + verify { + operations.leftPop("TestKey", 3L) + } + } + @Test // DATAREDIS-937 fun blockingLeftPop() { val operations = mockk>() - every { operations.leftPop(any(), any()) } returns Mono.just("foo") + every { operations.leftPop(any(), any()) } returns Mono.just("foo") runBlocking { assertThat(operations.leftPopAndAwait("foo", Duration.ofDays(1))).isEqualTo("foo") @@ -320,11 +337,28 @@ class ReactiveListOperationsExtensionsUnitTests { } } + @Test // GH-2692 + fun rightPopWithCount() { + + val operations = mockk>() + + every { operations.rightPop(any(), any()) } returns Flux.just("foo", "bar", "baz") + + runBlocking { + assertThat(operations.rightPopAsFlow("TestKey", 3L).toList()) + .containsExactly("foo", "bar", "baz") + } + + verify { + operations.rightPop("TestKey", 3L) + } + } + @Test // DATAREDIS-937 fun blockingRightPop() { val operations = mockk>() - every { operations.rightPop(any(), any()) } returns Mono.just("foo") + every { operations.rightPop(any(), any()) } returns Mono.just("foo") runBlocking { assertThat(operations.rightPopAndAwait("foo", Duration.ofDays(1))).isEqualTo("foo")