Skip to content

Commit c2397f7

Browse files
committed
Synchronize get with value loader only if value is absent for given key
The previous version synchronize all calls to `get(key, valueLoader)`. After this PR the calls to value loader will only be synchronised if we do not have value for the given key. Fixes spring-projects#2079
1 parent 866010e commit c2397f7

File tree

2 files changed

+67
-11
lines changed

2 files changed

+67
-11
lines changed

src/main/java/org/springframework/data/redis/cache/RedisCache.java

+23-4
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.springframework.data.redis.util.ByteUtils;
3535
import org.springframework.lang.Nullable;
3636
import org.springframework.util.Assert;
37+
import org.springframework.util.ConcurrentReferenceHashMap;
3738
import org.springframework.util.ObjectUtils;
3839
import org.springframework.util.ReflectionUtils;
3940

@@ -44,6 +45,7 @@
4445
*
4546
* @author Christoph Strobl
4647
* @author Mark Paluch
48+
* @author Piotr Mionskowski
4749
* @see RedisCacheConfiguration
4850
* @see RedisCacheWriter
4951
* @since 2.0
@@ -118,17 +120,34 @@ public RedisCacheWriter getNativeCache() {
118120
*/
119121
@Override
120122
@SuppressWarnings("unchecked")
121-
public synchronized <T> T get(Object key, Callable<T> valueLoader) {
123+
public <T> T get(Object key, Callable<T> valueLoader) {
122124

123125
ValueWrapper result = get(key);
124126

125127
if (result != null) {
126128
return (T) result.get();
127129
}
128130

129-
T value = valueFromLoader(key, valueLoader);
130-
put(key, value);
131-
return value;
131+
return getSynchronized(key, valueLoader);
132+
}
133+
134+
private final ConcurrentReferenceHashMap<String, Object> valueLoaderLocks = new ConcurrentReferenceHashMap<>();
135+
136+
@SuppressWarnings({"unchecked", "SynchronizationOnLocalVariableOrMethodParameter"})
137+
private <T> T getSynchronized(Object key, Callable<T> valueLoader) {
138+
final Object loaderLock = valueLoaderLocks.computeIfAbsent(createCacheKey(key), (String k) -> new Object());
139+
140+
synchronized (loaderLock) {
141+
ValueWrapper result = get(key);
142+
143+
if (result != null) {
144+
return (T) result.get();
145+
}
146+
147+
T value = valueFromLoader(key, valueLoader);
148+
put(key, value);
149+
return value;
150+
}
132151
}
133152

134153
/*

src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java

+44-7
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,12 @@
2828
import java.util.Collection;
2929
import java.util.Collections;
3030
import java.util.Date;
31+
import java.util.concurrent.ConcurrentHashMap;
32+
import java.util.concurrent.ConcurrentMap;
33+
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.atomic.AtomicInteger;
3135
import java.util.function.Consumer;
36+
import java.util.stream.Stream;
3237

3338
import org.junit.jupiter.api.BeforeEach;
3439

@@ -409,15 +414,47 @@ void cacheShouldFailOnNonConvertibleCacheKey() {
409414
assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> cache.put(key, sample));
410415
}
411416

412-
void doWithConnection(Consumer<RedisConnection> callback) {
413-
RedisConnection connection = connectionFactory.getConnection();
414-
try {
415-
callback.accept(connection);
416-
} finally {
417-
connection.close();
418-
}
417+
@ParameterizedRedisTest // GH-2079
418+
void multipleThreadsLoadValueOnce() {
419+
420+
int threadCount = 5;
421+
422+
ConcurrentMap<Integer, Integer> valuesByThreadId = new ConcurrentHashMap<>(threadCount);
423+
424+
CountDownLatch waiter = new CountDownLatch(threadCount);
425+
426+
AtomicInteger threadIds = new AtomicInteger(0);
427+
428+
AtomicInteger currentValueForKey = new AtomicInteger(0);
429+
430+
Stream.generate(threadIds::getAndIncrement)
431+
.limit(threadCount)
432+
.parallel()
433+
.forEach((threadId) -> {
434+
waiter.countDown();
435+
try {
436+
waiter.await();
437+
} catch (InterruptedException e) {
438+
e.printStackTrace();
439+
}
440+
Integer valueForThread = cache.get("key", currentValueForKey::incrementAndGet);
441+
valuesByThreadId.put(threadId, valueForThread);
442+
});
443+
444+
valuesByThreadId.forEach((thread, valueForThread) -> {
445+
assertThat(valueForThread).isEqualTo(currentValueForKey.get());
446+
});
419447
}
420448

449+
void doWithConnection(Consumer<RedisConnection> callback) {
450+
RedisConnection connection = connectionFactory.getConnection();
451+
try {
452+
callback.accept(connection);
453+
} finally {
454+
connection.close();
455+
}
456+
}
457+
421458
@Data
422459
@NoArgsConstructor
423460
@AllArgsConstructor

0 commit comments

Comments
 (0)