diff --git a/pom.xml b/pom.xml index 8657b1992a..416b942497 100644 --- a/pom.xml +++ b/pom.xml @@ -1,11 +1,13 @@ - + 4.0.0 org.springframework.data spring-data-redis - 2.6.0-SNAPSHOT + 2.6.0-2038-SNAPSHOT Spring Data Redis diff --git a/src/main/asciidoc/new-features.adoc b/src/main/asciidoc/new-features.adoc index eab00c043a..28d9cc7f14 100644 --- a/src/main/asciidoc/new-features.adoc +++ b/src/main/asciidoc/new-features.adoc @@ -7,7 +7,7 @@ This section briefly covers items that are new and noteworthy in the latest rele == New in Spring Data Redis 2.6 * Support for `SubscriptionListener` when using `MessageListener` for subscription confirmation callbacks. `ReactiveRedisMessageListenerContainer` and `ReactiveRedisOperations` provide `receiveLater(…)` and `listenToLater(…)` methods to await until Redis acknowledges the subscription. -* Support Redis 6.2 commands (`LPOP`/`RPOP` with `count`, `COPY`, `GETEX`, `GETDEL`). +* Support Redis 6.2 commands (`LPOP`/`RPOP` with `count`, `COPY`, `GETEX`, `GETDEL`, `ZPOPMIN`, `BZPOPMIN`, `ZPOPMAX`, `BZPOPMAX`, `ZMSCORE`). [[new-in-2.5.0]] == New in Spring Data Redis 2.5 diff --git a/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java index a6348abafd..0639751657 100644 --- a/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java @@ -81,7 +81,8 @@ public class DefaultStringRedisConnection implements StringRedisConnection, Deco private final RedisSerializer serializer; private Converter bytesToString = new DeserializingConverter(); private Converter stringToBytes = new SerializingConverter(); - private SetConverter tupleToStringTuple = new SetConverter<>(new TupleConverter()); + private final TupleConverter tupleConverter = new TupleConverter(); + private SetConverter tupleToStringTuple = new SetConverter<>(tupleConverter); private SetConverter stringTupleToTuple = new SetConverter<>(new StringTupleConverter()); private ListConverter byteListToStringList = new ListConverter<>(bytesToString); private MapConverter byteMapToStringMap = new MapConverter<>(bytesToString); @@ -1722,6 +1723,15 @@ public Double zScore(byte[] key, byte[] value) { return convertAndReturn(delegate.zScore(key, value), Converters.identityConverter()); } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#zMScore(byte[], byte[][]) + */ + @Override + public List zMScore(byte[] key, byte[]... values) { + return convertAndReturn(delegate.zMScore(key, values), Converters.identityConverter()); + } + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.RedisZSetCommands#zUnionStore(byte[], org.springframework.data.redis.connection.RedisZSetCommands.Aggregate, org.springframework.data.redis.connection.RedisZSetCommands.Weights, byte[][]) @@ -2852,6 +2862,126 @@ public Long zLexCount(byte[] key, Range range) { return delegate.zLexCount(key, range); } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#zPopMin(byte[]) + */ + @Nullable + @Override + public Tuple zPopMin(byte[] key) { + return delegate.zPopMin(key); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#zPopMin(String) + */ + @Nullable + @Override + public StringTuple zPopMin(String key) { + return convertAndReturn(delegate.zPopMin(serialize(key)), tupleConverter); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#zPopMinMin(byte[], count) + */ + @Nullable + @Override + public Set zPopMin(byte[] key, long count) { + return delegate.zPopMin(key, count); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#zPopMin(String, long) + */ + @Nullable + @Override + public Set zPopMin(String key, long count) { + return convertAndReturn(delegate.zPopMin(serialize(key), count), tupleToStringTuple); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#bZPopMin(byte[], long, java.util.concurrent.TimeUnit) + */ + @Nullable + @Override + public Tuple bZPopMin(byte[] key, long timeout, TimeUnit unit) { + return delegate.bZPopMin(key, timeout, unit); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#bZPopMin(String, long, java.util.concurrent.TimeUnit) + */ + @Nullable + @Override + public StringTuple bZPopMin(String key, long timeout, TimeUnit unit) { + return convertAndReturn(delegate.bZPopMin(serialize(key), timeout, unit), tupleConverter); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#zPopMax(byte[]) + */ + @Nullable + @Override + public Tuple zPopMax(byte[] key) { + return delegate.zPopMax(key); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#zPopMax(String) + */ + @Nullable + @Override + public StringTuple zPopMax(String key) { + return convertAndReturn(delegate.zPopMax(serialize(key)), tupleConverter); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#zPopMax(byte[], long) + */ + @Nullable + @Override + public Set zPopMax(byte[] key, long count) { + return delegate.zPopMax(key, count); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#zPopMax(String, long) + */ + @Nullable + @Override + public Set zPopMax(String key, long count) { + return convertAndReturn(delegate.zPopMax(serialize(key), count), tupleToStringTuple); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#bZPopMax(byte[], long, java.util.concurrent.TimeUnit) + */ + @Nullable + @Override + public Tuple bZPopMax(byte[] key, long timeout, TimeUnit unit) { + return delegate.bZPopMax(key, timeout, unit); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#bZPopMax(String, long, java.util.concurrent.TimeUnit) + */ + @Nullable + @Override + public StringTuple bZPopMax(String key, long timeout, TimeUnit unit) { + return convertAndReturn(delegate.bZPopMax(serialize(key), timeout, unit), tupleConverter); + } + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.StringRedisConnection#zIncrBy(java.lang.String, double, java.lang.String) @@ -3052,6 +3182,15 @@ public Double zScore(String key, String value) { return zScore(serialize(key), serialize(value)); } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.StringRedisConnection#zMScore(java.lang.String, java.lang.String[]) + */ + @Override + public List zMScore(String key, String... values) { + return zMScore(serialize(key), serializeMulti(values)); + } + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.StringRedisConnection#zUnionStore(java.lang.String, org.springframework.data.redis.connection.RedisZSetCommands.Aggregate, int[], java.lang.String[]) diff --git a/src/main/java/org/springframework/data/redis/connection/DefaultTuple.java b/src/main/java/org/springframework/data/redis/connection/DefaultTuple.java index 52dd426295..640c5bab53 100644 --- a/src/main/java/org/springframework/data/redis/connection/DefaultTuple.java +++ b/src/main/java/org/springframework/data/redis/connection/DefaultTuple.java @@ -89,4 +89,14 @@ public int compareTo(Double o) { Double a = (o == null ? Double.valueOf(0.0d) : o); return d.compareTo(a); } + + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append(getClass().getSimpleName()); + sb.append(" [score=").append(score); + sb.append(", value=").append(value == null ? "null" : new String(value)); + sb.append(']'); + return sb.toString(); + } } diff --git a/src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java index fad7b59af8..cf51c4141f 100644 --- a/src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java @@ -942,6 +942,48 @@ default Long zLexCount(byte[] key, Range range) { return zSetCommands().zLexCount(key, range); } + /** @deprecated in favor of {@link RedisConnection#zSetCommands()}}. */ + @Override + @Deprecated + default Tuple zPopMin(byte[] key) { + return zSetCommands().zPopMin(key); + } + + /** @deprecated in favor of {@link RedisConnection#zSetCommands()}}. */ + @Override + @Deprecated + default Set zPopMin(byte[] key, long count) { + return zSetCommands().zPopMin(key, count); + } + + /** @deprecated in favor of {@link RedisConnection#zSetCommands()}}. */ + @Override + @Deprecated + default Tuple bZPopMin(byte[] key, long timeout, TimeUnit unit) { + return zSetCommands().bZPopMin(key, timeout, unit); + } + + /** @deprecated in favor of {@link RedisConnection#zSetCommands()}}. */ + @Override + @Deprecated + default Tuple zPopMax(byte[] key) { + return zSetCommands().zPopMax(key); + } + + /** @deprecated in favor of {@link RedisConnection#zSetCommands()}}. */ + @Override + @Deprecated + default Set zPopMax(byte[] key, long count) { + return zSetCommands().zPopMax(key, count); + } + + /** @deprecated in favor of {@link RedisConnection#zSetCommands()}}. */ + @Override + @Deprecated + default Tuple bZPopMax(byte[] key, long timeout, TimeUnit unit) { + return zSetCommands().bZPopMax(key, timeout, unit); + } + /** @deprecated in favor of {@link RedisConnection#zSetCommands()}}. */ @Override @Deprecated @@ -1103,6 +1145,13 @@ default Double zScore(byte[] key, byte[] value) { return zSetCommands().zScore(key, value); } + /** @deprecated in favor of {@link RedisConnection#zSetCommands()}}. */ + @Override + @Deprecated + default List zMScore(byte[] key, byte[]... values) { + return zSetCommands().zMScore(key, values); + } + /** @deprecated in favor of {@link RedisConnection#zSetCommands()}}. */ @Override @Deprecated diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveZSetCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveZSetCommands.java index ed307120c2..bb3f01cedd 100644 --- a/src/main/java/org/springframework/data/redis/connection/ReactiveZSetCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/ReactiveZSetCommands.java @@ -19,11 +19,14 @@ import reactor.core.publisher.Mono; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.reactivestreams.Publisher; import org.springframework.data.domain.Range; @@ -31,6 +34,7 @@ import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse; import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand; import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyScanCommand; +import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiValueResponse; import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse; import org.springframework.data.redis.connection.RedisZSetCommands.Aggregate; import org.springframework.data.redis.connection.RedisZSetCommands.Limit; @@ -159,8 +163,7 @@ public ZAddCommand incr() { } /** - * Applies {@literal GT} mode. Constructs a new command - * instance with all previously configured properties. + * Applies {@literal GT} mode. Constructs a new command instance with all previously configured properties. * * @return a new {@link ZAddCommand} with {@literal incr} applied. * @since 2.5 @@ -170,8 +173,7 @@ public ZAddCommand gt() { } /** - * Applies {@literal LT} mode. Constructs a new command - * instance with all previously configured properties. + * Applies {@literal LT} mode. Constructs a new command instance with all previously configured properties. * * @return a new {@link ZAddCommand} with {@literal incr} applied. * @since 2.5 @@ -202,7 +204,6 @@ public boolean isIncr() { } /** - * * @return {@literal true} if {@literal GT} is set. * @since 2.5 */ @@ -292,7 +293,7 @@ private ZRemCommand(@Nullable ByteBuffer key, List values) { * Creates a new {@link ZRemCommand} given a {@link Tuple}. * * @param value must not be {@literal null}. - * @return a new {@link ZAddCommand} for {@link Tuple}. + * @return a new {@link ZRemCommand} for {@link Tuple}. */ public static ZRemCommand values(ByteBuffer value) { @@ -305,7 +306,7 @@ public static ZRemCommand values(ByteBuffer value) { * Creates a new {@link ZRemCommand} given a {@link Collection} of {@link Tuple}. * * @param values must not be {@literal null}. - * @return a new {@link ZAddCommand} for {@link Tuple}. + * @return a new {@link ZRemCommand} for {@link Tuple}. */ public static ZRemCommand values(Collection values) { @@ -1240,6 +1241,316 @@ default Mono zLexCount(ByteBuffer key, Range range) { */ Flux> zLexCount(Publisher commands); + /** + * @author Mark Paluch + */ + enum PopDirection { + MIN, MAX + } + + /** + * {@code ZPOPMIN}/{@literal ZPOPMAX} command parameters. + * + * @author Mark Paluch + * @see Redis Documentation: ZPOPMIN + * @see Redis Documentation: ZPOPMAX + * @since 2.6 + */ + class ZPopCommand extends KeyCommand { + + private final PopDirection direction; + + private final long count; + + private ZPopCommand(PopDirection direction, @Nullable ByteBuffer key, long count) { + + super(key); + this.count = count; + this.direction = direction; + } + + /** + * Creates a new {@link ZPopCommand} for min pop ({@literal ZPOPMIN}). + * + * @return a new {@link ZPopCommand} for min pop ({@literal ZPOPMIN}). + */ + public static ZPopCommand min() { + return new ZPopCommand(PopDirection.MIN, null, 1); + } + + /** + * Creates a new {@link ZPopCommand} for max pop ({@literal ZPOPMAX}). + * + * @return a new {@link ZPopCommand} for max pop ({@literal ZPOPMAX}). + */ + public static ZPopCommand max() { + return new ZPopCommand(PopDirection.MAX, null, 1); + } + + /** + * Applies the {@literal key}. Constructs a new command instance with all previously configured properties. + * + * @param key must not be {@literal null}. + * @return a new {@link ZPopCommand} with {@literal value} applied. + */ + public ZPopCommand from(ByteBuffer key) { + + Assert.notNull(key, "Key must not be null!"); + + return new ZPopCommand(direction, key, count); + } + + /** + * Applies the {@literal key}. Constructs a new command instance with all previously configured properties. + * + * @param count + * @return a new {@link ZPopCommand} with {@literal value} applied. + */ + public ZPopCommand count(long count) { + return new ZPopCommand(direction, getKey(), count); + } + + /** + * @return never {@literal null}. + */ + public PopDirection getDirection() { + return direction; + } + + public long getCount() { + return count; + } + } + + /** + * {@code BZPOPMIN}/{@literal BZPOPMAX} command parameters. + * + * @author Mark Paluch + * @see Redis Documentation: BZPOPMIN + * @see Redis Documentation: BZPOPMAX + * @since 2.6 + */ + class BZPopCommand extends KeyCommand { + + private final PopDirection direction; + + private final @Nullable TimeUnit timeUnit; + private final @Nullable Long timeout; + + private final long count; + + private BZPopCommand(@Nullable ByteBuffer key, @Nullable Long timeout, @Nullable TimeUnit timeUnit, long count, + PopDirection direction) { + + super(key); + this.count = count; + this.timeout = timeout; + this.timeUnit = timeUnit; + this.direction = direction; + } + + /** + * Creates a new {@link BZPopCommand} for min pop ({@literal ZPOPMIN}). + * + * @return a new {@link BZPopCommand} for min pop ({@literal ZPOPMIN}). + */ + public static BZPopCommand min() { + return new BZPopCommand(null, null, null, 0, PopDirection.MIN); + } + + /** + * Creates a new {@link BZPopCommand} for max pop ({@literal ZPOPMAX}). + * + * @return a new {@link BZPopCommand} for max pop ({@literal ZPOPMAX}). + */ + public static BZPopCommand max() { + return new BZPopCommand(null, null, null, 0, PopDirection.MAX); + } + + /** + * Applies the {@literal key}. Constructs a new command instance with all previously configured properties. + * + * @param key must not be {@literal null}. + * @return a new {@link BZPopCommand} with {@literal value} applied. + */ + public BZPopCommand from(ByteBuffer key) { + + Assert.notNull(key, "Key must not be null!"); + + return new BZPopCommand(key, timeout, timeUnit, count, direction); + } + + /** + * Applies the {@literal key}. Constructs a new command instance with all previously configured properties. + * + * @param count + * @return a new {@link BZPopCommand} with {@literal value} applied. + */ + public BZPopCommand count(long count) { + return new BZPopCommand(getKey(), timeout, timeUnit, count, direction); + } + + /** + * Applies a {@link Duration timeout}. Constructs a new command instance with all previously configured properties. + * + * @param timeout must not be {@literal null}. + * @return a new {@link BZPopCommand} with {@link Duration timeout} applied. + */ + public BZPopCommand blockingFor(Duration timeout) { + + Assert.notNull(timeout, "Timeout must not be null!"); + + return blockingFor(timeout.toMillis(), TimeUnit.MILLISECONDS); + } + + /** + * Applies a {@link Duration timeout}. Constructs a new command instance with all previously configured properties. + * + * @param timeout value. + * @param timeout must not be {@literal null}. + * @return a new {@link BZPopCommand} with {@link Duration timeout} applied. + */ + public BZPopCommand blockingFor(long timeout, TimeUnit timeUnit) { + + Assert.notNull(timeUnit, "TimeUnit must not be null!"); + + return new BZPopCommand(getKey(), timeout, timeUnit, count, direction); + } + + /** + * @return never {@literal null}. + */ + public PopDirection getDirection() { + return direction; + } + + @Nullable + public Long getTimeout() { + return timeout; + } + + @Nullable + public TimeUnit getTimeUnit() { + return timeUnit; + } + + public long getCount() { + return count; + } + } + + /** + * Remove and return the value with its score having the lowest score from sorted set at {@code key}. + * + * @param key must not be {@literal null}. + * @return + * @see Redis Documentation: ZPOPMIN + * @since 2.6 + */ + default Mono zPopMin(ByteBuffer key) { + return zPop(Mono.just(ZPopCommand.min().from(key))).map(CommandResponse::getOutput).flatMap(Flux::next).next(); + } + + /** + * Remove and return {@code count} values with their score having the lowest score from sorted set at {@code key}. + * + * @param key must not be {@literal null}. + * @param count number of elements to pop. + * @return + * @see Redis Documentation: ZPOPMIN + * @since 2.6 + */ + default Flux zPopMin(ByteBuffer key, long count) { + return zPop(Mono.just(ZPopCommand.min().from(key).count(count))).map(CommandResponse::getOutput) + .flatMap(Function.identity()); + } + + /** + * Remove and return the value with its score having the lowest score from sorted set at {@code key}. Blocks + * connection until element available or {@code timeout} reached. + * + * @param key must not be {@literal null}. + * @param timeout must not be {@literal null}. + * @return + * @throws IllegalArgumentException if the timeout is {@literal null} or negative. + * @see Redis Documentation: BZPOPMIN + * @since 2.6 + */ + default Mono bZPopMin(ByteBuffer key, Duration timeout) { + + Assert.notNull(timeout, "Timeout must not be null"); + Assert.isTrue(!timeout.isNegative(), "Timeout must not be negative"); + + return bZPop(Mono.just(BZPopCommand.min().from(key).blockingFor(timeout))).map(CommandResponse::getOutput) + .flatMap(Flux::next).next(); + } + + /** + * Remove and return the value with its score having the highest score from sorted set at {@code key}. + * + * @param key must not be {@literal null}. + * @return + * @see Redis Documentation: ZPOPMAX + * @since 2.6 + */ + default Mono zPopMax(ByteBuffer key) { + return zPop(Mono.just(ZPopCommand.max().from(key))).map(CommandResponse::getOutput).flatMap(Flux::next).next(); + } + + /** + * Remove and return {@code count} values with their score having the highest score from sorted set at {@code key}. + * + * @param key must not be {@literal null}. + * @param count number of elements to pop. + * @return + * @see Redis Documentation: ZPOPMAX + * @since 2.6 + */ + default Flux zPopMax(ByteBuffer key, long count) { + return zPop(Mono.just(ZPopCommand.max().from(key).count(count))).map(CommandResponse::getOutput) + .flatMap(Function.identity()); + } + + /** + * Remove and return the value with its score having the highest score from sorted set at {@code key}. Blocks + * connection until element available or {@code timeout} reached. + * + * @param key must not be {@literal null}. + * @param timeout must not be {@literal null}. + * @return + * @throws IllegalArgumentException if the timeout is {@literal null} or negative. + * @see Redis Documentation: BZPOPMAX + * @since 2.6 + */ + default Mono bZPopMax(ByteBuffer key, Duration timeout) { + + Assert.notNull(timeout, "Timeout must not be null"); + Assert.isTrue(!timeout.isNegative(), "Timeout must not be negative"); + + return bZPop(Mono.just(BZPopCommand.max().from(key).blockingFor(timeout))).map(CommandResponse::getOutput) + .flatMap(Flux::next).next(); + } + + /** + * Remove and return elements from sorted set at {@link ByteBuffer keyCommand#getKey()}. + * + * @param commands must not be {@literal null}. + * @return + * @see Redis Documentation: ZPOPMIN + * @see Redis Documentation: ZPOPMAX + */ + Flux>> zPop(Publisher commands); + + /** + * Remove and return elements from sorted set at {@link ByteBuffer keyCommand#getKey()}. + * + * @param commands must not be {@literal null}. + * @return + * @see Redis Documentation: ZPOPMIN + * @see Redis Documentation: ZPOPMAX + */ + Flux>> bZPop(Publisher commands); + /** * Get the size of sorted set with {@literal key}. * @@ -1255,7 +1566,7 @@ default Mono zCard(ByteBuffer key) { } /** - * Get the size of sorted set with {@link KeyCommand#getKey()}. + * Get the size of sorted set with {@linByteBuffer keyCommand#getKey()}. * * @param commands must not be {@literal null}. * @return @@ -1283,7 +1594,7 @@ private ZScoreCommand(@Nullable ByteBuffer key, ByteBuffer value) { * Creates a new {@link ZScoreCommand} given a {@link ByteBuffer member}. * * @param member must not be {@literal null}. - * @return a new {@link ZScoreCommand} for {@link Range}. + * @return a new {@link ZScoreCommand} for {@link ByteBuffer member}. */ public static ZScoreCommand scoreOf(ByteBuffer member) { @@ -1339,6 +1650,99 @@ default Mono zScore(ByteBuffer key, ByteBuffer value) { */ Flux> zScore(Publisher commands); + /** + * {@code ZMSCORE} command parameters. + * + * @author Mark Paluch + * @see Redis Documentation: ZMSCORE + * @since 2.6 + */ + class ZMScoreCommand extends KeyCommand { + + private final Collection values; + + private ZMScoreCommand(@Nullable ByteBuffer key, Collection values) { + + super(key); + + this.values = values; + } + + /** + * Creates a new {@link ZMScoreCommand} given a {@link ByteBuffer member}. + * + * @param member must not be {@literal null}. + * @return a new {@link ZMScoreCommand} for {@link ByteBuffer}. + */ + public static ZMScoreCommand scoreOf(ByteBuffer member) { + + Assert.notNull(member, "Member must not be null!"); + + return new ZMScoreCommand(null, Collections.singletonList(member)); + } + + /** + * Creates a new {@link ZMScoreCommand} given a {@link List members}. + * + * @param members must not be {@literal null}. + * @return a new {@link ZMScoreCommand} for {@link List} of members. + */ + public static ZMScoreCommand scoreOf(Collection members) { + + Assert.notNull(members, "Members must not be null!"); + + return new ZMScoreCommand(null, members); + } + + /** + * Applies the {@literal key}. Constructs a new command instance with all previously configured properties. + * + * @param key must not be {@literal null}. + * @return a new {@link ZMScoreCommand} with {@literal key} applied. + */ + public ZMScoreCommand forKey(ByteBuffer key) { + + Assert.notNull(key, "Key must not be null!"); + + return new ZMScoreCommand(key, values); + } + + /** + * @return + */ + public Collection getValues() { + return values; + } + } + + /** + * Get the scores of elements with {@literal values} from sorted set with key {@literal key}. + * + * @param key must not be {@literal null}. + * @param values must not be {@literal null}. + * @return + * @see Redis Documentation: ZMSCORE + * @since 2.6 + */ + default Mono> zMScore(ByteBuffer key, Collection values) { + + Assert.notNull(key, "Key must not be null!"); + Assert.notNull(values, "Values must not be null!"); + + return zMScore(Mono.just(ZMScoreCommand.scoreOf(values).forKey(key))).next().map(MultiValueResponse::getOutput); + } + + /** + * Get the scores of elements with {@link ZMScoreCommand#getValues()} from sorted set with key + * {@link ZMScoreCommand#getKey()} + * + * @param commands must not be {@literal null}. + * @return + * @see Redis Documentation: ZMSCORE + * @since 2.6 + */ + Flux> zMScore(Publisher commands); + /** * {@code ZREMRANGEBYRANK} command parameters. * @@ -1787,7 +2191,7 @@ private ZInterStoreCommand(ByteBuffer key, List sourceKeys, List keys) { diff --git a/src/main/java/org/springframework/data/redis/connection/RedisZSetCommands.java b/src/main/java/org/springframework/data/redis/connection/RedisZSetCommands.java index fa8f5ef741..9e575e8b7c 100644 --- a/src/main/java/org/springframework/data/redis/connection/RedisZSetCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/RedisZSetCommands.java @@ -20,6 +20,7 @@ import java.util.EnumSet; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.DoubleUnaryOperator; import java.util.function.Function; import java.util.stream.Collectors; @@ -972,6 +973,80 @@ default Long zCount(byte[] key, double min, double max) { @Nullable Long zLexCount(byte[] key, Range range); + /** + * Remove and return the value with its score having the lowest score from sorted set at {@code key}. + * + * @param key must not be {@literal null}. + * @return {@literal null} when the sorted set is empty or used in pipeline / transaction. + * @see Redis Documentation: ZPOPMIN + * @since 2.6 + */ + @Nullable + Tuple zPopMin(byte[] key); + + /** + * Remove and return {@code count} values with their score having the lowest score from sorted set at {@code key}. + * + * @param key must not be {@literal null}. + * @param count number of elements to pop. + * @return {@literal null} when the sorted set is empty or used in pipeline / transaction. + * @see Redis Documentation: ZPOPMIN + * @since 2.6 + */ + @Nullable + Set zPopMin(byte[] key, long count); + + /** + * Remove and return the value with its score having the lowest score from sorted set at {@code key}.
+ * Blocks connection until element available or {@code timeout} reached. + * + * @param key must not be {@literal null}. + * @param timeout + * @param unit must not be {@literal null}. + * @return can be {@literal null}. + * @see Redis Documentation: BZPOPMIN + * @since 2.6 + */ + @Nullable + Tuple bZPopMin(byte[] key, long timeout, TimeUnit unit); + + /** + * Remove and return the value with its score having the highest score from sorted set at {@code key}. + * + * @param key must not be {@literal null}. + * @return {@literal null} when the sorted set is empty or used in pipeline / transaction. + * @see Redis Documentation: ZPOPMAX + * @since 2.6 + */ + @Nullable + Tuple zPopMax(byte[] key); + + /** + * Remove and return {@code count} values with their score having the highest score from sorted set at {@code key}. + * + * @param key must not be {@literal null}. + * @param count number of elements to pop. + * @return {@literal null} when the sorted set is empty or used in pipeline / transaction. + * @see Redis Documentation: ZPOPMAX + * @since 2.6 + */ + @Nullable + Set zPopMax(byte[] key, long count); + + /** + * Remove and return the value with its score having the highest score from sorted set at {@code key}.
+ * Blocks connection until element available or {@code timeout} reached. + * + * @param key must not be {@literal null}. + * @param timeout + * @param unit must not be {@literal null}. + * @return can be {@literal null}. + * @see Redis Documentation: BZPOPMAX + * @since 2.6 + */ + @Nullable + Tuple bZPopMax(byte[] key, long timeout, TimeUnit unit); + /** * Get the size of sorted set with {@code key}. * @@ -993,6 +1068,18 @@ default Long zCount(byte[] key, double min, double max) { @Nullable Double zScore(byte[] key, byte[] value); + /** + * Get the scores of elements with {@code values} from sorted set with key {@code key}. + * + * @param key must not be {@literal null}. + * @param values the values. + * @return {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: ZMSCORE + * @since 2.6 + */ + @Nullable + List zMScore(byte[] key, byte[]... values); + /** * Remove elements in range between {@code start} and {@code end} from sorted set with {@code key}. * diff --git a/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java index d828fbe692..469b2074d3 100644 --- a/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java @@ -1434,6 +1434,80 @@ default Long lPos(String key, String element) { @Nullable Long zLexCount(String key, Range range); + /** + * Remove and return the value with its score having the lowest score from sorted set at {@code key}. + * + * @param key must not be {@literal null}. + * @return {@literal null} when the sorted set is empty or used in pipeline / transaction. + * @see Redis Documentation: ZPOPMIN + * @since 2.6 + */ + @Nullable + Tuple zPopMin(String key); + + /** + * Remove and return {@code count} values with their score having the lowest score from sorted set at {@code key}. + * + * @param key must not be {@literal null}. + * @param count number of elements to pop. + * @return {@literal null} when the sorted set is empty or used in pipeline / transaction. + * @see Redis Documentation: ZPOPMIN + * @since 2.6 + */ + @Nullable + Set zPopMin(String key, long count); + + /** + * Remove and return the value with its score having the lowest score from sorted set at {@code key}. Blocks + * connection until element available or {@code timeout} reached. + * + * @param key must not be {@literal null}. + * @param timeout + * @param unit must not be {@literal null}. + * @return can be {@literal null}. + * @see Redis Documentation: BZPOPMIN + * @since 2.6 + */ + @Nullable + StringTuple bZPopMin(String key, long timeout, TimeUnit unit); + + /** + * Remove and return the value with its score having the highest score from sorted set at {@code key}. + * + * @param key must not be {@literal null}. + * @return {@literal null} when the sorted set is empty or used in pipeline / transaction. + * @see Redis Documentation: ZPOPMAX + * @since 2.6 + */ + @Nullable + StringTuple zPopMax(String key); + + /** + * Remove and return {@code count} values with their score having the highest score from sorted set at {@code key}. + * + * @param key must not be {@literal null}. + * @param count number of elements to pop. + * @return {@literal null} when the sorted set is empty or used in pipeline / transaction. + * @see Redis Documentation: ZPOPMAX + * @since 2.6 + */ + @Nullable + Set zPopMax(String key, long count); + + /** + * Remove and return the value with its score having the highest score from sorted set at {@code key}. Blocks + * connection until element available or {@code timeout} reached. + * + * @param key must not be {@literal null}. + * @param timeout + * @param unit must not be {@literal null}. + * @return can be {@literal null}. + * @see Redis Documentation: BZPOPMAX + * @since 2.6 + */ + @Nullable + StringTuple bZPopMax(String key, long timeout, TimeUnit unit); + /** * Get the size of sorted set with {@code key}. * @@ -1455,6 +1529,18 @@ default Long lPos(String key, String element) { */ Double zScore(String key, String value); + /** + * Get the scores of elements with {@code values} from sorted set with key {@code key}. + * + * @param key must not be {@literal null}. + * @param values the values. + * @return {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: ZMSCORE + * @see RedisZSetCommands#zMScore(byte[], byte[][]) + * @since 2.6 + */ + List zMScore(String key, String... values); + /** * Remove elements in range between {@code start} and {@code end} from sorted set with {@code key}. * diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterZSetCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterZSetCommands.java index 53df378987..3334a45dc8 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterZSetCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterZSetCommands.java @@ -18,11 +18,14 @@ import redis.clients.jedis.ScanParams; import redis.clients.jedis.ZParams; +import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.springframework.dao.DataAccessException; import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.redis.connection.ClusterSlotHashUtil; +import org.springframework.data.redis.connection.DefaultTuple; import org.springframework.data.redis.connection.RedisZSetCommands; import org.springframework.data.redis.connection.convert.SetConverter; import org.springframework.data.redis.core.Cursor; @@ -30,6 +33,7 @@ import org.springframework.data.redis.core.ScanIteration; import org.springframework.data.redis.core.ScanOptions; import org.springframework.data.redis.util.ByteUtils; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** @@ -280,6 +284,112 @@ public Long zLexCount(byte[] key, Range range) { } } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#zPopMin(byte[]) + */ + @Nullable + @Override + public Tuple zPopMin(byte[] key) { + + Assert.notNull(key, "Key must not be null!"); + + try { + redis.clients.jedis.Tuple tuple = connection.getCluster().zpopmin(key); + return tuple != null ? JedisConverters.toTuple(tuple) : null; + } catch (Exception ex) { + throw convertJedisAccessException(ex); + } + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#zPopMin(byte[], long) + */ + @Nullable + @Override + public Set zPopMin(byte[] key, long count) { + + Assert.notNull(key, "Key must not be null!"); + + try { + return toTupleSet(connection.getCluster().zpopmin(key, Math.toIntExact(count))); + } catch (Exception ex) { + throw convertJedisAccessException(ex); + } + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#bZPopMin(byte[], long, java.util.concurrent.TimeUnit) + */ + @Nullable + @Override + public Tuple bZPopMin(byte[] key, long timeout, TimeUnit unit) { + + Assert.notNull(key, "Key must not be null!"); + Assert.notNull(unit, "TimeUnit must not be null!"); + + try { + return toTuple(connection.getCluster().bzpopmin(JedisConverters.toSeconds(timeout, unit), key)); + } catch (Exception ex) { + throw convertJedisAccessException(ex); + } + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#zPopMax(byte[]) + */ + @Nullable + @Override + public Tuple zPopMax(byte[] key) { + + Assert.notNull(key, "Key must not be null!"); + + try { + redis.clients.jedis.Tuple tuple = connection.getCluster().zpopmax(key); + return tuple != null ? JedisConverters.toTuple(tuple) : null; + } catch (Exception ex) { + throw convertJedisAccessException(ex); + } + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#zPopMax(byte[], long) + */ + @Nullable + @Override + public Set zPopMax(byte[] key, long count) { + + Assert.notNull(key, "Key must not be null!"); + + try { + return toTupleSet(connection.getCluster().zpopmax(key, Math.toIntExact(count))); + } catch (Exception ex) { + throw convertJedisAccessException(ex); + } + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#bZPopMax(byte[], long, java.util.concurrent.TimeUnit) + */ + @Nullable + @Override + public Tuple bZPopMax(byte[] key, long timeout, TimeUnit unit) { + + Assert.notNull(key, "Key must not be null!"); + Assert.notNull(unit, "TimeUnit must not be null!"); + + try { + return toTuple(connection.getCluster().bzpopmax(JedisConverters.toSeconds(timeout, unit), key)); + } catch (Exception ex) { + throw convertJedisAccessException(ex); + } + } + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.RedisZSetCommands#zRemRangeByScore(byte[], org.springframework.data.redis.connection.RedisZSetCommands.Range) @@ -637,6 +747,23 @@ public Double zScore(byte[] key, byte[] value) { } } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#zMScore(byte[], byte[][]) + */ + @Override + public List zMScore(byte[] key, byte[][] values) { + + Assert.notNull(key, "Key must not be null!"); + Assert.notNull(values, "Values must not be null!"); + + try { + return connection.getCluster().zmscore(key, values); + } catch (Exception ex) { + throw convertJedisAccessException(ex); + } + } + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.RedisZSetCommands#zRemRange(byte[], long, long) @@ -846,4 +973,21 @@ private static Set toTupleSet(Set source) { return TUPLE_SET_CONVERTER.convert(source); } + /** + * Workaround for broken Jedis BZPOP signature. + * + * @param bytes + * @return + */ + @Nullable + @SuppressWarnings("unchecked") + private static Tuple toTuple(List bytes) { + + if (bytes.isEmpty()) { + return null; + } + + return new DefaultTuple((byte[]) bytes.get(1), Double.parseDouble(new String((byte[]) bytes.get(2)))); + } + } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConverters.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConverters.java index 54a129eb5c..a90cac7f2f 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConverters.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConverters.java @@ -758,6 +758,26 @@ public static GeoRadiusParam toGeoRadiusParam(GeoRadiusCommandArgs source) { return param; } + /** + * Convert a timeout to seconds using {@code double} representation including fraction of seconds. + * + * @param timeout + * @param unit + * @return + * @since 2.6 + */ + static double toSeconds(long timeout, TimeUnit unit) { + + switch (unit) { + case MILLISECONDS: + case MICROSECONDS: + case NANOSECONDS: + return unit.toMillis(timeout) / 1000d; + default: + return unit.toSeconds(timeout); + } + } + /** * Convert given {@link BitFieldSubCommands} into argument array. * diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisZSetCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisZSetCommands.java index 2f6b4f67e7..338b5c173b 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisZSetCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisZSetCommands.java @@ -21,18 +21,20 @@ import redis.clients.jedis.ScanParams; import redis.clients.jedis.ScanResult; import redis.clients.jedis.ZParams; -import redis.clients.jedis.params.ZAddParams; import java.nio.charset.StandardCharsets; import java.util.LinkedHashSet; +import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.springframework.data.redis.connection.DefaultTuple; import org.springframework.data.redis.connection.RedisZSetCommands; -import org.springframework.data.redis.connection.RedisZSetCommands.ZAddArgs.Flag; import org.springframework.data.redis.core.Cursor; import org.springframework.data.redis.core.KeyBoundCursor; import org.springframework.data.redis.core.ScanIteration; import org.springframework.data.redis.core.ScanOptions; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** @@ -298,6 +300,96 @@ public Long zLexCount(byte[] key, Range range) { return connection.invoke().just(BinaryJedis::zlexcount, MultiKeyPipelineBase::zlexcount, key, min, max); } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#zPopMin(byte[]) + */ + @Nullable + @Override + public Tuple zPopMin(byte[] key) { + + Assert.notNull(key, "Key must not be null!"); + + return connection.invoke().from(BinaryJedis::zpopmin, MultiKeyPipelineBase::zpopmin, key) + .get(JedisConverters::toTuple); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#zPopMin(byte[], long) + */ + @Nullable + @Override + public Set zPopMin(byte[] key, long count) { + + Assert.notNull(key, "Key must not be null!"); + + return connection.invoke() + .fromMany(BinaryJedis::zpopmin, MultiKeyPipelineBase::zpopmin, key, Math.toIntExact(count)) + .toSet(JedisConverters::toTuple); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#bZPopMin(byte[], long, java.util.concurrent.TimeUnit) + */ + @Nullable + @Override + public Tuple bZPopMin(byte[] key, long timeout, TimeUnit unit) { + + Assert.notNull(key, "Key must not be null!"); + Assert.notNull(unit, "TimeUnit must not be null!"); + + return connection.invoke() + .from(BinaryJedis::bzpopmin, MultiKeyPipelineBase::bzpopmin, JedisConverters.toSeconds(timeout, unit), key) + .get(JedisZSetCommands::toTuple); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#zPopMax(byte[]) + */ + @Nullable + @Override + public Tuple zPopMax(byte[] key) { + + Assert.notNull(key, "Key must not be null!"); + + return connection.invoke().from(BinaryJedis::zpopmax, MultiKeyPipelineBase::zpopmax, key) + .get(JedisConverters::toTuple); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#zPopMax(byte[], long) + */ + @Nullable + @Override + public Set zPopMax(byte[] key, long count) { + + Assert.notNull(key, "Key must not be null!"); + + return connection.invoke() + .fromMany(BinaryJedis::zpopmax, MultiKeyPipelineBase::zpopmax, key, Math.toIntExact(count)) + .toSet(JedisConverters::toTuple); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#bZPopMax(byte[], long, java.util.concurrent.TimeUnit) + */ + @Nullable + @Override + public Tuple bZPopMax(byte[] key, long timeout, TimeUnit unit) { + + Assert.notNull(key, "Key must not be null!"); + Assert.notNull(unit, "TimeUnit must not be null!"); + + return connection.invoke() + .from(BinaryJedis::bzpopmax, MultiKeyPipelineBase::bzpopmax, JedisConverters.toSeconds(timeout, unit), key) + .get(JedisZSetCommands::toTuple); + } + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.RedisZSetCommands#zCard(byte[]) @@ -323,6 +415,19 @@ public Double zScore(byte[] key, byte[] value) { return connection.invoke().just(BinaryJedis::zscore, MultiKeyPipelineBase::zscore, key, value); } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#zScore(byte[], byte[][]) + */ + @Override + public List zMScore(byte[] key, byte[][] values) { + + Assert.notNull(key, "Key must not be null!"); + Assert.notNull(values, "Value must not be null!"); + + return connection.invoke().just(BinaryJedis::zmscore, MultiKeyPipelineBase::zmscore, key, values); + } + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.RedisZSetCommands#zRemRange(byte[], long, long) @@ -590,4 +695,21 @@ private boolean isQueueing() { return connection.isQueueing(); } + /** + * Workaround for broken Jedis BZPOP signature. + * + * @param bytes + * @return + */ + @Nullable + @SuppressWarnings("unchecked") + private static Tuple toTuple(List bytes) { + + if (bytes.isEmpty()) { + return null; + } + + return new DefaultTuple((byte[]) bytes.get(1), Double.parseDouble(new String((byte[]) bytes.get(2)))); + } + } diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java index edac71ad27..238a8fab70 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java @@ -936,7 +936,7 @@ void transaction(FutureResult result) { RedisClusterAsyncCommands getAsyncConnection() { - if (isQueueing()) { + if (isQueueing() || isPipelined()) { return getAsyncDedicatedConnection(); } @@ -1209,6 +1209,9 @@ static class TypeHints { COMMAND_OUTPUT_TYPE_MAPPING.put(ZINCRBY, DoubleOutput.class); COMMAND_OUTPUT_TYPE_MAPPING.put(ZSCORE, DoubleOutput.class); + // DOUBLE LIST + COMMAND_OUTPUT_TYPE_MAPPING.put(ZMSCORE, DoubleListOutput.class); + // MAP COMMAND_OUTPUT_TYPE_MAPPING.put(HGETALL, MapOutput.class); @@ -1281,6 +1284,7 @@ static class TypeHints { COMMAND_OUTPUT_TYPE_MAPPING.put(HSET, BooleanOutput.class); COMMAND_OUTPUT_TYPE_MAPPING.put(HSETNX, BooleanOutput.class); COMMAND_OUTPUT_TYPE_MAPPING.put(MOVE, BooleanOutput.class); + COMMAND_OUTPUT_TYPE_MAPPING.put(COPY, BooleanOutput.class); COMMAND_OUTPUT_TYPE_MAPPING.put(MSETNX, BooleanOutput.class); COMMAND_OUTPUT_TYPE_MAPPING.put(PERSIST, BooleanOutput.class); COMMAND_OUTPUT_TYPE_MAPPING.put(PEXPIRE, BooleanOutput.class); diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConverters.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConverters.java index aa86703ee3..f986127d21 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConverters.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConverters.java @@ -247,7 +247,8 @@ public static Set toTupleSet(@Nullable List> source) } public static Tuple toTuple(@Nullable ScoredValue source) { - return source != null ? new DefaultTuple(source.getValue(), Double.valueOf(source.getScore())) : null; + return source != null && source.hasValue() ? new DefaultTuple(source.getValue(), Double.valueOf(source.getScore())) + : null; } public static String toString(@Nullable byte[] source) { diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveZSetCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveZSetCommands.java index a46f061d07..e9f07fee7e 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveZSetCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveZSetCommands.java @@ -18,6 +18,7 @@ import io.lettuce.core.Range; import io.lettuce.core.ScanStream; import io.lettuce.core.ScoredValue; +import io.lettuce.core.Value; import io.lettuce.core.ZAddArgs; import io.lettuce.core.ZStoreArgs; import reactor.core.publisher.Flux; @@ -25,13 +26,16 @@ import java.nio.ByteBuffer; import java.util.List; +import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; + import org.springframework.data.domain.Sort.Direction; import org.springframework.data.redis.connection.DefaultTuple; import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse; import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand; import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyScanCommand; +import org.springframework.data.redis.connection.ReactiveRedisConnection.MultiValueResponse; import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse; import org.springframework.data.redis.connection.ReactiveZSetCommands; import org.springframework.data.redis.connection.RedisZSetCommands.Aggregate; @@ -197,21 +201,21 @@ public Flux>> zRange(Publisher new DefaultTuple(getBytes(sc), sc.getScore())); + .map(this::toTuple); } else { result = cmd.zrange(command.getKey(), start, stop) - .map(value -> new DefaultTuple(ByteUtils.getBytes(value), Double.NaN)); + .map(value -> toTuple(value, Double.NaN)); } } else { if (command.isWithScores()) { result = cmd.zrevrangeWithScores(command.getKey(), start, stop) - .map(sc -> new DefaultTuple(getBytes(sc), sc.getScore())); + .map(this::toTuple); } else { result = cmd.zrevrange(command.getKey(), start, stop) - .map(value -> new DefaultTuple(ByteUtils.getBytes(value), Double.NaN)); + .map(value -> toTuple(value, Double.NaN)); } } @@ -244,21 +248,21 @@ public Flux>> zRangeByScore( if (!isLimited) { result = cmd.zrangebyscoreWithScores(command.getKey(), range) - .map(sc -> new DefaultTuple(ByteUtils.getBytes(sc.getValue()), sc.getScore())); + .map(this::toTuple); } else { result = cmd .zrangebyscoreWithScores(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get())) - .map(sc -> new DefaultTuple(ByteUtils.getBytes(sc.getValue()), sc.getScore())); + .map(this::toTuple); } } else { if (!isLimited) { result = cmd.zrangebyscore(command.getKey(), range) - .map(value -> new DefaultTuple(ByteUtils.getBytes(value), Double.NaN)); + .map(value -> toTuple(value, Double.NaN)); } else { result = cmd.zrangebyscore(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get())) - .map(value -> new DefaultTuple(ByteUtils.getBytes(value), Double.NaN)); + .map(value -> toTuple(value, Double.NaN)); } } } else { @@ -269,23 +273,23 @@ public Flux>> zRangeByScore( if (!isLimited) { result = cmd.zrevrangebyscoreWithScores(command.getKey(), range) - .map(sc -> new DefaultTuple(ByteUtils.getBytes(sc.getValue()), sc.getScore())); + .map(this::toTuple); } else { result = cmd .zrevrangebyscoreWithScores(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get())) - .map(sc -> new DefaultTuple(ByteUtils.getBytes(sc.getValue()), sc.getScore())); + .map(this::toTuple); } } else { if (!isLimited) { result = cmd.zrevrangebyscore(command.getKey(), range) - .map(value -> new DefaultTuple(ByteUtils.getBytes(value), Double.NaN)); + .map(value -> toTuple(value, Double.NaN)); } else { result = cmd.zrevrangebyscore(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get())) - .map(value -> new DefaultTuple(ByteUtils.getBytes(value), Double.NaN)); + .map(value -> toTuple(value, Double.NaN)); } } } @@ -307,7 +311,7 @@ public Flux>> zScan(Publisher result = ScanStream.zscan(cmd, command.getKey(), LettuceConverters.toScanArgs(command.getOptions())) - .map(it -> new DefaultTuple(ByteUtils.getBytes(it.getValue()), it.getScore())); + .map(this::toTuple); return Mono.just(new CommandResponse<>(command, result)); })); @@ -350,6 +354,63 @@ public Flux> zLexCount(Publisher>> zPop(Publisher commands) { + + return connection.execute(cmd -> Flux.from(commands).map(command -> { + + Assert.notNull(command.getKey(), "Key must not be null!"); + + Flux> result; + if (command.getCount() > 1) { + result = command.getDirection() == PopDirection.MIN ? cmd.zpopmin(command.getKey(), command.getCount()) + : cmd.zpopmax(command.getKey(), command.getCount()); + } else { + result = (command.getDirection() == PopDirection.MIN ? cmd.zpopmin(command.getKey()) + : cmd.zpopmax(command.getKey())).flux(); + } + + return new CommandResponse<>(command, result.filter(Value::hasValue).map(this::toTuple)); + })); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveZSetCommands#bZPop(org.reactivestreams.Publisher) + */ + @Override + public Flux>> bZPop(Publisher commands) { + + return connection.execute(cmd -> Flux.from(commands).map(command -> { + + Assert.notNull(command.getKey(), "Key must not be null!"); + Assert.notNull(command.getTimeout(), "Timeout must not be null!"); + + if(command.getTimeUnit() == TimeUnit.MILLISECONDS) { + + double timeout = preciseTimeout(command.getTimeout(), command.getTimeUnit()); + + Mono> result = (command.getDirection() == PopDirection.MIN + ? cmd.bzpopmin(timeout, command.getKey()) + : cmd.bzpopmax(timeout, command.getKey())).filter(Value::hasValue).map(Value::getValue); + + return new CommandResponse<>(command, result.filter(Value::hasValue).map(this::toTuple).flux()); + } + + long timeout = command.getTimeUnit().toSeconds(command.getTimeout()); + + Mono> result = (command.getDirection() == PopDirection.MIN + ? cmd.bzpopmin(timeout, command.getKey()) + : cmd.bzpopmax(timeout, command.getKey())).filter(Value::hasValue).map(Value::getValue); + + return new CommandResponse<>(command, result.filter(Value::hasValue).map(this::toTuple).flux()); + })); + } + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.ReactiveZSetCommands#zCard(org.reactivestreams.Publisher) @@ -381,6 +442,23 @@ public Flux> zScore(Publisher> zMScore(Publisher commands) { + + return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { + + Assert.notNull(command.getKey(), "Key must not be null!"); + Assert.notNull(command.getValues(), "Values must not be null!"); + + return cmd.zmscore(command.getKey(), command.getValues().toArray(new ByteBuffer[0])) + .map(value -> new MultiValueResponse<>(command, value)); + })); + } + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.ReactiveZSetCommands#zRemRangeByRank(org.reactivestreams.Publisher) @@ -549,8 +627,16 @@ private static ZStoreArgs zStoreArgs(@Nullable Aggregate aggregate, @Nullable Li return args; } - private static byte[] getBytes(ScoredValue scoredValue) { - return scoredValue.optional().map(ByteUtils::getBytes).orElse(new byte[0]); + private Tuple toTuple(ScoredValue scoredValue) { + return scoredValue.map(it -> new DefaultTuple(ByteUtils.getBytes(it), scoredValue.getScore())).getValue(); + } + + private Tuple toTuple(ByteBuffer value, double score) { + return new DefaultTuple(ByteUtils.getBytes(value), score); + } + + static double preciseTimeout(long val, TimeUnit unit) { + return (double) unit.toMillis(val) / 1000.0D; } protected LettuceReactiveRedisConnection getConnection() { diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceZSetCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceZSetCommands.java index 7376860d81..e245ccb203 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceZSetCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceZSetCommands.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.springframework.data.redis.connection.RedisZSetCommands; import org.springframework.data.redis.connection.RedisZSetCommands.ZAddArgs.Flag; @@ -277,6 +278,106 @@ public Long zLexCount(byte[] key, Range range) { LettuceConverters. toRange(range, true)); } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#zPopMin(byte[]) + */ + @Nullable + @Override + public Tuple zPopMin(byte[] key) { + + Assert.notNull(key, "Key must not be null!"); + + return connection.invoke().from(RedisSortedSetAsyncCommands::zpopmin, key).get(LettuceConverters::toTuple); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#zPopMin(byte[], long) + */ + @Nullable + @Override + public Set zPopMin(byte[] key, long count) { + + Assert.notNull(key, "Key must not be null!"); + + return connection.invoke().fromMany(RedisSortedSetAsyncCommands::zpopmin, key, count) + .toSet(LettuceConverters::toTuple); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#bZPopMin(byte[], long, java.util.concurrent.TimeUnit) + */ + @Nullable + @Override + public Tuple bZPopMin(byte[] key, long timeout, TimeUnit unit) { + + Assert.notNull(key, "Key must not be null!"); + Assert.notNull(unit, "TimeUnit must not be null!"); + + if(TimeUnit.MILLISECONDS == unit) { + + return connection.invoke(connection.getAsyncDedicatedConnection()) + .from(RedisSortedSetAsyncCommands::bzpopmin, preciseTimeout(timeout, unit), key) + .get(it -> it.map(LettuceConverters::toTuple).getValueOrElse(null)); + } + + return connection.invoke(connection.getAsyncDedicatedConnection()) + .from(RedisSortedSetAsyncCommands::bzpopmin, unit.toSeconds(timeout), key) + .get(it -> it.map(LettuceConverters::toTuple).getValueOrElse(null)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#zPopMax(byte[]) + */ + @Nullable + @Override + public Tuple zPopMax(byte[] key) { + + Assert.notNull(key, "Key must not be null!"); + + return connection.invoke().from(RedisSortedSetAsyncCommands::zpopmax, key).get(LettuceConverters::toTuple); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#zPopMax(byte[], long) + */ + @Nullable + @Override + public Set zPopMax(byte[] key, long count) { + + Assert.notNull(key, "Key must not be null!"); + + return connection.invoke().fromMany(RedisSortedSetAsyncCommands::zpopmax, key, count) + .toSet(LettuceConverters::toTuple); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#bZPopMax(byte[], long, java.util.concurrent.TimeUnit) + */ + @Nullable + @Override + public Tuple bZPopMax(byte[] key, long timeout, TimeUnit unit) { + + Assert.notNull(key, "Key must not be null!"); + Assert.notNull(unit, "TimeUnit must not be null!"); + + if(TimeUnit.MILLISECONDS == unit) { + + return connection.invoke(connection.getAsyncDedicatedConnection()) + .from(RedisSortedSetAsyncCommands::bzpopmax, preciseTimeout(timeout, unit), key) + .get(it -> it.map(LettuceConverters::toTuple).getValueOrElse(null)); + } + + return connection.invoke(connection.getAsyncDedicatedConnection()) + .from(RedisSortedSetAsyncCommands::bzpopmax, unit.toSeconds(timeout), key) + .get(it -> it.map(LettuceConverters::toTuple).getValueOrElse(null)); + } + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.RedisZSetCommands#zCard(byte[]) @@ -302,6 +403,19 @@ public Double zScore(byte[] key, byte[] value) { return connection.invoke().just(RedisSortedSetAsyncCommands::zscore, key, value); } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisZSetCommands#zMScore(byte[], byte[][]) + */ + @Override + public List zMScore(byte[] key, byte[][] values) { + + Assert.notNull(key, "Key must not be null!"); + Assert.notNull(values, "Value must not be null!"); + + return connection.invoke().just(RedisSortedSetAsyncCommands::zmscore, key, values); + } + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.RedisZSetCommands#zRemRange(byte[], long, long) @@ -603,4 +717,7 @@ private static io.lettuce.core.ZAddArgs toZAddArgs(ZAddArgs source) { return target; } + static double preciseTimeout(long val, TimeUnit unit) { + return (double) unit.toMillis(val) / 1000.0D; + } } diff --git a/src/main/java/org/springframework/data/redis/core/AbstractOperations.java b/src/main/java/org/springframework/data/redis/core/AbstractOperations.java index 53048d9b4a..4cd8a942dc 100644 --- a/src/main/java/org/springframework/data/redis/core/AbstractOperations.java +++ b/src/main/java/org/springframework/data/redis/core/AbstractOperations.java @@ -223,7 +223,8 @@ Set deserializeValues(Set rawValues) { return SerializationUtils.deserialize(rawValues, valueSerializer()); } - Set> deserializeTupleValues(Collection rawValues) { + @Nullable + Set> deserializeTupleValues(@Nullable Collection rawValues) { if (rawValues == null) { return null; } @@ -235,7 +236,11 @@ Set> deserializeTupleValues(Collection rawValues) { } @SuppressWarnings({ "unchecked", "rawtypes" }) - TypedTuple deserializeTuple(Tuple tuple) { + @Nullable + TypedTuple deserializeTuple(@Nullable Tuple tuple) { + if (tuple == null) { + return null; + } Object value = tuple.getValue(); if (valueSerializer() != null) { value = valueSerializer().deserialize(tuple.getValue()); diff --git a/src/main/java/org/springframework/data/redis/core/BoundZSetOperations.java b/src/main/java/org/springframework/data/redis/core/BoundZSetOperations.java index 8ff3df2eee..82c694af56 100644 --- a/src/main/java/org/springframework/data/redis/core/BoundZSetOperations.java +++ b/src/main/java/org/springframework/data/redis/core/BoundZSetOperations.java @@ -15,8 +15,11 @@ */ package org.springframework.data.redis.core; +import java.time.Duration; import java.util.Collection; +import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.springframework.data.redis.connection.RedisZSetCommands.Aggregate; import org.springframework.data.redis.connection.RedisZSetCommands.Limit; @@ -25,6 +28,7 @@ import org.springframework.data.redis.connection.RedisZSetCommands.Weights; import org.springframework.data.redis.core.ZSetOperations.TypedTuple; import org.springframework.lang.Nullable; +import org.springframework.util.Assert; /** * ZSet (or SortedSet) operations bound to a certain key. @@ -234,6 +238,112 @@ public interface BoundZSetOperations extends BoundKeyOperations { @Nullable Long lexCount(Range range); + /** + * Remove and return the value with its score having the lowest score from sorted set at the bound key. + * + * @return {@literal null} when the sorted set is empty or used in pipeline / transaction. + * @see Redis Documentation: ZPOPMIN + * @since 2.6 + */ + @Nullable + TypedTuple popMin(); + + /** + * Remove and return {@code count} values with their score having the lowest score from sorted set at the bound key. + * + * @param count number of elements to pop. + * @return {@literal null} when the sorted set is empty or used in pipeline / transaction. + * @see Redis Documentation: ZPOPMIN + * @since 2.6 + */ + @Nullable + Set> popMin(long count); + + /** + * Remove and return the value with its score having the lowest score from sorted set at the bound key. Blocks + * connection until element available or {@code timeout} reached. + * + * @param timeout + * @param unit must not be {@literal null}. + * @return can be {@literal null}. + * @see Redis Documentation: BZPOPMIN + * @since 2.6 + */ + @Nullable + TypedTuple popMin(long timeout, TimeUnit unit); + + /** + * Remove and return the value with its score having the lowest score from sorted set at the bound key. Blocks + * connection until element available or {@code timeout} reached. + * + * @param timeout must not be {@literal null}. + * @return can be {@literal null}. + * @throws IllegalArgumentException if the timeout is {@literal null} or negative. + * @see Redis Documentation: BZPOPMIN + * @since 2.6 + */ + @Nullable + default TypedTuple popMin(Duration timeout) { + + Assert.notNull(timeout, "Timeout must not be null"); + Assert.isTrue(!timeout.isNegative(), "Timeout must not be negative"); + + return popMin(TimeoutUtils.toSeconds(timeout), TimeUnit.SECONDS); + } + + /** + * Remove and return the value with its score having the highest score from sorted set at the bound key. + * + * @return {@literal null} when the sorted set is empty or used in pipeline / transaction. + * @see Redis Documentation: ZPOPMAX + * @since 2.6 + */ + @Nullable + TypedTuple popMax(); + + /** + * Remove and return {@code count} values with their score having the highest score from sorted set at the bound key. + * + * @param count number of elements to pop. + * @return {@literal null} when the sorted set is empty or used in pipeline / transaction. + * @see Redis Documentation: ZPOPMAX + * @since 2.6 + */ + @Nullable + Set> popMax(long count); + + /** + * Remove and return the value with its score having the highest score from sorted set at the bound key. Blocks + * connection until element available or {@code timeout} reached. + * + * @param timeout + * @param unit must not be {@literal null}. + * @return can be {@literal null}. + * @see Redis Documentation: BZPOPMAX + * @since 2.6 + */ + @Nullable + TypedTuple popMax(long timeout, TimeUnit unit); + + /** + * Remove and return the value with its score having the highest score from sorted set at the bound key. Blocks + * connection until element available or {@code timeout} reached. + * + * @param timeout must not be {@literal null}. + * @return can be {@literal null}. + * @throws IllegalArgumentException if the timeout is {@literal null} or negative. + * @see Redis Documentation: BZPOPMAX + * @since 2.6 + */ + @Nullable + default TypedTuple popMax(Duration timeout) { + + Assert.notNull(timeout, "Timeout must not be null"); + Assert.isTrue(!timeout.isNegative(), "Timeout must not be negative"); + + return popMax(TimeoutUtils.toSeconds(timeout), TimeUnit.SECONDS); + } + /** * Returns the number of elements of the sorted set stored with given the bound key. * @@ -264,6 +374,17 @@ public interface BoundZSetOperations extends BoundKeyOperations { @Nullable Double score(Object o); + /** + * Get the scores of elements with {@code values} from sorted set with key the bound key. + * + * @param o the values. + * @return {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: ZMSCORE + * @since 2.6 + */ + @Nullable + List score(Object... o); + /** * Remove elements in range between {@code start} and {@code end} from sorted set with the bound key. * diff --git a/src/main/java/org/springframework/data/redis/core/DefaultBoundZSetOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultBoundZSetOperations.java index f9a4a42bee..848c4318f4 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultBoundZSetOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultBoundZSetOperations.java @@ -17,7 +17,9 @@ package org.springframework.data.redis.core; import java.util.Collection; +import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.springframework.data.redis.connection.DataType; import org.springframework.data.redis.connection.RedisZSetCommands.Aggregate; @@ -25,6 +27,7 @@ import org.springframework.data.redis.connection.RedisZSetCommands.Range; import org.springframework.data.redis.connection.RedisZSetCommands.Weights; import org.springframework.data.redis.core.ZSetOperations.TypedTuple; +import org.springframework.lang.Nullable; /** * Default implementation for {@link BoundZSetOperations}. @@ -249,6 +252,15 @@ public Double score(Object o) { return ops.score(getKey(), o); } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.BoundZSetOperations#score(java.lang.Object[]) + */ + @Override + public List score(Object... o) { + return ops.score(getKey(), o); + } + /* * (non-Javadoc) * @see org.springframework.data.redis.core.BoundZSetOperations#remove(java.lang.Object[]) @@ -312,6 +324,66 @@ public Long lexCount(Range range) { return ops.lexCount(getKey(), range); } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.BoundZSetOperations#popMin() + */ + @Nullable + @Override + public TypedTuple popMin() { + return ops.popMin(getKey()); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.BoundZSetOperations#popMin(long) + */ + @Nullable + @Override + public Set> popMin(long count) { + return ops.popMin(getKey(), count); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.BoundZSetOperations#popMin(long, java.util.concurrent.TimeUnit) + */ + @Nullable + @Override + public TypedTuple popMin(long timeout, TimeUnit unit) { + return ops.popMin(getKey(), timeout, unit); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.BoundZSetOperations#popMax() + */ + @Nullable + @Override + public TypedTuple popMax() { + return ops.popMax(getKey()); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.BoundZSetOperations#popMax(long) + */ + @Nullable + @Override + public Set> popMax(long count) { + return ops.popMax(getKey(), count); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.BoundZSetOperations#popMax(long, java.util.concurrent.TimeUnit) + */ + @Nullable + @Override + public TypedTuple popMax(long timeout, TimeUnit unit) { + return ops.popMax(getKey(), timeout, unit); + } + /* * (non-Javadoc) * @see org.springframework.data.redis.core.BoundZSetOperations#size() diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveZSetOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveZSetOperations.java index 0995bc96d0..234320b1ed 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveZSetOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveZSetOperations.java @@ -19,6 +19,7 @@ import reactor.core.publisher.Mono; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -26,6 +27,7 @@ import java.util.function.Function; import org.reactivestreams.Publisher; + import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.DefaultTuple; import org.springframework.data.redis.connection.ReactiveZSetCommands; @@ -345,6 +347,80 @@ public Mono lexCount(K key, Range range) { return createMono(connection -> connection.zLexCount(rawKey(key), range)); } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.ReactiveZSetOperations#popMin(java.lang.Object) + */ + @Override + public Mono> popMin(K key) { + + Assert.notNull(key, "Key must not be null!"); + + return createMono(connection -> connection.zPopMin(rawKey(key)).map(this::readTypedTuple)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.ReactiveZSetOperations#popMin(java.lang.Object, long) + */ + @Override + public Flux> popMin(K key, long count) { + + Assert.notNull(key, "Key must not be null!"); + + return createFlux(connection -> connection.zPopMin(rawKey(key), count).map(this::readTypedTuple)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.ReactiveZSetOperations#popMin(java.lang.Object, java.time.Duration) + */ + @Override + public Mono> popMin(K key, Duration timeout) { + + Assert.notNull(key, "Key must not be null!"); + Assert.notNull(timeout, "Timeout must not be null!"); + + return createMono(connection -> connection.bZPopMin(rawKey(key), timeout).map(this::readTypedTuple)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.ReactiveZSetOperations#popMax(java.lang.Object) + */ + @Override + public Mono> popMax(K key) { + + Assert.notNull(key, "Key must not be null!"); + + return createMono(connection -> connection.zPopMax(rawKey(key)).map(this::readTypedTuple)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.ReactiveZSetOperations#popMax(java.lang.Object, long) + */ + @Override + public Flux> popMax(K key, long count) { + + Assert.notNull(key, "Key must not be null!"); + + return createFlux(connection -> connection.zPopMax(rawKey(key), count).map(this::readTypedTuple)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.ReactiveZSetOperations#popMax(java.lang.Object, java.time.Duration) + */ + @Override + public Mono> popMax(K key, Duration timeout) { + + Assert.notNull(key, "Key must not be null!"); + Assert.notNull(timeout, "Timeout must not be null!"); + + return createMono(connection -> connection.bZPopMax(rawKey(key), timeout).map(this::readTypedTuple)); + } + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ReactiveZSetOperations#size(java.lang.Object) @@ -370,6 +446,22 @@ public Mono score(K key, Object o) { return createMono(connection -> connection.zScore(rawKey(key), rawValue((V) o))); } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.ReactiveZSetOperations#score(java.lang.Object, java.lang.Object) + */ + @Override + @SuppressWarnings("unchecked") + public Mono> score(K key, Object... o) { + + Assert.notNull(key, "Key must not be null!"); + + return createMono(connection -> Flux.fromArray((V[]) o) // + .map(this::rawValue) // + .collectList() // + .flatMap(values -> connection.zMScore(rawKey(key), values))); + } + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ReactiveZSetOperations#removeRange(java.lang.Object, org.springframework.data.domain.Range) diff --git a/src/main/java/org/springframework/data/redis/core/DefaultTypedTuple.java b/src/main/java/org/springframework/data/redis/core/DefaultTypedTuple.java index bf3ecda2c1..4bf8a9778d 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultTypedTuple.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultTypedTuple.java @@ -102,4 +102,14 @@ public int compareTo(TypedTuple o) { return compareTo(o.getScore()); } + + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append(getClass().getSimpleName()); + sb.append(" [score=").append(score); + sb.append(", value=").append(value); + sb.append(']'); + return sb.toString(); + } } diff --git a/src/main/java/org/springframework/data/redis/core/DefaultZSetOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultZSetOperations.java index 79ae492d80..c905088b67 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultZSetOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultZSetOperations.java @@ -17,7 +17,9 @@ import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.springframework.data.redis.connection.RedisZSetCommands.Aggregate; import org.springframework.data.redis.connection.RedisZSetCommands.Limit; @@ -437,6 +439,18 @@ public Double score(K key, Object o) { return execute(connection -> connection.zScore(rawKey, rawValue), true); } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.ZSetOperations#score(java.lang.Object, java.lang.Object[]) + */ + @Override + public List score(K key, Object... o) { + + byte[] rawKey = rawKey(key); + byte[][] rawValues = rawValues(o); + return execute(connection -> connection.zMScore(rawKey, rawValues), true); + } + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ZSetOperations#count(java.lang.Object, double, double) @@ -459,6 +473,78 @@ public Long lexCount(K key, Range range) { return execute(connection -> connection.zLexCount(rawKey, range), true); } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.ZSetOperations#popMin(java.lang.Object) + */ + @Nullable + @Override + public TypedTuple popMin(K key) { + + byte[] rawKey = rawKey(key); + return deserializeTuple(execute(connection -> connection.zPopMin(rawKey), true)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.ZSetOperations#popMin(java.lang.Object, long) + */ + @Nullable + @Override + public Set> popMin(K key, long count) { + + byte[] rawKey = rawKey(key); + return deserializeTupleValues(execute(connection -> connection.zPopMin(rawKey, count), true)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.ZSetOperations#popMin(java.lang.Object, long, java.util.concurrent.TimeUnit) + */ + @Nullable + @Override + public TypedTuple popMin(K key, long timeout, TimeUnit unit) { + + byte[] rawKey = rawKey(key); + return deserializeTuple(execute(connection -> connection.bZPopMin(rawKey, timeout, unit), true)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.ZSetOperations#popMax(java.lang.Object) + */ + @Nullable + @Override + public TypedTuple popMax(K key) { + + byte[] rawKey = rawKey(key); + return deserializeTuple(execute(connection -> connection.zPopMax(rawKey), true)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.ZSetOperations#popMax(java.lang.Object, long) + */ + @Nullable + @Override + public Set> popMax(K key, long count) { + + byte[] rawKey = rawKey(key); + return deserializeTupleValues(execute(connection -> connection.zPopMax(rawKey, count), true)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.ZSetOperations#popMax(java.lang.Object, long, java.util.concurrent.TimeUnit) + */ + @Nullable + @Override + public TypedTuple popMax(K key, long timeout, TimeUnit unit) { + + byte[] rawKey = rawKey(key); + return deserializeTuple(execute(connection -> connection.bZPopMax(rawKey, timeout, unit), true)); + } + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ZSetOperations#size(java.lang.Object) diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveZSetOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveZSetOperations.java index 1914da9664..32d994618f 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveZSetOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveZSetOperations.java @@ -18,7 +18,9 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.time.Duration; import java.util.Collection; +import java.util.List; import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.RedisZSetCommands.Aggregate; @@ -281,6 +283,74 @@ default Flux> scan(K key) { */ Mono lexCount(K key, Range range); + /** + * Remove and return the value with its score having the lowest score from sorted set at {@code key}. + * + * @param key must not be {@literal null}. + * @return + * @see Redis Documentation: ZPOPMIN + * @since 2.6 + */ + Mono> popMin(K key); + + /** + * Remove and return {@code count} values with their score having the lowest score from sorted set at {@code key}. + * + * @param key must not be {@literal null}. + * @param count number of elements to pop. + * @return + * @see Redis Documentation: ZPOPMIN + * @since 2.6 + */ + Flux> popMin(K key, long count); + + /** + * Remove and return the value with its score having the lowest score from sorted set at {@code key}. + * + * @param key must not be {@literal null}. + * @param timeout maximal duration to wait until an entry in the list at {@code key} is available. Must be either + * {@link Duration#ZERO} or greater {@link 1 second}, must not be {@literal null}. A timeout of zero can be + * used to wait indefinitely. Durations between zero and one second are not supported. + * @return + * @see Redis Documentation: ZPOPMIN + * @since 2.6 + */ + Mono> popMin(K key, Duration timeout); + + /** + * Remove and return the value with its score having the highest score from sorted set at {@code key}. + * + * @param key must not be {@literal null}. + * @return + * @see Redis Documentation: ZPOPMAX + * @since 2.6 + */ + Mono> popMax(K key); + + /** + * Remove and return {@code count} values with their score having the highest score from sorted set at {@code key}. + * + * @param key must not be {@literal null}. + * @param count number of elements to pop. + * @return + * @see Redis Documentation: ZPOPMAX + * @since 2.6 + */ + Flux> popMax(K key, long count); + + /** + * Remove and return the value with its score having the highest score from sorted set at {@code key}. + * + * @param key must not be {@literal null}. + * @param timeout maximal duration to wait until an entry in the list at {@code key} is available. Must be either + * {@link Duration#ZERO} or greater {@link 1 second}, must not be {@literal null}. A timeout of zero can be + * used to wait indefinitely. Durations between zero and one second are not supported. + * @return + * @see Redis Documentation: ZPOPMIN + * @since 2.6 + */ + Mono> popMax(K key, Duration timeout); + /** * Returns the number of elements of the sorted set stored with given {@code key}. * @@ -300,6 +370,17 @@ default Flux> scan(K key) { */ Mono score(K key, Object o); + /** + * Get the scores of elements with {@code values} from sorted set with key {@code key}. + * + * @param key must not be {@literal null}. + * @param o the values. + * @return + * @see Redis Documentation: ZMSCORE + * @since 2.6 + */ + Mono> score(K key, Object... o); + /** * Remove elements in range between {@code start} and {@code end} from sorted set with {@code key}. * diff --git a/src/main/java/org/springframework/data/redis/core/ZSetOperations.java b/src/main/java/org/springframework/data/redis/core/ZSetOperations.java index 822b8a47a7..b862c45f69 100644 --- a/src/main/java/org/springframework/data/redis/core/ZSetOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ZSetOperations.java @@ -15,8 +15,11 @@ */ package org.springframework.data.redis.core; +import java.time.Duration; import java.util.Collection; +import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.springframework.data.redis.connection.RedisZSetCommands.Aggregate; import org.springframework.data.redis.connection.RedisZSetCommands.Limit; @@ -24,6 +27,7 @@ import org.springframework.data.redis.connection.RedisZSetCommands.Tuple; import org.springframework.data.redis.connection.RedisZSetCommands.Weights; import org.springframework.lang.Nullable; +import org.springframework.util.Assert; /** * Redis ZSet/sorted set specific operations. @@ -337,6 +341,120 @@ static TypedTuple of(V value, @Nullable Double score) { @Nullable Long lexCount(K key, Range range); + /** + * Remove and return the value with its score having the lowest score from sorted set at {@code key}. + * + * @param key must not be {@literal null}. + * @return {@literal null} when the sorted set is empty or used in pipeline / transaction. + * @see Redis Documentation: ZPOPMIN + * @since 2.6 + */ + @Nullable + TypedTuple popMin(K key); + + /** + * Remove and return {@code count} values with their score having the lowest score from sorted set at {@code key}. + * + * @param key must not be {@literal null}. + * @param count number of elements to pop. + * @return {@literal null} when the sorted set is empty or used in pipeline / transaction. + * @see Redis Documentation: ZPOPMIN + * @since 2.6 + */ + @Nullable + Set> popMin(K key, long count); + + /** + * Remove and return the value with its score having the lowest score from sorted set at {@code key}.
+ * Blocks connection until element available or {@code timeout} reached. + * + * @param key must not be {@literal null}. + * @param timeout + * @param unit must not be {@literal null}. + * @return can be {@literal null}. + * @see Redis Documentation: BZPOPMIN + * @since 2.6 + */ + @Nullable + TypedTuple popMin(K key, long timeout, TimeUnit unit); + + /** + * Remove and return the value with its score having the lowest score from sorted set at {@code key}.
+ * Blocks connection until element available or {@code timeout} reached. + * + * @param key must not be {@literal null}. + * @param timeout must not be {@literal null}. + * @return can be {@literal null}. + * @throws IllegalArgumentException if the timeout is {@literal null} or negative. + * @see Redis Documentation: BZPOPMIN + * @since 2.6 + */ + @Nullable + default TypedTuple popMin(K key, Duration timeout) { + + Assert.notNull(timeout, "Timeout must not be null"); + Assert.isTrue(!timeout.isNegative(), "Timeout must not be negative"); + + return popMin(key, TimeoutUtils.toSeconds(timeout), TimeUnit.SECONDS); + } + + /** + * Remove and return the value with its score having the highest score from sorted set at {@code key}. + * + * @param key must not be {@literal null}. + * @return {@literal null} when the sorted set is empty or used in pipeline / transaction. + * @see Redis Documentation: ZPOPMAX + * @since 2.6 + */ + @Nullable + TypedTuple popMax(K key); + + /** + * Remove and return {@code count} values with their score having the highest score from sorted set at {@code key}. + * + * @param key must not be {@literal null}. + * @param count number of elements to pop. + * @return {@literal null} when the sorted set is empty or used in pipeline / transaction. + * @see Redis Documentation: ZPOPMAX + * @since 2.6 + */ + @Nullable + Set> popMax(K key, long count); + + /** + * Remove and return the value with its score having the highest score from sorted set at {@code key}.
+ * Blocks connection until element available or {@code timeout} reached. + * + * @param key must not be {@literal null}. + * @param timeout + * @param unit must not be {@literal null}. + * @return can be {@literal null}. + * @see Redis Documentation: BZPOPMAX + * @since 2.6 + */ + @Nullable + TypedTuple popMax(K key, long timeout, TimeUnit unit); + + /** + * Remove and return the value with its score having the highest score from sorted set at {@code key}.
+ * Blocks connection until element available or {@code timeout} reached. + * + * @param key must not be {@literal null}. + * @param timeout must not be {@literal null}. + * @return can be {@literal null}. + * @throws IllegalArgumentException if the timeout is {@literal null} or negative. + * @see Redis Documentation: BZPOPMAX + * @since 2.6 + */ + @Nullable + default TypedTuple popMax(K key, Duration timeout) { + + Assert.notNull(timeout, "Timeout must not be null"); + Assert.isTrue(!timeout.isNegative(), "Timeout must not be negative"); + + return popMin(key, TimeoutUtils.toSeconds(timeout), TimeUnit.SECONDS); + } + /** * Returns the number of elements of the sorted set stored with given {@code key}. * @@ -370,6 +488,18 @@ static TypedTuple of(V value, @Nullable Double score) { @Nullable Double score(K key, Object o); + /** + * Get the scores of elements with {@code values} from sorted set with key {@code key}. + * + * @param key must not be {@literal null}. + * @param o the values. + * @return {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: ZMSCORE + * @since 2.6 + */ + @Nullable + List score(K key, Object... o); + /** * Remove elements in range between {@code start} and {@code end} from sorted set with {@code key}. * diff --git a/src/main/java/org/springframework/data/redis/support/collections/AbstractRedisCollection.java b/src/main/java/org/springframework/data/redis/support/collections/AbstractRedisCollection.java index 4cc2e7397f..1361da4893 100644 --- a/src/main/java/org/springframework/data/redis/support/collections/AbstractRedisCollection.java +++ b/src/main/java/org/springframework/data/redis/support/collections/AbstractRedisCollection.java @@ -22,6 +22,7 @@ import org.springframework.data.redis.core.RedisOperations; import org.springframework.lang.Nullable; +import org.springframework.util.Assert; /** * Base implementation for {@link RedisCollection}. Provides a skeletal implementation. Note that the collection support @@ -47,6 +48,9 @@ public abstract class AbstractRedisCollection extends AbstractCollection i */ public AbstractRedisCollection(String key, RedisOperations operations) { + Assert.hasText(key, "Key must not be empty!"); + Assert.notNull(operations, "RedisOperations must not be null!"); + this.key = key; this.operations = operations; } @@ -217,7 +221,7 @@ public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("RedisStore for key:"); + sb.append(String.format("%s for key:", getClass().getSimpleName())); sb.append(getKey()); return sb.toString(); diff --git a/src/main/java/org/springframework/data/redis/support/collections/DefaultRedisZSet.java b/src/main/java/org/springframework/data/redis/support/collections/DefaultRedisZSet.java index d9b9add8ad..9658870431 100644 --- a/src/main/java/org/springframework/data/redis/support/collections/DefaultRedisZSet.java +++ b/src/main/java/org/springframework/data/redis/support/collections/DefaultRedisZSet.java @@ -19,6 +19,7 @@ import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.springframework.data.redis.connection.DataType; import org.springframework.data.redis.connection.RedisZSetCommands.Limit; @@ -42,7 +43,7 @@ public class DefaultRedisZSet extends AbstractRedisCollection implements RedisZSet { private final BoundZSetOperations boundZSetOps; - private double defaultScore = 1; + private final double defaultScore; private class DefaultRedisSortedSetIterator extends RedisIterator { @@ -380,6 +381,38 @@ public E first() { throw new NoSuchElementException(); } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.support.collections.RedisZSet#popFirst() + */ + @Override + public E popFirst() { + + TypedTuple tuple = boundZSetOps.popMin(); + + if (tuple != null) { + return tuple.getValue(); + } + + throw new NoSuchElementException(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.support.collections.RedisZSet#popFirst(long, java.util.concurrent.TimeUnit) + */ + @Override + public E popFirst(long timeout, TimeUnit unit) { + + TypedTuple tuple = boundZSetOps.popMin(timeout, unit); + + if (tuple != null) { + return tuple.getValue(); + } + + throw new NoSuchElementException(); + } + /* * (non-Javadoc) * @see org.springframework.data.redis.support.collections.RedisZSet#last() @@ -397,6 +430,38 @@ public E last() { throw new NoSuchElementException(); } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.support.collections.RedisZSet#popLast() + */ + @Override + public E popLast() { + + TypedTuple tuple = boundZSetOps.popMax(); + + if (tuple != null) { + return tuple.getValue(); + } + + throw new NoSuchElementException(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.support.collections.RedisZSet#popLast(long, java.util.concurrent.TimeUnit) + */ + @Override + public E popLast(long timeout, TimeUnit unit) { + + TypedTuple tuple = boundZSetOps.popMax(timeout, unit); + + if (tuple != null) { + return tuple.getValue(); + } + + throw new NoSuchElementException(); + } + /* * (non-Javadoc) * @see org.springframework.data.redis.support.collections.RedisZSet#rank(java.lang.Object) diff --git a/src/main/java/org/springframework/data/redis/support/collections/RedisCollectionFactoryBean.java b/src/main/java/org/springframework/data/redis/support/collections/RedisCollectionFactoryBean.java index bb352dc852..bf3df48096 100644 --- a/src/main/java/org/springframework/data/redis/support/collections/RedisCollectionFactoryBean.java +++ b/src/main/java/org/springframework/data/redis/support/collections/RedisCollectionFactoryBean.java @@ -112,7 +112,7 @@ private RedisStore createStore(DataType dt) { return new DefaultRedisSet(key, template); case ZSET: - return new DefaultRedisZSet(key, template); + return RedisZSet.create(key, template); case HASH: if (CollectionType.PROPERTIES.equals(type)) { diff --git a/src/main/java/org/springframework/data/redis/support/collections/RedisZSet.java b/src/main/java/org/springframework/data/redis/support/collections/RedisZSet.java index 86fc8646ef..4750ab6bc3 100644 --- a/src/main/java/org/springframework/data/redis/support/collections/RedisZSet.java +++ b/src/main/java/org/springframework/data/redis/support/collections/RedisZSet.java @@ -21,10 +21,13 @@ import java.util.NoSuchElementException; import java.util.Set; import java.util.SortedSet; +import java.util.concurrent.TimeUnit; +import org.springframework.data.redis.connection.RedisZSetCommands; import org.springframework.data.redis.connection.RedisZSetCommands.Limit; import org.springframework.data.redis.connection.RedisZSetCommands.Range; import org.springframework.data.redis.core.BoundZSetOperations; +import org.springframework.data.redis.core.RedisOperations; import org.springframework.data.redis.core.ZSetOperations.TypedTuple; /** @@ -40,16 +43,85 @@ */ public interface RedisZSet extends RedisCollection, Set { + /** + * Constructs a new {@link RedisZSet} instance with a default score of {@literal 1}. + * + * @param key Redis key of this set. + * @param operations {@link RedisOperations} for the value type of this set. + * @since 2.6 + */ + static RedisZSet create(String key, RedisOperations operations) { + return new DefaultRedisZSet<>(key, operations, 1); + } + + /** + * Constructs a new {@link RedisZSet} instance. + * + * @param key Redis key of this set. + * @param operations {@link RedisOperations} for the value type of this set. + * @param defaultScore + * @since 2.6 + */ + static RedisZSet create(String key, RedisOperations operations, double defaultScore) { + return new DefaultRedisZSet<>(key, operations, defaultScore); + } + + /** + * Create a new {@link RedisZSet} by intersecting this sorted set and {@link RedisZSet} and store result in + * destination {@code destKey}. + * + * @param set must not be {@literal null}. + * @param destKey must not be {@literal null}. + * @return a new {@link RedisZSet} pointing at {@code destKey} + */ RedisZSet intersectAndStore(RedisZSet set, String destKey); + /** + * Create a new {@link RedisZSet} by intersecting this sorted set and the collection {@link RedisZSet} and store + * result in destination {@code destKey}. + * + * @param sets must not be {@literal null}. + * @param destKey must not be {@literal null}. + * @return a new {@link RedisZSet} pointing at {@code destKey} + */ RedisZSet intersectAndStore(Collection> sets, String destKey); + /** + * Create a new {@link RedisZSet} by union this sorted set and {@link RedisZSet} and store result in destination + * {@code destKey}. + * + * @param set must not be {@literal null}. + * @param destKey must not be {@literal null}. + * @return a new {@link RedisZSet} pointing at {@code destKey} + */ RedisZSet unionAndStore(RedisZSet set, String destKey); + /** + * Create a new {@link RedisZSet} by union this sorted set and the collection {@link RedisZSet} and store result in + * destination {@code destKey}. + * + * @param sets must not be {@literal null}. + * @param destKey must not be {@literal null}. + * @return a new {@link RedisZSet} pointing at {@code destKey} + */ RedisZSet unionAndStore(Collection> sets, String destKey); + /** + * Get elements between {@code start} and {@code end} from sorted set. + * + * @param start + * @param end + * @return + */ Set range(long start, long end); + /** + * Get elements in range from {@code start} to {@code end} from sorted set ordered from high to low. + * + * @param start + * @param end + * @return + */ Set reverseRange(long start, long end); /** @@ -104,29 +176,88 @@ default Set reverseRangeByLex(Range range) { */ Set reverseRangeByLex(Range range, Limit limit); + /** + * Get elements where score is between {@code min} and {@code max} from sorted set. + * + * @param min + * @param max + * @return + */ Set rangeByScore(double min, double max); + /** + * Get elements where score is between {@code min} and {@code max} from sorted set ordered from high to low. + * + * @param min + * @param max + * @return + */ Set reverseRangeByScore(double min, double max); + /** + * Get set of {@link RedisZSetCommands.Tuple}s between {@code start} and {@code end} from sorted set. + * + * @param start + * @param end + * @return + */ Set> rangeWithScores(long start, long end); + /** + * Get set of {@link RedisZSetCommands.Tuple}s in range from {@code start} to {@code end} from sorted set ordered from + * high to low. + * + * @param start + * @param end + * @return + */ Set> reverseRangeWithScores(long start, long end); + /** + * Get set of {@link RedisZSetCommands.Tuple}s where score is between {@code min} and {@code max} from sorted set. + * + * @param min + * @param max + * @return + */ Set> rangeByScoreWithScores(double min, double max); + /** + * Get set of {@link RedisZSetCommands.Tuple}s where score is between {@code min} and {@code max} from sorted set + * ordered from high to low. + * + * @param min + * @param max + * @return + */ Set> reverseRangeByScoreWithScores(double min, double max); + /** + * Remove elements in range between {@code start} and {@code end} from sorted set. + * + * @param start + * @param end + * @return {@code this} set. + */ RedisZSet remove(long start, long end); /** * Remove all elements in range. * * @param range must not be {@literal null}. - * @return never {@literal null}. + * @return {@code this} set. * @since 2.5 */ + // TODO: Switch to RedisZSet Set removeByLex(Range range); + /** + * Remove elements with scores between {@code min} and {@code max} from sorted set with the bound key. + * + * @param min + * @param max + * @return {@code this} set. + */ RedisZSet removeByScore(double min, double max); /** @@ -217,6 +348,28 @@ default boolean addIfAbsent(E e) { */ E first(); + /** + * Removes the first (lowest) object at the top of this sorted set and returns that object as the value of this + * function. + * + * @return the first (lowest) element currently in this sorted set. + * @throws NoSuchElementException sorted set is empty. + * @since 2.6 + */ + E popFirst(); + + /** + * Removes the first (lowest) object at the top of this sorted set and returns that object as the value of this + * function. Blocks connection until element available or {@code timeout} reached. + * + * @param timeout + * @param unit must not be {@literal null}. + * @return the first (lowest) element currently in this sorted set. + * @throws NoSuchElementException sorted set is empty. + * @since 2.6 + */ + E popFirst(long timeout, TimeUnit unit); + /** * Returns the last (highest) element currently in this sorted set. * @@ -225,6 +378,28 @@ default boolean addIfAbsent(E e) { */ E last(); + /** + * Removes the last (highest) object at the top of this sorted set and returns that object as the value of this + * function. + * + * @return the last (highest) element currently in this sorted set. + * @throws NoSuchElementException sorted set is empty. + * @since 2.6 + */ + E popLast(); + + /** + * Removes the last (highest) object at the top of this sorted set and returns that object as the value of this + * function. Blocks connection until element available or {@code timeout} reached. + * + * @param timeout + * @param unit must not be {@literal null}. + * @return the last (highest) element currently in this sorted set. + * @throws NoSuchElementException sorted set is empty. + * @since 2.6 + */ + E popLast(long timeout, TimeUnit unit); + /** * @since 1.4 * @return diff --git a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java index cacafc14fe..e45e3c33e4 100644 --- a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java @@ -1830,6 +1830,52 @@ void zLexCountTest() { assertThat((Long) results.get(11)).isEqualTo(3); } + @Test // GH-2007 + @EnabledOnCommand("ZPOPMIN") + void zPopMin() { + + actual.add(connection.zAdd("myzset", 1, "a")); + actual.add(connection.zAdd("myzset", 2, "b")); + actual.add(connection.zAdd("myzset", 3, "c")); + actual.add(connection.zAdd("myzset", 4, "d")); + + actual.add(connection.zPopMin("myzset")); + actual.add(connection.bZPopMin("myzset", 1, TimeUnit.SECONDS)); + actual.add(connection.zPopMin("myzset", 2)); + actual.add(connection.zPopMin("myzset")); + + List results = getResults(); + + assertThat(results.get(4)).isEqualTo(new DefaultStringTuple("a".getBytes(), "a", 1D)); + assertThat(results.get(5)).isEqualTo(new DefaultStringTuple("b".getBytes(), "b", 2D)); + assertThat((Collection) results.get(6)).containsExactly(new DefaultStringTuple("c".getBytes(), "c", 3D), + new DefaultStringTuple("d".getBytes(), "d", 4D)); + assertThat(results.get(7)).isNull(); + } + + @Test // GH-2007 + @EnabledOnCommand("ZPOPMAX") + void zPopMax() { + + actual.add(connection.zAdd("myzset", 1, "a")); + actual.add(connection.zAdd("myzset", 2, "b")); + actual.add(connection.zAdd("myzset", 3, "c")); + actual.add(connection.zAdd("myzset", 4, "d")); + + actual.add(connection.zPopMax("myzset")); + actual.add(connection.bZPopMax("myzset", 1, TimeUnit.SECONDS)); + actual.add(connection.zPopMax("myzset", 2)); + actual.add(connection.zPopMax("myzset")); + + List results = getResults(); + + assertThat(results.get(4)).isEqualTo(new DefaultStringTuple("d".getBytes(), "d", 4D)); + assertThat(results.get(5)).isEqualTo(new DefaultStringTuple("c".getBytes(), "c", 3D)); + assertThat((Collection) results.get(6)).containsExactly(new DefaultStringTuple("b".getBytes(), "b", 2D), + new DefaultStringTuple("a".getBytes(), "a", 1D)); + assertThat(results.get(7)).isNull(); + } + @Test void testZIncrBy() { actual.add(connection.zAdd("myset", 2, "Bob")); @@ -2028,6 +2074,7 @@ void testZRemRangeByLex() { @Test void testZRemRangeByScore() { + actual.add(connection.zAdd("myset", 2, "Bob")); actual.add(connection.zAdd("myset", 1, "James")); actual.add(connection.zRemRangeByScore("myset", 0d, 1d)); @@ -2038,6 +2085,7 @@ void testZRemRangeByScore() { @Test void testZRevRank() { + actual.add(connection.zAdd("myset", 2, "Bob")); actual.add(connection.zAdd("myset", 1, "James")); actual.add(connection.zAdd("myset", 3, "Joe")); @@ -2047,6 +2095,7 @@ void testZRevRank() { @Test void testZScore() { + actual.add(connection.zAdd("myset", 2, "Bob")); actual.add(connection.zAdd("myset", 1, "James")); actual.add(connection.zAdd("myset", 3, "Joe")); @@ -2054,8 +2103,20 @@ void testZScore() { verifyResults(Arrays.asList(new Object[] { true, true, true, 3d })); } + @Test + @EnabledOnCommand("ZMSCORE") + void testZMScore() { + + actual.add(connection.zAdd("myset", 2, "Bob")); + actual.add(connection.zAdd("myset", 1, "James")); + actual.add(connection.zAdd("myset", 3, "Joe")); + actual.add(connection.zMScore("myset", "James", "Joe", "Dave")); + verifyResults(Arrays.asList(new Object[] { true, true, true, Arrays.asList(1d, 3d, null) })); + } + @Test void testZUnionStore() { + actual.add(connection.zAdd("myset", 2, "Bob")); actual.add(connection.zAdd("myset", 1, "James")); actual.add(connection.zAdd("myset", 5, "Joe")); @@ -2069,6 +2130,7 @@ void testZUnionStore() { @Test void testZUnionStoreAggWeights() { + actual.add(connection.zAdd("myset", 2, "Bob")); actual.add(connection.zAdd("myset", 1, "James")); actual.add(connection.zAdd("myset", 4, "Joe")); diff --git a/src/test/java/org/springframework/data/redis/connection/ClusterConnectionTests.java b/src/test/java/org/springframework/data/redis/connection/ClusterConnectionTests.java index 90547fb6b2..661fc4fd41 100644 --- a/src/test/java/org/springframework/data/redis/connection/ClusterConnectionTests.java +++ b/src/test/java/org/springframework/data/redis/connection/ClusterConnectionTests.java @@ -610,6 +610,18 @@ public interface ClusterConnectionTests { // DATAREDIS-315 void zInterStoreShouldWorkForSameSlotKeys(); + // GH-2007 + void zPopMinShouldWorkCorrectly(); + + // GH-2007 + void bzPopMinShouldWorkCorrectly(); + + // GH-2007 + void zPopMaxShouldWorkCorrectly(); + + // GH-2007 + void bzPopMaxShouldWorkCorrectly(); + // DATAREDIS-315 void zRangeByLexShouldReturnResultCorrectly(); @@ -670,6 +682,9 @@ public interface ClusterConnectionTests { // DATAREDIS-315 void zScoreShouldRetrieveScoreForValue(); + // GH-2038 + void zMScoreShouldRetrieveScoreForValues(); + // DATAREDIS-315 void zUnionStoreShouldThrowExceptionWhenKeysDoNotMapToSameSlots(); diff --git a/src/test/java/org/springframework/data/redis/connection/DefaultStringRedisConnectionPipelineTests.java b/src/test/java/org/springframework/data/redis/connection/DefaultStringRedisConnectionPipelineTests.java index e7f2707653..9d0e4d9dcf 100644 --- a/src/test/java/org/springframework/data/redis/connection/DefaultStringRedisConnectionPipelineTests.java +++ b/src/test/java/org/springframework/data/redis/connection/DefaultStringRedisConnectionPipelineTests.java @@ -1330,8 +1330,16 @@ public void testZScore() { super.testZScore(); } + @Test + public void testZMScore() { + + doReturn(Collections.singletonList(Arrays.asList(1d, 3d))).when(nativeConnection).closePipeline(); + super.testZMScore(); + } + @Test public void testZUnionStoreAggWeightsBytes() { + doReturn(Collections.singletonList(5L)).when(nativeConnection).closePipeline(); super.testZUnionStoreAggWeightsBytes(); } diff --git a/src/test/java/org/springframework/data/redis/connection/DefaultStringRedisConnectionPipelineTxTests.java b/src/test/java/org/springframework/data/redis/connection/DefaultStringRedisConnectionPipelineTxTests.java index 2691e66c46..7e1badabc1 100644 --- a/src/test/java/org/springframework/data/redis/connection/DefaultStringRedisConnectionPipelineTxTests.java +++ b/src/test/java/org/springframework/data/redis/connection/DefaultStringRedisConnectionPipelineTxTests.java @@ -1433,6 +1433,14 @@ public void testZScore() { super.testZScore(); } + @Test + public void testZMScore() { + + doReturn(Collections.singletonList(Collections.singletonList(Arrays.asList(1d, 3d)))).when(nativeConnection) + .closePipeline(); + super.testZMScore(); + } + @Test public void testZUnionStoreAggWeightsBytes() { doReturn(Collections.singletonList(Collections.singletonList(5L))).when(nativeConnection).closePipeline(); diff --git a/src/test/java/org/springframework/data/redis/connection/DefaultStringRedisConnectionTests.java b/src/test/java/org/springframework/data/redis/connection/DefaultStringRedisConnectionTests.java index fe09f3610c..53942841ae 100644 --- a/src/test/java/org/springframework/data/redis/connection/DefaultStringRedisConnectionTests.java +++ b/src/test/java/org/springframework/data/redis/connection/DefaultStringRedisConnectionTests.java @@ -1624,6 +1624,14 @@ public void testZScore() { verifyResults(Collections.singletonList(3d)); } + @Test + public void testZMScore() { + + doReturn(Arrays.asList(1d, 3d)).when(nativeConnection).zMScore(fooBytes, barBytes, bar2Bytes); + actual.add(connection.zMScore(foo, bar, bar2)); + verifyResults(Collections.singletonList(Arrays.asList(1d, 3d))); + } + @Test public void testZUnionStoreAggWeightsBytes() { doReturn(5L).when(nativeConnection).zUnionStore(eq(fooBytes), eq(Aggregate.MAX), any(Weights.class), eq(fooBytes)); diff --git a/src/test/java/org/springframework/data/redis/connection/DefaultStringRedisConnectionTxTests.java b/src/test/java/org/springframework/data/redis/connection/DefaultStringRedisConnectionTxTests.java index c846f0bb29..08c51943db 100644 --- a/src/test/java/org/springframework/data/redis/connection/DefaultStringRedisConnectionTxTests.java +++ b/src/test/java/org/springframework/data/redis/connection/DefaultStringRedisConnectionTxTests.java @@ -1316,6 +1316,13 @@ public void testZScore() { super.testZScore(); } + @Test + public void testZMScore() { + + doReturn(Collections.singletonList(Arrays.asList(1d, 3d))).when(nativeConnection).exec(); + super.testZMScore(); + } + @Test public void testZUnionStoreAggWeightsBytes() { doReturn(Collections.singletonList(5L)).when(nativeConnection).exec(); diff --git a/src/test/java/org/springframework/data/redis/connection/RedisConnectionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/RedisConnectionUnitTests.java index 019f0f4916..7f787b794c 100644 --- a/src/test/java/org/springframework/data/redis/connection/RedisConnectionUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/RedisConnectionUnitTests.java @@ -833,6 +833,10 @@ public Double zScore(byte[] key, byte[] value) { return delegate.zScore(key, value); } + public List zMScore(byte[] key, byte[][] values) { + return delegate.zMScore(key, values); + } + public Long zRemRange(byte[] key, long begin, long end) { return delegate.zRemRange(key, begin, end); } diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/JedisClusterConnectionTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/JedisClusterConnectionTests.java index 1ad156d01d..b14727033d 100644 --- a/src/test/java/org/springframework/data/redis/connection/jedis/JedisClusterConnectionTests.java +++ b/src/test/java/org/springframework/data/redis/connection/jedis/JedisClusterConnectionTests.java @@ -2026,6 +2026,56 @@ public void zInterStoreShouldWorkForSameSlotKeys() { assertThat(nativeConnection.zrange(SAME_SLOT_KEY_3_BYTES, 0, -1)).contains(VALUE_2_BYTES); } + @Test // GH-2007 + @EnabledOnCommand("ZPOPMIN") + public void zPopMinShouldWorkCorrectly() { + + nativeConnection.zadd(KEY_1_BYTES, 10D, VALUE_1_BYTES); + nativeConnection.zadd(KEY_1_BYTES, 20D, VALUE_2_BYTES); + nativeConnection.zadd(KEY_1_BYTES, 30D, VALUE_3_BYTES); + + assertThat(clusterConnection.zPopMin(KEY_1_BYTES)).isEqualTo(new DefaultTuple(VALUE_1_BYTES, 10D)); + assertThat(clusterConnection.zPopMin(KEY_1_BYTES, 2)).containsExactly(new DefaultTuple(VALUE_2_BYTES, 20D), + new DefaultTuple(VALUE_3_BYTES, 30D)); + } + + @Test // GH-2007 + @EnabledOnCommand("BZPOPMIN") + public void bzPopMinShouldWorkCorrectly() { + + nativeConnection.zadd(KEY_1_BYTES, 10D, VALUE_1_BYTES); + nativeConnection.zadd(KEY_1_BYTES, 20D, VALUE_2_BYTES); + nativeConnection.zadd(KEY_1_BYTES, 30D, VALUE_3_BYTES); + + assertThat(clusterConnection.bZPopMin(KEY_1_BYTES, 1, TimeUnit.SECONDS)) + .isEqualTo(new DefaultTuple(VALUE_1_BYTES, 10D)); + } + + @Test // GH-2007 + @EnabledOnCommand("ZPOPMAX") + public void zPopMaxShouldWorkCorrectly() { + + nativeConnection.zadd(KEY_1_BYTES, 10D, VALUE_1_BYTES); + nativeConnection.zadd(KEY_1_BYTES, 20D, VALUE_2_BYTES); + nativeConnection.zadd(KEY_1_BYTES, 30D, VALUE_3_BYTES); + + assertThat(clusterConnection.zPopMax(KEY_1_BYTES)).isEqualTo(new DefaultTuple(VALUE_3_BYTES, 30D)); + assertThat(clusterConnection.zPopMax(KEY_1_BYTES, 2)).containsExactly(new DefaultTuple(VALUE_2_BYTES, 20D), + new DefaultTuple(VALUE_1_BYTES, 10D)); + } + + @Test // GH-2007 + @EnabledOnCommand("BZPOPMAX") + public void bzPopMaxShouldWorkCorrectly() { + + nativeConnection.zadd(KEY_1_BYTES, 10D, VALUE_1_BYTES); + nativeConnection.zadd(KEY_1_BYTES, 20D, VALUE_2_BYTES); + nativeConnection.zadd(KEY_1_BYTES, 30D, VALUE_3_BYTES); + + assertThat(clusterConnection.bZPopMax(KEY_1_BYTES, 1, TimeUnit.SECONDS)) + .isEqualTo(new DefaultTuple(VALUE_3_BYTES, 30D)); + } + @Test // DATAREDIS-315 public void zRangeByLexShouldReturnResultCorrectly() { @@ -2304,6 +2354,16 @@ public void zScoreShouldRetrieveScoreForValue() { assertThat(clusterConnection.zScore(KEY_1_BYTES, VALUE_2_BYTES)).isEqualTo(20D); } + @Test // GH-2038 + @EnabledOnCommand("ZMSCORE") + public void zMScoreShouldRetrieveScoreForValues() { + + nativeConnection.zadd(KEY_1_BYTES, 10D, VALUE_1_BYTES); + nativeConnection.zadd(KEY_1_BYTES, 20D, VALUE_2_BYTES); + + assertThat(clusterConnection.zMScore(KEY_1_BYTES, VALUE_1_BYTES, VALUE_2_BYTES)).containsSequence(10D, 20D); + } + @Test // DATAREDIS-315 public void zUnionStoreShouldThrowExceptionWhenKeysDoNotMapToSameSlots() { assertThatExceptionOfType(DataAccessException.class) diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionTests.java index e628e234ed..d6c098feac 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionTests.java @@ -2067,6 +2067,56 @@ public void zInterStoreShouldWorkForSameSlotKeys() { assertThat(nativeConnection.zrange(SAME_SLOT_KEY_3, 0, -1)).contains(VALUE_2); } + @Test // GH-2007 + @EnabledOnCommand("ZPOPMIN") + public void zPopMinShouldWorkCorrectly() { + + nativeConnection.zadd(KEY_1, 10D, VALUE_1); + nativeConnection.zadd(KEY_1, 20D, VALUE_2); + nativeConnection.zadd(KEY_1, 30D, VALUE_3); + + assertThat(clusterConnection.zPopMin(KEY_1_BYTES)).isEqualTo(new DefaultTuple(VALUE_1_BYTES, 10D)); + assertThat(clusterConnection.zPopMin(KEY_1_BYTES, 2)).containsExactly(new DefaultTuple(VALUE_2_BYTES, 20D), + new DefaultTuple(VALUE_3_BYTES, 30D)); + } + + @Test // GH-2007 + @EnabledOnCommand("BZPOPMIN") + public void bzPopMinShouldWorkCorrectly() { + + nativeConnection.zadd(KEY_1, 10D, VALUE_1); + nativeConnection.zadd(KEY_1, 20D, VALUE_2); + nativeConnection.zadd(KEY_1, 30D, VALUE_3); + + assertThat(clusterConnection.bZPopMin(KEY_1_BYTES, 1, TimeUnit.SECONDS)) + .isEqualTo(new DefaultTuple(VALUE_1_BYTES, 10D)); + } + + @Test // GH-2007 + @EnabledOnCommand("ZPOPMAX") + public void zPopMaxShouldWorkCorrectly() { + + nativeConnection.zadd(KEY_1, 10D, VALUE_1); + nativeConnection.zadd(KEY_1, 20D, VALUE_2); + nativeConnection.zadd(KEY_1, 30D, VALUE_3); + + assertThat(clusterConnection.zPopMax(KEY_1_BYTES)).isEqualTo(new DefaultTuple(VALUE_3_BYTES, 30D)); + assertThat(clusterConnection.zPopMax(KEY_1_BYTES, 2)).containsExactly(new DefaultTuple(VALUE_2_BYTES, 20D), + new DefaultTuple(VALUE_1_BYTES, 10D)); + } + + @Test // GH-2007 + @EnabledOnCommand("BZPOPMAX") + public void bzPopMaxShouldWorkCorrectly() { + + nativeConnection.zadd(KEY_1, 10D, VALUE_1); + nativeConnection.zadd(KEY_1, 20D, VALUE_2); + nativeConnection.zadd(KEY_1, 30D, VALUE_3); + + assertThat(clusterConnection.bZPopMax(KEY_1_BYTES, 1, TimeUnit.SECONDS)) + .isEqualTo(new DefaultTuple(VALUE_3_BYTES, 30D)); + } + @Test // DATAREDIS-315 public void zRangeByLexShouldReturnResultCorrectly() { @@ -2345,6 +2395,16 @@ public void zScoreShouldRetrieveScoreForValue() { assertThat(clusterConnection.zScore(KEY_1_BYTES, VALUE_2_BYTES)).isEqualTo(20D); } + @Test // GH-2038 + @EnabledOnCommand("ZMSCORE") + public void zMScoreShouldRetrieveScoreForValues() { + + nativeConnection.zadd(KEY_1, 10D, VALUE_1); + nativeConnection.zadd(KEY_1, 20D, VALUE_2); + + assertThat(clusterConnection.zMScore(KEY_1_BYTES, VALUE_1_BYTES, VALUE_2_BYTES)).containsExactly(10D, 20D); + } + @Test // DATAREDIS-315 public void zUnionStoreShouldThrowExceptionWhenKeysDoNotMapToSameSlots() { assertThatExceptionOfType(DataAccessException.class) diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveZSetCommandsIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveZSetCommandsIntegrationTests.java index 0e4cb2de42..9609c98703 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveZSetCommandsIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveZSetCommandsIntegrationTests.java @@ -22,11 +22,13 @@ import reactor.test.StepVerifier; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.Arrays; import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.DefaultTuple; import org.springframework.data.redis.core.ScanOptions; +import org.springframework.data.redis.test.condition.EnabledOnCommand; import org.springframework.data.redis.test.extension.parametrized.ParameterizedRedisTest; /** @@ -384,6 +386,62 @@ void zCountShouldCountValuesInRangeWithPositiveInfinity() { .isEqualTo(2L); } + @ParameterizedRedisTest // GH-2007 + @EnabledOnCommand("ZPOPMIN") + void zPopMinShouldReturnCorrectly() { + + nativeCommands.zadd(KEY_1, 1D, VALUE_1); + nativeCommands.zadd(KEY_1, 2D, VALUE_2); + nativeCommands.zadd(KEY_1, 3D, VALUE_3); + + connection.zSetCommands().zPopMin(KEY_1_BBUFFER).as(StepVerifier::create) + .expectNext(new DefaultTuple(VALUE_1_BYTES, 1D)).verifyComplete(); + + connection.zSetCommands().zPopMin(KEY_1_BBUFFER, 2).as(StepVerifier::create) + .expectNext(new DefaultTuple(VALUE_2_BYTES, 2D)).expectNext(new DefaultTuple(VALUE_3_BYTES, 3D)) + .verifyComplete(); + } + + @ParameterizedRedisTest // GH-2007 + @EnabledOnCommand("BZPOPMIN") + void bzPopMinShouldReturnCorrectly() { + + nativeCommands.zadd(KEY_1, 1D, VALUE_1); + nativeCommands.zadd(KEY_1, 2D, VALUE_2); + nativeCommands.zadd(KEY_1, 3D, VALUE_3); + + connection.zSetCommands().bZPopMin(KEY_1_BBUFFER, Duration.ofSeconds(1)).as(StepVerifier::create) + .expectNext(new DefaultTuple(VALUE_1_BYTES, 1D)).verifyComplete(); + } + + @ParameterizedRedisTest // GH-2007 + @EnabledOnCommand("ZPOPMAX") + void zPopMaxShouldReturnCorrectly() { + + nativeCommands.zadd(KEY_1, 1D, VALUE_1); + nativeCommands.zadd(KEY_1, 2D, VALUE_2); + nativeCommands.zadd(KEY_1, 3D, VALUE_3); + + connection.zSetCommands().zPopMax(KEY_1_BBUFFER).as(StepVerifier::create) + .expectNext(new DefaultTuple(VALUE_3_BYTES, 3D)).verifyComplete(); + + connection.zSetCommands().zPopMax(KEY_1_BBUFFER, 2).as(StepVerifier::create) + .expectNext(new DefaultTuple(VALUE_2_BYTES, 2D)).expectNext(new DefaultTuple(VALUE_1_BYTES, 1D)) + .verifyComplete(); + } + + @ParameterizedRedisTest // GH-2007 + @EnabledOnCommand("BZPOPMAX") + void bzPopMaxShouldReturnCorrectly() { + + nativeCommands.zadd(KEY_1, 1D, VALUE_1); + nativeCommands.zadd(KEY_1, 2D, VALUE_2); + nativeCommands.zadd(KEY_1, 3D, VALUE_3); + + connection.zSetCommands().bZPopMax(KEY_1_BBUFFER, Duration.ofSeconds(1)).as(StepVerifier::create) + .expectNext(new DefaultTuple(VALUE_3_BYTES, 3D)).verifyComplete(); + } + @ParameterizedRedisTest // DATAREDIS-525 void zCardShouldReturnSizeCorrectly() { @@ -402,6 +460,17 @@ void zScoreShouldReturnScoreCorrectly() { assertThat(connection.zSetCommands().zScore(KEY_1_BBUFFER, VALUE_2_BBUFFER).block()).isEqualTo(2D); } + @ParameterizedRedisTest // GH-2038 + @EnabledOnCommand("ZMSCORE") + void zMScoreShouldReturnScoreCorrectly() { + + nativeCommands.zadd(KEY_1, 1D, VALUE_1); + nativeCommands.zadd(KEY_1, 2D, VALUE_2); + + connection.zSetCommands().zMScore(KEY_1_BBUFFER, Arrays.asList(VALUE_1_BBUFFER, VALUE_2_BBUFFER)) + .as(StepVerifier::create).expectNext(Arrays.asList(1D, 2D)).verifyComplete(); + } + @ParameterizedRedisTest // DATAREDIS-525 void zRemRangeByRankShouldRemoveValuesCorrectly() { diff --git a/src/test/java/org/springframework/data/redis/core/DefaultReactiveZSetOperationsIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/DefaultReactiveZSetOperationsIntegrationTests.java index 2950a54d0a..6c2d00e876 100644 --- a/src/test/java/org/springframework/data/redis/core/DefaultReactiveZSetOperationsIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/core/DefaultReactiveZSetOperationsIntegrationTests.java @@ -20,6 +20,7 @@ import reactor.test.StepVerifier; +import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -38,6 +39,7 @@ import org.springframework.data.redis.core.ReactiveOperationsTestParams.Fixture; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; +import org.springframework.data.redis.test.condition.EnabledOnCommand; import org.springframework.data.redis.test.extension.parametrized.MethodSource; import org.springframework.data.redis.test.extension.parametrized.ParameterizedRedisTest; @@ -433,6 +435,52 @@ void lexCount() { zSetOperations.lexCount(key, Range.rightOpen("b", "f")).as(StepVerifier::create).expectNext(4L).verifyComplete(); } + @ParameterizedRedisTest // GH-2007 + @EnabledOnCommand("ZPOPMIN") + void popMin() { + + K key = keyFactory.instance(); + V value1 = valueFactory.instance(); + V value2 = valueFactory.instance(); + V value3 = valueFactory.instance(); + V value4 = valueFactory.instance(); + + zSetOperations.add(key, value1, 1).as(StepVerifier::create).expectNext(true).verifyComplete(); + zSetOperations.add(key, value2, 2).as(StepVerifier::create).expectNext(true).verifyComplete(); + zSetOperations.add(key, value3, 3).as(StepVerifier::create).expectNext(true).verifyComplete(); + zSetOperations.add(key, value4, 4).as(StepVerifier::create).expectNext(true).verifyComplete(); + + zSetOperations.popMin(key).as(StepVerifier::create).expectNext(new DefaultTypedTuple<>(value1, 1D)) + .verifyComplete(); + zSetOperations.popMin(key, Duration.ofSeconds(1)).as(StepVerifier::create) + .expectNext(new DefaultTypedTuple<>(value2, 2D)).verifyComplete(); + zSetOperations.popMin(key, 2).as(StepVerifier::create).expectNext(new DefaultTypedTuple<>(value3, 3D)) + .expectNext(new DefaultTypedTuple<>(value4, 4D)).verifyComplete(); + } + + @ParameterizedRedisTest // GH-2007 + @EnabledOnCommand("ZPOPMAX") + void popMax() { + + K key = keyFactory.instance(); + V value1 = valueFactory.instance(); + V value2 = valueFactory.instance(); + V value3 = valueFactory.instance(); + V value4 = valueFactory.instance(); + + zSetOperations.add(key, value1, 1).as(StepVerifier::create).expectNext(true).verifyComplete(); + zSetOperations.add(key, value2, 2).as(StepVerifier::create).expectNext(true).verifyComplete(); + zSetOperations.add(key, value3, 3).as(StepVerifier::create).expectNext(true).verifyComplete(); + zSetOperations.add(key, value4, 4).as(StepVerifier::create).expectNext(true).verifyComplete(); + + zSetOperations.popMax(key).as(StepVerifier::create).expectNext(new DefaultTypedTuple<>(value4, 4D)) + .verifyComplete(); + zSetOperations.popMax(key, Duration.ofSeconds(1)).as(StepVerifier::create) + .expectNext(new DefaultTypedTuple<>(value3, 3D)).verifyComplete(); + zSetOperations.popMax(key, 2).as(StepVerifier::create).expectNext(new DefaultTypedTuple<>(value2, 2D)) + .expectNext(new DefaultTypedTuple<>(value1, 1D)).verifyComplete(); + } + @ParameterizedRedisTest // DATAREDIS-602 void size() { @@ -460,6 +508,21 @@ void score() { zSetOperations.score(key, value2).as(StepVerifier::create).expectNext(10d).verifyComplete(); } + @ParameterizedRedisTest // GH-2038 + @EnabledOnCommand("ZMSCORE") + void scores() { + + K key = keyFactory.instance(); + V value1 = valueFactory.instance(); + V value2 = valueFactory.instance(); + + zSetOperations.add(key, value1, 42.1).as(StepVerifier::create).expectNext(true).verifyComplete(); + zSetOperations.add(key, value2, 10).as(StepVerifier::create).expectNext(true).verifyComplete(); + + zSetOperations.score(key, value1, value2, valueFactory.instance()).as(StepVerifier::create) + .expectNext(Arrays.asList(42.1d, 10d, null)).verifyComplete(); + } + @ParameterizedRedisTest // DATAREDIS-602 void removeRange() { diff --git a/src/test/java/org/springframework/data/redis/core/DefaultZSetOperationsIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/DefaultZSetOperationsIntegrationTests.java index aa1860e006..62f33ef7b5 100644 --- a/src/test/java/org/springframework/data/redis/core/DefaultZSetOperationsIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/core/DefaultZSetOperationsIntegrationTests.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeEach; @@ -35,6 +36,7 @@ import org.springframework.data.redis.connection.RedisZSetCommands; import org.springframework.data.redis.connection.RedisZSetCommands.Weights; import org.springframework.data.redis.core.ZSetOperations.TypedTuple; +import org.springframework.data.redis.test.condition.EnabledOnCommand; import org.springframework.data.redis.test.extension.parametrized.MethodSource; import org.springframework.data.redis.test.extension.parametrized.ParameterizedRedisTest; @@ -128,6 +130,48 @@ void testLexCountBounded() { assertThat(zSetOps.lexCount(key, RedisZSetCommands.Range.range().gt(value1))).isEqualTo(2); } + @ParameterizedRedisTest // GH-2007 + @EnabledOnCommand("ZPOPMIN") + void testPopMin() { + + K key = keyFactory.instance(); + V value1 = valueFactory.instance(); + V value2 = valueFactory.instance(); + V value3 = valueFactory.instance(); + V value4 = valueFactory.instance(); + + zSetOps.add(key, value1, 1); + zSetOps.add(key, value2, 2); + zSetOps.add(key, value3, 3); + zSetOps.add(key, value4, 4); + + assertThat(zSetOps.popMin(key)).isEqualTo(new DefaultTypedTuple<>(value1, 1d)); + assertThat(zSetOps.popMin(key, 2)).containsExactly(new DefaultTypedTuple<>(value2, 2d), + new DefaultTypedTuple<>(value3, 3d)); + assertThat(zSetOps.popMin(key, 1, TimeUnit.SECONDS)).isEqualTo(new DefaultTypedTuple<>(value4, 4d)); + } + + @ParameterizedRedisTest // GH-2007 + @EnabledOnCommand("ZPOPMAX") + void testPopMax() { + + K key = keyFactory.instance(); + V value1 = valueFactory.instance(); + V value2 = valueFactory.instance(); + V value3 = valueFactory.instance(); + V value4 = valueFactory.instance(); + + zSetOps.add(key, value1, 1); + zSetOps.add(key, value2, 2); + zSetOps.add(key, value3, 3); + zSetOps.add(key, value4, 4); + + assertThat(zSetOps.popMax(key)).isEqualTo(new DefaultTypedTuple<>(value4, 4d)); + assertThat(zSetOps.popMax(key, 2)).containsExactly(new DefaultTypedTuple<>(value3, 3d), + new DefaultTypedTuple<>(value2, 2d)); + assertThat(zSetOps.popMax(key, 1, TimeUnit.SECONDS)).isEqualTo(new DefaultTypedTuple<>(value1, 1d)); + } + @ParameterizedRedisTest void testIncrementScore() { @@ -339,6 +383,21 @@ void testRemove() { assertThat(zSetOps.range(key, 0, -1)).containsOnly(value2); } + @ParameterizedRedisTest + void testScore() { + + K key = keyFactory.instance(); + V value1 = valueFactory.instance(); + V value2 = valueFactory.instance(); + V value3 = valueFactory.instance(); + + zSetOps.add(key, value1, 1.9); + zSetOps.add(key, value2, 3.7); + zSetOps.add(key, value3, 5.8); + + assertThat(zSetOps.score(key, value1, value2, valueFactory.instance())).containsExactly(1.9d, 3.7d, null); + } + @ParameterizedRedisTest void zCardRetrievesDataCorrectly() { diff --git a/src/test/java/org/springframework/data/redis/support/collections/AbstractRedisZSetTestIntegration.java b/src/test/java/org/springframework/data/redis/support/collections/AbstractRedisZSetTestIntegration.java index ee42ccb61c..71355615cd 100644 --- a/src/test/java/org/springframework/data/redis/support/collections/AbstractRedisZSetTestIntegration.java +++ b/src/test/java/org/springframework/data/redis/support/collections/AbstractRedisZSetTestIntegration.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.assertj.core.data.Offset; import org.junit.jupiter.api.BeforeEach; @@ -37,6 +38,7 @@ import org.springframework.data.redis.core.Cursor; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ZSetOperations.TypedTuple; +import org.springframework.data.redis.test.condition.EnabledOnCommand; import org.springframework.data.redis.test.extension.parametrized.ParameterizedRedisTest; /** @@ -119,6 +121,38 @@ void testFirst() { assertThat(zSet.first()).isEqualTo(t1); } + @ParameterizedRedisTest // GH-2038 + @EnabledOnCommand("ZPOPMIN") + void testPopFirst() { + + T t1 = getT(); + T t2 = getT(); + T t3 = getT(); + + zSet.add(t1, 3); + zSet.add(t2, 4); + zSet.add(t3, 5); + + assertThat(zSet.popFirst()).isEqualTo(t1); + assertThat(zSet).hasSize(2); + } + + @ParameterizedRedisTest // GH-2038 + @EnabledOnCommand("ZPOPMIN") + void testPopFirstWithTimeout() { + + T t1 = getT(); + T t2 = getT(); + T t3 = getT(); + + zSet.add(t1, 3); + zSet.add(t2, 4); + zSet.add(t3, 5); + + assertThat(zSet.popFirst(1, TimeUnit.SECONDS)).isEqualTo(t1); + assertThat(zSet).hasSize(2); + } + @ParameterizedRedisTest void testFirstException() { assertThatExceptionOfType(NoSuchElementException.class).isThrownBy(() -> zSet.first()); @@ -126,6 +160,7 @@ void testFirstException() { @ParameterizedRedisTest void testLast() { + T t1 = getT(); T t2 = getT(); T t3 = getT(); @@ -138,6 +173,38 @@ void testLast() { assertThat(zSet.last()).isEqualTo(t3); } + @ParameterizedRedisTest + @EnabledOnCommand("ZPOPMAX") + void testPopLast() { + + T t1 = getT(); + T t2 = getT(); + T t3 = getT(); + + zSet.add(t1, 3); + zSet.add(t2, 4); + zSet.add(t3, 5); + + assertThat(zSet.popLast()).isEqualTo(t3); + assertThat(zSet).hasSize(2); + } + + @ParameterizedRedisTest + @EnabledOnCommand("ZPOPMAX") + void testPopLastWithTimeout() { + + T t1 = getT(); + T t2 = getT(); + T t3 = getT(); + + zSet.add(t1, 3); + zSet.add(t2, 4); + zSet.add(t3, 5); + + assertThat(zSet.popLast(1, TimeUnit.SECONDS)).isEqualTo(t3); + assertThat(zSet).hasSize(2); + } + @ParameterizedRedisTest void testLastException() { assertThatExceptionOfType(NoSuchElementException.class).isThrownBy(() -> zSet.last()); diff --git a/src/test/java/org/springframework/data/redis/support/collections/RedisZSetIntegrationTests.java b/src/test/java/org/springframework/data/redis/support/collections/RedisZSetIntegrationTests.java index 86c87d8b12..b76406eee5 100644 --- a/src/test/java/org/springframework/data/redis/support/collections/RedisZSetIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/support/collections/RedisZSetIntegrationTests.java @@ -36,7 +36,7 @@ public RedisZSetIntegrationTests(ObjectFactory factory, RedisTemplate te } RedisStore copyStore(RedisStore store) { - return new DefaultRedisZSet(store.getKey().toString(), store.getOperations()); + return RedisZSet.create(store.getKey(), store.getOperations()); } AbstractRedisCollection createCollection() {