Skip to content

Add support to lPop or rPop N elements from a Redis list in ReactiveListOperations #2704

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>3.2.0-SNAPSHOT</version>
<version>3.2.0-GH-2692-SNAPSHOT</version>

<name>Spring Data Redis</name>
<description>Spring Data module for Redis</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
*
* @author Mark Paluch
* @author Christoph Strobl
* @author John Blum
* @since 2.0
*/
class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V> {
Expand Down Expand Up @@ -244,13 +245,21 @@ public Mono<V> leftPop(K key, Duration timeout) {

Assert.notNull(key, "Key must not be null");
Assert.notNull(timeout, "Duration must not be null");
Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second");

return createMono(listCommands ->
listCommands.blPop(Collections.singletonList(rawKey(key)), timeout)
.map(popResult -> readValue(popResult.getValue())));
}

@Override
public Flux<V> leftPop(K key, long count) {

Assert.notNull(key, "Key must not be null");

return createFlux(listCommands -> listCommands.lPop(rawKey(key), count).map(this::readValue));
}

@Override
public Mono<V> rightPop(K key) {

Expand All @@ -264,13 +273,21 @@ public Mono<V> rightPop(K key, Duration timeout) {

Assert.notNull(key, "Key must not be null");
Assert.notNull(timeout, "Duration must not be null");
Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second");

return createMono(listCommands ->
listCommands.brPop(Collections.singletonList(rawKey(key)), timeout)
.map(popResult -> readValue(popResult.getValue())));
}

@Override
public Flux<V> rightPop(K key, long count) {

Assert.notNull(key, "Key must not be null");

return createFlux(listCommands -> listCommands.rPop(rawKey(key), count).map(this::readValue));
}

@Override
public Mono<V> rightPopAndLeftPush(K sourceKey, K destinationKey) {

Expand All @@ -287,7 +304,7 @@ public Mono<V> rightPopAndLeftPush(K sourceKey, K destinationKey, Duration timeo
Assert.notNull(sourceKey, "Source key must not be null");
Assert.notNull(destinationKey, "Destination key must not be null");
Assert.notNull(timeout, "Duration must not be null");
Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second");

return createMono(listCommands ->
listCommands.bRPopLPush(rawKey(sourceKey), rawKey(destinationKey), timeout).map(this::readValue));
Expand Down Expand Up @@ -315,7 +332,7 @@ private <T> Flux<T> createFlux(Function<ReactiveListCommands, Publisher<T>> func
return template.doCreateFlux(connection -> function.apply(connection.listCommands()));
}

private boolean isZeroOrGreater1Second(Duration timeout) {
private boolean isZeroOrGreaterOneSecond(Duration timeout) {
return timeout.isZero() || timeout.getNano() % TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS) == 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
*
* @author Mark Paluch
* @author Christoph Strobl
* @author John Blum
* @see <a href="https://redis.io/commands#list">Redis Documentation: List Commands</a>
* @since 2.0
*/
Expand Down Expand Up @@ -325,6 +326,16 @@ default Mono<V> move(MoveFrom<K> from, MoveTo<K> to, Duration timeout) {
*/
Mono<V> leftPop(K key, Duration timeout);

/**
* Removes {@link Long count} elements from the left-side of the Redis list stored at key.
*
* @param key {@link K Key} referring to the list stored in Redis; must not be {@literal null}.
* @param count {@link Long count} of the number of elements to remove from the left-side of the Redis list.
* @return a {@link Flux} containing the elements removed from the Redis list.
* @since 3.2
*/
Flux<V> leftPop(K key, long count);

/**
* Removes and returns last element in list stored at {@code key}.
*
Expand All @@ -347,6 +358,16 @@ default Mono<V> move(MoveFrom<K> from, MoveTo<K> to, Duration timeout) {
*/
Mono<V> rightPop(K key, Duration timeout);

/**
* Removes {@link Long count} elements from the right-side of the Redis list stored at key.
*
* @param key {@link K Key} referring to the list stored in Redis; must not be {@literal null}.
* @param count {@link Long count} of the number of elements to remove from the right-side of the Redis list.
* @return a {@link Flux} containing the elements removed from the Redis list.
* @since 3.2
*/
Flux<V> rightPop(K key, long count);

/**
* Remove the last element from list at {@code sourceKey}, append it to {@code destinationKey} and return its value.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,15 @@ suspend fun <K : Any, V : Any> ReactiveListOperations<K, V>.leftPopAndAwait(key:
suspend fun <K : Any, V : Any> ReactiveListOperations<K, V>.leftPopAndAwait(key: K, timeout: Duration): V? =
leftPop(key, timeout).awaitFirstOrNull()

/**
* Coroutines variant of [ReactiveListOperations.leftPop] with count.
*
* @author John Blum
* @since 3.2
*/
fun <K : Any, V : Any> ReactiveListOperations<K, V>.leftPopAsFlow(key: K, count :Long): Flow<V> =
leftPop(key, count).asFlow()

/**
* Coroutines variant of [ReactiveListOperations.rightPop].
*
Expand All @@ -201,6 +210,15 @@ suspend fun <K : Any, V : Any> ReactiveListOperations<K, V>.rightPopAndAwait(key
suspend fun <K : Any, V : Any> ReactiveListOperations<K, V>.rightPopAndAwait(key: K, timeout: Duration): V? =
rightPop(key, timeout).awaitFirstOrNull()

/**
* Coroutines variant of [ReactiveListOperations.rightPop] with count.
*
* @author John Blum
* @since 3.2
*/
fun <K : Any, V : Any> ReactiveListOperations<K, V>.rightPopAsFlow(key: K, count:Long): Flow<V> =
rightPop(key, count).asFlow()

/**
* Coroutines variant of [ReactiveListOperations.rightPopAndLeftPush].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
*/
package org.springframework.data.redis.core;

import static org.assertj.core.api.Assertions.*;
import static org.assertj.core.api.Assumptions.*;

import reactor.test.StepVerifier;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.time.Duration;
import java.util.Collection;
Expand All @@ -35,11 +33,14 @@
import org.springframework.data.redis.test.extension.parametrized.MethodSource;
import org.springframework.data.redis.test.extension.parametrized.ParameterizedRedisTest;

import reactor.test.StepVerifier;

/**
* Integration tests for {@link DefaultReactiveListOperations}.
*
* @author Mark Paluch
* @author Christoph Strobl
* @author John Blum
*/
@MethodSource("testParams")
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -458,6 +459,38 @@ void leftPop() {
listOperations.leftPop(key).as(StepVerifier::create).expectNext(value2).verifyComplete();
}

@ParameterizedRedisTest // GH-2692
@SuppressWarnings("all")
void leftPopWithNullKey() {

assertThatIllegalArgumentException()
.isThrownBy(() -> this.listOperations.leftPop(null, 100L))
.withMessage("Key must not be null")
.withNoCause();
}

@ParameterizedRedisTest // GH-2692
void leftPopWithCount() {

assumeThat(this.valueFactory).isInstanceOf(ByteBufferObjectFactory.class);

K key = keyFactory.instance();
V value1 = valueFactory.instance();
V value2 = valueFactory.instance();
V value3 = valueFactory.instance();

listOperations.leftPushAll(key, value1, value2, value3)
.as(StepVerifier::create)
.expectNext(3L)
.verifyComplete();

listOperations.leftPop(key, 2)
.as(StepVerifier::create)
.expectNext(value3)
.expectNext(value2)
.verifyComplete();
}

@ParameterizedRedisTest // DATAREDIS-602
void rightPop() {

Expand All @@ -472,6 +505,38 @@ void rightPop() {
listOperations.rightPop(key).as(StepVerifier::create).expectNext(value2).verifyComplete();
}

@ParameterizedRedisTest // GH-2692
@SuppressWarnings("all")
void rightPopWithNullKey() {

assertThatIllegalArgumentException()
.isThrownBy(() -> this.listOperations.rightPop(null, 100L))
.withMessage("Key must not be null")
.withNoCause();
}

@ParameterizedRedisTest // GH-2692
void rightPopWithCount() {

assumeThat(this.valueFactory).isInstanceOf(ByteBufferObjectFactory.class);

K key = keyFactory.instance();
V value1 = valueFactory.instance();
V value2 = valueFactory.instance();
V value3 = valueFactory.instance();

listOperations.rightPushAll(key, value3, value2, value1)
.as(StepVerifier::create)
.expectNext(3L)
.verifyComplete();

listOperations.rightPop(key, 2)
.as(StepVerifier::create)
.expectNext(value1)
.expectNext(value2)
.verifyComplete();
}

@ParameterizedRedisTest // DATAREDIS-602
void leftPopWithTimeout() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import java.time.Duration
* @author Mark Paluch
* @author Sebastien Deleuze
* @author Wonwoo Lee
* @author John Blum
*/
class ReactiveListOperationsExtensionsUnitTests {

Expand Down Expand Up @@ -290,11 +291,27 @@ class ReactiveListOperationsExtensionsUnitTests {
}
}

@Test // GH-2692
fun leftPopWithCount() {

val operations = mockk<ReactiveListOperations<String, String>>()

every { operations.leftPop(any(), any<Long>()) } returns Flux.just("foo", "bar", "baz")

runBlocking {
assertThat(operations.leftPopAsFlow("TestKey", 3L).toList()).containsExactly("foo", "bar", "baz")
}

verify {
operations.leftPop("TestKey", 3L)
}
}

@Test // DATAREDIS-937
fun blockingLeftPop() {

val operations = mockk<ReactiveListOperations<String, String>>()
every { operations.leftPop(any(), any()) } returns Mono.just("foo")
every { operations.leftPop(any(), any<Duration>()) } returns Mono.just("foo")

runBlocking {
assertThat(operations.leftPopAndAwait("foo", Duration.ofDays(1))).isEqualTo("foo")
Expand All @@ -320,11 +337,28 @@ class ReactiveListOperationsExtensionsUnitTests {
}
}

@Test // GH-2692
fun rightPopWithCount() {

val operations = mockk<ReactiveListOperations<String, String>>()

every { operations.rightPop(any(), any<Long>()) } returns Flux.just("foo", "bar", "baz")

runBlocking {
assertThat(operations.rightPopAsFlow("TestKey", 3L).toList())
.containsExactly("foo", "bar", "baz")
}

verify {
operations.rightPop("TestKey", 3L)
}
}

@Test // DATAREDIS-937
fun blockingRightPop() {

val operations = mockk<ReactiveListOperations<String, String>>()
every { operations.rightPop(any(), any()) } returns Mono.just("foo")
every { operations.rightPop(any(), any<Duration>()) } returns Mono.just("foo")

runBlocking {
assertThat(operations.rightPopAndAwait("foo", Duration.ofDays(1))).isEqualTo("foo")
Expand Down