Skip to content

Commit ba767fa

Browse files
committed
Polishing.
Inline valueFromLoader method. Refine tests to not rely on the number of runtime CPU cores. See #2079 Original pull request: #2082.
1 parent 30aeda5 commit ba767fa

File tree

2 files changed

+88
-34
lines changed

2 files changed

+88
-34
lines changed

src/main/java/org/springframework/data/redis/cache/RedisCache.java

+7-9
Original file line numberDiff line numberDiff line change
@@ -132,13 +132,19 @@ public <T> T get(Object key, Callable<T> valueLoader) {
132132

133133
@SuppressWarnings("unchecked")
134134
private synchronized <T> T getSynchronized(Object key, Callable<T> valueLoader) {
135+
135136
ValueWrapper result = get(key);
136137

137138
if (result != null) {
138139
return (T) result.get();
139140
}
140141

141-
T value = valueFromLoader(key, valueLoader);
142+
T value;
143+
try {
144+
value = valueLoader.call();
145+
} catch (Exception e) {
146+
throw new ValueRetrievalException(key, valueLoader, e);
147+
}
142148
put(key, value);
143149
return value;
144150
}
@@ -391,12 +397,4 @@ private String prefixCacheKey(String key) {
391397
return cacheConfig.getKeyPrefixFor(name) + key;
392398
}
393399

394-
private static <T> T valueFromLoader(Object key, Callable<T> valueLoader) {
395-
396-
try {
397-
return valueLoader.call();
398-
} catch (Exception e) {
399-
throw new ValueRetrievalException(key, valueLoader, e);
400-
}
401-
}
402400
}

src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java

+81-25
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,26 @@
1818
import static org.assertj.core.api.Assertions.*;
1919
import static org.assertj.core.api.Assumptions.*;
2020

21+
import io.netty.util.concurrent.DefaultThreadFactory;
2122
import lombok.AllArgsConstructor;
2223
import lombok.Data;
2324
import lombok.NoArgsConstructor;
2425
import lombok.RequiredArgsConstructor;
2526

2627
import java.io.Serializable;
2728
import java.nio.charset.StandardCharsets;
29+
import java.time.Duration;
2830
import java.util.Collection;
2931
import java.util.Collections;
3032
import java.util.Date;
31-
import java.util.concurrent.ConcurrentHashMap;
32-
import java.util.concurrent.ConcurrentMap;
3333
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.LinkedBlockingDeque;
35+
import java.util.concurrent.ThreadPoolExecutor;
36+
import java.util.concurrent.TimeUnit;
3437
import java.util.concurrent.atomic.AtomicInteger;
38+
import java.util.concurrent.atomic.AtomicReference;
3539
import java.util.function.Consumer;
36-
import java.util.stream.Stream;
40+
import java.util.stream.IntStream;
3741

3842
import org.junit.jupiter.api.BeforeEach;
3943

@@ -48,13 +52,15 @@
4852
import org.springframework.data.redis.serializer.RedisSerializer;
4953
import org.springframework.data.redis.test.extension.parametrized.MethodSource;
5054
import org.springframework.data.redis.test.extension.parametrized.ParameterizedRedisTest;
55+
import org.springframework.lang.Nullable;
5156

5257
/**
5358
* Tests for {@link RedisCache} with {@link DefaultRedisCacheWriter} using different {@link RedisSerializer} and
5459
* {@link RedisConnectionFactory} pairs.
5560
*
5661
* @author Christoph Strobl
5762
* @author Mark Paluch
63+
* @author Piotr Mionskowski
5864
*/
5965
@MethodSource("testParams")
6066
public class RedisCacheTests {
@@ -415,35 +421,85 @@ void cacheShouldFailOnNonConvertibleCacheKey() {
415421
}
416422

417423
@ParameterizedRedisTest // GH-2079
418-
void multipleThreadsLoadValueOnce() {
424+
void multipleThreadsLoadValueOnce() throws InterruptedException {
419425

420-
int threadCount = 5;
426+
int threadCount = 2;
421427

422-
ConcurrentMap<Integer, Integer> valuesByThreadId = new ConcurrentHashMap<>(threadCount);
428+
CountDownLatch prepare = new CountDownLatch(threadCount);
429+
CountDownLatch prepareForReturn = new CountDownLatch(1);
430+
CountDownLatch finished = new CountDownLatch(threadCount);
431+
AtomicInteger retrievals = new AtomicInteger();
432+
AtomicReference<byte[]> storage = new AtomicReference<>();
423433

424-
CountDownLatch waiter = new CountDownLatch(threadCount);
434+
cache = new RedisCache("foo", new RedisCacheWriter() {
435+
@Override
436+
public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
437+
storage.set(value);
438+
}
425439

426-
AtomicInteger threadIds = new AtomicInteger(0);
440+
@Override
441+
public byte[] get(String name, byte[] key) {
427442

428-
AtomicInteger currentValueForKey = new AtomicInteger(0);
443+
prepare.countDown();
444+
try {
445+
prepareForReturn.await(1, TimeUnit.MINUTES);
446+
} catch (InterruptedException e) {
447+
throw new RuntimeException(e);
448+
}
429449

430-
Stream.generate(threadIds::getAndIncrement)
431-
.limit(threadCount)
432-
.parallel()
433-
.forEach((threadId) -> {
434-
waiter.countDown();
435-
try {
436-
waiter.await();
437-
} catch (InterruptedException e) {
438-
e.printStackTrace();
439-
}
440-
Integer valueForThread = cache.get("key", currentValueForKey::incrementAndGet);
441-
valuesByThreadId.put(threadId, valueForThread);
442-
});
450+
return storage.get();
451+
}
443452

444-
valuesByThreadId.forEach((thread, valueForThread) -> {
445-
assertThat(valueForThread).isEqualTo(currentValueForKey.get());
446-
});
453+
@Override
454+
public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
455+
return new byte[0];
456+
}
457+
458+
@Override
459+
public void remove(String name, byte[] key) {
460+
461+
}
462+
463+
@Override
464+
public void clean(String name, byte[] pattern) {
465+
466+
}
467+
468+
@Override
469+
public void clearStatistics(String name) {
470+
471+
}
472+
473+
@Override
474+
public RedisCacheWriter withStatisticsCollector(CacheStatisticsCollector cacheStatisticsCollector) {
475+
return null;
476+
}
477+
478+
@Override
479+
public CacheStatistics getCacheStatistics(String cacheName) {
480+
return null;
481+
}
482+
}, RedisCacheConfiguration.defaultCacheConfig());
483+
484+
ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadCount, threadCount, 1, TimeUnit.MINUTES,
485+
new LinkedBlockingDeque<>(), new DefaultThreadFactory("RedisCacheTests"));
486+
487+
IntStream.range(0, threadCount).forEach(it -> tpe.submit(() -> {
488+
cache.get("foo", retrievals::incrementAndGet);
489+
finished.countDown();
490+
}));
491+
492+
// wait until all Threads have arrived in RedisCacheWriter.get(…)
493+
prepare.await();
494+
495+
// let all threads continue
496+
prepareForReturn.countDown();
497+
498+
// wait until ThreadPoolExecutor has completed.
499+
finished.await();
500+
tpe.shutdown();
501+
502+
assertThat(retrievals).hasValue(1);
447503
}
448504

449505
void doWithConnection(Consumer<RedisConnection> callback) {

0 commit comments

Comments
 (0)