From f643a18304971c1e479e770e5cdc1d0203695ba0 Mon Sep 17 00:00:00 2001 From: Piotr Mionskowski Date: Thu, 10 Jun 2021 23:04:16 +0200 Subject: [PATCH] Synchronize get with value loader only if value is absent for given key The previous version synchronize all calls to `get(key, valueLoader)`. After this PR the calls to value loader will only be synchronised if we do not have value for the given key. Improves #2079 --- .../data/redis/cache/RedisCache.java | 14 ++++- .../data/redis/cache/RedisCacheTests.java | 51 ++++++++++++++++--- 2 files changed, 57 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/cache/RedisCache.java b/src/main/java/org/springframework/data/redis/cache/RedisCache.java index b5567b9fbe..ebaa53acde 100644 --- a/src/main/java/org/springframework/data/redis/cache/RedisCache.java +++ b/src/main/java/org/springframework/data/redis/cache/RedisCache.java @@ -44,6 +44,7 @@ * * @author Christoph Strobl * @author Mark Paluch + * @author Piotr Mionskowski * @see RedisCacheConfiguration * @see RedisCacheWriter * @since 2.0 @@ -118,7 +119,7 @@ public RedisCacheWriter getNativeCache() { */ @Override @SuppressWarnings("unchecked") - public synchronized T get(Object key, Callable valueLoader) { + public T get(Object key, Callable valueLoader) { ValueWrapper result = get(key); @@ -126,6 +127,17 @@ public synchronized T get(Object key, Callable valueLoader) { return (T) result.get(); } + return getSynchronized(key, valueLoader); + } + + @SuppressWarnings("unchecked") + private synchronized T getSynchronized(Object key, Callable valueLoader) { + ValueWrapper result = get(key); + + if (result != null) { + return (T) result.get(); + } + T value = valueFromLoader(key, valueLoader); put(key, value); return value; diff --git a/src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java b/src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java index ef29b13782..ad4f90adc1 100644 --- a/src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java +++ b/src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java @@ -28,7 +28,12 @@ import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; @@ -409,15 +414,47 @@ void cacheShouldFailOnNonConvertibleCacheKey() { assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> cache.put(key, sample)); } - void doWithConnection(Consumer callback) { - RedisConnection connection = connectionFactory.getConnection(); - try { - callback.accept(connection); - } finally { - connection.close(); - } + @ParameterizedRedisTest // GH-2079 + void multipleThreadsLoadValueOnce() { + + int threadCount = 5; + + ConcurrentMap valuesByThreadId = new ConcurrentHashMap<>(threadCount); + + CountDownLatch waiter = new CountDownLatch(threadCount); + + AtomicInteger threadIds = new AtomicInteger(0); + + AtomicInteger currentValueForKey = new AtomicInteger(0); + + Stream.generate(threadIds::getAndIncrement) + .limit(threadCount) + .parallel() + .forEach((threadId) -> { + waiter.countDown(); + try { + waiter.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + Integer valueForThread = cache.get("key", currentValueForKey::incrementAndGet); + valuesByThreadId.put(threadId, valueForThread); + }); + + valuesByThreadId.forEach((thread, valueForThread) -> { + assertThat(valueForThread).isEqualTo(currentValueForKey.get()); + }); } + void doWithConnection(Consumer callback) { + RedisConnection connection = connectionFactory.getConnection(); + try { + callback.accept(connection); + } finally { + connection.close(); + } + } + @Data @NoArgsConstructor @AllArgsConstructor