diff --git a/pom.xml b/pom.xml index 734a2c4e77..6afb8998f4 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-redis - 2.0.0.BUILD-SNAPSHOT + 2.0.0.DATAREDIS-659-SNAPSHOT Spring Data Redis diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveClusterServerCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveClusterServerCommands.java new file mode 100644 index 0000000000..d44623442c --- /dev/null +++ b/src/main/java/org/springframework/data/redis/connection/ReactiveClusterServerCommands.java @@ -0,0 +1,186 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Properties; + +import org.springframework.data.redis.core.types.RedisClientInfo; + +/** + * Redis Server commands executed in cluster environment using reactive infrastructure. + * + * @author Mark Paluch + * @author Christoph Strobl + * @since 2.0 + */ +public interface ReactiveClusterServerCommands extends ReactiveServerCommands { + + /** + * Start an {@literal Append Only File} rewrite process on the specific server. + * + * @param node must not be {@literal null}. + * @return {@link Mono} indicating command completion. + * @throws IllegalArgumentException when {@code node} is {@literal null}. + * @see RedisServerCommands#bgReWriteAof() + */ + Mono bgReWriteAof(RedisClusterNode node); + + /** + * Start background saving of db on server. + * + * @param node must not be {@literal null}. + * @return {@link Mono} indicating command received by server. Operation success needs to be checked via + * {@link #lastSave(RedisClusterNode)}. + * @throws IllegalArgumentException when {@code node} is {@literal null}. + * @see RedisServerCommands#bgSave() + */ + Mono bgSave(RedisClusterNode node); + + /** + * Get time unix timestamp of last successful {@link #bgSave()} operation in seconds. + * + * @param node must not be {@literal null}. + * @return @return {@link Mono} wrapping unix timestamp. + * @throws IllegalArgumentException when {@code node} is {@literal null}. + * @see RedisServerCommands#lastSave() + */ + Mono lastSave(RedisClusterNode node); + + /** + * Synchronous save current db snapshot on server. + * + * @param node must not be {@literal null}. + * @return {@link Mono} indicating command completion. + * @throws IllegalArgumentException when {@code node} is {@literal null}. + * @see RedisServerCommands#save() + */ + Mono save(RedisClusterNode node); + + /** + * Get the total number of available keys in currently selected database. + * + * @param node must not be {@literal null}. + * @return {@link Mono} wrapping number of keys. + * @throws IllegalArgumentException when {@code node} is {@literal null}. + * @see RedisServerCommands#dbSize() + */ + Mono dbSize(RedisClusterNode node); + + /** + * Delete all keys of the currently selected database. + * + * @param node must not be {@literal null}. {@link Mono} indicating command completion. + * @throws IllegalArgumentException when {@code node} is {@literal null}. + * @see RedisServerCommands#flushDb() + */ + Mono flushDb(RedisClusterNode node); + + /** + * Delete all all keys from all databases. + * + * @param node must not be {@literal null}. + * @return {@link Mono} indicating command completion. + * @throws IllegalArgumentException when {@code node} is {@literal null}. + * @see RedisServerCommands#flushAll() + */ + Mono flushAll(RedisClusterNode node); + + /** + * Load {@literal default} server information like + *
    + *
  • memory
  • + *
  • cpu utilization
  • + *
  • replication
  • + *
+ *

+ * + * @param node must not be {@literal null}. + * @return {@link Mono} wrapping server information. + * @throws IllegalArgumentException when {@code node} is {@literal null}. + * @see RedisServerCommands#info() + */ + Mono info(RedisClusterNode node); + + /** + * Load server information for given {@code selection}. + * + * @param node must not be {@literal null}. + * @param section must not be {@literal null} nor {@literal empty}. + * @return {@link Mono} wrapping server information of given {@code section}. + * @throws IllegalArgumentException when {@code node} is {@literal null}. + * @throws IllegalArgumentException when section is {@literal null} or {@literal empty}. + * @see RedisServerCommands#info(String) + */ + Mono info(RedisClusterNode node, String section); + + /** + * Load configuration parameters for given {@code pattern} from server. + * + * @param node must not be {@literal null}. + * @param pattern must not be {@literal null}. + * @return {@link Mono} wrapping configuration parameters matching given {@code pattern}. + * @throws IllegalArgumentException when {@code node} is {@literal null}. + * @throws IllegalArgumentException when {@code pattern} is {@literal null} or {@literal empty}. + * @see RedisServerCommands#getConfig(String) + */ + Mono getConfig(RedisClusterNode node, String pattern); + + /** + * Set server configuration for {@code param} to {@code value}. + * + * @param node must not be {@literal null}. + * @param param must not be {@literal null} nor {@literal empty}. + * @param value must not be {@literal null} nor {@literal empty}. + * @throws IllegalArgumentException when {@code node} is {@literal null}. + * @throws IllegalArgumentException when {@code pattern} / {@code value} is {@literal null} or {@literal empty}. + * @see RedisServerCommands#setConfig(String, String) + */ + Mono setConfig(RedisClusterNode node, String param, String value); + + /** + * Reset statistic counters on server.
+ * Counters can be retrieved using {@link #info()}. + * + * @param node must not be {@literal null}. + * @return {@link Mono} indicating command completion. + * @throws IllegalArgumentException when {@code node} is {@literal null}. + * @see RedisServerCommands#resetConfigStats() + */ + Mono resetConfigStats(RedisClusterNode node); + + /** + * Request server timestamp using {@code TIME} command. + * + * @param node must not be {@literal null}. + * @return {@link Mono} wrapping current server time in milliseconds. + * @throws IllegalArgumentException when {@code node} is {@literal null}. + * @see RedisServerCommands#time() + */ + Mono time(RedisClusterNode node); + + /** + * Request information and statistics about connected clients. + * + * @param node must not be {@literal null}. + * @return {@link Flux} emitting {@link RedisClientInfo} objects. + * @throws IllegalArgumentException when {@code node} is {@literal null}. + * @see RedisServerCommands#getClientList() + */ + Flux getClientList(RedisClusterNode node); +} diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveRedisClusterConnection.java b/src/main/java/org/springframework/data/redis/connection/ReactiveRedisClusterConnection.java index 384623da11..d705b5f637 100644 --- a/src/main/java/org/springframework/data/redis/connection/ReactiveRedisClusterConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/ReactiveRedisClusterConnection.java @@ -15,8 +15,11 @@ */ package org.springframework.data.redis.connection; +import reactor.core.publisher.Mono; + /** * @author Christoph Strobl + * @author Mark Paluch * @since 2.0 */ public interface ReactiveRedisClusterConnection extends ReactiveRedisConnection { @@ -47,4 +50,17 @@ public interface ReactiveRedisClusterConnection extends ReactiveRedisConnection @Override ReactiveClusterHyperLogLogCommands hyperLogLogCommands(); + + @Override + ReactiveClusterServerCommands serverCommands(); + + /** + * Test the connection to a specific Redis cluster node. + * + * @param node must not be {@literal null}. + * @return {@link Mono} wrapping server response message - usually {@literal PONG}. + * @throws IllegalArgumentException when {@code node} is {@literal null}. + * @see RedisConnectionCommands#ping() + */ + Mono ping(RedisClusterNode node); } diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/ReactiveRedisConnection.java index 01f87a36bd..e64dba5e9f 100644 --- a/src/main/java/org/springframework/data/redis/connection/ReactiveRedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/ReactiveRedisConnection.java @@ -16,6 +16,7 @@ package org.springframework.data.redis.connection; import lombok.Data; +import reactor.core.publisher.Mono; import java.io.Closeable; import java.nio.ByteBuffer; @@ -96,7 +97,7 @@ public interface ReactiveRedisConnection extends Closeable { /** * Get {@link ReactiveHashCommands}. * - * @return + * @return never {@literal null}. */ ReactiveHashCommands hashCommands(); @@ -114,6 +115,21 @@ public interface ReactiveRedisConnection extends Closeable { */ ReactiveHyperLogLogCommands hyperLogLogCommands(); + /** + * Get {@link ReactiveServerCommands}. + * + * @return never {@literal null}. + */ + ReactiveServerCommands serverCommands(); + + /** + * Test connection. + * + * @return {@link Mono} wrapping server response message - usually {@literal PONG}. + * @see Redis Documentation: PING + */ + Mono ping(); + /** * Base interface for Redis commands executed with a reactive infrastructure. * diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveServerCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveServerCommands.java new file mode 100644 index 0000000000..18fbb21691 --- /dev/null +++ b/src/main/java/org/springframework/data/redis/connection/ReactiveServerCommands.java @@ -0,0 +1,187 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Properties; + +import org.springframework.data.redis.core.types.RedisClientInfo; + +/** + * Redis Server commands executed using reactive infrastructure. + * + * @author Mark Paluch + * @author Christoph Strobl + * @since 2.0 + */ +public interface ReactiveServerCommands { + + /** + * Start an {@literal Append Only File} rewrite process on server. + * + * @return {@link Mono} indicating command completion. + * @see Redis Documentation: BGREWRITEAOF + */ + Mono bgReWriteAof(); + + /** + * Start background saving of db on server. + * + * @return {@link Mono} indicating command received by server. Operation success needs to be checked via + * {@link #lastSave()}. + * @see Redis Documentation: BGSAVE + */ + Mono bgSave(); + + /** + * Get time unix timestamp of last successful {@link #bgSave()} operation in seconds. + * + * @return {@link Mono} wrapping unix timestamp. + * @see Redis Documentation: LASTSAVE + */ + Mono lastSave(); + + /** + * Synchronous save current db snapshot on server. + * + * @return {@link Mono} indicating command completion. + * @see Redis Documentation: SAVE + */ + Mono save(); + + /** + * Get the total number of available keys in currently selected database. + * + * @return {@link Mono} wrapping number of keys. + * @see Redis Documentation: DBSIZE + */ + Mono dbSize(); + + /** + * Delete all keys of the currently selected database. + * + * @return {@link Mono} indicating command completion. + * @see Redis Documentation: FLUSHDB + */ + Mono flushDb(); + + /** + * Delete all all keys from all databases. + * + * @return {@link Mono} indicating command completion. + * @see Redis Documentation: FLUSHALL + */ + Mono flushAll(); + + /** + * Load {@literal default} server information like + *

    + *
  • memory
  • + *
  • cpu utilization
  • + *
  • replication
  • + *
+ *

+ * + * @return {@link Mono} wrapping server information. + * @see Redis Documentation: INFO + */ + Mono info(); + + /** + * Load server information for given {@code selection}. + * + * @param section must not be {@literal null} nor {@literal empty}. + * @return {@link Mono} wrapping server information of given {@code section}. + * @throws IllegalArgumentException when section is {@literal null} or {@literal empty}. + * @see Redis Documentation: INFO + */ + Mono info(String section); + + /** + * Load configuration parameters for given {@code pattern} from server. + * + * @param pattern must not be {@literal null}. + * @return {@link Mono} wrapping configuration parameters matching given {@code pattern}. + * @throws IllegalArgumentException when {@code pattern} is {@literal null} or {@literal empty}. + * @see Redis Documentation: CONFIG GET + */ + Mono getConfig(String pattern); + + /** + * Set server configuration for {@code param} to {@code value}. + * + * @param param must not be {@literal null} nor {@literal empty}. + * @param value must not be {@literal null} nor {@literal empty}. + * @throws IllegalArgumentException when {@code pattern} / {@code value} is {@literal null} or {@literal empty}. + * @see Redis Documentation: CONFIG SET + */ + Mono setConfig(String param, String value); + + /** + * Reset statistic counters on server.
+ * Counters can be retrieved using {@link #info()}. + * + * @return {@link Mono} indicating command completion. + * @see Redis Documentation: CONFIG RESETSTAT + */ + Mono resetConfigStats(); + + /** + * Request server timestamp using {@code TIME} command. + * + * @return {@link Mono} wrapping current server time in milliseconds. + * @see Redis Documentation: TIME + */ + Mono time(); + + /** + * Closes a given client connection identified by {@literal host:port}. + * + * @param host of connection to close. Must not be {@literal null} nor {@literal empty}. + * @param port of connection to close + * @return {@link Mono} wrapping {@link String} representation of the command result. + * @throws IllegalArgumentException if {@code host} is {@literal null} or {@literal empty}. + * @see Redis Documentation: CLIENT KILL + */ + Mono killClient(String host, int port); + + /** + * Assign given name to current connection. + * + * @param name must not be {@literal null} nor {@literal empty}. + * @throws IllegalArgumentException when {@code name} is {@literal null} or {@literal empty}. + * @see Redis Documentation: CLIENT SETNAME + */ + Mono setClientName(String name); + + /** + * Returns the name of the current connection. + * + * @return {@link Mono} wrapping the connection name. + * @see Redis Documentation: CLIENT GETNAME + */ + Mono getClientName(); + + /** + * Request information and statistics about connected clients. + * + * @return {@link Flux} emitting {@link RedisClientInfo} objects. + * @see Redis Documentation: CLIENT LIST + */ + Flux getClientList(); +} diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java index 06d236f878..046f558caf 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java @@ -29,7 +29,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashMap; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -614,35 +613,4 @@ public void destroy() throws Exception { } } } - - /** - * Lettuce specific implementation of {@link ClusterTopologyProvider}. - * - * @author Christoph Strobl - * @since 1.7 - */ - static class LettuceClusterTopologyProvider implements ClusterTopologyProvider { - - private final RedisClusterClient client; - - /** - * @param client must not be {@literal null}. - */ - public LettuceClusterTopologyProvider(RedisClusterClient client) { - - Assert.notNull(client, "RedisClusterClient must not be null."); - this.client = client; - } - - /* - * (non-Javadoc) - * @see org.springframework.data.redis.connection.ClusterTopologyProvider#getTopology() - */ - @Override - public ClusterTopology getTopology() { - return new ClusterTopology( - new LinkedHashSet<>(LettuceConverters.partitionsToClusterNodes(client.getPartitions()))); - } - } - } diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterTopologyProvider.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterTopologyProvider.java new file mode 100644 index 0000000000..46949730b1 --- /dev/null +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterTopologyProvider.java @@ -0,0 +1,55 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection.lettuce; + +import io.lettuce.core.cluster.RedisClusterClient; + +import java.util.LinkedHashSet; + +import org.springframework.data.redis.connection.ClusterTopology; +import org.springframework.data.redis.connection.ClusterTopologyProvider; +import org.springframework.util.Assert; + +/** + * Lettuce specific implementation of {@link ClusterTopologyProvider}. + * + * @author Christoph Strobl + * @author Mark Paluch + * @since 1.7 + */ +class LettuceClusterTopologyProvider implements ClusterTopologyProvider { + + private final RedisClusterClient client; + + /** + * @param client must not be {@literal null}. + */ + LettuceClusterTopologyProvider(RedisClusterClient client) { + + Assert.notNull(client, "RedisClusterClient must not be null."); + + this.client = client; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ClusterTopologyProvider#getTopology() + */ + @Override + public ClusterTopology getTopology() { + return new ClusterTopology(new LinkedHashSet<>(LettuceConverters.partitionsToClusterNodes(client.getPartitions()))); + } +} diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java index 6425fbb3eb..2278224560 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java @@ -41,18 +41,7 @@ import org.springframework.data.redis.ExceptionTranslationStrategy; import org.springframework.data.redis.PassThroughExceptionTranslationStrategy; import org.springframework.data.redis.RedisConnectionFailureException; -import org.springframework.data.redis.connection.ClusterCommandExecutor; -import org.springframework.data.redis.connection.Pool; -import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; -import org.springframework.data.redis.connection.RedisClusterConfiguration; -import org.springframework.data.redis.connection.RedisClusterConnection; -import org.springframework.data.redis.connection.RedisConnection; -import org.springframework.data.redis.connection.RedisConnectionFactory; -import org.springframework.data.redis.connection.RedisNode; -import org.springframework.data.redis.connection.RedisPassword; -import org.springframework.data.redis.connection.RedisSentinelConfiguration; -import org.springframework.data.redis.connection.RedisSentinelConnection; -import org.springframework.data.redis.connection.RedisStandaloneConfiguration; +import org.springframework.data.redis.connection.*; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; @@ -772,7 +761,7 @@ private AbstractRedisClient createRedisClient() { .orElseGet(() -> RedisClusterClient.create(initialUris)); this.clusterCommandExecutor = new ClusterCommandExecutor( - new LettuceClusterConnection.LettuceClusterTopologyProvider(clusterClient), + new LettuceClusterTopologyProvider(clusterClient), new LettuceClusterConnection.LettuceClusterNodeResourceProvider(clusterClient), EXCEPTION_TRANSLATION); clientConfiguration.getClientOptions() // diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConverters.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConverters.java index 314ca5e510..c474220c8e 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConverters.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConverters.java @@ -106,126 +106,101 @@ abstract public class LettuceConverters extends Converters { public static final byte[] NEGATIVE_INFINITY_BYTES; static { - DATE_TO_LONG = new Converter() { - public Long convert(Date source) { - return source != null ? source.getTime() : null; - } - }; - BYTES_LIST_TO_BYTES_SET = new Converter, Set>() { - public Set convert(List results) { - return results != null ? new LinkedHashSet<>(results) : null; - } - }; - BYTES_TO_STRING = new Converter() { - @Override - public String convert(byte[] source) { - if (source == null || Arrays.equals(source, new byte[0])) { - return null; - } - return new String(source); + DATE_TO_LONG = source -> source != null ? source.getTime() : null; + + BYTES_LIST_TO_BYTES_SET = results -> results != null ? new LinkedHashSet<>(results) : null; + + BYTES_TO_STRING = source -> { + if (source == null || Arrays.equals(source, new byte[0])) { + return null; } + return new String(source); }; - STRING_TO_BYTES = new Converter() { - @Override - public byte[] convert(String source) { - if (source == null) { - return null; - } - return source.getBytes(); + STRING_TO_BYTES = source -> { + if (source == null) { + return null; } + return source.getBytes(); }; - BYTES_SET_TO_BYTES_LIST = new Converter, List>() { - public List convert(Set results) { - return results != null ? new ArrayList<>(results) : null; + + BYTES_SET_TO_BYTES_LIST = results -> results != null ? new ArrayList<>(results) : null; + + BYTES_COLLECTION_TO_BYTES_LIST = results -> { + + if (results instanceof List) { + return (List) results; } + return results != null ? new ArrayList<>(results) : null; }; - BYTES_COLLECTION_TO_BYTES_LIST = new Converter, List>() { - public List convert(Collection results) { - if (results instanceof List) { - return (List) results; - } - return results != null ? new ArrayList<>(results) : null; + + KEY_VALUE_TO_BYTES_LIST = source -> { + + if (source == null) { + return null; } + List list = new ArrayList<>(2); + list.add(source.getKey()); + list.add(source.getValue()); + + return list; }; - KEY_VALUE_TO_BYTES_LIST = new Converter, List>() { - public List convert(KeyValue source) { - if (source == null) { - return null; - } - List list = new ArrayList<>(2); - list.add(source.getKey()); - list.add(source.getValue()); - return list; + BYTES_LIST_TO_MAP = source -> { + + if (CollectionUtils.isEmpty(source)) { + return Collections.emptyMap(); } - }; - BYTES_LIST_TO_MAP = new Converter, Map>() { - @Override - public Map convert(final List source) { + Map target = new LinkedHashMap<>(); - if (CollectionUtils.isEmpty(source)) { - return Collections.emptyMap(); - } + Iterator kv = source.iterator(); + while (kv.hasNext()) { + target.put(kv.next(), kv.hasNext() ? kv.next() : null); + } - Map target = new LinkedHashMap<>(); + return target; + }; - Iterator kv = source.iterator(); - while (kv.hasNext()) { - target.put(kv.next(), kv.hasNext() ? kv.next() : null); - } + SCORED_VALUES_TO_TUPLE_SET = source -> { - return target; + if (source == null) { + return null; } - }; - SCORED_VALUES_TO_TUPLE_SET = new Converter>, Set>() { - public Set convert(List> source) { - if (source == null) { - return null; - } - Set tuples = new LinkedHashSet<>(source.size()); - for (ScoredValue value : source) { - tuples.add(LettuceConverters.toTuple(value)); - } - return tuples; + Set tuples = new LinkedHashSet<>(source.size()); + for (ScoredValue value : source) { + tuples.add(LettuceConverters.toTuple(value)); } + return tuples; }; - SCORED_VALUES_TO_TUPLE_LIST = new Converter>, List>() { - public List convert(List> source) { - if (source == null) { - return null; - } - List tuples = new ArrayList<>(source.size()); - for (ScoredValue value : source) { - tuples.add(LettuceConverters.toTuple(value)); - } - return tuples; + SCORED_VALUES_TO_TUPLE_LIST = source -> { + + if (source == null) { + return null; } - }; - SCORED_VALUE_TO_TUPLE = new Converter, Tuple>() { - public Tuple convert(ScoredValue source) { - return source != null ? new DefaultTuple(source.getValue(), Double.valueOf(source.getScore())) : null; + List tuples = new ArrayList<>(source.size()); + for (ScoredValue value : source) { + tuples.add(LettuceConverters.toTuple(value)); } + return tuples; }; - BYTES_LIST_TO_TUPLE_LIST_CONVERTER = new Converter, List>() { - @Override - public List convert(List source) { + SCORED_VALUE_TO_TUPLE = source -> source != null + ? new DefaultTuple(source.getValue(), Double.valueOf(source.getScore())) : null; - if (CollectionUtils.isEmpty(source)) { - return Collections.emptyList(); - } + BYTES_LIST_TO_TUPLE_LIST_CONVERTER = source -> { - List tuples = new ArrayList<>(); - Iterator it = source.iterator(); - while (it.hasNext()) { - tuples.add( - new DefaultTuple(it.next(), it.hasNext() ? Double.valueOf(LettuceConverters.toString(it.next())) : null)); - } - return tuples; + if (CollectionUtils.isEmpty(source)) { + return Collections.emptyList(); } + + List tuples = new ArrayList<>(); + Iterator it = source.iterator(); + while (it.hasNext()) { + tuples.add(new DefaultTuple(it.next(), it.hasNext() ? Double.valueOf(toString(it.next())) : null)); + } + return tuples; }; PARTITIONS_TO_CLUSTER_NODES = new Converter>() { @@ -301,45 +276,24 @@ private Set parseFlags(Set source) { POSITIVE_INFINITY_BYTES = toBytes("+inf"); NEGATIVE_INFINITY_BYTES = toBytes("-inf"); - BYTES_LIST_TO_TIME_CONVERTER = new Converter, Long>() { + BYTES_LIST_TO_TIME_CONVERTER = source -> { - @Override - public Long convert(List source) { + Assert.notEmpty(source, "Received invalid result from server. Expected 2 items in collection."); + Assert.isTrue(source.size() == 2, + "Received invalid nr of arguments from redis server. Expected 2 received " + source.size()); - Assert.notEmpty(source, "Received invalid result from server. Expected 2 items in collection."); - Assert.isTrue(source.size() == 2, - "Received invalid nr of arguments from redis server. Expected 2 received " + source.size()); - - return toTimeMillis(LettuceConverters.toString(source.get(0)), LettuceConverters.toString(source.get(1))); - } + return toTimeMillis(toString(source.get(0)), toString(source.get(1))); }; - GEO_COORDINATE_TO_POINT_CONVERTER = new Converter() { - @Override - public Point convert(io.lettuce.core.GeoCoordinates geoCoordinate) { - return geoCoordinate != null ? new Point(geoCoordinate.getX().doubleValue(), geoCoordinate.getY().doubleValue()) - : null; - } - }; + GEO_COORDINATE_TO_POINT_CONVERTER = geoCoordinate -> geoCoordinate != null + ? new Point(geoCoordinate.getX().doubleValue(), geoCoordinate.getY().doubleValue()) : null; GEO_COORDINATE_LIST_TO_POINT_LIST_CONVERTER = new ListConverter<>(GEO_COORDINATE_TO_POINT_CONVERTER); - KEY_VALUE_UNWRAPPER = new Converter, Object>() { - - @Override - public Object convert(KeyValue source) { - return source.getValueOrElse(null); - } - }; + KEY_VALUE_UNWRAPPER = source -> source.getValueOrElse(null); KEY_VALUE_LIST_UNWRAPPER = new ListConverter<>(KEY_VALUE_UNWRAPPER); - TRANSACTION_RESULT_UNWRAPPER = new Converter>() { - - @Override - public List convert(TransactionResult transactionResult) { - return transactionResult.stream().collect(Collectors.toList()); - } - }; + TRANSACTION_RESULT_UNWRAPPER = transactionResult -> transactionResult.stream().collect(Collectors.toList()); } public static List toTuple(List list) { @@ -355,16 +309,14 @@ public static Point geoCoordinatesToPoint(GeoCoordinates geoCoordinates) { } public static Converter> stringToRedisClientListConverter() { - return new Converter>() { - @Override - public List convert(String source) { - if (!StringUtils.hasText(source)) { - return Collections.emptyList(); - } + return source -> { - return STRING_TO_LIST_OF_CLIENT_INFO.convert(source.split("\\r?\\n")); + if (!StringUtils.hasText(source)) { + return Collections.emptyList(); } + + return STRING_TO_LIST_OF_CLIENT_INFO.convert(source.split("\\r?\\n")); }; } @@ -445,6 +397,7 @@ public static String toString(byte[] source) { } public static ScriptOutputType toScriptOutputType(ReturnType returnType) { + switch (returnType) { case BOOLEAN: return ScriptOutputType.BOOLEAN; @@ -479,6 +432,7 @@ public static Converter, Map> bytesListToMapConvert } public static SortArgs toSortArgs(SortParameters params) { + SortArgs args = new SortArgs(); if (params == null) { return args; @@ -650,6 +604,7 @@ public static List toListOfRedisServer(List> so * @since 1.5 */ public static RedisURI sentinelConfigurationToRedisURI(RedisSentinelConfiguration sentinelConfiguration) { + Assert.notNull(sentinelConfiguration, "RedisSentinelConfiguration is required"); Set sentinels = sentinelConfiguration.getSentinels(); @@ -856,6 +811,27 @@ public static GeoArgs toGeoArgs(GeoRadiusCommandArgs args) { return geoArgs; } + /** + * Convert {@code CONFIG GET} output from a {@link List} to {@link Properties}. + * + * @param input must not be {@literal null}. + * @return the mapped result. + */ + public static Properties toProperties(List input) { + + Assert.notNull(input, "Input list must not be null!"); + Assert.isTrue(input.size() % 2 == 0, "Input list must contain an even number of entries!"); + + Properties properties = new Properties(); + + for (int i = 0; i < input.size(); i += 2) { + + properties.setProperty(input.get(i), input.get(i + 1)); + } + + return properties; + } + /** * Get {@link Converter} capable of {@link Set} of {@link Byte} into {@link GeoResults}. * @@ -863,22 +839,19 @@ public static GeoArgs toGeoArgs(GeoRadiusCommandArgs args) { * @since 1.8 */ public static Converter, GeoResults>> bytesSetToGeoResultsConverter() { - return new Converter, GeoResults>>() { - @Override - public GeoResults> convert(Set source) { + return source -> { - if (CollectionUtils.isEmpty(source)) { - return new GeoResults<>(Collections.>> emptyList()); - } + if (CollectionUtils.isEmpty(source)) { + return new GeoResults<>(Collections.>> emptyList()); + } - List>> results = new ArrayList<>(source.size()); - Iterator it = source.iterator(); - while (it.hasNext()) { - results.add(new GeoResult<>(new GeoLocation<>(it.next(), null), new Distance(0D))); - } - return new GeoResults<>(results); + List>> results = new ArrayList<>(source.size()); + Iterator it = source.iterator(); + while (it.hasNext()) { + results.add(new GeoResult<>(new GeoLocation<>(it.next(), null), new Distance(0D))); } + return new GeoResults<>(results); }; } diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveClusterServerCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveClusterServerCommands.java new file mode 100644 index 0000000000..1877ece49e --- /dev/null +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveClusterServerCommands.java @@ -0,0 +1,379 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection.lettuce; + +import io.lettuce.core.api.reactive.RedisServerReactiveCommands; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.BinaryOperator; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collector; + +import org.reactivestreams.Publisher; +import org.springframework.data.redis.connection.ClusterTopologyProvider; +import org.springframework.data.redis.connection.ReactiveClusterServerCommands; +import org.springframework.data.redis.connection.RedisClusterNode; +import org.springframework.data.redis.core.types.RedisClientInfo; +import org.springframework.data.redis.util.ByteUtils; +import org.springframework.util.Assert; + +/** + * {@link ReactiveClusterServerCommands} implementation for {@literal Lettuce}. + * + * @author Mark Paluch + * @author Christoph Strobl + * @since 2.0 + */ +class LettuceReactiveClusterServerCommands extends LettuceReactiveServerCommands + implements ReactiveClusterServerCommands { + + private final LettuceReactiveRedisClusterConnection connection; + private final ClusterTopologyProvider topologyProvider; + + /** + * Create new {@link LettuceReactiveGeoCommands}. + * + * @param connection must not be {@literal null}. + * @param topologyProvider must not be {@literal null}. + * @throws IllegalArgumentException when {@code connection} is {@literal null}. + * @throws IllegalArgumentException when {@code topologyProvider} is {@literal null}. + */ + public LettuceReactiveClusterServerCommands(LettuceReactiveRedisClusterConnection connection, + ClusterTopologyProvider topologyProvider) { + + super(connection); + + this.connection = connection; + this.topologyProvider = topologyProvider; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#bgReWriteAof(org.springframework.data.redis.connection.RedisClusterNode) + */ + @Override + public Mono bgReWriteAof(RedisClusterNode node) { + return connection.execute(node, RedisServerReactiveCommands::bgrewriteaof).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#bgSave(org.springframework.data.redis.connection.RedisClusterNode) + */ + @Override + public Mono bgSave(RedisClusterNode node) { + return connection.execute(node, RedisServerReactiveCommands::bgsave).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#lastSave(org.springframework.data.redis.connection.RedisClusterNode) + */ + @Override + public Mono lastSave(RedisClusterNode node) { + return connection.execute(node, RedisServerReactiveCommands::lastsave).map(Date::getTime).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#save(org.springframework.data.redis.connection.RedisClusterNode) + */ + @Override + public Mono save(RedisClusterNode node) { + return connection.execute(node, RedisServerReactiveCommands::save).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#dbSize(org.springframework.data.redis.connection.RedisClusterNode) + */ + @Override + public Mono dbSize(RedisClusterNode node) { + return connection.execute(node, RedisServerReactiveCommands::dbsize).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#flushDb(org.springframework.data.redis.connection.RedisClusterNode) + */ + @Override + public Mono flushDb(RedisClusterNode node) { + return connection.execute(node, RedisServerReactiveCommands::flushdb).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#flushAll(org.springframework.data.redis.connection.RedisClusterNode) + */ + @Override + public Mono flushAll(RedisClusterNode node) { + return connection.execute(node, RedisServerReactiveCommands::flushall).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.lettuce.LettuceReactiveServerCommands#info() + */ + @Override + public Mono info() { + return Flux.merge(executeOnAllNodes(this::info)).collect(PropertiesCollector.INSTANCE); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#info(org.springframework.data.redis.connection.RedisClusterNode) + */ + @Override + public Mono info(RedisClusterNode node) { + + return connection.execute(node, RedisServerReactiveCommands::info) // + .map(LettuceConverters::toProperties) // + .next(); + } + + /* (non-Javadoc) + * @see org.springframework.data.redis.connection.lettuce.LettuceReactiveServerCommands#info(java.lang.String) + */ + @Override + public Mono info(String section) { + + Assert.hasText(section, "Section must not be null nor empty!"); + + return Flux.merge(executeOnAllNodes(redisClusterNode -> info(redisClusterNode, section))) + .collect(PropertiesCollector.INSTANCE); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#info(org.springframework.data.redis.connection.RedisClusterNode, java.lang.String) + */ + @Override + public Mono info(RedisClusterNode node, String section) { + + Assert.hasText(section, "Section must not be null nor empty!"); + + return connection.execute(node, c -> c.info(section)) // + .map(LettuceConverters::toProperties).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.lettuce.LettuceReactiveServerCommands#getConfig(java.lang.String) + */ + @Override + public Mono getConfig(String pattern) { + + Assert.hasText(pattern, "Pattern must not be null nor empty!"); + + return Flux.merge(executeOnAllNodes(node -> getConfig(node, pattern))) // + .collect(PropertiesCollector.INSTANCE); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#getConfig(org.springframework.data.redis.connection.RedisClusterNode, java.lang.String) + */ + @Override + public Mono getConfig(RedisClusterNode node, String pattern) { + + Assert.hasText(pattern, "Pattern must not be null nor empty!"); + + return connection.execute(node, c -> c.configGet(pattern).collectList()) // + .map(LettuceConverters::toProperties) // + .next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.lettuce.LettuceReactiveServerCommands#setConfig(java.lang.String, java.lang.String) + */ + @Override + public Mono setConfig(String param, String value) { + return Flux.merge(executeOnAllNodes(node -> setConfig(node, param, value))).map(Tuple2::getT2).last(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#setConfig(org.springframework.data.redis.connection.RedisClusterNode, java.lang.String, java.lang.String) + */ + @Override + public Mono setConfig(RedisClusterNode node, String param, String value) { + + Assert.hasText(param, "Param must not be null nor empty!"); + Assert.hasText(value, "Value must not be null nor empty!"); + + return connection.execute(node, c -> c.configSet(param, value)).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.lettuce.LettuceReactiveServerCommands#resetConfigStats() + */ + @Override + public Mono resetConfigStats() { + return Flux.merge(executeOnAllNodes(this::resetConfigStats)).map(Tuple2::getT2).last(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#resetConfigStats(org.springframework.data.redis.connection.RedisClusterNode) + */ + @Override + public Mono resetConfigStats(RedisClusterNode node) { + return connection.execute(node, RedisServerReactiveCommands::configResetstat).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#time(org.springframework.data.redis.connection.RedisClusterNode) + */ + @Override + public Mono time(RedisClusterNode node) { + + return connection.execute(node, RedisServerReactiveCommands::time) // + .map(ByteUtils::getBytes) // + .collectList() // + .map(LettuceConverters.toTimeConverter()::convert); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.lettuce.LettuceReactiveServerCommands#getClientList() + */ + @Override + public Flux getClientList() { + return Flux.merge(executeOnAllNodesMany(this::getClientList)).map(Tuple2::getT2); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#getClientList(org.springframework.data.redis.connection.RedisClusterNode) + */ + @Override + public Flux getClientList(RedisClusterNode node) { + + return connection.execute(node, RedisServerReactiveCommands::clientList) + .flatMapIterable(LettuceConverters.stringToRedisClientListConverter()::convert); + } + + private Collection>> executeOnAllNodes( + Function> callback) { + + Set nodes = topologyProvider.getTopology().getNodes(); + List>> pipeline = new ArrayList<>(nodes.size()); + + for (RedisClusterNode redisClusterNode : nodes) { + pipeline.add(callback.apply(redisClusterNode).map(p -> Tuples.of(redisClusterNode, p))); + } + + return pipeline; + } + + private Collection>> executeOnAllNodesMany( + Function> callback) { + + Set nodes = topologyProvider.getTopology().getNodes(); + List>> pipeline = new ArrayList<>(nodes.size()); + + for (RedisClusterNode redisClusterNode : nodes) { + pipeline.add(callback.apply(redisClusterNode).map(p -> Tuples.of(redisClusterNode, p))); + } + + return pipeline; + } + + /** + * Collector to merge {@link Tuple2} of {@link RedisClusterNode} and {@link Properties} into a single + * {@link Properties} object by prefixing original the keys with {@link RedisClusterNode#asString()}. + */ + private enum PropertiesCollector implements Collector, Properties, Properties> { + + INSTANCE; + + /* + * (non-Javadoc) + * @see java.util.stream.Collector#supplier() + */ + @Override + public Supplier supplier() { + return Properties::new; + } + + /* (non-Javadoc) + * @see java.util.stream.Collector#accumulator() + */ + @Override + public BiConsumer> accumulator() { + + return (properties, tuple) -> { + + for (Entry entry : tuple.getT2().entrySet()) { + properties.put(tuple.getT1().asString() + "." + entry.getKey(), entry.getValue()); + } + }; + } + + /* + * (non-Javadoc) + * @see java.util.stream.Collector#combiner() + */ + @Override + public BinaryOperator combiner() { + + return (left, right) -> { + + Properties merge = new Properties(); + + merge.putAll(left); + merge.putAll(right); + + return merge; + }; + } + + /* + * (non-Javadoc) + * @see java.util.stream.Collector#finisher() + */ + @Override + public Function finisher() { + return properties -> properties; + } + + /* + * (non-Javadoc) + * @see java.util.stream.Collector#characteristics() + */ + @Override + public Set characteristics() { + return new HashSet<>(Arrays.asList(Characteristics.UNORDERED, Characteristics.IDENTITY_FINISH)); + } + } +} diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisClusterConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisClusterConnection.java index c41ea22c08..0deb30ff40 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisClusterConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisClusterConnection.java @@ -15,20 +15,26 @@ */ package org.springframework.data.redis.connection.lettuce; +import io.lettuce.core.api.reactive.BaseRedisReactiveCommands; import io.lettuce.core.api.reactive.RedisReactiveCommands; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import java.nio.ByteBuffer; +import org.springframework.data.redis.connection.ClusterTopologyProvider; import org.springframework.data.redis.connection.ReactiveRedisClusterConnection; +import org.springframework.data.redis.connection.RedisClusterNode; import org.springframework.data.redis.connection.RedisNode; import org.springframework.util.Assert; import org.springframework.util.StringUtils; /** + * {@link ReactiveRedisClusterConnection} implementation for {@literal Lettuce}. + * * @author Christoph Strobl * @author Mark Paluch * @since 2.0 @@ -36,11 +42,25 @@ class LettuceReactiveRedisClusterConnection extends LettuceReactiveRedisConnection implements ReactiveRedisClusterConnection { - public LettuceReactiveRedisClusterConnection(RedisClusterClient client) { + private final ClusterTopologyProvider topologyProvider; + + /** + * Creates new {@link LettuceReactiveRedisClusterConnection}. + * + * @param client must not be {@literal null}. + * @throws IllegalArgumentException when {@code client} is {@literal null}. + * @throws org.springframework.dao.InvalidDataAccessResourceUsageException when {@code client} is not suitable for + * cluster environment. + */ + LettuceReactiveRedisClusterConnection(RedisClusterClient client) { + super(client); + + this.topologyProvider = new LettuceClusterTopologyProvider(client); } - /* (non-Javadoc) + /* + * (non-Javadoc) * @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#keyCommands() */ @Override @@ -48,7 +68,8 @@ public LettuceReactiveClusterKeyCommands keyCommands() { return new LettuceReactiveClusterKeyCommands(this); } - /* (non-Javadoc) + /* + * (non-Javadoc) * @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#listCommands() */ @Override @@ -56,7 +77,8 @@ public LettuceReactiveClusterListCommands listCommands() { return new LettuceReactiveClusterListCommands(this); } - /* (non-Javadoc) + /* + * (non-Javadoc) * @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#setCommands() */ @Override @@ -64,7 +86,8 @@ public LettuceReactiveClusterSetCommands setCommands() { return new LettuceReactiveClusterSetCommands(this); } - /* (non-Javadoc) + /* + * (non-Javadoc) * @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#zSetCommands() */ @Override @@ -72,7 +95,8 @@ public LettuceReactiveClusterZSetCommands zSetCommands() { return new LettuceReactiveClusterZSetCommands(this); } - /* (non-Javadoc) + /* + * (non-Javadoc) * @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#hyperLogLogCommands() */ @Override @@ -80,7 +104,8 @@ public LettuceReactiveClusterHyperLogLogCommands hyperLogLogCommands() { return new LettuceReactiveClusterHyperLogLogCommands(this); } - /* (non-Javadoc) + /* + * (non-Javadoc) * @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#stringCommands() */ @Override @@ -88,7 +113,8 @@ public LettuceReactiveClusterStringCommands stringCommands() { return new LettuceReactiveClusterStringCommands(this); } - /* (non-Javadoc) + /* + * (non-Javadoc) * @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#geoCommands() */ @Override @@ -96,7 +122,8 @@ public LettuceReactiveClusterGeoCommands geoCommands() { return new LettuceReactiveClusterGeoCommands(this); } - /* (non-Javadoc) + /* + * (non-Javadoc) * @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#hashCommands() */ @Override @@ -104,7 +131,8 @@ public LettuceReactiveClusterHashCommands hashCommands() { return new LettuceReactiveClusterHashCommands(this); } - /* (non-Javadoc) + /* + * (non-Javadoc) * @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#numberCommands() */ @Override @@ -112,15 +140,35 @@ public LettuceReactiveClusterNumberCommands numberCommands() { return new LettuceReactiveClusterNumberCommands(this); } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#serverCommands() + */ + @Override + public LettuceReactiveClusterServerCommands serverCommands() { + return new LettuceReactiveClusterServerCommands(this, topologyProvider); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveRedisClusterConnection#ping(org.springframework.data.redis.connection.RedisClusterNode) + */ + @Override + public Mono ping(RedisClusterNode node) { + return execute(node, BaseRedisReactiveCommands::ping).next(); + } + /** - * @param callback - * @return + * @param node must not be {@literal null}. + * @param callback must not be {@literal null}. + * @throws IllegalArgumentException when {@code node} or {@code callback} is {@literal null}. + * @return {@link Flux} emitting execution results. */ public Flux execute(RedisNode node, LettuceReactiveCallback callback) { try { + Assert.notNull(node, "RedisClusterNode must not be null!"); Assert.notNull(callback, "ReactiveCallback must not be null!"); - Assert.notNull(node, "Node must not be null!"); } catch (IllegalArgumentException e) { return Flux.error(e); } @@ -128,10 +176,11 @@ public Flux execute(RedisNode node, LettuceReactiveCallback callback) return Flux.defer(() -> callback.doWithCommands(getCommands(node))).onErrorMap(translateException()); } - /* (non-Javadoc) + /* + * (non-Javadoc) * @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#getConnection() */ - @SuppressWarnings("unchecked") + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override protected StatefulRedisClusterConnection getConnection() { @@ -141,14 +190,15 @@ protected StatefulRedisClusterConnection getConnection() return (StatefulRedisClusterConnection) super.getConnection(); } - /* (non-Javadoc) + /* + * (non-Javadoc) * @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#getCommands() */ protected RedisClusterReactiveCommands getCommands() { return getConnection().reactive(); } - @SuppressWarnings("unchecked") + @SuppressWarnings({ "unchecked", "rawtypes" }) protected RedisReactiveCommands getCommands(RedisNode node) { if (!(getConnection() instanceof StatefulRedisClusterConnection)) { diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnection.java index 4ec8fd69fe..8cafd44086 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnection.java @@ -19,11 +19,13 @@ import io.lettuce.core.RedisClient; import io.lettuce.core.api.StatefulConnection; import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.reactive.BaseRedisReactiveCommands; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands; import io.lettuce.core.codec.RedisCodec; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import java.nio.ByteBuffer; import java.util.function.Function; @@ -38,6 +40,7 @@ import org.springframework.data.redis.connection.ReactiveListCommands; import org.springframework.data.redis.connection.ReactiveNumberCommands; import org.springframework.data.redis.connection.ReactiveRedisConnection; +import org.springframework.data.redis.connection.ReactiveServerCommands; import org.springframework.data.redis.connection.ReactiveSetCommands; import org.springframework.data.redis.connection.ReactiveStringCommands; import org.springframework.data.redis.connection.ReactiveZSetCommands; @@ -50,11 +53,18 @@ */ class LettuceReactiveRedisConnection implements ReactiveRedisConnection { - private StatefulConnection connection; - private static final RedisCodec CODEC = ByteBufferCodec.INSTANCE; - public LettuceReactiveRedisConnection(AbstractRedisClient client) { + private StatefulConnection connection; + + /** + * Creates new {@link LettuceReactiveRedisConnection}. + * + * @param client must not be {@literal null}. + * @throws IllegalArgumentException when {@code client} is {@literal null}. + * @throws InvalidDataAccessResourceUsageException when {@code client} is not suitable for connection. + */ + LettuceReactiveRedisConnection(AbstractRedisClient client) { Assert.notNull(client, "RedisClient must not be null!"); @@ -77,7 +87,8 @@ public ReactiveKeyCommands keyCommands() { return new LettuceReactiveKeyCommands(this); } - /* (non-Javadoc) + /* + * (non-Javadoc) * @see org.springframework.data.redis.connection.ReactiveRedisConnection#stringCommands() */ @Override @@ -85,7 +96,8 @@ public ReactiveStringCommands stringCommands() { return new LettuceReactiveStringCommands(this); } - /* (non-Javadoc) + /* + * (non-Javadoc) * @see org.springframework.data.redis.connection.ReactiveRedisConnection#numberCommands() */ @Override @@ -93,7 +105,8 @@ public ReactiveNumberCommands numberCommands() { return new LettuceReactiveNumberCommands(this); } - /* (non-Javadoc) + /* + * (non-Javadoc) * @see org.springframework.data.redis.connection.ReactiveRedisConnection#listCommands() */ @Override @@ -101,7 +114,8 @@ public ReactiveListCommands listCommands() { return new LettuceReactiveListCommands(this); } - /* (non-Javadoc) + /* + * (non-Javadoc) * @see org.springframework.data.redis.connection.ReactiveRedisConnection#setCommands() */ @Override @@ -109,7 +123,8 @@ public ReactiveSetCommands setCommands() { return new LettuceReactiveSetCommands(this); } - /* (non-Javadoc) + /* + * (non-Javadoc) * @see org.springframework.data.redis.connection.ReactiveRedisConnection#zSetCommands() */ @Override @@ -117,7 +132,8 @@ public ReactiveZSetCommands zSetCommands() { return new LettuceReactiveZSetCommands(this); } - /* (non-Javadoc) + /* + * (non-Javadoc) * @see org.springframework.data.redis.connection.ReactiveRedisConnection#hashCommands() */ @Override @@ -125,7 +141,8 @@ public ReactiveHashCommands hashCommands() { return new LettuceReactiveHashCommands(this); } - /* (non-Javadoc) + /* + * (non-Javadoc) * @see org.springframework.data.redis.connection.ReactiveRedisConnection#geoCommands() */ @Override @@ -133,7 +150,8 @@ public ReactiveGeoCommands geoCommands() { return new LettuceReactiveGeoCommands(this); } - /* (non-Javadoc) + /* + * (non-Javadoc) * @see org.springframework.data.redis.connection.ReactiveRedisConnection#hyperLogLogCommands() */ @Override @@ -141,6 +159,24 @@ public ReactiveHyperLogLogCommands hyperLogLogCommands() { return new LettuceReactiveHyperLogLogCommands(this); } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveRedisConnection#hyperLogLogCommands() + */ + @Override + public ReactiveServerCommands serverCommands() { + return new LettuceReactiveServerCommands(this); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveRedisConnection#ping() + */ + @Override + public Mono ping() { + return execute(BaseRedisReactiveCommands::ping).next(); + } + /** * @param callback * @return @@ -191,7 +227,7 @@ interface LettuceReactiveCallback { Publisher doWithCommands(RedisClusterReactiveCommands cmd); } - static enum ByteBufferCodec implements RedisCodec { + enum ByteBufferCodec implements RedisCodec { INSTANCE; diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveServerCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveServerCommands.java new file mode 100644 index 0000000000..5ab0453b55 --- /dev/null +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveServerCommands.java @@ -0,0 +1,238 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection.lettuce; + +import io.lettuce.core.api.reactive.RedisServerReactiveCommands; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.nio.ByteBuffer; +import java.util.Date; +import java.util.Properties; + +import org.springframework.data.redis.connection.ReactiveServerCommands; +import org.springframework.data.redis.core.types.RedisClientInfo; +import org.springframework.data.redis.util.ByteUtils; +import org.springframework.util.Assert; + +/** + * {@link ReactiveServerCommands} implementation for {@literal Lettuce}. + * + * @author Mark Paluch + * @author Christoph Strobl + */ +class LettuceReactiveServerCommands implements ReactiveServerCommands { + + private final LettuceReactiveRedisConnection connection; + + /** + * Create new {@link LettuceReactiveGeoCommands}. + * + * @param connection must not be {@literal null}. + * @throws IllegalArgumentException when {@code connection} is {@literal null}. + */ + LettuceReactiveServerCommands(LettuceReactiveRedisConnection connection) { + + Assert.notNull(connection, "Connection must not be null!"); + + this.connection = connection; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveServerCommands#bgReWriteAof() + */ + @Override + public Mono bgReWriteAof() { + return connection.execute(RedisServerReactiveCommands::bgrewriteaof).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveServerCommands#bgSave() + */ + @Override + public Mono bgSave() { + return connection.execute(RedisServerReactiveCommands::bgsave).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveServerCommands#lastSave() + */ + @Override + public Mono lastSave() { + return connection.execute(RedisServerReactiveCommands::lastsave).next().map(Date::getTime); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveServerCommands#save() + */ + @Override + public Mono save() { + return connection.execute(RedisServerReactiveCommands::save).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveServerCommands#dbSize() + */ + @Override + public Mono dbSize() { + return connection.execute(RedisServerReactiveCommands::dbsize).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveServerCommands#flushDb() + */ + @Override + public Mono flushDb() { + return connection.execute(RedisServerReactiveCommands::flushdb).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveServerCommands#flushAll() + */ + @Override + public Mono flushAll() { + return connection.execute(RedisServerReactiveCommands::flushall).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveServerCommands#info() + */ + @Override + public Mono info() { + + return connection.execute(RedisServerReactiveCommands::info) // + .map(LettuceConverters::toProperties) // + .next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveServerCommands#info(java.lang.String) + */ + @Override + public Mono info(String section) { + + Assert.hasText(section, "Section must not be null nor empty!"); + + return connection.execute(c -> c.info(section)) // + .map(LettuceConverters::toProperties) // + .next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveServerCommands#getConfig(java.lang.String) + */ + @Override + public Mono getConfig(String pattern) { + + Assert.hasText(pattern, "Pattern must not be null nor empty!"); + + return connection.execute(c -> c.configGet(pattern).collectList()) // + .map(LettuceConverters::toProperties).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveServerCommands#setConfig(java.lang.String, java.lang.String) + */ + @Override + public Mono setConfig(String param, String value) { + + Assert.hasText(param, "Param must not be null nor empty!"); + Assert.hasText(value, "Value must not be null nor empty!"); + + return connection.execute(c -> c.configSet(param, value)).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveServerCommands#resetConfigStats() + */ + @Override + public Mono resetConfigStats() { + return connection.execute(RedisServerReactiveCommands::configResetstat).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveServerCommands#time() + */ + @Override + public Mono time() { + + return connection.execute(RedisServerReactiveCommands::time) // + .map(ByteUtils::getBytes) // + .collectList() // + .map(LettuceConverters.toTimeConverter()::convert); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveServerCommands#killClient(java.lang.String, int) + */ + @Override + public Mono killClient(String host, int port) { + + Assert.notNull(host, "Host must not be null nor empty!"); + + return connection.execute(c -> c.clientKill(String.format("%s:%s", host, port))).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveServerCommands#setClientName(java.lang.String) + */ + @Override + public Mono setClientName(String name) { + + Assert.hasText(name, "Name must not be null nor empty!"); + + return connection.execute(c -> c.clientSetname(ByteBuffer.wrap(LettuceConverters.toBytes(name)))).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveServerCommands#getClientName() + */ + @Override + public Mono getClientName() { + + return connection.execute(RedisServerReactiveCommands::clientGetname) // + .map(ByteUtils::getBytes) // + .map(LettuceConverters::toString) // + .next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveServerCommands#getClientList() + */ + @Override + public Flux getClientList() { + + return connection.execute(RedisServerReactiveCommands::clientList) + .flatMapIterable(s -> LettuceConverters.stringToRedisClientListConverter().convert(s)); + } +} diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionUnitTests.java index 69188645ad..d6a33164b1 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionUnitTests.java @@ -104,7 +104,7 @@ public void setUp() { when(clusterMock.getPartitions()).thenReturn(partitions); ClusterCommandExecutor executor = new ClusterCommandExecutor( - new LettuceClusterConnection.LettuceClusterTopologyProvider(clusterMock), resourceProvider, + new LettuceClusterTopologyProvider(clusterMock), resourceProvider, LettuceClusterConnection.exceptionConverter); connection = new LettuceClusterConnection(clusterMock, executor) { diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveClusterServerCommandsTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveClusterServerCommandsTests.java new file mode 100644 index 0000000000..db80d3aeda --- /dev/null +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveClusterServerCommandsTests.java @@ -0,0 +1,182 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection.lettuce; + +import static org.assertj.core.api.Assertions.*; +import static org.springframework.data.redis.connection.ClusterTestVariables.*; +import static org.springframework.data.redis.connection.lettuce.LettuceReactiveCommandsTestsBase.*; + +import reactor.test.StepVerifier; + +import org.junit.Test; +import org.springframework.data.redis.connection.RedisClusterNode; + +/** + * @author Mark Paluch + */ +public class LettuceReactiveClusterServerCommandsTests extends LettuceReactiveClusterCommandsTestsBase { + + static final RedisClusterNode NODE1 = new RedisClusterNode(CLUSTER_HOST, MASTER_NODE_1_PORT); + static final RedisClusterNode NODE2 = new RedisClusterNode(CLUSTER_HOST, MASTER_NODE_2_PORT); + static final RedisClusterNode NODE3 = new RedisClusterNode(CLUSTER_HOST, MASTER_NODE_3_PORT); + + @Test // DATAREDIS-659 + public void pingShouldRespondCorrectly() { + StepVerifier.create(connection.ping(NODE1)).expectNext("PONG").verifyComplete(); + } + + @Test // DATAREDIS-659 + public void bgReWriteAofShouldRespondCorrectly() { + StepVerifier.create(connection.serverCommands().bgReWriteAof(NODE1)).expectNextCount(1).verifyComplete(); + } + + @Test // DATAREDIS-659 + public void bgSaveShouldRespondCorrectly() { + StepVerifier.create(connection.serverCommands().bgSave(NODE1)).expectNextCount(1).verifyComplete(); + } + + @Test // DATAREDIS-659 + public void lastSaveShouldRespondCorrectly() { + StepVerifier.create(connection.serverCommands().lastSave(NODE1)).expectNextCount(1).verifyComplete(); + } + + @Test // DATAREDIS-659 + public void saveShouldRespondCorrectly() { + StepVerifier.create(connection.serverCommands().save(NODE1)).expectNext("OK").verifyComplete(); + } + + @Test // DATAREDIS-659 + public void dbSizeShouldRespondCorrectly() { + StepVerifier.create(connection.serverCommands().dbSize(NODE1)).expectNextCount(1).verifyComplete(); + } + + @Test // DATAREDIS-659 + public void flushDbShouldRespondCorrectly() { + + StepVerifier + .create(connection.serverCommands().flushDb() // + .then(connection.stringCommands().set(KEY_1_BBUFFER, VALUE_1_BBUFFER)) // + .then(connection.stringCommands().set(KEY_2_BBUFFER, VALUE_2_BBUFFER))) // + .expectNextCount(1) // + .verifyComplete(); + + StepVerifier.create(connection.serverCommands().dbSize(NODE1)).expectNext(1L).verifyComplete(); + StepVerifier.create(connection.serverCommands().dbSize(NODE3)).expectNext(1L).verifyComplete(); + + StepVerifier.create(connection.serverCommands().flushDb(NODE1)).expectNext("OK").verifyComplete(); + + StepVerifier.create(connection.serverCommands().dbSize(NODE1)).expectNext(0L).verifyComplete(); + StepVerifier.create(connection.serverCommands().dbSize(NODE3)).expectNext(1L).verifyComplete(); + } + + @Test // DATAREDIS-659 + public void flushAllShouldRespondCorrectly() { + + StepVerifier + .create(connection.serverCommands().flushAll() // + .then(connection.stringCommands().set(KEY_1_BBUFFER, VALUE_1_BBUFFER)) // + .then(connection.stringCommands().set(KEY_2_BBUFFER, VALUE_2_BBUFFER))) // + .expectNextCount(1) // + .verifyComplete(); + + StepVerifier.create(connection.serverCommands().dbSize(NODE1)).expectNext(1L).verifyComplete(); + StepVerifier.create(connection.serverCommands().dbSize(NODE3)).expectNext(1L).verifyComplete(); + + StepVerifier.create(connection.serverCommands().flushAll(NODE1)).expectNext("OK").verifyComplete(); + + StepVerifier.create(connection.serverCommands().dbSize(NODE1)).expectNext(0L).verifyComplete(); + StepVerifier.create(connection.serverCommands().dbSize(NODE3)).expectNext(1L).verifyComplete(); + } + + @Test // DATAREDIS-659 + public void infoShouldRespondCorrectly() { + + StepVerifier.create(connection.serverCommands().info(NODE1)) // + .consumeNextWith(properties -> assertThat(properties).containsKey("tcp_port")) // + .verifyComplete(); + } + + @Test // DATAREDIS-659 + public void standaloneInfoWithSectionShouldRespondCorrectly() { + + StepVerifier.create(connection.serverCommands().info(NODE1, "server")) // + .consumeNextWith(properties -> { + assertThat(properties).containsKey("tcp_port").doesNotContainKey("role"); + }) // + .verifyComplete(); + } + + @Test // DATAREDIS-659 + public void getConfigShouldRespondCorrectly() { + + StepVerifier.create(connection.serverCommands().getConfig(NODE1, "*")) // + .consumeNextWith(properties -> { + assertThat(properties).containsEntry("databases", "16"); + }) // + .verifyComplete(); + } + + @Test // DATAREDIS-659 + public void setConfigShouldApplyConfiguration() throws InterruptedException { + + String resetValue = connection.serverCommands().getConfig("slowlog-max-len").map(it -> { + if (it.containsKey("slowlog-max-len")) { + return it.get("slowlog-max-len"); + } + return it.get("127.0.0.1:7379.slowlog-max-len"); + }).block().toString(); + + try { + StepVerifier.create(connection.serverCommands().setConfig("slowlog-max-len", resetValue)) // + .expectNext("OK") // + .verifyComplete(); + + StepVerifier.create(connection.serverCommands().setConfig(NODE1, "slowlog-max-len", "127")) // + .expectNext("OK") // + .verifyComplete(); + + StepVerifier.create(connection.serverCommands().getConfig(NODE1, "slowlog-max-len")) // + .consumeNextWith(properties -> { + assertThat(properties).containsEntry("slowlog-max-len", "127"); + }) // + .verifyComplete(); + + StepVerifier.create(connection.serverCommands().getConfig(NODE2, "slowlog-max-len")) // + .consumeNextWith(properties -> { + assertThat(properties).containsEntry("slowlog-max-len", resetValue); + }) // + .verifyComplete(); + } finally { + connection.serverCommands().setConfig("slowlog-max-len", resetValue).block(); + } + + } + + @Test // DATAREDIS-659 + public void configResetstatShouldRespondCorrectly() { + StepVerifier.create(connection.serverCommands().resetConfigStats(NODE1)).expectNext("OK").verifyComplete(); + } + + @Test // DATAREDIS-659 + public void timeShouldRespondCorrectly() { + StepVerifier.create(connection.serverCommands().time(NODE1)).expectNextCount(1).verifyComplete(); + } + + @Test // DATAREDIS-659 + public void getClientListShouldReportClient() { + StepVerifier.create(connection.serverCommands().getClientList(NODE1)).expectNextCount(1).thenCancel().verify(); + } +} diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveServerCommandsTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveServerCommandsTests.java new file mode 100644 index 0000000000..cbe6c43bf3 --- /dev/null +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveServerCommandsTests.java @@ -0,0 +1,213 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection.lettuce; + +import static org.assertj.core.api.Assertions.*; +import static org.junit.Assume.*; + +import reactor.test.StepVerifier; + +import org.junit.Test; + +/** + * @author Mark Paluch + */ +public class LettuceReactiveServerCommandsTests extends LettuceReactiveCommandsTestsBase { + + @Test // DATAREDIS-659 + public void pingShouldRespondCorrectly() { + StepVerifier.create(connection.ping()).expectNext("PONG").verifyComplete(); + } + + @Test // DATAREDIS-659 + public void bgReWriteAofShouldRespondCorrectly() { + StepVerifier.create(connection.serverCommands().bgReWriteAof()).expectNextCount(1).verifyComplete(); + } + + @Test // DATAREDIS-659 + public void bgSaveShouldRespondCorrectly() { + StepVerifier.create(connection.serverCommands().bgSave()).expectNextCount(1).verifyComplete(); + } + + @Test // DATAREDIS-659 + public void lastSaveShouldRespondCorrectly() { + StepVerifier.create(connection.serverCommands().lastSave()).expectNextCount(1).verifyComplete(); + } + + @Test // DATAREDIS-659 + public void saveShouldRespondCorrectly() { + StepVerifier.create(connection.serverCommands().save()).expectNext("OK").verifyComplete(); + } + + @Test // DATAREDIS-659 + public void dbSizeShouldRespondCorrectly() { + StepVerifier.create(connection.serverCommands().dbSize()).expectNextCount(1).verifyComplete(); + } + + @Test // DATAREDIS-659 + public void flushDbShouldRespondCorrectly() { + + StepVerifier + .create(connection.serverCommands().flushDb() // + .then(connection.stringCommands().set(KEY_1_BBUFFER, VALUE_1_BBUFFER))) // + .expectNextCount(1) // + .verifyComplete(); + + StepVerifier.create(connection.serverCommands().dbSize()).expectNext(1L).verifyComplete(); + + StepVerifier.create(connection.serverCommands().flushDb()).expectNext("OK").verifyComplete(); + + StepVerifier.create(connection.serverCommands().dbSize()).expectNext(0L).verifyComplete(); + } + + @Test // DATAREDIS-659 + public void flushAllShouldRespondCorrectly() { + + StepVerifier + .create(connection.serverCommands().flushAll() // + .then(connection.stringCommands().set(KEY_1_BBUFFER, VALUE_1_BBUFFER))) // + .expectNextCount(1) // + .verifyComplete(); + + StepVerifier.create(connection.serverCommands().dbSize()).expectNext(1L).verifyComplete(); + + StepVerifier.create(connection.serverCommands().flushAll()).expectNext("OK").verifyComplete(); + + StepVerifier.create(connection.serverCommands().dbSize()).expectNext(0L).verifyComplete(); + } + + @Test // DATAREDIS-659 + public void infoShouldRespondCorrectly() { + + if (connection instanceof LettuceReactiveRedisClusterConnection) { + + StepVerifier.create(connection.serverCommands().info()) // + .consumeNextWith(properties -> { + + assertThat(properties) // + .containsKey("127.0.0.1:7379.tcp_port") // + .containsKey("127.0.0.1:7380.tcp_port"); + }) // + .verifyComplete(); + } else { + + StepVerifier.create(connection.serverCommands().info()) // + .consumeNextWith(properties -> assertThat(properties).containsKey("tcp_port")) // + .verifyComplete(); + } + } + + @Test // DATAREDIS-659 + public void standaloneInfoWithSectionShouldRespondCorrectly() { + + if (connection instanceof LettuceReactiveRedisClusterConnection) { + + StepVerifier.create(connection.serverCommands().info("server")) // + .consumeNextWith(properties -> { + assertThat(properties).isNotEmpty() // + .containsKey("127.0.0.1:7379.tcp_port") // + .doesNotContainKey("127.0.0.1:7379.role"); + }) // + .verifyComplete(); + } else { + + StepVerifier.create(connection.serverCommands().info("server")) // + .consumeNextWith(properties -> { + assertThat(properties).containsKey("tcp_port").doesNotContainKey("role"); + }) // + .verifyComplete(); + } + } + + @Test // DATAREDIS-659 + public void getConfigShouldRespondCorrectly() { + + if (connection instanceof LettuceReactiveRedisClusterConnection) { + + StepVerifier.create(connection.serverCommands().getConfig("*")) // + .consumeNextWith(properties -> { + assertThat(properties).containsEntry("127.0.0.1:7379.databases", "16"); + + }) // + .verifyComplete(); + } else { + + StepVerifier.create(connection.serverCommands().getConfig("*")) // + .consumeNextWith(properties -> { + assertThat(properties).containsEntry("databases", "16"); + }) // + .verifyComplete(); + } + } + + @Test // DATAREDIS-659 + public void setConfigShouldApplyConfiguration() { + + String resetValue = connection.serverCommands().getConfig("slowlog-max-len").map(it -> { + if (it.containsKey("slowlog-max-len")) { + return it.get("slowlog-max-len"); + } + return it.get("127.0.0.1:7379.slowlog-max-len"); + }).block().toString(); + + try { + StepVerifier.create(connection.serverCommands().setConfig("slowlog-max-len", "127")) // + .expectNext("OK") // + .verifyComplete(); + + if (connection instanceof LettuceReactiveRedisClusterConnection) { + StepVerifier.create(connection.serverCommands().getConfig("slowlog-max-len")) // + .consumeNextWith(properties -> { + assertThat(properties).containsEntry("127.0.0.1:7379.slowlog-max-len", "127"); + }) // + .verifyComplete(); + } else { + StepVerifier.create(connection.serverCommands().getConfig("slowlog-max-len")) // + .consumeNextWith(properties -> { + assertThat(properties).containsEntry("slowlog-max-len", "127"); + }) // + .verifyComplete(); + } + } finally { + connection.serverCommands().setConfig("slowlog-max-len", resetValue).block(); + } + } + + @Test // DATAREDIS-659 + public void configResetstatShouldRespondCorrectly() { + StepVerifier.create(connection.serverCommands().resetConfigStats()).expectNext("OK").verifyComplete(); + } + + @Test // DATAREDIS-659 + public void timeShouldRespondCorrectly() { + StepVerifier.create(connection.serverCommands().time()).expectNextCount(1).verifyComplete(); + } + + @Test // DATAREDIS-659 + public void setClientNameShouldSetName() { + + // see lettuce-io/lettuce-core#563 + assumeFalse(connection instanceof LettuceReactiveRedisClusterConnection); + + StepVerifier.create(connection.serverCommands().setClientName("foo")).expectNextCount(1).verifyComplete(); + StepVerifier.create(connection.serverCommands().getClientName()).expectNext("foo").verifyComplete(); + } + + @Test // DATAREDIS-659 + public void getClientListShouldReportClient() { + StepVerifier.create(connection.serverCommands().getClientList()).expectNextCount(1).thenCancel().verify(); + } +}