diff --git a/pom.xml b/pom.xml index 9d60cefa32..20d7780592 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-redis - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAREDIS-976-SNAPSHOT Spring Data Redis diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/ClusterConnectionProvider.java b/src/main/java/org/springframework/data/redis/connection/lettuce/ClusterConnectionProvider.java index 902b2c891b..3ebd799a2c 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/ClusterConnectionProvider.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/ClusterConnectionProvider.java @@ -19,6 +19,7 @@ import io.lettuce.core.api.StatefulConnection; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import io.lettuce.core.codec.RedisCodec; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; @@ -33,6 +34,7 @@ * * @author Mark Paluch * @author Christoph Strobl + * @author Bruce Cloud * @since 2.0 */ class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClientProvider { @@ -93,7 +95,8 @@ class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClien } } - if (connectionType.equals(StatefulRedisPubSubConnection.class)) { + if (connectionType.equals(StatefulRedisPubSubConnection.class) + || connectionType.equals(StatefulRedisClusterPubSubConnection.class)) { return client.connectPubSubAsync(codec) // .thenApply(connectionType::cast); diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java index 28a114e024..bec2054973 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java @@ -22,7 +22,6 @@ import io.lettuce.core.cluster.SlotHash; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.sync.RedisClusterCommands; -import io.lettuce.core.cluster.models.partitions.Partitions; import lombok.RequiredArgsConstructor; import java.time.Duration; @@ -35,6 +34,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.springframework.beans.factory.DisposableBean; import org.springframework.dao.DataAccessException; import org.springframework.dao.DataAccessResourceFailureException; @@ -54,6 +54,9 @@ import org.springframework.util.ObjectUtils; /** + * {@code RedisClusterConnection} implementation on top of Lettuce + * Redis client. + * * @author Christoph Strobl * @author Mark Paluch * @since 1.7 @@ -64,7 +67,6 @@ public class LettuceClusterConnection extends LettuceConnection implements Defau new LettuceExceptionConverter()); private final Log log = LogFactory.getLog(getClass()); - private final RedisClusterClient clusterClient; private ClusterCommandExecutor clusterCommandExecutor; private ClusterTopologyProvider topologyProvider; @@ -119,8 +121,7 @@ public LettuceClusterConnection(LettuceConnectionProvider connectionProvider) { Assert.isTrue(connectionProvider instanceof ClusterConnectionProvider, "LettuceConnectionProvider must be a ClusterConnectionProvider."); - this.clusterClient = getClient(); - this.topologyProvider = new LettuceClusterTopologyProvider(this.clusterClient); + this.topologyProvider = new LettuceClusterTopologyProvider(getClient()); this.clusterCommandExecutor = new ClusterCommandExecutor(this.topologyProvider, new LettuceClusterNodeResourceProvider(getConnectionProvider()), exceptionConverter); this.disposeClusterCommandExecutorOnClose = true; @@ -156,8 +157,7 @@ public LettuceClusterConnection(LettuceConnectionProvider connectionProvider, Cl Assert.isTrue(connectionProvider instanceof ClusterConnectionProvider, "LettuceConnectionProvider must be a ClusterConnectionProvider."); - this.clusterClient = getClient(); - this.topologyProvider = new LettuceClusterTopologyProvider(this.clusterClient); + this.topologyProvider = new LettuceClusterTopologyProvider(getClient()); this.clusterCommandExecutor = executor; this.disposeClusterCommandExecutorOnClose = false; } @@ -168,22 +168,20 @@ public LettuceClusterConnection(LettuceConnectionProvider connectionProvider, Cl * * @param sharedConnection may be {@literal null} if no shared connection used. * @param connectionProvider must not be {@literal null}. - * @param clusterClient must not be {@literal null}. + * @param clusterTopologyProvider must not be {@literal null}. * @param executor must not be {@literal null}. * @param timeout must not be {@literal null}. * @since 2.1 */ - LettuceClusterConnection(@Nullable StatefulRedisClusterConnection sharedConnection, - LettuceConnectionProvider connectionProvider, RedisClusterClient clusterClient, ClusterCommandExecutor executor, - Duration timeout) { + protected LettuceClusterConnection(@Nullable StatefulRedisClusterConnection sharedConnection, + LettuceConnectionProvider connectionProvider, ClusterTopologyProvider clusterTopologyProvider, + ClusterCommandExecutor executor, Duration timeout) { super(sharedConnection, connectionProvider, timeout.toMillis(), 0); Assert.notNull(executor, "ClusterCommandExecutor must not be null."); - Assert.notNull(clusterClient, "RedisClusterClient must not be null."); - this.clusterClient = clusterClient; - this.topologyProvider = new LettuceClusterTopologyProvider(clusterClient); + this.topologyProvider = clusterTopologyProvider; this.clusterCommandExecutor = executor; this.disposeClusterCommandExecutorOnClose = false; } @@ -203,13 +201,6 @@ private RedisClusterClient getClient() { connectionProvider.getClass().getName())); } - /** - * @return access to {@link RedisClusterClient} for non-connection access. - */ - private Partitions getPartitions() { - return clusterClient.getPartitions(); - } - /* * (non-Javadoc) * @see org.springframework.data.redis.connection.lettuce.LettuceConnection#geoCommands() @@ -328,7 +319,11 @@ public Integer clusterGetSlotForKey(byte[] key) { @Override public RedisClusterNode clusterGetNodeForSlot(int slot) { - return LettuceConverters.toRedisClusterNode(getPartitions().getPartitionBySlot(slot)); + Set nodes = topologyProvider.getTopology().getSlotServingNodes(slot); + if (nodes.isEmpty()) { + return null; + } + return nodes.iterator().next(); } /* @@ -572,7 +567,7 @@ public void select(int dbIndex) { */ @Override public List clusterGetNodes() { - return LettuceConverters.partitionsToClusterNodes(getPartitions()); + return new ArrayList<>(topologyProvider.getTopology().getNodes()); } /* diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java index cc909a798c..1eae30bb07 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java @@ -879,7 +879,21 @@ protected StatefulRedisPubSubConnection switchToPubSub() { } private LettuceSubscription initSubscription(MessageListener listener) { - return new LettuceSubscription(listener, switchToPubSub(), connectionProvider); + return doCreateSubscription(listener, switchToPubSub(), connectionProvider); + } + + /** + * Customization hook to create a {@link LettuceSubscription}. + * + * @param listener the {@link MessageListener} to notify. + * @param connection Pub/Sub connection. + * @param connectionProvider the {@link LettuceConnectionProvider} for connection release. + * @return a {@link LettuceSubscription}. + * @since 2.2 + */ + protected LettuceSubscription doCreateSubscription(MessageListener listener, + StatefulRedisPubSubConnection connection, LettuceConnectionProvider connectionProvider) { + return new LettuceSubscription(listener, connection, connectionProvider); } void pipeline(LettuceResult result) { @@ -1250,11 +1264,11 @@ public CommandOutput getTypeHint(CommandType type, CommandOutput defaultType) { } @RequiredArgsConstructor - private class LettucePoolConnectionProvider implements LettuceConnectionProvider { + static class LettucePoolConnectionProvider implements LettuceConnectionProvider { private final LettucePool pool; - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider#getConnection(java.lang.Class) */ @@ -1263,7 +1277,7 @@ private class LettucePoolConnectionProvider implements LettuceConnectionProvider return connectionType.cast(pool.getResource()); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider#getConnectionAsync(java.lang.Class) */ @@ -1272,7 +1286,7 @@ private class LettucePoolConnectionProvider implements LettuceConnectionProvider throw new UnsupportedOperationException("Async operations not supported!"); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider#release(io.lettuce.core.api.StatefulConnection) */ diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java index 44d70afe9e..446d206a46 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java @@ -15,6 +15,8 @@ */ package org.springframework.data.redis.connection.lettuce; +import static org.springframework.data.redis.connection.lettuce.LettuceConnection.*; + import io.lettuce.core.AbstractRedisClient; import io.lettuce.core.ClientOptions; import io.lettuce.core.ReadFrom; @@ -40,6 +42,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.dao.DataAccessException; @@ -273,7 +276,7 @@ public void afterPropertiesSet() { this.client = createClient(); - this.connectionProvider = createConnectionProvider(client, LettuceConnection.CODEC); + this.connectionProvider = createConnectionProvider(client, CODEC); this.reactiveConnectionProvider = createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC); if (isClusterAware()) { @@ -341,13 +344,7 @@ public RedisConnection getConnection() { } LettuceConnection connection; - - if (pool != null) { - connection = new LettuceConnection(getSharedConnection(), getTimeout(), null, pool, getDatabase()); - } else { - connection = new LettuceConnection(getSharedConnection(), connectionProvider, getTimeout(), getDatabase()); - } - + connection = doCreateLettuceConnection(getSharedConnection(), connectionProvider, getTimeout(), getDatabase()); connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults); return connection; } @@ -365,12 +362,51 @@ public RedisClusterConnection getClusterConnection() { RedisClusterClient clusterClient = (RedisClusterClient) client; - return getShareNativeConnection() - ? new LettuceClusterConnection( - (StatefulRedisClusterConnection) getOrCreateSharedConnection().getConnection(), - connectionProvider, clusterClient, clusterCommandExecutor, clientConfiguration.getCommandTimeout()) - : new LettuceClusterConnection(null, connectionProvider, clusterClient, clusterCommandExecutor, - clientConfiguration.getCommandTimeout()); + StatefulRedisClusterConnection sharedConnection = getShareNativeConnection() + ? (StatefulRedisClusterConnection) getOrCreateSharedConnection().getConnection() + : null; + + LettuceClusterTopologyProvider topologyProvider = new LettuceClusterTopologyProvider(clusterClient); + return doCreateLettuceClusterConnection(sharedConnection, connectionProvider, topologyProvider, + clusterCommandExecutor, clientConfiguration.getCommandTimeout()); + } + + /** + * Customization hook for {@link LettuceConnection} creation. + * + * @param sharedConnection the shared {@link StatefulRedisConnection} if {@link #getShareNativeConnection()} is + * {@literal true}; {@literal null} otherwise. + * @param connectionProvider the {@link LettuceConnectionProvider} to release connections. + * @param timeout command timeout in {@link TimeUnit#MILLISECONDS}. + * @param database database index to operate on. + * @return the {@link LettuceConnection}. + * @since 2.2 + */ + protected LettuceConnection doCreateLettuceConnection(StatefulRedisConnection sharedConnection, + LettuceConnectionProvider connectionProvider, long timeout, int database) { + + return new LettuceConnection(sharedConnection, connectionProvider, timeout, database); + } + + /** + * Customization hook for {@link LettuceClusterConnection} creation. + * + * @param sharedConnection the shared {@link StatefulRedisConnection} if {@link #getShareNativeConnection()} is + * {@literal true}; {@literal null} otherwise. + * @param connectionProvider the {@link LettuceConnectionProvider} to release connections. + * @param topologyProvider the {@link ClusterTopologyProvider}. + * @param clusterCommandExecutor the {@link ClusterCommandExecutor} to release connections. + * @param commandTimeout command timeout {@link Duration}. + * @return the {@link LettuceConnection}. + * @since 2.2 + */ + protected LettuceClusterConnection doCreateLettuceClusterConnection( + StatefulRedisClusterConnection sharedConnection, LettuceConnectionProvider connectionProvider, + ClusterTopologyProvider topologyProvider, ClusterCommandExecutor clusterCommandExecutor, + Duration commandTimeout) { + + return new LettuceClusterConnection(sharedConnection, connectionProvider, topologyProvider, clusterCommandExecutor, + commandTimeout); } /* @@ -909,6 +945,10 @@ protected StatefulConnection getSharedReactiveConnection private LettuceConnectionProvider createConnectionProvider(AbstractRedisClient client, RedisCodec codec) { + if (this.pool != null) { + return new LettucePoolConnectionProvider(this.pool); + } + LettuceConnectionProvider connectionProvider = doCreateConnectionProvider(client, codec); if (this.clientConfiguration instanceof LettucePoolingClientConfiguration) { diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceMessageListener.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceMessageListener.java index de6c7705d6..2772b82317 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceMessageListener.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceMessageListener.java @@ -35,7 +35,7 @@ class LettuceMessageListener implements RedisPubSubListener { this.listener = listener; } - /* + /* * (non-Javadoc) * @see io.lettuce.core.pubsub.RedisPubSubListener#message(java.lang.Object, java.lang.Object) */ @@ -43,7 +43,7 @@ public void message(byte[] channel, byte[] message) { listener.onMessage(new DefaultMessage(channel, message), null); } - /* + /* * (non-Javadoc) * @see io.lettuce.core.pubsub.RedisPubSubListener#message(java.lang.Object, java.lang.Object, java.lang.Object) */ @@ -51,25 +51,25 @@ public void message(byte[] pattern, byte[] channel, byte[] message) { listener.onMessage(new DefaultMessage(channel, message), pattern); } - /* + /* * (non-Javadoc) * @see io.lettuce.core.pubsub.RedisPubSubListener#subscribed(java.lang.Object, long) */ public void subscribed(byte[] channel, long count) {} - /* + /* * (non-Javadoc) * @see io.lettuce.core.pubsub.RedisPubSubListener#psubscribed(java.lang.Object, long) */ public void psubscribed(byte[] pattern, long count) {} - /* + /* * (non-Javadoc) * @see io.lettuce.core.pubsub.RedisPubSubListener#unsubscribed(java.lang.Object, long) */ public void unsubscribed(byte[] channel, long count) {} - /* + /* * (non-Javadoc) * @see io.lettuce.core.pubsub.RedisPubSubListener#punsubscribed(java.lang.Object, long) */ diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java index c9b785953e..c0a0c9e8ff 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java @@ -28,15 +28,23 @@ * @author Mark Paluch * @author Christoph Strobl */ -class LettuceSubscription extends AbstractSubscription { +public class LettuceSubscription extends AbstractSubscription { private final StatefulRedisPubSubConnection connection; private final LettuceMessageListener listener; private final LettuceConnectionProvider connectionProvider; private final RedisPubSubCommands pubsub; - LettuceSubscription(MessageListener listener, StatefulRedisPubSubConnection pubsubConnection, - LettuceConnectionProvider connectionProvider) { + /** + * Creates a new {@link LettuceSubscription} given {@link MessageListener}, {@link StatefulRedisPubSubConnection}, and + * {@link LettuceConnectionProvider}. + * + * @param listener the listener to notify, must not be {@literal null}. + * @param pubsubConnection must not be {@literal null}. + * @param connectionProvider must not be {@literal null}. + */ + protected LettuceSubscription(MessageListener listener, + StatefulRedisPubSubConnection pubsubConnection, LettuceConnectionProvider connectionProvider) { super(listener); @@ -52,25 +60,25 @@ protected StatefulRedisPubSubConnection getNativeConnection() { return connection; } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.util.AbstractSubscription#doClose() */ protected void doClose() { if (!getChannels().isEmpty()) { - pubsub.unsubscribe(new byte[0]); + doUnsubscribe(true, new byte[0]); } if (!getPatterns().isEmpty()) { - pubsub.punsubscribe(new byte[0]); + doPUnsubscribe(true, new byte[0]); } connection.removeListener(this.listener); connectionProvider.release(connection); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.util.AbstractSubscription#doPsubscribe(byte[][]) */ @@ -78,7 +86,7 @@ protected void doPsubscribe(byte[]... patterns) { pubsub.psubscribe(patterns); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.util.AbstractSubscription#doPUnsubscribe(boolean, byte[][]) */ @@ -88,7 +96,7 @@ protected void doPUnsubscribe(boolean all, byte[]... patterns) { pubsub.punsubscribe(patterns); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.util.AbstractSubscription#doSubscribe(byte[][]) */ @@ -96,7 +104,7 @@ protected void doSubscribe(byte[]... channels) { pubsub.subscribe(channels); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.util.AbstractSubscription#doUnsubscribe(boolean, byte[][]) */ diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionUnitTests.java index 8e14db4ed2..94770850cb 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionUnitTests.java @@ -48,6 +48,7 @@ import org.mockito.junit.MockitoJUnitRunner; import org.springframework.data.redis.connection.ClusterCommandExecutor; import org.springframework.data.redis.connection.ClusterNodeResourceProvider; +import org.springframework.data.redis.connection.ClusterTopologyProvider; import org.springframework.data.redis.connection.RedisClusterCommands.AddSlots; import org.springframework.data.redis.connection.RedisClusterNode; @@ -67,6 +68,7 @@ public class LettuceClusterConnectionUnitTests { static final byte[] KEY_3_BYTES = KEY_3.getBytes(); @Mock RedisClusterClient clusterMock; + @Mock ClusterTopologyProvider topologyProviderMock; @Mock LettuceConnectionProvider connectionProviderMock; @Mock ClusterCommandExecutor executorMock; @@ -363,7 +365,7 @@ public void shouldExecuteOnSharedConnection() { when(sharedConnectionMock.sync()).thenReturn(sync); LettuceClusterConnection connection = new LettuceClusterConnection(sharedConnectionMock, connectionProviderMock, - clusterMock, executorMock, Duration.ZERO); + topologyProviderMock, executorMock, Duration.ZERO); connection.keyCommands().del(KEY_1_BYTES); @@ -381,7 +383,7 @@ public void shouldExecuteOnDedicatedConnection() { when(dedicatedConnection.sync()).thenReturn(sync); LettuceClusterConnection connection = new LettuceClusterConnection(sharedConnectionMock, connectionProviderMock, - clusterMock, executorMock, Duration.ZERO); + topologyProviderMock, executorMock, Duration.ZERO); connection.listCommands().bLPop(1, KEY_1_BYTES); diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterKeyspaceNotificationsTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterKeyspaceNotificationsTests.java new file mode 100644 index 0000000000..32b79fafa4 --- /dev/null +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterKeyspaceNotificationsTests.java @@ -0,0 +1,194 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection.lettuce; + +import static org.assertj.core.api.Assertions.*; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.SetArgs; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.sync.RedisCommands; +import io.lettuce.core.cluster.SlotHash; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import org.springframework.data.redis.ConnectionFactoryTracker; +import org.springframework.data.redis.connection.ClusterCommandExecutor; +import org.springframework.data.redis.connection.ClusterTopologyProvider; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.connection.RedisClusterConnection; +import org.springframework.data.redis.connection.RedisConfiguration; +import org.springframework.data.redis.test.util.RedisClusterRule; +import org.springframework.lang.Nullable; + +/** + * Integration tests to listen for keyspace notifications. + * + * @author Mark Paluch + */ +public class LettuceClusterKeyspaceNotificationsTests { + + @ClassRule public static RedisClusterRule clusterRule = new RedisClusterRule(); + + CustomLettuceConnectionFactory factory; + String keyspaceConfig; + + // maps to 127.0.0.1:7381/slot hash 13477 + String key = "10923"; + + @Before + public void before() { + + factory = new CustomLettuceConnectionFactory(clusterRule.getConfiguration()); + factory.setClientResources(LettuceTestClientResources.getSharedClientResources()); + ConnectionFactoryTracker.add(factory); + factory.afterPropertiesSet(); + + // enable keyspace events on a specific node. + withConnection("127.0.0.1", 7381, commands -> { + + keyspaceConfig = commands.configGet("*").get("notify-keyspace-events"); + commands.configSet("notify-keyspace-events", "KEx"); + }); + + assertThat(SlotHash.getSlot(key)).isEqualTo(13477); + } + + @After + public void tearDown() { + + // Restore previous settings. + withConnection("127.0.0.1", 7381, commands -> { + commands.configSet("notify-keyspace-events", keyspaceConfig); + }); + } + + @Test // DATAREDIS-976 + public void shouldListenForKeyspaceNotifications() throws Exception { + + CompletableFuture expiry = new CompletableFuture<>(); + + RedisClusterConnection connection = factory.getClusterConnection(); + + connection.pSubscribe((message, pattern) -> { + expiry.complete(new String(message.getBody()) + ":" + new String(message.getChannel())); + }, "__keyspace*@*".getBytes()); + + withConnection("127.0.0.1", 7381, commands -> { + commands.set(key, "foo", SetArgs.Builder.px(1)); + }); + + assertThat(expiry.get(2, TimeUnit.SECONDS)).isEqualTo("expired:__keyspace@0__:10923"); + + connection.getSubscription().close(); + connection.close(); + } + + private void withConnection(String hostname, int port, Consumer> commandsConsumer) { + + RedisClient client = RedisClient.create(LettuceTestClientResources.getSharedClientResources(), + RedisURI.create(hostname, port)); + + StatefulRedisConnection connection = client.connect(); + commandsConsumer.accept(connection.sync()); + + connection.close(); + client.shutdownAsync(); + } + + static class CustomLettuceConnectionFactory extends LettuceConnectionFactory { + + CustomLettuceConnectionFactory(RedisConfiguration redisConfiguration) { + super(redisConfiguration); + } + + @Override + protected LettuceClusterConnection doCreateLettuceClusterConnection( + StatefulRedisClusterConnection sharedConnection, LettuceConnectionProvider connectionProvider, + ClusterTopologyProvider topologyProvider, ClusterCommandExecutor clusterCommandExecutor, + Duration commandTimeout) { + return new CustomLettuceClusterConnection(sharedConnection, connectionProvider, topologyProvider, + clusterCommandExecutor, commandTimeout); + } + } + + static class CustomLettuceClusterConnection extends LettuceClusterConnection { + + CustomLettuceClusterConnection(@Nullable StatefulRedisClusterConnection sharedConnection, + LettuceConnectionProvider connectionProvider, ClusterTopologyProvider clusterTopologyProvider, + ClusterCommandExecutor executor, Duration timeout) { + super(sharedConnection, connectionProvider, clusterTopologyProvider, executor, timeout); + } + + @Override + protected LettuceSubscription doCreateSubscription(MessageListener listener, + StatefulRedisPubSubConnection connection, LettuceConnectionProvider connectionProvider) { + return new CustomLettuceSubscription(listener, (StatefulRedisClusterPubSubConnection) connection, + connectionProvider); + } + } + + /** + * Customized {@link LettuceSubscription}. Enables + * {@link StatefulRedisClusterPubSubConnection#setNodeMessagePropagation(boolean)} and uses + * {@link io.lettuce.core.cluster.api.sync.NodeSelection} to subscribe to all master nodes. + */ + static class CustomLettuceSubscription extends LettuceSubscription { + + private final StatefulRedisClusterPubSubConnection connection; + + CustomLettuceSubscription(MessageListener listener, StatefulRedisClusterPubSubConnection connection, + LettuceConnectionProvider connectionProvider) { + super(listener, connection, connectionProvider); + this.connection = connection; + + // Must be enabled for keyspace notification propagation + this.connection.setNodeMessagePropagation(true); + } + + @Override + protected void doPsubscribe(byte[]... patterns) { + connection.sync().all().commands().psubscribe(patterns); + } + + @Override + protected void doPUnsubscribe(boolean all, byte[]... patterns) { + connection.sync().all().commands().punsubscribe(); + } + + @Override + protected void doSubscribe(byte[]... channels) { + connection.sync().all().commands().subscribe(channels); + } + + @Override + protected void doUnsubscribe(boolean all, byte[]... channels) { + connection.sync().all().commands().unsubscribe(); + } + } +} diff --git a/src/test/java/org/springframework/data/redis/listener/PubSubTestParams.java b/src/test/java/org/springframework/data/redis/listener/PubSubTestParams.java index 6c9757ece8..8ccbd569b5 100644 --- a/src/test/java/org/springframework/data/redis/listener/PubSubTestParams.java +++ b/src/test/java/org/springframework/data/redis/listener/PubSubTestParams.java @@ -15,20 +15,24 @@ */ package org.springframework.data.redis.listener; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Collection; +import org.junit.runners.model.Statement; + import org.springframework.data.redis.ObjectFactory; import org.springframework.data.redis.Person; import org.springframework.data.redis.PersonObjectFactory; import org.springframework.data.redis.RawObjectFactory; import org.springframework.data.redis.SettingsUtils; import org.springframework.data.redis.StringObjectFactory; +import org.springframework.data.redis.connection.RedisClusterConfiguration; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.connection.lettuce.LettuceTestClientResources; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.test.util.RedisClusterRule; /** * @author Costin Leau @@ -76,8 +80,49 @@ public static Collection testParams() { rawTemplateLtc.setConnectionFactory(lettuceConnFactory); rawTemplateLtc.afterPropertiesSet(); - return Arrays.asList(new Object[][] { { stringFactory, stringTemplate }, { personFactory, personTemplate }, - { rawFactory, rawTemplate }, { stringFactory, stringTemplateLtc }, { personFactory, personTemplateLtc }, - { rawFactory, rawTemplateLtc } }); + Collection parameters = new ArrayList<>(); + parameters.add(new Object[] { stringFactory, stringTemplate }); + parameters.add(new Object[] { personFactory, personTemplate }); + parameters.add(new Object[] { stringFactory, stringTemplateLtc }); + parameters.add(new Object[] { personFactory, personTemplateLtc }); + parameters.add(new Object[] { rawFactory, rawTemplateLtc }); + + if (clusterAvailable()) { + + RedisClusterConfiguration configuration = new RedisClusterConfiguration().clusterNode("127.0.0.1", 7379); + + // add Jedis + JedisConnectionFactory jedisClusterFactory = new JedisConnectionFactory(configuration); + jedisClusterFactory.afterPropertiesSet(); + + RedisTemplate jedisClusterStringTemplate = new StringRedisTemplate(jedisClusterFactory); + + // add Lettuce + LettuceConnectionFactory lettuceClusterFactory = new LettuceConnectionFactory(configuration); + lettuceClusterFactory.setClientResources(LettuceTestClientResources.getSharedClientResources()); + lettuceClusterFactory.afterPropertiesSet(); + + RedisTemplate lettuceClusterStringTemplate = new StringRedisTemplate(lettuceClusterFactory); + + parameters.add(new Object[] { stringFactory, jedisClusterStringTemplate }); + parameters.add(new Object[] { stringFactory, lettuceClusterStringTemplate }); + } + + return parameters; + } + + private static boolean clusterAvailable() { + + try { + new RedisClusterRule().apply(new Statement() { + @Override + public void evaluate() { + + } + }, null).evaluate(); + } catch (Throwable throwable) { + return false; + } + return true; } } diff --git a/src/test/java/org/springframework/data/redis/listener/PubSubTests.java b/src/test/java/org/springframework/data/redis/listener/PubSubTests.java index c2d86c6ed7..e0a3d56027 100644 --- a/src/test/java/org/springframework/data/redis/listener/PubSubTests.java +++ b/src/test/java/org/springframework/data/redis/listener/PubSubTests.java @@ -30,32 +30,31 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; + import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.core.task.SyncTaskExecutor; import org.springframework.data.redis.ConnectionFactoryTracker; import org.springframework.data.redis.ObjectFactory; -import org.springframework.data.redis.RedisTestProfileValueSource; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; +import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; -import org.springframework.data.redis.test.util.RedisSentinelRule; /** * Base test class for PubSub integration tests * * @author Costin Leau * @author Jennifer Hickey + * @author Mark Paluch */ @RunWith(Parameterized.class) public class PubSubTests { - public @Rule RedisSentinelRule sentinelRule = RedisSentinelRule.withDefaultConfig().sentinelsDisabled(); - private static final String CHANNEL = "pubsub::test"; protected RedisMessageListenerContainer container; @@ -73,11 +72,6 @@ public void handleMessage(Object message) { private final MessageListenerAdapter adapter = new MessageListenerAdapter(handler); - @BeforeClass - public static void shouldRun() { - assumeTrue(RedisTestProfileValueSource.matches("runLongTests", "true")); - } - @Before public void setUp() throws Exception { bag.clear(); @@ -178,6 +172,9 @@ public void testStartNoListeners() { @SuppressWarnings("unchecked") @Test // DATAREDIS-251 public void testStartListenersToNoSpecificChannelTest() throws InterruptedException { + + assumeTrue(isClusterAware(template.getConnectionFactory())); + container.removeMessageListener(adapter, new ChannelTopic(CHANNEL)); container.addMessageListener(adapter, Arrays.asList(new PatternTopic("*"))); container.start(); @@ -193,4 +190,14 @@ public void testStartListenersToNoSpecificChannelTest() throws InterruptedExcept assertThat(set, hasItems(payload)); } + + private static boolean isClusterAware(RedisConnectionFactory connectionFactory) { + + if (connectionFactory instanceof LettuceConnectionFactory) { + return ((LettuceConnectionFactory) connectionFactory).isClusterAware(); + } else if (connectionFactory instanceof JedisConnectionFactory) { + return ((JedisConnectionFactory) connectionFactory).isRedisClusterAware(); + } + return false; + } } diff --git a/src/test/java/org/springframework/data/redis/test/util/RedisClusterRule.java b/src/test/java/org/springframework/data/redis/test/util/RedisClusterRule.java index 5fd9a0416f..6c85f60abd 100644 --- a/src/test/java/org/springframework/data/redis/test/util/RedisClusterRule.java +++ b/src/test/java/org/springframework/data/redis/test/util/RedisClusterRule.java @@ -23,6 +23,7 @@ import org.junit.rules.ExternalResource; import org.junit.rules.TestRule; import org.springframework.data.redis.connection.RedisClusterConfiguration; +import org.springframework.data.redis.connection.RedisConfiguration; import org.springframework.data.redis.connection.RedisNode; import org.springframework.data.redis.connection.jedis.JedisConverters; @@ -63,6 +64,10 @@ public void before() { Assume.assumeThat(mode, is("cluster")); } + public RedisConfiguration getConfiguration() { + return this.clusterConfig; + } + private void init() { if (clusterConfig == null) {