|
28 | 28 | import java.util.Collection;
|
29 | 29 | import java.util.Collections;
|
30 | 30 | import java.util.Date;
|
| 31 | +import java.util.concurrent.ConcurrentHashMap; |
| 32 | +import java.util.concurrent.ConcurrentMap; |
| 33 | +import java.util.concurrent.CountDownLatch; |
| 34 | +import java.util.concurrent.atomic.AtomicInteger; |
31 | 35 | import java.util.function.Consumer;
|
| 36 | +import java.util.stream.Stream; |
32 | 37 |
|
33 | 38 | import org.junit.jupiter.api.BeforeEach;
|
34 | 39 |
|
@@ -409,15 +414,47 @@ void cacheShouldFailOnNonConvertibleCacheKey() {
|
409 | 414 | assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> cache.put(key, sample));
|
410 | 415 | }
|
411 | 416 |
|
412 |
| - void doWithConnection(Consumer<RedisConnection> callback) { |
413 |
| - RedisConnection connection = connectionFactory.getConnection(); |
414 |
| - try { |
415 |
| - callback.accept(connection); |
416 |
| - } finally { |
417 |
| - connection.close(); |
418 |
| - } |
| 417 | + @ParameterizedRedisTest // GH-2079 |
| 418 | + void multipleThreadsLoadValueOnce() { |
| 419 | + |
| 420 | + int threadCount = 5; |
| 421 | + |
| 422 | + ConcurrentMap<Integer, Integer> valuesByThreadId = new ConcurrentHashMap<>(threadCount); |
| 423 | + |
| 424 | + CountDownLatch waiter = new CountDownLatch(threadCount); |
| 425 | + |
| 426 | + AtomicInteger threadIds = new AtomicInteger(0); |
| 427 | + |
| 428 | + AtomicInteger currentValueForKey = new AtomicInteger(0); |
| 429 | + |
| 430 | + Stream.generate(threadIds::getAndIncrement) |
| 431 | + .limit(threadCount) |
| 432 | + .parallel() |
| 433 | + .forEach((threadId) -> { |
| 434 | + waiter.countDown(); |
| 435 | + try { |
| 436 | + waiter.await(); |
| 437 | + } catch (InterruptedException e) { |
| 438 | + e.printStackTrace(); |
| 439 | + } |
| 440 | + Integer valueForThread = cache.get("key", currentValueForKey::incrementAndGet); |
| 441 | + valuesByThreadId.put(threadId, valueForThread); |
| 442 | + }); |
| 443 | + |
| 444 | + valuesByThreadId.forEach((thread, valueForThread) -> { |
| 445 | + assertThat(valueForThread).isEqualTo(currentValueForKey.get()); |
| 446 | + }); |
419 | 447 | }
|
420 | 448 |
|
| 449 | + void doWithConnection(Consumer<RedisConnection> callback) { |
| 450 | + RedisConnection connection = connectionFactory.getConnection(); |
| 451 | + try { |
| 452 | + callback.accept(connection); |
| 453 | + } finally { |
| 454 | + connection.close(); |
| 455 | + } |
| 456 | + } |
| 457 | + |
421 | 458 | @Data
|
422 | 459 | @NoArgsConstructor
|
423 | 460 | @AllArgsConstructor
|
|
0 commit comments