From aec235f9e40497a96d318dc1af80c2cc21616c5c Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Mon, 17 Jun 2019 14:52:13 +0200 Subject: [PATCH 1/4] DATAREDIS-976 - Prepare issue branch. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 9d60cefa32..20d7780592 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-redis - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAREDIS-976-SNAPSHOT Spring Data Redis From 27585c5ce4856af5ef91150f2bc7756aee1f5928 Mon Sep 17 00:00:00 2001 From: Bruce Cloud Date: Mon, 29 Apr 2019 11:23:30 -0400 Subject: [PATCH 2/4] DATAREDIS-976 - Polishing. ClusterConnectionProvider now accepts cluster-specific connection interfaces for Pub/Sub connections. Original pull request: #450. --- .../lettuce/ClusterConnectionProvider.java | 5 ++++- .../connection/lettuce/LettuceClusterConnection.java | 1 + .../redis/connection/lettuce/LettuceConnection.java | 6 +++--- .../connection/lettuce/LettuceMessageListener.java | 12 ++++++------ 4 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/ClusterConnectionProvider.java b/src/main/java/org/springframework/data/redis/connection/lettuce/ClusterConnectionProvider.java index 902b2c891b..3ebd799a2c 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/ClusterConnectionProvider.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/ClusterConnectionProvider.java @@ -19,6 +19,7 @@ import io.lettuce.core.api.StatefulConnection; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import io.lettuce.core.codec.RedisCodec; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; @@ -33,6 +34,7 @@ * * @author Mark Paluch * @author Christoph Strobl + * @author Bruce Cloud * @since 2.0 */ class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClientProvider { @@ -93,7 +95,8 @@ class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClien } } - if (connectionType.equals(StatefulRedisPubSubConnection.class)) { + if (connectionType.equals(StatefulRedisPubSubConnection.class) + || connectionType.equals(StatefulRedisClusterPubSubConnection.class)) { return client.connectPubSubAsync(codec) // .thenApply(connectionType::cast); diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java index 28a114e024..f400bb133c 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java @@ -23,6 +23,7 @@ import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.sync.RedisClusterCommands; import io.lettuce.core.cluster.models.partitions.Partitions; +import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import lombok.RequiredArgsConstructor; import java.time.Duration; diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java index cc909a798c..130b584031 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java @@ -1254,7 +1254,7 @@ private class LettucePoolConnectionProvider implements LettuceConnectionProvider private final LettucePool pool; - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider#getConnection(java.lang.Class) */ @@ -1263,7 +1263,7 @@ private class LettucePoolConnectionProvider implements LettuceConnectionProvider return connectionType.cast(pool.getResource()); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider#getConnectionAsync(java.lang.Class) */ @@ -1272,7 +1272,7 @@ private class LettucePoolConnectionProvider implements LettuceConnectionProvider throw new UnsupportedOperationException("Async operations not supported!"); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider#release(io.lettuce.core.api.StatefulConnection) */ diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceMessageListener.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceMessageListener.java index de6c7705d6..2772b82317 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceMessageListener.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceMessageListener.java @@ -35,7 +35,7 @@ class LettuceMessageListener implements RedisPubSubListener { this.listener = listener; } - /* + /* * (non-Javadoc) * @see io.lettuce.core.pubsub.RedisPubSubListener#message(java.lang.Object, java.lang.Object) */ @@ -43,7 +43,7 @@ public void message(byte[] channel, byte[] message) { listener.onMessage(new DefaultMessage(channel, message), null); } - /* + /* * (non-Javadoc) * @see io.lettuce.core.pubsub.RedisPubSubListener#message(java.lang.Object, java.lang.Object, java.lang.Object) */ @@ -51,25 +51,25 @@ public void message(byte[] pattern, byte[] channel, byte[] message) { listener.onMessage(new DefaultMessage(channel, message), pattern); } - /* + /* * (non-Javadoc) * @see io.lettuce.core.pubsub.RedisPubSubListener#subscribed(java.lang.Object, long) */ public void subscribed(byte[] channel, long count) {} - /* + /* * (non-Javadoc) * @see io.lettuce.core.pubsub.RedisPubSubListener#psubscribed(java.lang.Object, long) */ public void psubscribed(byte[] pattern, long count) {} - /* + /* * (non-Javadoc) * @see io.lettuce.core.pubsub.RedisPubSubListener#unsubscribed(java.lang.Object, long) */ public void unsubscribed(byte[] channel, long count) {} - /* + /* * (non-Javadoc) * @see io.lettuce.core.pubsub.RedisPubSubListener#punsubscribed(java.lang.Object, long) */ From 5ea0772c763fb4b73e5bd788eef390ed89430c7d Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Mon, 17 Jun 2019 14:59:51 +0200 Subject: [PATCH 3/4] DATAREDIS-976 - Add Pub/Sub tests for Redis Cluster. Original pull request: #450. --- .../lettuce/LettuceClusterConnection.java | 5 +- .../data/redis/listener/PubSubTestParams.java | 53 +++++++++++++++++-- .../data/redis/listener/PubSubTests.java | 29 ++++++---- 3 files changed, 70 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java index f400bb133c..f6e5e19f3f 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java @@ -22,8 +22,6 @@ import io.lettuce.core.cluster.SlotHash; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.sync.RedisClusterCommands; -import io.lettuce.core.cluster.models.partitions.Partitions; -import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import lombok.RequiredArgsConstructor; import java.time.Duration; @@ -55,6 +53,9 @@ import org.springframework.util.ObjectUtils; /** + * {@code RedisClusterConnection} implementation on top of Lettuce + * Redis client. + * * @author Christoph Strobl * @author Mark Paluch * @since 1.7 diff --git a/src/test/java/org/springframework/data/redis/listener/PubSubTestParams.java b/src/test/java/org/springframework/data/redis/listener/PubSubTestParams.java index 6c9757ece8..8ccbd569b5 100644 --- a/src/test/java/org/springframework/data/redis/listener/PubSubTestParams.java +++ b/src/test/java/org/springframework/data/redis/listener/PubSubTestParams.java @@ -15,20 +15,24 @@ */ package org.springframework.data.redis.listener; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Collection; +import org.junit.runners.model.Statement; + import org.springframework.data.redis.ObjectFactory; import org.springframework.data.redis.Person; import org.springframework.data.redis.PersonObjectFactory; import org.springframework.data.redis.RawObjectFactory; import org.springframework.data.redis.SettingsUtils; import org.springframework.data.redis.StringObjectFactory; +import org.springframework.data.redis.connection.RedisClusterConfiguration; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.connection.lettuce.LettuceTestClientResources; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.test.util.RedisClusterRule; /** * @author Costin Leau @@ -76,8 +80,49 @@ public static Collection testParams() { rawTemplateLtc.setConnectionFactory(lettuceConnFactory); rawTemplateLtc.afterPropertiesSet(); - return Arrays.asList(new Object[][] { { stringFactory, stringTemplate }, { personFactory, personTemplate }, - { rawFactory, rawTemplate }, { stringFactory, stringTemplateLtc }, { personFactory, personTemplateLtc }, - { rawFactory, rawTemplateLtc } }); + Collection parameters = new ArrayList<>(); + parameters.add(new Object[] { stringFactory, stringTemplate }); + parameters.add(new Object[] { personFactory, personTemplate }); + parameters.add(new Object[] { stringFactory, stringTemplateLtc }); + parameters.add(new Object[] { personFactory, personTemplateLtc }); + parameters.add(new Object[] { rawFactory, rawTemplateLtc }); + + if (clusterAvailable()) { + + RedisClusterConfiguration configuration = new RedisClusterConfiguration().clusterNode("127.0.0.1", 7379); + + // add Jedis + JedisConnectionFactory jedisClusterFactory = new JedisConnectionFactory(configuration); + jedisClusterFactory.afterPropertiesSet(); + + RedisTemplate jedisClusterStringTemplate = new StringRedisTemplate(jedisClusterFactory); + + // add Lettuce + LettuceConnectionFactory lettuceClusterFactory = new LettuceConnectionFactory(configuration); + lettuceClusterFactory.setClientResources(LettuceTestClientResources.getSharedClientResources()); + lettuceClusterFactory.afterPropertiesSet(); + + RedisTemplate lettuceClusterStringTemplate = new StringRedisTemplate(lettuceClusterFactory); + + parameters.add(new Object[] { stringFactory, jedisClusterStringTemplate }); + parameters.add(new Object[] { stringFactory, lettuceClusterStringTemplate }); + } + + return parameters; + } + + private static boolean clusterAvailable() { + + try { + new RedisClusterRule().apply(new Statement() { + @Override + public void evaluate() { + + } + }, null).evaluate(); + } catch (Throwable throwable) { + return false; + } + return true; } } diff --git a/src/test/java/org/springframework/data/redis/listener/PubSubTests.java b/src/test/java/org/springframework/data/redis/listener/PubSubTests.java index c2d86c6ed7..e0a3d56027 100644 --- a/src/test/java/org/springframework/data/redis/listener/PubSubTests.java +++ b/src/test/java/org/springframework/data/redis/listener/PubSubTests.java @@ -30,32 +30,31 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; + import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.core.task.SyncTaskExecutor; import org.springframework.data.redis.ConnectionFactoryTracker; import org.springframework.data.redis.ObjectFactory; -import org.springframework.data.redis.RedisTestProfileValueSource; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; +import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; -import org.springframework.data.redis.test.util.RedisSentinelRule; /** * Base test class for PubSub integration tests * * @author Costin Leau * @author Jennifer Hickey + * @author Mark Paluch */ @RunWith(Parameterized.class) public class PubSubTests { - public @Rule RedisSentinelRule sentinelRule = RedisSentinelRule.withDefaultConfig().sentinelsDisabled(); - private static final String CHANNEL = "pubsub::test"; protected RedisMessageListenerContainer container; @@ -73,11 +72,6 @@ public void handleMessage(Object message) { private final MessageListenerAdapter adapter = new MessageListenerAdapter(handler); - @BeforeClass - public static void shouldRun() { - assumeTrue(RedisTestProfileValueSource.matches("runLongTests", "true")); - } - @Before public void setUp() throws Exception { bag.clear(); @@ -178,6 +172,9 @@ public void testStartNoListeners() { @SuppressWarnings("unchecked") @Test // DATAREDIS-251 public void testStartListenersToNoSpecificChannelTest() throws InterruptedException { + + assumeTrue(isClusterAware(template.getConnectionFactory())); + container.removeMessageListener(adapter, new ChannelTopic(CHANNEL)); container.addMessageListener(adapter, Arrays.asList(new PatternTopic("*"))); container.start(); @@ -193,4 +190,14 @@ public void testStartListenersToNoSpecificChannelTest() throws InterruptedExcept assertThat(set, hasItems(payload)); } + + private static boolean isClusterAware(RedisConnectionFactory connectionFactory) { + + if (connectionFactory instanceof LettuceConnectionFactory) { + return ((LettuceConnectionFactory) connectionFactory).isClusterAware(); + } else if (connectionFactory instanceof JedisConnectionFactory) { + return ((JedisConnectionFactory) connectionFactory).isRedisClusterAware(); + } + return false; + } } From cfc22593b02a4745d2cfd6446d93c793d9f31477 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Mon, 17 Jun 2019 14:51:33 +0200 Subject: [PATCH 4/4] DATAREDIS-976 - Allow extension of Lettuce Connection and Subscription classes. LettuceConnection, LettuceClusterConnection, and LettuceSubscription can now be properly subclassed so they can be extended and created by LettuceConnectionFactory. LettuceConnectionFactory provides template methods doCreateLettuceConnection and doCreateLettuceClusterConnection. Original pull request: #450. --- .../lettuce/LettuceClusterConnection.java | 35 ++-- .../connection/lettuce/LettuceConnection.java | 18 +- .../lettuce/LettuceConnectionFactory.java | 68 ++++-- .../lettuce/LettuceSubscription.java | 28 ++- .../LettuceClusterConnectionUnitTests.java | 6 +- ...tuceClusterKeyspaceNotificationsTests.java | 194 ++++++++++++++++++ .../redis/test/util/RedisClusterRule.java | 5 + 7 files changed, 305 insertions(+), 49 deletions(-) create mode 100644 src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterKeyspaceNotificationsTests.java diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java index f6e5e19f3f..bec2054973 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java @@ -34,6 +34,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.springframework.beans.factory.DisposableBean; import org.springframework.dao.DataAccessException; import org.springframework.dao.DataAccessResourceFailureException; @@ -66,7 +67,6 @@ public class LettuceClusterConnection extends LettuceConnection implements Defau new LettuceExceptionConverter()); private final Log log = LogFactory.getLog(getClass()); - private final RedisClusterClient clusterClient; private ClusterCommandExecutor clusterCommandExecutor; private ClusterTopologyProvider topologyProvider; @@ -121,8 +121,7 @@ public LettuceClusterConnection(LettuceConnectionProvider connectionProvider) { Assert.isTrue(connectionProvider instanceof ClusterConnectionProvider, "LettuceConnectionProvider must be a ClusterConnectionProvider."); - this.clusterClient = getClient(); - this.topologyProvider = new LettuceClusterTopologyProvider(this.clusterClient); + this.topologyProvider = new LettuceClusterTopologyProvider(getClient()); this.clusterCommandExecutor = new ClusterCommandExecutor(this.topologyProvider, new LettuceClusterNodeResourceProvider(getConnectionProvider()), exceptionConverter); this.disposeClusterCommandExecutorOnClose = true; @@ -158,8 +157,7 @@ public LettuceClusterConnection(LettuceConnectionProvider connectionProvider, Cl Assert.isTrue(connectionProvider instanceof ClusterConnectionProvider, "LettuceConnectionProvider must be a ClusterConnectionProvider."); - this.clusterClient = getClient(); - this.topologyProvider = new LettuceClusterTopologyProvider(this.clusterClient); + this.topologyProvider = new LettuceClusterTopologyProvider(getClient()); this.clusterCommandExecutor = executor; this.disposeClusterCommandExecutorOnClose = false; } @@ -170,22 +168,20 @@ public LettuceClusterConnection(LettuceConnectionProvider connectionProvider, Cl * * @param sharedConnection may be {@literal null} if no shared connection used. * @param connectionProvider must not be {@literal null}. - * @param clusterClient must not be {@literal null}. + * @param clusterTopologyProvider must not be {@literal null}. * @param executor must not be {@literal null}. * @param timeout must not be {@literal null}. * @since 2.1 */ - LettuceClusterConnection(@Nullable StatefulRedisClusterConnection sharedConnection, - LettuceConnectionProvider connectionProvider, RedisClusterClient clusterClient, ClusterCommandExecutor executor, - Duration timeout) { + protected LettuceClusterConnection(@Nullable StatefulRedisClusterConnection sharedConnection, + LettuceConnectionProvider connectionProvider, ClusterTopologyProvider clusterTopologyProvider, + ClusterCommandExecutor executor, Duration timeout) { super(sharedConnection, connectionProvider, timeout.toMillis(), 0); Assert.notNull(executor, "ClusterCommandExecutor must not be null."); - Assert.notNull(clusterClient, "RedisClusterClient must not be null."); - this.clusterClient = clusterClient; - this.topologyProvider = new LettuceClusterTopologyProvider(clusterClient); + this.topologyProvider = clusterTopologyProvider; this.clusterCommandExecutor = executor; this.disposeClusterCommandExecutorOnClose = false; } @@ -205,13 +201,6 @@ private RedisClusterClient getClient() { connectionProvider.getClass().getName())); } - /** - * @return access to {@link RedisClusterClient} for non-connection access. - */ - private Partitions getPartitions() { - return clusterClient.getPartitions(); - } - /* * (non-Javadoc) * @see org.springframework.data.redis.connection.lettuce.LettuceConnection#geoCommands() @@ -330,7 +319,11 @@ public Integer clusterGetSlotForKey(byte[] key) { @Override public RedisClusterNode clusterGetNodeForSlot(int slot) { - return LettuceConverters.toRedisClusterNode(getPartitions().getPartitionBySlot(slot)); + Set nodes = topologyProvider.getTopology().getSlotServingNodes(slot); + if (nodes.isEmpty()) { + return null; + } + return nodes.iterator().next(); } /* @@ -574,7 +567,7 @@ public void select(int dbIndex) { */ @Override public List clusterGetNodes() { - return LettuceConverters.partitionsToClusterNodes(getPartitions()); + return new ArrayList<>(topologyProvider.getTopology().getNodes()); } /* diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java index 130b584031..1eae30bb07 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java @@ -879,7 +879,21 @@ protected StatefulRedisPubSubConnection switchToPubSub() { } private LettuceSubscription initSubscription(MessageListener listener) { - return new LettuceSubscription(listener, switchToPubSub(), connectionProvider); + return doCreateSubscription(listener, switchToPubSub(), connectionProvider); + } + + /** + * Customization hook to create a {@link LettuceSubscription}. + * + * @param listener the {@link MessageListener} to notify. + * @param connection Pub/Sub connection. + * @param connectionProvider the {@link LettuceConnectionProvider} for connection release. + * @return a {@link LettuceSubscription}. + * @since 2.2 + */ + protected LettuceSubscription doCreateSubscription(MessageListener listener, + StatefulRedisPubSubConnection connection, LettuceConnectionProvider connectionProvider) { + return new LettuceSubscription(listener, connection, connectionProvider); } void pipeline(LettuceResult result) { @@ -1250,7 +1264,7 @@ public CommandOutput getTypeHint(CommandType type, CommandOutput defaultType) { } @RequiredArgsConstructor - private class LettucePoolConnectionProvider implements LettuceConnectionProvider { + static class LettucePoolConnectionProvider implements LettuceConnectionProvider { private final LettucePool pool; diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java index 44d70afe9e..446d206a46 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java @@ -15,6 +15,8 @@ */ package org.springframework.data.redis.connection.lettuce; +import static org.springframework.data.redis.connection.lettuce.LettuceConnection.*; + import io.lettuce.core.AbstractRedisClient; import io.lettuce.core.ClientOptions; import io.lettuce.core.ReadFrom; @@ -40,6 +42,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.dao.DataAccessException; @@ -273,7 +276,7 @@ public void afterPropertiesSet() { this.client = createClient(); - this.connectionProvider = createConnectionProvider(client, LettuceConnection.CODEC); + this.connectionProvider = createConnectionProvider(client, CODEC); this.reactiveConnectionProvider = createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC); if (isClusterAware()) { @@ -341,13 +344,7 @@ public RedisConnection getConnection() { } LettuceConnection connection; - - if (pool != null) { - connection = new LettuceConnection(getSharedConnection(), getTimeout(), null, pool, getDatabase()); - } else { - connection = new LettuceConnection(getSharedConnection(), connectionProvider, getTimeout(), getDatabase()); - } - + connection = doCreateLettuceConnection(getSharedConnection(), connectionProvider, getTimeout(), getDatabase()); connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults); return connection; } @@ -365,12 +362,51 @@ public RedisClusterConnection getClusterConnection() { RedisClusterClient clusterClient = (RedisClusterClient) client; - return getShareNativeConnection() - ? new LettuceClusterConnection( - (StatefulRedisClusterConnection) getOrCreateSharedConnection().getConnection(), - connectionProvider, clusterClient, clusterCommandExecutor, clientConfiguration.getCommandTimeout()) - : new LettuceClusterConnection(null, connectionProvider, clusterClient, clusterCommandExecutor, - clientConfiguration.getCommandTimeout()); + StatefulRedisClusterConnection sharedConnection = getShareNativeConnection() + ? (StatefulRedisClusterConnection) getOrCreateSharedConnection().getConnection() + : null; + + LettuceClusterTopologyProvider topologyProvider = new LettuceClusterTopologyProvider(clusterClient); + return doCreateLettuceClusterConnection(sharedConnection, connectionProvider, topologyProvider, + clusterCommandExecutor, clientConfiguration.getCommandTimeout()); + } + + /** + * Customization hook for {@link LettuceConnection} creation. + * + * @param sharedConnection the shared {@link StatefulRedisConnection} if {@link #getShareNativeConnection()} is + * {@literal true}; {@literal null} otherwise. + * @param connectionProvider the {@link LettuceConnectionProvider} to release connections. + * @param timeout command timeout in {@link TimeUnit#MILLISECONDS}. + * @param database database index to operate on. + * @return the {@link LettuceConnection}. + * @since 2.2 + */ + protected LettuceConnection doCreateLettuceConnection(StatefulRedisConnection sharedConnection, + LettuceConnectionProvider connectionProvider, long timeout, int database) { + + return new LettuceConnection(sharedConnection, connectionProvider, timeout, database); + } + + /** + * Customization hook for {@link LettuceClusterConnection} creation. + * + * @param sharedConnection the shared {@link StatefulRedisConnection} if {@link #getShareNativeConnection()} is + * {@literal true}; {@literal null} otherwise. + * @param connectionProvider the {@link LettuceConnectionProvider} to release connections. + * @param topologyProvider the {@link ClusterTopologyProvider}. + * @param clusterCommandExecutor the {@link ClusterCommandExecutor} to release connections. + * @param commandTimeout command timeout {@link Duration}. + * @return the {@link LettuceConnection}. + * @since 2.2 + */ + protected LettuceClusterConnection doCreateLettuceClusterConnection( + StatefulRedisClusterConnection sharedConnection, LettuceConnectionProvider connectionProvider, + ClusterTopologyProvider topologyProvider, ClusterCommandExecutor clusterCommandExecutor, + Duration commandTimeout) { + + return new LettuceClusterConnection(sharedConnection, connectionProvider, topologyProvider, clusterCommandExecutor, + commandTimeout); } /* @@ -909,6 +945,10 @@ protected StatefulConnection getSharedReactiveConnection private LettuceConnectionProvider createConnectionProvider(AbstractRedisClient client, RedisCodec codec) { + if (this.pool != null) { + return new LettucePoolConnectionProvider(this.pool); + } + LettuceConnectionProvider connectionProvider = doCreateConnectionProvider(client, codec); if (this.clientConfiguration instanceof LettucePoolingClientConfiguration) { diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java index c9b785953e..c0a0c9e8ff 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java @@ -28,15 +28,23 @@ * @author Mark Paluch * @author Christoph Strobl */ -class LettuceSubscription extends AbstractSubscription { +public class LettuceSubscription extends AbstractSubscription { private final StatefulRedisPubSubConnection connection; private final LettuceMessageListener listener; private final LettuceConnectionProvider connectionProvider; private final RedisPubSubCommands pubsub; - LettuceSubscription(MessageListener listener, StatefulRedisPubSubConnection pubsubConnection, - LettuceConnectionProvider connectionProvider) { + /** + * Creates a new {@link LettuceSubscription} given {@link MessageListener}, {@link StatefulRedisPubSubConnection}, and + * {@link LettuceConnectionProvider}. + * + * @param listener the listener to notify, must not be {@literal null}. + * @param pubsubConnection must not be {@literal null}. + * @param connectionProvider must not be {@literal null}. + */ + protected LettuceSubscription(MessageListener listener, + StatefulRedisPubSubConnection pubsubConnection, LettuceConnectionProvider connectionProvider) { super(listener); @@ -52,25 +60,25 @@ protected StatefulRedisPubSubConnection getNativeConnection() { return connection; } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.util.AbstractSubscription#doClose() */ protected void doClose() { if (!getChannels().isEmpty()) { - pubsub.unsubscribe(new byte[0]); + doUnsubscribe(true, new byte[0]); } if (!getPatterns().isEmpty()) { - pubsub.punsubscribe(new byte[0]); + doPUnsubscribe(true, new byte[0]); } connection.removeListener(this.listener); connectionProvider.release(connection); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.util.AbstractSubscription#doPsubscribe(byte[][]) */ @@ -78,7 +86,7 @@ protected void doPsubscribe(byte[]... patterns) { pubsub.psubscribe(patterns); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.util.AbstractSubscription#doPUnsubscribe(boolean, byte[][]) */ @@ -88,7 +96,7 @@ protected void doPUnsubscribe(boolean all, byte[]... patterns) { pubsub.punsubscribe(patterns); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.util.AbstractSubscription#doSubscribe(byte[][]) */ @@ -96,7 +104,7 @@ protected void doSubscribe(byte[]... channels) { pubsub.subscribe(channels); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.util.AbstractSubscription#doUnsubscribe(boolean, byte[][]) */ diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionUnitTests.java index 8e14db4ed2..94770850cb 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionUnitTests.java @@ -48,6 +48,7 @@ import org.mockito.junit.MockitoJUnitRunner; import org.springframework.data.redis.connection.ClusterCommandExecutor; import org.springframework.data.redis.connection.ClusterNodeResourceProvider; +import org.springframework.data.redis.connection.ClusterTopologyProvider; import org.springframework.data.redis.connection.RedisClusterCommands.AddSlots; import org.springframework.data.redis.connection.RedisClusterNode; @@ -67,6 +68,7 @@ public class LettuceClusterConnectionUnitTests { static final byte[] KEY_3_BYTES = KEY_3.getBytes(); @Mock RedisClusterClient clusterMock; + @Mock ClusterTopologyProvider topologyProviderMock; @Mock LettuceConnectionProvider connectionProviderMock; @Mock ClusterCommandExecutor executorMock; @@ -363,7 +365,7 @@ public void shouldExecuteOnSharedConnection() { when(sharedConnectionMock.sync()).thenReturn(sync); LettuceClusterConnection connection = new LettuceClusterConnection(sharedConnectionMock, connectionProviderMock, - clusterMock, executorMock, Duration.ZERO); + topologyProviderMock, executorMock, Duration.ZERO); connection.keyCommands().del(KEY_1_BYTES); @@ -381,7 +383,7 @@ public void shouldExecuteOnDedicatedConnection() { when(dedicatedConnection.sync()).thenReturn(sync); LettuceClusterConnection connection = new LettuceClusterConnection(sharedConnectionMock, connectionProviderMock, - clusterMock, executorMock, Duration.ZERO); + topologyProviderMock, executorMock, Duration.ZERO); connection.listCommands().bLPop(1, KEY_1_BYTES); diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterKeyspaceNotificationsTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterKeyspaceNotificationsTests.java new file mode 100644 index 0000000000..32b79fafa4 --- /dev/null +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterKeyspaceNotificationsTests.java @@ -0,0 +1,194 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection.lettuce; + +import static org.assertj.core.api.Assertions.*; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.SetArgs; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.sync.RedisCommands; +import io.lettuce.core.cluster.SlotHash; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import org.springframework.data.redis.ConnectionFactoryTracker; +import org.springframework.data.redis.connection.ClusterCommandExecutor; +import org.springframework.data.redis.connection.ClusterTopologyProvider; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.connection.RedisClusterConnection; +import org.springframework.data.redis.connection.RedisConfiguration; +import org.springframework.data.redis.test.util.RedisClusterRule; +import org.springframework.lang.Nullable; + +/** + * Integration tests to listen for keyspace notifications. + * + * @author Mark Paluch + */ +public class LettuceClusterKeyspaceNotificationsTests { + + @ClassRule public static RedisClusterRule clusterRule = new RedisClusterRule(); + + CustomLettuceConnectionFactory factory; + String keyspaceConfig; + + // maps to 127.0.0.1:7381/slot hash 13477 + String key = "10923"; + + @Before + public void before() { + + factory = new CustomLettuceConnectionFactory(clusterRule.getConfiguration()); + factory.setClientResources(LettuceTestClientResources.getSharedClientResources()); + ConnectionFactoryTracker.add(factory); + factory.afterPropertiesSet(); + + // enable keyspace events on a specific node. + withConnection("127.0.0.1", 7381, commands -> { + + keyspaceConfig = commands.configGet("*").get("notify-keyspace-events"); + commands.configSet("notify-keyspace-events", "KEx"); + }); + + assertThat(SlotHash.getSlot(key)).isEqualTo(13477); + } + + @After + public void tearDown() { + + // Restore previous settings. + withConnection("127.0.0.1", 7381, commands -> { + commands.configSet("notify-keyspace-events", keyspaceConfig); + }); + } + + @Test // DATAREDIS-976 + public void shouldListenForKeyspaceNotifications() throws Exception { + + CompletableFuture expiry = new CompletableFuture<>(); + + RedisClusterConnection connection = factory.getClusterConnection(); + + connection.pSubscribe((message, pattern) -> { + expiry.complete(new String(message.getBody()) + ":" + new String(message.getChannel())); + }, "__keyspace*@*".getBytes()); + + withConnection("127.0.0.1", 7381, commands -> { + commands.set(key, "foo", SetArgs.Builder.px(1)); + }); + + assertThat(expiry.get(2, TimeUnit.SECONDS)).isEqualTo("expired:__keyspace@0__:10923"); + + connection.getSubscription().close(); + connection.close(); + } + + private void withConnection(String hostname, int port, Consumer> commandsConsumer) { + + RedisClient client = RedisClient.create(LettuceTestClientResources.getSharedClientResources(), + RedisURI.create(hostname, port)); + + StatefulRedisConnection connection = client.connect(); + commandsConsumer.accept(connection.sync()); + + connection.close(); + client.shutdownAsync(); + } + + static class CustomLettuceConnectionFactory extends LettuceConnectionFactory { + + CustomLettuceConnectionFactory(RedisConfiguration redisConfiguration) { + super(redisConfiguration); + } + + @Override + protected LettuceClusterConnection doCreateLettuceClusterConnection( + StatefulRedisClusterConnection sharedConnection, LettuceConnectionProvider connectionProvider, + ClusterTopologyProvider topologyProvider, ClusterCommandExecutor clusterCommandExecutor, + Duration commandTimeout) { + return new CustomLettuceClusterConnection(sharedConnection, connectionProvider, topologyProvider, + clusterCommandExecutor, commandTimeout); + } + } + + static class CustomLettuceClusterConnection extends LettuceClusterConnection { + + CustomLettuceClusterConnection(@Nullable StatefulRedisClusterConnection sharedConnection, + LettuceConnectionProvider connectionProvider, ClusterTopologyProvider clusterTopologyProvider, + ClusterCommandExecutor executor, Duration timeout) { + super(sharedConnection, connectionProvider, clusterTopologyProvider, executor, timeout); + } + + @Override + protected LettuceSubscription doCreateSubscription(MessageListener listener, + StatefulRedisPubSubConnection connection, LettuceConnectionProvider connectionProvider) { + return new CustomLettuceSubscription(listener, (StatefulRedisClusterPubSubConnection) connection, + connectionProvider); + } + } + + /** + * Customized {@link LettuceSubscription}. Enables + * {@link StatefulRedisClusterPubSubConnection#setNodeMessagePropagation(boolean)} and uses + * {@link io.lettuce.core.cluster.api.sync.NodeSelection} to subscribe to all master nodes. + */ + static class CustomLettuceSubscription extends LettuceSubscription { + + private final StatefulRedisClusterPubSubConnection connection; + + CustomLettuceSubscription(MessageListener listener, StatefulRedisClusterPubSubConnection connection, + LettuceConnectionProvider connectionProvider) { + super(listener, connection, connectionProvider); + this.connection = connection; + + // Must be enabled for keyspace notification propagation + this.connection.setNodeMessagePropagation(true); + } + + @Override + protected void doPsubscribe(byte[]... patterns) { + connection.sync().all().commands().psubscribe(patterns); + } + + @Override + protected void doPUnsubscribe(boolean all, byte[]... patterns) { + connection.sync().all().commands().punsubscribe(); + } + + @Override + protected void doSubscribe(byte[]... channels) { + connection.sync().all().commands().subscribe(channels); + } + + @Override + protected void doUnsubscribe(boolean all, byte[]... channels) { + connection.sync().all().commands().unsubscribe(); + } + } +} diff --git a/src/test/java/org/springframework/data/redis/test/util/RedisClusterRule.java b/src/test/java/org/springframework/data/redis/test/util/RedisClusterRule.java index 5fd9a0416f..6c85f60abd 100644 --- a/src/test/java/org/springframework/data/redis/test/util/RedisClusterRule.java +++ b/src/test/java/org/springframework/data/redis/test/util/RedisClusterRule.java @@ -23,6 +23,7 @@ import org.junit.rules.ExternalResource; import org.junit.rules.TestRule; import org.springframework.data.redis.connection.RedisClusterConfiguration; +import org.springframework.data.redis.connection.RedisConfiguration; import org.springframework.data.redis.connection.RedisNode; import org.springframework.data.redis.connection.jedis.JedisConverters; @@ -63,6 +64,10 @@ public void before() { Assume.assumeThat(mode, is("cluster")); } + public RedisConfiguration getConfiguration() { + return this.clusterConfig; + } + private void init() { if (clusterConfig == null) {