15
15
*/
16
16
package org .springframework .data .redis .cache ;
17
17
18
- import reactor .core .publisher .Flux ;
19
- import reactor .core .publisher .Mono ;
20
-
21
18
import java .nio .ByteBuffer ;
22
19
import java .nio .charset .StandardCharsets ;
23
20
import java .time .Duration ;
24
21
import java .util .concurrent .CompletableFuture ;
25
22
import java .util .concurrent .TimeUnit ;
26
23
import java .util .concurrent .atomic .AtomicLong ;
24
+ import java .util .function .Consumer ;
27
25
import java .util .function .Function ;
28
26
29
27
import org .springframework .dao .PessimisticLockingFailureException ;
30
28
import org .springframework .data .redis .connection .ReactiveRedisConnection ;
31
29
import org .springframework .data .redis .connection .ReactiveRedisConnectionFactory ;
30
+ import org .springframework .data .redis .connection .ReactiveStringCommands ;
32
31
import org .springframework .data .redis .connection .RedisConnection ;
33
32
import org .springframework .data .redis .connection .RedisConnectionFactory ;
34
33
import org .springframework .data .redis .connection .RedisStringCommands .SetOption ;
39
38
import org .springframework .util .ClassUtils ;
40
39
import org .springframework .util .ObjectUtils ;
41
40
41
+ import reactor .core .publisher .Flux ;
42
+ import reactor .core .publisher .Mono ;
43
+ import reactor .core .publisher .SignalType ;
44
+
42
45
/**
43
46
* {@link RedisCacheWriter} implementation capable of reading/writing binary data from/to Redis in {@literal standalone}
44
47
* and {@literal cluster} environments, and uses a given {@link RedisConnectionFactory} to obtain the actual
45
48
* {@link RedisConnection}.
46
49
* <p>
47
50
* {@link DefaultRedisCacheWriter} can be used in
48
51
* {@link RedisCacheWriter#lockingRedisCacheWriter(RedisConnectionFactory) locking} or
49
- * {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While
50
- * {@literal non-locking} aims for maximum performance it may result in overlapping, non-atomic, command execution for
51
- * operations spanning multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents
52
- * command overlap by setting an explicit lock key and checking against presence of this key which leads to additional
53
- * requests and potential command wait times.
52
+ * {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While {@literal non-locking}
53
+ * aims for maximum performance it may result in overlapping, non-atomic, command execution for operations spanning
54
+ * multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents command overlap
55
+ * by setting an explicit lock key and checking against presence of this key which leads to additional requests
56
+ * and potential command wait times.
54
57
*
55
58
* @author Christoph Strobl
56
59
* @author Mark Paluch
60
63
*/
61
64
class DefaultRedisCacheWriter implements RedisCacheWriter {
62
65
63
- private static final boolean REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT = ClassUtils
64
- .isPresent ("org.springframework.data.redis.connection.ReactiveRedisConnectionFactory" , null );
66
+ private static final boolean REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT =
67
+ ClassUtils .isPresent ("org.springframework.data.redis.connection.ReactiveRedisConnectionFactory" , null );
65
68
66
69
private final BatchStrategy batchStrategy ;
67
70
@@ -75,31 +78,21 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
75
78
76
79
private final AsyncCacheWriter asyncCacheWriter ;
77
80
78
- /**
79
- * @param connectionFactory must not be {@literal null}.
80
- * @param batchStrategy must not be {@literal null}.
81
- */
82
81
DefaultRedisCacheWriter (RedisConnectionFactory connectionFactory , BatchStrategy batchStrategy ) {
83
82
this (connectionFactory , Duration .ZERO , batchStrategy );
84
83
}
85
84
86
85
/**
87
- * @param connectionFactory must not be {@literal null}.
88
- * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO}
89
- * to disable locking.
90
- * @param batchStrategy must not be {@literal null}.
86
+ * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}.
87
+ * Use {@link Duration#ZERO} to disable locking.
91
88
*/
92
89
DefaultRedisCacheWriter (RedisConnectionFactory connectionFactory , Duration sleepTime , BatchStrategy batchStrategy ) {
93
90
this (connectionFactory , sleepTime , TtlFunction .persistent (), CacheStatisticsCollector .none (), batchStrategy );
94
91
}
95
92
96
93
/**
97
- * @param connectionFactory must not be {@literal null}.
98
- * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO}
99
- * to disable locking.
100
- * @param lockTtl Lock TTL function must not be {@literal null}.
101
- * @param cacheStatisticsCollector must not be {@literal null}.
102
- * @param batchStrategy must not be {@literal null}.
94
+ * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}.
95
+ * Use {@link Duration#ZERO} to disable locking.
103
96
*/
104
97
DefaultRedisCacheWriter (RedisConnectionFactory connectionFactory , Duration sleepTime , TtlFunction lockTtl ,
105
98
CacheStatisticsCollector cacheStatisticsCollector , BatchStrategy batchStrategy ) {
@@ -160,19 +153,19 @@ public CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Dur
160
153
Assert .notNull (name , "Name must not be null" );
161
154
Assert .notNull (key , "Key must not be null" );
162
155
163
- return asyncCacheWriter .retrieve (name , key , ttl ) //
164
- .thenApply (cachedValue -> {
156
+ return asyncCacheWriter .retrieve (name , key , ttl ).thenApply (cachedValue -> {
165
157
166
- statistics .incGets (name );
158
+ statistics .incGets (name );
167
159
168
- if (cachedValue != null ) {
169
- statistics .incHits (name );
170
- } else {
171
- statistics .incMisses (name );
172
- }
160
+ if (cachedValue != null ) {
161
+ statistics .incHits (name );
162
+ }
163
+ else {
164
+ statistics .incMisses (name );
165
+ }
173
166
174
- return cachedValue ;
175
- });
167
+ return cachedValue ;
168
+ });
176
169
}
177
170
178
171
@ Override
@@ -185,8 +178,8 @@ public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
185
178
execute (name , connection -> {
186
179
187
180
if (shouldExpireWithin (ttl )) {
188
- connection .stringCommands (). set ( key , value , Expiration . from ( ttl . toMillis (), TimeUnit . MILLISECONDS ),
189
- SetOption .upsert ());
181
+ connection .stringCommands ()
182
+ . set ( key , value , Expiration . from ( ttl . toMillis (), TimeUnit . MILLISECONDS ), SetOption .upsert ());
190
183
} else {
191
184
connection .stringCommands ().set (key , value );
192
185
}
@@ -204,8 +197,7 @@ public CompletableFuture<Void> store(String name, byte[] key, byte[] value, @Nul
204
197
Assert .notNull (key , "Key must not be null" );
205
198
Assert .notNull (value , "Value must not be null" );
206
199
207
- return asyncCacheWriter .store (name , key , value , ttl ) //
208
- .thenRun (() -> statistics .incPuts (name ));
200
+ return asyncCacheWriter .store (name , key , value , ttl ).thenRun (() -> statistics .incPuts (name ));
209
201
}
210
202
211
203
@ Override
@@ -226,8 +218,8 @@ public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Durat
226
218
boolean put ;
227
219
228
220
if (shouldExpireWithin (ttl )) {
229
- put = ObjectUtils .nullSafeEquals (
230
- connection . stringCommands () .set (key , value , Expiration .from (ttl ), SetOption .ifAbsent ()), true );
221
+ put = ObjectUtils .nullSafeEquals (connection . stringCommands ()
222
+ .set (key , value , Expiration .from (ttl ), SetOption .ifAbsent ()), true );
231
223
} else {
232
224
put = ObjectUtils .nullSafeEquals (connection .stringCommands ().setNX (key , value ), true );
233
225
}
@@ -377,12 +369,10 @@ private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection c
377
369
Thread .sleep (this .sleepTime .toMillis ());
378
370
}
379
371
} catch (InterruptedException cause ) {
380
-
381
372
// Re-interrupt current Thread to allow other participants to react.
382
373
Thread .currentThread ().interrupt ();
383
-
384
- throw new PessimisticLockingFailureException (String .format ("Interrupted while waiting to unlock cache %s" , name ),
385
- cause );
374
+ String message = String .format ("Interrupted while waiting to unlock cache %s" , name );
375
+ throw new PessimisticLockingFailureException (message , cause );
386
376
} finally {
387
377
this .statistics .incLockTime (name , System .nanoTime () - lockWaitTimeNs );
388
378
}
@@ -418,8 +408,8 @@ interface AsyncCacheWriter {
418
408
* @param name the cache name from which to retrieve the cache entry.
419
409
* @param key the cache entry key.
420
410
* @param ttl optional TTL to set for Time-to-Idle eviction.
421
- * @return a future that completes either with a value if the value exists or completing with {@code null} if the
422
- * cache does not contain an entry.
411
+ * @return a future that completes either with a value if the value exists or completing with {@code null}
412
+ * if the cache does not contain an entry.
423
413
*/
424
414
CompletableFuture <byte []> retrieve (String name , byte [] key , @ Nullable Duration ttl );
425
415
@@ -433,6 +423,7 @@ interface AsyncCacheWriter {
433
423
* @return a future that signals completion.
434
424
*/
435
425
CompletableFuture <Void > store (String name , byte [] key , byte [] value , @ Nullable Duration ttl );
426
+
436
427
}
437
428
438
429
/**
@@ -441,6 +432,7 @@ interface AsyncCacheWriter {
441
432
* @since 3.2
442
433
*/
443
434
enum UnsupportedAsyncCacheWriter implements AsyncCacheWriter {
435
+
444
436
INSTANCE ;
445
437
446
438
@ Override
@@ -460,8 +452,8 @@ public CompletableFuture<Void> store(String name, byte[] key, byte[] value, @Nul
460
452
}
461
453
462
454
/**
463
- * Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations using
464
- * {@link ReactiveRedisConnectionFactory}.
455
+ * Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations
456
+ * using {@link ReactiveRedisConnectionFactory}.
465
457
*
466
458
* @since 3.2
467
459
*/
@@ -478,19 +470,16 @@ public CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Dur
478
470
return doWithConnection (connection -> {
479
471
480
472
ByteBuffer wrappedKey = ByteBuffer .wrap (key );
481
- Mono <?> cacheLockCheckFlux ;
482
473
483
- if (isLockingCacheWriter ())
484
- cacheLockCheckFlux = waitForLock (connection , name );
485
- else {
486
- cacheLockCheckFlux = Mono .empty ();
487
- }
474
+ Mono <?> cacheLockCheck = isLockingCacheWriter () ? waitForLock (connection , name ) : Mono .empty ();
475
+
476
+ ReactiveStringCommands stringCommands = connection .stringCommands ();
488
477
489
478
Mono <ByteBuffer > get = shouldExpireWithin (ttl )
490
- ? connection . stringCommands () .getEx (wrappedKey , Expiration . from (ttl ))
491
- : connection . stringCommands () .get (wrappedKey );
479
+ ? stringCommands .getEx (wrappedKey , toExpiration (ttl ))
480
+ : stringCommands .get (wrappedKey );
492
481
493
- return cacheLockCheckFlux .then (get ).map (ByteUtils ::getBytes ).toFuture ();
482
+ return cacheLockCheck .then (get ).map (ByteUtils ::getBytes ).toFuture ();
494
483
});
495
484
}
496
485
@@ -499,15 +488,9 @@ public CompletableFuture<Void> store(String name, byte[] key, byte[] value, @Nul
499
488
500
489
return doWithConnection (connection -> {
501
490
502
- Mono <?> mono ;
503
-
504
- if (isLockingCacheWriter ()) {
505
-
506
- mono = Mono .usingWhen (doLock (name , key , value , connection ), unused -> doStore (key , value , ttl , connection ),
507
- unused -> doUnlock (name , connection ));
508
- } else {
509
- mono = doStore (key , value , ttl , connection );
510
- }
491
+ Mono <?> mono = isLockingCacheWriter ()
492
+ ? doLockStoreUnlock (name , key , value , ttl , connection )
493
+ : doStore (key , value , ttl , connection );
511
494
512
495
return mono .then ().toFuture ();
513
496
});
@@ -519,24 +502,31 @@ private Mono<Boolean> doStore(byte[] cacheKey, byte[] value, @Nullable Duration
519
502
ByteBuffer wrappedKey = ByteBuffer .wrap (cacheKey );
520
503
ByteBuffer wrappedValue = ByteBuffer .wrap (value );
521
504
522
- if (shouldExpireWithin (ttl )) {
523
- return connection .stringCommands ().set (wrappedKey , wrappedValue ,
524
- Expiration .from (ttl .toMillis (), TimeUnit .MILLISECONDS ), SetOption .upsert ());
525
- } else {
526
- return connection .stringCommands ().set (wrappedKey , wrappedValue );
527
- }
505
+ ReactiveStringCommands stringCommands = connection .stringCommands ();
506
+
507
+ return shouldExpireWithin (ttl )
508
+ ? stringCommands .set (wrappedKey , wrappedValue , toExpiration (ttl ), SetOption .upsert ())
509
+ : stringCommands .set (wrappedKey , wrappedValue );
510
+ }
511
+
512
+ private Mono <Boolean > doLockStoreUnlock (String name , byte [] key , byte [] value , @ Nullable Duration ttl ,
513
+ ReactiveRedisConnection connection ) {
514
+
515
+ return Mono .usingWhen (doLock (name , key , value , connection ), unused -> doStore (key , value , ttl , connection ),
516
+ unused -> doUnlock (name , connection ));
528
517
}
529
518
530
519
private Mono <Object > doLock (String name , Object contextualKey , @ Nullable Object contextualValue ,
531
520
ReactiveRedisConnection connection ) {
532
521
533
- Expiration expiration = Expiration .from (lockTtl .getTimeToLive (contextualKey , contextualValue ));
522
+ ByteBuffer key = ByteBuffer .wrap (createCacheLockKey (name ));
523
+ ByteBuffer value = ByteBuffer .wrap (new byte [0 ]);
524
+
525
+ Expiration expiration = toExpiration (contextualKey , contextualValue );
534
526
535
- return connection .stringCommands ()
536
- .set (ByteBuffer .wrap (createCacheLockKey (name )), ByteBuffer .wrap (new byte [0 ]), expiration ,
537
- SetOption .SET_IF_ABSENT ) //
538
- .thenReturn (new Object ()); // Ensure we emit an object, otherwise, the Mono.usingWhen operator doesn't run
539
- // the inner resource function.
527
+ return connection .stringCommands ().set (key , value , expiration , SetOption .SET_IF_ABSENT ) //
528
+ // Ensure we emit an object, otherwise, the Mono.usingWhen operator doesn't run the inner resource function.
529
+ .thenReturn (Boolean .TRUE );
540
530
}
541
531
542
532
private Mono <Void > doUnlock (String name , ReactiveRedisConnection connection ) {
@@ -545,28 +535,59 @@ private Mono<Void> doUnlock(String name, ReactiveRedisConnection connection) {
545
535
546
536
private Mono <Void > waitForLock (ReactiveRedisConnection connection , String cacheName ) {
547
537
548
- AtomicLong lockWaitTimeNs = new AtomicLong ();
549
- byte [] cacheLockKey = createCacheLockKey (cacheName );
538
+ AtomicLong lockWaitNanoTime = new AtomicLong ();
539
+
540
+ Consumer <org .reactivestreams .Subscription > setNanoTimeOnLockWait = subscription ->
541
+ lockWaitNanoTime .set (System .nanoTime ());
550
542
551
- Flux < Long > wait = Flux . interval ( Duration . ZERO , sleepTime );
552
- Mono < Boolean > exists = connection . keyCommands (). exists ( ByteBuffer . wrap ( cacheLockKey )). filter ( it -> ! it );
543
+ Consumer < SignalType > recordStatistics = signalType ->
544
+ statistics . incLockTime ( cacheName , System . nanoTime () - lockWaitNanoTime . get () );
553
545
554
- return wait .doOnSubscribe (subscription -> lockWaitTimeNs .set (System .nanoTime ())) //
555
- .flatMap (it -> exists ) //
556
- .doFinally (signalType -> statistics .incLockTime (cacheName , System .nanoTime () - lockWaitTimeNs .get ())) //
546
+ Function <Long , Mono <Boolean >> doCacheLockExistsCheck = lockWaitTime -> connection .keyCommands ()
547
+ .exists (toCacheLockKey (cacheName )).filter (cacheLockKeyExists -> !cacheLockKeyExists );
548
+
549
+ return waitForLock () //
550
+ .doOnSubscribe (setNanoTimeOnLockWait ) //
551
+ .flatMap (doCacheLockExistsCheck ) //
552
+ .doFinally (recordStatistics ) //
557
553
.next () //
558
554
.then ();
559
555
}
560
556
557
+ private Flux <Long > waitForLock () {
558
+ return Flux .interval (Duration .ZERO , sleepTime );
559
+ }
560
+
561
+ private ByteBuffer toCacheLockKey (String cacheName ) {
562
+ return ByteBuffer .wrap (createCacheLockKey (cacheName ));
563
+ }
564
+
565
+ private Expiration toExpiration (Duration ttl ) {
566
+ return Expiration .from (ttl .toMillis (), TimeUnit .MILLISECONDS );
567
+ }
568
+
569
+ private Expiration toExpiration (Object contextualKey , @ Nullable Object contextualValue ) {
570
+ return Expiration .from (lockTtl .getTimeToLive (contextualKey , contextualValue ));
571
+ }
572
+
573
+ private ReactiveRedisConnectionFactory getReactiveConnectionFactory () {
574
+ return (ReactiveRedisConnectionFactory ) DefaultRedisCacheWriter .this .connectionFactory ;
575
+ }
576
+
577
+ private Mono <ReactiveRedisConnection > getReactiveConnection () {
578
+ return Mono .fromSupplier (getReactiveConnectionFactory ()::getReactiveConnection );
579
+ }
580
+
561
581
private <T > CompletableFuture <T > doWithConnection (
562
582
Function <ReactiveRedisConnection , CompletableFuture <T >> callback ) {
563
583
564
- ReactiveRedisConnectionFactory cf = (ReactiveRedisConnectionFactory ) connectionFactory ;
584
+ Function <ReactiveRedisConnection , Mono <T >> commandExecution = connection ->
585
+ Mono .fromCompletionStage (callback .apply (connection ));
586
+
587
+ Mono <T > result = Mono .usingWhen (getReactiveConnection (), commandExecution ,
588
+ ReactiveRedisConnection ::closeLater );
565
589
566
- return Mono .usingWhen (Mono .fromSupplier (cf ::getReactiveConnection ), //
567
- it -> Mono .fromCompletionStage (callback .apply (it )), //
568
- ReactiveRedisConnection ::closeLater ) //
569
- .toFuture ();
590
+ return result .toFuture ();
570
591
}
571
592
}
572
593
}
0 commit comments