22
22
import java .util .LinkedHashMap ;
23
23
import java .util .Map ;
24
24
import java .util .Map .Entry ;
25
- import java .util .Objects ;
26
25
import java .util .UUID ;
27
26
import java .util .concurrent .ConcurrentHashMap ;
28
27
import java .util .concurrent .ExecutionException ;
37
36
import java .util .concurrent .locks .ReentrantLock ;
38
37
import java .util .function .Function ;
39
38
40
- import javax .annotation .Nonnull ;
41
-
42
39
import org .apache .commons .logging .Log ;
43
40
import org .apache .commons .logging .LogFactory ;
44
41
54
51
import org .springframework .data .redis .listener .RedisMessageListenerContainer ;
55
52
import org .springframework .data .redis .listener .Topic ;
56
53
import org .springframework .integration .support .locks .ExpirableLockRegistry ;
54
+ import org .springframework .lang .NonNull ;
57
55
import org .springframework .scheduling .concurrent .CustomizableThreadFactory ;
58
56
import org .springframework .util .Assert ;
59
57
import org .springframework .util .ReflectionUtils ;
@@ -117,10 +115,6 @@ protected boolean removeEldestEntry(Entry<String, RedisLock> eldest) {
117
115
118
116
private final StringRedisTemplate redisTemplate ;
119
117
120
- private final RedisUnLockNotifyMessageListener unlockNotifyMessageListener ;
121
-
122
- private final RedisMessageListenerContainer redisMessageListenerContainer ;
123
-
124
118
private final long expireAfter ;
125
119
126
120
private int cacheCapacity = DEFAULT_CAPACITY ;
@@ -141,8 +135,18 @@ protected boolean removeEldestEntry(Entry<String, RedisLock> eldest) {
141
135
private boolean executorExplicitlySet ;
142
136
143
137
private volatile boolean unlinkAvailable = true ;
138
+
144
139
private volatile boolean isRunningRedisMessageListenerContainer = false ;
145
140
141
+ /**
142
+ * It is set via lazy initialization when it is a {@link RedisLockType#PUB_SUB_LOCK}.
143
+ */
144
+ private volatile RedisUnLockNotifyMessageListener unlockNotifyMessageListener ;
145
+ /**
146
+ * It is set via lazy initialization when it is a {@link RedisLockType#PUB_SUB_LOCK}.
147
+ */
148
+ private volatile RedisMessageListenerContainer redisMessageListenerContainer ;
149
+
146
150
/**
147
151
* Constructs a lock registry with the default (60 second) lock expiration.
148
152
* @param connectionFactory The connection factory.
@@ -165,12 +169,13 @@ public RedisLockRegistry(RedisConnectionFactory connectionFactory, String regist
165
169
this .registryKey = registryKey ;
166
170
this .expireAfter = expireAfter ;
167
171
this .unLockChannelKey = registryKey + "-channel" ;
168
- this .unlockNotifyMessageListener = new RedisUnLockNotifyMessageListener ();
169
- this .redisMessageListenerContainer = new RedisMessageListenerContainer ();
170
- setupUnlockMessageListener (connectionFactory );
171
172
}
172
173
173
174
private void setupUnlockMessageListener (RedisConnectionFactory connectionFactory ) {
175
+ Assert .isNull (RedisLockRegistry .this .redisMessageListenerContainer , "'redisMessageListenerContainer' must not have been re-initialized." );
176
+ Assert .isNull (RedisLockRegistry .this .unlockNotifyMessageListener , "'unlockNotifyMessageListener' must not have been re-initialized." );
177
+ RedisLockRegistry .this .redisMessageListenerContainer = new RedisMessageListenerContainer ();
178
+ RedisLockRegistry .this .unlockNotifyMessageListener = new RedisUnLockNotifyMessageListener ();
174
179
final Topic topic = new ChannelTopic (this .unLockChannelKey );
175
180
this .redisMessageListenerContainer .setConnectionFactory (connectionFactory );
176
181
this .redisMessageListenerContainer .setTaskExecutor (this .executor );
@@ -200,19 +205,23 @@ public void setCacheCapacity(int cacheCapacity) {
200
205
this .cacheCapacity = cacheCapacity ;
201
206
}
202
207
208
+
203
209
/**
204
- * The default is {@link RedisLockType.SPIN_LOCK}<br>
205
- * {@link RedisLockType.SPIN_LOCK}: The lock is acquired by periodically(100ms) checking whether the lock can be acquired.<br>
206
- * {@link RedisLockType.PUB_SUB_LOCK}: The lock is accuired by redis pub-sub. <br>
207
- *
208
- * Set the type of unlockType
209
- * Select the lock method.
210
+ * <p>
211
+ * Because pub-sub does not work in some settings(RedisStaticMasterReplicaConfiguration), The default is {@link RedisLockType.SPIN_LOCK}
212
+ * <ul>
213
+ * <li>{@link RedisLockType#SPIN_LOCK}: The lock is acquired by periodically(100ms) checking whether the lock can be acquired.</li>
214
+ * <li>{@link RedisLockType#PUB_SUB_LOCK}: The lock is accuired by redis pub-sub.</li>
215
+ * </ul>
216
+ * <p>
217
+ * Set the type of unlockType, Select the lock method.
210
218
*
211
219
* @param redisLockType obtain RedisLockType
212
- * @since 6.0.0
220
+ * @since 5.5.13
213
221
*/
214
- public void setRedisLockType (@ Nonnull RedisLockType redisLockType ) {
215
- this .redisLockType = Objects .requireNonNull (redisLockType );
222
+ public void setRedisLockType (@ NonNull RedisLockType redisLockType ) {
223
+ Assert .notNull (redisLockType , "'redisLockType' cannot be null" );
224
+ this .redisLockType = redisLockType ;
216
225
}
217
226
218
227
@ Override
@@ -253,7 +262,7 @@ public enum RedisLockType {
253
262
PUB_SUB_LOCK , SPIN_LOCK ;
254
263
}
255
264
256
- private Function <String , RedisLock > getRedisLockConstructor (@ Nonnull RedisLockType redisLockType ) {
265
+ private Function <String , RedisLock > getRedisLockConstructor (@ NonNull RedisLockType redisLockType ) {
257
266
return switch (redisLockType ) {
258
267
case PUB_SUB_LOCK -> RedisPubSubLock ::new ;
259
268
case SPIN_LOCK -> RedisSpinLock ::new ;
@@ -262,6 +271,20 @@ private Function<String, RedisLock> getRedisLockConstructor(@Nonnull RedisLockTy
262
271
263
272
264
273
private abstract class RedisLock implements Lock {
274
+
275
+ private static final String OBTAIN_LOCK_SCRIPT =
276
+ "local lockClientId = redis.call('GET', KEYS[1])\n " +
277
+ "if lockClientId == ARGV[1] then\n " +
278
+ " redis.call('PEXPIRE', KEYS[1], ARGV[2])\n " +
279
+ " return true\n " +
280
+ "elseif not lockClientId then\n " +
281
+ " redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])\n " +
282
+ " return true\n " +
283
+ "end\n " +
284
+ "return false" ;
285
+
286
+ protected static final RedisScript <Boolean > obtainLockScript = new DefaultRedisScript <>(OBTAIN_LOCK_SCRIPT , Boolean .class );
287
+
265
288
protected final String lockKey ;
266
289
267
290
private final ReentrantLock localLock = new ReentrantLock ();
@@ -385,6 +408,13 @@ private boolean tryRedisLock(long time) throws ExecutionException, InterruptedEx
385
408
return result ;
386
409
}
387
410
411
+ protected Boolean obtainLock () {
412
+ return RedisLockRegistry .this .redisTemplate
413
+ .execute (obtainLockScript , Collections .singletonList (this .lockKey ),
414
+ RedisLockRegistry .this .clientId ,
415
+ String .valueOf (RedisLockRegistry .this .expireAfter ));
416
+ }
417
+
388
418
@ Override
389
419
public final void unlock () {
390
420
if (!this .localLock .isHeldByCurrentThread ()) {
@@ -495,19 +525,10 @@ public boolean equals(Object obj) {
495
525
private RedisLockRegistry getOuterType () {
496
526
return RedisLockRegistry .this ;
497
527
}
528
+
498
529
}
499
530
500
531
private final class RedisPubSubLock extends RedisLock {
501
- private static final String OBTAIN_LOCK_SCRIPT =
502
- "local lockClientId = redis.call('GET', KEYS[1])\n " +
503
- "if lockClientId == ARGV[1] then\n " +
504
- " redis.call('PEXPIRE', KEYS[1], ARGV[2])\n " +
505
- " return true\n " +
506
- "elseif not lockClientId then\n " +
507
- " redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])\n " +
508
- " return true\n " +
509
- "end\n " +
510
- "return false" ;
511
532
512
533
private static final String UNLINK_UNLOCK_SCRIPT =
513
534
"if (redis.call('unlink', KEYS[1]) == 1) then " +
@@ -522,9 +543,10 @@ private final class RedisPubSubLock extends RedisLock {
522
543
"return true " +
523
544
"end " +
524
545
"return false" ;
525
- private final RedisScript <Boolean > obtainLockScript = new DefaultRedisScript <>(OBTAIN_LOCK_SCRIPT , Boolean .class );
526
- private final RedisScript <Boolean > unLinkUnLockScript = new DefaultRedisScript <>(UNLINK_UNLOCK_SCRIPT , Boolean .class );
527
- private final RedisScript <Boolean > deleteUnLockScript = new DefaultRedisScript <>(DELETE_UNLOCK_SCRIPT , Boolean .class );
546
+
547
+ private static final RedisScript <Boolean > unLinkUnLockScript = new DefaultRedisScript <>(UNLINK_UNLOCK_SCRIPT , Boolean .class );
548
+
549
+ private static final RedisScript <Boolean > deleteUnLockScript = new DefaultRedisScript <>(DELETE_UNLOCK_SCRIPT , Boolean .class );
528
550
529
551
private RedisPubSubLock (String path ) {
530
552
super (path );
@@ -538,14 +560,14 @@ protected boolean tryRedisLockInner(long time) throws ExecutionException, Interr
538
560
@ Override
539
561
protected void removeLockKeyInnerUnlink () {
540
562
RedisLockRegistry .this .redisTemplate .execute (
541
- this . unLinkUnLockScript , Collections .singletonList (this .lockKey ),
563
+ unLinkUnLockScript , Collections .singletonList (this .lockKey ),
542
564
RedisLockRegistry .this .unLockChannelKey );
543
565
}
544
566
545
567
@ Override
546
568
protected void removeLockKeyInnerDelete () {
547
569
RedisLockRegistry .this .redisTemplate .execute (
548
- this . deleteUnLockScript , Collections .singletonList (this .lockKey ),
570
+ deleteUnLockScript , Collections .singletonList (this .lockKey ),
549
571
RedisLockRegistry .this .unLockChannelKey );
550
572
551
573
}
@@ -557,6 +579,7 @@ private boolean subscribeLock(long time) throws ExecutionException, InterruptedE
557
579
}
558
580
559
581
if (!(RedisLockRegistry .this .isRunningRedisMessageListenerContainer
582
+ && RedisLockRegistry .this .redisMessageListenerContainer != null
560
583
&& RedisLockRegistry .this .redisMessageListenerContainer .isRunning ())) {
561
584
runRedisMessageListenerContainer ();
562
585
}
@@ -586,23 +609,23 @@ private boolean subscribeLock(long time) throws ExecutionException, InterruptedE
586
609
return false ;
587
610
}
588
611
589
- private Boolean obtainLock () {
590
- return RedisLockRegistry .this .redisTemplate
591
- .execute (this .obtainLockScript , Collections .singletonList (this .lockKey ),
592
- RedisLockRegistry .this .clientId ,
593
- String .valueOf (RedisLockRegistry .this .expireAfter ));
594
- }
595
-
596
612
private void runRedisMessageListenerContainer () {
597
- synchronized (RedisLockRegistry .this .redisMessageListenerContainer ) {
613
+ synchronized (RedisLockRegistry .this .locks ) {
598
614
if (!(RedisLockRegistry .this .isRunningRedisMessageListenerContainer
615
+ && RedisLockRegistry .this .redisMessageListenerContainer != null
599
616
&& RedisLockRegistry .this .redisMessageListenerContainer .isRunning ())) {
600
- RedisLockRegistry .this .redisMessageListenerContainer .afterPropertiesSet ();
617
+
618
+ if (RedisLockRegistry .this .redisMessageListenerContainer == null ) {
619
+ setupUnlockMessageListener (RedisLockRegistry .this .redisTemplate .getConnectionFactory ());
620
+ RedisLockRegistry .this .redisMessageListenerContainer .afterPropertiesSet ();
621
+ }
622
+
601
623
RedisLockRegistry .this .redisMessageListenerContainer .start ();
602
624
RedisLockRegistry .this .isRunningRedisMessageListenerContainer = true ;
603
625
}
604
626
}
605
627
}
628
+
606
629
}
607
630
608
631
/**
@@ -636,17 +659,6 @@ private void unlockNotify(String lockKey) {
636
659
}
637
660
638
661
private final class RedisSpinLock extends RedisLock {
639
- private static final String OBTAIN_LOCK_SCRIPT =
640
- "local lockClientId = redis.call('GET', KEYS[1])\n " +
641
- "if lockClientId == ARGV[1] then\n " +
642
- " redis.call('PEXPIRE', KEYS[1], ARGV[2])\n " +
643
- " return true\n " +
644
- "elseif not lockClientId then\n " +
645
- " redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])\n " +
646
- " return true\n " +
647
- "end\n " +
648
- "return false" ;
649
- private final RedisScript <Boolean > obtainLockScript = new DefaultRedisScript <>(OBTAIN_LOCK_SCRIPT , Boolean .class );
650
662
651
663
private RedisSpinLock (String path ) {
652
664
super (path );
@@ -681,12 +693,5 @@ protected void removeLockKeyInnerDelete() {
681
693
RedisLockRegistry .this .redisTemplate .delete (this .lockKey );
682
694
}
683
695
684
- private boolean obtainLock () {
685
- return RedisLockRegistry .this .redisTemplate
686
- .execute (this .obtainLockScript ,
687
- Collections .singletonList (this .lockKey ),
688
- RedisLockRegistry .this .clientId ,
689
- String .valueOf (RedisLockRegistry .this .expireAfter ));
690
- }
691
696
}
692
697
}
0 commit comments