Skip to content

Commit 26d1cf6

Browse files
committed
Polishing.
Inline valueFromLoader method. Refine tests to not rely on the number of runtime CPU cores. See #2079 Original pull request: #2082.
1 parent 5e1c2be commit 26d1cf6

File tree

2 files changed

+88
-34
lines changed

2 files changed

+88
-34
lines changed

src/main/java/org/springframework/data/redis/cache/RedisCache.java

+7-9
Original file line numberDiff line numberDiff line change
@@ -132,13 +132,19 @@ public <T> T get(Object key, Callable<T> valueLoader) {
132132

133133
@SuppressWarnings("unchecked")
134134
private synchronized <T> T getSynchronized(Object key, Callable<T> valueLoader) {
135+
135136
ValueWrapper result = get(key);
136137

137138
if (result != null) {
138139
return (T) result.get();
139140
}
140141

141-
T value = valueFromLoader(key, valueLoader);
142+
T value;
143+
try {
144+
value = valueLoader.call();
145+
} catch (Exception e) {
146+
throw new ValueRetrievalException(key, valueLoader, e);
147+
}
142148
put(key, value);
143149
return value;
144150
}
@@ -391,12 +397,4 @@ private String prefixCacheKey(String key) {
391397
return cacheConfig.getKeyPrefixFor(name) + key;
392398
}
393399

394-
private static <T> T valueFromLoader(Object key, Callable<T> valueLoader) {
395-
396-
try {
397-
return valueLoader.call();
398-
} catch (Exception e) {
399-
throw new ValueRetrievalException(key, valueLoader, e);
400-
}
401-
}
402400
}

src/test/java/org/springframework/data/redis/cache/RedisCacheTests.java

+81-25
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,26 @@
1717

1818
import static org.assertj.core.api.Assertions.*;
1919

20+
import io.netty.util.concurrent.DefaultThreadFactory;
2021
import lombok.AllArgsConstructor;
2122
import lombok.Data;
2223
import lombok.NoArgsConstructor;
2324
import lombok.RequiredArgsConstructor;
2425

2526
import java.io.Serializable;
2627
import java.nio.charset.StandardCharsets;
28+
import java.time.Duration;
2729
import java.util.Collection;
2830
import java.util.Collections;
2931
import java.util.Date;
30-
import java.util.concurrent.ConcurrentHashMap;
31-
import java.util.concurrent.ConcurrentMap;
3232
import java.util.concurrent.CountDownLatch;
33+
import java.util.concurrent.LinkedBlockingDeque;
34+
import java.util.concurrent.ThreadPoolExecutor;
35+
import java.util.concurrent.TimeUnit;
3336
import java.util.concurrent.atomic.AtomicInteger;
37+
import java.util.concurrent.atomic.AtomicReference;
3438
import java.util.function.Consumer;
35-
import java.util.stream.Stream;
39+
import java.util.stream.IntStream;
3640

3741
import org.junit.jupiter.api.BeforeEach;
3842

@@ -46,13 +50,15 @@
4650
import org.springframework.data.redis.serializer.RedisSerializer;
4751
import org.springframework.data.redis.test.extension.parametrized.MethodSource;
4852
import org.springframework.data.redis.test.extension.parametrized.ParameterizedRedisTest;
53+
import org.springframework.lang.Nullable;
4954

5055
/**
5156
* Tests for {@link RedisCache} with {@link DefaultRedisCacheWriter} using different {@link RedisSerializer} and
5257
* {@link RedisConnectionFactory} pairs.
5358
*
5459
* @author Christoph Strobl
5560
* @author Mark Paluch
61+
* @author Piotr Mionskowski
5662
*/
5763
@MethodSource("testParams")
5864
public class RedisCacheTests {
@@ -383,35 +389,85 @@ void cacheShouldFailOnNonConvertibleCacheKey() {
383389
}
384390

385391
@ParameterizedRedisTest // GH-2079
386-
void multipleThreadsLoadValueOnce() {
392+
void multipleThreadsLoadValueOnce() throws InterruptedException {
387393

388-
int threadCount = 5;
394+
int threadCount = 2;
389395

390-
ConcurrentMap<Integer, Integer> valuesByThreadId = new ConcurrentHashMap<>(threadCount);
396+
CountDownLatch prepare = new CountDownLatch(threadCount);
397+
CountDownLatch prepareForReturn = new CountDownLatch(1);
398+
CountDownLatch finished = new CountDownLatch(threadCount);
399+
AtomicInteger retrievals = new AtomicInteger();
400+
AtomicReference<byte[]> storage = new AtomicReference<>();
391401

392-
CountDownLatch waiter = new CountDownLatch(threadCount);
402+
cache = new RedisCache("foo", new RedisCacheWriter() {
403+
@Override
404+
public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
405+
storage.set(value);
406+
}
393407

394-
AtomicInteger threadIds = new AtomicInteger(0);
408+
@Override
409+
public byte[] get(String name, byte[] key) {
395410

396-
AtomicInteger currentValueForKey = new AtomicInteger(0);
411+
prepare.countDown();
412+
try {
413+
prepareForReturn.await(1, TimeUnit.MINUTES);
414+
} catch (InterruptedException e) {
415+
throw new RuntimeException(e);
416+
}
397417

398-
Stream.generate(threadIds::getAndIncrement)
399-
.limit(threadCount)
400-
.parallel()
401-
.forEach((threadId) -> {
402-
waiter.countDown();
403-
try {
404-
waiter.await();
405-
} catch (InterruptedException e) {
406-
e.printStackTrace();
407-
}
408-
Integer valueForThread = cache.get("key", currentValueForKey::incrementAndGet);
409-
valuesByThreadId.put(threadId, valueForThread);
410-
});
418+
return storage.get();
419+
}
411420

412-
valuesByThreadId.forEach((thread, valueForThread) -> {
413-
assertThat(valueForThread).isEqualTo(currentValueForKey.get());
414-
});
421+
@Override
422+
public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
423+
return new byte[0];
424+
}
425+
426+
@Override
427+
public void remove(String name, byte[] key) {
428+
429+
}
430+
431+
@Override
432+
public void clean(String name, byte[] pattern) {
433+
434+
}
435+
436+
@Override
437+
public void clearStatistics(String name) {
438+
439+
}
440+
441+
@Override
442+
public RedisCacheWriter withStatisticsCollector(CacheStatisticsCollector cacheStatisticsCollector) {
443+
return null;
444+
}
445+
446+
@Override
447+
public CacheStatistics getCacheStatistics(String cacheName) {
448+
return null;
449+
}
450+
}, RedisCacheConfiguration.defaultCacheConfig());
451+
452+
ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadCount, threadCount, 1, TimeUnit.MINUTES,
453+
new LinkedBlockingDeque<>(), new DefaultThreadFactory("RedisCacheTests"));
454+
455+
IntStream.range(0, threadCount).forEach(it -> tpe.submit(() -> {
456+
cache.get("foo", retrievals::incrementAndGet);
457+
finished.countDown();
458+
}));
459+
460+
// wait until all Threads have arrived in RedisCacheWriter.get(…)
461+
prepare.await();
462+
463+
// let all threads continue
464+
prepareForReturn.countDown();
465+
466+
// wait until ThreadPoolExecutor has completed.
467+
finished.await();
468+
tpe.shutdown();
469+
470+
assertThat(retrievals).hasValue(1);
415471
}
416472

417473
void doWithConnection(Consumer<RedisConnection> callback) {

0 commit comments

Comments
 (0)