Skip to content

Commit bdf15de

Browse files
Support precise sub second timeout for BZPOPMIN, BZPOPMAX.
In cases where the timeout is defined in msec we calculate a precise timeout for the BZPOP* operations. #2102 will bring this behaviour also to List commands. Original Pull Request: #2088
1 parent 739b917 commit bdf15de

11 files changed

+96
-33
lines changed

src/main/java/org/springframework/data/redis/connection/ReactiveZSetCommands.java

+35-15
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@
2525
import java.util.Collections;
2626
import java.util.List;
2727
import java.util.Optional;
28+
import java.util.concurrent.TimeUnit;
2829
import java.util.function.Function;
2930

3031
import org.reactivestreams.Publisher;
31-
3232
import org.springframework.data.domain.Range;
3333
import org.springframework.data.domain.Sort.Direction;
3434
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
@@ -163,8 +163,7 @@ public ZAddCommand incr() {
163163
}
164164

165165
/**
166-
* Applies {@literal GT} mode. Constructs a new command
167-
* instance with all previously configured properties.
166+
* Applies {@literal GT} mode. Constructs a new command instance with all previously configured properties.
168167
*
169168
* @return a new {@link ZAddCommand} with {@literal incr} applied.
170169
* @since 2.5
@@ -174,8 +173,7 @@ public ZAddCommand gt() {
174173
}
175174

176175
/**
177-
* Applies {@literal LT} mode. Constructs a new command
178-
* instance with all previously configured properties.
176+
* Applies {@literal LT} mode. Constructs a new command instance with all previously configured properties.
179177
*
180178
* @return a new {@link ZAddCommand} with {@literal incr} applied.
181179
* @since 2.5
@@ -206,7 +204,6 @@ public boolean isIncr() {
206204
}
207205

208206
/**
209-
*
210207
* @return {@literal true} if {@literal GT} is set.
211208
* @since 2.5
212209
*/
@@ -1337,15 +1334,18 @@ class BZPopCommand extends KeyCommand {
13371334

13381335
private final PopDirection direction;
13391336

1340-
private final Duration timeout;
1337+
private final @Nullable TimeUnit timeUnit;
1338+
private final @Nullable Long timeout;
13411339

13421340
private final long count;
13431341

1344-
private BZPopCommand(@Nullable ByteBuffer key, Duration timeout, long count, PopDirection direction) {
1342+
private BZPopCommand(@Nullable ByteBuffer key, @Nullable Long timeout, @Nullable TimeUnit timeUnit, long count,
1343+
PopDirection direction) {
13451344

13461345
super(key);
13471346
this.count = count;
13481347
this.timeout = timeout;
1348+
this.timeUnit = timeUnit;
13491349
this.direction = direction;
13501350
}
13511351

@@ -1355,7 +1355,7 @@ private BZPopCommand(@Nullable ByteBuffer key, Duration timeout, long count, Pop
13551355
* @return a new {@link BZPopCommand} for min pop ({@literal ZPOPMIN}).
13561356
*/
13571357
public static BZPopCommand min() {
1358-
return new BZPopCommand(null, null, 0, PopDirection.MIN);
1358+
return new BZPopCommand(null, null, null, 0, PopDirection.MIN);
13591359
}
13601360

13611361
/**
@@ -1364,7 +1364,7 @@ public static BZPopCommand min() {
13641364
* @return a new {@link BZPopCommand} for max pop ({@literal ZPOPMAX}).
13651365
*/
13661366
public static BZPopCommand max() {
1367-
return new BZPopCommand(null, null, 0, PopDirection.MAX);
1367+
return new BZPopCommand(null, null, null, 0, PopDirection.MAX);
13681368
}
13691369

13701370
/**
@@ -1377,7 +1377,7 @@ public BZPopCommand from(ByteBuffer key) {
13771377

13781378
Assert.notNull(key, "Key must not be null!");
13791379

1380-
return new BZPopCommand(key, timeout, count, direction);
1380+
return new BZPopCommand(key, timeout, timeUnit, count, direction);
13811381
}
13821382

13831383
/**
@@ -1387,7 +1387,7 @@ public BZPopCommand from(ByteBuffer key) {
13871387
* @return a new {@link BZPopCommand} with {@literal value} applied.
13881388
*/
13891389
public BZPopCommand count(long count) {
1390-
return new BZPopCommand(getKey(), timeout, count, direction);
1390+
return new BZPopCommand(getKey(), timeout, timeUnit, count, direction);
13911391
}
13921392

13931393
/**
@@ -1400,7 +1400,21 @@ public BZPopCommand blockingFor(Duration timeout) {
14001400

14011401
Assert.notNull(timeout, "Timeout must not be null!");
14021402

1403-
return new BZPopCommand(getKey(), timeout, count, direction);
1403+
return blockingFor(timeout.toMillis(), TimeUnit.MILLISECONDS);
1404+
}
1405+
1406+
/**
1407+
* Applies a {@link Duration timeout}. Constructs a new command instance with all previously configured properties.
1408+
*
1409+
* @param timeout value.
1410+
* @param timeout must not be {@literal null}.
1411+
* @return a new {@link BZPopCommand} with {@link Duration timeout} applied.
1412+
*/
1413+
public BZPopCommand blockingFor(long timeout, TimeUnit timeUnit) {
1414+
1415+
Assert.notNull(timeUnit, "TimeUnit must not be null!");
1416+
1417+
return new BZPopCommand(getKey(), timeout, timeUnit, count, direction);
14041418
}
14051419

14061420
/**
@@ -1410,10 +1424,16 @@ public PopDirection getDirection() {
14101424
return direction;
14111425
}
14121426

1413-
public Duration getTimeout() {
1427+
@Nullable
1428+
public Long getTimeout() {
14141429
return timeout;
14151430
}
14161431

1432+
@Nullable
1433+
public TimeUnit getTimeUnit() {
1434+
return timeUnit;
1435+
}
1436+
14171437
public long getCount() {
14181438
return count;
14191439
}
@@ -1664,7 +1684,7 @@ public static ZMScoreCommand scoreOf(ByteBuffer member) {
16641684
/**
16651685
* Creates a new {@link ZMScoreCommand} given a {@link List members}.
16661686
*
1667-
* @param member must not be {@literal null}.
1687+
* @param members must not be {@literal null}.
16681688
* @return a new {@link ZMScoreCommand} for {@link List} of members.
16691689
*/
16701690
public static ZMScoreCommand scoreOf(Collection<ByteBuffer> members) {

src/main/java/org/springframework/data/redis/connection/RedisZSetCommands.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -997,8 +997,8 @@ default Long zCount(byte[] key, double min, double max) {
997997
Set<Tuple> zPopMin(byte[] key, long count);
998998

999999
/**
1000-
* Remove and return the value with its score having the lowest score from sorted set at {@code key}. <b>Blocks
1001-
* connection</b> until element available or {@code timeout} reached.
1000+
* Remove and return the value with its score having the lowest score from sorted set at {@code key}. <br />
1001+
* <b>Blocks connection</b> until element available or {@code timeout} reached.
10021002
*
10031003
* @param key must not be {@literal null}.
10041004
* @param timeout
@@ -1034,8 +1034,8 @@ default Long zCount(byte[] key, double min, double max) {
10341034
Set<Tuple> zPopMax(byte[] key, long count);
10351035

10361036
/**
1037-
* Remove and return the value with its score having the highest score from sorted set at {@code key}. <b>Blocks
1038-
* connection</b> until element available or {@code timeout} reached.
1037+
* Remove and return the value with its score having the highest score from sorted set at {@code key}. <br />
1038+
* <b>Blocks connection</b> until element available or {@code timeout} reached.
10391039
*
10401040
* @param key must not be {@literal null}.
10411041
* @param timeout

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveZSetCommands.java

+17-2
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
import reactor.core.publisher.Mono;
2626

2727
import java.nio.ByteBuffer;
28-
import java.time.temporal.ChronoUnit;
2928
import java.util.List;
29+
import java.util.concurrent.TimeUnit;
3030

3131
import org.reactivestreams.Publisher;
3232

@@ -390,7 +390,18 @@ public Flux<CommandResponse<BZPopCommand, Flux<Tuple>>> bZPop(Publisher<BZPopCom
390390
Assert.notNull(command.getKey(), "Key must not be null!");
391391
Assert.notNull(command.getTimeout(), "Timeout must not be null!");
392392

393-
long timeout = command.getTimeout().get(ChronoUnit.SECONDS);
393+
if(command.getTimeUnit() == TimeUnit.MILLISECONDS) {
394+
395+
double timeout = preciseTimeout(command.getTimeout(), command.getTimeUnit());
396+
397+
Mono<ScoredValue<ByteBuffer>> result = (command.getDirection() == PopDirection.MIN
398+
? cmd.bzpopmin(timeout, command.getKey())
399+
: cmd.bzpopmax(timeout, command.getKey())).filter(Value::hasValue).map(Value::getValue);
400+
401+
return new CommandResponse<>(command, result.filter(Value::hasValue).map(this::toTuple).flux());
402+
}
403+
404+
long timeout = command.getTimeUnit().toSeconds(command.getTimeout());
394405

395406
Mono<ScoredValue<ByteBuffer>> result = (command.getDirection() == PopDirection.MIN
396407
? cmd.bzpopmin(timeout, command.getKey())
@@ -624,6 +635,10 @@ private Tuple toTuple(ByteBuffer value, double score) {
624635
return new DefaultTuple(ByteUtils.getBytes(value), score);
625636
}
626637

638+
static double preciseTimeout(long val, TimeUnit unit) {
639+
return (double) unit.toMillis(val) / 1000.0D;
640+
}
641+
627642
protected LettuceReactiveRedisConnection getConnection() {
628643
return connection;
629644
}

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceZSetCommands.java

+17
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,13 @@ public Tuple bZPopMin(byte[] key, long timeout, TimeUnit unit) {
316316
Assert.notNull(key, "Key must not be null!");
317317
Assert.notNull(unit, "TimeUnit must not be null!");
318318

319+
if(TimeUnit.MILLISECONDS == unit) {
320+
321+
return connection.invoke(connection.getAsyncDedicatedConnection())
322+
.from(RedisSortedSetAsyncCommands::bzpopmin, preciseTimeout(timeout, unit), key)
323+
.get(it -> it.map(LettuceConverters::toTuple).getValueOrElse(null));
324+
}
325+
319326
return connection.invoke(connection.getAsyncDedicatedConnection())
320327
.from(RedisSortedSetAsyncCommands::bzpopmin, unit.toSeconds(timeout), key)
321328
.get(it -> it.map(LettuceConverters::toTuple).getValueOrElse(null));
@@ -359,6 +366,13 @@ public Tuple bZPopMax(byte[] key, long timeout, TimeUnit unit) {
359366
Assert.notNull(key, "Key must not be null!");
360367
Assert.notNull(unit, "TimeUnit must not be null!");
361368

369+
if(TimeUnit.MILLISECONDS == unit) {
370+
371+
return connection.invoke(connection.getAsyncDedicatedConnection())
372+
.from(RedisSortedSetAsyncCommands::bzpopmax, preciseTimeout(timeout, unit), key)
373+
.get(it -> it.map(LettuceConverters::toTuple).getValueOrElse(null));
374+
}
375+
362376
return connection.invoke(connection.getAsyncDedicatedConnection())
363377
.from(RedisSortedSetAsyncCommands::bzpopmax, unit.toSeconds(timeout), key)
364378
.get(it -> it.map(LettuceConverters::toTuple).getValueOrElse(null));
@@ -703,4 +717,7 @@ private static io.lettuce.core.ZAddArgs toZAddArgs(ZAddArgs source) {
703717
return target;
704718
}
705719

720+
static double preciseTimeout(long val, TimeUnit unit) {
721+
return (double) unit.toMillis(val) / 1000.0D;
722+
}
706723
}

src/main/java/org/springframework/data/redis/core/ZSetOperations.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -365,8 +365,8 @@ static <V> TypedTuple<V> of(V value, @Nullable Double score) {
365365
Set<TypedTuple<V>> popMin(K key, long count);
366366

367367
/**
368-
* Remove and return the value with its score having the lowest score from sorted set at {@code key}. <b>Blocks
369-
* connection</b> until element available or {@code timeout} reached.
368+
* Remove and return the value with its score having the lowest score from sorted set at {@code key}. <br />
369+
* <b>Blocks connection</b> until element available or {@code timeout} reached.
370370
*
371371
* @param key must not be {@literal null}.
372372
* @param timeout
@@ -379,8 +379,8 @@ static <V> TypedTuple<V> of(V value, @Nullable Double score) {
379379
TypedTuple<V> popMin(K key, long timeout, TimeUnit unit);
380380

381381
/**
382-
* Remove and return the value with its score having the lowest score from sorted set at {@code key}. <b>Blocks
383-
* connection</b> until element available or {@code timeout} reached.
382+
* Remove and return the value with its score having the lowest score from sorted set at {@code key}. <br />
383+
* <b>Blocks connection</b> until element available or {@code timeout} reached.
384384
*
385385
* @param key must not be {@literal null}.
386386
* @param timeout must not be {@literal null}.
@@ -422,8 +422,8 @@ default TypedTuple<V> popMin(K key, Duration timeout) {
422422
Set<TypedTuple<V>> popMax(K key, long count);
423423

424424
/**
425-
* Remove and return the value with its score having the highest score from sorted set at {@code key}. <b>Blocks
426-
* connection</b> until element available or {@code timeout} reached.
425+
* Remove and return the value with its score having the highest score from sorted set at {@code key}. <br />
426+
* <b>Blocks connection</b> until element available or {@code timeout} reached.
427427
*
428428
* @param key must not be {@literal null}.
429429
* @param timeout
@@ -436,8 +436,8 @@ default TypedTuple<V> popMin(K key, Duration timeout) {
436436
TypedTuple<V> popMax(K key, long timeout, TimeUnit unit);
437437

438438
/**
439-
* Remove and return the value with its score having the highest score from sorted set at {@code key}. <b>Blocks
440-
* connection</b> until element available or {@code timeout} reached.
439+
* Remove and return the value with its score having the highest score from sorted set at {@code key}. <br />
440+
* <b>Blocks connection</b> until element available or {@code timeout} reached.
441441
*
442442
* @param key must not be {@literal null}.
443443
* @param timeout must not be {@literal null}.

src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -2074,6 +2074,7 @@ void testZRemRangeByLex() {
20742074

20752075
@Test
20762076
void testZRemRangeByScore() {
2077+
20772078
actual.add(connection.zAdd("myset", 2, "Bob"));
20782079
actual.add(connection.zAdd("myset", 1, "James"));
20792080
actual.add(connection.zRemRangeByScore("myset", 0d, 1d));
@@ -2084,6 +2085,7 @@ void testZRemRangeByScore() {
20842085

20852086
@Test
20862087
void testZRevRank() {
2088+
20872089
actual.add(connection.zAdd("myset", 2, "Bob"));
20882090
actual.add(connection.zAdd("myset", 1, "James"));
20892091
actual.add(connection.zAdd("myset", 3, "Joe"));
@@ -2093,6 +2095,7 @@ void testZRevRank() {
20932095

20942096
@Test
20952097
void testZScore() {
2098+
20962099
actual.add(connection.zAdd("myset", 2, "Bob"));
20972100
actual.add(connection.zAdd("myset", 1, "James"));
20982101
actual.add(connection.zAdd("myset", 3, "Joe"));
@@ -2103,15 +2106,17 @@ void testZScore() {
21032106
@Test
21042107
@EnabledOnCommand("ZMSCORE")
21052108
void testZMScore() {
2109+
21062110
actual.add(connection.zAdd("myset", 2, "Bob"));
21072111
actual.add(connection.zAdd("myset", 1, "James"));
21082112
actual.add(connection.zAdd("myset", 3, "Joe"));
2109-
actual.add(connection.zMScore("myset", "James", "Joe"));
2110-
verifyResults(Arrays.asList(new Object[] { true, true, true, Arrays.asList(1d, 3d) }));
2113+
actual.add(connection.zMScore("myset", "James", "Joe", "Dave"));
2114+
verifyResults(Arrays.asList(new Object[] { true, true, true, Arrays.asList(1d, 3d, null) }));
21112115
}
21122116

21132117
@Test
21142118
void testZUnionStore() {
2119+
21152120
actual.add(connection.zAdd("myset", 2, "Bob"));
21162121
actual.add(connection.zAdd("myset", 1, "James"));
21172122
actual.add(connection.zAdd("myset", 5, "Joe"));
@@ -2125,6 +2130,7 @@ void testZUnionStore() {
21252130

21262131
@Test
21272132
void testZUnionStoreAggWeights() {
2133+
21282134
actual.add(connection.zAdd("myset", 2, "Bob"));
21292135
actual.add(connection.zAdd("myset", 1, "James"));
21302136
actual.add(connection.zAdd("myset", 4, "Joe"));

src/test/java/org/springframework/data/redis/connection/DefaultStringRedisConnectionPipelineTests.java

+2
Original file line numberDiff line numberDiff line change
@@ -1332,12 +1332,14 @@ public void testZScore() {
13321332

13331333
@Test
13341334
public void testZMScore() {
1335+
13351336
doReturn(Collections.singletonList(Arrays.asList(1d, 3d))).when(nativeConnection).closePipeline();
13361337
super.testZMScore();
13371338
}
13381339

13391340
@Test
13401341
public void testZUnionStoreAggWeightsBytes() {
1342+
13411343
doReturn(Collections.singletonList(5L)).when(nativeConnection).closePipeline();
13421344
super.testZUnionStoreAggWeightsBytes();
13431345
}

src/test/java/org/springframework/data/redis/connection/DefaultStringRedisConnectionPipelineTxTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -1435,6 +1435,7 @@ public void testZScore() {
14351435

14361436
@Test
14371437
public void testZMScore() {
1438+
14381439
doReturn(Collections.singletonList(Collections.singletonList(Arrays.asList(1d, 3d)))).when(nativeConnection)
14391440
.closePipeline();
14401441
super.testZMScore();

src/test/java/org/springframework/data/redis/connection/DefaultStringRedisConnectionTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -1626,6 +1626,7 @@ public void testZScore() {
16261626

16271627
@Test
16281628
public void testZMScore() {
1629+
16291630
doReturn(Arrays.asList(1d, 3d)).when(nativeConnection).zMScore(fooBytes, barBytes, bar2Bytes);
16301631
actual.add(connection.zMScore(foo, bar, bar2));
16311632
verifyResults(Collections.singletonList(Arrays.asList(1d, 3d)));

src/test/java/org/springframework/data/redis/connection/DefaultStringRedisConnectionTxTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -1318,6 +1318,7 @@ public void testZScore() {
13181318

13191319
@Test
13201320
public void testZMScore() {
1321+
13211322
doReturn(Collections.singletonList(Arrays.asList(1d, 3d))).when(nativeConnection).exec();
13221323
super.testZMScore();
13231324
}

src/test/java/org/springframework/data/redis/support/collections/AbstractRedisZSetTestIntegration.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ void testFirst() {
121121
assertThat(zSet.first()).isEqualTo(t1);
122122
}
123123

124-
@ParameterizedRedisTest
124+
@ParameterizedRedisTest // GH-2038
125125
@EnabledOnCommand("ZPOPMIN")
126126
void testPopFirst() {
127127

@@ -137,7 +137,7 @@ void testPopFirst() {
137137
assertThat(zSet).hasSize(2);
138138
}
139139

140-
@ParameterizedRedisTest
140+
@ParameterizedRedisTest // GH-2038
141141
@EnabledOnCommand("ZPOPMIN")
142142
void testPopFirstWithTimeout() {
143143

0 commit comments

Comments
 (0)