diff --git a/pom.xml b/pom.xml index a67dc48660..5c09eb588c 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-redis - 3.2.0-SNAPSHOT + 3.2.0-GH-2658-SNAPSHOT Spring Data Redis Spring Data module for Redis diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveGeoOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveGeoOperations.java index b14b5d58c0..cdca3be9a1 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveGeoOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveGeoOperations.java @@ -68,7 +68,7 @@ public Mono add(K key, Point point, V member) { Assert.notNull(point, "Point must not be null"); Assert.notNull(member, "Member must not be null"); - return createMono(connection -> connection.geoAdd(rawKey(key), point, rawValue(member))); + return createMono(geoCommands -> geoCommands.geoAdd(rawKey(key), point, rawValue(member))); } @Override @@ -77,7 +77,7 @@ public Mono add(K key, GeoLocation location) { Assert.notNull(key, "Key must not be null"); Assert.notNull(location, "GeoLocation must not be null"); - return createMono(connection -> connection.geoAdd(rawKey(key), + return createMono(geoCommands -> geoCommands.geoAdd(rawKey(key), new GeoLocation<>(rawValue(location.getName()), location.getPoint()))); } @@ -87,13 +87,13 @@ public Mono add(K key, Map memberCoordinateMap) { Assert.notNull(key, "Key must not be null"); Assert.notNull(memberCoordinateMap, "MemberCoordinateMap must not be null"); - return createMono(connection -> { + return createMono(geoCommands -> { Mono>> serializedList = Flux .fromIterable(() -> memberCoordinateMap.entrySet().iterator()) .map(entry -> new GeoLocation<>(rawValue(entry.getKey()), entry.getValue())).collectList(); - return serializedList.flatMap(list -> connection.geoAdd(rawKey(key), list)); + return serializedList.flatMap(list -> geoCommands.geoAdd(rawKey(key), list)); }); } @@ -103,12 +103,12 @@ public Mono add(K key, Iterable> geoLocations) { Assert.notNull(key, "Key must not be null"); Assert.notNull(geoLocations, "GeoLocations must not be null"); - return createMono(connection -> { + return createMono(geoCommands -> { Mono>> serializedList = Flux.fromIterable(geoLocations) .map(location -> new GeoLocation<>(rawValue(location.getName()), location.getPoint())).collectList(); - return serializedList.flatMap(list -> connection.geoAdd(rawKey(key), list)); + return serializedList.flatMap(list -> geoCommands.geoAdd(rawKey(key), list)); }); } @@ -118,11 +118,11 @@ public Flux add(K key, Publisher>> loc Assert.notNull(key, "Key must not be null"); Assert.notNull(locations, "Locations must not be null"); - return createFlux(connection -> Flux.from(locations) + return createFlux(geoCommands -> Flux.from(locations) .map(locationList -> locationList.stream() .map(location -> new GeoLocation<>(rawValue(location.getName()), location.getPoint())) .collect(Collectors.toList())) - .flatMap(list -> connection.geoAdd(rawKey(key), list))); + .flatMap(list -> geoCommands.geoAdd(rawKey(key), list))); } @Override @@ -132,7 +132,7 @@ public Mono distance(K key, V member1, V member2) { Assert.notNull(member1, "Member 1 must not be null"); Assert.notNull(member2, "Member 2 must not be null"); - return createMono(connection -> connection.geoDist(rawKey(key), rawValue(member1), rawValue(member2))); + return createMono(geoCommands -> geoCommands.geoDist(rawKey(key), rawValue(member1), rawValue(member2))); } @Override @@ -143,7 +143,7 @@ public Mono distance(K key, V member1, V member2, Metric metric) { Assert.notNull(member2, "Member 2 must not be null"); Assert.notNull(metric, "Metric must not be null"); - return createMono(connection -> connection.geoDist(rawKey(key), rawValue(member1), rawValue(member2), metric)); + return createMono(geoCommands -> geoCommands.geoDist(rawKey(key), rawValue(member1), rawValue(member2), metric)); } @Override @@ -152,7 +152,7 @@ public Mono hash(K key, V member) { Assert.notNull(key, "Key must not be null"); Assert.notNull(member, "Member must not be null"); - return createMono(connection -> connection.geoHash(rawKey(key), rawValue(member))); + return createMono(geoCommands -> geoCommands.geoHash(rawKey(key), rawValue(member))); } @Override @@ -163,10 +163,10 @@ public final Mono> hash(K key, V... members) { Assert.notEmpty(members, "Members must not be null or empty"); Assert.noNullElements(members, "Members must not contain null elements"); - return createMono(connection -> Flux.fromArray(members) // + return createMono(geoCommands -> Flux.fromArray(members) // .map(this::rawValue) // .collectList() // - .flatMap(serialized -> connection.geoHash(rawKey(key), serialized))); + .flatMap(serialized -> geoCommands.geoHash(rawKey(key), serialized))); } @Override @@ -175,7 +175,7 @@ public Mono position(K key, V member) { Assert.notNull(key, "Key must not be null"); Assert.notNull(member, "Member must not be null"); - return createMono(connection -> connection.geoPos(rawKey(key), rawValue(member))); + return createMono(geoCommands -> geoCommands.geoPos(rawKey(key), rawValue(member))); } @Override @@ -186,10 +186,10 @@ public final Mono> position(K key, V... members) { Assert.notEmpty(members, "Members must not be null or empty"); Assert.noNullElements(members, "Members must not contain null elements"); - return createMono(connection -> Flux.fromArray(members) // + return createMono(geoCommands -> Flux.fromArray(members) // .map(this::rawValue) // .collectList() // - .flatMap(serialized -> connection.geoPos(rawKey(key), serialized))); + .flatMap(serialized -> geoCommands.geoPos(rawKey(key), serialized))); } @Override @@ -198,7 +198,7 @@ public Flux>> radius(K key, Circle within) { Assert.notNull(key, "Key must not be null"); Assert.notNull(within, "Circle must not be null"); - return createFlux(connection -> connection.geoRadius(rawKey(key), within).map(this::readGeoResult)); + return createFlux(geoCommands -> geoCommands.geoRadius(rawKey(key), within).map(this::readGeoResult)); } @Override @@ -208,7 +208,7 @@ public Flux>> radius(K key, Circle within, GeoRadiusCom Assert.notNull(within, "Circle must not be null"); Assert.notNull(args, "GeoRadiusCommandArgs must not be null"); - return createFlux(connection -> connection.geoRadius(rawKey(key), within, args) // + return createFlux(geoCommands -> geoCommands.geoRadius(rawKey(key), within, args) // .map(this::readGeoResult)); } @@ -218,8 +218,9 @@ public Flux>> radius(K key, V member, double radius) { Assert.notNull(key, "Key must not be null"); Assert.notNull(member, "Member must not be null"); - return createFlux(connection -> connection.geoRadiusByMember(rawKey(key), rawValue(member), new Distance(radius)) // - .map(this::readGeoResult)); + return createFlux(geoCommands -> + geoCommands.geoRadiusByMember(rawKey(key), rawValue(member), new Distance(radius)) // + .map(this::readGeoResult)); } @Override @@ -229,7 +230,7 @@ public Flux>> radius(K key, V member, Distance distance Assert.notNull(member, "Member must not be null"); Assert.notNull(distance, "Distance must not be null"); - return createFlux(connection -> connection.geoRadiusByMember(rawKey(key), rawValue(member), distance) // + return createFlux(geoCommands -> geoCommands.geoRadiusByMember(rawKey(key), rawValue(member), distance) // .map(this::readGeoResult)); } @@ -241,7 +242,7 @@ public Flux>> radius(K key, V member, Distance distance Assert.notNull(distance, "Distance must not be null"); Assert.notNull(args, "GeoRadiusCommandArgs must not be null"); - return createFlux(connection -> connection.geoRadiusByMember(rawKey(key), rawValue(member), distance, args)) + return createFlux(geoCommands -> geoCommands.geoRadiusByMember(rawKey(key), rawValue(member), distance, args)) .map(this::readGeoResult); } @@ -308,8 +309,7 @@ private Flux createFlux(Function> funct @SuppressWarnings("unchecked") private GeoReference getGeoReference(GeoReference reference) { return reference instanceof GeoReference.GeoMemberReference - ? GeoReference - .fromMember(rawValue(((GeoMemberReference) reference).getMember())) + ? GeoReference.fromMember(rawValue(((GeoMemberReference) reference).getMember())) : (GeoReference) reference; } @@ -327,7 +327,7 @@ private V readValue(ByteBuffer buffer) { private GeoResult> readGeoResult(GeoResult> source) { - return new GeoResult<>(new GeoLocation(readValue(source.getContent().getName()), source.getContent().getPoint()), + return new GeoResult<>(new GeoLocation<>(readValue(source.getContent().getName()), source.getContent().getPoint()), source.getDistance()); } } diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveHashOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveHashOperations.java index 21cab4155a..db9a74bbb9 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveHashOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveHashOperations.java @@ -60,10 +60,10 @@ public Mono remove(H key, Object... hashKeys) { Assert.notEmpty(hashKeys, "Hash keys must not be empty"); Assert.noNullElements(hashKeys, "Hash keys must not contain null elements"); - return createMono(connection -> Flux.fromArray(hashKeys) // + return createMono(hashCommands -> Flux.fromArray(hashKeys) // .map(o -> (HK) o).map(this::rawHashKey) // .collectList() // - .flatMap(hks -> connection.hDel(rawKey(key), hks))); + .flatMap(hks -> hashCommands.hDel(rawKey(key), hks))); } @Override @@ -73,7 +73,7 @@ public Mono hasKey(H key, Object hashKey) { Assert.notNull(key, "Key must not be null"); Assert.notNull(hashKey, "Hash key must not be null"); - return createMono(connection -> connection.hExists(rawKey(key), rawHashKey((HK) hashKey))); + return createMono(hashCommands -> hashCommands.hExists(rawKey(key), rawHashKey((HK) hashKey))); } @Override @@ -83,7 +83,8 @@ public Mono get(H key, Object hashKey) { Assert.notNull(key, "Key must not be null"); Assert.notNull(hashKey, "Hash key must not be null"); - return createMono(connection -> connection.hGet(rawKey(key), rawHashKey((HK) hashKey)).map(this::readHashValue)); + return createMono(hashCommands -> + hashCommands.hGet(rawKey(key), rawHashKey((HK) hashKey)).map(this::readHashValue)); } @Override @@ -93,10 +94,10 @@ public Mono> multiGet(H key, Collection hashKeys) { Assert.notNull(hashKeys, "Hash keys must not be null"); Assert.notEmpty(hashKeys, "Hash keys must not be empty"); - return createMono(connection -> Flux.fromIterable(hashKeys) // + return createMono(hashCommands -> Flux.fromIterable(hashKeys) // .map(this::rawHashKey) // .collectList() // - .flatMap(hks -> connection.hMGet(rawKey(key), hks)).map(this::deserializeHashValues)); + .flatMap(hks -> hashCommands.hMGet(rawKey(key), hks)).map(this::deserializeHashValues)); } @Override @@ -162,7 +163,7 @@ public Flux keys(H key) { Assert.notNull(key, "Key must not be null"); - return createFlux(connection -> connection.hKeys(rawKey(key)) // + return createFlux(hashCommands -> hashCommands.hKeys(rawKey(key)) // .map(this::readHashKey)); } @@ -171,7 +172,7 @@ public Mono size(H key) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.hLen(rawKey(key))); + return createMono(hashCommands -> hashCommands.hLen(rawKey(key))); } @Override @@ -180,9 +181,9 @@ public Mono putAll(H key, Map map) { Assert.notNull(key, "Key must not be null"); Assert.notNull(map, "Map must not be null"); - return createMono(connection -> Flux.fromIterable(() -> map.entrySet().iterator()) // + return createMono(hashCommands -> Flux.fromIterable(() -> map.entrySet().iterator()) // .collectMap(entry -> rawHashKey(entry.getKey()), entry -> rawHashValue(entry.getValue())) // - .flatMap(serialized -> connection.hMSet(rawKey(key), serialized))); + .flatMap(serialized -> hashCommands.hMSet(rawKey(key), serialized))); } @Override @@ -192,7 +193,7 @@ public Mono put(H key, HK hashKey, HV value) { Assert.notNull(hashKey, "Hash key must not be null"); Assert.notNull(value, "Hash value must not be null"); - return createMono(connection -> connection.hSet(rawKey(key), rawHashKey(hashKey), rawHashValue(value))); + return createMono(hashCommands -> hashCommands.hSet(rawKey(key), rawHashKey(hashKey), rawHashValue(value))); } @Override @@ -202,7 +203,7 @@ public Mono putIfAbsent(H key, HK hashKey, HV value) { Assert.notNull(hashKey, "Hash key must not be null"); Assert.notNull(value, "Hash value must not be null"); - return createMono(connection -> connection.hSetNX(rawKey(key), rawHashKey(hashKey), rawHashValue(value))); + return createMono(hashCommands -> hashCommands.hSetNX(rawKey(key), rawHashKey(hashKey), rawHashValue(value))); } @Override @@ -210,7 +211,7 @@ public Flux values(H key) { Assert.notNull(key, "Key must not be null"); - return createFlux(connection -> connection.hVals(rawKey(key)) // + return createFlux(hashCommands -> hashCommands.hVals(rawKey(key)) // .map(this::readHashValue)); } @@ -219,7 +220,7 @@ public Flux> entries(H key) { Assert.notNull(key, "Key must not be null"); - return createFlux(connection -> connection.hGetAll(rawKey(key)) // + return createFlux(hashCommands -> hashCommands.hGetAll(rawKey(key)) // .map(this::deserializeHashEntry)); } @@ -229,7 +230,7 @@ public Flux> scan(H key, ScanOptions options) { Assert.notNull(key, "Key must not be null"); Assert.notNull(key, "ScanOptions must not be null"); - return createFlux(connection -> connection.hScan(rawKey(key), options) // + return createFlux(hashCommands -> hashCommands.hScan(rawKey(key), options) // .map(this::deserializeHashEntry)); } diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveHyperLogLogOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveHyperLogLogOperations.java index fb43069cec..4c3258a2eb 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveHyperLogLogOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveHyperLogLogOperations.java @@ -53,10 +53,10 @@ public final Mono add(K key, V... values) { Assert.notEmpty(values, "Values must not be null or empty"); Assert.noNullElements(values, "Values must not contain null elements"); - return createMono(connection -> Flux.fromArray(values) // + return createMono(hyperLogLogCommands -> Flux.fromArray(values) // .map(this::rawValue) // .collectList() // - .flatMap(serializedValues -> connection.pfAdd(rawKey(key), serializedValues))); + .flatMap(serializedValues -> hyperLogLogCommands.pfAdd(rawKey(key), serializedValues))); } @Override @@ -66,10 +66,10 @@ public final Mono size(K... keys) { Assert.notEmpty(keys, "Keys must not be null or empty"); Assert.noNullElements(keys, "Keys must not contain null elements"); - return createMono(connection -> Flux.fromArray(keys) // + return createMono(hyperLogLogCommands -> Flux.fromArray(keys) // .map(this::rawKey) // .collectList() // - .flatMap(connection::pfCount)); + .flatMap(hyperLogLogCommands::pfCount)); } @Override @@ -80,10 +80,10 @@ public final Mono union(K destination, K... sourceKeys) { Assert.notEmpty(sourceKeys, "Source keys must not be null or empty"); Assert.noNullElements(sourceKeys, "Source keys must not contain null elements"); - return createMono(connection -> Flux.fromArray(sourceKeys) // + return createMono(hyperLogLogCommands -> Flux.fromArray(sourceKeys) // .map(this::rawKey) // .collectList() // - .flatMap(serialized -> connection.pfMerge(rawKey(destination), serialized))); + .flatMap(serialized -> hyperLogLogCommands.pfMerge(rawKey(destination), serialized))); } @Override diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java index 20b7b08dfd..231c188d27 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java @@ -58,7 +58,7 @@ public Flux range(K key, long start, long end) { Assert.notNull(key, "Key must not be null"); - return createFlux(connection -> connection.lRange(rawKey(key), start, end).map(this::readValue)); + return createFlux(listCommands -> listCommands.lRange(rawKey(key), start, end).map(this::readValue)); } @Override @@ -66,7 +66,7 @@ public Mono trim(K key, long start, long end) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.lTrim(rawKey(key), start, end)); + return createMono(listCommands -> listCommands.lTrim(rawKey(key), start, end)); } @Override @@ -74,7 +74,7 @@ public Mono size(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.lLen(rawKey(key))); + return createMono(listCommands -> listCommands.lLen(rawKey(key))); } @Override @@ -97,10 +97,10 @@ public Mono leftPushAll(K key, Collection values) { Assert.notNull(key, "Key must not be null"); Assert.notEmpty(values, "Values must not be null or empty"); - return createMono(connection -> Flux.fromIterable(values) // + return createMono(listCommands -> Flux.fromIterable(values) // .map(this::rawValue) // .collectList() // - .flatMap(serialized -> connection.lPush(rawKey(key), serialized))); + .flatMap(serialized -> listCommands.lPush(rawKey(key), serialized))); } @Override @@ -108,7 +108,7 @@ public Mono leftPushIfPresent(K key, V value) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.lPushX(rawKey(key), rawValue(value))); + return createMono(listCommands -> listCommands.lPushX(rawKey(key), rawValue(value))); } @Override @@ -116,7 +116,8 @@ public Mono leftPush(K key, V pivot, V value) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.lInsert(rawKey(key), Position.BEFORE, rawValue(pivot), rawValue(value))); + return createMono(listCommands -> + listCommands.lInsert(rawKey(key), Position.BEFORE, rawValue(pivot), rawValue(value))); } @Override @@ -139,10 +140,10 @@ public Mono rightPushAll(K key, Collection values) { Assert.notNull(key, "Key must not be null"); Assert.notEmpty(values, "Values must not be null or empty"); - return createMono(connection -> Flux.fromIterable(values) // + return createMono(listCommands -> Flux.fromIterable(values) // .map(this::rawValue) // .collectList() // - .flatMap(serialized -> connection.rPush(rawKey(key), serialized))); + .flatMap(serialized -> listCommands.rPush(rawKey(key), serialized))); } @Override @@ -150,7 +151,7 @@ public Mono rightPushIfPresent(K key, V value) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.rPushX(rawKey(key), rawValue(value))); + return createMono(listCommands -> listCommands.rPushX(rawKey(key), rawValue(value))); } @Override @@ -158,7 +159,8 @@ public Mono rightPush(K key, V pivot, V value) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.lInsert(rawKey(key), Position.AFTER, rawValue(pivot), rawValue(value))); + return createMono(listCommands -> + listCommands.lInsert(rawKey(key), Position.AFTER, rawValue(pivot), rawValue(value))); } @Override @@ -169,8 +171,8 @@ public Mono move(K sourceKey, Direction from, K destinationKey, Direction to) Assert.notNull(from, "From direction must not be null"); Assert.notNull(to, "To direction must not be null"); - return createMono( - connection -> connection.lMove(rawKey(sourceKey), rawKey(destinationKey), from, to).map(this::readValue)); + return createMono(listCommands -> + listCommands.lMove(rawKey(sourceKey), rawKey(destinationKey), from, to).map(this::readValue)); } @Override @@ -182,8 +184,8 @@ public Mono move(K sourceKey, Direction from, K destinationKey, Direction to, Assert.notNull(to, "To direction must not be null"); Assert.notNull(timeout, "Timeout must not be null"); - return createMono(connection -> connection.bLMove(rawKey(sourceKey), rawKey(destinationKey), from, to, timeout) - .map(this::readValue)); + return createMono(listCommands -> + listCommands.bLMove(rawKey(sourceKey), rawKey(destinationKey), from, to, timeout).map(this::readValue)); } @Override @@ -191,7 +193,7 @@ public Mono set(K key, long index, V value) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.lSet(rawKey(key), index, rawValue(value))); + return createMono(listCommands -> listCommands.lSet(rawKey(key), index, rawValue(value))); } @Override @@ -200,7 +202,7 @@ public Mono remove(K key, long count, Object value) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.lRem(rawKey(key), count, rawValue((V) value))); + return createMono(listCommands -> listCommands.lRem(rawKey(key), count, rawValue((V) value))); } @Override @@ -208,7 +210,7 @@ public Mono index(K key, long index) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.lIndex(rawKey(key), index).map(this::readValue)); + return createMono(listCommands -> listCommands.lIndex(rawKey(key), index).map(this::readValue)); } @Override @@ -216,7 +218,7 @@ public Mono indexOf(K key, V value) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.lPos(rawKey(key), rawValue(value))); + return createMono(listCommands -> listCommands.lPos(rawKey(key), rawValue(value))); } @Override @@ -224,7 +226,8 @@ public Mono lastIndexOf(K key, V value) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.lPos(LPosCommand.lPosOf(rawValue(value)).from(rawKey(key)).rank(-1))); + return createMono(listCommands -> + listCommands.lPos(LPosCommand.lPosOf(rawValue(value)).from(rawKey(key)).rank(-1))); } @Override @@ -232,7 +235,7 @@ public Mono leftPop(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.lPop(rawKey(key)).map(this::readValue)); + return createMono(listCommands -> listCommands.lPop(rawKey(key)).map(this::readValue)); } @@ -243,8 +246,9 @@ public Mono leftPop(K key, Duration timeout) { Assert.notNull(timeout, "Duration must not be null"); Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second"); - return createMono(connection -> connection.blPop(Collections.singletonList(rawKey(key)), timeout) - .map(popResult -> readValue(popResult.getValue()))); + return createMono(listCommands -> + listCommands.blPop(Collections.singletonList(rawKey(key)), timeout) + .map(popResult -> readValue(popResult.getValue()))); } @Override @@ -252,7 +256,7 @@ public Mono rightPop(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.rPop(rawKey(key)).map(this::readValue)); + return createMono(listCommands -> listCommands.rPop(rawKey(key)).map(this::readValue)); } @Override @@ -262,8 +266,9 @@ public Mono rightPop(K key, Duration timeout) { Assert.notNull(timeout, "Duration must not be null"); Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second"); - return createMono(connection -> connection.brPop(Collections.singletonList(rawKey(key)), timeout) - .map(popResult -> readValue(popResult.getValue()))); + return createMono(listCommands -> + listCommands.brPop(Collections.singletonList(rawKey(key)), timeout) + .map(popResult -> readValue(popResult.getValue()))); } @Override @@ -272,8 +277,8 @@ public Mono rightPopAndLeftPush(K sourceKey, K destinationKey) { Assert.notNull(sourceKey, "Source key must not be null"); Assert.notNull(destinationKey, "Destination key must not be null"); - return createMono( - connection -> connection.rPopLPush(rawKey(sourceKey), rawKey(destinationKey)).map(this::readValue)); + return createMono(listCommands -> + listCommands.rPopLPush(rawKey(sourceKey), rawKey(destinationKey)).map(this::readValue)); } @Override @@ -284,8 +289,8 @@ public Mono rightPopAndLeftPush(K sourceKey, K destinationKey, Duration timeo Assert.notNull(timeout, "Duration must not be null"); Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second"); - return createMono( - connection -> connection.bRPopLPush(rawKey(sourceKey), rawKey(destinationKey), timeout).map(this::readValue)); + return createMono(listCommands -> + listCommands.bRPopLPush(rawKey(sourceKey), rawKey(destinationKey), timeout).map(this::readValue)); } @Override diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveSetOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveSetOperations.java index d0bd9acc4b..218b93eb97 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveSetOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveSetOperations.java @@ -53,18 +53,19 @@ class DefaultReactiveSetOperations implements ReactiveSetOperations } @Override + @SuppressWarnings("unchecked") public Mono add(K key, V... values) { Assert.notNull(key, "Key must not be null"); if (values.length == 1) { - return createMono(connection -> connection.sAdd(rawKey(key), rawValue(values[0]))); + return createMono(setCommands -> setCommands.sAdd(rawKey(key), rawValue(values[0]))); } - return createMono(connection -> Flux.fromArray(values) // + return createMono(setCommands -> Flux.fromArray(values) // .map(this::rawValue) // .collectList() // - .flatMap(serialized -> connection.sAdd(rawKey(key), serialized))); + .flatMap(serialized -> setCommands.sAdd(rawKey(key), serialized))); } @Override @@ -74,13 +75,13 @@ public Mono remove(K key, Object... values) { Assert.notNull(key, "Key must not be null"); if (values.length == 1) { - return createMono(connection -> connection.sRem(rawKey(key), rawValue((V) values[0]))); + return createMono(setCommands -> setCommands.sRem(rawKey(key), rawValue((V) values[0]))); } - return createMono(connection -> Flux.fromArray((V[]) values) // + return createMono(setCommands -> Flux.fromArray((V[]) values) // .map(this::rawValue) // .collectList() // - .flatMap(serialized -> connection.sRem(rawKey(key), serialized))); + .flatMap(serialized -> setCommands.sRem(rawKey(key), serialized))); } @Override @@ -88,7 +89,7 @@ public Mono pop(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.sPop(rawKey(key)).map(this::readValue)); + return createMono(setCommands -> setCommands.sPop(rawKey(key)).map(this::readValue)); } @Override @@ -96,7 +97,7 @@ public Flux pop(K key, long count) { Assert.notNull(key, "Key must not be null"); - return createFlux(connection -> connection.sPop(rawKey(key), count).map(this::readValue)); + return createFlux(setCommands -> setCommands.sPop(rawKey(key), count).map(this::readValue)); } @Override @@ -105,7 +106,7 @@ public Mono move(K sourceKey, V value, K destKey) { Assert.notNull(sourceKey, "Source key must not be null"); Assert.notNull(destKey, "Destination key must not be null"); - return createMono(connection -> connection.sMove(rawKey(sourceKey), rawKey(destKey), rawValue(value))); + return createMono(setCommands -> setCommands.sMove(rawKey(sourceKey), rawKey(destKey), rawValue(value))); } @Override @@ -113,7 +114,7 @@ public Mono size(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.sCard(rawKey(key))); + return createMono(setCommands -> setCommands.sCard(rawKey(key))); } @Override @@ -122,31 +123,29 @@ public Mono isMember(K key, Object o) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.sIsMember(rawKey(key), rawValue((V) o))); + return createMono(setCommands -> setCommands.sIsMember(rawKey(key), rawValue((V) o))); } @Override + @SuppressWarnings("unchecked") public Mono> isMember(K key, Object... objects) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> { - - return Flux.fromArray((V[]) objects) // - .map(this::rawValue) // - .collectList() // - .flatMap(rawValues -> connection.sMIsMember(rawKey(key), rawValues)) // - .map(result -> { + return createMono(setCommands -> Flux.fromArray((V[]) objects) // + .map(this::rawValue) // + .collectList() // + .flatMap(rawValues -> setCommands.sMIsMember(rawKey(key), rawValues)) // + .map(result -> { - Map isMember = new LinkedHashMap<>(result.size()); + Map isMember = new LinkedHashMap<>(result.size()); - for (int i = 0; i < objects.length; i++) { - isMember.put(objects[i], result.get(i)); - } + for (int i = 0; i < objects.length; i++) { + isMember.put(objects[i], result.get(i)); + } - return isMember; - }); - }); + return isMember; + })); } @Override @@ -172,10 +171,10 @@ public Flux intersect(Collection keys) { Assert.notNull(keys, "Keys must not be null"); - return createFlux(connection -> Flux.fromIterable(keys) // + return createFlux(setCommands -> Flux.fromIterable(keys) // .map(this::rawKey) // .collectList() // - .flatMapMany(connection::sInter) // + .flatMapMany(setCommands::sInter) // .map(this::readValue)); } @@ -205,10 +204,10 @@ public Mono intersectAndStore(Collection keys, K destKey) { Assert.notNull(keys, "Keys must not be null"); Assert.notNull(destKey, "Destination key must not be null"); - return createMono(connection -> Flux.fromIterable(keys) // + return createMono(setCommands -> Flux.fromIterable(keys) // .map(this::rawKey) // .collectList() // - .flatMap(rawKeys -> connection.sInterStore(rawKey(destKey), rawKeys))); + .flatMap(rawKeys -> setCommands.sInterStore(rawKey(destKey), rawKeys))); } @Override @@ -234,10 +233,10 @@ public Flux union(Collection keys) { Assert.notNull(keys, "Keys must not be null"); - return createFlux(connection -> Flux.fromIterable(keys) // + return createFlux(setCommands -> Flux.fromIterable(keys) // .map(this::rawKey) // .collectList() // - .flatMapMany(connection::sUnion) // + .flatMapMany(setCommands::sUnion) // .map(this::readValue)); } @@ -267,10 +266,10 @@ public Mono unionAndStore(Collection keys, K destKey) { Assert.notNull(keys, "Keys must not be null"); Assert.notNull(destKey, "Destination key must not be null"); - return createMono(connection -> Flux.fromIterable(keys) // + return createMono(setCommands -> Flux.fromIterable(keys) // .map(this::rawKey) // .collectList() // - .flatMap(rawKeys -> connection.sUnionStore(rawKey(destKey), rawKeys))); + .flatMap(rawKeys -> setCommands.sUnionStore(rawKey(destKey), rawKeys))); } @Override @@ -296,10 +295,10 @@ public Flux difference(Collection keys) { Assert.notNull(keys, "Keys must not be null"); - return createFlux(connection -> Flux.fromIterable(keys) // + return createFlux(setCommands -> Flux.fromIterable(keys) // .map(this::rawKey) // .collectList() // - .flatMapMany(connection::sDiff) // + .flatMapMany(setCommands::sDiff) // .map(this::readValue)); } @@ -329,10 +328,10 @@ public Mono differenceAndStore(Collection keys, K destKey) { Assert.notNull(keys, "Keys must not be null"); Assert.notNull(destKey, "Destination key must not be null"); - return createMono(connection -> Flux.fromIterable(keys) // + return createMono(setCommands -> Flux.fromIterable(keys) // .map(this::rawKey) // .collectList() // - .flatMap(rawKeys -> connection.sDiffStore(rawKey(destKey), rawKeys))); + .flatMap(rawKeys -> setCommands.sDiffStore(rawKey(destKey), rawKeys))); } @Override @@ -340,7 +339,7 @@ public Flux members(K key) { Assert.notNull(key, "Key must not be null"); - return createFlux(connection -> connection.sMembers(rawKey(key)).map(this::readValue)); + return createFlux(setCommands -> setCommands.sMembers(rawKey(key)).map(this::readValue)); } @Override @@ -349,7 +348,7 @@ public Flux scan(K key, ScanOptions options) { Assert.notNull(key, "Key must not be null"); Assert.notNull(options, "ScanOptions must not be null"); - return createFlux(connection -> connection.sScan(rawKey(key), options).map(this::readValue)); + return createFlux(setCommands -> setCommands.sScan(rawKey(key), options).map(this::readValue)); } @Override @@ -357,7 +356,7 @@ public Mono randomMember(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.sRandMember(rawKey(key)).map(this::readValue)); + return createMono(setCommands -> setCommands.sRandMember(rawKey(key)).map(this::readValue)); } @Override @@ -365,7 +364,7 @@ public Flux distinctRandomMembers(K key, long count) { Assert.isTrue(count > 0, "Negative count not supported; Use randomMembers to allow duplicate elements"); - return createFlux(connection -> connection.sRandMember(rawKey(key), count).map(this::readValue)); + return createFlux(setCommands -> setCommands.sRandMember(rawKey(key), count).map(this::readValue)); } @Override @@ -373,7 +372,7 @@ public Flux randomMembers(K key, long count) { Assert.isTrue(count > 0, "Use a positive number for count; This method is already allowing duplicate elements"); - return createFlux(connection -> connection.sRandMember(rawKey(key), -count).map(this::readValue)); + return createFlux(setCommands -> setCommands.sRandMember(rawKey(key), -count).map(this::readValue)); } @Override diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java index 89c613622c..1244af69e3 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java @@ -81,7 +81,7 @@ class DefaultReactiveStreamOperations implements ReactiveStreamOperat if (objectMapper.isSimpleType(targetType) || ClassUtils.isAssignable(ByteBuffer.class, targetType)) { - return new HashMapper() { + return new HashMapper<>() { @Override public Map toHash(Object object) { @@ -101,7 +101,9 @@ public Map toHash(Object object) { @Override public Object fromHash(Map hash) { + Object value = hash.values().iterator().next(); + if (ClassUtils.isAssignableValue(targetType, value)) { return value; } @@ -130,7 +132,7 @@ public Mono acknowledge(K key, String group, RecordId... recordIds) { Assert.notNull(recordIds, "MessageIds must not be null"); Assert.notEmpty(recordIds, "MessageIds must not be empty"); - return createMono(connection -> connection.xAck(rawKey(key), group, recordIds)); + return createMono(streamCommands -> streamCommands.xAck(rawKey(key), group, recordIds)); } @Override @@ -141,13 +143,13 @@ public Mono add(Record record) { MapRecord input = StreamObjectMapper.toMapRecord(this, record); - return createMono(connection -> connection.xAdd(serializeRecord(input))); + return createMono(streamCommands -> streamCommands.xAdd(serializeRecord(input))); } @Override public Flux> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions) { - return createFlux(connection -> connection.xClaim(rawKey(key), consumerGroup, newOwner, xClaimOptions) + return createFlux(streamCommands -> streamCommands.xClaim(rawKey(key), consumerGroup, newOwner, xClaimOptions) .map(this::deserializeRecord)); } @@ -157,7 +159,7 @@ public Mono delete(K key, RecordId... recordIds) { Assert.notNull(key, "Key must not be null"); Assert.notNull(recordIds, "MessageIds must not be null"); - return createMono(connection -> connection.xDel(rawKey(key), recordIds)); + return createMono(streamCommands -> streamCommands.xDel(rawKey(key), recordIds)); } @Override @@ -167,7 +169,7 @@ public Mono createGroup(K key, ReadOffset readOffset, String group) { Assert.notNull(readOffset, "ReadOffset must not be null"); Assert.notNull(group, "Group must not be null"); - return createMono(connection -> connection.xGroupCreate(rawKey(key), group, readOffset, true)); + return createMono(streamCommands -> streamCommands.xGroupCreate(rawKey(key), group, readOffset, true)); } @Override @@ -176,7 +178,7 @@ public Mono deleteConsumer(K key, Consumer consumer) { Assert.notNull(key, "Key must not be null"); Assert.notNull(consumer, "Consumer must not be null"); - return createMono(connection -> connection.xGroupDelConsumer(rawKey(key), consumer)); + return createMono(streamCommands -> streamCommands.xGroupDelConsumer(rawKey(key), consumer)); } @Override @@ -185,7 +187,7 @@ public Mono destroyGroup(K key, String group) { Assert.notNull(key, "Key must not be null"); Assert.notNull(group, "Group must not be null"); - return createMono(connection -> connection.xGroupDestroy(rawKey(key), group)); + return createMono(streamCommands -> streamCommands.xGroupDestroy(rawKey(key), group)); } @Override @@ -194,7 +196,7 @@ public Flux consumers(K key, String group) { Assert.notNull(key, "Key must not be null"); Assert.notNull(group, "Group must not be null"); - return createFlux(connection -> connection.xInfoConsumers(rawKey(key), group)); + return createFlux(streamCommands -> streamCommands.xInfoConsumers(rawKey(key), group)); } @Override @@ -202,7 +204,7 @@ public Mono info(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.xInfo(rawKey(key))); + return createMono(streamCommands -> streamCommands.xInfo(rawKey(key))); } @Override @@ -210,28 +212,31 @@ public Flux groups(K key) { Assert.notNull(key, "Key must not be null"); - return createFlux(connection -> connection.xInfoGroups(rawKey(key))); + return createFlux(streamCommands -> streamCommands.xInfoGroups(rawKey(key))); } @Override public Mono pending(K key, String group, Range range, long count) { ByteBuffer rawKey = rawKey(key); - return createMono(connection -> connection.xPending(rawKey, group, range, count)); + + return createMono(streamCommands -> streamCommands.xPending(rawKey, group, range, count)); } @Override public Mono pending(K key, Consumer consumer, Range range, long count) { ByteBuffer rawKey = rawKey(key); - return createMono(connection -> connection.xPending(rawKey, consumer, range, count)); + + return createMono(streamCommands -> streamCommands.xPending(rawKey, consumer, range, count)); } @Override public Mono pending(K key, String group) { ByteBuffer rawKey = rawKey(key); - return createMono(connection -> connection.xPending(rawKey, group)); + + return createMono(streamCommands -> streamCommands.xPending(rawKey, group)); } @Override @@ -239,7 +244,7 @@ public Mono size(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.xLen(rawKey(key))); + return createMono(streamCommands -> streamCommands.xLen(rawKey(key))); } @Override @@ -249,35 +254,39 @@ public Flux> range(K key, Range range, Limit limit) Assert.notNull(range, "Range must not be null"); Assert.notNull(limit, "Limit must not be null"); - return createFlux(connection -> connection.xRange(rawKey(key), range, limit).map(this::deserializeRecord)); + return createFlux(streamCommands -> + streamCommands.xRange(rawKey(key), range, limit).map(this::deserializeRecord)); } @Override + @SuppressWarnings("unchecked") public Flux> read(StreamReadOptions readOptions, StreamOffset... streams) { Assert.notNull(readOptions, "StreamReadOptions must not be null"); Assert.notNull(streams, "Streams must not be null"); - return createFlux(connection -> { + return createFlux(streamCommands -> { StreamOffset[] streamOffsets = rawStreamOffsets(streams); - return connection.xRead(readOptions, streamOffsets).map(this::deserializeRecord); + return streamCommands.xRead(readOptions, streamOffsets).map(this::deserializeRecord); }); } @Override - public Flux> read(Consumer consumer, StreamReadOptions readOptions, StreamOffset... streams) { + @SuppressWarnings("unchecked") + public Flux> read(Consumer consumer, StreamReadOptions readOptions, + StreamOffset... streams) { Assert.notNull(consumer, "Consumer must not be null"); Assert.notNull(readOptions, "StreamReadOptions must not be null"); Assert.notNull(streams, "Streams must not be null"); - return createFlux(connection -> { + return createFlux(streamCommands -> { StreamOffset[] streamOffsets = rawStreamOffsets(streams); - return connection.xReadGroup(consumer, readOptions, streamOffsets).map(this::deserializeRecord); + return streamCommands.xReadGroup(consumer, readOptions, streamOffsets).map(this::deserializeRecord); }); } @@ -288,7 +297,8 @@ public Flux> reverseRange(K key, Range range, Limit Assert.notNull(range, "Range must not be null"); Assert.notNull(limit, "Limit must not be null"); - return createFlux(connection -> connection.xRevRange(rawKey(key), range, limit).map(this::deserializeRecord)); + return createFlux(streamCommands -> + streamCommands.xRevRange(rawKey(key), range, limit).map(this::deserializeRecord)); } @Override @@ -300,7 +310,7 @@ public Mono trim(K key, long count) { public Mono trim(K key, long count, boolean approximateTrimming) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.xTrim(rawKey(key), count, approximateTrimming)); + return createMono(streamCommands -> streamCommands.xTrim(rawKey(key), count, approximateTrimming)); } @Override @@ -333,20 +343,20 @@ private ByteBuffer rawKey(K key) { return serializationContext.getKeySerializationPair().write(key); } - @SuppressWarnings("unchecked") private ByteBuffer rawHashKey(HK key) { try { return serializationContext.getHashKeySerializationPair().write(key); - } catch (IllegalStateException e) {} + } catch (IllegalStateException ignore) {} + return ByteBuffer.wrap(objectMapper.getConversionService().convert(key, byte[].class)); } - @SuppressWarnings("unchecked") private ByteBuffer rawValue(HV value) { try { return serializationContext.getHashValueSerializationPair().write(value); - } catch (IllegalStateException e) {} + } catch (IllegalStateException ignore) {} + return ByteBuffer.wrap(objectMapper.getConversionService().convert(value, byte[].class)); } diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveValueOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveValueOperations.java index 4951e5dfaa..1fdb3a36ff 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveValueOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveValueOperations.java @@ -60,7 +60,7 @@ public Mono set(K key, V value) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.set(rawKey(key), rawValue(value))); + return createMono(stringCommands -> stringCommands.set(rawKey(key), rawValue(value))); } @Override @@ -69,8 +69,8 @@ public Mono set(K key, V value, Duration timeout) { Assert.notNull(key, "Key must not be null"); Assert.notNull(timeout, "Duration must not be null"); - return createMono( - connection -> connection.set(rawKey(key), rawValue(value), Expiration.from(timeout), SetOption.UPSERT)); + return createMono(stringCommands -> + stringCommands.set(rawKey(key), rawValue(value), Expiration.from(timeout), SetOption.UPSERT)); } @Override @@ -78,8 +78,8 @@ public Mono setIfAbsent(K key, V value) { Assert.notNull(key, "Key must not be null"); - return createMono( - connection -> connection.set(rawKey(key), rawValue(value), Expiration.persistent(), SetOption.SET_IF_ABSENT)); + return createMono(stringCommands -> + stringCommands.set(rawKey(key), rawValue(value), Expiration.persistent(), SetOption.SET_IF_ABSENT)); } @Override @@ -88,8 +88,8 @@ public Mono setIfAbsent(K key, V value, Duration timeout) { Assert.notNull(key, "Key must not be null"); Assert.notNull(timeout, "Duration must not be null"); - return createMono( - connection -> connection.set(rawKey(key), rawValue(value), Expiration.from(timeout), SetOption.SET_IF_ABSENT)); + return createMono(stringCommands -> + stringCommands.set(rawKey(key), rawValue(value), Expiration.from(timeout), SetOption.SET_IF_ABSENT)); } @Override @@ -97,8 +97,8 @@ public Mono setIfPresent(K key, V value) { Assert.notNull(key, "Key must not be null"); - return createMono( - connection -> connection.set(rawKey(key), rawValue(value), Expiration.persistent(), SetOption.SET_IF_PRESENT)); + return createMono(stringCommands -> + stringCommands.set(rawKey(key), rawValue(value), Expiration.persistent(), SetOption.SET_IF_PRESENT)); } @Override @@ -107,8 +107,8 @@ public Mono setIfPresent(K key, V value, Duration timeout) { Assert.notNull(key, "Key must not be null"); Assert.notNull(timeout, "Duration must not be null"); - return createMono( - connection -> connection.set(rawKey(key), rawValue(value), Expiration.from(timeout), SetOption.SET_IF_PRESENT)); + return createMono(stringCommands -> + stringCommands.set(rawKey(key), rawValue(value), Expiration.from(timeout), SetOption.SET_IF_PRESENT)); } @Override @@ -116,12 +116,12 @@ public Mono multiSet(Map map) { Assert.notNull(map, "Map must not be null"); - return createMono(connection -> { + return createMono(stringCommands -> { Mono> serializedMap = Flux.fromIterable(() -> map.entrySet().iterator()) .collectMap(entry -> rawKey(entry.getKey()), entry -> rawValue(entry.getValue())); - return serializedMap.flatMap(connection::mSet); + return serializedMap.flatMap(stringCommands::mSet); }); } @@ -130,12 +130,12 @@ public Mono multiSetIfAbsent(Map map) { Assert.notNull(map, "Map must not be null"); - return createMono(connection -> { + return createMono(stringCommands -> { Mono> serializedMap = Flux.fromIterable(() -> map.entrySet().iterator()) .collectMap(entry -> rawKey(entry.getKey()), entry -> rawValue(entry.getValue())); - return serializedMap.flatMap(connection::mSetNX); + return serializedMap.flatMap(stringCommands::mSetNX); }); } @@ -145,7 +145,7 @@ public Mono get(Object key) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.get(rawKey((K) key)) // + return createMono(stringCommands -> stringCommands.get(rawKey((K) key)) // .map(this::readValue)); } @@ -154,7 +154,7 @@ public Mono getAndDelete(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.getDel(rawKey(key)) // + return createMono(stringCommands -> stringCommands.getDel(rawKey(key)) // .map(this::readValue)); } @@ -164,7 +164,7 @@ public Mono getAndExpire(K key, Duration timeout) { Assert.notNull(key, "Key must not be null"); Assert.notNull(timeout, "Timeout must not be null"); - return createMono(connection -> connection.getEx(rawKey(key), Expiration.from(timeout)) // + return createMono(stringCommands -> stringCommands.getEx(rawKey(key), Expiration.from(timeout)) // .map(this::readValue)); } @@ -173,7 +173,7 @@ public Mono getAndPersist(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.getEx(rawKey(key), Expiration.persistent()) // + return createMono(stringCommands -> stringCommands.getEx(rawKey(key), Expiration.persistent()) // .map(this::readValue)); } @@ -182,7 +182,7 @@ public Mono getAndSet(K key, V value) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.getSet(rawKey(key), rawValue(value)).map(value()::read)); + return createMono(stringCommands -> stringCommands.getSet(rawKey(key), rawValue(value)).map(value()::read)); } @Override @@ -190,8 +190,9 @@ public Mono> multiGet(Collection keys) { Assert.notNull(keys, "Keys must not be null"); - return createMono(connection -> Flux.fromIterable(keys).map(key()::write).collectList().flatMap(connection::mGet) - .map(this::deserializeValues)); + return createMono(stringCommands -> + Flux.fromIterable(keys).map(key()::write).collectList().flatMap(stringCommands::mGet) + .map(this::deserializeValues)); } @Override @@ -240,8 +241,8 @@ public Mono append(K key, String value) { Assert.notNull(key, "Key must not be null"); Assert.notNull(value, "Value must not be null"); - return createMono( - connection -> connection.append(rawKey(key), serializationContext.getStringSerializationPair().write(value))); + return createMono(stringCommands -> + stringCommands.append(rawKey(key), serializationContext.getStringSerializationPair().write(value))); } @Override @@ -249,7 +250,7 @@ public Mono get(K key, long start, long end) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.getRange(rawKey(key), start, end) // + return createMono(stringCommands -> stringCommands.getRange(rawKey(key), start, end) // .map(stringSerializationPair()::read)); } @@ -258,7 +259,7 @@ public Mono set(K key, V value, long offset) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.setRange(rawKey(key), rawValue(value), offset)); + return createMono(stringCommands -> stringCommands.setRange(rawKey(key), rawValue(value), offset)); } @Override @@ -266,7 +267,7 @@ public Mono size(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.strLen(rawKey(key))); + return createMono(stringCommands -> stringCommands.strLen(rawKey(key))); } @Override @@ -274,7 +275,7 @@ public Mono setBit(K key, long offset, boolean value) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.setBit(rawKey(key), offset, value)); + return createMono(stringCommands -> stringCommands.setBit(rawKey(key), offset, value)); } @Override @@ -282,7 +283,7 @@ public Mono getBit(K key, long offset) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.getBit(rawKey(key), offset)); + return createMono(stringCommands -> stringCommands.getBit(rawKey(key), offset)); } @Override @@ -291,7 +292,7 @@ public Mono> bitField(K key, BitFieldSubCommands subCommands) { Assert.notNull(key, "Key must not be null"); Assert.notNull(subCommands, "BitFieldSubCommands must not be null"); - return createMono(connection -> connection.bitField(rawKey(key), subCommands)); + return createMono(stringCommands -> stringCommands.bitField(rawKey(key), subCommands)); } @Override diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveZSetOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveZSetOperations.java index 2ff8d6b831..451f5e6810 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveZSetOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveZSetOperations.java @@ -65,7 +65,7 @@ public Mono add(K key, V value, double score) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.zAdd(rawKey(key), score, rawValue(value)).map(l -> l != 0)); + return createMono(zSetCommands -> zSetCommands.zAdd(rawKey(key), score, rawValue(value)).map(l -> l != 0)); } @Override @@ -74,10 +74,10 @@ public Mono addAll(K key, Collection> tuples) { Assert.notNull(key, "Key must not be null"); Assert.notNull(tuples, "Key must not be null"); - return createMono(connection -> Flux.fromIterable(tuples) // + return createMono(zSetCommands -> Flux.fromIterable(tuples) // .map(t -> new DefaultTuple(ByteUtils.getBytes(rawValue(t.getValue())), t.getScore())) // .collectList() // - .flatMap(serialized -> connection.zAdd(rawKey(key), serialized))); + .flatMap(serialized -> zSetCommands.zAdd(rawKey(key), serialized))); } @Override @@ -88,13 +88,13 @@ public Mono remove(K key, Object... values) { Assert.notNull(values, "Values must not be null"); if (values.length == 1) { - return createMono(connection -> connection.zRem(rawKey(key), rawValue((V) values[0]))); + return createMono(zSetCommands -> zSetCommands.zRem(rawKey(key), rawValue((V) values[0]))); } - return createMono(connection -> Flux.fromArray((V[]) values) // + return createMono(zSetCommands -> Flux.fromArray((V[]) values) // .map(this::rawValue) // .collectList() // - .flatMap(serialized -> connection.zRem(rawKey(key), serialized))); + .flatMap(serialized -> zSetCommands.zRem(rawKey(key), serialized))); } @Override @@ -102,7 +102,7 @@ public Mono incrementScore(K key, V value, double delta) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.zIncrBy(rawKey(key), delta, rawValue(value))); + return createMono(zSetCommands -> zSetCommands.zIncrBy(rawKey(key), delta, rawValue(value))); } @Override @@ -110,7 +110,7 @@ public Mono randomMember(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.zRandMember(rawKey(key))).map(this::readValue); + return createMono(zSetCommands -> zSetCommands.zRandMember(rawKey(key))).map(this::readValue); } @Override @@ -119,7 +119,7 @@ public Flux distinctRandomMembers(K key, long count) { Assert.notNull(key, "Key must not be null"); Assert.isTrue(count > 0, "Negative count not supported; Use randomMembers to allow duplicate elements"); - return createFlux(connection -> connection.zRandMember(rawKey(key), count)).map(this::readValue); + return createFlux(zSetCommands -> zSetCommands.zRandMember(rawKey(key), count)).map(this::readValue); } @Override @@ -128,7 +128,7 @@ public Flux randomMembers(K key, long count) { Assert.notNull(key, "Key must not be null"); Assert.isTrue(count > 0, "Use a positive number for count; This method is already allowing duplicate elements"); - return createFlux(connection -> connection.zRandMember(rawKey(key), -count)).map(this::readValue); + return createFlux(zSetCommands -> zSetCommands.zRandMember(rawKey(key), -count)).map(this::readValue); } @Override @@ -136,7 +136,7 @@ public Mono> randomMemberWithScore(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.zRandMemberWithScore(rawKey(key))).map(this::readTypedTuple); + return createMono(zSetCommands -> zSetCommands.zRandMemberWithScore(rawKey(key))).map(this::readTypedTuple); } @Override @@ -145,7 +145,7 @@ public Flux> distinctRandomMembersWithScore(K key, long count) { Assert.notNull(key, "Key must not be null"); Assert.isTrue(count > 0, "Negative count not supported; Use randomMembers to allow duplicate elements"); - return createFlux(connection -> connection.zRandMemberWithScore(rawKey(key), count)).map(this::readTypedTuple); + return createFlux(zSetCommands -> zSetCommands.zRandMemberWithScore(rawKey(key), count)).map(this::readTypedTuple); } @Override @@ -154,7 +154,7 @@ public Flux> randomMembersWithScore(K key, long count) { Assert.notNull(key, "Key must not be null"); Assert.isTrue(count > 0, "Use a positive number for count; This method is already allowing duplicate elements"); - return createFlux(connection -> connection.zRandMemberWithScore(rawKey(key), -count)).map(this::readTypedTuple); + return createFlux(zSetCommands -> zSetCommands.zRandMemberWithScore(rawKey(key), -count)).map(this::readTypedTuple); } @Override @@ -163,7 +163,7 @@ public Mono rank(K key, Object o) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.zRank(rawKey(key), rawValue((V) o))); + return createMono(zSetCommands -> zSetCommands.zRank(rawKey(key), rawValue((V) o))); } @Override @@ -172,7 +172,7 @@ public Mono reverseRank(K key, Object o) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.zRevRank(rawKey(key), rawValue((V) o))); + return createMono(zSetCommands -> zSetCommands.zRevRank(rawKey(key), rawValue((V) o))); } @Override @@ -181,7 +181,7 @@ public Flux range(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(connection -> connection.zRange(rawKey(key), range).map(this::readValue)); + return createFlux(zSetCommands -> zSetCommands.zRange(rawKey(key), range).map(this::readValue)); } @Override @@ -190,7 +190,7 @@ public Flux> rangeWithScores(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(connection -> connection.zRangeWithScores(rawKey(key), range).map(this::readTypedTuple)); + return createFlux(zSetCommands -> zSetCommands.zRangeWithScores(rawKey(key), range).map(this::readTypedTuple)); } @Override @@ -199,7 +199,7 @@ public Flux rangeByScore(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(connection -> connection.zRangeByScore(rawKey(key), range).map(this::readValue)); + return createFlux(zSetCommands -> zSetCommands.zRangeByScore(rawKey(key), range).map(this::readValue)); } @Override @@ -208,7 +208,8 @@ public Flux> rangeByScoreWithScores(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(connection -> connection.zRangeByScoreWithScores(rawKey(key), range).map(this::readTypedTuple)); + return createFlux(zSetCommands -> + zSetCommands.zRangeByScoreWithScores(rawKey(key), range).map(this::readTypedTuple)); } @Override @@ -217,7 +218,7 @@ public Flux rangeByScore(K key, Range range, Limit limit) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(connection -> connection.zRangeByScore(rawKey(key), range, limit).map(this::readValue)); + return createFlux(zSetCommands -> zSetCommands.zRangeByScore(rawKey(key), range, limit).map(this::readValue)); } @Override @@ -227,8 +228,8 @@ public Flux> rangeByScoreWithScores(K key, Range range, Li Assert.notNull(range, "Range must not be null"); Assert.notNull(limit, "Limit must not be null"); - return createFlux( - connection -> connection.zRangeByScoreWithScores(rawKey(key), range, limit).map(this::readTypedTuple)); + return createFlux(zSetCommands -> + zSetCommands.zRangeByScoreWithScores(rawKey(key), range, limit).map(this::readTypedTuple)); } @Override @@ -237,7 +238,7 @@ public Flux reverseRange(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(connection -> connection.zRevRange(rawKey(key), range).map(this::readValue)); + return createFlux(zSetCommands -> zSetCommands.zRevRange(rawKey(key), range).map(this::readValue)); } @Override @@ -246,7 +247,8 @@ public Flux> reverseRangeWithScores(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(connection -> connection.zRevRangeWithScores(rawKey(key), range).map(this::readTypedTuple)); + return createFlux(zSetCommands -> + zSetCommands.zRevRangeWithScores(rawKey(key), range).map(this::readTypedTuple)); } @Override @@ -255,7 +257,7 @@ public Flux reverseRangeByScore(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(connection -> connection.zRevRangeByScore(rawKey(key), range).map(this::readValue)); + return createFlux(zSetCommands -> zSetCommands.zRevRangeByScore(rawKey(key), range).map(this::readValue)); } @Override @@ -264,8 +266,8 @@ public Flux> reverseRangeByScoreWithScores(K key, Range ra Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux( - connection -> connection.zRevRangeByScoreWithScores(rawKey(key), range).map(this::readTypedTuple)); + return createFlux(zSetCommands -> + zSetCommands.zRevRangeByScoreWithScores(rawKey(key), range).map(this::readTypedTuple)); } @Override @@ -274,7 +276,8 @@ public Flux reverseRangeByScore(K key, Range range, Limit limit) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(connection -> connection.zRevRangeByScore(rawKey(key), range, limit).map(this::readValue)); + return createFlux(zSetCommands -> + zSetCommands.zRevRangeByScore(rawKey(key), range, limit).map(this::readValue)); } @Override @@ -284,8 +287,8 @@ public Flux> reverseRangeByScoreWithScores(K key, Range ra Assert.notNull(range, "Range must not be null"); Assert.notNull(limit, "Limit must not be null"); - return createFlux( - connection -> connection.zRevRangeByScoreWithScores(rawKey(key), range, limit).map(this::readTypedTuple)); + return createFlux(zSetCommands -> + zSetCommands.zRevRangeByScoreWithScores(rawKey(key), range, limit).map(this::readTypedTuple)); } @Override @@ -296,7 +299,7 @@ public Mono rangeAndStoreByLex(K srcKey, K dstKey, Range range, Li Assert.notNull(range, "Range must not be null"); Assert.notNull(limit, "Limit must not be null"); - return createMono(connection -> connection.zRangeStoreByLex(rawKey(srcKey), rawKey(dstKey), range, limit)); + return createMono(zSetCommands -> zSetCommands.zRangeStoreByLex(rawKey(srcKey), rawKey(dstKey), range, limit)); } @Override @@ -307,7 +310,8 @@ public Mono reverseRangeAndStoreByLex(K srcKey, K dstKey, Range ra Assert.notNull(range, "Range must not be null"); Assert.notNull(limit, "Limit must not be null"); - return createMono(connection -> connection.zRangeStoreRevByLex(rawKey(srcKey), rawKey(dstKey), range, limit)); + return createMono(zSetCommands -> + zSetCommands.zRangeStoreRevByLex(rawKey(srcKey), rawKey(dstKey), range, limit)); } @Override @@ -318,7 +322,8 @@ public Mono rangeAndStoreByScore(K srcKey, K dstKey, Range range, Assert.notNull(range, "Range must not be null"); Assert.notNull(limit, "Limit must not be null"); - return createMono(connection -> connection.zRangeStoreByScore(rawKey(srcKey), rawKey(dstKey), range, limit)); + return createMono(zSetCommands -> + zSetCommands.zRangeStoreByScore(rawKey(srcKey), rawKey(dstKey), range, limit)); } @Override @@ -329,7 +334,8 @@ public Mono reverseRangeAndStoreByScore(K srcKey, K dstKey, Range Assert.notNull(range, "Range must not be null"); Assert.notNull(limit, "Limit must not be null"); - return createMono(connection -> connection.zRangeStoreRevByScore(rawKey(srcKey), rawKey(dstKey), range, limit)); + return createMono(zSetCommands -> + zSetCommands.zRangeStoreRevByScore(rawKey(srcKey), rawKey(dstKey), range, limit)); } @Override @@ -338,7 +344,8 @@ public Flux> scan(K key, ScanOptions options) { Assert.notNull(key, "Key must not be null"); Assert.notNull(options, "ScanOptions must not be null"); - return createFlux(connection -> connection.zScan(rawKey(key), options).map(this::readTypedTuple)); + return createFlux(zSetCommands -> + zSetCommands.zScan(rawKey(key), options).map(this::readTypedTuple)); } @Override @@ -347,7 +354,7 @@ public Mono count(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createMono(connection -> connection.zCount(rawKey(key), range)); + return createMono(zSetCommands -> zSetCommands.zCount(rawKey(key), range)); } @Override @@ -356,7 +363,7 @@ public Mono lexCount(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createMono(connection -> connection.zLexCount(rawKey(key), range)); + return createMono(zSetCommands -> zSetCommands.zLexCount(rawKey(key), range)); } @Override @@ -364,7 +371,7 @@ public Mono> popMin(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.zPopMin(rawKey(key)).map(this::readTypedTuple)); + return createMono(zSetCommands -> zSetCommands.zPopMin(rawKey(key)).map(this::readTypedTuple)); } @Override @@ -372,7 +379,7 @@ public Flux> popMin(K key, long count) { Assert.notNull(key, "Key must not be null"); - return createFlux(connection -> connection.zPopMin(rawKey(key), count).map(this::readTypedTuple)); + return createFlux(zSetCommands -> zSetCommands.zPopMin(rawKey(key), count).map(this::readTypedTuple)); } @Override @@ -381,7 +388,7 @@ public Mono> popMin(K key, Duration timeout) { Assert.notNull(key, "Key must not be null"); Assert.notNull(timeout, "Timeout must not be null"); - return createMono(connection -> connection.bZPopMin(rawKey(key), timeout).map(this::readTypedTuple)); + return createMono(zSetCommands -> zSetCommands.bZPopMin(rawKey(key), timeout).map(this::readTypedTuple)); } @Override @@ -389,7 +396,7 @@ public Mono> popMax(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.zPopMax(rawKey(key)).map(this::readTypedTuple)); + return createMono(zSetCommands -> zSetCommands.zPopMax(rawKey(key)).map(this::readTypedTuple)); } @Override @@ -397,7 +404,7 @@ public Flux> popMax(K key, long count) { Assert.notNull(key, "Key must not be null"); - return createFlux(connection -> connection.zPopMax(rawKey(key), count).map(this::readTypedTuple)); + return createFlux(zSetCommands -> zSetCommands.zPopMax(rawKey(key), count).map(this::readTypedTuple)); } @Override @@ -406,7 +413,7 @@ public Mono> popMax(K key, Duration timeout) { Assert.notNull(key, "Key must not be null"); Assert.notNull(timeout, "Timeout must not be null"); - return createMono(connection -> connection.bZPopMax(rawKey(key), timeout).map(this::readTypedTuple)); + return createMono(zSetCommands -> zSetCommands.bZPopMax(rawKey(key), timeout).map(this::readTypedTuple)); } @Override @@ -414,7 +421,7 @@ public Mono size(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.zCard(rawKey(key))); + return createMono(zSetCommands -> zSetCommands.zCard(rawKey(key))); } @Override @@ -423,7 +430,7 @@ public Mono score(K key, Object o) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.zScore(rawKey(key), rawValue((V) o))); + return createMono(zSetCommands -> zSetCommands.zScore(rawKey(key), rawValue((V) o))); } @Override @@ -432,10 +439,10 @@ public Mono> score(K key, Object... o) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> Flux.fromArray((V[]) o) // + return createMono(zSetCommands -> Flux.fromArray((V[]) o) // .map(this::rawValue) // .collectList() // - .flatMap(values -> connection.zMScore(rawKey(key), values))); + .flatMap(values -> zSetCommands.zMScore(rawKey(key), values))); } @Override @@ -444,7 +451,7 @@ public Mono removeRange(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createMono(connection -> connection.zRemRangeByRank(rawKey(key), range)); + return createMono(zSetCommands -> zSetCommands.zRemRangeByRank(rawKey(key), range)); } @Override @@ -453,7 +460,7 @@ public Mono removeRangeByLex(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createMono(connection -> connection.zRemRangeByLex(rawKey(key), range)); + return createMono(zSetCommands -> zSetCommands.zRemRangeByLex(rawKey(key), range)); } @Override @@ -462,7 +469,7 @@ public Mono removeRangeByScore(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createMono(connection -> connection.zRemRangeByScore(rawKey(key), range)); + return createMono(zSetCommands -> zSetCommands.zRemRangeByScore(rawKey(key), range)); } @Override @@ -471,10 +478,10 @@ public Flux difference(K key, Collection otherKeys) { Assert.notNull(key, "Key must not be null"); Assert.notNull(otherKeys, "Other keys must not be null"); - return createFlux(connection -> Flux.fromIterable(getKeys(key, otherKeys)) // + return createFlux(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) // .map(this::rawKey) // .collectList() // - .flatMapMany(connection::zDiff).map(this::readValue)); + .flatMapMany(zSetCommands::zDiff).map(this::readValue)); } @Override @@ -483,10 +490,10 @@ public Flux> differenceWithScores(K key, Collection otherKeys) Assert.notNull(key, "Key must not be null"); Assert.notNull(otherKeys, "Other keys must not be null"); - return createFlux(connection -> Flux.fromIterable(getKeys(key, otherKeys)) // + return createFlux(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) // .map(this::rawKey) // .collectList() // - .flatMapMany(connection::zDiffWithScores).map(this::readTypedTuple)); + .flatMapMany(zSetCommands::zDiffWithScores).map(this::readTypedTuple)); } @Override @@ -496,10 +503,10 @@ public Mono differenceAndStore(K key, Collection otherKeys, K destKey) Assert.notNull(otherKeys, "Other keys must not be null"); Assert.notNull(destKey, "Destination key must not be null"); - return createMono(connection -> Flux.fromIterable(getKeys(key, otherKeys)) // + return createMono(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) // .map(this::rawKey) // .collectList() // - .flatMap(serialized -> connection.zDiffStore(rawKey(destKey), serialized))); + .flatMap(serialized -> zSetCommands.zDiffStore(rawKey(destKey), serialized))); } @@ -509,10 +516,10 @@ public Flux intersect(K key, Collection otherKeys) { Assert.notNull(key, "Key must not be null"); Assert.notNull(otherKeys, "Other keys must not be null"); - return createFlux(connection -> Flux.fromIterable(getKeys(key, otherKeys)) // + return createFlux(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) // .map(this::rawKey) // .collectList() // - .flatMapMany(connection::zInter).map(this::readValue)); + .flatMapMany(zSetCommands::zInter).map(this::readValue)); } @Override @@ -521,10 +528,10 @@ public Flux> intersectWithScores(K key, Collection otherKeys) { Assert.notNull(key, "Key must not be null"); Assert.notNull(otherKeys, "Other keys must not be null"); - return createFlux(connection -> Flux.fromIterable(getKeys(key, otherKeys)) // + return createFlux(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) // .map(this::rawKey) // .collectList() // - .flatMapMany(connection::zInterWithScores).map(this::readTypedTuple)); + .flatMapMany(zSetCommands::zInterWithScores).map(this::readTypedTuple)); } @Override @@ -537,10 +544,10 @@ public Flux> intersectWithScores(K key, Collection otherKeys, A Assert.notNull(aggregate, "Aggregate must not be null"); Assert.notNull(weights, "Weights must not be null"); - return createFlux(connection -> Flux.fromIterable(getKeys(key, otherKeys)) // + return createFlux(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) // .map(this::rawKey) // .collectList() // - .flatMapMany(sets -> connection.zInterWithScores(sets, weights, aggregate)).map(this::readTypedTuple)); + .flatMapMany(sets -> zSetCommands.zInterWithScores(sets, weights, aggregate)).map(this::readTypedTuple)); } @Override @@ -550,10 +557,10 @@ public Mono intersectAndStore(K key, Collection otherKeys, K destKey) { Assert.notNull(otherKeys, "Other keys must not be null"); Assert.notNull(destKey, "Destination key must not be null"); - return createMono(connection -> Flux.fromIterable(getKeys(key, otherKeys)) // + return createMono(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) // .map(this::rawKey) // .collectList() // - .flatMap(serialized -> connection.zInterStore(rawKey(destKey), serialized))); + .flatMap(serialized -> zSetCommands.zInterStore(rawKey(destKey), serialized))); } @Override @@ -565,10 +572,10 @@ public Mono intersectAndStore(K key, Collection otherKeys, K destKey, A Assert.notNull(aggregate, "Aggregate must not be null"); Assert.notNull(weights, "Weights must not be null"); - return createMono(connection -> Flux.fromIterable(getKeys(key, otherKeys)) // + return createMono(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) // .map(this::rawKey) // .collectList() // - .flatMap(serialized -> connection.zInterStore(rawKey(destKey), serialized, weights, aggregate))); + .flatMap(serialized -> zSetCommands.zInterStore(rawKey(destKey), serialized, weights, aggregate))); } @Override @@ -577,10 +584,10 @@ public Flux union(K key, Collection otherKeys) { Assert.notNull(key, "Key must not be null"); Assert.notNull(otherKeys, "Other keys must not be null"); - return createFlux(connection -> Flux.fromIterable(getKeys(key, otherKeys)) // + return createFlux(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) // .map(this::rawKey) // .collectList() // - .flatMapMany(connection::zUnion).map(this::readValue)); + .flatMapMany(zSetCommands::zUnion).map(this::readValue)); } @Override @@ -589,10 +596,10 @@ public Flux> unionWithScores(K key, Collection otherKeys) { Assert.notNull(key, "Key must not be null"); Assert.notNull(otherKeys, "Other keys must not be null"); - return createFlux(connection -> Flux.fromIterable(getKeys(key, otherKeys)) // + return createFlux(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) // .map(this::rawKey) // .collectList() // - .flatMapMany(connection::zUnionWithScores).map(this::readTypedTuple)); + .flatMapMany(zSetCommands::zUnionWithScores).map(this::readTypedTuple)); } @Override @@ -603,10 +610,10 @@ public Flux> unionWithScores(K key, Collection otherKeys, Aggre Assert.notNull(aggregate, "Aggregate must not be null"); Assert.notNull(weights, "Weights must not be null"); - return createFlux(connection -> Flux.fromIterable(getKeys(key, otherKeys)) // + return createFlux(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) // .map(this::rawKey) // .collectList() // - .flatMapMany(sets -> connection.zUnionWithScores(sets, weights, aggregate)).map(this::readTypedTuple)); + .flatMapMany(sets -> zSetCommands.zUnionWithScores(sets, weights, aggregate)).map(this::readTypedTuple)); } @Override @@ -626,10 +633,10 @@ public Mono unionAndStore(K key, Collection otherKeys, K destKey) { Assert.notNull(otherKeys, "Other keys must not be null"); Assert.notNull(destKey, "Destination key must not be null"); - return createMono(connection -> Flux.fromIterable(getKeys(key, otherKeys)) // + return createMono(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) // .map(this::rawKey) // .collectList() // - .flatMap(serialized -> connection.zUnionStore(rawKey(destKey), serialized))); + .flatMap(serialized -> zSetCommands.zUnionStore(rawKey(destKey), serialized))); } @Override @@ -641,10 +648,10 @@ public Mono unionAndStore(K key, Collection otherKeys, K destKey, Aggre Assert.notNull(aggregate, "Aggregate must not be null"); Assert.notNull(weights, "Weights must not be null"); - return createMono(connection -> Flux.fromIterable(getKeys(key, otherKeys)) // + return createMono(zSetCommands -> Flux.fromIterable(getKeys(key, otherKeys)) // .map(this::rawKey) // .collectList() // - .flatMap(serialized -> connection.zUnionStore(rawKey(destKey), serialized, weights, aggregate))); + .flatMap(serialized -> zSetCommands.zUnionStore(rawKey(destKey), serialized, weights, aggregate))); } @Override @@ -653,7 +660,7 @@ public Flux rangeByLex(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(connection -> connection.zRangeByLex(rawKey(key), range).map(this::readValue)); + return createFlux(zSetCommands -> zSetCommands.zRangeByLex(rawKey(key), range).map(this::readValue)); } @Override @@ -663,7 +670,7 @@ public Flux rangeByLex(K key, Range range, Limit limit) { Assert.notNull(range, "Range must not be null"); Assert.notNull(limit, "Limit must not be null"); - return createFlux(connection -> connection.zRangeByLex(rawKey(key), range, limit).map(this::readValue)); + return createFlux(zSetCommands -> zSetCommands.zRangeByLex(rawKey(key), range, limit).map(this::readValue)); } @Override @@ -672,7 +679,7 @@ public Flux reverseRangeByLex(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(connection -> connection.zRevRangeByLex(rawKey(key), range).map(this::readValue)); + return createFlux(zSetCommands -> zSetCommands.zRevRangeByLex(rawKey(key), range).map(this::readValue)); } @Override @@ -682,7 +689,7 @@ public Flux reverseRangeByLex(K key, Range range, Limit limit) { Assert.notNull(range, "Range must not be null"); Assert.notNull(limit, "Limit must not be null"); - return createFlux(connection -> connection.zRevRangeByLex(rawKey(key), range, limit).map(this::readValue)); + return createFlux(zSetCommands -> zSetCommands.zRevRangeByLex(rawKey(key), range, limit).map(this::readValue)); } @Override