15
15
*/
16
16
package org .springframework .data .redis .cache ;
17
17
18
+ import java .nio .ByteBuffer ;
18
19
import java .nio .charset .StandardCharsets ;
19
20
import java .time .Duration ;
21
+ import java .util .concurrent .CompletableFuture ;
20
22
import java .util .concurrent .TimeUnit ;
21
- import java .util .function .Consumer ;
23
+ import java .util .function .BiFunction ;
22
24
import java .util .function .Function ;
25
+ import java .util .function .Supplier ;
23
26
24
27
import org .springframework .dao .PessimisticLockingFailureException ;
28
+ import org .springframework .data .redis .connection .ReactiveRedisConnection ;
29
+ import org .springframework .data .redis .connection .ReactiveRedisConnectionFactory ;
25
30
import org .springframework .data .redis .connection .RedisConnection ;
26
31
import org .springframework .data .redis .connection .RedisConnectionFactory ;
27
32
import org .springframework .data .redis .connection .RedisStringCommands .SetOption ;
28
33
import org .springframework .data .redis .core .types .Expiration ;
34
+ import org .springframework .data .redis .util .ByteUtils ;
29
35
import org .springframework .lang .Nullable ;
30
36
import org .springframework .util .Assert ;
31
37
38
+ import reactor .core .publisher .Flux ;
39
+ import reactor .core .publisher .Mono ;
40
+
32
41
/**
33
42
* {@link RedisCacheWriter} implementation capable of reading/writing binary data from/to Redis in {@literal standalone}
34
43
* and {@literal cluster} environments, and uses a given {@link RedisConnectionFactory} to obtain the actual
@@ -114,8 +123,8 @@ public byte[] get(String name, byte[] key, @Nullable Duration ttl) {
114
123
Assert .notNull (key , "Key must not be null" );
115
124
116
125
byte [] result = shouldExpireWithin (ttl )
117
- ? execute (name , connection -> connection .stringCommands ().getEx (key , Expiration .from (ttl )))
118
- : execute (name , connection -> connection .stringCommands ().get (key ));
126
+ ? execute (name , connection -> connection .stringCommands ().getEx (key , Expiration .from (ttl )))
127
+ : execute (name , connection -> connection .stringCommands ().get (key ));
119
128
120
129
statistics .incGets (name );
121
130
@@ -128,6 +137,81 @@ public byte[] get(String name, byte[] key, @Nullable Duration ttl) {
128
137
return result ;
129
138
}
130
139
140
+ @ Override
141
+ public boolean isRetrieveSupported () {
142
+ return isReactive ();
143
+ }
144
+
145
+ @ Override
146
+ public CompletableFuture <byte []> retrieve (String name , byte [] key , @ Nullable Duration ttl ) {
147
+
148
+ Assert .notNull (name , "Name must not be null" );
149
+ Assert .notNull (key , "Key must not be null" );
150
+
151
+ CompletableFuture <byte []> result = nonBlockingRetrieveFunction (name ).apply (key , ttl );
152
+
153
+ result = result .thenApply (cachedValue -> {
154
+
155
+ statistics .incGets (name );
156
+
157
+ if (cachedValue != null ) {
158
+ statistics .incHits (name );
159
+ } else {
160
+ statistics .incMisses (name );
161
+ }
162
+
163
+ return cachedValue ;
164
+ });
165
+
166
+ return result ;
167
+ }
168
+
169
+ private BiFunction <byte [], Duration , CompletableFuture <byte []>> nonBlockingRetrieveFunction (String cacheName ) {
170
+ return isReactive () ? reactiveRetrieveFunction (cacheName ) : asyncRetrieveFunction (cacheName );
171
+ }
172
+
173
+ // TODO: Possibly remove if we rely on the default Cache.retrieve(..) behavior
174
+ // after assessing RedisCacheWriter.isRetrieveSupported().
175
+ // Function applied for Cache.retrieve(key) when a non-reactive Redis driver is used, such as Jedis.
176
+ private BiFunction <byte [], Duration , CompletableFuture <byte []>> asyncRetrieveFunction (String cacheName ) {
177
+
178
+ return (key , ttl ) -> {
179
+
180
+ Supplier <byte []> getKey = () -> execute (cacheName , connection -> connection .stringCommands ().get (key ));
181
+
182
+ Supplier <byte []> getKeyWithExpiration = () -> execute (cacheName , connection ->
183
+ connection .stringCommands ().getEx (key , Expiration .from (ttl )));
184
+
185
+ return shouldExpireWithin (ttl )
186
+ ? CompletableFuture .supplyAsync (getKeyWithExpiration )
187
+ : CompletableFuture .supplyAsync (getKey );
188
+
189
+ };
190
+ }
191
+
192
+ // Function applied for Cache.retrieve(key) when a reactive Redis driver is used, such as Lettuce.
193
+ private BiFunction <byte [], Duration , CompletableFuture <byte []>> reactiveRetrieveFunction (String cacheName ) {
194
+
195
+ return (key , ttl ) -> {
196
+
197
+ ByteBuffer wrappedKey = ByteBuffer .wrap (key );
198
+
199
+ Flux <?> cacheLockCheckFlux = Flux .interval (Duration .ZERO , this .sleepTime ).takeUntil (count ->
200
+ executeLockFree (connection -> !doCheckLock (cacheName , connection )));
201
+
202
+ Mono <ByteBuffer > getMono = shouldExpireWithin (ttl )
203
+ ? executeReactively (connection -> connection .stringCommands ().getEx (wrappedKey , Expiration .from (ttl )))
204
+ : executeReactively (connection -> connection .stringCommands ().get (wrappedKey ));
205
+
206
+ Mono <ByteBuffer > result = cacheLockCheckFlux .then (getMono );
207
+
208
+ @ SuppressWarnings ("all" )
209
+ Mono <byte []> byteArrayResult = result .map (DefaultRedisCacheWriter ::nullSafeGetBytes );
210
+
211
+ return byteArrayResult .toFuture ();
212
+ };
213
+ }
214
+
131
215
@ Override
132
216
public void put (String name , byte [] key , byte [] value , @ Nullable Duration ttl ) {
133
217
@@ -282,32 +366,42 @@ private Long doUnlock(String name, RedisConnection connection) {
282
366
return connection .keyCommands ().del (createCacheLockKey (name ));
283
367
}
284
368
285
- boolean doCheckLock (String name , RedisConnection connection ) {
286
- return isTrue (connection .keyCommands ().exists (createCacheLockKey (name )));
287
- }
369
+ private <T > T execute (String name , Function <RedisConnection , T > callback ) {
288
370
289
- /**
290
- * @return {@literal true} if {@link RedisCacheWriter} uses locks.
291
- */
292
- private boolean isLockingCacheWriter () {
293
- return !sleepTime .isZero () && !sleepTime .isNegative ();
371
+ try (RedisConnection connection = this .connectionFactory .getConnection ()) {
372
+ checkAndPotentiallyWaitUntilUnlocked (name , connection );
373
+ return callback .apply (connection );
374
+ }
294
375
}
295
376
296
- private <T > T execute ( String name , Function <RedisConnection , T > callback ) {
377
+ private <T > T executeLockFree ( Function <RedisConnection , T > callback ) {
297
378
298
- try (RedisConnection connection = connectionFactory .getConnection ()) {
299
- checkAndPotentiallyWaitUntilUnlocked (name , connection );
379
+ try (RedisConnection connection = this .connectionFactory .getConnection ()) {
300
380
return callback .apply (connection );
301
381
}
302
382
}
303
383
304
- private void executeLockFree ( Consumer < RedisConnection > callback ) {
384
+ private < T > T executeReactively ( Function < ReactiveRedisConnection , T > callback ) {
305
385
306
- try (RedisConnection connection = connectionFactory .getConnection ()) {
307
- callback .accept (connection );
386
+ ReactiveRedisConnection connection = getReactiveRedisConnectionFactory ().getReactiveConnection ();
387
+
388
+ try {
389
+ return callback .apply (connection );
390
+ }
391
+ finally {
392
+ connection .closeLater ();
308
393
}
309
394
}
310
395
396
+ /**
397
+ * Determines whether this {@link RedisCacheWriter} uses locks during caching operations.
398
+ *
399
+ * @return {@literal true} if {@link RedisCacheWriter} uses locks.
400
+ */
401
+ private boolean isLockingCacheWriter () {
402
+ return !this .sleepTime .isZero () && !this .sleepTime .isNegative ();
403
+ }
404
+
311
405
private void checkAndPotentiallyWaitUntilUnlocked (String name , RedisConnection connection ) {
312
406
313
407
if (!isLockingCacheWriter ()) {
@@ -318,29 +412,46 @@ private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection c
318
412
319
413
try {
320
414
while (doCheckLock (name , connection )) {
321
- Thread .sleep (sleepTime .toMillis ());
415
+ Thread .sleep (this . sleepTime .toMillis ());
322
416
}
323
417
} catch (InterruptedException cause ) {
324
418
325
- // Re-interrupt current thread, to allow other participants to react.
419
+ // Re-interrupt current Thread to allow other participants to react.
326
420
Thread .currentThread ().interrupt ();
327
421
328
422
String message = String .format ("Interrupted while waiting to unlock cache %s" , name );
329
423
330
424
throw new PessimisticLockingFailureException (message , cause );
331
425
} finally {
332
- statistics .incLockTime (name , System .nanoTime () - lockWaitTimeNs );
426
+ this . statistics .incLockTime (name , System .nanoTime () - lockWaitTimeNs );
333
427
}
334
428
}
335
429
430
+ boolean doCheckLock (String name , RedisConnection connection ) {
431
+ return isTrue (connection .keyCommands ().exists (createCacheLockKey (name )));
432
+ }
433
+
434
+ private boolean isReactive () {
435
+ return this .connectionFactory instanceof ReactiveRedisConnectionFactory ;
436
+ }
437
+
438
+ private ReactiveRedisConnectionFactory getReactiveRedisConnectionFactory () {
439
+ return (ReactiveRedisConnectionFactory ) this .connectionFactory ;
440
+ }
441
+
336
442
private static byte [] createCacheLockKey (String name ) {
337
443
return (name + "~lock" ).getBytes (StandardCharsets .UTF_8 );
338
444
}
339
445
340
- private boolean isTrue (@ Nullable Boolean value ) {
446
+ private static boolean isTrue (@ Nullable Boolean value ) {
341
447
return Boolean .TRUE .equals (value );
342
448
}
343
449
450
+ @ Nullable
451
+ private static byte [] nullSafeGetBytes (@ Nullable ByteBuffer value ) {
452
+ return value != null ? ByteUtils .getBytes (value ) : null ;
453
+ }
454
+
344
455
private static boolean shouldExpireWithin (@ Nullable Duration ttl ) {
345
456
return ttl != null && !ttl .isZero () && !ttl .isNegative ();
346
457
}
0 commit comments