Skip to content

Commit d96693c

Browse files
committed
Adds support to lPop or rPop N elements from a Redis List in ReactiveListOperations.
Closes spring-projects#2692
1 parent 7c54dd5 commit d96693c

File tree

5 files changed

+165
-10
lines changed

5 files changed

+165
-10
lines changed

src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
*
4040
* @author Mark Paluch
4141
* @author Christoph Strobl
42+
* @author John Blum
4243
* @since 2.0
4344
*/
4445
class DefaultReactiveListOperations<K, V> implements ReactiveListOperations<K, V> {
@@ -244,13 +245,21 @@ public Mono<V> leftPop(K key, Duration timeout) {
244245

245246
Assert.notNull(key, "Key must not be null");
246247
Assert.notNull(timeout, "Duration must not be null");
247-
Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
248+
Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second");
248249

249250
return createMono(listCommands ->
250251
listCommands.blPop(Collections.singletonList(rawKey(key)), timeout)
251252
.map(popResult -> readValue(popResult.getValue())));
252253
}
253254

255+
@Override
256+
public Flux<V> leftPop(K key, long count) {
257+
258+
Assert.notNull(key, "Key must not be null");
259+
260+
return createFlux(listCommands -> listCommands.lPop(rawKey(key), count).map(this::readValue));
261+
}
262+
254263
@Override
255264
public Mono<V> rightPop(K key) {
256265

@@ -264,13 +273,21 @@ public Mono<V> rightPop(K key, Duration timeout) {
264273

265274
Assert.notNull(key, "Key must not be null");
266275
Assert.notNull(timeout, "Duration must not be null");
267-
Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
276+
Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second");
268277

269278
return createMono(listCommands ->
270279
listCommands.brPop(Collections.singletonList(rawKey(key)), timeout)
271280
.map(popResult -> readValue(popResult.getValue())));
272281
}
273282

283+
@Override
284+
public Flux<V> rightPop(K key, long count) {
285+
286+
Assert.notNull(key, "Key must not be null");
287+
288+
return createFlux(listCommands -> listCommands.rPop(rawKey(key), count).map(this::readValue));
289+
}
290+
274291
@Override
275292
public Mono<V> rightPopAndLeftPush(K sourceKey, K destinationKey) {
276293

@@ -287,7 +304,7 @@ public Mono<V> rightPopAndLeftPush(K sourceKey, K destinationKey, Duration timeo
287304
Assert.notNull(sourceKey, "Source key must not be null");
288305
Assert.notNull(destinationKey, "Destination key must not be null");
289306
Assert.notNull(timeout, "Duration must not be null");
290-
Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
307+
Assert.isTrue(isZeroOrGreaterOneSecond(timeout), "Duration must be either zero or greater or equal to 1 second");
291308

292309
return createMono(listCommands ->
293310
listCommands.bRPopLPush(rawKey(sourceKey), rawKey(destinationKey), timeout).map(this::readValue));
@@ -315,7 +332,7 @@ private <T> Flux<T> createFlux(Function<ReactiveListCommands, Publisher<T>> func
315332
return template.doCreateFlux(connection -> function.apply(connection.listCommands()));
316333
}
317334

318-
private boolean isZeroOrGreater1Second(Duration timeout) {
335+
private boolean isZeroOrGreaterOneSecond(Duration timeout) {
319336
return timeout.isZero() || timeout.getNano() % TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS) == 0;
320337
}
321338

src/main/java/org/springframework/data/redis/core/ReactiveListOperations.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
*
3333
* @author Mark Paluch
3434
* @author Christoph Strobl
35+
* @author John Blum
3536
* @see <a href="https://redis.io/commands#list">Redis Documentation: List Commands</a>
3637
* @since 2.0
3738
*/
@@ -325,6 +326,16 @@ default Mono<V> move(MoveFrom<K> from, MoveTo<K> to, Duration timeout) {
325326
*/
326327
Mono<V> leftPop(K key, Duration timeout);
327328

329+
/**
330+
* Removes {@link Long count} elements from the left-side of the Redis list stored at key.
331+
*
332+
* @param key {@link K Key} referring to the list stored in Redis; must not be {@literal null}.
333+
* @param count {@link Long count} of the number of elements to remove from the left-side of the Redis list.
334+
* @return a {@link Flux} containing the elements removed from the Redis list.
335+
* @since 3.2
336+
*/
337+
Flux<V> leftPop(K key, long count);
338+
328339
/**
329340
* Removes and returns last element in list stored at {@code key}.
330341
*
@@ -347,6 +358,16 @@ default Mono<V> move(MoveFrom<K> from, MoveTo<K> to, Duration timeout) {
347358
*/
348359
Mono<V> rightPop(K key, Duration timeout);
349360

361+
/**
362+
* Removes {@link Long count} elements from the right-side of the Redis list stored at key.
363+
*
364+
* @param key {@link K Key} referring to the list stored in Redis; must not be {@literal null}.
365+
* @param count {@link Long count} of the number of elements to remove from the right-side of the Redis list.
366+
* @return a {@link Flux} containing the elements removed from the Redis list.
367+
* @since 3.2
368+
*/
369+
Flux<V> rightPop(K key, long count);
370+
350371
/**
351372
* Remove the last element from list at {@code sourceKey}, append it to {@code destinationKey} and return its value.
352373
*

src/main/kotlin/org/springframework/data/redis/core/ReactiveListOperationsExtensions.kt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,15 @@ suspend fun <K : Any, V : Any> ReactiveListOperations<K, V>.leftPopAndAwait(key:
183183
suspend fun <K : Any, V : Any> ReactiveListOperations<K, V>.leftPopAndAwait(key: K, timeout: Duration): V? =
184184
leftPop(key, timeout).awaitFirstOrNull()
185185

186+
/**
187+
* Coroutines variant of [ReactiveListOperations.leftPop] with count.
188+
*
189+
* @author John Blum
190+
* @since 3.2
191+
*/
192+
fun <K : Any, V : Any> ReactiveListOperations<K, V>.leftPopAsFlow(key: K, count :Long): Flow<V> =
193+
leftPop(key, count).asFlow()
194+
186195
/**
187196
* Coroutines variant of [ReactiveListOperations.rightPop].
188197
*
@@ -201,6 +210,15 @@ suspend fun <K : Any, V : Any> ReactiveListOperations<K, V>.rightPopAndAwait(key
201210
suspend fun <K : Any, V : Any> ReactiveListOperations<K, V>.rightPopAndAwait(key: K, timeout: Duration): V? =
202211
rightPop(key, timeout).awaitFirstOrNull()
203212

213+
/**
214+
* Coroutines variant of [ReactiveListOperations.rightPop] with count.
215+
*
216+
* @author John Blum
217+
* @since 3.2
218+
*/
219+
fun <K : Any, V : Any> ReactiveListOperations<K, V>.rightPopAsFlow(key: K, count:Long): Flow<V> =
220+
rightPop(key, count).asFlow()
221+
204222
/**
205223
* Coroutines variant of [ReactiveListOperations.rightPopAndLeftPush].
206224
*

src/test/java/org/springframework/data/redis/core/DefaultReactiveListOperationsIntegrationTests.java

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@
1515
*/
1616
package org.springframework.data.redis.core;
1717

18-
import static org.assertj.core.api.Assertions.*;
19-
import static org.assertj.core.api.Assumptions.*;
20-
21-
import reactor.test.StepVerifier;
18+
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
19+
import static org.assertj.core.api.Assumptions.assumeThat;
2220

2321
import java.time.Duration;
2422
import java.util.Collection;
@@ -35,11 +33,14 @@
3533
import org.springframework.data.redis.test.extension.parametrized.MethodSource;
3634
import org.springframework.data.redis.test.extension.parametrized.ParameterizedRedisTest;
3735

36+
import reactor.test.StepVerifier;
37+
3838
/**
3939
* Integration tests for {@link DefaultReactiveListOperations}.
4040
*
4141
* @author Mark Paluch
4242
* @author Christoph Strobl
43+
* @author John Blum
4344
*/
4445
@MethodSource("testParams")
4546
@SuppressWarnings("unchecked")
@@ -458,6 +459,38 @@ void leftPop() {
458459
listOperations.leftPop(key).as(StepVerifier::create).expectNext(value2).verifyComplete();
459460
}
460461

462+
@ParameterizedRedisTest // GH-2692
463+
@SuppressWarnings("all")
464+
void leftPopWithNullKey() {
465+
466+
assertThatIllegalArgumentException()
467+
.isThrownBy(() -> this.listOperations.leftPop(null, 100L))
468+
.withMessage("Key must not be null")
469+
.withNoCause();
470+
}
471+
472+
@ParameterizedRedisTest // GH-2692
473+
void leftPopWithCount() {
474+
475+
assumeThat(this.valueFactory).isInstanceOf(ByteBufferObjectFactory.class);
476+
477+
K key = keyFactory.instance();
478+
V value1 = valueFactory.instance();
479+
V value2 = valueFactory.instance();
480+
V value3 = valueFactory.instance();
481+
482+
listOperations.leftPushAll(key, value1, value2, value3)
483+
.as(StepVerifier::create)
484+
.expectNext(3L)
485+
.verifyComplete();
486+
487+
listOperations.leftPop(key, 2)
488+
.as(StepVerifier::create)
489+
.expectNext(value3)
490+
.expectNext(value2)
491+
.verifyComplete();
492+
}
493+
461494
@ParameterizedRedisTest // DATAREDIS-602
462495
void rightPop() {
463496

@@ -472,6 +505,38 @@ void rightPop() {
472505
listOperations.rightPop(key).as(StepVerifier::create).expectNext(value2).verifyComplete();
473506
}
474507

508+
@ParameterizedRedisTest // GH-2692
509+
@SuppressWarnings("all")
510+
void rightPopWithNullKey() {
511+
512+
assertThatIllegalArgumentException()
513+
.isThrownBy(() -> this.listOperations.rightPop(null, 100L))
514+
.withMessage("Key must not be null")
515+
.withNoCause();
516+
}
517+
518+
@ParameterizedRedisTest // GH-2692
519+
void rightPopWithCount() {
520+
521+
assumeThat(this.valueFactory).isInstanceOf(ByteBufferObjectFactory.class);
522+
523+
K key = keyFactory.instance();
524+
V value1 = valueFactory.instance();
525+
V value2 = valueFactory.instance();
526+
V value3 = valueFactory.instance();
527+
528+
listOperations.rightPushAll(key, value3, value2, value1)
529+
.as(StepVerifier::create)
530+
.expectNext(3L)
531+
.verifyComplete();
532+
533+
listOperations.rightPop(key, 2)
534+
.as(StepVerifier::create)
535+
.expectNext(value1)
536+
.expectNext(value2)
537+
.verifyComplete();
538+
}
539+
475540
@ParameterizedRedisTest // DATAREDIS-602
476541
void leftPopWithTimeout() {
477542

src/test/kotlin/org/springframework/data/redis/core/ReactiveListOperationsExtensionsUnitTests.kt

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import java.time.Duration
3232
* @author Mark Paluch
3333
* @author Sebastien Deleuze
3434
* @author Wonwoo Lee
35+
* @author John Blum
3536
*/
3637
class ReactiveListOperationsExtensionsUnitTests {
3738

@@ -290,11 +291,27 @@ class ReactiveListOperationsExtensionsUnitTests {
290291
}
291292
}
292293

294+
@Test // GH-2692
295+
fun leftPopWithCount() {
296+
297+
val operations = mockk<ReactiveListOperations<String, String>>()
298+
299+
every { operations.leftPop(any(), any<Long>()) } returns Flux.just("foo", "bar", "baz")
300+
301+
runBlocking {
302+
assertThat(operations.leftPopAsFlow("TestKey", 3L).toList()).containsExactly("foo", "bar", "baz")
303+
}
304+
305+
verify {
306+
operations.leftPop("TestKey", 3L)
307+
}
308+
}
309+
293310
@Test // DATAREDIS-937
294311
fun blockingLeftPop() {
295312

296313
val operations = mockk<ReactiveListOperations<String, String>>()
297-
every { operations.leftPop(any(), any()) } returns Mono.just("foo")
314+
every { operations.leftPop(any(), any<Duration>()) } returns Mono.just("foo")
298315

299316
runBlocking {
300317
assertThat(operations.leftPopAndAwait("foo", Duration.ofDays(1))).isEqualTo("foo")
@@ -320,11 +337,28 @@ class ReactiveListOperationsExtensionsUnitTests {
320337
}
321338
}
322339

340+
@Test // GH-2692
341+
fun rightPopWithCount() {
342+
343+
val operations = mockk<ReactiveListOperations<String, String>>()
344+
345+
every { operations.rightPop(any(), any<Long>()) } returns Flux.just("foo", "bar", "baz")
346+
347+
runBlocking {
348+
assertThat(operations.rightPopAsFlow("TestKey", 3L).toList())
349+
.containsExactly("foo", "bar", "baz")
350+
}
351+
352+
verify {
353+
operations.rightPop("TestKey", 3L)
354+
}
355+
}
356+
323357
@Test // DATAREDIS-937
324358
fun blockingRightPop() {
325359

326360
val operations = mockk<ReactiveListOperations<String, String>>()
327-
every { operations.rightPop(any(), any()) } returns Mono.just("foo")
361+
every { operations.rightPop(any(), any<Duration>()) } returns Mono.just("foo")
328362

329363
runBlocking {
330364
assertThat(operations.rightPopAndAwait("foo", Duration.ofDays(1))).isEqualTo("foo")

0 commit comments

Comments
 (0)