list = new ArrayList<>(this.size);
+
+ while (list.size() < this.size && this.iterator.hasNext()) {
+ list.add(this.iterator.next());
}
return list;
diff --git a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java
index 36121249d6..37195ef05d 100644
--- a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java
+++ b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java
@@ -15,19 +15,29 @@
*/
package org.springframework.data.redis.cache;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.springframework.dao.PessimisticLockingFailureException;
+import org.springframework.data.redis.connection.ReactiveRedisConnection;
+import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStringCommands.SetOption;
import org.springframework.data.redis.core.types.Expiration;
+import org.springframework.data.redis.util.ByteUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
+import org.springframework.util.ClassUtils;
+import org.springframework.util.ObjectUtils;
/**
* {@link RedisCacheWriter} implementation capable of reading/writing binary data from/to Redis in {@literal standalone}
@@ -35,8 +45,8 @@
* {@link RedisConnection}.
*
* {@link DefaultRedisCacheWriter} can be used in
- * {@link RedisCacheWriter#lockingRedisCacheWriter(RedisConnectionFactory) locking}
- * or {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While
+ * {@link RedisCacheWriter#lockingRedisCacheWriter(RedisConnectionFactory) locking} or
+ * {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While
* {@literal non-locking} aims for maximum performance it may result in overlapping, non-atomic, command execution for
* operations spanning multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents
* command overlap by setting an explicit lock key and checking against presence of this key which leads to additional
@@ -50,6 +60,9 @@
*/
class DefaultRedisCacheWriter implements RedisCacheWriter {
+ private static final boolean REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT = ClassUtils
+ .isPresent("org.springframework.data.redis.connection.ReactiveRedisConnectionFactory", null);
+
private final BatchStrategy batchStrategy;
private final CacheStatisticsCollector statistics;
@@ -60,6 +73,8 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
private final TtlFunction lockTtl;
+ private final AsyncCacheWriter asyncCacheWriter;
+
/**
* @param connectionFactory must not be {@literal null}.
* @param batchStrategy must not be {@literal null}.
@@ -100,6 +115,64 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
this.lockTtl = lockTtl;
this.statistics = cacheStatisticsCollector;
this.batchStrategy = batchStrategy;
+
+ if (REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT && this.connectionFactory instanceof ReactiveRedisConnectionFactory) {
+ asyncCacheWriter = new AsynchronousCacheWriterDelegate();
+ } else {
+ asyncCacheWriter = UnsupportedAsyncCacheWriter.INSTANCE;
+ }
+ }
+
+ @Override
+ public byte[] get(String name, byte[] key) {
+ return get(name, key, null);
+ }
+
+ @Override
+ public byte[] get(String name, byte[] key, @Nullable Duration ttl) {
+
+ Assert.notNull(name, "Name must not be null");
+ Assert.notNull(key, "Key must not be null");
+
+ byte[] result = shouldExpireWithin(ttl)
+ ? execute(name, connection -> connection.stringCommands().getEx(key, Expiration.from(ttl)))
+ : execute(name, connection -> connection.stringCommands().get(key));
+
+ statistics.incGets(name);
+
+ if (result != null) {
+ statistics.incHits(name);
+ } else {
+ statistics.incMisses(name);
+ }
+
+ return result;
+ }
+
+ @Override
+ public boolean supportsAsyncRetrieve() {
+ return asyncCacheWriter.isSupported();
+ }
+
+ @Override
+ public CompletableFuture retrieve(String name, byte[] key, @Nullable Duration ttl) {
+
+ Assert.notNull(name, "Name must not be null");
+ Assert.notNull(key, "Key must not be null");
+
+ return asyncCacheWriter.retrieve(name, key, ttl) //
+ .thenApply(cachedValue -> {
+
+ statistics.incGets(name);
+
+ if (cachedValue != null) {
+ statistics.incHits(name);
+ } else {
+ statistics.incMisses(name);
+ }
+
+ return cachedValue;
+ });
}
@Override
@@ -112,9 +185,10 @@ public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
execute(name, connection -> {
if (shouldExpireWithin(ttl)) {
- connection.set(key, value, Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), SetOption.upsert());
+ connection.stringCommands().set(key, value, Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS),
+ SetOption.upsert());
} else {
- connection.set(key, value);
+ connection.stringCommands().set(key, value);
}
return "OK";
@@ -124,29 +198,14 @@ public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
}
@Override
- public byte[] get(String name, byte[] key) {
- return get(name, key, null);
- }
-
- @Override
- public byte[] get(String name, byte[] key, @Nullable Duration ttl) {
+ public CompletableFuture store(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
Assert.notNull(name, "Name must not be null");
Assert.notNull(key, "Key must not be null");
+ Assert.notNull(value, "Value must not be null");
- byte[] result = shouldExpireWithin(ttl)
- ? execute(name, connection -> connection.getEx(key, Expiration.from(ttl)))
- : execute(name, connection -> connection.get(key));
-
- statistics.incGets(name);
-
- if (result != null) {
- statistics.incHits(name);
- } else {
- statistics.incMisses(name);
- }
-
- return result;
+ return asyncCacheWriter.store(name, key, value, ttl) //
+ .thenRun(() -> statistics.incPuts(name));
}
@Override
@@ -167,9 +226,10 @@ public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Durat
boolean put;
if (shouldExpireWithin(ttl)) {
- put = connection.set(key, value, Expiration.from(ttl), SetOption.ifAbsent());
+ put = ObjectUtils.nullSafeEquals(
+ connection.stringCommands().set(key, value, Expiration.from(ttl), SetOption.ifAbsent()), true);
} else {
- put = connection.setNX(key, value);
+ put = ObjectUtils.nullSafeEquals(connection.stringCommands().setNX(key, value), true);
}
if (put) {
@@ -177,7 +237,7 @@ public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Durat
return null;
}
- return connection.get(key);
+ return connection.stringCommands().get(key);
} finally {
if (isLockingCacheWriter()) {
@@ -193,7 +253,7 @@ public void remove(String name, byte[] key) {
Assert.notNull(name, "Name must not be null");
Assert.notNull(key, "Key must not be null");
- execute(name, connection -> connection.del(key));
+ execute(name, connection -> connection.keyCommands().del(key));
statistics.incDeletes(name);
}
@@ -257,6 +317,15 @@ void lock(String name) {
execute(name, connection -> doLock(name, name, null, connection));
}
+ @Nullable
+ private Boolean doLock(String name, Object contextualKey, @Nullable Object contextualValue,
+ RedisConnection connection) {
+
+ Expiration expiration = Expiration.from(this.lockTtl.getTimeToLive(contextualKey, contextualValue));
+
+ return connection.stringCommands().set(createCacheLockKey(name), new byte[0], expiration, SetOption.SET_IF_ABSENT);
+ }
+
/**
* Explicitly remove a write lock from a cache.
*
@@ -266,52 +335,35 @@ void unlock(String name) {
executeLockFree(connection -> doUnlock(name, connection));
}
- private Boolean doLock(String name, Object contextualKey, Object contextualValue, RedisConnection connection) {
-
- Expiration expiration = lockTtl == null ? Expiration.persistent()
- : Expiration.from(lockTtl.getTimeToLive(contextualKey, contextualValue));
-
- return connection.set(createCacheLockKey(name), new byte[0], expiration, SetOption.SET_IF_ABSENT);
- }
-
+ @Nullable
private Long doUnlock(String name, RedisConnection connection) {
- return connection.del(createCacheLockKey(name));
- }
-
- boolean doCheckLock(String name, RedisConnection connection) {
- return connection.exists(createCacheLockKey(name));
- }
-
- /**
- * @return {@literal true} if {@link RedisCacheWriter} uses locks.
- */
- private boolean isLockingCacheWriter() {
- return !sleepTime.isZero() && !sleepTime.isNegative();
+ return connection.keyCommands().del(createCacheLockKey(name));
}
private T execute(String name, Function callback) {
- RedisConnection connection = connectionFactory.getConnection();
-
- try {
+ try (RedisConnection connection = this.connectionFactory.getConnection()) {
checkAndPotentiallyWaitUntilUnlocked(name, connection);
return callback.apply(connection);
- } finally {
- connection.close();
}
}
- private void executeLockFree(Consumer callback) {
+ private T executeLockFree(Function callback) {
- RedisConnection connection = connectionFactory.getConnection();
-
- try {
- callback.accept(connection);
- } finally {
- connection.close();
+ try (RedisConnection connection = this.connectionFactory.getConnection()) {
+ return callback.apply(connection);
}
}
+ /**
+ * Determines whether this {@link RedisCacheWriter} uses locks during caching operations.
+ *
+ * @return {@literal true} if {@link RedisCacheWriter} uses locks.
+ */
+ private boolean isLockingCacheWriter() {
+ return !this.sleepTime.isZero() && !this.sleepTime.isNegative();
+ }
+
private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection connection) {
if (!isLockingCacheWriter()) {
@@ -322,26 +374,199 @@ private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection c
try {
while (doCheckLock(name, connection)) {
- Thread.sleep(sleepTime.toMillis());
+ Thread.sleep(this.sleepTime.toMillis());
}
} catch (InterruptedException cause) {
- // Re-interrupt current thread, to allow other participants to react.
+ // Re-interrupt current Thread to allow other participants to react.
Thread.currentThread().interrupt();
- String message = String.format("Interrupted while waiting to unlock cache %s", name);
-
- throw new PessimisticLockingFailureException(message, cause);
+ throw new PessimisticLockingFailureException(String.format("Interrupted while waiting to unlock cache %s", name),
+ cause);
} finally {
- statistics.incLockTime(name, System.nanoTime() - lockWaitTimeNs);
+ this.statistics.incLockTime(name, System.nanoTime() - lockWaitTimeNs);
}
}
+ boolean doCheckLock(String name, RedisConnection connection) {
+ return ObjectUtils.nullSafeEquals(connection.keyCommands().exists(createCacheLockKey(name)), true);
+ }
+
+ byte[] createCacheLockKey(String name) {
+ return (name + "~lock").getBytes(StandardCharsets.UTF_8);
+ }
+
private static boolean shouldExpireWithin(@Nullable Duration ttl) {
return ttl != null && !ttl.isZero() && !ttl.isNegative();
}
- private static byte[] createCacheLockKey(String name) {
- return (name + "~lock").getBytes(StandardCharsets.UTF_8);
+ /**
+ * Interface for asynchronous cache retrieval.
+ *
+ * @since 3.2
+ */
+ interface AsyncCacheWriter {
+
+ /**
+ * @return {@code true} if async cache operations are supported; {@code false} otherwise.
+ */
+ boolean isSupported();
+
+ /**
+ * Retrieve a cache entry asynchronously.
+ *
+ * @param name the cache name from which to retrieve the cache entry.
+ * @param key the cache entry key.
+ * @param ttl optional TTL to set for Time-to-Idle eviction.
+ * @return a future that completes either with a value if the value exists or completing with {@code null} if the
+ * cache does not contain an entry.
+ */
+ CompletableFuture retrieve(String name, byte[] key, @Nullable Duration ttl);
+
+ /**
+ * Store a cache entry asynchronously.
+ *
+ * @param name the cache name which to store the cache entry to.
+ * @param key the key for the cache entry. Must not be {@literal null}.
+ * @param value the value stored for the key. Must not be {@literal null}.
+ * @param ttl optional expiration time. Can be {@literal null}.
+ * @return a future that signals completion.
+ */
+ CompletableFuture store(String name, byte[] key, byte[] value, @Nullable Duration ttl);
+ }
+
+ /**
+ * Unsupported variant of a {@link AsyncCacheWriter}.
+ *
+ * @since 3.2
+ */
+ enum UnsupportedAsyncCacheWriter implements AsyncCacheWriter {
+ INSTANCE;
+
+ @Override
+ public boolean isSupported() {
+ return false;
+ }
+
+ @Override
+ public CompletableFuture retrieve(String name, byte[] key, @Nullable Duration ttl) {
+ throw new UnsupportedOperationException("async retrieve not supported");
+ }
+
+ @Override
+ public CompletableFuture store(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
+ throw new UnsupportedOperationException("async store not supported");
+ }
+ }
+
+ /**
+ * Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations using
+ * {@link ReactiveRedisConnectionFactory}.
+ *
+ * @since 3.2
+ */
+ class AsynchronousCacheWriterDelegate implements AsyncCacheWriter {
+
+ @Override
+ public boolean isSupported() {
+ return true;
+ }
+
+ @Override
+ public CompletableFuture retrieve(String name, byte[] key, @Nullable Duration ttl) {
+
+ return doWithConnection(connection -> {
+
+ ByteBuffer wrappedKey = ByteBuffer.wrap(key);
+ Mono> cacheLockCheckFlux;
+
+ if (isLockingCacheWriter())
+ cacheLockCheckFlux = waitForLock(connection, name);
+ else {
+ cacheLockCheckFlux = Mono.empty();
+ }
+
+ Mono get = shouldExpireWithin(ttl)
+ ? connection.stringCommands().getEx(wrappedKey, Expiration.from(ttl))
+ : connection.stringCommands().get(wrappedKey);
+
+ return cacheLockCheckFlux.then(get).map(ByteUtils::getBytes).toFuture();
+ });
+ }
+
+ @Override
+ public CompletableFuture store(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
+
+ return doWithConnection(connection -> {
+
+ Mono> mono;
+
+ if (isLockingCacheWriter()) {
+
+ mono = Mono.usingWhen(doLock(name, key, value, connection), unused -> doStore(key, value, ttl, connection),
+ unused -> doUnlock(name, connection));
+ } else {
+ mono = doStore(key, value, ttl, connection);
+ }
+
+ return mono.then().toFuture();
+ });
+ }
+
+ private Mono doStore(byte[] cacheKey, byte[] value, @Nullable Duration ttl,
+ ReactiveRedisConnection connection) {
+
+ ByteBuffer wrappedKey = ByteBuffer.wrap(cacheKey);
+ ByteBuffer wrappedValue = ByteBuffer.wrap(value);
+
+ if (shouldExpireWithin(ttl)) {
+ return connection.stringCommands().set(wrappedKey, wrappedValue,
+ Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), SetOption.upsert());
+ } else {
+ return connection.stringCommands().set(wrappedKey, wrappedValue);
+ }
+ }
+
+ private Mono