diff --git a/pom.xml b/pom.xml
index 9d60cefa32..20d7780592 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
org.springframework.data
spring-data-redis
- 2.2.0.BUILD-SNAPSHOT
+ 2.2.0.DATAREDIS-976-SNAPSHOT
Spring Data Redis
diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/ClusterConnectionProvider.java b/src/main/java/org/springframework/data/redis/connection/lettuce/ClusterConnectionProvider.java
index 902b2c891b..3ebd799a2c 100644
--- a/src/main/java/org/springframework/data/redis/connection/lettuce/ClusterConnectionProvider.java
+++ b/src/main/java/org/springframework/data/redis/connection/lettuce/ClusterConnectionProvider.java
@@ -19,6 +19,7 @@
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
+import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
@@ -33,6 +34,7 @@
*
* @author Mark Paluch
* @author Christoph Strobl
+ * @author Bruce Cloud
* @since 2.0
*/
class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClientProvider {
@@ -93,7 +95,8 @@ class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClien
}
}
- if (connectionType.equals(StatefulRedisPubSubConnection.class)) {
+ if (connectionType.equals(StatefulRedisPubSubConnection.class)
+ || connectionType.equals(StatefulRedisClusterPubSubConnection.class)) {
return client.connectPubSubAsync(codec) //
.thenApply(connectionType::cast);
diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java
index 28a114e024..bec2054973 100644
--- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java
+++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java
@@ -22,7 +22,6 @@
import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.sync.RedisClusterCommands;
-import io.lettuce.core.cluster.models.partitions.Partitions;
import lombok.RequiredArgsConstructor;
import java.time.Duration;
@@ -35,6 +34,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
import org.springframework.beans.factory.DisposableBean;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataAccessResourceFailureException;
@@ -54,6 +54,9 @@
import org.springframework.util.ObjectUtils;
/**
+ * {@code RedisClusterConnection} implementation on top of Lettuce
+ * Redis client.
+ *
* @author Christoph Strobl
* @author Mark Paluch
* @since 1.7
@@ -64,7 +67,6 @@ public class LettuceClusterConnection extends LettuceConnection implements Defau
new LettuceExceptionConverter());
private final Log log = LogFactory.getLog(getClass());
- private final RedisClusterClient clusterClient;
private ClusterCommandExecutor clusterCommandExecutor;
private ClusterTopologyProvider topologyProvider;
@@ -119,8 +121,7 @@ public LettuceClusterConnection(LettuceConnectionProvider connectionProvider) {
Assert.isTrue(connectionProvider instanceof ClusterConnectionProvider,
"LettuceConnectionProvider must be a ClusterConnectionProvider.");
- this.clusterClient = getClient();
- this.topologyProvider = new LettuceClusterTopologyProvider(this.clusterClient);
+ this.topologyProvider = new LettuceClusterTopologyProvider(getClient());
this.clusterCommandExecutor = new ClusterCommandExecutor(this.topologyProvider,
new LettuceClusterNodeResourceProvider(getConnectionProvider()), exceptionConverter);
this.disposeClusterCommandExecutorOnClose = true;
@@ -156,8 +157,7 @@ public LettuceClusterConnection(LettuceConnectionProvider connectionProvider, Cl
Assert.isTrue(connectionProvider instanceof ClusterConnectionProvider,
"LettuceConnectionProvider must be a ClusterConnectionProvider.");
- this.clusterClient = getClient();
- this.topologyProvider = new LettuceClusterTopologyProvider(this.clusterClient);
+ this.topologyProvider = new LettuceClusterTopologyProvider(getClient());
this.clusterCommandExecutor = executor;
this.disposeClusterCommandExecutorOnClose = false;
}
@@ -168,22 +168,20 @@ public LettuceClusterConnection(LettuceConnectionProvider connectionProvider, Cl
*
* @param sharedConnection may be {@literal null} if no shared connection used.
* @param connectionProvider must not be {@literal null}.
- * @param clusterClient must not be {@literal null}.
+ * @param clusterTopologyProvider must not be {@literal null}.
* @param executor must not be {@literal null}.
* @param timeout must not be {@literal null}.
* @since 2.1
*/
- LettuceClusterConnection(@Nullable StatefulRedisClusterConnection sharedConnection,
- LettuceConnectionProvider connectionProvider, RedisClusterClient clusterClient, ClusterCommandExecutor executor,
- Duration timeout) {
+ protected LettuceClusterConnection(@Nullable StatefulRedisClusterConnection sharedConnection,
+ LettuceConnectionProvider connectionProvider, ClusterTopologyProvider clusterTopologyProvider,
+ ClusterCommandExecutor executor, Duration timeout) {
super(sharedConnection, connectionProvider, timeout.toMillis(), 0);
Assert.notNull(executor, "ClusterCommandExecutor must not be null.");
- Assert.notNull(clusterClient, "RedisClusterClient must not be null.");
- this.clusterClient = clusterClient;
- this.topologyProvider = new LettuceClusterTopologyProvider(clusterClient);
+ this.topologyProvider = clusterTopologyProvider;
this.clusterCommandExecutor = executor;
this.disposeClusterCommandExecutorOnClose = false;
}
@@ -203,13 +201,6 @@ private RedisClusterClient getClient() {
connectionProvider.getClass().getName()));
}
- /**
- * @return access to {@link RedisClusterClient} for non-connection access.
- */
- private Partitions getPartitions() {
- return clusterClient.getPartitions();
- }
-
/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.lettuce.LettuceConnection#geoCommands()
@@ -328,7 +319,11 @@ public Integer clusterGetSlotForKey(byte[] key) {
@Override
public RedisClusterNode clusterGetNodeForSlot(int slot) {
- return LettuceConverters.toRedisClusterNode(getPartitions().getPartitionBySlot(slot));
+ Set nodes = topologyProvider.getTopology().getSlotServingNodes(slot);
+ if (nodes.isEmpty()) {
+ return null;
+ }
+ return nodes.iterator().next();
}
/*
@@ -572,7 +567,7 @@ public void select(int dbIndex) {
*/
@Override
public List clusterGetNodes() {
- return LettuceConverters.partitionsToClusterNodes(getPartitions());
+ return new ArrayList<>(topologyProvider.getTopology().getNodes());
}
/*
diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java
index cc909a798c..1eae30bb07 100644
--- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java
+++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java
@@ -879,7 +879,21 @@ protected StatefulRedisPubSubConnection switchToPubSub() {
}
private LettuceSubscription initSubscription(MessageListener listener) {
- return new LettuceSubscription(listener, switchToPubSub(), connectionProvider);
+ return doCreateSubscription(listener, switchToPubSub(), connectionProvider);
+ }
+
+ /**
+ * Customization hook to create a {@link LettuceSubscription}.
+ *
+ * @param listener the {@link MessageListener} to notify.
+ * @param connection Pub/Sub connection.
+ * @param connectionProvider the {@link LettuceConnectionProvider} for connection release.
+ * @return a {@link LettuceSubscription}.
+ * @since 2.2
+ */
+ protected LettuceSubscription doCreateSubscription(MessageListener listener,
+ StatefulRedisPubSubConnection connection, LettuceConnectionProvider connectionProvider) {
+ return new LettuceSubscription(listener, connection, connectionProvider);
}
void pipeline(LettuceResult result) {
@@ -1250,11 +1264,11 @@ public CommandOutput getTypeHint(CommandType type, CommandOutput defaultType) {
}
@RequiredArgsConstructor
- private class LettucePoolConnectionProvider implements LettuceConnectionProvider {
+ static class LettucePoolConnectionProvider implements LettuceConnectionProvider {
private final LettucePool pool;
- /*
+ /*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider#getConnection(java.lang.Class)
*/
@@ -1263,7 +1277,7 @@ private class LettucePoolConnectionProvider implements LettuceConnectionProvider
return connectionType.cast(pool.getResource());
}
- /*
+ /*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider#getConnectionAsync(java.lang.Class)
*/
@@ -1272,7 +1286,7 @@ private class LettucePoolConnectionProvider implements LettuceConnectionProvider
throw new UnsupportedOperationException("Async operations not supported!");
}
- /*
+ /*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider#release(io.lettuce.core.api.StatefulConnection)
*/
diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java
index 44d70afe9e..446d206a46 100644
--- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java
+++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java
@@ -15,6 +15,8 @@
*/
package org.springframework.data.redis.connection.lettuce;
+import static org.springframework.data.redis.connection.lettuce.LettuceConnection.*;
+
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.ReadFrom;
@@ -40,6 +42,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.dao.DataAccessException;
@@ -273,7 +276,7 @@ public void afterPropertiesSet() {
this.client = createClient();
- this.connectionProvider = createConnectionProvider(client, LettuceConnection.CODEC);
+ this.connectionProvider = createConnectionProvider(client, CODEC);
this.reactiveConnectionProvider = createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC);
if (isClusterAware()) {
@@ -341,13 +344,7 @@ public RedisConnection getConnection() {
}
LettuceConnection connection;
-
- if (pool != null) {
- connection = new LettuceConnection(getSharedConnection(), getTimeout(), null, pool, getDatabase());
- } else {
- connection = new LettuceConnection(getSharedConnection(), connectionProvider, getTimeout(), getDatabase());
- }
-
+ connection = doCreateLettuceConnection(getSharedConnection(), connectionProvider, getTimeout(), getDatabase());
connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults);
return connection;
}
@@ -365,12 +362,51 @@ public RedisClusterConnection getClusterConnection() {
RedisClusterClient clusterClient = (RedisClusterClient) client;
- return getShareNativeConnection()
- ? new LettuceClusterConnection(
- (StatefulRedisClusterConnection) getOrCreateSharedConnection().getConnection(),
- connectionProvider, clusterClient, clusterCommandExecutor, clientConfiguration.getCommandTimeout())
- : new LettuceClusterConnection(null, connectionProvider, clusterClient, clusterCommandExecutor,
- clientConfiguration.getCommandTimeout());
+ StatefulRedisClusterConnection sharedConnection = getShareNativeConnection()
+ ? (StatefulRedisClusterConnection) getOrCreateSharedConnection().getConnection()
+ : null;
+
+ LettuceClusterTopologyProvider topologyProvider = new LettuceClusterTopologyProvider(clusterClient);
+ return doCreateLettuceClusterConnection(sharedConnection, connectionProvider, topologyProvider,
+ clusterCommandExecutor, clientConfiguration.getCommandTimeout());
+ }
+
+ /**
+ * Customization hook for {@link LettuceConnection} creation.
+ *
+ * @param sharedConnection the shared {@link StatefulRedisConnection} if {@link #getShareNativeConnection()} is
+ * {@literal true}; {@literal null} otherwise.
+ * @param connectionProvider the {@link LettuceConnectionProvider} to release connections.
+ * @param timeout command timeout in {@link TimeUnit#MILLISECONDS}.
+ * @param database database index to operate on.
+ * @return the {@link LettuceConnection}.
+ * @since 2.2
+ */
+ protected LettuceConnection doCreateLettuceConnection(StatefulRedisConnection sharedConnection,
+ LettuceConnectionProvider connectionProvider, long timeout, int database) {
+
+ return new LettuceConnection(sharedConnection, connectionProvider, timeout, database);
+ }
+
+ /**
+ * Customization hook for {@link LettuceClusterConnection} creation.
+ *
+ * @param sharedConnection the shared {@link StatefulRedisConnection} if {@link #getShareNativeConnection()} is
+ * {@literal true}; {@literal null} otherwise.
+ * @param connectionProvider the {@link LettuceConnectionProvider} to release connections.
+ * @param topologyProvider the {@link ClusterTopologyProvider}.
+ * @param clusterCommandExecutor the {@link ClusterCommandExecutor} to release connections.
+ * @param commandTimeout command timeout {@link Duration}.
+ * @return the {@link LettuceConnection}.
+ * @since 2.2
+ */
+ protected LettuceClusterConnection doCreateLettuceClusterConnection(
+ StatefulRedisClusterConnection sharedConnection, LettuceConnectionProvider connectionProvider,
+ ClusterTopologyProvider topologyProvider, ClusterCommandExecutor clusterCommandExecutor,
+ Duration commandTimeout) {
+
+ return new LettuceClusterConnection(sharedConnection, connectionProvider, topologyProvider, clusterCommandExecutor,
+ commandTimeout);
}
/*
@@ -909,6 +945,10 @@ protected StatefulConnection getSharedReactiveConnection
private LettuceConnectionProvider createConnectionProvider(AbstractRedisClient client, RedisCodec, ?> codec) {
+ if (this.pool != null) {
+ return new LettucePoolConnectionProvider(this.pool);
+ }
+
LettuceConnectionProvider connectionProvider = doCreateConnectionProvider(client, codec);
if (this.clientConfiguration instanceof LettucePoolingClientConfiguration) {
diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceMessageListener.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceMessageListener.java
index de6c7705d6..2772b82317 100644
--- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceMessageListener.java
+++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceMessageListener.java
@@ -35,7 +35,7 @@ class LettuceMessageListener implements RedisPubSubListener {
this.listener = listener;
}
- /*
+ /*
* (non-Javadoc)
* @see io.lettuce.core.pubsub.RedisPubSubListener#message(java.lang.Object, java.lang.Object)
*/
@@ -43,7 +43,7 @@ public void message(byte[] channel, byte[] message) {
listener.onMessage(new DefaultMessage(channel, message), null);
}
- /*
+ /*
* (non-Javadoc)
* @see io.lettuce.core.pubsub.RedisPubSubListener#message(java.lang.Object, java.lang.Object, java.lang.Object)
*/
@@ -51,25 +51,25 @@ public void message(byte[] pattern, byte[] channel, byte[] message) {
listener.onMessage(new DefaultMessage(channel, message), pattern);
}
- /*
+ /*
* (non-Javadoc)
* @see io.lettuce.core.pubsub.RedisPubSubListener#subscribed(java.lang.Object, long)
*/
public void subscribed(byte[] channel, long count) {}
- /*
+ /*
* (non-Javadoc)
* @see io.lettuce.core.pubsub.RedisPubSubListener#psubscribed(java.lang.Object, long)
*/
public void psubscribed(byte[] pattern, long count) {}
- /*
+ /*
* (non-Javadoc)
* @see io.lettuce.core.pubsub.RedisPubSubListener#unsubscribed(java.lang.Object, long)
*/
public void unsubscribed(byte[] channel, long count) {}
- /*
+ /*
* (non-Javadoc)
* @see io.lettuce.core.pubsub.RedisPubSubListener#punsubscribed(java.lang.Object, long)
*/
diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java
index c9b785953e..c0a0c9e8ff 100644
--- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java
+++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java
@@ -28,15 +28,23 @@
* @author Mark Paluch
* @author Christoph Strobl
*/
-class LettuceSubscription extends AbstractSubscription {
+public class LettuceSubscription extends AbstractSubscription {
private final StatefulRedisPubSubConnection connection;
private final LettuceMessageListener listener;
private final LettuceConnectionProvider connectionProvider;
private final RedisPubSubCommands pubsub;
- LettuceSubscription(MessageListener listener, StatefulRedisPubSubConnection pubsubConnection,
- LettuceConnectionProvider connectionProvider) {
+ /**
+ * Creates a new {@link LettuceSubscription} given {@link MessageListener}, {@link StatefulRedisPubSubConnection}, and
+ * {@link LettuceConnectionProvider}.
+ *
+ * @param listener the listener to notify, must not be {@literal null}.
+ * @param pubsubConnection must not be {@literal null}.
+ * @param connectionProvider must not be {@literal null}.
+ */
+ protected LettuceSubscription(MessageListener listener,
+ StatefulRedisPubSubConnection pubsubConnection, LettuceConnectionProvider connectionProvider) {
super(listener);
@@ -52,25 +60,25 @@ protected StatefulRedisPubSubConnection getNativeConnection() {
return connection;
}
- /*
+ /*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.util.AbstractSubscription#doClose()
*/
protected void doClose() {
if (!getChannels().isEmpty()) {
- pubsub.unsubscribe(new byte[0]);
+ doUnsubscribe(true, new byte[0]);
}
if (!getPatterns().isEmpty()) {
- pubsub.punsubscribe(new byte[0]);
+ doPUnsubscribe(true, new byte[0]);
}
connection.removeListener(this.listener);
connectionProvider.release(connection);
}
- /*
+ /*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.util.AbstractSubscription#doPsubscribe(byte[][])
*/
@@ -78,7 +86,7 @@ protected void doPsubscribe(byte[]... patterns) {
pubsub.psubscribe(patterns);
}
- /*
+ /*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.util.AbstractSubscription#doPUnsubscribe(boolean, byte[][])
*/
@@ -88,7 +96,7 @@ protected void doPUnsubscribe(boolean all, byte[]... patterns) {
pubsub.punsubscribe(patterns);
}
- /*
+ /*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.util.AbstractSubscription#doSubscribe(byte[][])
*/
@@ -96,7 +104,7 @@ protected void doSubscribe(byte[]... channels) {
pubsub.subscribe(channels);
}
- /*
+ /*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.util.AbstractSubscription#doUnsubscribe(boolean, byte[][])
*/
diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionUnitTests.java
index 8e14db4ed2..94770850cb 100644
--- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionUnitTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionUnitTests.java
@@ -48,6 +48,7 @@
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.data.redis.connection.ClusterCommandExecutor;
import org.springframework.data.redis.connection.ClusterNodeResourceProvider;
+import org.springframework.data.redis.connection.ClusterTopologyProvider;
import org.springframework.data.redis.connection.RedisClusterCommands.AddSlots;
import org.springframework.data.redis.connection.RedisClusterNode;
@@ -67,6 +68,7 @@ public class LettuceClusterConnectionUnitTests {
static final byte[] KEY_3_BYTES = KEY_3.getBytes();
@Mock RedisClusterClient clusterMock;
+ @Mock ClusterTopologyProvider topologyProviderMock;
@Mock LettuceConnectionProvider connectionProviderMock;
@Mock ClusterCommandExecutor executorMock;
@@ -363,7 +365,7 @@ public void shouldExecuteOnSharedConnection() {
when(sharedConnectionMock.sync()).thenReturn(sync);
LettuceClusterConnection connection = new LettuceClusterConnection(sharedConnectionMock, connectionProviderMock,
- clusterMock, executorMock, Duration.ZERO);
+ topologyProviderMock, executorMock, Duration.ZERO);
connection.keyCommands().del(KEY_1_BYTES);
@@ -381,7 +383,7 @@ public void shouldExecuteOnDedicatedConnection() {
when(dedicatedConnection.sync()).thenReturn(sync);
LettuceClusterConnection connection = new LettuceClusterConnection(sharedConnectionMock, connectionProviderMock,
- clusterMock, executorMock, Duration.ZERO);
+ topologyProviderMock, executorMock, Duration.ZERO);
connection.listCommands().bLPop(1, KEY_1_BYTES);
diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterKeyspaceNotificationsTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterKeyspaceNotificationsTests.java
new file mode 100644
index 0000000000..32b79fafa4
--- /dev/null
+++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterKeyspaceNotificationsTests.java
@@ -0,0 +1,194 @@
+/*
+ * Copyright 2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.redis.connection.lettuce;
+
+import static org.assertj.core.api.Assertions.*;
+
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisURI;
+import io.lettuce.core.SetArgs;
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.api.sync.RedisCommands;
+import io.lettuce.core.cluster.SlotHash;
+import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
+import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
+import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import org.springframework.data.redis.ConnectionFactoryTracker;
+import org.springframework.data.redis.connection.ClusterCommandExecutor;
+import org.springframework.data.redis.connection.ClusterTopologyProvider;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.data.redis.connection.RedisClusterConnection;
+import org.springframework.data.redis.connection.RedisConfiguration;
+import org.springframework.data.redis.test.util.RedisClusterRule;
+import org.springframework.lang.Nullable;
+
+/**
+ * Integration tests to listen for keyspace notifications.
+ *
+ * @author Mark Paluch
+ */
+public class LettuceClusterKeyspaceNotificationsTests {
+
+ @ClassRule public static RedisClusterRule clusterRule = new RedisClusterRule();
+
+ CustomLettuceConnectionFactory factory;
+ String keyspaceConfig;
+
+ // maps to 127.0.0.1:7381/slot hash 13477
+ String key = "10923";
+
+ @Before
+ public void before() {
+
+ factory = new CustomLettuceConnectionFactory(clusterRule.getConfiguration());
+ factory.setClientResources(LettuceTestClientResources.getSharedClientResources());
+ ConnectionFactoryTracker.add(factory);
+ factory.afterPropertiesSet();
+
+ // enable keyspace events on a specific node.
+ withConnection("127.0.0.1", 7381, commands -> {
+
+ keyspaceConfig = commands.configGet("*").get("notify-keyspace-events");
+ commands.configSet("notify-keyspace-events", "KEx");
+ });
+
+ assertThat(SlotHash.getSlot(key)).isEqualTo(13477);
+ }
+
+ @After
+ public void tearDown() {
+
+ // Restore previous settings.
+ withConnection("127.0.0.1", 7381, commands -> {
+ commands.configSet("notify-keyspace-events", keyspaceConfig);
+ });
+ }
+
+ @Test // DATAREDIS-976
+ public void shouldListenForKeyspaceNotifications() throws Exception {
+
+ CompletableFuture expiry = new CompletableFuture<>();
+
+ RedisClusterConnection connection = factory.getClusterConnection();
+
+ connection.pSubscribe((message, pattern) -> {
+ expiry.complete(new String(message.getBody()) + ":" + new String(message.getChannel()));
+ }, "__keyspace*@*".getBytes());
+
+ withConnection("127.0.0.1", 7381, commands -> {
+ commands.set(key, "foo", SetArgs.Builder.px(1));
+ });
+
+ assertThat(expiry.get(2, TimeUnit.SECONDS)).isEqualTo("expired:__keyspace@0__:10923");
+
+ connection.getSubscription().close();
+ connection.close();
+ }
+
+ private void withConnection(String hostname, int port, Consumer> commandsConsumer) {
+
+ RedisClient client = RedisClient.create(LettuceTestClientResources.getSharedClientResources(),
+ RedisURI.create(hostname, port));
+
+ StatefulRedisConnection connection = client.connect();
+ commandsConsumer.accept(connection.sync());
+
+ connection.close();
+ client.shutdownAsync();
+ }
+
+ static class CustomLettuceConnectionFactory extends LettuceConnectionFactory {
+
+ CustomLettuceConnectionFactory(RedisConfiguration redisConfiguration) {
+ super(redisConfiguration);
+ }
+
+ @Override
+ protected LettuceClusterConnection doCreateLettuceClusterConnection(
+ StatefulRedisClusterConnection sharedConnection, LettuceConnectionProvider connectionProvider,
+ ClusterTopologyProvider topologyProvider, ClusterCommandExecutor clusterCommandExecutor,
+ Duration commandTimeout) {
+ return new CustomLettuceClusterConnection(sharedConnection, connectionProvider, topologyProvider,
+ clusterCommandExecutor, commandTimeout);
+ }
+ }
+
+ static class CustomLettuceClusterConnection extends LettuceClusterConnection {
+
+ CustomLettuceClusterConnection(@Nullable StatefulRedisClusterConnection sharedConnection,
+ LettuceConnectionProvider connectionProvider, ClusterTopologyProvider clusterTopologyProvider,
+ ClusterCommandExecutor executor, Duration timeout) {
+ super(sharedConnection, connectionProvider, clusterTopologyProvider, executor, timeout);
+ }
+
+ @Override
+ protected LettuceSubscription doCreateSubscription(MessageListener listener,
+ StatefulRedisPubSubConnection connection, LettuceConnectionProvider connectionProvider) {
+ return new CustomLettuceSubscription(listener, (StatefulRedisClusterPubSubConnection) connection,
+ connectionProvider);
+ }
+ }
+
+ /**
+ * Customized {@link LettuceSubscription}. Enables
+ * {@link StatefulRedisClusterPubSubConnection#setNodeMessagePropagation(boolean)} and uses
+ * {@link io.lettuce.core.cluster.api.sync.NodeSelection} to subscribe to all master nodes.
+ */
+ static class CustomLettuceSubscription extends LettuceSubscription {
+
+ private final StatefulRedisClusterPubSubConnection connection;
+
+ CustomLettuceSubscription(MessageListener listener, StatefulRedisClusterPubSubConnection connection,
+ LettuceConnectionProvider connectionProvider) {
+ super(listener, connection, connectionProvider);
+ this.connection = connection;
+
+ // Must be enabled for keyspace notification propagation
+ this.connection.setNodeMessagePropagation(true);
+ }
+
+ @Override
+ protected void doPsubscribe(byte[]... patterns) {
+ connection.sync().all().commands().psubscribe(patterns);
+ }
+
+ @Override
+ protected void doPUnsubscribe(boolean all, byte[]... patterns) {
+ connection.sync().all().commands().punsubscribe();
+ }
+
+ @Override
+ protected void doSubscribe(byte[]... channels) {
+ connection.sync().all().commands().subscribe(channels);
+ }
+
+ @Override
+ protected void doUnsubscribe(boolean all, byte[]... channels) {
+ connection.sync().all().commands().unsubscribe();
+ }
+ }
+}
diff --git a/src/test/java/org/springframework/data/redis/listener/PubSubTestParams.java b/src/test/java/org/springframework/data/redis/listener/PubSubTestParams.java
index 6c9757ece8..8ccbd569b5 100644
--- a/src/test/java/org/springframework/data/redis/listener/PubSubTestParams.java
+++ b/src/test/java/org/springframework/data/redis/listener/PubSubTestParams.java
@@ -15,20 +15,24 @@
*/
package org.springframework.data.redis.listener;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.Collection;
+import org.junit.runners.model.Statement;
+
import org.springframework.data.redis.ObjectFactory;
import org.springframework.data.redis.Person;
import org.springframework.data.redis.PersonObjectFactory;
import org.springframework.data.redis.RawObjectFactory;
import org.springframework.data.redis.SettingsUtils;
import org.springframework.data.redis.StringObjectFactory;
+import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceTestClientResources;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.data.redis.test.util.RedisClusterRule;
/**
* @author Costin Leau
@@ -76,8 +80,49 @@ public static Collection