Skip to content

Commit 7ea1e27

Browse files
authored
Closes #2940 Sharded PubSub subscriptions not recovered after disconnection and re-connection. (#3026)
Backport to 6.4.x branch
1 parent 385275b commit 7ea1e27

File tree

4 files changed

+56
-5
lines changed

4 files changed

+56
-5
lines changed

src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,14 @@ public Set<K> getChannels() {
129129
return unwrap(this.channels);
130130
}
131131

132+
public boolean hasShardChannelSubscriptions() {
133+
return !shardChannels.isEmpty();
134+
}
135+
136+
public Set<K> getShardChannels() {
137+
return unwrap(this.shardChannels);
138+
}
139+
132140
public boolean hasPatternSubscriptions() {
133141
return !patterns.isEmpty();
134142
}

src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,10 @@ protected List<RedisFuture<Void>> resubscribe() {
130130
result.add(async().subscribe(toArray(endpoint.getChannels())));
131131
}
132132

133+
if (endpoint.hasShardChannelSubscriptions()) {
134+
result.add(async().ssubscribe(toArray(endpoint.getShardChannels())));
135+
}
136+
133137
if (endpoint.hasPatternSubscriptions()) {
134138
result.add(async().psubscribe(toArray(endpoint.getPatterns())));
135139
}

src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ class PubSubCommandTest extends AbstractRedisClientTest {
8484

8585
BlockingQueue<Long> counts = listener.getCounts();
8686

87+
BlockingQueue<Long> shardCounts = listener.getShardCounts();
88+
8789
String channel = "channel0";
8890

8991
String shardChannel = "shard-channel";
@@ -521,6 +523,24 @@ void resubscribePatternsOnReconnect() throws Exception {
521523
assertThat(messages.take()).isEqualTo(message);
522524
}
523525

526+
@Test
527+
void resubscribeShardChannelsOnReconnect() throws Exception {
528+
pubsub.ssubscribe(shardChannel);
529+
assertThat(shardChannels.take()).isEqualTo(shardChannel);
530+
assertThat((long) shardCounts.take()).isEqualTo(1);
531+
532+
pubsub.quit();
533+
534+
assertThat(shardChannels.take()).isEqualTo(shardChannel);
535+
assertThat((long) shardCounts.take()).isEqualTo(1);
536+
537+
Wait.untilTrue(pubsub::isOpen).waitOrTimeout();
538+
539+
redis.spublish(shardChannel, shardMessage);
540+
assertThat(shardChannels.take()).isEqualTo(shardChannel);
541+
assertThat(messages.take()).isEqualTo(shardMessage);
542+
}
543+
524544
@Test
525545
void adapter() throws Exception {
526546
final BlockingQueue<Long> localCounts = LettuceFactories.newBlockingQueue();

src/test/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImplUnitTests.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import org.junit.jupiter.api.BeforeEach;
1111
import org.junit.jupiter.api.Test;
1212

13-
import static org.junit.Assert.assertEquals;
13+
import static org.junit.jupiter.api.Assertions.assertEquals;
1414
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
1515
import static org.mockito.Mockito.*;
1616

@@ -78,6 +78,7 @@ void resubscribeChannelSubscription() {
7878
when(mockedEndpoint.hasChannelSubscriptions()).thenReturn(true);
7979
when(mockedEndpoint.getChannels()).thenReturn(new HashSet<>(Arrays.asList(new String[] { "channel1", "channel2" })));
8080
when(mockedEndpoint.hasPatternSubscriptions()).thenReturn(false);
81+
when(mockedEndpoint.hasShardChannelSubscriptions()).thenReturn(false);
8182

8283
List<RedisFuture<Void>> subscriptions = connection.resubscribe();
8384
RedisFuture<Void> commandFuture = subscriptions.get(0);
@@ -87,17 +88,35 @@ void resubscribeChannelSubscription() {
8788
}
8889

8990
@Test
90-
void resubscribeChannelAndPatternSubscription() {
91+
void resubscribeShardChannelSubscription() {
92+
when(mockedEndpoint.hasShardChannelSubscriptions()).thenReturn(true);
93+
when(mockedEndpoint.getShardChannels())
94+
.thenReturn(new HashSet<>(Arrays.asList(new String[] { "shard_channel1", "shard_channel2" })));
95+
when(mockedEndpoint.hasChannelSubscriptions()).thenReturn(false);
96+
when(mockedEndpoint.hasPatternSubscriptions()).thenReturn(false);
97+
98+
List<RedisFuture<Void>> subscriptions = connection.resubscribe();
99+
RedisFuture<Void> commandFuture = subscriptions.get(0);
100+
101+
assertEquals(1, subscriptions.size());
102+
assertInstanceOf(AsyncCommand.class, commandFuture);
103+
}
104+
105+
@Test
106+
void resubscribeChannelAndPatternAndShardChanelSubscription() {
91107
when(mockedEndpoint.hasChannelSubscriptions()).thenReturn(true);
92-
when(mockedEndpoint.getChannels()).thenReturn(new HashSet<>(Arrays.asList(new String[] { "channel1", "channel2" })));
93108
when(mockedEndpoint.hasPatternSubscriptions()).thenReturn(true);
109+
when(mockedEndpoint.hasShardChannelSubscriptions()).thenReturn(true);
110+
when(mockedEndpoint.getChannels()).thenReturn(new HashSet<>(Arrays.asList(new String[] { "channel1", "channel2" })));
94111
when(mockedEndpoint.getPatterns()).thenReturn(new HashSet<>(Arrays.asList(new String[] { "bcast*", "echo" })));
95-
112+
when(mockedEndpoint.getShardChannels())
113+
.thenReturn(new HashSet<>(Arrays.asList(new String[] { "shard_channel1", "shard_channel2" })));
96114
List<RedisFuture<Void>> subscriptions = connection.resubscribe();
97115

98-
assertEquals(2, subscriptions.size());
116+
assertEquals(3, subscriptions.size());
99117
assertInstanceOf(AsyncCommand.class, subscriptions.get(0));
100118
assertInstanceOf(AsyncCommand.class, subscriptions.get(1));
119+
assertInstanceOf(AsyncCommand.class, subscriptions.get(1));
101120
}
102121

103122
}

0 commit comments

Comments
 (0)