diff --git a/pom.xml b/pom.xml index 279c9dab27..5e08a03ca5 100644 --- a/pom.xml +++ b/pom.xml @@ -1,11 +1,13 @@ - + 4.0.0 org.springframework.data spring-data-redis - 2.6.0-SNAPSHOT + 2.6.0-GH-2110-SNAPSHOT Spring Data Redis diff --git a/src/main/java/org/springframework/data/redis/core/AbstractOperations.java b/src/main/java/org/springframework/data/redis/core/AbstractOperations.java index 6e9e5c39b8..616fc49928 100644 --- a/src/main/java/org/springframework/data/redis/core/AbstractOperations.java +++ b/src/main/java/org/springframework/data/redis/core/AbstractOperations.java @@ -93,8 +93,8 @@ RedisSerializer stringSerializer() { } @Nullable - T execute(RedisCallback callback, boolean exposeConnection) { - return template.execute(callback, exposeConnection); + T execute(RedisCallback callback) { + return template.execute(callback, true); } public RedisOperations getOperations() { diff --git a/src/main/java/org/springframework/data/redis/core/DefaultClusterOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultClusterOperations.java index 4b23f1e39c..41d9538ac2 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultClusterOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultClusterOperations.java @@ -59,7 +59,7 @@ public Set keys(final RedisClusterNode node, final K pattern) { Assert.notNull(node, "ClusterNode must not be null."); - return execute(connection -> deserializeKeys(connection.keys(node, rawKey(pattern)))); + return doInCluster(connection -> deserializeKeys(connection.keys(node, rawKey(pattern)))); } /* @@ -71,7 +71,7 @@ public K randomKey(final RedisClusterNode node) { Assert.notNull(node, "ClusterNode must not be null."); - return execute(connection -> deserializeKey(connection.randomKey(node))); + return doInCluster(connection -> deserializeKey(connection.randomKey(node))); } /* @@ -83,7 +83,7 @@ public String ping(final RedisClusterNode node) { Assert.notNull(node, "ClusterNode must not be null."); - return execute(connection -> connection.ping(node)); + return doInCluster(connection -> connection.ping(node)); } /* @@ -95,7 +95,7 @@ public void addSlots(final RedisClusterNode node, final int... slots) { Assert.notNull(node, "ClusterNode must not be null."); - execute((RedisClusterCallback) connection -> { + doInCluster((RedisClusterCallback) connection -> { connection.clusterAddSlots(node, slots); return null; }); @@ -123,7 +123,7 @@ public void bgReWriteAof(final RedisClusterNode node) { Assert.notNull(node, "ClusterNode must not be null."); - execute((RedisClusterCallback) connection -> { + doInCluster((RedisClusterCallback) connection -> { connection.bgReWriteAof(node); return null; }); @@ -138,7 +138,7 @@ public void bgSave(final RedisClusterNode node) { Assert.notNull(node, "ClusterNode must not be null."); - execute((RedisClusterCallback) connection -> { + doInCluster((RedisClusterCallback) connection -> { connection.bgSave(node); return null; }); @@ -153,7 +153,7 @@ public void meet(final RedisClusterNode node) { Assert.notNull(node, "ClusterNode must not be null."); - execute((RedisClusterCallback) connection -> { + doInCluster((RedisClusterCallback) connection -> { connection.clusterMeet(node); return null; }); @@ -168,7 +168,7 @@ public void forget(final RedisClusterNode node) { Assert.notNull(node, "ClusterNode must not be null."); - execute((RedisClusterCallback) connection -> { + doInCluster((RedisClusterCallback) connection -> { connection.clusterForget(node); return null; }); @@ -183,7 +183,7 @@ public void flushDb(final RedisClusterNode node) { Assert.notNull(node, "ClusterNode must not be null."); - execute((RedisClusterCallback) connection -> { + doInCluster((RedisClusterCallback) connection -> { connection.flushDb(node); return null; }); @@ -198,7 +198,7 @@ public Collection getSlaves(final RedisClusterNode node) { Assert.notNull(node, "ClusterNode must not be null."); - return execute(connection -> connection.clusterGetSlaves(node)); + return doInCluster(connection -> connection.clusterGetSlaves(node)); } /* @@ -210,7 +210,7 @@ public void save(final RedisClusterNode node) { Assert.notNull(node, "ClusterNode must not be null."); - execute((RedisClusterCallback) connection -> { + doInCluster((RedisClusterCallback) connection -> { connection.save(node); return null; }); @@ -225,7 +225,7 @@ public void shutdown(final RedisClusterNode node) { Assert.notNull(node, "ClusterNode must not be null."); - execute((RedisClusterCallback) connection -> { + doInCluster((RedisClusterCallback) connection -> { connection.shutdown(node); return null; }); @@ -241,7 +241,7 @@ public void reshard(final RedisClusterNode source, final int slot, final RedisCl Assert.notNull(source, "Source node must not be null."); Assert.notNull(target, "Target node must not be null."); - execute((RedisClusterCallback) connection -> { + doInCluster((RedisClusterCallback) connection -> { connection.clusterSetSlot(target, slot, AddSlots.IMPORTING); connection.clusterSetSlot(source, slot, AddSlots.MIGRATING); @@ -262,16 +262,13 @@ public void reshard(final RedisClusterNode source, final int slot, final RedisCl * @return execution result. Can be {@literal null}. */ @Nullable - public T execute(RedisClusterCallback callback) { + T doInCluster(RedisClusterCallback callback) { Assert.notNull(callback, "ClusterCallback must not be null!"); - RedisClusterConnection connection = template.getConnectionFactory().getClusterConnection(); - - try { + try (RedisClusterConnection connection = template.getConnectionFactory().getClusterConnection()) { return callback.doInRedis(connection); - } finally { - connection.close(); } } + } diff --git a/src/main/java/org/springframework/data/redis/core/DefaultGeoOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultGeoOperations.java index deb2009d93..cb10bd26ea 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultGeoOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultGeoOperations.java @@ -61,7 +61,7 @@ public Long add(K key, Point point, M member) { byte[] rawKey = rawKey(key); byte[] rawMember = rawValue(member); - return execute(connection -> connection.geoAdd(rawKey, point, rawMember), true); + return execute(connection -> connection.geoAdd(rawKey, point, rawMember)); } /* @@ -88,7 +88,7 @@ public Long add(K key, Map memberCoordinateMap) { rawMemberCoordinateMap.put(rawMember, memberCoordinateMap.get(member)); } - return execute(connection -> connection.geoAdd(rawKey, rawMemberCoordinateMap), true); + return execute(connection -> connection.geoAdd(rawKey, rawMemberCoordinateMap)); } /* @@ -117,7 +117,7 @@ public Distance distance(K key, M member1, M member2) { byte[] rawMember1 = rawValue(member1); byte[] rawMember2 = rawValue(member2); - return execute(connection -> connection.geoDist(rawKey, rawMember1, rawMember2), true); + return execute(connection -> connection.geoDist(rawKey, rawMember1, rawMember2)); } /* @@ -131,7 +131,7 @@ public Distance distance(K key, M member1, M member2, Metric metric) { byte[] rawMember1 = rawValue(member1); byte[] rawMember2 = rawValue(member2); - return execute(connection -> connection.geoDist(rawKey, rawMember1, rawMember2, metric), true); + return execute(connection -> connection.geoDist(rawKey, rawMember1, rawMember2, metric)); } /* @@ -144,7 +144,7 @@ public List hash(K key, M... members) { byte[] rawKey = rawKey(key); byte[][] rawMembers = rawValues(members); - return execute(connection -> connection.geoHash(rawKey, rawMembers), true); + return execute(connection -> connection.geoHash(rawKey, rawMembers)); } /* @@ -156,7 +156,7 @@ public List position(K key, M... members) { byte[] rawKey = rawKey(key); byte[][] rawMembers = rawValues(members); - return execute(connection -> connection.geoPos(rawKey, rawMembers), true); + return execute(connection -> connection.geoPos(rawKey, rawMembers)); } /* @@ -168,7 +168,7 @@ public GeoResults> radius(K key, Circle within) { byte[] rawKey = rawKey(key); - GeoResults> raw = execute(connection -> connection.geoRadius(rawKey, within), true); + GeoResults> raw = execute(connection -> connection.geoRadius(rawKey, within)); return deserializeGeoResults(raw); } @@ -182,7 +182,7 @@ public GeoResults> radius(K key, Circle within, GeoRadiusCommandA byte[] rawKey = rawKey(key); - GeoResults> raw = execute(connection -> connection.geoRadius(rawKey, within, args), true); + GeoResults> raw = execute(connection -> connection.geoRadius(rawKey, within, args)); return deserializeGeoResults(raw); } @@ -196,8 +196,8 @@ public GeoResults> radius(K key, M member, double radius) { byte[] rawKey = rawKey(key); byte[] rawMember = rawValue(member); - GeoResults> raw = execute(connection -> connection.geoRadiusByMember(rawKey, rawMember, radius), - true); + GeoResults> raw = execute( + connection -> connection.geoRadiusByMember(rawKey, rawMember, radius)); return deserializeGeoResults(raw); } @@ -213,7 +213,7 @@ public GeoResults> radius(K key, M member, Distance distance) { byte[] rawMember = rawValue(member); GeoResults> raw = execute( - connection -> connection.geoRadiusByMember(rawKey, rawMember, distance), true); + connection -> connection.geoRadiusByMember(rawKey, rawMember, distance)); return deserializeGeoResults(raw); } @@ -229,7 +229,7 @@ public GeoResults> radius(K key, M member, Distance distance, Geo byte[] rawMember = rawValue(member); GeoResults> raw = execute( - connection -> connection.geoRadiusByMember(rawKey, rawMember, distance, param), true); + connection -> connection.geoRadiusByMember(rawKey, rawMember, distance, param)); return deserializeGeoResults(raw); } @@ -243,10 +243,10 @@ public Long remove(K key, M... members) { byte[] rawKey = rawKey(key); byte[][] rawMembers = rawValues(members); - return execute(connection -> connection.zRem(rawKey, rawMembers), true); + return execute(connection -> connection.zRem(rawKey, rawMembers)); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.GeoOperations#search(java.lang.Object, org.springframework.data.redis.connection.RedisGeoCommands.GeoReference, org.springframework.data.redis.connection.RedisGeoCommands.GeoShape, org.springframework.data.redis.connection.RedisGeoCommands.GeoSearchCommandArgs) */ @@ -258,12 +258,12 @@ public GeoResults> search(K key, GeoReference reference, GeoReference rawMember = getGeoReference(reference); GeoResults> raw = execute( - connection -> connection.geoSearch(rawKey, rawMember, geoPredicate, args), true); + connection -> connection.geoSearch(rawKey, rawMember, geoPredicate, args)); return deserializeGeoResults(raw); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.GeoOperations#searchAndStore(java.lang.Object, java.lang.Object, org.springframework.data.redis.connection.RedisGeoCommands.GeoReference, org.springframework.data.redis.connection.RedisGeoCommands.GeoShape, org.springframework.data.redis.connection.RedisGeoCommands.GeoSearchStoreCommandArgs) */ @@ -275,7 +275,7 @@ public Long searchAndStore(K key, K destKey, GeoReference reference, byte[] rawDestKey = rawKey(destKey); GeoReference rawMember = getGeoReference(reference); - return execute(connection -> connection.geoSearchStore(rawDestKey, rawKey, rawMember, geoPredicate, args), true); + return execute(connection -> connection.geoSearchStore(rawDestKey, rawKey, rawMember, geoPredicate, args)); } @SuppressWarnings("unchecked") diff --git a/src/main/java/org/springframework/data/redis/core/DefaultHashOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultHashOperations.java index 45f4936308..86868f792c 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultHashOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultHashOperations.java @@ -52,7 +52,7 @@ public HV get(K key, Object hashKey) { byte[] rawKey = rawKey(key); byte[] rawHashKey = rawHashKey(hashKey); - byte[] rawHashValue = execute(connection -> connection.hGet(rawKey, rawHashKey), true); + byte[] rawHashValue = execute(connection -> connection.hGet(rawKey, rawHashKey)); return (HV) rawHashValue != null ? deserializeHashValue(rawHashValue) : null; } @@ -66,7 +66,7 @@ public Boolean hasKey(K key, Object hashKey) { byte[] rawKey = rawKey(key); byte[] rawHashKey = rawHashKey(hashKey); - return execute(connection -> connection.hExists(rawKey, rawHashKey), true); + return execute(connection -> connection.hExists(rawKey, rawHashKey)); } /* @@ -78,7 +78,7 @@ public Long increment(K key, HK hashKey, long delta) { byte[] rawKey = rawKey(key); byte[] rawHashKey = rawHashKey(hashKey); - return execute(connection -> connection.hIncrBy(rawKey, rawHashKey, delta), true); + return execute(connection -> connection.hIncrBy(rawKey, rawHashKey, delta)); } /* @@ -90,10 +90,10 @@ public Double increment(K key, HK hashKey, double delta) { byte[] rawKey = rawKey(key); byte[] rawHashKey = rawHashKey(hashKey); - return execute(connection -> connection.hIncrBy(rawKey, rawHashKey, delta), true); + return execute(connection -> connection.hIncrBy(rawKey, rawHashKey, delta)); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.HashOperations#randomField(java.lang.Object) */ @@ -102,10 +102,10 @@ public Double increment(K key, HK hashKey, double delta) { public HK randomKey(K key) { byte[] rawKey = rawKey(key); - return deserializeHashKey(execute(connection -> connection.hRandField(rawKey), true)); + return deserializeHashKey(execute(connection -> connection.hRandField(rawKey))); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.HashOperations#randomValue(java.lang.Object) */ @@ -114,12 +114,12 @@ public HK randomKey(K key) { public Entry randomEntry(K key) { byte[] rawKey = rawKey(key); - Entry rawEntry = execute(connection -> connection.hRandFieldWithValues(rawKey), true); + Entry rawEntry = execute(connection -> connection.hRandFieldWithValues(rawKey)); return rawEntry == null ? null : Converters.entryOf(deserializeHashKey(rawEntry.getKey()), deserializeHashValue(rawEntry.getValue())); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.HashOperations#randomFields(java.lang.Object, long) */ @@ -128,11 +128,11 @@ public Entry randomEntry(K key) { public List randomKeys(K key, long count) { byte[] rawKey = rawKey(key); - List rawValues = execute(connection -> connection.hRandField(rawKey, count), true); + List rawValues = execute(connection -> connection.hRandField(rawKey, count)); return deserializeHashKeys(rawValues); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.HashOperations#randomValues(java.lang.Object, long) */ @@ -142,8 +142,7 @@ public Map randomEntries(K key, long count) { Assert.isTrue(count > 0, "Count must not be negative"); byte[] rawKey = rawKey(key); - List> rawEntries = execute(connection -> connection.hRandFieldWithValues(rawKey, count), - true); + List> rawEntries = execute(connection -> connection.hRandFieldWithValues(rawKey, count)); if (rawEntries == null) { return null; @@ -162,7 +161,7 @@ public Map randomEntries(K key, long count) { public Set keys(K key) { byte[] rawKey = rawKey(key); - Set rawValues = execute(connection -> connection.hKeys(rawKey), true); + Set rawValues = execute(connection -> connection.hKeys(rawKey)); return rawValues != null ? deserializeHashKeys(rawValues) : Collections.emptySet(); } @@ -175,7 +174,7 @@ public Set keys(K key) { public Long size(K key) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.hLen(rawKey), true); + return execute(connection -> connection.hLen(rawKey)); } /* @@ -188,7 +187,7 @@ public Long lengthOfValue(K key, HK hashKey) { byte[] rawKey = rawKey(key); byte[] rawHashKey = rawHashKey(hashKey); - return execute(connection -> connection.hStrLen(rawKey, rawHashKey), true); + return execute(connection -> connection.hStrLen(rawKey, rawHashKey)); } /* @@ -213,7 +212,7 @@ public void putAll(K key, Map m) { execute(connection -> { connection.hMSet(rawKey, hashes); return null; - }, true); + }); } /* @@ -235,7 +234,7 @@ public List multiGet(K key, Collection fields) { rawHashKeys[counter++] = rawHashKey(hashKey); } - List rawValues = execute(connection -> connection.hMGet(rawKey, rawHashKeys), true); + List rawValues = execute(connection -> connection.hMGet(rawKey, rawHashKeys)); return deserializeHashValues(rawValues); } @@ -254,7 +253,7 @@ public void put(K key, HK hashKey, HV value) { execute(connection -> { connection.hSet(rawKey, rawHashKey, rawHashValue); return null; - }, true); + }); } /* @@ -268,7 +267,7 @@ public Boolean putIfAbsent(K key, HK hashKey, HV value) { byte[] rawHashKey = rawHashKey(hashKey); byte[] rawHashValue = rawHashValue(value); - return execute(connection -> connection.hSetNX(rawKey, rawHashKey, rawHashValue), true); + return execute(connection -> connection.hSetNX(rawKey, rawHashKey, rawHashValue)); } /* @@ -279,7 +278,7 @@ public Boolean putIfAbsent(K key, HK hashKey, HV value) { public List values(K key) { byte[] rawKey = rawKey(key); - List rawValues = execute(connection -> connection.hVals(rawKey), true); + List rawValues = execute(connection -> connection.hVals(rawKey)); return rawValues != null ? deserializeHashValues(rawValues) : Collections.emptyList(); } @@ -294,7 +293,7 @@ public Long delete(K key, Object... hashKeys) { byte[] rawKey = rawKey(key); byte[][] rawHashKeys = rawHashKeys(hashKeys); - return execute(connection -> connection.hDel(rawKey, rawHashKeys), true); + return execute(connection -> connection.hDel(rawKey, rawHashKeys)); } /* @@ -305,7 +304,7 @@ public Long delete(K key, Object... hashKeys) { public Map entries(K key) { byte[] rawKey = rawKey(key); - Map entries = execute(connection -> connection.hGetAll(rawKey), true); + Map entries = execute(connection -> connection.hGetAll(rawKey)); return entries != null ? deserializeHashMap(entries) : Collections.emptyMap(); } diff --git a/src/main/java/org/springframework/data/redis/core/DefaultHyperLogLogOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultHyperLogLogOperations.java index 5b917e62c5..a39eb3106a 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultHyperLogLogOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultHyperLogLogOperations.java @@ -38,7 +38,7 @@ public Long add(K key, V... values) { byte[] rawKey = rawKey(key); byte[][] rawValues = rawValues(values); - return execute(connection -> connection.pfAdd(rawKey, rawValues), true); + return execute(connection -> connection.pfAdd(rawKey, rawValues)); } /* @@ -49,7 +49,7 @@ public Long add(K key, V... values) { public Long size(K... keys) { byte[][] rawKeys = rawKeys(Arrays.asList(keys)); - return execute(connection -> connection.pfCount(rawKeys), true); + return execute(connection -> connection.pfCount(rawKeys)); } /* @@ -65,7 +65,7 @@ public Long union(K destination, K... sourceKeys) { connection.pfMerge(rawDestinationKey, rawSourceKeys); return connection.pfCount(rawDestinationKey); - }, true); + }); } /* diff --git a/src/main/java/org/springframework/data/redis/core/DefaultListOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultListOperations.java index db434929d6..105115a8e9 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultListOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultListOperations.java @@ -52,7 +52,7 @@ public V index(K key, long index) { protected byte[] inRedis(byte[] rawKey, RedisConnection connection) { return connection.lIndex(rawKey, index); } - }, true); + }); } /* @@ -64,7 +64,7 @@ public Long indexOf(K key, V value) { byte[] rawKey = rawKey(key); byte[] rawValue = rawValue(value); - return execute(connection -> connection.lPos(rawKey, rawValue), true); + return execute(connection -> connection.lPos(rawKey, rawValue)); } /* @@ -80,7 +80,7 @@ public Long lastIndexOf(K key, V value) { List indexes = connection.lPos(rawKey, rawValue, -1, null); return CollectionUtils.firstElement(indexes); - }, true); + }); } /* @@ -96,7 +96,7 @@ public V leftPop(K key) { protected byte[] inRedis(byte[] rawKey, RedisConnection connection) { return connection.lPop(rawKey); } - }, true); + }); } /* @@ -106,7 +106,7 @@ protected byte[] inRedis(byte[] rawKey, RedisConnection connection) { @Override public List leftPop(K key, long count) { byte[] rawKey = rawKey(key); - return execute(connection -> deserializeValues(connection.lPop(rawKey, count)), true); + return execute(connection -> deserializeValues(connection.lPop(rawKey, count))); } /* @@ -124,7 +124,7 @@ protected byte[] inRedis(byte[] rawKey, RedisConnection connection) { List lPop = connection.bLPop(tm, rawKey); return (CollectionUtils.isEmpty(lPop) ? null : lPop.get(1)); } - }, true); + }); } /* @@ -136,7 +136,7 @@ public Long leftPush(K key, V value) { byte[] rawKey = rawKey(key); byte[] rawValue = rawValue(value); - return execute(connection -> connection.lPush(rawKey, rawValue), true); + return execute(connection -> connection.lPush(rawKey, rawValue)); } /* @@ -148,7 +148,7 @@ public Long leftPushAll(K key, V... values) { byte[] rawKey = rawKey(key); byte[][] rawValues = rawValues(values); - return execute(connection -> connection.lPush(rawKey, rawValues), true); + return execute(connection -> connection.lPush(rawKey, rawValues)); } /* @@ -161,7 +161,7 @@ public Long leftPushAll(K key, Collection values) { byte[] rawKey = rawKey(key); byte[][] rawValues = rawValues(values); - return execute(connection -> connection.lPush(rawKey, rawValues), true); + return execute(connection -> connection.lPush(rawKey, rawValues)); } /* @@ -173,7 +173,7 @@ public Long leftPushIfPresent(K key, V value) { byte[] rawKey = rawKey(key); byte[] rawValue = rawValue(value); - return execute(connection -> connection.lPushX(rawKey, rawValue), true); + return execute(connection -> connection.lPushX(rawKey, rawValue)); } /* @@ -186,7 +186,7 @@ public Long leftPush(K key, V pivot, V value) { byte[] rawKey = rawKey(key); byte[] rawPivot = rawValue(pivot); byte[] rawValue = rawValue(value); - return execute(connection -> connection.lInsert(rawKey, Position.BEFORE, rawPivot, rawValue), true); + return execute(connection -> connection.lInsert(rawKey, Position.BEFORE, rawPivot, rawValue)); } /* @@ -197,7 +197,7 @@ public Long leftPush(K key, V pivot, V value) { public Long size(K key) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.lLen(rawKey), true); + return execute(connection -> connection.lLen(rawKey)); } /* @@ -208,7 +208,7 @@ public Long size(K key) { public List range(K key, long start, long end) { byte[] rawKey = rawKey(key); - return execute(connection -> deserializeValues(connection.lRange(rawKey, start, end)), true); + return execute(connection -> deserializeValues(connection.lRange(rawKey, start, end))); } /* @@ -220,7 +220,7 @@ public Long remove(K key, long count, Object value) { byte[] rawKey = rawKey(key); byte[] rawValue = rawValue(value); - return execute(connection -> connection.lRem(rawKey, count, rawValue), true); + return execute(connection -> connection.lRem(rawKey, count, rawValue)); } /* @@ -236,7 +236,7 @@ public V rightPop(K key) { protected byte[] inRedis(byte[] rawKey, RedisConnection connection) { return connection.rPop(rawKey); } - }, true); + }); } /* @@ -246,7 +246,7 @@ protected byte[] inRedis(byte[] rawKey, RedisConnection connection) { @Override public List rightPop(K key, long count) { byte[] rawKey = rawKey(key); - return execute(connection -> deserializeValues(connection.rPop(rawKey, count)), true); + return execute(connection -> deserializeValues(connection.rPop(rawKey, count))); } /* @@ -265,7 +265,7 @@ protected byte[] inRedis(byte[] rawKey, RedisConnection connection) { List bRPop = connection.bRPop(tm, rawKey); return (CollectionUtils.isEmpty(bRPop) ? null : bRPop.get(1)); } - }, true); + }); } /* @@ -277,7 +277,7 @@ public Long rightPush(K key, V value) { byte[] rawKey = rawKey(key); byte[] rawValue = rawValue(value); - return execute(connection -> connection.rPush(rawKey, rawValue), true); + return execute(connection -> connection.rPush(rawKey, rawValue)); } /* @@ -289,7 +289,7 @@ public Long rightPushAll(K key, V... values) { byte[] rawKey = rawKey(key); byte[][] rawValues = rawValues(values); - return execute(connection -> connection.rPush(rawKey, rawValues), true); + return execute(connection -> connection.rPush(rawKey, rawValues)); } /* @@ -301,7 +301,7 @@ public Long rightPushAll(K key, Collection values) { byte[] rawKey = rawKey(key); byte[][] rawValues = rawValues(values); - return execute(connection -> connection.rPush(rawKey, rawValues), true); + return execute(connection -> connection.rPush(rawKey, rawValues)); } /* @@ -313,7 +313,7 @@ public Long rightPushIfPresent(K key, V value) { byte[] rawKey = rawKey(key); byte[] rawValue = rawValue(value); - return execute(connection -> connection.rPushX(rawKey, rawValue), true); + return execute(connection -> connection.rPushX(rawKey, rawValue)); } /* @@ -326,7 +326,7 @@ public Long rightPush(K key, V pivot, V value) { byte[] rawKey = rawKey(key); byte[] rawPivot = rawValue(pivot); byte[] rawValue = rawValue(value); - return execute(connection -> connection.lInsert(rawKey, Position.AFTER, rawPivot, rawValue), true); + return execute(connection -> connection.lInsert(rawKey, Position.AFTER, rawPivot, rawValue)); } /* @@ -343,7 +343,7 @@ public V rightPopAndLeftPush(K sourceKey, K destinationKey) { protected byte[] inRedis(byte[] rawSourceKey, RedisConnection connection) { return connection.rPopLPush(rawSourceKey, rawDestKey); } - }, true); + }); } /* @@ -361,10 +361,10 @@ public V rightPopAndLeftPush(K sourceKey, K destinationKey, long timeout, TimeUn protected byte[] inRedis(byte[] rawSourceKey, RedisConnection connection) { return connection.bRPopLPush(tm, rawSourceKey, rawDestKey); } - }, true); + }); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ListOperations#move(java.lang.Object, org.springframework.data.redis.connection.RedisListCommands.Direction, java.lang.Object, org.springframework.data.redis.connection.RedisListCommands.Direction) */ @@ -378,10 +378,10 @@ public V move(K sourceKey, Direction from, K destinationKey, Direction to) { protected byte[] inRedis(byte[] rawSourceKey, RedisConnection connection) { return connection.lMove(rawSourceKey, rawDestKey, from, to); } - }, true); + }); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ListOperations#move(java.lang.Object, org.springframework.data.redis.connection.RedisListCommands.Direction, java.lang.Object, org.springframework.data.redis.connection.RedisListCommands.Direction, long, java.util.concurrent.TimeUnit) */ @@ -395,7 +395,7 @@ public V move(K sourceKey, Direction from, K destinationKey, Direction to, long protected byte[] inRedis(byte[] rawSourceKey, RedisConnection connection) { return connection.bLMove(rawSourceKey, rawDestKey, from, to, TimeoutUtils.toDoubleSeconds(timeout, unit)); } - }, true); + }); } /* @@ -413,7 +413,7 @@ protected byte[] inRedis(byte[] rawKey, RedisConnection connection) { connection.lSet(rawKey, index, rawValue); return null; } - }, true); + }); } /* @@ -430,6 +430,6 @@ protected byte[] inRedis(byte[] rawKey, RedisConnection connection) { connection.lTrim(rawKey, start, end); return null; } - }, true); + }); } } diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveGeoOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveGeoOperations.java index 4131a52302..8444aa256a 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveGeoOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveGeoOperations.java @@ -15,8 +15,6 @@ */ package org.springframework.data.redis.core; -import org.springframework.data.redis.domain.geo.GeoReference; -import org.springframework.data.redis.domain.geo.GeoReference.GeoMemberReference; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -28,6 +26,7 @@ import java.util.stream.Collectors; import org.reactivestreams.Publisher; + import org.springframework.data.geo.Circle; import org.springframework.data.geo.Distance; import org.springframework.data.geo.GeoResult; @@ -37,6 +36,8 @@ import org.springframework.data.redis.connection.RedisGeoCommands; import org.springframework.data.redis.connection.RedisGeoCommands.GeoLocation; import org.springframework.data.redis.connection.RedisGeoCommands.GeoRadiusCommandArgs; +import org.springframework.data.redis.domain.geo.GeoReference; +import org.springframework.data.redis.domain.geo.GeoReference.GeoMemberReference; import org.springframework.data.redis.domain.geo.GeoShape; import org.springframework.data.redis.serializer.RedisSerializationContext; import org.springframework.util.Assert; @@ -320,7 +321,7 @@ public final Mono remove(K key, V... members) { Assert.notEmpty(members, "Members must not be null or empty!"); Assert.noNullElements(members, "Members must not contain null elements!"); - return template.createMono(connection -> Flux.fromArray(members) // + return template.doCreateMono(connection -> Flux.fromArray(members) // .map(this::rawValue) // .collectList() // .flatMap(serialized -> connection.zSetCommands().zRem(rawKey(key), serialized))); @@ -335,10 +336,10 @@ public Mono delete(K key) { Assert.notNull(key, "Key must not be null!"); - return template.createMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0); + return template.doCreateMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ReactiveGeoOperations#search(K, RedisGeoCommands.GeoReference, GeoShape, GeoSearchCommandArgs) */ @@ -350,11 +351,11 @@ public Flux>> search(K key, GeoReference reference, Assert.notNull(reference, "GeoReference must not be null!"); GeoReference rawReference = getGeoReference(reference); - return template.createFlux(connection -> connection.geoCommands() + return template.doCreateFlux(connection -> connection.geoCommands() .geoSearch(rawKey(key), rawReference, geoPredicate, args).map(this::readGeoResult)); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ReactiveGeoOperations#searchAndStore(K, K, RedisGeoCommands.GeoReference, GeoShape, GeoSearchStoreCommandArgs) */ @@ -366,7 +367,7 @@ public Mono searchAndStore(K key, K destKey, GeoReference reference, Assert.notNull(reference, "GeoReference must not be null!"); GeoReference rawReference = getGeoReference(reference); - return template.createMono(connection -> connection.geoCommands().geoSearchStore(rawKey(destKey), rawKey(key), + return template.doCreateMono(connection -> connection.geoCommands().geoSearchStore(rawKey(destKey), rawKey(key), rawReference, geoPredicate, args)); } @@ -374,14 +375,14 @@ private Mono createMono(Function> funct Assert.notNull(function, "Function must not be null!"); - return template.createMono(connection -> function.apply(connection.geoCommands())); + return template.doCreateMono(connection -> function.apply(connection.geoCommands())); } private Flux createFlux(Function> function) { Assert.notNull(function, "Function must not be null!"); - return template.createFlux(connection -> function.apply(connection.geoCommands())); + return template.doCreateFlux(connection -> function.apply(connection.geoCommands())); } @SuppressWarnings("unchecked") diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveHashOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveHashOperations.java index 08b3ba083b..54f863770b 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveHashOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveHashOperations.java @@ -15,20 +15,20 @@ */ package org.springframework.data.redis.core; -import org.springframework.data.redis.connection.convert.Converters; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.Function; import org.reactivestreams.Publisher; + import org.springframework.data.redis.connection.ReactiveHashCommands; +import org.springframework.data.redis.connection.convert.Converters; import org.springframework.data.redis.serializer.RedisSerializationContext; import org.springframework.util.Assert; @@ -125,7 +125,7 @@ public Mono increment(H key, HK hashKey, long delta) { Assert.notNull(key, "Key must not be null!"); Assert.notNull(hashKey, "Hash key must not be null!"); - return template.createMono(connection -> connection // + return template.doCreateMono(connection -> connection // .numberCommands() // .hIncrBy(rawKey(key), rawHashKey(hashKey), delta)); } @@ -140,7 +140,7 @@ public Mono increment(H key, HK hashKey, double delta) { Assert.notNull(key, "Key must not be null!"); Assert.notNull(hashKey, "Hash key must not be null!"); - return template.createMono(connection -> connection // + return template.doCreateMono(connection -> connection // .numberCommands() // .hIncrBy(rawKey(key), rawHashKey(hashKey), delta)); } @@ -154,7 +154,7 @@ public Mono randomKey(H key) { Assert.notNull(key, "Key must not be null!"); - return template.createMono(connection -> connection // + return template.doCreateMono(connection -> connection // .hashCommands().hRandField(rawKey(key))).map(this::readHashKey); } @@ -167,7 +167,7 @@ public Mono> randomEntry(H key) { Assert.notNull(key, "Key must not be null!"); - return template.createMono(connection -> connection // + return template.doCreateMono(connection -> connection // .hashCommands().hRandFieldWithValues(rawKey(key))).map(this::deserializeHashEntry); } @@ -180,7 +180,7 @@ public Flux randomKeys(H key, long count) { Assert.notNull(key, "Key must not be null!"); - return template.createFlux(connection -> connection // + return template.doCreateFlux(connection -> connection // .hashCommands().hRandField(rawKey(key), count)).map(this::readHashKey); } @@ -193,7 +193,7 @@ public Flux> randomEntries(H key, long count) { Assert.notNull(key, "Key must not be null!"); - return template.createFlux(connection -> connection // + return template.doCreateFlux(connection -> connection // .hashCommands().hRandFieldWithValues(rawKey(key), count)).map(this::deserializeHashEntry); } @@ -314,21 +314,21 @@ public Mono delete(H key) { Assert.notNull(key, "Key must not be null!"); - return template.createMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0); + return template.doCreateMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0); } private Mono createMono(Function> function) { Assert.notNull(function, "Function must not be null!"); - return template.createMono(connection -> function.apply(connection.hashCommands())); + return template.doCreateMono(connection -> function.apply(connection.hashCommands())); } private Flux createFlux(Function> function) { Assert.notNull(function, "Function must not be null!"); - return template.createFlux(connection -> function.apply(connection.hashCommands())); + return template.doCreateFlux(connection -> function.apply(connection.hashCommands())); } private ByteBuffer rawKey(H key) { diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveHyperLogLogOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveHyperLogLogOperations.java index 3fdb8af4dd..98f8993657 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveHyperLogLogOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveHyperLogLogOperations.java @@ -103,14 +103,14 @@ public Mono delete(K key) { Assert.notNull(key, "Key must not be null!"); - return template.createMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0); + return template.doCreateMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0); } private Mono createMono(Function> function) { Assert.notNull(function, "Function must not be null!"); - return template.createMono(connection -> function.apply(connection.hyperLogLogCommands())); + return template.doCreateMono(connection -> function.apply(connection.hyperLogLogCommands())); } private ByteBuffer rawKey(K key) { diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java index adca75fb88..2481785917 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java @@ -200,7 +200,7 @@ public Mono rightPush(K key, V pivot, V value) { return createMono(connection -> connection.lInsert(rawKey(key), Position.AFTER, rawValue(pivot), rawValue(value))); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ReactiveListOperations#move(K, Direction, K, Direction) */ @@ -216,7 +216,7 @@ public Mono move(K sourceKey, Direction from, K destinationKey, Direction to) connection -> connection.lMove(rawKey(sourceKey), rawKey(destinationKey), from, to).map(this::readValue)); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ReactiveListOperations#move(K, Direction, K, Direction, Duration) */ @@ -378,21 +378,21 @@ public Mono delete(K key) { Assert.notNull(key, "Key must not be null!"); - return template.createMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0); + return template.doCreateMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0); } private Mono createMono(Function> function) { Assert.notNull(function, "Function must not be null!"); - return template.createMono(connection -> function.apply(connection.listCommands())); + return template.doCreateMono(connection -> function.apply(connection.listCommands())); } private Flux createFlux(Function> function) { Assert.notNull(function, "Function must not be null!"); - return template.createFlux(connection -> function.apply(connection.listCommands())); + return template.doCreateFlux(connection -> function.apply(connection.listCommands())); } private boolean isZeroOrGreater1Second(Duration timeout) { diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveSetOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveSetOperations.java index 1f1ba2adf3..6d287c36ee 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveSetOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveSetOperations.java @@ -508,21 +508,21 @@ public Mono delete(K key) { Assert.notNull(key, "Key must not be null!"); - return template.createMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0); + return template.doCreateMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0); } private Mono createMono(Function> function) { Assert.notNull(function, "Function must not be null!"); - return template.createMono(connection -> function.apply(connection.setCommands())); + return template.doCreateMono(connection -> function.apply(connection.setCommands())); } private Flux createFlux(Function> function) { Assert.notNull(function, "Function must not be null!"); - return template.createFlux(connection -> function.apply(connection.setCommands())); + return template.doCreateFlux(connection -> function.apply(connection.setCommands())); } private ByteBuffer rawKey(K key) { diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java index 0fc811c99a..1ecbd97c9c 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java @@ -15,7 +15,6 @@ */ package org.springframework.data.redis.core; -import org.springframework.data.redis.connection.convert.Converters; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -28,10 +27,12 @@ import java.util.function.Function; import org.reactivestreams.Publisher; + import org.springframework.core.convert.ConversionService; import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.ReactiveStreamCommands; import org.springframework.data.redis.connection.RedisZSetCommands.Limit; +import org.springframework.data.redis.connection.convert.Converters; import org.springframework.data.redis.connection.stream.ByteBufferRecord; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.MapRecord; @@ -372,14 +373,14 @@ private Mono createMono(Function> fu Assert.notNull(function, "Function must not be null!"); - return template.createMono(connection -> function.apply(connection.streamCommands())); + return template.doCreateMono(connection -> function.apply(connection.streamCommands())); } private Flux createFlux(Function> function) { Assert.notNull(function, "Function must not be null!"); - return template.createFlux(connection -> function.apply(connection.streamCommands())); + return template.doCreateFlux(connection -> function.apply(connection.streamCommands())); } private ByteBuffer rawKey(K key) { diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveValueOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveValueOperations.java index 31c8f0849f..a64e501163 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveValueOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveValueOperations.java @@ -244,7 +244,7 @@ public Mono increment(K key) { Assert.notNull(key, "Key must not be null!"); - return template.createMono(connection -> connection.numberCommands().incr(rawKey(key))); + return template.doCreateMono(connection -> connection.numberCommands().incr(rawKey(key))); } /* (non-Javadoc) @@ -255,7 +255,7 @@ public Mono increment(K key, long delta) { Assert.notNull(key, "Key must not be null!"); - return template.createMono(connection -> connection.numberCommands().incrBy(rawKey(key), delta)); + return template.doCreateMono(connection -> connection.numberCommands().incrBy(rawKey(key), delta)); } /* (non-Javadoc) @@ -266,7 +266,7 @@ public Mono increment(K key, double delta) { Assert.notNull(key, "Key must not be null!"); - return template.createMono(connection -> connection.numberCommands().incrBy(rawKey(key), delta)); + return template.doCreateMono(connection -> connection.numberCommands().incrBy(rawKey(key), delta)); } /* (non-Javadoc) @@ -277,7 +277,7 @@ public Mono decrement(K key) { Assert.notNull(key, "Key must not be null!"); - return template.createMono(connection -> connection.numberCommands().decr(rawKey(key))); + return template.doCreateMono(connection -> connection.numberCommands().decr(rawKey(key))); } /* (non-Javadoc) @@ -288,7 +288,7 @@ public Mono decrement(K key, long delta) { Assert.notNull(key, "Key must not be null!"); - return template.createMono(connection -> connection.numberCommands().decrBy(rawKey(key), delta)); + return template.doCreateMono(connection -> connection.numberCommands().decrBy(rawKey(key), delta)); } /* (non-Javadoc) @@ -380,14 +380,14 @@ public Mono delete(K key) { Assert.notNull(key, "Key must not be null!"); - return template.createMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0); + return template.doCreateMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0); } private Mono createMono(Function> function) { Assert.notNull(function, "Function must not be null!"); - return template.createMono(connection -> function.apply(connection.stringCommands())); + return template.doCreateMono(connection -> function.apply(connection.stringCommands())); } private ByteBuffer rawKey(K key) { diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveZSetOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveZSetOperations.java index 7b57c80ead..ced2d5afaf 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveZSetOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveZSetOperations.java @@ -120,7 +120,7 @@ public Mono incrementScore(K key, V value, double delta) { return createMono(connection -> connection.zIncrBy(rawKey(key), delta, rawValue(value))); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ReactiveZSetOperations#randomMember(K) */ @@ -132,7 +132,7 @@ public Mono randomMember(K key) { return createMono(connection -> connection.zRandMember(rawKey(key))).map(this::readValue); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ReactiveZSetOperations#distinctRandomMembers(K, long) */ @@ -145,7 +145,7 @@ public Flux distinctRandomMembers(K key, long count) { return createFlux(connection -> connection.zRandMember(rawKey(key), count)).map(this::readValue); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ReactiveZSetOperations#randomMembers(K, long) */ @@ -158,7 +158,7 @@ public Flux randomMembers(K key, long count) { return createFlux(connection -> connection.zRandMember(rawKey(key), -count)).map(this::readValue); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ReactiveZSetOperations#randomMemberWithScore(K) */ @@ -170,7 +170,7 @@ public Mono> randomMemberWithScore(K key) { return createMono(connection -> connection.zRandMemberWithScore(rawKey(key))).map(this::readTypedTuple); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ReactiveZSetOperations#distinctRandomMembersWithScore(K, long) */ @@ -183,7 +183,7 @@ public Flux> distinctRandomMembersWithScore(K key, long count) { return createFlux(connection -> connection.zRandMemberWithScore(rawKey(key), count)).map(this::readTypedTuple); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ReactiveZSetOperations#randomMembersWithScore(K, long) */ @@ -576,7 +576,7 @@ public Mono removeRangeByScore(K key, Range range) { return createMono(connection -> connection.zRemRangeByScore(rawKey(key), range)); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ReactiveZSetOperations#difference(K, Collection) */ @@ -592,7 +592,7 @@ public Flux difference(K key, Collection otherKeys) { .flatMapMany(connection::zDiff).map(this::readValue)); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ReactiveZSetOperations#differenceWithScores(K, Collection) */ @@ -608,7 +608,7 @@ public Flux> differenceWithScores(K key, Collection otherKeys) .flatMapMany(connection::zDiffWithScores).map(this::readTypedTuple)); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ReactiveZSetOperations#differenceAndStore(K, Collection, K) */ @@ -626,7 +626,7 @@ public Mono differenceAndStore(K key, Collection otherKeys, K destKey) } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ReactiveZSetOperations#intersect(K, Collection) */ @@ -642,7 +642,7 @@ public Flux intersect(K key, Collection otherKeys) { .flatMapMany(connection::zInter).map(this::readValue)); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ReactiveZSetOperations#intersectWithScores(K, Collection) */ @@ -658,7 +658,7 @@ public Flux> intersectWithScores(K key, Collection otherKeys) { .flatMapMany(connection::zInterWithScores).map(this::readTypedTuple)); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ReactiveZSetOperations#intersectWithScores(K, Collection, Aggregate, Weights) */ @@ -714,7 +714,7 @@ public Mono intersectAndStore(K key, Collection otherKeys, K destKey, A .flatMap(serialized -> connection.zInterStore(rawKey(destKey), serialized, weights, aggregate))); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ReactiveZSetOperations#union(K, Collection) */ @@ -730,7 +730,7 @@ public Flux union(K key, Collection otherKeys) { .flatMapMany(connection::zUnion).map(this::readValue)); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ReactiveZSetOperations#unionWithScores(K, Collection) */ @@ -746,7 +746,7 @@ public Flux> unionWithScores(K key, Collection otherKeys) { .flatMapMany(connection::zUnionWithScores).map(this::readTypedTuple)); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ReactiveZSetOperations#unionWithScores(K, Collection, Aggregate, Weights) */ @@ -877,21 +877,21 @@ public Mono delete(K key) { Assert.notNull(key, "Key must not be null!"); - return template.createMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0); + return template.doCreateMono(connection -> connection.keyCommands().del(rawKey(key))).map(l -> l != 0); } private Mono createMono(Function> function) { Assert.notNull(function, "Function must not be null!"); - return template.createMono(connection -> function.apply(connection.zSetCommands())); + return template.doCreateMono(connection -> function.apply(connection.zSetCommands())); } private Flux createFlux(Function> function) { Assert.notNull(function, "Function must not be null!"); - return template.createFlux(connection -> function.apply(connection.zSetCommands())); + return template.doCreateFlux(connection -> function.apply(connection.zSetCommands())); } private ByteBuffer rawKey(K key) { diff --git a/src/main/java/org/springframework/data/redis/core/DefaultSetOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultSetOperations.java index bf3e37a694..d4f7d00b41 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultSetOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultSetOperations.java @@ -49,7 +49,7 @@ public Long add(K key, V... values) { byte[] rawKey = rawKey(key); byte[][] rawValues = rawValues((Object[]) values); - return execute(connection -> connection.sAdd(rawKey, rawValues), true); + return execute(connection -> connection.sAdd(rawKey, rawValues)); } /* @@ -69,7 +69,7 @@ public Set difference(K key, K otherKey) { public Set difference(K key, Collection otherKeys) { byte[][] rawKeys = rawKeys(key, otherKeys); - Set rawValues = execute(connection -> connection.sDiff(rawKeys), true); + Set rawValues = execute(connection -> connection.sDiff(rawKeys)); return deserializeValues(rawValues); } @@ -82,7 +82,7 @@ public Set difference(K key, Collection otherKeys) { public Set difference(Collection keys) { byte[][] rawKeys = rawKeys(keys); - Set rawValues = execute(connection -> connection.sDiff(rawKeys), true); + Set rawValues = execute(connection -> connection.sDiff(rawKeys)); return deserializeValues(rawValues); } @@ -106,7 +106,7 @@ public Long differenceAndStore(K key, Collection otherKeys, K destKey) { byte[][] rawKeys = rawKeys(key, otherKeys); byte[] rawDestKey = rawKey(destKey); - return execute(connection -> connection.sDiffStore(rawDestKey, rawKeys), true); + return execute(connection -> connection.sDiffStore(rawDestKey, rawKeys)); } /* @@ -119,7 +119,7 @@ public Long differenceAndStore(Collection keys, K destKey) { byte[][] rawKeys = rawKeys(keys); byte[] rawDestKey = rawKey(destKey); - return execute(connection -> connection.sDiffStore(rawDestKey, rawKeys), true); + return execute(connection -> connection.sDiffStore(rawDestKey, rawKeys)); } /* @@ -139,7 +139,7 @@ public Set intersect(K key, K otherKey) { public Set intersect(K key, Collection otherKeys) { byte[][] rawKeys = rawKeys(key, otherKeys); - Set rawValues = execute(connection -> connection.sInter(rawKeys), true); + Set rawValues = execute(connection -> connection.sInter(rawKeys)); return deserializeValues(rawValues); } @@ -152,7 +152,7 @@ public Set intersect(K key, Collection otherKeys) { public Set intersect(Collection keys) { byte[][] rawKeys = rawKeys(keys); - Set rawValues = execute(connection -> connection.sInter(rawKeys), true); + Set rawValues = execute(connection -> connection.sInter(rawKeys)); return deserializeValues(rawValues); } @@ -176,7 +176,7 @@ public Long intersectAndStore(K key, Collection otherKeys, K destKey) { byte[][] rawKeys = rawKeys(key, otherKeys); byte[] rawDestKey = rawKey(destKey); - return execute(connection -> connection.sInterStore(rawDestKey, rawKeys), true); + return execute(connection -> connection.sInterStore(rawDestKey, rawKeys)); } /* @@ -189,7 +189,7 @@ public Long intersectAndStore(Collection keys, K destKey) { byte[][] rawKeys = rawKeys(keys); byte[] rawDestKey = rawKey(destKey); - return execute(connection -> connection.sInterStore(rawDestKey, rawKeys), true); + return execute(connection -> connection.sInterStore(rawDestKey, rawKeys)); } /* @@ -202,7 +202,7 @@ public Boolean isMember(K key, Object o) { byte[] rawKey = rawKey(key); byte[] rawValue = rawValue(o); - return execute(connection -> connection.sIsMember(rawKey, rawValue), true); + return execute(connection -> connection.sIsMember(rawKey, rawValue)); } /* @@ -230,7 +230,7 @@ public Map isMember(K key, Object... objects) { } return isMember; - }, true); + }); } /* @@ -241,7 +241,7 @@ public Map isMember(K key, Object... objects) { public Set members(K key) { byte[] rawKey = rawKey(key); - Set rawValues = execute(connection -> connection.sMembers(rawKey), true); + Set rawValues = execute(connection -> connection.sMembers(rawKey)); return deserializeValues(rawValues); } @@ -257,7 +257,7 @@ public Boolean move(K key, V value, K destKey) { byte[] rawDestKey = rawKey(destKey); byte[] rawValue = rawValue(value); - return execute(connection -> connection.sMove(rawKey, rawDestKey, rawValue), true); + return execute(connection -> connection.sMove(rawKey, rawDestKey, rawValue)); } /* @@ -273,7 +273,7 @@ public V randomMember(K key) { protected byte[] inRedis(byte[] rawKey, RedisConnection connection) { return connection.sRandMember(rawKey); } - }, true); + }); } /* @@ -287,7 +287,7 @@ public Set distinctRandomMembers(K key, long count) { byte[] rawKey = rawKey(key); Set rawValues = execute( - (RedisCallback>) connection -> new HashSet<>(connection.sRandMember(rawKey, count)), true); + (RedisCallback>) connection -> new HashSet<>(connection.sRandMember(rawKey, count))); return deserializeValues(rawValues); } @@ -303,7 +303,7 @@ public List randomMembers(K key, long count) { "Use a positive number for count. " + "This method is already allowing duplicate elements."); byte[] rawKey = rawKey(key); - List rawValues = execute(connection -> connection.sRandMember(rawKey, -count), true); + List rawValues = execute(connection -> connection.sRandMember(rawKey, -count)); return deserializeValues(rawValues); } @@ -317,7 +317,7 @@ public Long remove(K key, Object... values) { byte[] rawKey = rawKey(key); byte[][] rawValues = rawValues(values); - return execute(connection -> connection.sRem(rawKey, rawValues), true); + return execute(connection -> connection.sRem(rawKey, rawValues)); } /* @@ -333,7 +333,7 @@ public V pop(K key) { protected byte[] inRedis(byte[] rawKey, RedisConnection connection) { return connection.sPop(rawKey); } - }, true); + }); } /* @@ -344,7 +344,7 @@ protected byte[] inRedis(byte[] rawKey, RedisConnection connection) { public List pop(K key, long count) { byte[] rawKey = rawKey(key); - List rawValues = execute(connection -> connection.sPop(rawKey, count), true); + List rawValues = execute(connection -> connection.sPop(rawKey, count)); return deserializeValues(rawValues); } @@ -357,7 +357,7 @@ public List pop(K key, long count) { public Long size(K key) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.sCard(rawKey), true); + return execute(connection -> connection.sCard(rawKey)); } /* @@ -377,7 +377,7 @@ public Set union(K key, K otherKey) { public Set union(K key, Collection otherKeys) { byte[][] rawKeys = rawKeys(key, otherKeys); - Set rawValues = execute(connection -> connection.sUnion(rawKeys), true); + Set rawValues = execute(connection -> connection.sUnion(rawKeys)); return deserializeValues(rawValues); } @@ -390,7 +390,7 @@ public Set union(K key, Collection otherKeys) { public Set union(Collection keys) { byte[][] rawKeys = rawKeys(keys); - Set rawValues = execute(connection -> connection.sUnion(rawKeys), true); + Set rawValues = execute(connection -> connection.sUnion(rawKeys)); return deserializeValues(rawValues); } @@ -414,7 +414,7 @@ public Long unionAndStore(K key, Collection otherKeys, K destKey) { byte[][] rawKeys = rawKeys(key, otherKeys); byte[] rawDestKey = rawKey(destKey); - return execute(connection -> connection.sUnionStore(rawDestKey, rawKeys), true); + return execute(connection -> connection.sUnionStore(rawDestKey, rawKeys)); } /* @@ -427,7 +427,7 @@ public Long unionAndStore(Collection keys, K destKey) { byte[][] rawKeys = rawKeys(keys); byte[] rawDestKey = rawKey(destKey); - return execute(connection -> connection.sUnionStore(rawDestKey, rawKeys), true); + return execute(connection -> connection.sUnionStore(rawDestKey, rawKeys)); } /* diff --git a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java index a71cfb3383..d03baa7268 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java @@ -119,7 +119,7 @@ public Object fromHash(Map hash) { public Long acknowledge(K key, String group, String... recordIds) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.xAck(rawKey, group, recordIds), true); + return execute(connection -> connection.xAck(rawKey, group, recordIds)); } /* @@ -137,7 +137,7 @@ public RecordId add(Record record) { ByteRecord binaryRecord = input.serialize(keySerializer(), hashKeySerializer(), hashValueSerializer()); - return execute(connection -> connection.xAdd(binaryRecord), true); + return execute(connection -> connection.xAdd(binaryRecord)); } /* @@ -148,7 +148,7 @@ public RecordId add(Record record) { public Long delete(K key, RecordId... recordIds) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.xDel(rawKey, recordIds), true); + return execute(connection -> connection.xDel(rawKey, recordIds)); } /* @@ -159,7 +159,7 @@ public Long delete(K key, RecordId... recordIds) { public String createGroup(K key, ReadOffset readOffset, String group) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.xGroupCreate(rawKey, group, readOffset, true), true); + return execute(connection -> connection.xGroupCreate(rawKey, group, readOffset, true)); } /* @@ -170,7 +170,7 @@ public String createGroup(K key, ReadOffset readOffset, String group) { public Boolean deleteConsumer(K key, Consumer consumer) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.xGroupDelConsumer(rawKey, consumer), true); + return execute(connection -> connection.xGroupDelConsumer(rawKey, consumer)); } /* @@ -181,7 +181,7 @@ public Boolean deleteConsumer(K key, Consumer consumer) { public Boolean destroyGroup(K key, String group) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.xGroupDestroy(rawKey, group), true); + return execute(connection -> connection.xGroupDestroy(rawKey, group)); } /* @@ -192,7 +192,7 @@ public Boolean destroyGroup(K key, String group) { public XInfoStream info(K key) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.xInfo(rawKey), true); + return execute(connection -> connection.xInfo(rawKey)); } /* @@ -203,7 +203,7 @@ public XInfoStream info(K key) { public XInfoConsumers consumers(K key, String group) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.xInfoConsumers(rawKey, group), true); + return execute(connection -> connection.xInfoConsumers(rawKey, group)); } /* @@ -214,7 +214,7 @@ public XInfoConsumers consumers(K key, String group) { public XInfoGroups groups(K key) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.xInfoGroups(rawKey), true); + return execute(connection -> connection.xInfoGroups(rawKey)); } /* @@ -225,7 +225,7 @@ public XInfoGroups groups(K key) { public PendingMessages pending(K key, String group, Range range, long count) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.xPending(rawKey, group, range, count), true); + return execute(connection -> connection.xPending(rawKey, group, range, count)); } /* @@ -236,7 +236,7 @@ public PendingMessages pending(K key, String group, Range range, long count) public PendingMessages pending(K key, Consumer consumer, Range range, long count) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.xPending(rawKey, consumer, range, count), true); + return execute(connection -> connection.xPending(rawKey, consumer, range, count)); } /* @@ -247,7 +247,7 @@ public PendingMessages pending(K key, Consumer consumer, Range range, long co public PendingMessagesSummary pending(K key, String group) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.xPending(rawKey, group), true); + return execute(connection -> connection.xPending(rawKey, group)); } /* @@ -258,7 +258,7 @@ public PendingMessagesSummary pending(K key, String group) { public Long size(K key) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.xLen(rawKey), true); + return execute(connection -> connection.xLen(rawKey)); } /* @@ -275,7 +275,7 @@ public List> range(K key, Range range, Limit limit) List inRedis(RedisConnection connection) { return connection.xRange(rawKey(key), range, limit); } - }, true); + }); } /* @@ -292,7 +292,7 @@ public List> read(StreamReadOptions readOptions, StreamOffs List inRedis(RedisConnection connection) { return connection.xRead(readOptions, rawStreamOffsets(streams)); } - }, true); + }); } /* @@ -309,7 +309,7 @@ public List> read(Consumer consumer, StreamReadOptions read List inRedis(RedisConnection connection) { return connection.xReadGroup(consumer, readOptions, rawStreamOffsets(streams)); } - }, true); + }); } /* @@ -326,20 +326,20 @@ public List> reverseRange(K key, Range range, Limit List inRedis(RedisConnection connection) { return connection.xRevRange(rawKey(key), range, limit); } - }, true); + }); } @Override public Long trim(K key, long count) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.xTrim(rawKey, count), true); + return execute(connection -> connection.xTrim(rawKey, count)); } @Override public Long trim(K key, long count, boolean approximateTrimming) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.xTrim(rawKey, count, approximateTrimming), true); + return execute(connection -> connection.xTrim(rawKey, count, approximateTrimming)); } @Override diff --git a/src/main/java/org/springframework/data/redis/core/DefaultValueOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultValueOperations.java index 1a9eeb03be..fb29c59b96 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultValueOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultValueOperations.java @@ -57,7 +57,7 @@ public V get(Object key) { protected byte[] inRedis(byte[] rawKey, RedisConnection connection) { return connection.get(rawKey); } - }, true); + }); } /* @@ -74,7 +74,7 @@ public V getAndDelete(K key) { protected byte[] inRedis(byte[] rawKey, RedisConnection connection) { return connection.getDel(rawKey); } - }, true); + }); } /* @@ -91,7 +91,7 @@ public V getAndExpire(K key, long timeout, TimeUnit unit) { protected byte[] inRedis(byte[] rawKey, RedisConnection connection) { return connection.getEx(rawKey, Expiration.from(timeout, unit)); } - }, true); + }); } /* @@ -108,7 +108,7 @@ public V getAndExpire(K key, Duration timeout) { protected byte[] inRedis(byte[] rawKey, RedisConnection connection) { return connection.getEx(rawKey, Expiration.from(timeout)); } - }, true); + }); } /* @@ -125,7 +125,7 @@ public V getAndPersist(K key) { protected byte[] inRedis(byte[] rawKey, RedisConnection connection) { return connection.getEx(rawKey, Expiration.persistent()); } - }, true); + }); } /* @@ -142,7 +142,7 @@ public V getAndSet(K key, V newValue) { protected byte[] inRedis(byte[] rawKey, RedisConnection connection) { return connection.getSet(rawKey, rawValue); } - }, true); + }); } /* @@ -153,7 +153,7 @@ protected byte[] inRedis(byte[] rawKey, RedisConnection connection) { public Long increment(K key) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.incr(rawKey), true); + return execute(connection -> connection.incr(rawKey)); } /* @@ -164,7 +164,7 @@ public Long increment(K key) { public Long increment(K key, long delta) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.incrBy(rawKey, delta), true); + return execute(connection -> connection.incrBy(rawKey, delta)); } /* @@ -175,7 +175,7 @@ public Long increment(K key, long delta) { public Double increment(K key, double delta) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.incrBy(rawKey, delta), true); + return execute(connection -> connection.incrBy(rawKey, delta)); } /* @@ -186,7 +186,7 @@ public Double increment(K key, double delta) { public Long decrement(K key) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.decr(rawKey), true); + return execute(connection -> connection.decr(rawKey)); } /* @@ -197,7 +197,7 @@ public Long decrement(K key) { public Long decrement(K key, long delta) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.decrBy(rawKey, delta), true); + return execute(connection -> connection.decrBy(rawKey, delta)); } /* @@ -213,7 +213,7 @@ public Integer append(K key, String value) { return execute(connection -> { Long result = connection.append(rawKey, rawString); return (result != null) ? result.intValue() : null; - }, true); + }); } /* @@ -223,7 +223,7 @@ public Integer append(K key, String value) { @Override public String get(K key, long start, long end) { byte[] rawKey = rawKey(key); - byte[] rawReturn = execute(connection -> connection.getRange(rawKey, start, end), true); + byte[] rawReturn = execute(connection -> connection.getRange(rawKey, start, end)); return deserializeString(rawReturn); } @@ -246,7 +246,7 @@ public List multiGet(Collection keys) { rawKeys[counter++] = rawKey(hashKey); } - List rawValues = execute(connection -> connection.mGet(rawKeys), true); + List rawValues = execute(connection -> connection.mGet(rawKeys)); return deserializeValues(rawValues); } @@ -271,7 +271,7 @@ public void multiSet(Map m) { execute(connection -> { connection.mSet(rawKeys); return null; - }, true); + }); } /* @@ -291,7 +291,7 @@ public Boolean multiSetIfAbsent(Map m) { rawKeys.put(rawKey(entry.getKey()), rawValue(entry.getValue())); } - return execute(connection -> connection.mSetNX(rawKeys), true); + return execute(connection -> connection.mSetNX(rawKeys)); } /* @@ -309,7 +309,7 @@ protected byte[] inRedis(byte[] rawKey, RedisConnection connection) { connection.set(rawKey, rawValue); return null; } - }, true); + }); } /* @@ -350,7 +350,7 @@ private boolean failsafeInvokePsetEx(RedisConnection connection) { return !failed; } - }, true); + }); } /* @@ -362,7 +362,7 @@ public Boolean setIfAbsent(K key, V value) { byte[] rawKey = rawKey(key); byte[] rawValue = rawValue(value); - return execute(connection -> connection.setNX(rawKey, rawValue), true); + return execute(connection -> connection.setNX(rawKey, rawValue)); } /* @@ -376,7 +376,7 @@ public Boolean setIfAbsent(K key, V value, long timeout, TimeUnit unit) { byte[] rawValue = rawValue(value); Expiration expiration = Expiration.from(timeout, unit); - return execute(connection -> connection.set(rawKey, rawValue, expiration, SetOption.ifAbsent()), true); + return execute(connection -> connection.set(rawKey, rawValue, expiration, SetOption.ifAbsent())); } /* @@ -390,7 +390,7 @@ public Boolean setIfPresent(K key, V value) { byte[] rawKey = rawKey(key); byte[] rawValue = rawValue(value); - return execute(connection -> connection.set(rawKey, rawValue, Expiration.persistent(), SetOption.ifPresent()), true); + return execute(connection -> connection.set(rawKey, rawValue, Expiration.persistent(), SetOption.ifPresent())); } /* @@ -405,7 +405,7 @@ public Boolean setIfPresent(K key, V value, long timeout, TimeUnit unit) { byte[] rawValue = rawValue(value); Expiration expiration = Expiration.from(timeout, unit); - return execute(connection -> connection.set(rawKey, rawValue, expiration, SetOption.ifPresent()), true); + return execute(connection -> connection.set(rawKey, rawValue, expiration, SetOption.ifPresent())); } /* @@ -421,7 +421,7 @@ public void set(K key, V value, long offset) { execute(connection -> { connection.setRange(rawKey, rawValue, offset); return null; - }, true); + }); } /* @@ -432,7 +432,7 @@ public void set(K key, V value, long offset) { public Long size(K key) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.strLen(rawKey), true); + return execute(connection -> connection.strLen(rawKey)); } /* @@ -443,7 +443,7 @@ public Long size(K key) { public Boolean setBit(K key, long offset, boolean value) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.setBit(rawKey, offset, value), true); + return execute(connection -> connection.setBit(rawKey, offset, value)); } /* @@ -454,7 +454,7 @@ public Boolean setBit(K key, long offset, boolean value) { public Boolean getBit(K key, long offset) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.getBit(rawKey, offset), true); + return execute(connection -> connection.getBit(rawKey, offset)); } /* @@ -465,6 +465,6 @@ public Boolean getBit(K key, long offset) { public List bitField(K key, final BitFieldSubCommands subCommands) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.bitField(rawKey, subCommands), true); + return execute(connection -> connection.bitField(rawKey, subCommands)); } } diff --git a/src/main/java/org/springframework/data/redis/core/DefaultZSetOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultZSetOperations.java index 4c4e588486..196500c610 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultZSetOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultZSetOperations.java @@ -57,7 +57,7 @@ public Boolean add(K key, V value, double score) { byte[] rawKey = rawKey(key); byte[] rawValue = rawValue(value); - return execute(connection -> connection.zAdd(rawKey, score, rawValue), true); + return execute(connection -> connection.zAdd(rawKey, score, rawValue)); } /* @@ -81,7 +81,7 @@ protected Boolean add(K key, V value, double score, ZAddArgs args) { byte[] rawKey = rawKey(key); byte[] rawValue = rawValue(value); - return execute(connection -> connection.zAdd(rawKey, score, rawValue, args), true); + return execute(connection -> connection.zAdd(rawKey, score, rawValue, args)); } /* @@ -93,7 +93,7 @@ public Long add(K key, Set> tuples) { byte[] rawKey = rawKey(key); Set rawValues = rawTupleValues(tuples); - return execute(connection -> connection.zAdd(rawKey, rawValues), true); + return execute(connection -> connection.zAdd(rawKey, rawValues)); } /* @@ -117,7 +117,7 @@ protected Long add(K key, Set> tuples, ZAddArgs args) { byte[] rawKey = rawKey(key); Set rawValues = rawTupleValues(tuples); - return execute(connection -> connection.zAdd(rawKey, rawValues, args), true); + return execute(connection -> connection.zAdd(rawKey, rawValues, args)); } /* @@ -129,7 +129,7 @@ public Double incrementScore(K key, V value, double delta) { byte[] rawKey = rawKey(key); byte[] rawValue = rawValue(value); - return execute(connection -> connection.zIncrBy(rawKey, delta, rawValue), true); + return execute(connection -> connection.zIncrBy(rawKey, delta, rawValue)); } /* @@ -141,7 +141,7 @@ public V randomMember(K key) { byte[] rawKey = rawKey(key); - return deserializeValue(execute(connection -> connection.zRandMember(rawKey), true)); + return deserializeValue(execute(connection -> connection.zRandMember(rawKey))); } /* @@ -155,7 +155,7 @@ public Set distinctRandomMembers(K key, long count) { byte[] rawKey = rawKey(key); - List result = execute(connection -> connection.zRandMember(rawKey, count), true); + List result = execute(connection -> connection.zRandMember(rawKey, count)); return result != null ? deserializeValues(new LinkedHashSet<>(result)) : null; } @@ -170,7 +170,7 @@ public List randomMembers(K key, long count) { byte[] rawKey = rawKey(key); - List result = execute(connection -> connection.zRandMember(rawKey, count), true); + List result = execute(connection -> connection.zRandMember(rawKey, count)); return deserializeValues(result); } @@ -183,7 +183,7 @@ public TypedTuple randomMemberWithScore(K key) { byte[] rawKey = rawKey(key); - return deserializeTuple(execute(connection -> connection.zRandMemberWithScore(rawKey), true)); + return deserializeTuple(execute(connection -> connection.zRandMemberWithScore(rawKey))); } /* @@ -197,7 +197,7 @@ public Set> distinctRandomMembersWithScore(K key, long count) { byte[] rawKey = rawKey(key); - List result = execute(connection -> connection.zRandMemberWithScore(rawKey, count), true); + List result = execute(connection -> connection.zRandMemberWithScore(rawKey, count)); return result != null ? deserializeTupleValues(new LinkedHashSet<>(result)) : null; } @@ -212,7 +212,7 @@ public List> randomMembersWithScore(K key, long count) { byte[] rawKey = rawKey(key); - List result = execute(connection -> connection.zRandMemberWithScore(rawKey, count), true); + List result = execute(connection -> connection.zRandMemberWithScore(rawKey, count)); return result != null ? deserializeTupleValues(result) : null; } @@ -224,7 +224,7 @@ public List> randomMembersWithScore(K key, long count) { public Set range(K key, long start, long end) { byte[] rawKey = rawKey(key); - Set rawValues = execute(connection -> connection.zRange(rawKey, start, end), true); + Set rawValues = execute(connection -> connection.zRange(rawKey, start, end)); return deserializeValues(rawValues); } @@ -237,7 +237,7 @@ public Set range(K key, long start, long end) { public Set reverseRange(K key, long start, long end) { byte[] rawKey = rawKey(key); - Set rawValues = execute(connection -> connection.zRevRange(rawKey, start, end), true); + Set rawValues = execute(connection -> connection.zRevRange(rawKey, start, end)); return deserializeValues(rawValues); } @@ -250,7 +250,7 @@ public Set reverseRange(K key, long start, long end) { public Set> rangeWithScores(K key, long start, long end) { byte[] rawKey = rawKey(key); - Set rawValues = execute(connection -> connection.zRangeWithScores(rawKey, start, end), true); + Set rawValues = execute(connection -> connection.zRangeWithScores(rawKey, start, end)); return deserializeTupleValues(rawValues); } @@ -263,7 +263,7 @@ public Set> rangeWithScores(K key, long start, long end) { public Set> reverseRangeWithScores(K key, long start, long end) { byte[] rawKey = rawKey(key); - Set rawValues = execute(connection -> connection.zRevRangeWithScores(rawKey, start, end), true); + Set rawValues = execute(connection -> connection.zRevRangeWithScores(rawKey, start, end)); return deserializeTupleValues(rawValues); } @@ -276,7 +276,7 @@ public Set> reverseRangeWithScores(K key, long start, long end) { public Set rangeByLex(K key, Range range, Limit limit) { byte[] rawKey = rawKey(key); - Set rawValues = execute(connection -> connection.zRangeByLex(rawKey, range, limit), true); + Set rawValues = execute(connection -> connection.zRangeByLex(rawKey, range, limit)); return deserializeValues(rawValues); } @@ -289,7 +289,7 @@ public Set rangeByLex(K key, Range range, Limit limit) { public Set reverseRangeByLex(K key, Range range, Limit limit) { byte[] rawKey = rawKey(key); - Set rawValues = execute(connection -> connection.zRevRangeByLex(rawKey, range, limit), true); + Set rawValues = execute(connection -> connection.zRevRangeByLex(rawKey, range, limit)); return deserializeValues(rawValues); } @@ -302,7 +302,7 @@ public Set reverseRangeByLex(K key, Range range, Limit limit) { public Set rangeByScore(K key, double min, double max) { byte[] rawKey = rawKey(key); - Set rawValues = execute(connection -> connection.zRangeByScore(rawKey, min, max), true); + Set rawValues = execute(connection -> connection.zRangeByScore(rawKey, min, max)); return deserializeValues(rawValues); } @@ -315,7 +315,7 @@ public Set rangeByScore(K key, double min, double max) { public Set rangeByScore(K key, double min, double max, long offset, long count) { byte[] rawKey = rawKey(key); - Set rawValues = execute(connection -> connection.zRangeByScore(rawKey, min, max, offset, count), true); + Set rawValues = execute(connection -> connection.zRangeByScore(rawKey, min, max, offset, count)); return deserializeValues(rawValues); } @@ -328,7 +328,7 @@ public Set rangeByScore(K key, double min, double max, long offset, long coun public Set reverseRangeByScore(K key, double min, double max) { byte[] rawKey = rawKey(key); - Set rawValues = execute(connection -> connection.zRevRangeByScore(rawKey, min, max), true); + Set rawValues = execute(connection -> connection.zRevRangeByScore(rawKey, min, max)); return deserializeValues(rawValues); } @@ -341,7 +341,7 @@ public Set reverseRangeByScore(K key, double min, double max) { public Set reverseRangeByScore(K key, double min, double max, long offset, long count) { byte[] rawKey = rawKey(key); - Set rawValues = execute(connection -> connection.zRevRangeByScore(rawKey, min, max, offset, count), true); + Set rawValues = execute(connection -> connection.zRevRangeByScore(rawKey, min, max, offset, count)); return deserializeValues(rawValues); } @@ -354,7 +354,7 @@ public Set reverseRangeByScore(K key, double min, double max, long offset, lo public Set> rangeByScoreWithScores(K key, double min, double max) { byte[] rawKey = rawKey(key); - Set rawValues = execute(connection -> connection.zRangeByScoreWithScores(rawKey, min, max), true); + Set rawValues = execute(connection -> connection.zRangeByScoreWithScores(rawKey, min, max)); return deserializeTupleValues(rawValues); } @@ -367,8 +367,7 @@ public Set> rangeByScoreWithScores(K key, double min, double max) public Set> rangeByScoreWithScores(K key, double min, double max, long offset, long count) { byte[] rawKey = rawKey(key); - Set rawValues = execute(connection -> connection.zRangeByScoreWithScores(rawKey, min, max, offset, count), - true); + Set rawValues = execute(connection -> connection.zRangeByScoreWithScores(rawKey, min, max, offset, count)); return deserializeTupleValues(rawValues); } @@ -381,7 +380,7 @@ public Set> rangeByScoreWithScores(K key, double min, double max, public Set> reverseRangeByScoreWithScores(K key, double min, double max) { byte[] rawKey = rawKey(key); - Set rawValues = execute(connection -> connection.zRevRangeByScoreWithScores(rawKey, min, max), true); + Set rawValues = execute(connection -> connection.zRevRangeByScoreWithScores(rawKey, min, max)); return deserializeTupleValues(rawValues); } @@ -394,8 +393,8 @@ public Set> reverseRangeByScoreWithScores(K key, double min, doubl public Set> reverseRangeByScoreWithScores(K key, double min, double max, long offset, long count) { byte[] rawKey = rawKey(key); - Set rawValues = execute(connection -> connection.zRevRangeByScoreWithScores(rawKey, min, max, offset, count), - true); + Set rawValues = execute( + connection -> connection.zRevRangeByScoreWithScores(rawKey, min, max, offset, count)); return deserializeTupleValues(rawValues); } @@ -413,7 +412,7 @@ public Long rank(K key, Object o) { return execute(connection -> { Long zRank = connection.zRank(rawKey, rawValue); return (zRank != null && zRank.longValue() >= 0 ? zRank : null); - }, true); + }); } /* @@ -429,7 +428,7 @@ public Long reverseRank(K key, Object o) { return execute(connection -> { Long zRank = connection.zRevRank(rawKey, rawValue); return (zRank != null && zRank.longValue() >= 0 ? zRank : null); - }, true); + }); } /* @@ -442,7 +441,7 @@ public Long remove(K key, Object... values) { byte[] rawKey = rawKey(key); byte[][] rawValues = rawValues(values); - return execute(connection -> connection.zRem(rawKey, rawValues), true); + return execute(connection -> connection.zRem(rawKey, rawValues)); } /* @@ -453,7 +452,7 @@ public Long remove(K key, Object... values) { public Long removeRange(K key, long start, long end) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.zRemRange(rawKey, start, end), true); + return execute(connection -> connection.zRemRange(rawKey, start, end)); } /* @@ -464,7 +463,7 @@ public Long removeRange(K key, long start, long end) { public Long removeRangeByLex(K key, Range range) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.zRemRangeByLex(rawKey, range), true); + return execute(connection -> connection.zRemRangeByLex(rawKey, range)); } /* @@ -475,7 +474,7 @@ public Long removeRangeByLex(K key, Range range) { public Long removeRangeByScore(K key, double min, double max) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.zRemRangeByScore(rawKey, min, max), true); + return execute(connection -> connection.zRemRangeByScore(rawKey, min, max)); } /* @@ -487,7 +486,7 @@ public Double score(K key, Object o) { byte[] rawKey = rawKey(key); byte[] rawValue = rawValue(o); - return execute(connection -> connection.zScore(rawKey, rawValue), true); + return execute(connection -> connection.zScore(rawKey, rawValue)); } /* @@ -499,7 +498,7 @@ public List score(K key, Object... o) { byte[] rawKey = rawKey(key); byte[][] rawValues = rawValues(o); - return execute(connection -> connection.zMScore(rawKey, rawValues), true); + return execute(connection -> connection.zMScore(rawKey, rawValues)); } /* @@ -510,7 +509,7 @@ public List score(K key, Object... o) { public Long count(K key, double min, double max) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.zCount(rawKey, min, max), true); + return execute(connection -> connection.zCount(rawKey, min, max)); } /* @@ -521,7 +520,7 @@ public Long count(K key, double min, double max) { public Long lexCount(K key, Range range) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.zLexCount(rawKey, range), true); + return execute(connection -> connection.zLexCount(rawKey, range)); } /* @@ -533,7 +532,7 @@ public Long lexCount(K key, Range range) { public TypedTuple popMin(K key) { byte[] rawKey = rawKey(key); - return deserializeTuple(execute(connection -> connection.zPopMin(rawKey), true)); + return deserializeTuple(execute(connection -> connection.zPopMin(rawKey))); } /* @@ -545,7 +544,7 @@ public TypedTuple popMin(K key) { public Set> popMin(K key, long count) { byte[] rawKey = rawKey(key); - Set result = execute(connection -> connection.zPopMin(rawKey, count), true); + Set result = execute(connection -> connection.zPopMin(rawKey, count)); return deserializeTupleValues(new LinkedHashSet<> (result)); } @@ -558,7 +557,7 @@ public Set> popMin(K key, long count) { public TypedTuple popMin(K key, long timeout, TimeUnit unit) { byte[] rawKey = rawKey(key); - return deserializeTuple(execute(connection -> connection.bZPopMin(rawKey, timeout, unit), true)); + return deserializeTuple(execute(connection -> connection.bZPopMin(rawKey, timeout, unit))); } /* @@ -570,7 +569,7 @@ public TypedTuple popMin(K key, long timeout, TimeUnit unit) { public TypedTuple popMax(K key) { byte[] rawKey = rawKey(key); - return deserializeTuple(execute(connection -> connection.zPopMax(rawKey), true)); + return deserializeTuple(execute(connection -> connection.zPopMax(rawKey))); } /* @@ -582,7 +581,7 @@ public TypedTuple popMax(K key) { public Set> popMax(K key, long count) { byte[] rawKey = rawKey(key); - Set result =execute(connection -> connection.zPopMax(rawKey, count), true); + Set result = execute(connection -> connection.zPopMax(rawKey, count)); return deserializeTupleValues(new LinkedHashSet<>(result)); } @@ -595,7 +594,7 @@ public Set> popMax(K key, long count) { public TypedTuple popMax(K key, long timeout, TimeUnit unit) { byte[] rawKey = rawKey(key); - return deserializeTuple(execute(connection -> connection.bZPopMax(rawKey, timeout, unit), true)); + return deserializeTuple(execute(connection -> connection.bZPopMax(rawKey, timeout, unit))); } /* @@ -615,7 +614,7 @@ public Long size(K key) { public Long zCard(K key) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.zCard(rawKey), true); + return execute(connection -> connection.zCard(rawKey)); } /* @@ -626,7 +625,7 @@ public Long zCard(K key) { public Set difference(K key, Collection otherKeys) { byte[][] rawKeys = rawKeys(key, otherKeys); - Set rawValues = execute(connection -> connection.zDiff(rawKeys), true); + Set rawValues = execute(connection -> connection.zDiff(rawKeys)); return deserializeValues(rawValues); } @@ -638,7 +637,7 @@ public Set difference(K key, Collection otherKeys) { public Set> differenceWithScores(K key, Collection otherKeys) { byte[][] rawKeys = rawKeys(key, otherKeys); - Set result = execute(connection -> connection.zDiffWithScores(rawKeys), true); + Set result = execute(connection -> connection.zDiffWithScores(rawKeys)); return deserializeTupleValues(new LinkedHashSet<>(result)); } @@ -652,7 +651,7 @@ public Long differenceAndStore(K key, Collection otherKeys, K destKey) { byte[][] rawKeys = rawKeys(key, otherKeys); byte[] rawDestKey = rawKey(destKey); - return execute(connection -> connection.zDiffStore(rawDestKey, rawKeys), true); + return execute(connection -> connection.zDiffStore(rawDestKey, rawKeys)); } /* @@ -663,7 +662,7 @@ public Long differenceAndStore(K key, Collection otherKeys, K destKey) { public Set intersect(K key, Collection otherKeys) { byte[][] rawKeys = rawKeys(key, otherKeys); - Set rawValues = execute(connection -> connection.zInter(rawKeys), true); + Set rawValues = execute(connection -> connection.zInter(rawKeys)); return deserializeValues(rawValues); } @@ -675,7 +674,7 @@ public Set intersect(K key, Collection otherKeys) { public Set> intersectWithScores(K key, Collection otherKeys) { byte[][] rawKeys = rawKeys(key, otherKeys); - Set result = execute(connection -> connection.zInterWithScores(rawKeys), true); + Set result = execute(connection -> connection.zInterWithScores(rawKeys)); return deserializeTupleValues(result); } @@ -687,7 +686,7 @@ public Set> intersectWithScores(K key, Collection otherKeys) { public Set> intersectWithScores(K key, Collection otherKeys, Aggregate aggregate, Weights weights) { byte[][] rawKeys = rawKeys(key, otherKeys); - Set result = execute(connection -> connection.zInterWithScores(aggregate, weights, rawKeys), true); + Set result = execute(connection -> connection.zInterWithScores(aggregate, weights, rawKeys)); return deserializeTupleValues(result); } @@ -710,7 +709,7 @@ public Long intersectAndStore(K key, Collection otherKeys, K destKey) { byte[][] rawKeys = rawKeys(key, otherKeys); byte[] rawDestKey = rawKey(destKey); - return execute(connection -> connection.zInterStore(rawDestKey, rawKeys), true); + return execute(connection -> connection.zInterStore(rawDestKey, rawKeys)); } /* @@ -723,7 +722,7 @@ public Long intersectAndStore(K key, Collection otherKeys, K destKey, Aggrega byte[][] rawKeys = rawKeys(key, otherKeys); byte[] rawDestKey = rawKey(destKey); - return execute(connection -> connection.zInterStore(rawDestKey, aggregate, weights, rawKeys), true); + return execute(connection -> connection.zInterStore(rawDestKey, aggregate, weights, rawKeys)); } /* @@ -734,7 +733,7 @@ public Long intersectAndStore(K key, Collection otherKeys, K destKey, Aggrega public Set union(K key, Collection otherKeys) { byte[][] rawKeys = rawKeys(key, otherKeys); - Set rawValues = execute(connection -> connection.zUnion(rawKeys), true); + Set rawValues = execute(connection -> connection.zUnion(rawKeys)); return deserializeValues(rawValues); } @@ -746,7 +745,7 @@ public Set union(K key, Collection otherKeys) { public Set> unionWithScores(K key, Collection otherKeys) { byte[][] rawKeys = rawKeys(key, otherKeys); - Set result = execute(connection -> connection.zUnionWithScores(rawKeys), true); + Set result = execute(connection -> connection.zUnionWithScores(rawKeys)); return deserializeTupleValues(result); } @@ -758,7 +757,7 @@ public Set> unionWithScores(K key, Collection otherKeys) { public Set> unionWithScores(K key, Collection otherKeys, Aggregate aggregate, Weights weights) { byte[][] rawKeys = rawKeys(key, otherKeys); - Set result = execute(connection -> connection.zUnionWithScores(aggregate, weights, rawKeys), true); + Set result = execute(connection -> connection.zUnionWithScores(aggregate, weights, rawKeys)); return deserializeTupleValues( result); } @@ -782,7 +781,7 @@ public Long unionAndStore(K key, Collection otherKeys, K destKey) { byte[][] rawKeys = rawKeys(key, otherKeys); byte[] rawDestKey = rawKey(destKey); - return execute(connection -> connection.zUnionStore(rawDestKey, rawKeys), true); + return execute(connection -> connection.zUnionStore(rawDestKey, rawKeys)); } /* @@ -795,7 +794,7 @@ public Long unionAndStore(K key, Collection otherKeys, K destKey, Aggregate a byte[][] rawKeys = rawKeys(key, otherKeys); byte[] rawDestKey = rawKey(destKey); - return execute(connection -> connection.zUnionStore(rawDestKey, aggregate, weights, rawKeys), true); + return execute(connection -> connection.zUnionStore(rawDestKey, aggregate, weights, rawKeys)); } /* @@ -814,14 +813,14 @@ public Cursor> scan(K key, ScanOptions options) { public Set rangeByScore(K key, String min, String max) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.zRangeByScore(rawKey, min, max), true); + return execute(connection -> connection.zRangeByScore(rawKey, min, max)); } public Set rangeByScore(K key, String min, String max, long offset, long count) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.zRangeByScore(rawKey, min, max, offset, count), true); + return execute(connection -> connection.zRangeByScore(rawKey, min, max, offset, count)); } } diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveRedisOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveRedisOperations.java index ad0f204d5e..077746ed34 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveRedisOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveRedisOperations.java @@ -63,6 +63,23 @@ public interface ReactiveRedisOperations { */ Flux execute(ReactiveRedisCallback action); + /** + * Executes the given action within a Redis session using the same + * {@link org.springframework.data.redis.connection.ReactiveRedisConnection}. Application exceptions thrown by the + * action object get propagated to the caller (can only be unchecked) whenever possible. Redis exceptions are + * transformed into appropriate DAO ones. Allows for returning a result object, that is a domain object or a + * collection of domain objects. Performs automatic serialization/deserialization for the given objects to and from + * binary data suitable for the Redis storage. Note: Callback code is not supposed to handle transactions itself! Use + * an appropriate transaction manager. Generally, callback code must not touch any Connection lifecycle methods, like + * close, to let the template do its work. + * + * @param return type + * @param action callback object that specifies the Redis action + * @return a result object returned by the action or {@link Flux#empty()}. + * @since 2.6 + */ + Flux executeInSession(ReactiveRedisSessionCallback action); + // ------------------------------------------------------------------------- // Methods dealing with Redis Pub/Sub // ------------------------------------------------------------------------- diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveRedisSessionCallback.java b/src/main/java/org/springframework/data/redis/core/ReactiveRedisSessionCallback.java new file mode 100644 index 0000000000..4b87ab73a5 --- /dev/null +++ b/src/main/java/org/springframework/data/redis/core/ReactiveRedisSessionCallback.java @@ -0,0 +1,48 @@ +/* + * Copyright 2017-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.core; + +import org.reactivestreams.Publisher; +import org.springframework.dao.DataAccessException; +import org.springframework.data.redis.connection.ReactiveRedisConnection; + +/** + * Generic callback interface for code that wants to use the same {@link ReactiveRedisConnection} avoiding connection + * allocation overhead upon each Template API method call. Allows to execute any number of operations on a single + * {@link ReactiveRedisConnection}, using any type and number of commands. + *

+ * This is particularly useful for issuing multiple calls on the same connection. + * + * @param + * @author Mark Paluch + * @since 2.6 + * @see ReactiveRedisOperations#executeInSession(ReactiveRedisSessionCallback) + */ +public interface ReactiveRedisSessionCallback { + + /** + * Gets called by {@link ReactiveRedisOperations#executeInSession(ReactiveRedisSessionCallback)} with an active Redis + * connection. Does not need to care about activating or closing the {@link ReactiveRedisConnection}. + *

+ * Allows for returning a result object created within the callback, i.e. a domain object or a collection of domain + * objects. + * + * @param operations template associated with a connection. + * @return a result object publisher + * @throws DataAccessException in case of custom exceptions + */ + Publisher doWithOperations(ReactiveRedisOperations operations) throws DataAccessException; +} diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java b/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java index fba47e095d..48b5764583 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java @@ -151,6 +151,14 @@ public Flux execute(ReactiveRedisCallback action, boolean exposeConnec return Flux.from(doInConnection(action, exposeConnection)); } + @Override + public Flux executeInSession(ReactiveRedisSessionCallback action) { + + Assert.notNull(action, "Callback object must not be null"); + return Flux + .from(doInConnection(connection -> action.doWithOperations(withConnection(connection)), exposeConnection)); + } + /** * Create a reusable Flux for a {@link ReactiveRedisCallback}. Callback is executed within a connection context. The * connection is released outside the callback. @@ -165,6 +173,21 @@ public Flux createFlux(ReactiveRedisCallback callback) { return Flux.from(doInConnection(callback, exposeConnection)); } + /** + * Internal variant of {@link #createFlux(ReactiveRedisCallback)} bypassing proxy creation. Create a reusable Flux for + * a {@link ReactiveRedisCallback}. Callback is executed within a connection context. The connection is released + * outside the callback. + * + * @param callback must not be {@literal null} + * @return a {@link Flux} wrapping the {@link ReactiveRedisCallback}. + */ + Flux doCreateFlux(ReactiveRedisCallback callback) { + + Assert.notNull(callback, "ReactiveRedisCallback must not be null!"); + + return Flux.from(doInConnection(callback, true)); + } + /** * Create a reusable Mono for a {@link ReactiveRedisCallback}. Callback is executed within a connection context. The * connection is released outside the callback. @@ -179,6 +202,21 @@ public Mono createMono(ReactiveRedisCallback callback) { return Mono.from(doInConnection(callback, exposeConnection)); } + /** + * Internal variant of {@link #createMono(ReactiveRedisCallback)} bypassing proxy creation. Create a reusable Mono for + * a {@link ReactiveRedisCallback}. Callback is executed within a connection context. The connection is released + * outside the callback. + * + * @param callback must not be {@literal null} + * @return a {@link Mono} wrapping the {@link ReactiveRedisCallback}. + */ + Mono doCreateMono(ReactiveRedisCallback callback) { + + Assert.notNull(callback, "ReactiveRedisCallback must not be null!"); + + return Mono.from(doInConnection(callback, true)); + } + /** * Executes the given action object within a connection that can be exposed or not. Additionally, the connection can * be pipelined. Note the results of the pipeline are discarded (making it suitable for write-only scenarios). @@ -188,7 +226,7 @@ public Mono createMono(ReactiveRedisCallback callback) { * @param exposeConnection whether to enforce exposure of the native Redis Connection to callback code * @return object returned by the action */ - private Publisher doInConnection(ReactiveRedisCallback action, boolean exposeConnection) { + Publisher doInConnection(ReactiveRedisCallback action, boolean exposeConnection) { Assert.notNull(action, "Callback object must not be null"); @@ -217,7 +255,7 @@ public Mono convertAndSend(String destination, V message) { Assert.hasText(destination, "Destination channel must not be empty!"); Assert.notNull(message, "Message must not be null!"); - return createMono(connection -> connection.pubSubCommands().publish( + return doCreateMono(connection -> connection.pubSubCommands().publish( getSerializationContext().getStringSerializationPair().write(destination), getSerializationContext().getValueSerializationPair().write(message))); } @@ -266,7 +304,7 @@ public Mono copy(K sourceKey, K targetKey, boolean replace) { Assert.notNull(sourceKey, "Source key must not be null!"); Assert.notNull(targetKey, "Target key must not be null!"); - return createMono(connection -> connection.keyCommands().copy(rawKey(sourceKey), rawKey(targetKey), replace)); + return doCreateMono(connection -> connection.keyCommands().copy(rawKey(sourceKey), rawKey(targetKey), replace)); } /* @@ -278,7 +316,7 @@ public Mono hasKey(K key) { Assert.notNull(key, "Key must not be null!"); - return createMono(connection -> connection.keyCommands().exists(rawKey(key))); + return doCreateMono(connection -> connection.keyCommands().exists(rawKey(key))); } /* @@ -290,7 +328,7 @@ public Mono type(K key) { Assert.notNull(key, "Key must not be null!"); - return createMono(connection -> connection.keyCommands().type(rawKey(key))); + return doCreateMono(connection -> connection.keyCommands().type(rawKey(key))); } /* @@ -302,7 +340,7 @@ public Flux keys(K pattern) { Assert.notNull(pattern, "Pattern must not be null!"); - return createFlux(connection -> connection.keyCommands().keys(rawKey(pattern))) // + return doCreateFlux(connection -> connection.keyCommands().keys(rawKey(pattern))) // .flatMap(Flux::fromIterable) // .map(this::readKey); } @@ -316,7 +354,7 @@ public Flux scan(ScanOptions options) { Assert.notNull(options, "ScanOptions must not be null!"); - return createFlux(connection -> connection.keyCommands().scan(options)) // + return doCreateFlux(connection -> connection.keyCommands().scan(options)) // .map(this::readKey); } @@ -326,7 +364,7 @@ public Flux scan(ScanOptions options) { */ @Override public Mono randomKey() { - return createMono(connection -> connection.keyCommands().randomKey()).map(this::readKey); + return doCreateMono(connection -> connection.keyCommands().randomKey()).map(this::readKey); } /* @@ -339,7 +377,7 @@ public Mono rename(K oldKey, K newKey) { Assert.notNull(oldKey, "Old key must not be null!"); Assert.notNull(newKey, "New Key must not be null!"); - return createMono(connection -> connection.keyCommands().rename(rawKey(oldKey), rawKey(newKey))); + return doCreateMono(connection -> connection.keyCommands().rename(rawKey(oldKey), rawKey(newKey))); } /* @@ -352,7 +390,7 @@ public Mono renameIfAbsent(K oldKey, K newKey) { Assert.notNull(oldKey, "Old key must not be null!"); Assert.notNull(newKey, "New Key must not be null!"); - return createMono(connection -> connection.keyCommands().renameNX(rawKey(oldKey), rawKey(newKey))); + return doCreateMono(connection -> connection.keyCommands().renameNX(rawKey(oldKey), rawKey(newKey))); } /* @@ -368,11 +406,11 @@ public final Mono delete(K... keys) { Assert.noNullElements(keys, "Keys must not contain null elements!"); if (keys.length == 1) { - return createMono(connection -> connection.keyCommands().del(rawKey(keys[0]))); + return doCreateMono(connection -> connection.keyCommands().del(rawKey(keys[0]))); } Mono> listOfKeys = Flux.fromArray(keys).map(this::rawKey).collectList(); - return createMono(connection -> listOfKeys.flatMap(rawKeys -> connection.keyCommands().mDel(rawKeys))); + return doCreateMono(connection -> listOfKeys.flatMap(rawKeys -> connection.keyCommands().mDel(rawKeys))); } /* @@ -384,7 +422,7 @@ public Mono delete(Publisher keys) { Assert.notNull(keys, "Keys must not be null!"); - return createFlux(connection -> connection.keyCommands() // + return doCreateFlux(connection -> connection.keyCommands() // .mDel(Flux.from(keys).map(this::rawKey).buffer(128)) // .map(CommandResponse::getOutput)) // .collect(Collectors.summingLong(value -> value)); @@ -403,11 +441,11 @@ public final Mono unlink(K... keys) { Assert.noNullElements(keys, "Keys must not contain null elements!"); if (keys.length == 1) { - return createMono(connection -> connection.keyCommands().unlink(rawKey(keys[0]))); + return doCreateMono(connection -> connection.keyCommands().unlink(rawKey(keys[0]))); } Mono> listOfKeys = Flux.fromArray(keys).map(this::rawKey).collectList(); - return createMono(connection -> listOfKeys.flatMap(rawKeys -> connection.keyCommands().mUnlink(rawKeys))); + return doCreateMono(connection -> listOfKeys.flatMap(rawKeys -> connection.keyCommands().mUnlink(rawKeys))); } /* @@ -419,7 +457,7 @@ public Mono unlink(Publisher keys) { Assert.notNull(keys, "Keys must not be null!"); - return createFlux(connection -> connection.keyCommands() // + return doCreateFlux(connection -> connection.keyCommands() // .mUnlink(Flux.from(keys).map(this::rawKey).buffer(128)) // .map(CommandResponse::getOutput)) // .collect(Collectors.summingLong(value -> value)); @@ -436,11 +474,11 @@ public Mono expire(K key, Duration timeout) { Assert.notNull(timeout, "Timeout must not be null!"); if (timeout.getNano() == 0) { - return createMono(connection -> connection.keyCommands() // + return doCreateMono(connection -> connection.keyCommands() // .expire(rawKey(key), timeout)); } - return createMono(connection -> connection.keyCommands().pExpire(rawKey(key), timeout)); + return doCreateMono(connection -> connection.keyCommands().pExpire(rawKey(key), timeout)); } /* @@ -454,11 +492,11 @@ public Mono expireAt(K key, Instant expireAt) { Assert.notNull(expireAt, "Expire at must not be null!"); if (expireAt.getNano() == 0) { - return createMono(connection -> connection.keyCommands() // + return doCreateMono(connection -> connection.keyCommands() // .expireAt(rawKey(key), expireAt)); } - return createMono(connection -> connection.keyCommands().pExpireAt(rawKey(key), expireAt)); + return doCreateMono(connection -> connection.keyCommands().pExpireAt(rawKey(key), expireAt)); } /* @@ -470,7 +508,7 @@ public Mono persist(K key) { Assert.notNull(key, "Key must not be null!"); - return createMono(connection -> connection.keyCommands().persist(rawKey(key))); + return doCreateMono(connection -> connection.keyCommands().persist(rawKey(key))); } /* @@ -482,7 +520,7 @@ public Mono getExpire(K key) { Assert.notNull(key, "Key must not be null!"); - return createMono(connection -> connection.keyCommands().pTtl(rawKey(key)).flatMap(expiry -> { + return doCreateMono(connection -> connection.keyCommands().pTtl(rawKey(key)).flatMap(expiry -> { if (expiry == -1) { return Mono.just(Duration.ZERO); @@ -505,7 +543,7 @@ public Mono move(K key, int dbIndex) { Assert.notNull(key, "Key must not be null!"); - return createMono(connection -> connection.keyCommands().move(rawKey(key), dbIndex)); + return doCreateMono(connection -> connection.keyCommands().move(rawKey(key), dbIndex)); } // ------------------------------------------------------------------------- @@ -742,6 +780,31 @@ public RedisSerializationContext getSerializationContext() { return serializationContext; } + private ReactiveRedisOperations withConnection(ReactiveRedisConnection connection) { + return new BoundConnectionRedisTemplate(connection, connectionFactory, serializationContext); + } + + class BoundConnectionRedisTemplate extends ReactiveRedisTemplate { + + private final ReactiveRedisConnection connection; + + public BoundConnectionRedisTemplate(ReactiveRedisConnection connection, + ReactiveRedisConnectionFactory connectionFactory, RedisSerializationContext serializationContext) { + super(connectionFactory, serializationContext, true); + this.connection = connection; + } + + @Override + Publisher doInConnection(ReactiveRedisCallback action, boolean exposeConnection) { + + Assert.notNull(action, "Callback object must not be null"); + + ReactiveRedisConnection connToUse = ReactiveRedisTemplate.this.preProcessConnection(connection, true); + Publisher result = action.doInRedis(connToUse); + return ReactiveRedisTemplate.this.postProcessResult(result, connToUse, true); + } + } + private ByteBuffer rawKey(K key) { return getSerializationContext().getKeySerializationPair().getWriter().write(key); } diff --git a/src/main/kotlin/org/springframework/data/redis/core/ReactiveRedisOperationsExtensions.kt b/src/main/kotlin/org/springframework/data/redis/core/ReactiveRedisOperationsExtensions.kt index f3023d3008..89af9db124 100644 --- a/src/main/kotlin/org/springframework/data/redis/core/ReactiveRedisOperationsExtensions.kt +++ b/src/main/kotlin/org/springframework/data/redis/core/ReactiveRedisOperationsExtensions.kt @@ -22,7 +22,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.coroutines.reactive.awaitSingle import org.springframework.data.redis.connection.DataType import org.springframework.data.redis.connection.ReactiveRedisConnection -import org.springframework.data.redis.connection.ReactiveSubscription.* +import org.springframework.data.redis.connection.ReactiveSubscription.Message import org.springframework.data.redis.core.script.RedisScript import org.springframework.data.redis.listener.Topic import org.springframework.data.redis.serializer.RedisElementReader @@ -36,8 +36,20 @@ import java.time.Instant * @author Sebastien Deleuze * @since 2.2 */ -fun ReactiveRedisOperations.executeAsFlow(action: (ReactiveRedisConnection) -> Flow): Flow = - execute { action(it).asPublisher() }.asFlow() +fun ReactiveRedisOperations.executeAsFlow(action: (ReactiveRedisConnection) -> Flow): Flow { + return execute { action(it).asPublisher() }.asFlow() +} + +/** + * Coroutines variant of [ReactiveRedisOperations.execute]. + * + * @author Mark Paluch + * @since 2.6 + */ +fun ReactiveRedisOperations.executeInSessionAsFlow( + action: (ReactiveRedisOperations) -> Flow +): Flow = + executeInSession { action(it).asPublisher() }.asFlow() /** * Coroutines variant of [ReactiveRedisOperations.execute]. @@ -45,8 +57,12 @@ fun ReactiveRedisOperations.executeAsFlow(acti * @author Sebastien Deleuze * @since 2.2 */ -fun ReactiveRedisOperations.executeAsFlow(script: RedisScript, keys: List = emptyList(), args: List<*> = emptyList()): Flow = - execute(script, keys, args).asFlow() +fun ReactiveRedisOperations.executeAsFlow( + script: RedisScript, + keys: List = emptyList(), + args: List<*> = emptyList() +): Flow = + execute(script, keys, args).asFlow() /** * Coroutines variant of [ReactiveRedisOperations.execute]. diff --git a/src/test/kotlin/org/springframework/data/redis/core/ReactiveRedisOperationsExtensionsUnitTests.kt b/src/test/kotlin/org/springframework/data/redis/core/ReactiveRedisOperationsExtensionsUnitTests.kt index 7e98bd1f62..26cab92921 100644 --- a/src/test/kotlin/org/springframework/data/redis/core/ReactiveRedisOperationsExtensionsUnitTests.kt +++ b/src/test/kotlin/org/springframework/data/redis/core/ReactiveRedisOperationsExtensionsUnitTests.kt @@ -50,7 +50,8 @@ class ReactiveRedisOperationsExtensionsUnitTests { every { operations.execute(any>()) } returns Flux.just("foo") runBlocking { - assertThat(operations.executeAsFlow { flow { emit("foo")} }.toList()).contains("foo") + assertThat(operations.executeAsFlow { flow { emit("foo") } } + .toList()).contains("foo") } verify { @@ -58,12 +59,36 @@ class ReactiveRedisOperationsExtensionsUnitTests { } } + @Test // GH-2110 + fun `executeInSession with calllback`() { + + val operations = mockk>() + every { operations.executeInSession(any>()) } returns Flux.just( + "foo" + ) + + runBlocking { + assertThat(operations.executeInSessionAsFlow { flow { emit("foo") } } + .toList()).contains("foo") + } + + verify { + operations.executeInSession(any>()) + } + } + @Test // DATAREDIS-1033 fun `execute with script`() { val script = RedisScript.of("foo") val operations = mockk>() - every { operations.execute(any>(), any(), any()) } returns Flux.just("foo") + every { + operations.execute( + any>(), + any(), + any() + ) + } returns Flux.just("foo") runBlocking { assertThat(operations.executeAsFlow(script).toList()).contains("foo")