Skip to content

Commit 48d80e0

Browse files
mp911dechristophstrobl
authored andcommitted
Add support for subscription ready/unsubscription done events.
Original Pull Request: #2052
1 parent 8c05f19 commit 48d80e0

13 files changed

+517
-113
lines changed

src/main/java/org/springframework/data/redis/connection/MessageListener.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818
import org.springframework.lang.Nullable;
1919

2020
/**
21-
* Listener of messages published in Redis.
21+
* Listener of messages published in Redis. A MessageListener can implement {@link SubscriptionListener} to receive
22+
* notifications for subscription states.
2223
*
2324
* @author Costin Leau
2425
* @author Christoph Strobl
26+
* @see SubscriptionListener
2527
*/
28+
@FunctionalInterface
2629
public interface MessageListener {
2730

2831
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.redis.connection;
17+
18+
/**
19+
* Listener for subscription notifications.
20+
* <p/>
21+
* Subscription notifications are reported by Redis as confirmation for subscribe and unsubscribe operations for
22+
* channels and patterns.
23+
*
24+
* @author Mark Paluch
25+
* @since 2.6
26+
*/
27+
public interface SubscriptionListener {
28+
29+
/**
30+
* Empty {@link SubscriptionListener}.
31+
*/
32+
SubscriptionListener EMPTY = new SubscriptionListener() {};
33+
34+
/**
35+
* Notification when Redis has confirmed a channel subscription.
36+
*
37+
* @param channel name of the channel.
38+
* @param count subscriber count.
39+
*/
40+
default void onChannelSubscribed(byte[] channel, long count) {}
41+
42+
/**
43+
* Notification when Redis has confirmed a channel un-subscription.
44+
*
45+
* @param channel name of the channel.
46+
* @param count subscriber count.
47+
*/
48+
default void onChannelUnsubscribed(byte[] channel, long count) {}
49+
50+
/**
51+
* Notification when Redis has confirmed a pattern subscription.
52+
*
53+
* @param pattern the pattern.
54+
* @param count subscriber count.
55+
*/
56+
default void onPatternSubscribed(byte[] pattern, long count) {}
57+
58+
/**
59+
* Notification when Redis has confirmed a pattern un-subscription.
60+
*
61+
* @param pattern the pattern.
62+
* @param count subscriber count.
63+
*/
64+
default void onPatternUnsubscribed(byte[] pattern, long count) {}
65+
66+
}

src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterConnection.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,7 @@ public void subscribe(MessageListener listener, byte[]... channels) {
450450
"Connection already subscribed; use the connection Subscription to cancel or add new channels");
451451
}
452452
try {
453-
BinaryJedisPubSub jedisPubSub = new JedisMessageListener(listener);
453+
JedisMessageListener jedisPubSub = new JedisMessageListener(listener);
454454
subscription = new JedisSubscription(listener, jedisPubSub, channels, null);
455455
cluster.subscribe(jedisPubSub, channels);
456456
} catch (Exception ex) {
@@ -466,7 +466,7 @@ public void pSubscribe(MessageListener listener, byte[]... patterns) {
466466
"Connection already subscribed; use the connection Subscription to cancel or add new channels");
467467
}
468468
try {
469-
BinaryJedisPubSub jedisPubSub = new JedisMessageListener(listener);
469+
JedisMessageListener jedisPubSub = new JedisMessageListener(listener);
470470
subscription = new JedisSubscription(listener, jedisPubSub, null, patterns);
471471
cluster.psubscribe(jedisPubSub, patterns);
472472
} catch (Exception ex) {

src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -715,7 +715,7 @@ public void pSubscribe(MessageListener listener, byte[]... patterns) {
715715

716716
doWithJedis(it -> {
717717

718-
BinaryJedisPubSub jedisPubSub = new JedisMessageListener(listener);
718+
JedisMessageListener jedisPubSub = new JedisMessageListener(listener);
719719

720720
subscription = new JedisSubscription(listener, jedisPubSub, null, patterns);
721721
it.psubscribe(jedisPubSub, patterns);
@@ -740,7 +740,7 @@ public void subscribe(MessageListener listener, byte[]... channels) {
740740

741741
doWithJedis(it -> {
742742

743-
BinaryJedisPubSub jedisPubSub = new JedisMessageListener(listener);
743+
JedisMessageListener jedisPubSub = new JedisMessageListener(listener);
744744

745745
subscription = new JedisSubscription(listener, jedisPubSub, channels, null);
746746
it.subscribe(jedisPubSub, channels);

src/main/java/org/springframework/data/redis/connection/jedis/JedisMessageListener.java

+12-7
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,29 @@
1515
*/
1616
package org.springframework.data.redis.connection.jedis;
1717

18+
import redis.clients.jedis.BinaryJedisPubSub;
19+
1820
import org.springframework.data.redis.connection.DefaultMessage;
1921
import org.springframework.data.redis.connection.MessageListener;
22+
import org.springframework.data.redis.connection.SubscriptionListener;
2023
import org.springframework.util.Assert;
2124

22-
import redis.clients.jedis.BinaryJedisPubSub;
23-
2425
/**
2526
* MessageListener adapter on top of Jedis.
2627
*
2728
* @author Costin Leau
29+
* @author Mark Paluch
2830
*/
2931
class JedisMessageListener extends BinaryJedisPubSub {
3032

3133
private final MessageListener listener;
34+
private final SubscriptionListener subscriptionListener;
3235

3336
JedisMessageListener(MessageListener listener) {
34-
Assert.notNull(listener, "message listener is required");
37+
Assert.notNull(listener, "MessageListener is required");
3538
this.listener = listener;
39+
this.subscriptionListener = listener instanceof SubscriptionListener ? (SubscriptionListener) listener
40+
: SubscriptionListener.EMPTY;
3641
}
3742

3843
public void onMessage(byte[] channel, byte[] message) {
@@ -44,18 +49,18 @@ public void onPMessage(byte[] pattern, byte[] channel, byte[] message) {
4449
}
4550

4651
public void onPSubscribe(byte[] pattern, int subscribedChannels) {
47-
// no-op
52+
subscriptionListener.onPatternSubscribed(pattern, subscribedChannels);
4853
}
4954

5055
public void onPUnsubscribe(byte[] pattern, int subscribedChannels) {
51-
// no-op
56+
subscriptionListener.onPatternUnsubscribed(pattern, subscribedChannels);
5257
}
5358

5459
public void onSubscribe(byte[] channel, int subscribedChannels) {
55-
// no-op
60+
subscriptionListener.onChannelSubscribed(channel, subscribedChannels);
5661
}
5762

5863
public void onUnsubscribe(byte[] channel, int subscribedChannels) {
59-
// no-op
64+
subscriptionListener.onChannelUnsubscribed(channel, subscribedChannels);
6065
}
6166
}

src/main/java/org/springframework/data/redis/connection/jedis/JedisSubscription.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class JedisSubscription extends AbstractSubscription {
3030

3131
private final BinaryJedisPubSub jedisPubSub;
3232

33-
JedisSubscription(MessageListener listener, BinaryJedisPubSub jedisPubSub, @Nullable byte[][] channels,
33+
JedisSubscription(MessageListener listener, JedisMessageListener jedisPubSub, @Nullable byte[][] channels,
3434
@Nullable byte[][] patterns) {
3535
super(listener, channels, patterns);
3636
this.jedisPubSub = jedisPubSub;

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceMessageListener.java

+18-4
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,26 @@
1919

2020
import org.springframework.data.redis.connection.DefaultMessage;
2121
import org.springframework.data.redis.connection.MessageListener;
22+
import org.springframework.data.redis.connection.SubscriptionListener;
2223
import org.springframework.util.Assert;
2324

2425
/**
2526
* MessageListener wrapper around Lettuce {@link RedisPubSubListener}.
2627
*
2728
* @author Costin Leau
29+
* @author Mark Paluch
2830
*/
2931
class LettuceMessageListener implements RedisPubSubListener<byte[], byte[]> {
3032

3133
private final MessageListener listener;
34+
private final SubscriptionListener subscriptionListener;
3235

3336
LettuceMessageListener(MessageListener listener) {
3437
Assert.notNull(listener, "MessageListener must not be null!");
38+
3539
this.listener = listener;
40+
this.subscriptionListener = listener instanceof SubscriptionListener ? (SubscriptionListener) listener
41+
: SubscriptionListener.EMPTY;
3642
}
3743

3844
/*
@@ -55,23 +61,31 @@ public void message(byte[] pattern, byte[] channel, byte[] message) {
5561
* (non-Javadoc)
5662
* @see io.lettuce.core.pubsub.RedisPubSubListener#subscribed(java.lang.Object, long)
5763
*/
58-
public void subscribed(byte[] channel, long count) {}
64+
public void subscribed(byte[] channel, long count) {
65+
subscriptionListener.onChannelSubscribed(channel, count);
66+
}
5967

6068
/*
6169
* (non-Javadoc)
6270
* @see io.lettuce.core.pubsub.RedisPubSubListener#psubscribed(java.lang.Object, long)
6371
*/
64-
public void psubscribed(byte[] pattern, long count) {}
72+
public void psubscribed(byte[] pattern, long count) {
73+
subscriptionListener.onPatternSubscribed(pattern, count);
74+
}
6575

6676
/*
6777
* (non-Javadoc)
6878
* @see io.lettuce.core.pubsub.RedisPubSubListener#unsubscribed(java.lang.Object, long)
6979
*/
70-
public void unsubscribed(byte[] channel, long count) {}
80+
public void unsubscribed(byte[] channel, long count) {
81+
subscriptionListener.onChannelUnsubscribed(channel, count);
82+
}
7183

7284
/*
7385
* (non-Javadoc)
7486
* @see io.lettuce.core.pubsub.RedisPubSubListener#punsubscribed(java.lang.Object, long)
7587
*/
76-
public void punsubscribed(byte[] pattern, long count) {}
88+
public void punsubscribed(byte[] pattern, long count) {
89+
subscriptionListener.onPatternUnsubscribed(pattern, count);
90+
}
7791
}

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java

+24-3
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,13 @@
1616
package org.springframework.data.redis.connection.lettuce;
1717

1818
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
19+
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
1920
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
2021

22+
import java.util.ArrayList;
23+
import java.util.List;
24+
import java.util.concurrent.CompletableFuture;
25+
2126
import org.springframework.data.redis.connection.MessageListener;
2227
import org.springframework.data.redis.connection.util.AbstractSubscription;
2328

@@ -37,6 +42,7 @@ public class LettuceSubscription extends AbstractSubscription {
3742
private final LettuceMessageListener listener;
3843
private final LettuceConnectionProvider connectionProvider;
3944
private final RedisPubSubCommands<byte[], byte[]> pubsub;
45+
private final RedisPubSubAsyncCommands<byte[], byte[]> pubSubAsync;
4046

4147
/**
4248
* Creates a new {@link LettuceSubscription} given {@link MessageListener}, {@link StatefulRedisPubSubConnection}, and
@@ -55,6 +61,7 @@ protected LettuceSubscription(MessageListener listener,
5561
this.listener = new LettuceMessageListener(listener);
5662
this.connectionProvider = connectionProvider;
5763
this.pubsub = connection.sync();
64+
this.pubSubAsync = connection.async();
5865

5966
this.connection.addListener(this.listener);
6067
}
@@ -70,15 +77,29 @@ protected StatefulRedisPubSubConnection<byte[], byte[]> getNativeConnection() {
7077
@Override
7178
protected void doClose() {
7279

80+
List<CompletableFuture<?>> futures = new ArrayList<>();
81+
7382
if (!getChannels().isEmpty()) {
74-
doUnsubscribe(true);
83+
futures.add(pubSubAsync.unsubscribe().toCompletableFuture());
7584
}
7685

7786
if (!getPatterns().isEmpty()) {
78-
doPUnsubscribe(true);
87+
futures.add(pubSubAsync.punsubscribe().toCompletableFuture());
88+
}
89+
90+
if (!futures.isEmpty()) {
91+
92+
// this is to ensure completion of the futures and result processing. Since we're unsubscribing first, we expect
93+
// that we receive pub/sub confirmations before the PING response.
94+
futures.add(pubSubAsync.ping().toCompletableFuture());
95+
96+
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((v, t) -> {
97+
connection.removeListener(listener);
98+
});
99+
} else {
100+
connection.removeListener(listener);
79101
}
80102

81-
connection.removeListener(this.listener);
82103
connectionProvider.release(connection);
83104
}
84105

0 commit comments

Comments
 (0)