15
15
*/
16
16
package org .springframework .data .redis .cache ;
17
17
18
+ import java .nio .ByteBuffer ;
18
19
import java .nio .charset .StandardCharsets ;
19
20
import java .time .Duration ;
21
+ import java .util .concurrent .CompletableFuture ;
20
22
import java .util .concurrent .TimeUnit ;
21
- import java .util .function .Consumer ;
23
+ import java .util .function .BiFunction ;
22
24
import java .util .function .Function ;
25
+ import java .util .function .Supplier ;
23
26
24
27
import org .springframework .dao .PessimisticLockingFailureException ;
28
+ import org .springframework .data .redis .connection .ReactiveRedisConnection ;
29
+ import org .springframework .data .redis .connection .ReactiveRedisConnectionFactory ;
25
30
import org .springframework .data .redis .connection .RedisConnection ;
26
31
import org .springframework .data .redis .connection .RedisConnectionFactory ;
27
32
import org .springframework .data .redis .connection .RedisStringCommands .SetOption ;
28
33
import org .springframework .data .redis .core .types .Expiration ;
34
+ import org .springframework .data .redis .util .ByteUtils ;
29
35
import org .springframework .lang .Nullable ;
30
36
import org .springframework .util .Assert ;
31
37
38
+ import reactor .core .publisher .Flux ;
39
+ import reactor .core .publisher .Mono ;
40
+
32
41
/**
33
42
* {@link RedisCacheWriter} implementation capable of reading/writing binary data from/to Redis in {@literal standalone}
34
43
* and {@literal cluster} environments, and uses a given {@link RedisConnectionFactory} to obtain the actual
@@ -114,8 +123,8 @@ public byte[] get(String name, byte[] key, @Nullable Duration ttl) {
114
123
Assert .notNull (key , "Key must not be null" );
115
124
116
125
byte [] result = shouldExpireWithin (ttl )
117
- ? execute (name , connection -> connection .stringCommands ().getEx (key , Expiration .from (ttl )))
118
- : execute (name , connection -> connection .stringCommands ().get (key ));
126
+ ? execute (name , connection -> connection .stringCommands ().getEx (key , Expiration .from (ttl )))
127
+ : execute (name , connection -> connection .stringCommands ().get (key ));
119
128
120
129
statistics .incGets (name );
121
130
@@ -128,6 +137,74 @@ public byte[] get(String name, byte[] key, @Nullable Duration ttl) {
128
137
return result ;
129
138
}
130
139
140
+ @ Override
141
+ public CompletableFuture <byte []> retrieve (String name , byte [] key , @ Nullable Duration ttl ) {
142
+
143
+ Assert .notNull (name , "Name must not be null" );
144
+ Assert .notNull (key , "Key must not be null" );
145
+
146
+ CompletableFuture <byte []> result = nonBlockingRetrieveFunction (name ).apply (key , ttl );
147
+
148
+ result = result .thenApply (cachedValue -> {
149
+
150
+ statistics .incGets (name );
151
+
152
+ if (cachedValue != null ) {
153
+ statistics .incHits (name );
154
+ } else {
155
+ statistics .incMisses (name );
156
+ }
157
+
158
+ return cachedValue ;
159
+ });
160
+
161
+ return result ;
162
+ }
163
+
164
+ private BiFunction <byte [], Duration , CompletableFuture <byte []>> nonBlockingRetrieveFunction (String cacheName ) {
165
+ return isReactive () ? reactiveRetrieveFunction (cacheName ) : asyncRetrieveFunction (cacheName );
166
+ }
167
+
168
+ // Function applied for Cache.retrieve(key) when a non-reactive Redis driver is used, such as Jedis.
169
+ private BiFunction <byte [], Duration , CompletableFuture <byte []>> asyncRetrieveFunction (String cacheName ) {
170
+
171
+ return (key , ttl ) -> {
172
+
173
+ Supplier <byte []> getKey = () -> execute (cacheName , connection -> connection .stringCommands ().get (key ));
174
+
175
+ Supplier <byte []> getKeyWithExpiration = () -> execute (cacheName , connection ->
176
+ connection .stringCommands ().getEx (key , Expiration .from (ttl )));
177
+
178
+ return shouldExpireWithin (ttl )
179
+ ? CompletableFuture .supplyAsync (getKeyWithExpiration )
180
+ : CompletableFuture .supplyAsync (getKey );
181
+
182
+ };
183
+ }
184
+
185
+ // Function applied for Cache.retrieve(key) when a reactive Redis driver is used, such as Lettuce.
186
+ private BiFunction <byte [], Duration , CompletableFuture <byte []>> reactiveRetrieveFunction (String cacheName ) {
187
+
188
+ return (key , ttl ) -> {
189
+
190
+ ByteBuffer wrappedKey = ByteBuffer .wrap (key );
191
+
192
+ Flux <?> cacheLockCheckFlux = Flux .interval (Duration .ZERO , this .sleepTime ).takeUntil (count ->
193
+ executeLockFree (connection -> !doCheckLock (cacheName , connection )));
194
+
195
+ Mono <ByteBuffer > getMono = shouldExpireWithin (ttl )
196
+ ? executeReactively (connection -> connection .stringCommands ().getEx (wrappedKey , Expiration .from (ttl )))
197
+ : executeReactively (connection -> connection .stringCommands ().get (wrappedKey ));
198
+
199
+ Mono <ByteBuffer > result = cacheLockCheckFlux .then (getMono );
200
+
201
+ @ SuppressWarnings ("all" )
202
+ Mono <byte []> byteArrayResult = result .map (DefaultRedisCacheWriter ::nullSafeGetBytes );
203
+
204
+ return byteArrayResult .toFuture ();
205
+ };
206
+ }
207
+
131
208
@ Override
132
209
public void put (String name , byte [] key , byte [] value , @ Nullable Duration ttl ) {
133
210
@@ -282,32 +359,42 @@ private Long doUnlock(String name, RedisConnection connection) {
282
359
return connection .keyCommands ().del (createCacheLockKey (name ));
283
360
}
284
361
285
- boolean doCheckLock (String name , RedisConnection connection ) {
286
- return isTrue (connection .keyCommands ().exists (createCacheLockKey (name )));
287
- }
362
+ private <T > T execute (String name , Function <RedisConnection , T > callback ) {
288
363
289
- /**
290
- * @return {@literal true} if {@link RedisCacheWriter} uses locks.
291
- */
292
- private boolean isLockingCacheWriter () {
293
- return !sleepTime .isZero () && !sleepTime .isNegative ();
364
+ try (RedisConnection connection = this .connectionFactory .getConnection ()) {
365
+ checkAndPotentiallyWaitUntilUnlocked (name , connection );
366
+ return callback .apply (connection );
367
+ }
294
368
}
295
369
296
- private <T > T execute ( String name , Function <RedisConnection , T > callback ) {
370
+ private <T > T executeLockFree ( Function <RedisConnection , T > callback ) {
297
371
298
- try (RedisConnection connection = connectionFactory .getConnection ()) {
299
- checkAndPotentiallyWaitUntilUnlocked (name , connection );
372
+ try (RedisConnection connection = this .connectionFactory .getConnection ()) {
300
373
return callback .apply (connection );
301
374
}
302
375
}
303
376
304
- private void executeLockFree (Consumer <RedisConnection > callback ) {
377
+ private <T > T executeReactively (Function <ReactiveRedisConnection , T > callback ) {
378
+
379
+ ReactiveRedisConnection connection = getReactiveRedisConnectionFactory ().getReactiveConnection ();
305
380
306
- try (RedisConnection connection = connectionFactory .getConnection ()) {
307
- callback .accept (connection );
381
+ try {
382
+ return callback .apply (connection );
383
+ }
384
+ finally {
385
+ connection .closeLater ();
308
386
}
309
387
}
310
388
389
+ /**
390
+ * Determines whether this {@link RedisCacheWriter} uses locks during caching operations.
391
+ *
392
+ * @return {@literal true} if {@link RedisCacheWriter} uses locks.
393
+ */
394
+ private boolean isLockingCacheWriter () {
395
+ return !this .sleepTime .isZero () && !this .sleepTime .isNegative ();
396
+ }
397
+
311
398
private void checkAndPotentiallyWaitUntilUnlocked (String name , RedisConnection connection ) {
312
399
313
400
if (!isLockingCacheWriter ()) {
@@ -318,29 +405,46 @@ private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection c
318
405
319
406
try {
320
407
while (doCheckLock (name , connection )) {
321
- Thread .sleep (sleepTime .toMillis ());
408
+ Thread .sleep (this . sleepTime .toMillis ());
322
409
}
323
410
} catch (InterruptedException cause ) {
324
411
325
- // Re-interrupt current thread, to allow other participants to react.
412
+ // Re-interrupt current Thread to allow other participants to react.
326
413
Thread .currentThread ().interrupt ();
327
414
328
415
String message = String .format ("Interrupted while waiting to unlock cache %s" , name );
329
416
330
417
throw new PessimisticLockingFailureException (message , cause );
331
418
} finally {
332
- statistics .incLockTime (name , System .nanoTime () - lockWaitTimeNs );
419
+ this . statistics .incLockTime (name , System .nanoTime () - lockWaitTimeNs );
333
420
}
334
421
}
335
422
423
+ boolean doCheckLock (String name , RedisConnection connection ) {
424
+ return isTrue (connection .keyCommands ().exists (createCacheLockKey (name )));
425
+ }
426
+
427
+ private boolean isReactive () {
428
+ return this .connectionFactory instanceof ReactiveRedisConnectionFactory ;
429
+ }
430
+
431
+ private ReactiveRedisConnectionFactory getReactiveRedisConnectionFactory () {
432
+ return (ReactiveRedisConnectionFactory ) this .connectionFactory ;
433
+ }
434
+
336
435
private static byte [] createCacheLockKey (String name ) {
337
436
return (name + "~lock" ).getBytes (StandardCharsets .UTF_8 );
338
437
}
339
438
340
- private boolean isTrue (@ Nullable Boolean value ) {
439
+ private static boolean isTrue (@ Nullable Boolean value ) {
341
440
return Boolean .TRUE .equals (value );
342
441
}
343
442
443
+ @ Nullable
444
+ private static byte [] nullSafeGetBytes (@ Nullable ByteBuffer value ) {
445
+ return value != null ? ByteUtils .getBytes (value ) : null ;
446
+ }
447
+
344
448
private static boolean shouldExpireWithin (@ Nullable Duration ttl ) {
345
449
return ttl != null && !ttl .isZero () && !ttl .isNegative ();
346
450
}
0 commit comments