Skip to content

Commit f53c8b9

Browse files
committed
Improve thread safety of RedisMessageListenerContainer.
RedisMessageListenerContainer relies on 2 threads for subscription when patterns and channels topics are present. With Jedis, since the subscription thread blocks while listening for messages, an additional thread is used to subscribe to patterns while the subscription threads subscribe to channels and block. There were some race conditions between those two threads that could corrupt the Jedis stream since operations are not synchronized in JedisSubscription. A lock on the JedisSubscription instance has been added to enforce that operations on the Jedis stream cannot be affected by a concurrent thread. Additionaly, there were no error handling and retry mechanism on the pattern subscription thread. Multiple conditions could trigger an unexpected behavior here, exceptions were not handled and logged to stderr with no notice. Also, if the connection was not subscribed after 3 tries, the thread would exit silently with no log. Defensive measure have been added to retry redis connection failures and the subscription will now retry indefinitely, unless canceled on shutdown and on the main subscription thread errors. Fixes spring-projects#964 for versions before spring-projects#2256 was introduced.
1 parent 0538bd5 commit f53c8b9

File tree

4 files changed

+301
-32
lines changed

4 files changed

+301
-32
lines changed

src/main/java/org/springframework/data/redis/connection/jedis/JedisSubscription.java

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,13 @@ class JedisSubscription extends AbstractSubscription {
4242
*/
4343
@Override
4444
protected void doClose() {
45-
if (!getChannels().isEmpty()) {
46-
jedisPubSub.unsubscribe();
47-
}
48-
if (!getPatterns().isEmpty()) {
49-
jedisPubSub.punsubscribe();
45+
synchronized (this) {
46+
if (!getChannels().isEmpty()) {
47+
jedisPubSub.unsubscribe();
48+
}
49+
if (!getPatterns().isEmpty()) {
50+
jedisPubSub.punsubscribe();
51+
}
5052
}
5153
}
5254

@@ -56,7 +58,9 @@ protected void doClose() {
5658
*/
5759
@Override
5860
protected void doPsubscribe(byte[]... patterns) {
59-
jedisPubSub.psubscribe(patterns);
61+
synchronized (this) {
62+
jedisPubSub.psubscribe(patterns);
63+
}
6064
}
6165

6266
/*
@@ -65,10 +69,12 @@ protected void doPsubscribe(byte[]... patterns) {
6569
*/
6670
@Override
6771
protected void doPUnsubscribe(boolean all, byte[]... patterns) {
68-
if (all) {
69-
jedisPubSub.punsubscribe();
70-
} else {
71-
jedisPubSub.punsubscribe(patterns);
72+
synchronized (this) {
73+
if (all) {
74+
jedisPubSub.punsubscribe();
75+
} else {
76+
jedisPubSub.punsubscribe(patterns);
77+
}
7278
}
7379
}
7480

@@ -78,7 +84,9 @@ protected void doPUnsubscribe(boolean all, byte[]... patterns) {
7884
*/
7985
@Override
8086
protected void doSubscribe(byte[]... channels) {
81-
jedisPubSub.subscribe(channels);
87+
synchronized (this) {
88+
jedisPubSub.subscribe(channels);
89+
}
8290
}
8391

8492
/*
@@ -87,10 +95,12 @@ protected void doSubscribe(byte[]... channels) {
8795
*/
8896
@Override
8997
protected void doUnsubscribe(boolean all, byte[]... channels) {
90-
if (all) {
91-
jedisPubSub.unsubscribe();
92-
} else {
93-
jedisPubSub.unsubscribe(channels);
98+
synchronized (this) {
99+
if (all) {
100+
jedisPubSub.unsubscribe();
101+
} else {
102+
jedisPubSub.unsubscribe(channels);
103+
}
94104
}
95105
}
96106
}

src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.CopyOnWriteArraySet;
2626
import java.util.concurrent.Executor;
2727
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicBoolean;
2829

2930
import org.apache.commons.logging.Log;
3031
import org.apache.commons.logging.LogFactory;
@@ -69,6 +70,7 @@
6970
* @author Way Joke
7071
* @author Thomas Darimont
7172
* @author Mark Paluch
73+
* @author Jacques-Etienne Beaudet
7274
*/
7375
public class RedisMessageListenerContainer implements InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle {
7476

@@ -678,7 +680,6 @@ protected void sleepBeforeRecoveryAttempt() {
678680
* @author Costin Leau
679681
*/
680682
private class SubscriptionTask implements SchedulingAwareRunnable {
681-
682683
/**
683684
* Runnable used, on a parallel thread, to do the initial pSubscribe. This is required since, during initialization,
684685
* both subscribe and pSubscribe might be needed but since the first call is blocking, the second call needs to
@@ -689,36 +690,55 @@ private class SubscriptionTask implements SchedulingAwareRunnable {
689690
private class PatternSubscriptionTask implements SchedulingAwareRunnable {
690691

691692
private long WAIT = 500;
692-
private long ROUNDS = 3;
693+
private AtomicBoolean isThreadRunning = new AtomicBoolean(true);
694+
693695

694696
public boolean isLongLived() {
695697
return false;
696698
}
697699

700+
void cancel() {
701+
isThreadRunning.set(false);
702+
}
703+
698704
public void run() {
699705
// wait for subscription to be initialized
700706
boolean done = false;
701707
// wait 3 rounds for subscription to be initialized
702-
for (int i = 0; i < ROUNDS && !done; i++) {
708+
while(!done && isThreadRunning.get() && !Thread.currentThread().isInterrupted()) {
703709
if (connection != null) {
704-
synchronized (localMonitor) {
705-
if (connection.isSubscribed()) {
706-
done = true;
707-
connection.getSubscription().pSubscribe(unwrap(patternMapping.keySet()));
708-
} else {
709-
try {
710-
Thread.sleep(WAIT);
711-
} catch (InterruptedException ex) {
712-
Thread.currentThread().interrupt();
710+
try {
711+
if (connection.isSubscribed()) {
712+
synchronized (localMonitor) {
713+
connection.getSubscription().pSubscribe(unwrap(patternMapping.keySet()));
714+
done = true;
715+
}
716+
} else {
717+
try {
718+
Thread.sleep(WAIT);
719+
} catch (InterruptedException ex) {
720+
logger.info("PatternSubscriptionTask was interrupted, exiting.");
721+
Thread.currentThread().interrupt();
722+
return;
723+
}
724+
}
725+
} catch(Throwable e) {
726+
if (e instanceof RedisConnectionFailureException) {
727+
if (isRunning() && isThreadRunning.get()) {
728+
logger.error("Connection failure occurred on pattern subscription task. Restarting subscription task after " + recoveryInterval + " ms");
729+
sleepBeforeRecoveryAttempt();
730+
}
731+
} else {
732+
logger.error("PatternSubscriptionTask aborted with exception:", e);
713733
return;
714734
}
715735
}
716-
}
717736
}
718737
}
719738
}
720739
}
721740

741+
private volatile @Nullable PatternSubscriptionTask patternSubscriptionTask;
722742
private volatile @Nullable RedisConnection connection;
723743
private boolean subscriptionTaskRunning = false;
724744
private final Object localMonitor = new Object();
@@ -759,6 +779,7 @@ public void run() {
759779
}
760780
}
761781
} catch (Throwable t) {
782+
cancelPatternSubscriptionTask();
762783
handleSubscriptionException(t);
763784
} finally {
764785
// this block is executed once the subscription thread has ended, this may or may not mean
@@ -789,7 +810,8 @@ private SubscriptionPresentCondition eventuallyPerformSubscription() {
789810
condition = new SubscriptionPresentCondition();
790811
} else {
791812
// schedule the rest of the subscription
792-
subscriptionExecutor.execute(new PatternSubscriptionTask());
813+
patternSubscriptionTask = new PatternSubscriptionTask();
814+
subscriptionExecutor.execute(patternSubscriptionTask);
793815
condition = new PatternSubscriptionPresentCondition();
794816
}
795817

@@ -815,7 +837,7 @@ public boolean passes() {
815837
* Checks whether the current connection has an associated pattern subscription.
816838
*
817839
* @author Thomas Darimont
818-
* @see org.springframework.data.redis.listener.RedisMessageListenerContainer.SubscriptionTask.SubscriptionPresentTestCondition
840+
* @see org.springframework.data.redis.listener.RedisMessageListenerContainer.SubscriptionTask.SubscriptionPresentCondition
819841
*/
820842
private class PatternSubscriptionPresentCondition extends SubscriptionPresentCondition {
821843

@@ -840,7 +862,17 @@ private byte[][] unwrap(Collection<ByteArrayWrapper> holders) {
840862
return unwrapped;
841863
}
842864

865+
private void cancelPatternSubscriptionTask() {
866+
if(patternSubscriptionTask != null) {
867+
synchronized (localMonitor) {
868+
patternSubscriptionTask.cancel();
869+
patternSubscriptionTask = null;
870+
}
871+
}
872+
}
873+
843874
void cancel() {
875+
cancelPatternSubscriptionTask();
844876

845877
if (!listening || connection == null) {
846878
return;

src/test/java/org/springframework/data/redis/listener/PubSubTests.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030

3131
import org.junit.jupiter.api.AfterEach;
3232
import org.junit.jupiter.api.BeforeEach;
33-
3433
import org.springframework.core.task.SimpleAsyncTaskExecutor;
3534
import org.springframework.core.task.SyncTaskExecutor;
3635
import org.springframework.data.redis.ObjectFactory;
@@ -55,12 +54,14 @@
5554
public class PubSubTests<T> {
5655

5756
private static final String CHANNEL = "pubsub::test";
57+
private static final String PATTERN = "pattern::pubsub::test";
5858

5959
protected RedisMessageListenerContainer container;
6060
protected ObjectFactory<T> factory;
6161
@SuppressWarnings("rawtypes") protected RedisTemplate template;
6262

6363
private final BlockingDeque<Object> bag = new LinkedBlockingDeque<>(99);
64+
private final BlockingDeque<Object> patternBag = new LinkedBlockingDeque<>(99);
6465

6566
private final Object handler = new Object() {
6667
@SuppressWarnings("unused")
@@ -69,7 +70,15 @@ public void handleMessage(Object message) {
6970
}
7071
};
7172

73+
private final Object patternHandler = new Object() {
74+
@SuppressWarnings("unused")
75+
public void handleMessage(Object message) {
76+
patternBag.add(message);
77+
}
78+
};
79+
7280
private final MessageListenerAdapter adapter = new MessageListenerAdapter(handler);
81+
private final MessageListenerAdapter patternAdapter = new MessageListenerAdapter(patternHandler);
7382

7483
@SuppressWarnings("rawtypes")
7584
public PubSubTests(ObjectFactory<T> factory, RedisTemplate template) {
@@ -84,10 +93,14 @@ public static Collection<Object[]> testParams() {
8493
@BeforeEach
8594
void setUp() throws Exception {
8695
bag.clear();
96+
patternBag.clear();
8797

8898
adapter.setSerializer(template.getValueSerializer());
8999
adapter.afterPropertiesSet();
90100

101+
patternAdapter.setSerializer(template.getValueSerializer());
102+
patternAdapter.afterPropertiesSet();
103+
91104
Phaser phaser = new Phaser(1);
92105

93106
container = new RedisMessageListenerContainer();
@@ -198,6 +211,34 @@ void testStartListenersToNoSpecificChannelTest() throws InterruptedException {
198211
assertThat(set).contains(payload);
199212
}
200213

214+
@SuppressWarnings("unchecked")
215+
@ParameterizedRedisTest // GH-964
216+
void testStartListenersToBothChannelsAndPatternTopics() throws InterruptedException {
217+
218+
assumeThat(isClusterAware(template.getConnectionFactory())).isFalse();
219+
assumeThat(ConnectionUtils.isJedis(template.getConnectionFactory())).isTrue();
220+
221+
PubSubAwaitUtil.runAndAwaitPatternSubscription(template.getRequiredConnectionFactory(), () -> {
222+
223+
container.addMessageListener(patternAdapter, Collections.singletonList(new PatternTopic(PATTERN + "*")));
224+
container.start();
225+
});
226+
227+
T payload = getT();
228+
229+
template.convertAndSend(PATTERN, payload);
230+
template.convertAndSend(CHANNEL, payload);
231+
232+
Set<T> patternSet = new LinkedHashSet<>();
233+
patternSet.add((T) patternBag.poll(3, TimeUnit.SECONDS));
234+
235+
Set<T> channelSet = new LinkedHashSet<>();
236+
channelSet.add((T) bag.poll(3, TimeUnit.SECONDS));
237+
238+
assertThat(channelSet).contains(payload);
239+
assertThat(patternSet).contains(payload);
240+
}
241+
201242
private static boolean isClusterAware(RedisConnectionFactory connectionFactory) {
202243

203244
if (connectionFactory instanceof LettuceConnectionFactory) {

0 commit comments

Comments
 (0)