diff --git a/pom.xml b/pom.xml
index 1f6ddd552b..e62c49dede 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
org.springframework.data
spring-data-redis
- 3.2.0-SNAPSHOT
+ 3.2.0-GH-2692-SNAPSHOT
Spring Data Redis
Spring Data module for Redis
diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java
index 231c188d27..430daec5f9 100644
--- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java
@@ -39,6 +39,7 @@
*
* @author Mark Paluch
* @author Christoph Strobl
+ * @author John Blum
* @since 2.0
*/
class DefaultReactiveListOperations implements ReactiveListOperations {
@@ -244,13 +245,21 @@ public Mono leftPop(K key, Duration timeout) {
Assert.notNull(key, "Key must not be null");
Assert.notNull(timeout, "Duration must not be null");
- Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
+ Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second");
return createMono(listCommands ->
listCommands.blPop(Collections.singletonList(rawKey(key)), timeout)
.map(popResult -> readValue(popResult.getValue())));
}
+ @Override
+ public Flux leftPop(K key, long count) {
+
+ Assert.notNull(key, "Key must not be null");
+
+ return createFlux(listCommands -> listCommands.lPop(rawKey(key), count).map(this::readValue));
+ }
+
@Override
public Mono rightPop(K key) {
@@ -264,13 +273,21 @@ public Mono rightPop(K key, Duration timeout) {
Assert.notNull(key, "Key must not be null");
Assert.notNull(timeout, "Duration must not be null");
- Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
+ Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second");
return createMono(listCommands ->
listCommands.brPop(Collections.singletonList(rawKey(key)), timeout)
.map(popResult -> readValue(popResult.getValue())));
}
+ @Override
+ public Flux rightPop(K key, long count) {
+
+ Assert.notNull(key, "Key must not be null");
+
+ return createFlux(listCommands -> listCommands.rPop(rawKey(key), count).map(this::readValue));
+ }
+
@Override
public Mono rightPopAndLeftPush(K sourceKey, K destinationKey) {
@@ -287,7 +304,7 @@ public Mono rightPopAndLeftPush(K sourceKey, K destinationKey, Duration timeo
Assert.notNull(sourceKey, "Source key must not be null");
Assert.notNull(destinationKey, "Destination key must not be null");
Assert.notNull(timeout, "Duration must not be null");
- Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
+ Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second");
return createMono(listCommands ->
listCommands.bRPopLPush(rawKey(sourceKey), rawKey(destinationKey), timeout).map(this::readValue));
@@ -315,7 +332,7 @@ private Flux createFlux(Function> func
return template.doCreateFlux(connection -> function.apply(connection.listCommands()));
}
- private boolean isZeroOrGreater1Second(Duration timeout) {
+ private boolean isZeroOrGreaterOneSecond(Duration timeout) {
return timeout.isZero() || timeout.getNano() % TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS) == 0;
}
diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveListOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveListOperations.java
index e77f413a21..6a13c75ccf 100644
--- a/src/main/java/org/springframework/data/redis/core/ReactiveListOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/ReactiveListOperations.java
@@ -32,6 +32,7 @@
*
* @author Mark Paluch
* @author Christoph Strobl
+ * @author John Blum
* @see Redis Documentation: List Commands
* @since 2.0
*/
@@ -325,6 +326,16 @@ default Mono move(MoveFrom from, MoveTo to, Duration timeout) {
*/
Mono leftPop(K key, Duration timeout);
+ /**
+ * Removes {@link Long count} elements from the left-side of the Redis list stored at key.
+ *
+ * @param key {@link K Key} referring to the list stored in Redis; must not be {@literal null}.
+ * @param count {@link Long count} of the number of elements to remove from the left-side of the Redis list.
+ * @return a {@link Flux} containing the elements removed from the Redis list.
+ * @since 3.2
+ */
+ Flux leftPop(K key, long count);
+
/**
* Removes and returns last element in list stored at {@code key}.
*
@@ -347,6 +358,16 @@ default Mono move(MoveFrom from, MoveTo to, Duration timeout) {
*/
Mono rightPop(K key, Duration timeout);
+ /**
+ * Removes {@link Long count} elements from the right-side of the Redis list stored at key.
+ *
+ * @param key {@link K Key} referring to the list stored in Redis; must not be {@literal null}.
+ * @param count {@link Long count} of the number of elements to remove from the right-side of the Redis list.
+ * @return a {@link Flux} containing the elements removed from the Redis list.
+ * @since 3.2
+ */
+ Flux rightPop(K key, long count);
+
/**
* Remove the last element from list at {@code sourceKey}, append it to {@code destinationKey} and return its value.
*
diff --git a/src/main/kotlin/org/springframework/data/redis/core/ReactiveListOperationsExtensions.kt b/src/main/kotlin/org/springframework/data/redis/core/ReactiveListOperationsExtensions.kt
index 1f09b2df14..e0c71ef1f4 100644
--- a/src/main/kotlin/org/springframework/data/redis/core/ReactiveListOperationsExtensions.kt
+++ b/src/main/kotlin/org/springframework/data/redis/core/ReactiveListOperationsExtensions.kt
@@ -183,6 +183,15 @@ suspend fun ReactiveListOperations.leftPopAndAwait(key:
suspend fun ReactiveListOperations.leftPopAndAwait(key: K, timeout: Duration): V? =
leftPop(key, timeout).awaitFirstOrNull()
+/**
+ * Coroutines variant of [ReactiveListOperations.leftPop] with count.
+ *
+ * @author John Blum
+ * @since 3.2
+ */
+fun ReactiveListOperations.leftPopAsFlow(key: K, count :Long): Flow =
+ leftPop(key, count).asFlow()
+
/**
* Coroutines variant of [ReactiveListOperations.rightPop].
*
@@ -201,6 +210,15 @@ suspend fun ReactiveListOperations.rightPopAndAwait(key
suspend fun ReactiveListOperations.rightPopAndAwait(key: K, timeout: Duration): V? =
rightPop(key, timeout).awaitFirstOrNull()
+/**
+ * Coroutines variant of [ReactiveListOperations.rightPop] with count.
+ *
+ * @author John Blum
+ * @since 3.2
+ */
+fun ReactiveListOperations.rightPopAsFlow(key: K, count:Long): Flow =
+ rightPop(key, count).asFlow()
+
/**
* Coroutines variant of [ReactiveListOperations.rightPopAndLeftPush].
*
diff --git a/src/test/java/org/springframework/data/redis/core/DefaultReactiveListOperationsIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/DefaultReactiveListOperationsIntegrationTests.java
index 3b2cdf6dc1..181d46d1b8 100644
--- a/src/test/java/org/springframework/data/redis/core/DefaultReactiveListOperationsIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/core/DefaultReactiveListOperationsIntegrationTests.java
@@ -15,10 +15,8 @@
*/
package org.springframework.data.redis.core;
-import static org.assertj.core.api.Assertions.*;
-import static org.assertj.core.api.Assumptions.*;
-
-import reactor.test.StepVerifier;
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.time.Duration;
import java.util.Collection;
@@ -35,11 +33,14 @@
import org.springframework.data.redis.test.extension.parametrized.MethodSource;
import org.springframework.data.redis.test.extension.parametrized.ParameterizedRedisTest;
+import reactor.test.StepVerifier;
+
/**
* Integration tests for {@link DefaultReactiveListOperations}.
*
* @author Mark Paluch
* @author Christoph Strobl
+ * @author John Blum
*/
@MethodSource("testParams")
@SuppressWarnings("unchecked")
@@ -458,6 +459,38 @@ void leftPop() {
listOperations.leftPop(key).as(StepVerifier::create).expectNext(value2).verifyComplete();
}
+ @ParameterizedRedisTest // GH-2692
+ @SuppressWarnings("all")
+ void leftPopWithNullKey() {
+
+ assertThatIllegalArgumentException()
+ .isThrownBy(() -> this.listOperations.leftPop(null, 100L))
+ .withMessage("Key must not be null")
+ .withNoCause();
+ }
+
+ @ParameterizedRedisTest // GH-2692
+ void leftPopWithCount() {
+
+ assumeThat(this.valueFactory).isInstanceOf(ByteBufferObjectFactory.class);
+
+ K key = keyFactory.instance();
+ V value1 = valueFactory.instance();
+ V value2 = valueFactory.instance();
+ V value3 = valueFactory.instance();
+
+ listOperations.leftPushAll(key, value1, value2, value3)
+ .as(StepVerifier::create)
+ .expectNext(3L)
+ .verifyComplete();
+
+ listOperations.leftPop(key, 2)
+ .as(StepVerifier::create)
+ .expectNext(value3)
+ .expectNext(value2)
+ .verifyComplete();
+ }
+
@ParameterizedRedisTest // DATAREDIS-602
void rightPop() {
@@ -472,6 +505,38 @@ void rightPop() {
listOperations.rightPop(key).as(StepVerifier::create).expectNext(value2).verifyComplete();
}
+ @ParameterizedRedisTest // GH-2692
+ @SuppressWarnings("all")
+ void rightPopWithNullKey() {
+
+ assertThatIllegalArgumentException()
+ .isThrownBy(() -> this.listOperations.rightPop(null, 100L))
+ .withMessage("Key must not be null")
+ .withNoCause();
+ }
+
+ @ParameterizedRedisTest // GH-2692
+ void rightPopWithCount() {
+
+ assumeThat(this.valueFactory).isInstanceOf(ByteBufferObjectFactory.class);
+
+ K key = keyFactory.instance();
+ V value1 = valueFactory.instance();
+ V value2 = valueFactory.instance();
+ V value3 = valueFactory.instance();
+
+ listOperations.rightPushAll(key, value3, value2, value1)
+ .as(StepVerifier::create)
+ .expectNext(3L)
+ .verifyComplete();
+
+ listOperations.rightPop(key, 2)
+ .as(StepVerifier::create)
+ .expectNext(value1)
+ .expectNext(value2)
+ .verifyComplete();
+ }
+
@ParameterizedRedisTest // DATAREDIS-602
void leftPopWithTimeout() {
diff --git a/src/test/kotlin/org/springframework/data/redis/core/ReactiveListOperationsExtensionsUnitTests.kt b/src/test/kotlin/org/springframework/data/redis/core/ReactiveListOperationsExtensionsUnitTests.kt
index c09d49b8fc..f7ce4346b9 100644
--- a/src/test/kotlin/org/springframework/data/redis/core/ReactiveListOperationsExtensionsUnitTests.kt
+++ b/src/test/kotlin/org/springframework/data/redis/core/ReactiveListOperationsExtensionsUnitTests.kt
@@ -32,6 +32,7 @@ import java.time.Duration
* @author Mark Paluch
* @author Sebastien Deleuze
* @author Wonwoo Lee
+ * @author John Blum
*/
class ReactiveListOperationsExtensionsUnitTests {
@@ -290,11 +291,27 @@ class ReactiveListOperationsExtensionsUnitTests {
}
}
+ @Test // GH-2692
+ fun leftPopWithCount() {
+
+ val operations = mockk>()
+
+ every { operations.leftPop(any(), any()) } returns Flux.just("foo", "bar", "baz")
+
+ runBlocking {
+ assertThat(operations.leftPopAsFlow("TestKey", 3L).toList()).containsExactly("foo", "bar", "baz")
+ }
+
+ verify {
+ operations.leftPop("TestKey", 3L)
+ }
+ }
+
@Test // DATAREDIS-937
fun blockingLeftPop() {
val operations = mockk>()
- every { operations.leftPop(any(), any()) } returns Mono.just("foo")
+ every { operations.leftPop(any(), any()) } returns Mono.just("foo")
runBlocking {
assertThat(operations.leftPopAndAwait("foo", Duration.ofDays(1))).isEqualTo("foo")
@@ -320,11 +337,28 @@ class ReactiveListOperationsExtensionsUnitTests {
}
}
+ @Test // GH-2692
+ fun rightPopWithCount() {
+
+ val operations = mockk>()
+
+ every { operations.rightPop(any(), any()) } returns Flux.just("foo", "bar", "baz")
+
+ runBlocking {
+ assertThat(operations.rightPopAsFlow("TestKey", 3L).toList())
+ .containsExactly("foo", "bar", "baz")
+ }
+
+ verify {
+ operations.rightPop("TestKey", 3L)
+ }
+ }
+
@Test // DATAREDIS-937
fun blockingRightPop() {
val operations = mockk>()
- every { operations.rightPop(any(), any()) } returns Mono.just("foo")
+ every { operations.rightPop(any(), any()) } returns Mono.just("foo")
runBlocking {
assertThat(operations.rightPopAndAwait("foo", Duration.ofDays(1))).isEqualTo("foo")