diff --git a/pom.xml b/pom.xml
index a67dc48660..5c09eb588c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
org.springframework.data
spring-data-redis
- 3.2.0-SNAPSHOT
+ 3.2.0-GH-2658-SNAPSHOT
Spring Data Redis
Spring Data module for Redis
diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveGeoOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveGeoOperations.java
index b14b5d58c0..cdca3be9a1 100644
--- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveGeoOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveGeoOperations.java
@@ -68,7 +68,7 @@ public Mono add(K key, Point point, V member) {
Assert.notNull(point, "Point must not be null");
Assert.notNull(member, "Member must not be null");
- return createMono(connection -> connection.geoAdd(rawKey(key), point, rawValue(member)));
+ return createMono(geoCommands -> geoCommands.geoAdd(rawKey(key), point, rawValue(member)));
}
@Override
@@ -77,7 +77,7 @@ public Mono add(K key, GeoLocation location) {
Assert.notNull(key, "Key must not be null");
Assert.notNull(location, "GeoLocation must not be null");
- return createMono(connection -> connection.geoAdd(rawKey(key),
+ return createMono(geoCommands -> geoCommands.geoAdd(rawKey(key),
new GeoLocation<>(rawValue(location.getName()), location.getPoint())));
}
@@ -87,13 +87,13 @@ public Mono add(K key, Map memberCoordinateMap) {
Assert.notNull(key, "Key must not be null");
Assert.notNull(memberCoordinateMap, "MemberCoordinateMap must not be null");
- return createMono(connection -> {
+ return createMono(geoCommands -> {
Mono>> serializedList = Flux
.fromIterable(() -> memberCoordinateMap.entrySet().iterator())
.map(entry -> new GeoLocation<>(rawValue(entry.getKey()), entry.getValue())).collectList();
- return serializedList.flatMap(list -> connection.geoAdd(rawKey(key), list));
+ return serializedList.flatMap(list -> geoCommands.geoAdd(rawKey(key), list));
});
}
@@ -103,12 +103,12 @@ public Mono add(K key, Iterable> geoLocations) {
Assert.notNull(key, "Key must not be null");
Assert.notNull(geoLocations, "GeoLocations must not be null");
- return createMono(connection -> {
+ return createMono(geoCommands -> {
Mono>> serializedList = Flux.fromIterable(geoLocations)
.map(location -> new GeoLocation<>(rawValue(location.getName()), location.getPoint())).collectList();
- return serializedList.flatMap(list -> connection.geoAdd(rawKey(key), list));
+ return serializedList.flatMap(list -> geoCommands.geoAdd(rawKey(key), list));
});
}
@@ -118,11 +118,11 @@ public Flux add(K key, Publisher extends Collection>> loc
Assert.notNull(key, "Key must not be null");
Assert.notNull(locations, "Locations must not be null");
- return createFlux(connection -> Flux.from(locations)
+ return createFlux(geoCommands -> Flux.from(locations)
.map(locationList -> locationList.stream()
.map(location -> new GeoLocation<>(rawValue(location.getName()), location.getPoint()))
.collect(Collectors.toList()))
- .flatMap(list -> connection.geoAdd(rawKey(key), list)));
+ .flatMap(list -> geoCommands.geoAdd(rawKey(key), list)));
}
@Override
@@ -132,7 +132,7 @@ public Mono distance(K key, V member1, V member2) {
Assert.notNull(member1, "Member 1 must not be null");
Assert.notNull(member2, "Member 2 must not be null");
- return createMono(connection -> connection.geoDist(rawKey(key), rawValue(member1), rawValue(member2)));
+ return createMono(geoCommands -> geoCommands.geoDist(rawKey(key), rawValue(member1), rawValue(member2)));
}
@Override
@@ -143,7 +143,7 @@ public Mono distance(K key, V member1, V member2, Metric metric) {
Assert.notNull(member2, "Member 2 must not be null");
Assert.notNull(metric, "Metric must not be null");
- return createMono(connection -> connection.geoDist(rawKey(key), rawValue(member1), rawValue(member2), metric));
+ return createMono(geoCommands -> geoCommands.geoDist(rawKey(key), rawValue(member1), rawValue(member2), metric));
}
@Override
@@ -152,7 +152,7 @@ public Mono hash(K key, V member) {
Assert.notNull(key, "Key must not be null");
Assert.notNull(member, "Member must not be null");
- return createMono(connection -> connection.geoHash(rawKey(key), rawValue(member)));
+ return createMono(geoCommands -> geoCommands.geoHash(rawKey(key), rawValue(member)));
}
@Override
@@ -163,10 +163,10 @@ public final Mono> hash(K key, V... members) {
Assert.notEmpty(members, "Members must not be null or empty");
Assert.noNullElements(members, "Members must not contain null elements");
- return createMono(connection -> Flux.fromArray(members) //
+ return createMono(geoCommands -> Flux.fromArray(members) //
.map(this::rawValue) //
.collectList() //
- .flatMap(serialized -> connection.geoHash(rawKey(key), serialized)));
+ .flatMap(serialized -> geoCommands.geoHash(rawKey(key), serialized)));
}
@Override
@@ -175,7 +175,7 @@ public Mono position(K key, V member) {
Assert.notNull(key, "Key must not be null");
Assert.notNull(member, "Member must not be null");
- return createMono(connection -> connection.geoPos(rawKey(key), rawValue(member)));
+ return createMono(geoCommands -> geoCommands.geoPos(rawKey(key), rawValue(member)));
}
@Override
@@ -186,10 +186,10 @@ public final Mono> position(K key, V... members) {
Assert.notEmpty(members, "Members must not be null or empty");
Assert.noNullElements(members, "Members must not contain null elements");
- return createMono(connection -> Flux.fromArray(members) //
+ return createMono(geoCommands -> Flux.fromArray(members) //
.map(this::rawValue) //
.collectList() //
- .flatMap(serialized -> connection.geoPos(rawKey(key), serialized)));
+ .flatMap(serialized -> geoCommands.geoPos(rawKey(key), serialized)));
}
@Override
@@ -198,7 +198,7 @@ public Flux>> radius(K key, Circle within) {
Assert.notNull(key, "Key must not be null");
Assert.notNull(within, "Circle must not be null");
- return createFlux(connection -> connection.geoRadius(rawKey(key), within).map(this::readGeoResult));
+ return createFlux(geoCommands -> geoCommands.geoRadius(rawKey(key), within).map(this::readGeoResult));
}
@Override
@@ -208,7 +208,7 @@ public Flux>> radius(K key, Circle within, GeoRadiusCom
Assert.notNull(within, "Circle must not be null");
Assert.notNull(args, "GeoRadiusCommandArgs must not be null");
- return createFlux(connection -> connection.geoRadius(rawKey(key), within, args) //
+ return createFlux(geoCommands -> geoCommands.geoRadius(rawKey(key), within, args) //
.map(this::readGeoResult));
}
@@ -218,8 +218,9 @@ public Flux>> radius(K key, V member, double radius) {
Assert.notNull(key, "Key must not be null");
Assert.notNull(member, "Member must not be null");
- return createFlux(connection -> connection.geoRadiusByMember(rawKey(key), rawValue(member), new Distance(radius)) //
- .map(this::readGeoResult));
+ return createFlux(geoCommands ->
+ geoCommands.geoRadiusByMember(rawKey(key), rawValue(member), new Distance(radius)) //
+ .map(this::readGeoResult));
}
@Override
@@ -229,7 +230,7 @@ public Flux>> radius(K key, V member, Distance distance
Assert.notNull(member, "Member must not be null");
Assert.notNull(distance, "Distance must not be null");
- return createFlux(connection -> connection.geoRadiusByMember(rawKey(key), rawValue(member), distance) //
+ return createFlux(geoCommands -> geoCommands.geoRadiusByMember(rawKey(key), rawValue(member), distance) //
.map(this::readGeoResult));
}
@@ -241,7 +242,7 @@ public Flux>> radius(K key, V member, Distance distance
Assert.notNull(distance, "Distance must not be null");
Assert.notNull(args, "GeoRadiusCommandArgs must not be null");
- return createFlux(connection -> connection.geoRadiusByMember(rawKey(key), rawValue(member), distance, args))
+ return createFlux(geoCommands -> geoCommands.geoRadiusByMember(rawKey(key), rawValue(member), distance, args))
.map(this::readGeoResult);
}
@@ -308,8 +309,7 @@ private Flux createFlux(Function> funct
@SuppressWarnings("unchecked")
private GeoReference getGeoReference(GeoReference reference) {
return reference instanceof GeoReference.GeoMemberReference
- ? GeoReference
- .fromMember(rawValue(((GeoMemberReference) reference).getMember()))
+ ? GeoReference.fromMember(rawValue(((GeoMemberReference) reference).getMember()))
: (GeoReference) reference;
}
@@ -327,7 +327,7 @@ private V readValue(ByteBuffer buffer) {
private GeoResult> readGeoResult(GeoResult> source) {
- return new GeoResult<>(new GeoLocation(readValue(source.getContent().getName()), source.getContent().getPoint()),
+ return new GeoResult<>(new GeoLocation<>(readValue(source.getContent().getName()), source.getContent().getPoint()),
source.getDistance());
}
}
diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveHashOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveHashOperations.java
index 21cab4155a..db9a74bbb9 100644
--- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveHashOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveHashOperations.java
@@ -60,10 +60,10 @@ public Mono remove(H key, Object... hashKeys) {
Assert.notEmpty(hashKeys, "Hash keys must not be empty");
Assert.noNullElements(hashKeys, "Hash keys must not contain null elements");
- return createMono(connection -> Flux.fromArray(hashKeys) //
+ return createMono(hashCommands -> Flux.fromArray(hashKeys) //
.map(o -> (HK) o).map(this::rawHashKey) //
.collectList() //
- .flatMap(hks -> connection.hDel(rawKey(key), hks)));
+ .flatMap(hks -> hashCommands.hDel(rawKey(key), hks)));
}
@Override
@@ -73,7 +73,7 @@ public Mono hasKey(H key, Object hashKey) {
Assert.notNull(key, "Key must not be null");
Assert.notNull(hashKey, "Hash key must not be null");
- return createMono(connection -> connection.hExists(rawKey(key), rawHashKey((HK) hashKey)));
+ return createMono(hashCommands -> hashCommands.hExists(rawKey(key), rawHashKey((HK) hashKey)));
}
@Override
@@ -83,7 +83,8 @@ public Mono get(H key, Object hashKey) {
Assert.notNull(key, "Key must not be null");
Assert.notNull(hashKey, "Hash key must not be null");
- return createMono(connection -> connection.hGet(rawKey(key), rawHashKey((HK) hashKey)).map(this::readHashValue));
+ return createMono(hashCommands ->
+ hashCommands.hGet(rawKey(key), rawHashKey((HK) hashKey)).map(this::readHashValue));
}
@Override
@@ -93,10 +94,10 @@ public Mono> multiGet(H key, Collection hashKeys) {
Assert.notNull(hashKeys, "Hash keys must not be null");
Assert.notEmpty(hashKeys, "Hash keys must not be empty");
- return createMono(connection -> Flux.fromIterable(hashKeys) //
+ return createMono(hashCommands -> Flux.fromIterable(hashKeys) //
.map(this::rawHashKey) //
.collectList() //
- .flatMap(hks -> connection.hMGet(rawKey(key), hks)).map(this::deserializeHashValues));
+ .flatMap(hks -> hashCommands.hMGet(rawKey(key), hks)).map(this::deserializeHashValues));
}
@Override
@@ -162,7 +163,7 @@ public Flux keys(H key) {
Assert.notNull(key, "Key must not be null");
- return createFlux(connection -> connection.hKeys(rawKey(key)) //
+ return createFlux(hashCommands -> hashCommands.hKeys(rawKey(key)) //
.map(this::readHashKey));
}
@@ -171,7 +172,7 @@ public Mono size(H key) {
Assert.notNull(key, "Key must not be null");
- return createMono(connection -> connection.hLen(rawKey(key)));
+ return createMono(hashCommands -> hashCommands.hLen(rawKey(key)));
}
@Override
@@ -180,9 +181,9 @@ public Mono putAll(H key, Map extends HK, ? extends HV> map) {
Assert.notNull(key, "Key must not be null");
Assert.notNull(map, "Map must not be null");
- return createMono(connection -> Flux.fromIterable(() -> map.entrySet().iterator()) //
+ return createMono(hashCommands -> Flux.fromIterable(() -> map.entrySet().iterator()) //
.collectMap(entry -> rawHashKey(entry.getKey()), entry -> rawHashValue(entry.getValue())) //
- .flatMap(serialized -> connection.hMSet(rawKey(key), serialized)));
+ .flatMap(serialized -> hashCommands.hMSet(rawKey(key), serialized)));
}
@Override
@@ -192,7 +193,7 @@ public Mono put(H key, HK hashKey, HV value) {
Assert.notNull(hashKey, "Hash key must not be null");
Assert.notNull(value, "Hash value must not be null");
- return createMono(connection -> connection.hSet(rawKey(key), rawHashKey(hashKey), rawHashValue(value)));
+ return createMono(hashCommands -> hashCommands.hSet(rawKey(key), rawHashKey(hashKey), rawHashValue(value)));
}
@Override
@@ -202,7 +203,7 @@ public Mono putIfAbsent(H key, HK hashKey, HV value) {
Assert.notNull(hashKey, "Hash key must not be null");
Assert.notNull(value, "Hash value must not be null");
- return createMono(connection -> connection.hSetNX(rawKey(key), rawHashKey(hashKey), rawHashValue(value)));
+ return createMono(hashCommands -> hashCommands.hSetNX(rawKey(key), rawHashKey(hashKey), rawHashValue(value)));
}
@Override
@@ -210,7 +211,7 @@ public Flux values(H key) {
Assert.notNull(key, "Key must not be null");
- return createFlux(connection -> connection.hVals(rawKey(key)) //
+ return createFlux(hashCommands -> hashCommands.hVals(rawKey(key)) //
.map(this::readHashValue));
}
@@ -219,7 +220,7 @@ public Flux> entries(H key) {
Assert.notNull(key, "Key must not be null");
- return createFlux(connection -> connection.hGetAll(rawKey(key)) //
+ return createFlux(hashCommands -> hashCommands.hGetAll(rawKey(key)) //
.map(this::deserializeHashEntry));
}
@@ -229,7 +230,7 @@ public Flux> scan(H key, ScanOptions options) {
Assert.notNull(key, "Key must not be null");
Assert.notNull(key, "ScanOptions must not be null");
- return createFlux(connection -> connection.hScan(rawKey(key), options) //
+ return createFlux(hashCommands -> hashCommands.hScan(rawKey(key), options) //
.map(this::deserializeHashEntry));
}
diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveHyperLogLogOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveHyperLogLogOperations.java
index fb43069cec..4c3258a2eb 100644
--- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveHyperLogLogOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveHyperLogLogOperations.java
@@ -53,10 +53,10 @@ public final Mono add(K key, V... values) {
Assert.notEmpty(values, "Values must not be null or empty");
Assert.noNullElements(values, "Values must not contain null elements");
- return createMono(connection -> Flux.fromArray(values) //
+ return createMono(hyperLogLogCommands -> Flux.fromArray(values) //
.map(this::rawValue) //
.collectList() //
- .flatMap(serializedValues -> connection.pfAdd(rawKey(key), serializedValues)));
+ .flatMap(serializedValues -> hyperLogLogCommands.pfAdd(rawKey(key), serializedValues)));
}
@Override
@@ -66,10 +66,10 @@ public final Mono size(K... keys) {
Assert.notEmpty(keys, "Keys must not be null or empty");
Assert.noNullElements(keys, "Keys must not contain null elements");
- return createMono(connection -> Flux.fromArray(keys) //
+ return createMono(hyperLogLogCommands -> Flux.fromArray(keys) //
.map(this::rawKey) //
.collectList() //
- .flatMap(connection::pfCount));
+ .flatMap(hyperLogLogCommands::pfCount));
}
@Override
@@ -80,10 +80,10 @@ public final Mono union(K destination, K... sourceKeys) {
Assert.notEmpty(sourceKeys, "Source keys must not be null or empty");
Assert.noNullElements(sourceKeys, "Source keys must not contain null elements");
- return createMono(connection -> Flux.fromArray(sourceKeys) //
+ return createMono(hyperLogLogCommands -> Flux.fromArray(sourceKeys) //
.map(this::rawKey) //
.collectList() //
- .flatMap(serialized -> connection.pfMerge(rawKey(destination), serialized)));
+ .flatMap(serialized -> hyperLogLogCommands.pfMerge(rawKey(destination), serialized)));
}
@Override
diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java
index 20b7b08dfd..231c188d27 100644
--- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java
@@ -58,7 +58,7 @@ public Flux range(K key, long start, long end) {
Assert.notNull(key, "Key must not be null");
- return createFlux(connection -> connection.lRange(rawKey(key), start, end).map(this::readValue));
+ return createFlux(listCommands -> listCommands.lRange(rawKey(key), start, end).map(this::readValue));
}
@Override
@@ -66,7 +66,7 @@ public Mono trim(K key, long start, long end) {
Assert.notNull(key, "Key must not be null");
- return createMono(connection -> connection.lTrim(rawKey(key), start, end));
+ return createMono(listCommands -> listCommands.lTrim(rawKey(key), start, end));
}
@Override
@@ -74,7 +74,7 @@ public Mono size(K key) {
Assert.notNull(key, "Key must not be null");
- return createMono(connection -> connection.lLen(rawKey(key)));
+ return createMono(listCommands -> listCommands.lLen(rawKey(key)));
}
@Override
@@ -97,10 +97,10 @@ public Mono leftPushAll(K key, Collection values) {
Assert.notNull(key, "Key must not be null");
Assert.notEmpty(values, "Values must not be null or empty");
- return createMono(connection -> Flux.fromIterable(values) //
+ return createMono(listCommands -> Flux.fromIterable(values) //
.map(this::rawValue) //
.collectList() //
- .flatMap(serialized -> connection.lPush(rawKey(key), serialized)));
+ .flatMap(serialized -> listCommands.lPush(rawKey(key), serialized)));
}
@Override
@@ -108,7 +108,7 @@ public Mono leftPushIfPresent(K key, V value) {
Assert.notNull(key, "Key must not be null");
- return createMono(connection -> connection.lPushX(rawKey(key), rawValue(value)));
+ return createMono(listCommands -> listCommands.lPushX(rawKey(key), rawValue(value)));
}
@Override
@@ -116,7 +116,8 @@ public Mono leftPush(K key, V pivot, V value) {
Assert.notNull(key, "Key must not be null");
- return createMono(connection -> connection.lInsert(rawKey(key), Position.BEFORE, rawValue(pivot), rawValue(value)));
+ return createMono(listCommands ->
+ listCommands.lInsert(rawKey(key), Position.BEFORE, rawValue(pivot), rawValue(value)));
}
@Override
@@ -139,10 +140,10 @@ public Mono rightPushAll(K key, Collection values) {
Assert.notNull(key, "Key must not be null");
Assert.notEmpty(values, "Values must not be null or empty");
- return createMono(connection -> Flux.fromIterable(values) //
+ return createMono(listCommands -> Flux.fromIterable(values) //
.map(this::rawValue) //
.collectList() //
- .flatMap(serialized -> connection.rPush(rawKey(key), serialized)));
+ .flatMap(serialized -> listCommands.rPush(rawKey(key), serialized)));
}
@Override
@@ -150,7 +151,7 @@ public Mono rightPushIfPresent(K key, V value) {
Assert.notNull(key, "Key must not be null");
- return createMono(connection -> connection.rPushX(rawKey(key), rawValue(value)));
+ return createMono(listCommands -> listCommands.rPushX(rawKey(key), rawValue(value)));
}
@Override
@@ -158,7 +159,8 @@ public Mono rightPush(K key, V pivot, V value) {
Assert.notNull(key, "Key must not be null");
- return createMono(connection -> connection.lInsert(rawKey(key), Position.AFTER, rawValue(pivot), rawValue(value)));
+ return createMono(listCommands ->
+ listCommands.lInsert(rawKey(key), Position.AFTER, rawValue(pivot), rawValue(value)));
}
@Override
@@ -169,8 +171,8 @@ public Mono move(K sourceKey, Direction from, K destinationKey, Direction to)
Assert.notNull(from, "From direction must not be null");
Assert.notNull(to, "To direction must not be null");
- return createMono(
- connection -> connection.lMove(rawKey(sourceKey), rawKey(destinationKey), from, to).map(this::readValue));
+ return createMono(listCommands ->
+ listCommands.lMove(rawKey(sourceKey), rawKey(destinationKey), from, to).map(this::readValue));
}
@Override
@@ -182,8 +184,8 @@ public Mono move(K sourceKey, Direction from, K destinationKey, Direction to,
Assert.notNull(to, "To direction must not be null");
Assert.notNull(timeout, "Timeout must not be null");
- return createMono(connection -> connection.bLMove(rawKey(sourceKey), rawKey(destinationKey), from, to, timeout)
- .map(this::readValue));
+ return createMono(listCommands ->
+ listCommands.bLMove(rawKey(sourceKey), rawKey(destinationKey), from, to, timeout).map(this::readValue));
}
@Override
@@ -191,7 +193,7 @@ public Mono set(K key, long index, V value) {
Assert.notNull(key, "Key must not be null");
- return createMono(connection -> connection.lSet(rawKey(key), index, rawValue(value)));
+ return createMono(listCommands -> listCommands.lSet(rawKey(key), index, rawValue(value)));
}
@Override
@@ -200,7 +202,7 @@ public Mono remove(K key, long count, Object value) {
Assert.notNull(key, "Key must not be null");
- return createMono(connection -> connection.lRem(rawKey(key), count, rawValue((V) value)));
+ return createMono(listCommands -> listCommands.lRem(rawKey(key), count, rawValue((V) value)));
}
@Override
@@ -208,7 +210,7 @@ public Mono index(K key, long index) {
Assert.notNull(key, "Key must not be null");
- return createMono(connection -> connection.lIndex(rawKey(key), index).map(this::readValue));
+ return createMono(listCommands -> listCommands.lIndex(rawKey(key), index).map(this::readValue));
}
@Override
@@ -216,7 +218,7 @@ public Mono indexOf(K key, V value) {
Assert.notNull(key, "Key must not be null");
- return createMono(connection -> connection.lPos(rawKey(key), rawValue(value)));
+ return createMono(listCommands -> listCommands.lPos(rawKey(key), rawValue(value)));
}
@Override
@@ -224,7 +226,8 @@ public Mono lastIndexOf(K key, V value) {
Assert.notNull(key, "Key must not be null");
- return createMono(connection -> connection.lPos(LPosCommand.lPosOf(rawValue(value)).from(rawKey(key)).rank(-1)));
+ return createMono(listCommands ->
+ listCommands.lPos(LPosCommand.lPosOf(rawValue(value)).from(rawKey(key)).rank(-1)));
}
@Override
@@ -232,7 +235,7 @@ public Mono leftPop(K key) {
Assert.notNull(key, "Key must not be null");
- return createMono(connection -> connection.lPop(rawKey(key)).map(this::readValue));
+ return createMono(listCommands -> listCommands.lPop(rawKey(key)).map(this::readValue));
}
@@ -243,8 +246,9 @@ public Mono leftPop(K key, Duration timeout) {
Assert.notNull(timeout, "Duration must not be null");
Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
- return createMono(connection -> connection.blPop(Collections.singletonList(rawKey(key)), timeout)
- .map(popResult -> readValue(popResult.getValue())));
+ return createMono(listCommands ->
+ listCommands.blPop(Collections.singletonList(rawKey(key)), timeout)
+ .map(popResult -> readValue(popResult.getValue())));
}
@Override
@@ -252,7 +256,7 @@ public Mono rightPop(K key) {
Assert.notNull(key, "Key must not be null");
- return createMono(connection -> connection.rPop(rawKey(key)).map(this::readValue));
+ return createMono(listCommands -> listCommands.rPop(rawKey(key)).map(this::readValue));
}
@Override
@@ -262,8 +266,9 @@ public Mono rightPop(K key, Duration timeout) {
Assert.notNull(timeout, "Duration must not be null");
Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
- return createMono(connection -> connection.brPop(Collections.singletonList(rawKey(key)), timeout)
- .map(popResult -> readValue(popResult.getValue())));
+ return createMono(listCommands ->
+ listCommands.brPop(Collections.singletonList(rawKey(key)), timeout)
+ .map(popResult -> readValue(popResult.getValue())));
}
@Override
@@ -272,8 +277,8 @@ public Mono rightPopAndLeftPush(K sourceKey, K destinationKey) {
Assert.notNull(sourceKey, "Source key must not be null");
Assert.notNull(destinationKey, "Destination key must not be null");
- return createMono(
- connection -> connection.rPopLPush(rawKey(sourceKey), rawKey(destinationKey)).map(this::readValue));
+ return createMono(listCommands ->
+ listCommands.rPopLPush(rawKey(sourceKey), rawKey(destinationKey)).map(this::readValue));
}
@Override
@@ -284,8 +289,8 @@ public Mono rightPopAndLeftPush(K sourceKey, K destinationKey, Duration timeo
Assert.notNull(timeout, "Duration must not be null");
Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second");
- return createMono(
- connection -> connection.bRPopLPush(rawKey(sourceKey), rawKey(destinationKey), timeout).map(this::readValue));
+ return createMono(listCommands ->
+ listCommands.bRPopLPush(rawKey(sourceKey), rawKey(destinationKey), timeout).map(this::readValue));
}
@Override
diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveSetOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveSetOperations.java
index d0bd9acc4b..218b93eb97 100644
--- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveSetOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveSetOperations.java
@@ -53,18 +53,19 @@ class DefaultReactiveSetOperations implements ReactiveSetOperations
}
@Override
+ @SuppressWarnings("unchecked")
public Mono add(K key, V... values) {
Assert.notNull(key, "Key must not be null");
if (values.length == 1) {
- return createMono(connection -> connection.sAdd(rawKey(key), rawValue(values[0])));
+ return createMono(setCommands -> setCommands.sAdd(rawKey(key), rawValue(values[0])));
}
- return createMono(connection -> Flux.fromArray(values) //
+ return createMono(setCommands -> Flux.fromArray(values) //
.map(this::rawValue) //
.collectList() //
- .flatMap(serialized -> connection.sAdd(rawKey(key), serialized)));
+ .flatMap(serialized -> setCommands.sAdd(rawKey(key), serialized)));
}
@Override
@@ -74,13 +75,13 @@ public Mono remove(K key, Object... values) {
Assert.notNull(key, "Key must not be null");
if (values.length == 1) {
- return createMono(connection -> connection.sRem(rawKey(key), rawValue((V) values[0])));
+ return createMono(setCommands -> setCommands.sRem(rawKey(key), rawValue((V) values[0])));
}
- return createMono(connection -> Flux.fromArray((V[]) values) //
+ return createMono(setCommands -> Flux.fromArray((V[]) values) //
.map(this::rawValue) //
.collectList() //
- .flatMap(serialized -> connection.sRem(rawKey(key), serialized)));
+ .flatMap(serialized -> setCommands.sRem(rawKey(key), serialized)));
}
@Override
@@ -88,7 +89,7 @@ public Mono pop(K key) {
Assert.notNull(key, "Key must not be null");
- return createMono(connection -> connection.sPop(rawKey(key)).map(this::readValue));
+ return createMono(setCommands -> setCommands.sPop(rawKey(key)).map(this::readValue));
}
@Override
@@ -96,7 +97,7 @@ public Flux pop(K key, long count) {
Assert.notNull(key, "Key must not be null");
- return createFlux(connection -> connection.sPop(rawKey(key), count).map(this::readValue));
+ return createFlux(setCommands -> setCommands.sPop(rawKey(key), count).map(this::readValue));
}
@Override
@@ -105,7 +106,7 @@ public Mono move(K sourceKey, V value, K destKey) {
Assert.notNull(sourceKey, "Source key must not be null");
Assert.notNull(destKey, "Destination key must not be null");
- return createMono(connection -> connection.sMove(rawKey(sourceKey), rawKey(destKey), rawValue(value)));
+ return createMono(setCommands -> setCommands.sMove(rawKey(sourceKey), rawKey(destKey), rawValue(value)));
}
@Override
@@ -113,7 +114,7 @@ public Mono size(K key) {
Assert.notNull(key, "Key must not be null");
- return createMono(connection -> connection.sCard(rawKey(key)));
+ return createMono(setCommands -> setCommands.sCard(rawKey(key)));
}
@Override
@@ -122,31 +123,29 @@ public Mono isMember(K key, Object o) {
Assert.notNull(key, "Key must not be null");
- return createMono(connection -> connection.sIsMember(rawKey(key), rawValue((V) o)));
+ return createMono(setCommands -> setCommands.sIsMember(rawKey(key), rawValue((V) o)));
}
@Override
+ @SuppressWarnings("unchecked")
public Mono