Skip to content

Commit ac1ee1e

Browse files
committed
Adds support to lPop or rPop N elements (count) in ReactiveListOperations.
Closes spring-projects#2692
1 parent 7c54dd5 commit ac1ee1e

File tree

4 files changed

+114
-10
lines changed

4 files changed

+114
-10
lines changed

src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
*
4040
* @author Mark Paluch
4141
* @author Christoph Strobl
42+
* @author John Blum
4243
* @since 2.0
4344
*/
4445
class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V> {
@@ -244,13 +245,21 @@ public Mono<V> leftPop(K key, Duration timeout) {
244245

245246
Assert.notNull(key, "Key must not be null");
246247
Assert.notNull(timeout, "Duration must not be null");
247-
Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
248+
Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second");
248249

249250
return createMono(listCommands ->
250251
listCommands.blPop(Collections.singletonList(rawKey(key)), timeout)
251252
.map(popResult -> readValue(popResult.getValue())));
252253
}
253254

255+
@Override
256+
public Flux<V> leftPop(K key, long count) {
257+
258+
Assert.notNull(key, "Key must not be null");
259+
260+
return createFlux(listCommands -> listCommands.lPop(rawKey(key), count).map(this::readValue));
261+
}
262+
254263
@Override
255264
public Mono<V> rightPop(K key) {
256265

@@ -264,13 +273,21 @@ public Mono<V> rightPop(K key, Duration timeout) {
264273

265274
Assert.notNull(key, "Key must not be null");
266275
Assert.notNull(timeout, "Duration must not be null");
267-
Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
276+
Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second");
268277

269278
return createMono(listCommands ->
270279
listCommands.brPop(Collections.singletonList(rawKey(key)), timeout)
271280
.map(popResult -> readValue(popResult.getValue())));
272281
}
273282

283+
@Override
284+
public Flux<V> rightPop(K key, long count) {
285+
286+
Assert.notNull(key, "Key must not be null");
287+
288+
return createFlux(listCommands -> listCommands.rPop(rawKey(key), count).map(this::readValue));
289+
}
290+
274291
@Override
275292
public Mono<V> rightPopAndLeftPush(K sourceKey, K destinationKey) {
276293

@@ -287,7 +304,7 @@ public Mono<V> rightPopAndLeftPush(K sourceKey, K destinationKey, Duration timeo
287304
Assert.notNull(sourceKey, "Source key must not be null");
288305
Assert.notNull(destinationKey, "Destination key must not be null");
289306
Assert.notNull(timeout, "Duration must not be null");
290-
Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
307+
Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second");
291308

292309
return createMono(listCommands ->
293310
listCommands.bRPopLPush(rawKey(sourceKey), rawKey(destinationKey), timeout).map(this::readValue));
@@ -315,7 +332,7 @@ private <T> Flux<T> createFlux(Function<ReactiveListCommands, Publisher<T>> func
315332
return template.doCreateFlux(connection -> function.apply(connection.listCommands()));
316333
}
317334

318-
private boolean isZeroOrGreater1Second(Duration timeout) {
335+
private boolean isZeroOrGreaterOneSecond(Duration timeout) {
319336
return timeout.isZero() || timeout.getNano() % TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS) == 0;
320337
}
321338

src/main/java/org/springframework/data/redis/core/ReactiveListOperations.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
*
3333
* @author Mark Paluch
3434
* @author Christoph Strobl
35+
* @author John Blum
3536
* @see <a href="https://redis.io/commands#list">Redis Documentation: List Commands</a>
3637
* @since 2.0
3738
*/
@@ -325,6 +326,16 @@ default Mono<V> move(MoveFrom<K> from, MoveTo<K> to, Duration timeout) {
325326
*/
326327
Mono<V> leftPop(K key, Duration timeout);
327328

329+
/**
330+
* Removes {@link Long count} elements from the left-side of the Redis list stored at key.
331+
*
332+
* @param key {@link K Key} referring to the list stored in Redis; must not be {@literal null}.
333+
* @param count {@link Long count} of the number of elements to remove from the left-side of the Redis list.
334+
* @return a {@link Flux} containing the elements removed from the Redis list.
335+
* @since 3.2
336+
*/
337+
Flux<V> leftPop(K key, long count);
338+
328339
/**
329340
* Removes and returns last element in list stored at {@code key}.
330341
*
@@ -347,6 +358,16 @@ default Mono<V> move(MoveFrom<K> from, MoveTo<K> to, Duration timeout) {
347358
*/
348359
Mono<V> rightPop(K key, Duration timeout);
349360

361+
/**
362+
* Removes {@link Long count} elements from the right-side of the Redis list stored at key.
363+
*
364+
* @param key {@link K Key} referring to the list stored in Redis; must not be {@literal null}.
365+
* @param count {@link Long count} of the number of elements to remove from the right-side of the Redis list.
366+
* @return a {@link Flux} containing the elements removed from the Redis list.
367+
* @since 3.2
368+
*/
369+
Flux<V> rightPop(K key, long count);
370+
350371
/**
351372
* Remove the last element from list at {@code sourceKey}, append it to {@code destinationKey} and return its value.
352373
*

src/test/java/org/springframework/data/redis/core/DefaultReactiveListOperationsIntegrationTests.java

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@
1515
*/
1616
package org.springframework.data.redis.core;
1717

18-
import static org.assertj.core.api.Assertions.*;
19-
import static org.assertj.core.api.Assumptions.*;
20-
21-
import reactor.test.StepVerifier;
18+
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
19+
import static org.assertj.core.api.Assumptions.assumeThat;
2220

2321
import java.time.Duration;
2422
import java.util.Collection;
@@ -35,11 +33,14 @@
3533
import org.springframework.data.redis.test.extension.parametrized.MethodSource;
3634
import org.springframework.data.redis.test.extension.parametrized.ParameterizedRedisTest;
3735

36+
import reactor.test.StepVerifier;
37+
3838
/**
3939
* Integration tests for {@link DefaultReactiveListOperations}.
4040
*
4141
* @author Mark Paluch
4242
* @author Christoph Strobl
43+
* @author John Blum
4344
*/
4445
@MethodSource("testParams")
4546
@SuppressWarnings("unchecked")
@@ -458,6 +459,38 @@ void leftPop() {
458459
listOperations.leftPop(key).as(StepVerifier::create).expectNext(value2).verifyComplete();
459460
}
460461

462+
@ParameterizedRedisTest // GH-2692
463+
@SuppressWarnings("all")
464+
void leftPopWithNullKey() {
465+
466+
assertThatIllegalArgumentException()
467+
.isThrownBy(() -> this.listOperations.leftPop(null, 100L))
468+
.withMessage("Key must not be null")
469+
.withNoCause();
470+
}
471+
472+
@ParameterizedRedisTest // GH-2692
473+
void leftPopWithCount() {
474+
475+
assumeThat(this.valueFactory).isInstanceOf(ByteBufferObjectFactory.class);
476+
477+
K key = keyFactory.instance();
478+
V value1 = valueFactory.instance();
479+
V value2 = valueFactory.instance();
480+
V value3 = valueFactory.instance();
481+
482+
listOperations.leftPushAll(key, value1, value2, value3)
483+
.as(StepVerifier::create)
484+
.expectNext(3L)
485+
.verifyComplete();
486+
487+
listOperations.leftPop(key, 2)
488+
.as(StepVerifier::create)
489+
.expectNext(value3)
490+
.expectNext(value2)
491+
.verifyComplete();
492+
}
493+
461494
@ParameterizedRedisTest // DATAREDIS-602
462495
void rightPop() {
463496

@@ -472,6 +505,38 @@ void rightPop() {
472505
listOperations.rightPop(key).as(StepVerifier::create).expectNext(value2).verifyComplete();
473506
}
474507

508+
@ParameterizedRedisTest // GH-2692
509+
@SuppressWarnings("all")
510+
void rightPopWithNullKey() {
511+
512+
assertThatIllegalArgumentException()
513+
.isThrownBy(() -> this.listOperations.rightPop(null, 100L))
514+
.withMessage("Key must not be null")
515+
.withNoCause();
516+
}
517+
518+
@ParameterizedRedisTest // GH-2692
519+
void rightPopWithCount() {
520+
521+
assumeThat(this.valueFactory).isInstanceOf(ByteBufferObjectFactory.class);
522+
523+
K key = keyFactory.instance();
524+
V value1 = valueFactory.instance();
525+
V value2 = valueFactory.instance();
526+
V value3 = valueFactory.instance();
527+
528+
listOperations.rightPushAll(key, value3, value2, value1)
529+
.as(StepVerifier::create)
530+
.expectNext(3L)
531+
.verifyComplete();
532+
533+
listOperations.rightPop(key, 2)
534+
.as(StepVerifier::create)
535+
.expectNext(value1)
536+
.expectNext(value2)
537+
.verifyComplete();
538+
}
539+
475540
@ParameterizedRedisTest // DATAREDIS-602
476541
void leftPopWithTimeout() {
477542

src/test/kotlin/org/springframework/data/redis/core/ReactiveListOperationsExtensionsUnitTests.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import java.time.Duration
3232
* @author Mark Paluch
3333
* @author Sebastien Deleuze
3434
* @author Wonwoo Lee
35+
* @author John Blum
3536
*/
3637
class ReactiveListOperationsExtensionsUnitTests {
3738

@@ -294,7 +295,7 @@ class ReactiveListOperationsExtensionsUnitTests {
294295
fun blockingLeftPop() {
295296

296297
val operations = mockk<ReactiveListOperations<String, String>>()
297-
every { operations.leftPop(any(), any()) } returns Mono.just("foo")
298+
every { operations.leftPop(any(), any<Duration>()) } returns Mono.just("foo")
298299

299300
runBlocking {
300301
assertThat(operations.leftPopAndAwait("foo", Duration.ofDays(1))).isEqualTo("foo")
@@ -324,7 +325,7 @@ class ReactiveListOperationsExtensionsUnitTests {
324325
fun blockingRightPop() {
325326

326327
val operations = mockk<ReactiveListOperations<String, String>>()
327-
every { operations.rightPop(any(), any()) } returns Mono.just("foo")
328+
every { operations.rightPop(any(), any<Duration>()) } returns Mono.just("foo")
328329

329330
runBlocking {
330331
assertThat(operations.rightPopAndAwait("foo", Duration.ofDays(1))).isEqualTo("foo")

0 commit comments

Comments
 (0)