getNodes();
}
-
- /**
- * Configuration interface suitable for single node redis connections using local unix domain socket.
- *
- * @author Christoph Strobl
- * @since 2.1
- */
- interface DomainSocketConfiguration extends WithDomainSocket, WithDatabaseIndex, WithPassword {
-
- }
}
diff --git a/src/main/java/org/springframework/data/redis/connection/RedisConnectionFactory.java b/src/main/java/org/springframework/data/redis/connection/RedisConnectionFactory.java
index 1251399e0a..96b7ce01f9 100644
--- a/src/main/java/org/springframework/data/redis/connection/RedisConnectionFactory.java
+++ b/src/main/java/org/springframework/data/redis/connection/RedisConnectionFactory.java
@@ -26,6 +26,18 @@
*/
public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
+ /**
+ * Specifies if pipelined results should be converted to the expected data type.
+ *
+ * If {@literal false}, results of {@link RedisConnection#closePipeline()} and {@link RedisConnection#exec()} will be
+ * of the type returned by the underlying driver. This method is mostly for backwards compatibility with
+ * {@literal 1.0}. It is generally always a good idea to allow results to be converted and deserialized. In fact, this
+ * is now the default behavior.
+ *
+ * @return {@code true} to convert pipeline and transaction results; {@code false} otherwise.
+ */
+ boolean getConvertPipelineAndTxResults();
+
/**
* Returns a suitable {@link RedisConnection connection} for interacting with Redis.
*
@@ -45,18 +57,6 @@ public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
*/
RedisClusterConnection getClusterConnection();
- /**
- * Specifies if pipelined results should be converted to the expected data type.
- *
- * If {@literal false}, results of {@link RedisConnection#closePipeline()} and {@link RedisConnection#exec()} will be
- * of the type returned by the underlying driver. This method is mostly for backwards compatibility with
- * {@literal 1.0}. It is generally always a good idea to allow results to be converted and deserialized. In fact, this
- * is now the default behavior.
- *
- * @return {@code true} to convert pipeline and transaction results; {@code false} otherwise.
- */
- boolean getConvertPipelineAndTxResults();
-
/**
* Returns a suitable {@link RedisSentinelConnection connection} for interacting with Redis Sentinel.
*
diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java
index 0c23bb1717..eaa1b5ba4e 100644
--- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java
+++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java
@@ -15,21 +15,6 @@
*/
package org.springframework.data.redis.connection.jedis;
-import redis.clients.jedis.BuilderFactory;
-import redis.clients.jedis.CommandArguments;
-import redis.clients.jedis.CommandObject;
-import redis.clients.jedis.DefaultJedisClientConfig;
-import redis.clients.jedis.HostAndPort;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisClientConfig;
-import redis.clients.jedis.Pipeline;
-import redis.clients.jedis.Response;
-import redis.clients.jedis.Transaction;
-import redis.clients.jedis.commands.ProtocolCommand;
-import redis.clients.jedis.commands.ServerCommands;
-import redis.clients.jedis.exceptions.JedisDataException;
-import redis.clients.jedis.util.Pool;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
@@ -47,7 +32,25 @@
import org.springframework.data.redis.ExceptionTranslationStrategy;
import org.springframework.data.redis.FallbackExceptionTranslationStrategy;
import org.springframework.data.redis.RedisSystemException;
-import org.springframework.data.redis.connection.*;
+import org.springframework.data.redis.connection.AbstractRedisConnection;
+import org.springframework.data.redis.connection.FutureResult;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.data.redis.connection.RedisCommands;
+import org.springframework.data.redis.connection.RedisGeoCommands;
+import org.springframework.data.redis.connection.RedisHashCommands;
+import org.springframework.data.redis.connection.RedisHyperLogLogCommands;
+import org.springframework.data.redis.connection.RedisKeyCommands;
+import org.springframework.data.redis.connection.RedisListCommands;
+import org.springframework.data.redis.connection.RedisNode;
+import org.springframework.data.redis.connection.RedisPipelineException;
+import org.springframework.data.redis.connection.RedisScriptingCommands;
+import org.springframework.data.redis.connection.RedisServerCommands;
+import org.springframework.data.redis.connection.RedisSetCommands;
+import org.springframework.data.redis.connection.RedisStreamCommands;
+import org.springframework.data.redis.connection.RedisStringCommands;
+import org.springframework.data.redis.connection.RedisSubscribedConnectionException;
+import org.springframework.data.redis.connection.RedisZSetCommands;
+import org.springframework.data.redis.connection.Subscription;
import org.springframework.data.redis.connection.convert.TransactionResultConverter;
import org.springframework.data.redis.connection.jedis.JedisInvoker.ResponseCommands;
import org.springframework.data.redis.connection.jedis.JedisResult.JedisResultBuilder;
@@ -56,6 +59,24 @@
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import redis.clients.jedis.BuilderFactory;
+import redis.clients.jedis.CommandArguments;
+import redis.clients.jedis.CommandObject;
+import redis.clients.jedis.DefaultJedisClientConfig;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisClientConfig;
+import redis.clients.jedis.Pipeline;
+import redis.clients.jedis.Response;
+import redis.clients.jedis.Transaction;
+import redis.clients.jedis.commands.ProtocolCommand;
+import redis.clients.jedis.commands.ServerCommands;
+import redis.clients.jedis.exceptions.JedisDataException;
+import redis.clients.jedis.util.Pool;
+
/**
* {@code RedisConnection} implementation on top of Jedis library.
*
@@ -78,18 +99,23 @@
*/
public class JedisConnection extends AbstractRedisConnection {
- private final Log LOGGER = LogFactory.getLog(getClass());
+ private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION =
+ new FallbackExceptionTranslationStrategy(JedisExceptionConverter.INSTANCE);
- private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new FallbackExceptionTranslationStrategy(
- JedisExceptionConverter.INSTANCE);
+ private boolean convertPipelineAndTxResults = true;
private final Jedis jedis;
+ private final JedisClientConfig sentinelConfig;
+
private final JedisInvoker invoker = new JedisInvoker((directFunction, pipelineFunction, converter,
nullDefault) -> doInvoke(false, directFunction, pipelineFunction, converter, nullDefault));
+
private final JedisInvoker statusInvoker = new JedisInvoker((directFunction, pipelineFunction, converter,
nullDefault) -> doInvoke(true, directFunction, pipelineFunction, converter, nullDefault));
+ private volatile @Nullable JedisSubscription subscription;
+
private final JedisGeoCommands geoCommands = new JedisGeoCommands(this);
private final JedisHashCommands hashCommands = new JedisHashCommands(this);
private final JedisHyperLogLogCommands hllCommands = new JedisHyperLogLogCommands(this);
@@ -102,62 +128,58 @@ public class JedisConnection extends AbstractRedisConnection {
private final JedisStringCommands stringCommands = new JedisStringCommands(this);
private final JedisZSetCommands zSetCommands = new JedisZSetCommands(this);
- private final @Nullable Pool pool;
- private final JedisClientConfig sentinelConfig;
+ private final Log LOGGER = LogFactory.getLog(getClass());
private List pipelinedResults = new ArrayList<>();
+
+ private final @Nullable Pool pool;
+
private Queue>> txResults = new LinkedList<>();
- private volatile @Nullable JedisSubscription subscription;
- private volatile @Nullable Transaction transaction;
private volatile @Nullable Pipeline pipeline;
- private boolean convertPipelineAndTxResults = true;
+ private volatile @Nullable Transaction transaction;
/**
- * Constructs a new JedisConnection
instance.
+ * Constructs a new {@link JedisConnection}.
*
- * @param jedis Jedis entity
+ * @param jedis {@link Jedis} client.
*/
public JedisConnection(Jedis jedis) {
this(jedis, null, 0);
}
/**
- * Constructs a new JedisConnection
instance backed by a jedis pool.
+ * Constructs a new <{@link JedisConnection} backed by a Jedis {@link Pool}.
*
- * @param jedis
- * @param pool can be null, if no pool is used
- * @param dbIndex
+ * @param jedis {@link Jedis} client.
+ * @param pool {@link Pool} of Redis connections; can be null, if no pool is used.
+ * @param dbIndex {@link Integer index} of the Redis database to use.
*/
public JedisConnection(Jedis jedis, Pool pool, int dbIndex) {
this(jedis, pool, dbIndex, null);
}
/**
- * Constructs a new JedisConnection
instance backed by a jedis pool.
+ * Constructs a new <{@link JedisConnection} backed by a Jedis {@link Pool}.
*
- * @param jedis
- * @param pool can be null, if no pool is used
- * @param dbIndex
- * @param clientName the client name, can be {@literal null}.
+ * @param jedis {@link Jedis} client.
+ * @param pool {@link Pool} of Redis connections; can be null, if no pool is used.
+ * @param dbIndex {@link Integer index} of the Redis database to use.
+ * @param clientName {@link String name} given to this client; can be {@literal null}.
* @since 1.8
*/
protected JedisConnection(Jedis jedis, @Nullable Pool pool, int dbIndex, @Nullable String clientName) {
this(jedis, pool, createConfig(dbIndex, clientName), createConfig(dbIndex, clientName));
}
- private static DefaultJedisClientConfig createConfig(int dbIndex, @Nullable String clientName) {
- return DefaultJedisClientConfig.builder().database(dbIndex).clientName(clientName).build();
- }
-
/**
- * Constructs a new JedisConnection
instance backed by a jedis pool.
+ * Constructs a new <{@link JedisConnection} backed by a Jedis {@link Pool}.
*
- * @param jedis
- * @param pool can be null, if no pool is used
- * @param nodeConfig node configuration
- * @param sentinelConfig sentinel configuration
+ * @param jedis {@link Jedis} client.
+ * @param pool {@link Pool} of Redis connections; can be null, if no pool is used.
+ * @param nodeConfig {@literal Redis Node} configuration
+ * @param sentinelConfig {@literal Redis Sentinel} configuration
* @since 2.5
*/
protected JedisConnection(Jedis jedis, @Nullable Pool pool, JedisClientConfig nodeConfig,
@@ -173,13 +195,17 @@ protected JedisConnection(Jedis jedis, @Nullable Pool pool, JedisClientCo
if (nodeConfig.getDatabase() != jedis.getDB()) {
try {
select(nodeConfig.getDatabase());
- } catch (DataAccessException ex) {
+ } catch (DataAccessException cause) {
close();
- throw ex;
+ throw cause;
}
}
}
+ private static DefaultJedisClientConfig createConfig(int dbIndex, @Nullable String clientName) {
+ return DefaultJedisClientConfig.builder().database(dbIndex).clientName(clientName).build();
+ }
+
@Nullable
private Object doInvoke(boolean status, Function directFunction,
Function> pipelineFunction, Converter converter,
@@ -211,9 +237,9 @@ private Object doInvoke(boolean status, Function directFunction,
});
}
- protected DataAccessException convertJedisAccessException(Exception ex) {
- DataAccessException exception = EXCEPTION_TRANSLATION.translate(ex);
- return exception != null ? exception : new RedisSystemException(ex.getMessage(), ex);
+ protected DataAccessException convertJedisAccessException(Exception cause) {
+ DataAccessException exception = EXCEPTION_TRANSLATION.translate(cause);
+ return exception != null ? exception : new RedisSystemException(cause.getMessage(), cause);
}
@Override
@@ -290,6 +316,7 @@ public Object execute(String command, byte[]... args) {
CommandArguments arguments = new CommandArguments(protocolCommand).addObjects(args);
CommandObject commandObject = new CommandObject<>(arguments, BuilderFactory.RAW_OBJECT);
+
if (isPipelined()) {
pipeline(newJedisResult(getRequiredPipeline().executeCommand(commandObject)));
} else {
@@ -308,64 +335,42 @@ public void close() throws DataAccessException {
super.close();
JedisSubscription subscription = this.subscription;
- try {
- if (subscription != null) {
- subscription.close();
- }
- } catch (Exception ex) {
- LOGGER.debug("Cannot terminate subscription", ex);
- } finally {
- this.subscription = null;
- }
- // return the connection to the pool
- if (pool != null) {
- jedis.close();
- return;
+ if (subscription != null) {
+ doExceptionThrowingOperationSafely(subscription::close, "Cannot terminate subscription");
+ this.subscription = null;
}
- // else close the connection normally (doing the try/catch dance)
+ Jedis jedis = getJedis();
- try {
- jedis.quit();
- } catch (Exception ex) {
- LOGGER.debug("Failed to QUIT during close", ex);
- }
-
- try {
- jedis.disconnect();
- } catch (Exception ex) {
- LOGGER.debug("Failed to disconnect during close", ex);
+ // Return connection to the pool
+ if (this.pool != null) {
+ jedis.close();
}
- }
-
- private Exception handleCloseException(@Nullable Exception exceptionToThrow, Exception cause) {
-
- if (exceptionToThrow == null) {
- return cause;
+ else {
+ doExceptionThrowingOperationSafely(jedis::quit, "Failed to quit during close");
+ doExceptionThrowingOperationSafely(jedis::disconnect, "Failed to disconnect during close");
}
-
- return exceptionToThrow;
}
@Override
public Jedis getNativeConnection() {
- return jedis;
+ return this.jedis;
}
@Override
public boolean isClosed() {
- return doWithJedis(it -> !it.isConnected());
+ return !Boolean.TRUE.equals(doWithJedis(Jedis::isConnected));
}
@Override
public boolean isQueueing() {
- return transaction != null;
+ return this.transaction != null;
}
@Override
public boolean isPipelined() {
- return pipeline != null;
+ return this.pipeline != null;
}
@Override
@@ -382,6 +387,7 @@ public void openPipeline() {
@Override
public List closePipeline() {
+
if (pipeline != null) {
try {
return convertPipelineResults();
@@ -390,14 +396,19 @@ public List closePipeline() {
pipelinedResults.clear();
}
}
+
return Collections.emptyList();
}
private List convertPipelineResults() {
+
List results = new ArrayList<>();
+
getRequiredPipeline().sync();
+
Exception cause = null;
- for (JedisResult result : pipelinedResults) {
+
+ for (JedisResult, ?> result : pipelinedResults) {
try {
Object data = result.get();
@@ -418,13 +429,16 @@ private List convertPipelineResults() {
results.add(e);
}
}
+
if (cause != null) {
throw new RedisPipelineException(cause, results);
}
+
return results;
}
- void pipeline(JedisResult result) {
+ void pipeline(JedisResult, ?> result) {
+
if (isQueueing()) {
transaction(result);
} else {
@@ -441,7 +455,7 @@ public byte[] echo(byte[] message) {
Assert.notNull(message, "Message must not be null");
- return invoke().just(j -> j.echo(message));
+ return invoke().just(jedis -> jedis.echo(message));
}
@Override
@@ -451,6 +465,7 @@ public String ping() {
@Override
public void discard() {
+
try {
getRequiredTransaction().discard();
} catch (Exception ex) {
@@ -463,8 +478,8 @@ public void discard() {
@Override
public List exec() {
- try {
+ try {
if (transaction == null) {
throw new InvalidDataAccessApiUsageException("No ongoing transaction; Did you forget to call multi");
}
@@ -474,8 +489,8 @@ public List exec() {
return !CollectionUtils.isEmpty(results)
? new TransactionResultConverter<>(txResults, JedisExceptionConverter.INSTANCE).convert(results)
: results;
- } catch (Exception ex) {
- throw convertJedisAccessException(ex);
+ } catch (Exception cause) {
+ throw convertJedisAccessException(cause);
} finally {
txResults.clear();
transaction = null;
@@ -484,38 +499,34 @@ public List exec() {
@Nullable
public Pipeline getPipeline() {
- return pipeline;
+ return this.pipeline;
}
public Pipeline getRequiredPipeline() {
Pipeline pipeline = getPipeline();
- if (pipeline == null) {
- throw new IllegalStateException("Connection has no active pipeline");
- }
+ Assert.state(pipeline != null, "Connection has no active pipeline");
return pipeline;
}
@Nullable
public Transaction getTransaction() {
- return transaction;
+ return this.transaction;
}
public Transaction getRequiredTransaction() {
Transaction transaction = getTransaction();
- if (transaction == null) {
- throw new IllegalStateException("Connection has no active transaction");
- }
+ Assert.state(transaction != null, "Connection has no active transaction");
return transaction;
}
public Jedis getJedis() {
- return jedis;
+ return this.jedis;
}
/**
@@ -525,7 +536,7 @@ public Jedis getJedis() {
* @since 2.5
*/
JedisInvoker invoke() {
- return invoker;
+ return this.invoker;
}
/**
@@ -536,7 +547,7 @@ JedisInvoker invoke() {
* @since 2.5
*/
JedisInvoker invokeStatus() {
- return statusInvoker;
+ return this.statusInvoker;
}
JedisResult newJedisResult(Response response) {
@@ -555,6 +566,7 @@ JedisStatusResult newStatusResult(Response response) {
@Override
public void multi() {
+
if (isQueueing()) {
return;
}
@@ -563,8 +575,8 @@ public void multi() {
throw new InvalidDataAccessApiUsageException("Cannot use Transaction while a pipeline is open");
}
- doWithJedis(it -> {
- this.transaction = it.multi();
+ doWithJedis(jedis -> {
+ this.transaction = jedis.multi();
});
}
@@ -580,13 +592,14 @@ public void unwatch() {
@Override
public void watch(byte[]... keys) {
+
if (isQueueing()) {
throw new InvalidDataAccessApiUsageException("WATCH is not supported when a transaction is active");
}
- doWithJedis(it -> {
+ doWithJedis(jedis -> {
for (byte[] key : keys) {
- it.watch(key);
+ jedis.watch(key);
}
});
}
@@ -597,17 +610,18 @@ public void watch(byte[]... keys) {
@Override
public Long publish(byte[] channel, byte[] message) {
- return invoke().just(j -> j.publish(channel, message));
+ return invoke().just(jedis -> jedis.publish(channel, message));
}
@Override
public Subscription getSubscription() {
- return subscription;
+ return this.subscription;
}
@Override
public boolean isSubscribed() {
- return (subscription != null && subscription.isAlive());
+ Subscription subscription = getSubscription();
+ return subscription != null && subscription.isAlive();
}
@Override
@@ -666,11 +680,12 @@ public void setConvertPipelineAndTxResults(boolean convertPipelineAndTxResults)
protected boolean isActive(RedisNode node) {
Jedis verification = null;
+
try {
verification = getJedis(node);
verification.connect();
return verification.ping().equalsIgnoreCase("pong");
- } catch (Exception e) {
+ } catch (Exception cause) {
return false;
} finally {
if (verification != null) {
@@ -694,9 +709,8 @@ private T doWithJedis(Function callback) {
try {
return callback.apply(getJedis());
-
- } catch (Exception ex) {
- throw convertJedisAccessException(ex);
+ } catch (Exception cause) {
+ throw convertJedisAccessException(cause);
}
}
@@ -704,9 +718,26 @@ private void doWithJedis(Consumer callback) {
try {
callback.accept(getJedis());
- } catch (Exception ex) {
- throw convertJedisAccessException(ex);
+ } catch (Exception cause) {
+ throw convertJedisAccessException(cause);
+ }
+ }
+
+ private void doExceptionThrowingOperationSafely(ExceptionThrowingOperation operation, String logMessage) {
+
+ try {
+ operation.run();
+ }
+ catch (Exception cause) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(logMessage, cause);
+ }
}
}
+
+ @FunctionalInterface
+ private interface ExceptionThrowingOperation {
+ void run() throws Exception;
+ }
}
diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactory.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactory.java
index abcaa1fd4f..b231900022 100644
--- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactory.java
+++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactory.java
@@ -46,6 +46,7 @@
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
+import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
@@ -98,19 +99,34 @@ public class JedisConnectionFactory
implements RedisConnectionFactory, InitializingBean, DisposableBean, SmartLifecycle {
private static final Log log = LogFactory.getLog(JedisConnectionFactory.class);
+
private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new PassThroughExceptionTranslationStrategy(
JedisExceptionConverter.INSTANCE);
+ private boolean convertPipelineAndTxResults = true;
+
+ private int phase = 0; // in between min and max values
+
+ private final AtomicReference state = new AtomicReference<>(State.CREATED);
+
+ private @Nullable ClusterCommandExecutor clusterCommandExecutor;
+
+ private @Nullable AsyncTaskExecutor executor;
+
+ private @Nullable ClusterTopologyProvider topologyProvider;
+
+ private JedisClientConfig clientConfig = DefaultJedisClientConfig.builder().build();
+
private final JedisClientConfiguration clientConfiguration;
- private RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration("localhost",
- Protocol.DEFAULT_PORT);
+ private @Nullable JedisCluster cluster;
- private @Nullable RedisConfiguration configuration;
+ private @Nullable Pool pool;
- private int phase = 0; // in between min and max values
+ private @Nullable RedisConfiguration configuration;
- private boolean convertPipelineAndTxResults = true;
+ private RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration("localhost",
+ Protocol.DEFAULT_PORT);
/**
* Lifecycle state of this factory.
@@ -119,15 +135,6 @@ enum State {
CREATED, STARTING, STARTED, STOPPING, STOPPED, DESTROYED;
}
- private final AtomicReference state = new AtomicReference<>(State.CREATED);
-
- private JedisClientConfig clientConfig = DefaultJedisClientConfig.builder().build();
-
- private @Nullable Pool pool;
- private @Nullable JedisCluster cluster;
- private @Nullable ClusterTopologyProvider topologyProvider;
- private @Nullable ClusterCommandExecutor clusterCommandExecutor;
-
/**
* Constructs a new {@link JedisConnectionFactory} instance with default settings (default connection pooling).
*/
@@ -138,14 +145,14 @@ public JedisConnectionFactory() {
/**
* Constructs a new {@link JedisConnectionFactory} instance given {@link JedisClientConfiguration}.
*
- * @param clientConfig must not be {@literal null}
+ * @param clientConfiguration must not be {@literal null}
* @since 2.0
*/
- private JedisConnectionFactory(JedisClientConfiguration clientConfig) {
+ private JedisConnectionFactory(JedisClientConfiguration clientConfiguration) {
- Assert.notNull(clientConfig, "JedisClientConfiguration must not be null");
+ Assert.notNull(clientConfiguration, "JedisClientConfiguration must not be null");
- this.clientConfiguration = clientConfig;
+ this.clientConfiguration = clientConfiguration;
}
/**
@@ -158,729 +165,760 @@ public JedisConnectionFactory(JedisPoolConfig poolConfig) {
}
/**
- * Constructs a new {@link JedisConnectionFactory} instance using the given {@link JedisPoolConfig} applied to
- * {@link JedisSentinelPool}.
+ * Constructs a new {@link JedisConnectionFactory} instance using the given {@link RedisClusterConfiguration} applied
+ * to create a {@link JedisCluster}.
*
- * @param sentinelConfig must not be {@literal null}.
- * @since 1.4
+ * @param clusterConfiguration must not be {@literal null}.
+ * @since 1.7
*/
- public JedisConnectionFactory(RedisSentinelConfiguration sentinelConfig) {
- this(sentinelConfig, new MutableJedisClientConfiguration());
+ public JedisConnectionFactory(RedisClusterConfiguration clusterConfiguration) {
+ this(clusterConfiguration, new MutableJedisClientConfiguration());
}
/**
- * Constructs a new {@link JedisConnectionFactory} instance using the given {@link JedisPoolConfig} applied to
- * {@link JedisSentinelPool}.
+ * Constructs a new {@link JedisConnectionFactory} instance using the given {@link RedisClusterConfiguration} and
+ * {@link JedisClientConfiguration}.
*
- * @param sentinelConfig the sentinel configuration to use.
- * @param poolConfig pool configuration. Defaulted to new instance if {@literal null}.
- * @since 1.4
+ * @param clusterConfiguration must not be {@literal null}.
+ * @param clientConfiguration must not be {@literal null}.
+ * @since 2.0
*/
- public JedisConnectionFactory(RedisSentinelConfiguration sentinelConfig, @Nullable JedisPoolConfig poolConfig) {
+ public JedisConnectionFactory(RedisClusterConfiguration clusterConfiguration,
+ JedisClientConfiguration clientConfiguration) {
- this.configuration = sentinelConfig;
- this.clientConfiguration = MutableJedisClientConfiguration
- .create(poolConfig != null ? poolConfig : new JedisPoolConfig());
- }
+ this(clientConfiguration);
- /**
- * Constructs a new {@link JedisConnectionFactory} instance using the given {@link RedisClusterConfiguration} applied
- * to create a {@link JedisCluster}.
- *
- * @param clusterConfig must not be {@literal null}.
- * @since 1.7
- */
- public JedisConnectionFactory(RedisClusterConfiguration clusterConfig) {
- this(clusterConfig, new MutableJedisClientConfiguration());
+ Assert.notNull(clusterConfiguration, "RedisClusterConfiguration must not be null");
+
+ this.configuration = clusterConfiguration;
}
/**
* Constructs a new {@link JedisConnectionFactory} instance using the given {@link RedisClusterConfiguration} applied
* to create a {@link JedisCluster}.
*
- * @param clusterConfig must not be {@literal null}.
+ * @param clusterConfiguration must not be {@literal null}.
* @since 1.7
*/
- public JedisConnectionFactory(RedisClusterConfiguration clusterConfig, JedisPoolConfig poolConfig) {
+ public JedisConnectionFactory(RedisClusterConfiguration clusterConfiguration, JedisPoolConfig poolConfig) {
- Assert.notNull(clusterConfig, "RedisClusterConfiguration must not be null");
+ Assert.notNull(clusterConfiguration, "RedisClusterConfiguration must not be null");
- this.configuration = clusterConfig;
+ this.configuration = clusterConfiguration;
this.clientConfiguration = MutableJedisClientConfiguration.create(poolConfig);
}
/**
- * Constructs a new {@link JedisConnectionFactory} instance using the given {@link RedisStandaloneConfiguration}.
+ * Constructs a new {@link JedisConnectionFactory} instance using the given {@link JedisPoolConfig} applied to
+ * {@link JedisSentinelPool}.
*
- * @param standaloneConfig must not be {@literal null}.
- * @since 2.0
+ * @param sentinelConfiguration must not be {@literal null}.
+ * @since 1.4
*/
- public JedisConnectionFactory(RedisStandaloneConfiguration standaloneConfig) {
- this(standaloneConfig, new MutableJedisClientConfiguration());
+ public JedisConnectionFactory(RedisSentinelConfiguration sentinelConfiguration) {
+ this(sentinelConfiguration, new MutableJedisClientConfiguration());
}
/**
- * Constructs a new {@link JedisConnectionFactory} instance using the given {@link RedisStandaloneConfiguration} and
+ * Constructs a new {@link JedisConnectionFactory} instance using the given {@link RedisSentinelConfiguration} and
* {@link JedisClientConfiguration}.
*
- * @param standaloneConfig must not be {@literal null}.
- * @param clientConfig must not be {@literal null}.
+ * @param sentinelConfiguration must not be {@literal null}.
+ * @param clientConfiguration must not be {@literal null}.
* @since 2.0
*/
- public JedisConnectionFactory(RedisStandaloneConfiguration standaloneConfig, JedisClientConfiguration clientConfig) {
+ public JedisConnectionFactory(RedisSentinelConfiguration sentinelConfiguration,
+ JedisClientConfiguration clientConfiguration) {
- this(clientConfig);
+ this(clientConfiguration);
- Assert.notNull(standaloneConfig, "RedisStandaloneConfiguration must not be null");
+ Assert.notNull(sentinelConfiguration, "RedisSentinelConfiguration must not be null");
- this.standaloneConfig = standaloneConfig;
+ this.configuration = sentinelConfiguration;
}
/**
- * Constructs a new {@link JedisConnectionFactory} instance using the given {@link RedisSentinelConfiguration} and
- * {@link JedisClientConfiguration}.
+ * Constructs a new {@link JedisConnectionFactory} instance using the given {@link JedisPoolConfig} applied to
+ * {@link JedisSentinelPool}.
*
- * @param sentinelConfig must not be {@literal null}.
- * @param clientConfig must not be {@literal null}.
- * @since 2.0
+ * @param sentinelConfiguration the sentinel configuration to use.
+ * @param poolConfig pool configuration. Defaulted to new instance if {@literal null}.
+ * @since 1.4
*/
- public JedisConnectionFactory(RedisSentinelConfiguration sentinelConfig, JedisClientConfiguration clientConfig) {
+ public JedisConnectionFactory(RedisSentinelConfiguration sentinelConfiguration,
+ @Nullable JedisPoolConfig poolConfig) {
- this(clientConfig);
-
- Assert.notNull(sentinelConfig, "RedisSentinelConfiguration must not be null");
+ this.configuration = sentinelConfiguration;
+ this.clientConfiguration = MutableJedisClientConfiguration
+ .create(poolConfig != null ? poolConfig : new JedisPoolConfig());
+ }
- this.configuration = sentinelConfig;
+ /**
+ * Constructs a new {@link JedisConnectionFactory} instance using the given {@link RedisStandaloneConfiguration}.
+ *
+ * @param standaloneConfiguration must not be {@literal null}.
+ * @since 2.0
+ */
+ public JedisConnectionFactory(RedisStandaloneConfiguration standaloneConfiguration) {
+ this(standaloneConfiguration, new MutableJedisClientConfiguration());
}
/**
- * Constructs a new {@link JedisConnectionFactory} instance using the given {@link RedisClusterConfiguration} and
+ * Constructs a new {@link JedisConnectionFactory} instance using the given {@link RedisStandaloneConfiguration} and
* {@link JedisClientConfiguration}.
*
- * @param clusterConfig must not be {@literal null}.
- * @param clientConfig must not be {@literal null}.
+ * @param standaloneConfiguration must not be {@literal null}.
+ * @param clientConfiguration must not be {@literal null}.
* @since 2.0
*/
- public JedisConnectionFactory(RedisClusterConfiguration clusterConfig, JedisClientConfiguration clientConfig) {
+ public JedisConnectionFactory(RedisStandaloneConfiguration standaloneConfiguration,
+ JedisClientConfiguration clientConfiguration) {
- this(clientConfig);
+ this(clientConfiguration);
- Assert.notNull(clusterConfig, "RedisClusterConfiguration must not be null");
+ Assert.notNull(standaloneConfiguration, "RedisStandaloneConfiguration must not be null");
- this.configuration = clusterConfig;
+ this.standaloneConfig = standaloneConfiguration;
}
- @Override
- public void afterPropertiesSet() {
+ ClusterCommandExecutor getRequiredClusterCommandExecutor() {
- clientConfig = createClientConfig(getDatabase(), getRedisUsername(), getRedisPassword());
-
- if (isAutoStartup()) {
- start();
+ if (this.clusterCommandExecutor == null) {
+ throw new IllegalStateException("ClusterCommandExecutor not initialized");
}
- }
- JedisClientConfig createSentinelClientConfig(SentinelConfiguration sentinelConfiguration) {
- return createClientConfig(0, sentinelConfiguration.getSentinelUsername(),
- sentinelConfiguration.getSentinelPassword());
+ return this.clusterCommandExecutor;
}
- private JedisClientConfig createClientConfig(int database, @Nullable String username, RedisPassword password) {
-
- DefaultJedisClientConfig.Builder builder = DefaultJedisClientConfig.builder();
-
- clientConfiguration.getClientName().ifPresent(builder::clientName);
- builder.connectionTimeoutMillis(getConnectTimeout());
- builder.socketTimeoutMillis(getReadTimeout());
-
- builder.database(database);
-
- if (!ObjectUtils.isEmpty(username)) {
- builder.user(username);
- }
- password.toOptional().map(String::new).ifPresent(builder::password);
-
- if (isUseSsl()) {
-
- builder.ssl(true);
+ /**
+ * Configures the {@link AsyncTaskExecutor executor} used to execute commands asynchronously across the cluster.
+ *
+ * @param executor {@link AsyncTaskExecutor executor} used to execute commands asynchronously across the cluster.
+ * @since 3.2
+ */
+ public void setExecutor(AsyncTaskExecutor executor) {
- clientConfiguration.getSslSocketFactory().ifPresent(builder::sslSocketFactory);
- clientConfiguration.getHostnameVerifier().ifPresent(builder::hostnameVerifier);
- clientConfiguration.getSslParameters().ifPresent(builder::sslParameters);
- }
+ Assert.notNull(executor, "AsyncTaskExecutor must not be null");
- return builder.build();
+ this.executor = executor;
}
- @Override
- public void start() {
-
- State current = state
- .getAndUpdate(state -> State.CREATED.equals(state) || State.STOPPED.equals(state) ? State.STARTING : state);
-
- if (State.CREATED.equals(current) || State.STOPPED.equals(current)) {
-
- if (getUsePool() && !isRedisClusterAware()) {
- this.pool = createPool();
- }
-
- if (isRedisClusterAware()) {
-
- this.cluster = createCluster();
- this.topologyProvider = createTopologyProvider(this.cluster);
- this.clusterCommandExecutor = new ClusterCommandExecutor(this.topologyProvider,
- new JedisClusterConnection.JedisClusterNodeResourceProvider(this.cluster, this.topologyProvider),
- EXCEPTION_TRANSLATION);
- }
-
- state.set(State.STARTED);
- }
+ /**
+ * Returns the Redis hostname.
+ *
+ * @return the hostName.
+ */
+ public String getHostName() {
+ return standaloneConfig.getHostName();
}
- @Override
- public void stop() {
-
- if (state.compareAndSet(State.STARTED, State.STOPPING)) {
-
- if (getUsePool() && !isRedisClusterAware()) {
- if (pool != null) {
- try {
- this.pool.close();
- this.pool = null;
- } catch (Exception ex) {
- log.warn("Cannot properly close Jedis pool", ex);
- }
- }
- }
-
- if (this.clusterCommandExecutor != null) {
- try {
- this.clusterCommandExecutor.destroy();
- this.clusterCommandExecutor = null;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- if (this.cluster != null) {
-
- this.topologyProvider = null;
-
- try {
- this.cluster.close();
- this.cluster = null;
- } catch (Exception ex) {
- log.warn("Cannot properly close Jedis cluster", ex);
- }
- }
- state.set(State.STOPPED);
- }
+ /**
+ * Sets the Redis hostname.
+ *
+ * @param hostName the hostname to set.
+ * @deprecated since 2.0, configure the hostname using {@link RedisStandaloneConfiguration}.
+ */
+ @Deprecated
+ public void setHostName(String hostName) {
+ standaloneConfig.setHostName(hostName);
}
- @Override
- public int getPhase() {
- return phase;
+ /**
+ * Returns whether to use SSL.
+ *
+ * @return use of SSL.
+ * @since 1.8
+ */
+ public boolean isUseSsl() {
+ return clientConfiguration.isUseSsl();
}
/**
- * Specify the lifecycle phase for pausing and resuming this executor. The default is {@code 0}.
+ * Sets whether to use SSL.
*
- * @since 3.2
- * @see SmartLifecycle#getPhase()
+ * @param useSsl {@literal true} to use SSL.
+ * @since 1.8
+ * @deprecated since 2.0, configure the SSL usage with {@link JedisClientConfiguration}.
+ * @throws IllegalStateException if {@link JedisClientConfiguration} is immutable.
*/
- public void setPhase(int phase) {
- this.phase = phase;
+ @Deprecated
+ public void setUseSsl(boolean useSsl) {
+ getMutableConfiguration().setUseSsl(useSsl);
}
- @Override
- public boolean isRunning() {
- return State.STARTED.equals(state.get());
+ /**
+ * Returns the password used for authenticating with the Redis server.
+ *
+ * @return password for authentication.
+ */
+ @Nullable
+ public String getPassword() {
+ return getRedisPassword().map(String::new).orElse(null);
}
- private Pool createPool() {
+ @Nullable
+ private String getRedisUsername() {
+ return RedisConfiguration.getUsernameOrElse(this.configuration, standaloneConfig::getUsername);
+ }
- if (isRedisSentinelAware()) {
- return createRedisSentinelPool((RedisSentinelConfiguration) this.configuration);
- }
- return createRedisPool();
+ private RedisPassword getRedisPassword() {
+ return RedisConfiguration.getPasswordOrElse(this.configuration, standaloneConfig::getPassword);
}
/**
- * Creates {@link JedisSentinelPool}.
+ * Sets the password used for authenticating with the Redis server.
*
- * @param config the actual {@link RedisSentinelConfiguration}. Never {@literal null}.
- * @return the {@link Pool} to use. Never {@literal null}.
- * @since 1.4
+ * @param password the password to set.
+ * @deprecated since 2.0, configure the password using {@link RedisStandaloneConfiguration},
+ * {@link RedisSentinelConfiguration} or {@link RedisClusterConfiguration}.
*/
- protected Pool createRedisSentinelPool(RedisSentinelConfiguration config) {
+ @Deprecated
+ public void setPassword(String password) {
- GenericObjectPoolConfig poolConfig = getPoolConfig() != null ? getPoolConfig() : new JedisPoolConfig();
+ if (RedisConfiguration.isAuthenticationAware(configuration)) {
- JedisClientConfig sentinelConfig = createSentinelClientConfig(config);
- return new JedisSentinelPool(config.getMaster().getName(), convertToJedisSentinelSet(config.getSentinels()),
- poolConfig, this.clientConfig, sentinelConfig);
+ ((WithPassword) configuration).setPassword(password);
+ return;
+ }
+
+ standaloneConfig.setPassword(RedisPassword.of(password));
}
/**
- * Creates {@link JedisPool}.
+ * Returns the port used to connect to the Redis instance.
*
- * @return the {@link Pool} to use. Never {@literal null}.
- * @since 1.4
+ * @return the Redis port.
*/
- protected Pool createRedisPool() {
- return new JedisPool(getPoolConfig(), new HostAndPort(getHostName(), getPort()), this.clientConfig);
- }
-
- private JedisCluster createCluster() {
- return createCluster((RedisClusterConfiguration) this.configuration, getPoolConfig());
+ public int getPort() {
+ return standaloneConfig.getPort();
}
/**
- * Template method to create a {@link ClusterTopologyProvider} given {@link JedisCluster}. Creates
- * {@link JedisClusterTopologyProvider} by default.
+ * Sets the port used to connect to the Redis instance.
*
- * @param cluster the {@link JedisCluster}, must not be {@literal null}.
- * @return the {@link ClusterTopologyProvider}.
- * @see JedisClusterTopologyProvider
- * @see 2.2
+ * @param port the Redis port.
+ * @deprecated since 2.0, configure the port using {@link RedisStandaloneConfiguration}.
*/
- protected ClusterTopologyProvider createTopologyProvider(JedisCluster cluster) {
- return new JedisClusterTopologyProvider(cluster);
+ @Deprecated
+ public void setPort(int port) {
+ standaloneConfig.setPort(port);
}
/**
- * Creates {@link JedisCluster} for given {@link RedisClusterConfiguration} and {@link GenericObjectPoolConfig}.
+ * Returns the timeout.
*
- * @param clusterConfig must not be {@literal null}.
- * @param poolConfig can be {@literal null}.
- * @return the actual {@link JedisCluster}.
- * @since 1.7
+ * @return the timeout.
*/
- protected JedisCluster createCluster(RedisClusterConfiguration clusterConfig,
- GenericObjectPoolConfig poolConfig) {
-
- Assert.notNull(clusterConfig, "Cluster configuration must not be null");
-
- Set hostAndPort = new HashSet<>();
- for (RedisNode node : clusterConfig.getClusterNodes()) {
- hostAndPort.add(new HostAndPort(node.getHost(), node.getPort()));
- }
-
- int redirects = clusterConfig.getMaxRedirects() != null ? clusterConfig.getMaxRedirects() : 5;
-
- return new JedisCluster(hostAndPort, this.clientConfig, redirects, poolConfig);
- }
-
- @Override
- public void destroy() {
-
- stop();
- state.set(State.DESTROYED);
- }
-
- @Override
- public RedisConnection getConnection() {
-
- assertInitialized();
-
- if (isRedisClusterAware()) {
- return getClusterConnection();
- }
-
- Jedis jedis = fetchJedisConnector();
- JedisClientConfig sentinelConfig = this.clientConfig;
-
- SentinelConfiguration sentinelConfiguration = getSentinelConfiguration();
- if (sentinelConfiguration != null) {
- sentinelConfig = createSentinelClientConfig(sentinelConfiguration);
- }
-
- JedisConnection connection = (getUsePool() ? new JedisConnection(jedis, pool, this.clientConfig, sentinelConfig)
- : new JedisConnection(jedis, null, this.clientConfig, sentinelConfig));
- connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults);
-
- return postProcessConnection(connection);
+ public int getTimeout() {
+ return getReadTimeout();
}
/**
- * Returns a Jedis instance to be used as a Redis connection. The instance can be newly created or retrieved from a
- * pool.
+ * Sets the timeout.
*
- * @return Jedis instance ready for wrapping into a {@link RedisConnection}.
+ * @param timeout the timeout to set.
+ * @deprecated since 2.0, configure the timeout using {@link JedisClientConfiguration}.
+ * @throws IllegalStateException if {@link JedisClientConfiguration} is immutable.
*/
- protected Jedis fetchJedisConnector() {
- try {
-
- if (getUsePool() && pool != null) {
- return pool.getResource();
- }
-
- Jedis jedis = createJedis();
- // force initialization (see Jedis issue #82)
- jedis.connect();
-
- return jedis;
- } catch (Exception ex) {
- throw new RedisConnectionFailureException("Cannot get Jedis connection", ex);
- }
- }
+ @Deprecated
+ public void setTimeout(int timeout) {
- private Jedis createJedis() {
- return new Jedis(new HostAndPort(getHostName(), getPort()), this.clientConfig);
+ getMutableConfiguration().setReadTimeout(Duration.ofMillis(timeout));
+ getMutableConfiguration().setConnectTimeout(Duration.ofMillis(timeout));
}
/**
- * Post process a newly retrieved connection. Useful for decorating or executing initialization commands on a new
- * connection. This implementation simply returns the connection.
+ * Indicates the use of a connection pool.
+ *
+ * Applies only to single node Redis. Sentinel and Cluster modes use always connection-pooling regardless of the
+ * pooling setting.
*
- * @param connection the jedis connection.
- * @return processed connection
+ * @return the use of connection pooling.
*/
- protected JedisConnection postProcessConnection(JedisConnection connection) {
- return connection;
+ public boolean getUsePool() {
+ // Jedis Sentinel cannot operate without a pool.
+ return isRedisSentinelAware() || getClientConfiguration().isUsePooling();
}
- @Override
- public RedisClusterConnection getClusterConnection() {
-
- assertInitialized();
+ /**
+ * Turns on or off the use of connection pooling.
+ *
+ * @param usePool the usePool to set.
+ * @deprecated since 2.0, configure pooling usage with {@link JedisClientConfiguration}.
+ * @throws IllegalStateException if {@link JedisClientConfiguration} is immutable.
+ * @throws IllegalStateException if configured to use sentinel and {@code usePool} is {@literal false} as Jedis
+ * requires pooling for Redis sentinel use.
+ */
+ @Deprecated
+ public void setUsePool(boolean usePool) {
- if (!isRedisClusterAware()) {
- throw new InvalidDataAccessApiUsageException("Cluster is not configured");
+ if (isRedisSentinelAware() && !usePool) {
+ throw new IllegalStateException("Jedis requires pooling for Redis Sentinel use");
}
- return postProcessConnection(
- new JedisClusterConnection(this.cluster, this.clusterCommandExecutor, this.topologyProvider));
+ getMutableConfiguration().setUsePooling(usePool);
}
/**
- * Post process a newly retrieved connection. Useful for decorating or executing initialization commands on a new
- * connection. This implementation simply returns the connection.
+ * Returns the poolConfig.
*
- * @param connection the jedis connection.
- * @return processed connection.
- * @since 3.2
+ * @return the poolConfig
*/
- protected JedisClusterConnection postProcessConnection(JedisClusterConnection connection) {
- return connection;
+ @Nullable
+ public GenericObjectPoolConfig getPoolConfig() {
+ return clientConfiguration.getPoolConfig().orElse(null);
}
- @Override
- public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
- return EXCEPTION_TRANSLATION.translate(ex);
+ /**
+ * Sets the pool configuration for this factory.
+ *
+ * @param poolConfig the poolConfig to set.
+ * @deprecated since 2.0, configure {@link JedisPoolConfig} using {@link JedisClientConfiguration}.
+ * @throws IllegalStateException if {@link JedisClientConfiguration} is immutable.
+ */
+ @Deprecated
+ public void setPoolConfig(JedisPoolConfig poolConfig) {
+ getMutableConfiguration().setPoolConfig(poolConfig);
}
/**
- * Returns the Redis hostname.
+ * Returns the index of the database.
*
- * @return the hostName.
+ * @return the database index.
*/
- public String getHostName() {
- return standaloneConfig.getHostName();
+ public int getDatabase() {
+ return RedisConfiguration.getDatabaseOrElse(configuration, standaloneConfig::getDatabase);
}
/**
- * Sets the Redis hostname.
+ * Sets the index of the database used by this connection factory. Default is 0.
*
- * @param hostName the hostname to set.
- * @deprecated since 2.0, configure the hostname using {@link RedisStandaloneConfiguration}.
+ * @param index database index.
+ * @deprecated since 2.0, configure the client name using {@link RedisSentinelConfiguration} or
+ * {@link RedisStandaloneConfiguration}.
*/
@Deprecated
- public void setHostName(String hostName) {
- standaloneConfig.setHostName(hostName);
+ public void setDatabase(int index) {
+
+ Assert.isTrue(index >= 0, "invalid DB index (a positive index required)");
+
+ if (RedisConfiguration.isDatabaseIndexAware(configuration)) {
+
+ ((WithDatabaseIndex) configuration).setDatabase(index);
+ return;
+ }
+
+ standaloneConfig.setDatabase(index);
}
/**
- * Returns whether to use SSL.
+ * Returns the client name.
*
- * @return use of SSL.
+ * @return the client name.
* @since 1.8
*/
- public boolean isUseSsl() {
- return clientConfiguration.isUseSsl();
+ @Nullable
+ public String getClientName() {
+ return clientConfiguration.getClientName().orElse(null);
}
/**
- * Sets whether to use SSL.
+ * Sets the client name used by this connection factory. Defaults to none which does not set a client name.
*
- * @param useSsl {@literal true} to use SSL.
+ * @param clientName the client name.
* @since 1.8
- * @deprecated since 2.0, configure the SSL usage with {@link JedisClientConfiguration}.
+ * @deprecated since 2.0, configure the client name using {@link JedisClientConfiguration}.
* @throws IllegalStateException if {@link JedisClientConfiguration} is immutable.
*/
@Deprecated
- public void setUseSsl(boolean useSsl) {
- getMutableConfiguration().setUseSsl(useSsl);
+ public void setClientName(String clientName) {
+ this.getMutableConfiguration().setClientName(clientName);
}
/**
- * Returns the password used for authenticating with the Redis server.
- *
- * @return password for authentication.
+ * @return the {@link JedisClientConfiguration}.
+ * @since 2.0
+ */
+ public JedisClientConfiguration getClientConfiguration() {
+ return this.clientConfiguration;
+ }
+
+ /**
+ * @return the {@link RedisStandaloneConfiguration}.
+ * @since 2.0
*/
@Nullable
- public String getPassword() {
- return getRedisPassword().map(String::new).orElse(null);
+ public RedisStandaloneConfiguration getStandaloneConfiguration() {
+ return this.standaloneConfig;
}
+ /**
+ * @return the {@link RedisStandaloneConfiguration}, may be {@literal null}.
+ * @since 2.0
+ */
@Nullable
- private String getRedisUsername() {
- return RedisConfiguration.getUsernameOrElse(this.configuration, standaloneConfig::getUsername);
+ public RedisSentinelConfiguration getSentinelConfiguration() {
+ return RedisConfiguration.isSentinelConfiguration(configuration) ? (RedisSentinelConfiguration) configuration
+ : null;
}
- private RedisPassword getRedisPassword() {
- return RedisConfiguration.getPasswordOrElse(this.configuration, standaloneConfig::getPassword);
+ /**
+ * @return the {@link RedisClusterConfiguration}, may be {@literal null}.
+ * @since 2.0
+ */
+ @Nullable
+ public RedisClusterConfiguration getClusterConfiguration() {
+ return RedisConfiguration.isClusterConfiguration(configuration) ? (RedisClusterConfiguration) configuration : null;
}
/**
- * Sets the password used for authenticating with the Redis server.
+ * Specifies if pipelined results should be converted to the expected data type. If {@code false}, results of
+ * {@link JedisConnection#closePipeline()} and {@link JedisConnection#exec()} will be of the type returned by the
+ * Jedis driver.
*
- * @param password the password to set.
- * @deprecated since 2.0, configure the password using {@link RedisStandaloneConfiguration},
- * {@link RedisSentinelConfiguration} or {@link RedisClusterConfiguration}.
+ * @return {@code true} to convert pipeline and transaction results; {@code false} otherwise.
*/
- @Deprecated
- public void setPassword(String password) {
-
- if (RedisConfiguration.isAuthenticationAware(configuration)) {
-
- ((WithPassword) configuration).setPassword(password);
- return;
- }
-
- standaloneConfig.setPassword(RedisPassword.of(password));
+ @Override
+ public boolean getConvertPipelineAndTxResults() {
+ return convertPipelineAndTxResults;
}
/**
- * Returns the port used to connect to the Redis instance.
+ * Specifies if pipelined results should be converted to the expected data type. If {@code false}, results of
+ * {@link JedisConnection#closePipeline()} and {@link JedisConnection#exec()} will be of the type returned by the
+ * Jedis driver.
*
- * @return the Redis port.
+ * @param convertPipelineAndTxResults {@code true} to convert pipeline and transaction results; {@code false}
+ * otherwise.
*/
- public int getPort() {
- return standaloneConfig.getPort();
+ public void setConvertPipelineAndTxResults(boolean convertPipelineAndTxResults) {
+ this.convertPipelineAndTxResults = convertPipelineAndTxResults;
}
/**
- * Sets the port used to connect to the Redis instance.
- *
- * @param port the Redis port.
- * @deprecated since 2.0, configure the port using {@link RedisStandaloneConfiguration}.
+ * @return true when {@link RedisSentinelConfiguration} is present.
+ * @since 1.4
*/
- @Deprecated
- public void setPort(int port) {
- standaloneConfig.setPort(port);
+ public boolean isRedisSentinelAware() {
+ return RedisConfiguration.isSentinelConfiguration(configuration);
}
/**
- * Returns the timeout.
- *
- * @return the timeout.
+ * @return true when {@link RedisClusterConfiguration} is present.
+ * @since 2.0
*/
- public int getTimeout() {
- return getReadTimeout();
+ public boolean isRedisClusterAware() {
+ return RedisConfiguration.isClusterConfiguration(configuration);
+ }
+
+ @Override
+ public void afterPropertiesSet() {
+
+ this.clientConfig = createClientConfig(getDatabase(), getRedisUsername(), getRedisPassword());
+
+ if (isAutoStartup()) {
+ start();
+ }
+ }
+
+ private JedisClientConfig createClientConfig(int database, @Nullable String username, RedisPassword password) {
+
+ DefaultJedisClientConfig.Builder builder = DefaultJedisClientConfig.builder();
+
+ this.clientConfiguration.getClientName().ifPresent(builder::clientName);
+ builder.connectionTimeoutMillis(getConnectTimeout());
+ builder.socketTimeoutMillis(getReadTimeout());
+
+ builder.database(database);
+
+ if (!ObjectUtils.isEmpty(username)) {
+ builder.user(username);
+ }
+ password.toOptional().map(String::new).ifPresent(builder::password);
+
+ if (isUseSsl()) {
+
+ builder.ssl(true);
+
+ this.clientConfiguration.getSslSocketFactory().ifPresent(builder::sslSocketFactory);
+ this.clientConfiguration.getHostnameVerifier().ifPresent(builder::hostnameVerifier);
+ this.clientConfiguration.getSslParameters().ifPresent(builder::sslParameters);
+ }
+
+ return builder.build();
+ }
+
+ JedisClientConfig createSentinelClientConfig(SentinelConfiguration sentinelConfiguration) {
+ return createClientConfig(0, sentinelConfiguration.getSentinelUsername(),
+ sentinelConfiguration.getSentinelPassword());
+ }
+
+ @Override
+ public void start() {
+
+ State current = this.state.getAndUpdate(state -> isCreatedOrStopped(state) ? State.STARTING : state);
+
+ if (isCreatedOrStopped(current)) {
+
+ if (getUsePool() && !isRedisClusterAware()) {
+ this.pool = createPool();
+ }
+
+ if (isRedisClusterAware()) {
+
+ this.cluster = createCluster(getClusterConfiguration(), getPoolConfig());
+ this.topologyProvider = createTopologyProvider(this.cluster);
+ this.clusterCommandExecutor = new ClusterCommandExecutor(this.topologyProvider,
+ new JedisClusterConnection.JedisClusterNodeResourceProvider(this.cluster, this.topologyProvider),
+ EXCEPTION_TRANSLATION, executor);
+ }
+
+ this.state.set(State.STARTED);
+ }
+ }
+
+ private boolean isCreatedOrStopped(@Nullable State state) {
+ return State.CREATED.equals(state) || State.STOPPED.equals(state);
+ }
+
+ @Override
+ public void stop() {
+
+ if (this.state.compareAndSet(State.STARTED, State.STOPPING)) {
+
+ if (getUsePool() && !isRedisClusterAware()) {
+ if (this.pool != null) {
+ try {
+ this.pool.close();
+ this.pool = null;
+ } catch (Exception ex) {
+ log.warn("Cannot properly close Jedis pool", ex);
+ }
+ }
+ }
+
+ ClusterCommandExecutor clusterCommandExecutor = this.clusterCommandExecutor;
+
+ if (clusterCommandExecutor != null) {
+ try {
+ clusterCommandExecutor.destroy();
+ this.clusterCommandExecutor = null;
+ } catch (Exception cause) {
+ throw new RuntimeException(cause);
+ }
+ }
+
+ if (this.cluster != null) {
+
+ this.topologyProvider = null;
+
+ try {
+ this.cluster.close();
+ this.cluster = null;
+ } catch (Exception cause) {
+ log.warn("Cannot properly close Jedis cluster", cause);
+ }
+ }
+
+ this.state.set(State.STOPPED);
+ }
+ }
+
+ @Override
+ public int getPhase() {
+ return this.phase;
}
/**
- * Sets the timeout.
+ * Specify the lifecycle phase for pausing and resuming this executor. The default is {@code 0}.
*
- * @param timeout the timeout to set.
- * @deprecated since 2.0, configure the timeout using {@link JedisClientConfiguration}.
- * @throws IllegalStateException if {@link JedisClientConfiguration} is immutable.
+ * @since 3.2
+ * @see SmartLifecycle#getPhase()
*/
- @Deprecated
- public void setTimeout(int timeout) {
+ public void setPhase(int phase) {
+ this.phase = phase;
+ }
- getMutableConfiguration().setReadTimeout(Duration.ofMillis(timeout));
- getMutableConfiguration().setConnectTimeout(Duration.ofMillis(timeout));
+ @Override
+ public boolean isRunning() {
+ return State.STARTED.equals(this.state.get());
}
- /**
- * Indicates the use of a connection pool.
- *
- * Applies only to single node Redis. Sentinel and Cluster modes use always connection-pooling regardless of the
- * pooling setting.
- *
- * @return the use of connection pooling.
- */
- public boolean getUsePool() {
+ private Pool createPool() {
- // Jedis Sentinel cannot operate without a pool.
if (isRedisSentinelAware()) {
- return true;
+ return createRedisSentinelPool(getSentinelConfiguration());
}
-
- return clientConfiguration.isUsePooling();
+ return createRedisPool();
}
/**
- * Turns on or off the use of connection pooling.
+ * Creates {@link JedisSentinelPool}.
*
- * @param usePool the usePool to set.
- * @deprecated since 2.0, configure pooling usage with {@link JedisClientConfiguration}.
- * @throws IllegalStateException if {@link JedisClientConfiguration} is immutable.
- * @throws IllegalStateException if configured to use sentinel and {@code usePool} is {@literal false} as Jedis
- * requires pooling for Redis sentinel use.
+ * @param config the actual {@link RedisSentinelConfiguration}. Never {@literal null}.
+ * @return the {@link Pool} to use. Never {@literal null}.
+ * @since 1.4
*/
- @Deprecated
- public void setUsePool(boolean usePool) {
+ protected Pool createRedisSentinelPool(RedisSentinelConfiguration config) {
- if (isRedisSentinelAware() && !usePool) {
- throw new IllegalStateException("Jedis requires pooling for Redis Sentinel use");
- }
+ GenericObjectPoolConfig poolConfig = getPoolConfig() != null ? getPoolConfig() : new JedisPoolConfig();
- getMutableConfiguration().setUsePooling(usePool);
- }
+ JedisClientConfig sentinelConfig = createSentinelClientConfig(config);
- /**
- * Returns the poolConfig.
- *
- * @return the poolConfig
- */
- @Nullable
- public GenericObjectPoolConfig getPoolConfig() {
- return clientConfiguration.getPoolConfig().orElse(null);
+ return new JedisSentinelPool(config.getMaster().getName(), convertToJedisSentinelSet(config.getSentinels()),
+ poolConfig, this.clientConfig, sentinelConfig);
}
/**
- * Sets the pool configuration for this factory.
+ * Creates {@link JedisPool}.
*
- * @param poolConfig the poolConfig to set.
- * @deprecated since 2.0, configure {@link JedisPoolConfig} using {@link JedisClientConfiguration}.
- * @throws IllegalStateException if {@link JedisClientConfiguration} is immutable.
+ * @return the {@link Pool} to use. Never {@literal null}.
+ * @since 1.4
*/
- @Deprecated
- public void setPoolConfig(JedisPoolConfig poolConfig) {
- getMutableConfiguration().setPoolConfig(poolConfig);
+ protected Pool createRedisPool() {
+ return new JedisPool(getPoolConfig(), new HostAndPort(getHostName(), getPort()), this.clientConfig);
}
/**
- * Returns the index of the database.
+ * Template method to create a {@link ClusterTopologyProvider} given {@link JedisCluster}. Creates
+ * {@link JedisClusterTopologyProvider} by default.
*
- * @return the database index.
+ * @param cluster the {@link JedisCluster}, must not be {@literal null}.
+ * @return the {@link ClusterTopologyProvider}.
+ * @see JedisClusterTopologyProvider
+ * @see 2.2
*/
- public int getDatabase() {
- return RedisConfiguration.getDatabaseOrElse(configuration, standaloneConfig::getDatabase);
+ protected ClusterTopologyProvider createTopologyProvider(JedisCluster cluster) {
+ return new JedisClusterTopologyProvider(cluster);
}
/**
- * Sets the index of the database used by this connection factory. Default is 0.
+ * Creates {@link JedisCluster} for given {@link RedisClusterConfiguration} and {@link GenericObjectPoolConfig}.
*
- * @param index database index.
- * @deprecated since 2.0, configure the client name using {@link RedisSentinelConfiguration} or
- * {@link RedisStandaloneConfiguration}.
+ * @param clusterConfig must not be {@literal null}.
+ * @param poolConfig can be {@literal null}.
+ * @return the actual {@link JedisCluster}.
+ * @since 1.7
*/
- @Deprecated
- public void setDatabase(int index) {
+ protected JedisCluster createCluster(RedisClusterConfiguration clusterConfig,
+ GenericObjectPoolConfig poolConfig) {
- Assert.isTrue(index >= 0, "invalid DB index (a positive index required)");
+ Assert.notNull(clusterConfig, "Cluster configuration must not be null");
- if (RedisConfiguration.isDatabaseIndexAware(configuration)) {
+ Set hostAndPort = new HashSet<>();
- ((WithDatabaseIndex) configuration).setDatabase(index);
- return;
+ for (RedisNode node : clusterConfig.getClusterNodes()) {
+ hostAndPort.add(new HostAndPort(node.getHost(), node.getPort()));
}
- standaloneConfig.setDatabase(index);
- }
+ int redirects = clusterConfig.getMaxRedirects() != null ? clusterConfig.getMaxRedirects() : 5;
- /**
- * Returns the client name.
- *
- * @return the client name.
- * @since 1.8
- */
- @Nullable
- public String getClientName() {
- return clientConfiguration.getClientName().orElse(null);
+ return new JedisCluster(hostAndPort, this.clientConfig, redirects, poolConfig);
}
- /**
- * Sets the client name used by this connection factory. Defaults to none which does not set a client name.
- *
- * @param clientName the client name.
- * @since 1.8
- * @deprecated since 2.0, configure the client name using {@link JedisClientConfiguration}.
- * @throws IllegalStateException if {@link JedisClientConfiguration} is immutable.
- */
- @Deprecated
- public void setClientName(String clientName) {
- this.getMutableConfiguration().setClientName(clientName);
- }
+ @Override
+ public void destroy() {
- /**
- * @return the {@link JedisClientConfiguration}.
- * @since 2.0
- */
- public JedisClientConfiguration getClientConfiguration() {
- return clientConfiguration;
+ stop();
+ state.set(State.DESTROYED);
}
- /**
- * @return the {@link RedisStandaloneConfiguration}.
- * @since 2.0
- */
- @Nullable
- public RedisStandaloneConfiguration getStandaloneConfiguration() {
- return standaloneConfig;
+ @Override
+ public RedisConnection getConnection() {
+
+ assertInitialized();
+
+ if (isRedisClusterAware()) {
+ return getClusterConnection();
+ }
+
+ Jedis jedis = fetchJedisConnector();
+ JedisClientConfig sentinelConfig = this.clientConfig;
+
+ SentinelConfiguration sentinelConfiguration = getSentinelConfiguration();
+
+ if (sentinelConfiguration != null) {
+ sentinelConfig = createSentinelClientConfig(sentinelConfiguration);
+ }
+
+ JedisConnection connection = getUsePool() ? new JedisConnection(jedis, this.pool, this.clientConfig, sentinelConfig)
+ : new JedisConnection(jedis, null, this.clientConfig, sentinelConfig);
+
+ connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults);
+
+ return postProcessConnection(connection);
}
/**
- * @return the {@link RedisStandaloneConfiguration}, may be {@literal null}.
- * @since 2.0
+ * Returns a Jedis instance to be used as a Redis connection. The instance can be newly created or retrieved from a
+ * pool.
+ *
+ * @return Jedis instance ready for wrapping into a {@link RedisConnection}.
*/
- @Nullable
- public RedisSentinelConfiguration getSentinelConfiguration() {
- return RedisConfiguration.isSentinelConfiguration(configuration) ? (RedisSentinelConfiguration) configuration
- : null;
+ protected Jedis fetchJedisConnector() {
+
+ try {
+
+ if (getUsePool() && this.pool != null) {
+ return this.pool.getResource();
+ }
+
+ Jedis jedis = createJedis();
+
+ // force initialization (see Jedis issue #82)
+ jedis.connect();
+
+ return jedis;
+ } catch (Exception cause) {
+ throw new RedisConnectionFailureException("Cannot get Jedis connection", cause);
+ }
}
- /**
- * @return the {@link RedisClusterConfiguration}, may be {@literal null}.
- * @since 2.0
- */
- @Nullable
- public RedisClusterConfiguration getClusterConfiguration() {
- return RedisConfiguration.isClusterConfiguration(configuration) ? (RedisClusterConfiguration) configuration : null;
+ private Jedis createJedis() {
+ return new Jedis(new HostAndPort(getHostName(), getPort()), this.clientConfig);
}
/**
- * Specifies if pipelined results should be converted to the expected data type. If {@code false}, results of
- * {@link JedisConnection#closePipeline()} and {@link JedisConnection#exec()} will be of the type returned by the
- * Jedis driver.
+ * Post process a newly retrieved connection. Useful for decorating or executing initialization commands on a new
+ * connection. This implementation simply returns the connection.
*
- * @return {@code true} to convert pipeline and transaction results; {@code false} otherwise.
+ * @param connection the jedis connection.
+ * @return processed connection
*/
- @Override
- public boolean getConvertPipelineAndTxResults() {
- return convertPipelineAndTxResults;
+ protected JedisConnection postProcessConnection(JedisConnection connection) {
+ return connection;
}
- /**
- * Specifies if pipelined results should be converted to the expected data type. If {@code false}, results of
- * {@link JedisConnection#closePipeline()} and {@link JedisConnection#exec()} will be of the type returned by the
- * Jedis driver.
- *
- * @param convertPipelineAndTxResults {@code true} to convert pipeline and transaction results; {@code false}
- * otherwise.
- */
- public void setConvertPipelineAndTxResults(boolean convertPipelineAndTxResults) {
- this.convertPipelineAndTxResults = convertPipelineAndTxResults;
+ @Override
+ public RedisClusterConnection getClusterConnection() {
+
+ assertInitialized();
+
+ if (!isRedisClusterAware()) {
+ throw new InvalidDataAccessApiUsageException("Cluster is not configured");
+ }
+
+ JedisClusterConnection clusterConnection = new JedisClusterConnection(this.cluster,
+ getRequiredClusterCommandExecutor(), this.topologyProvider);
+
+ return postProcessConnection(clusterConnection);
}
/**
- * @return true when {@link RedisSentinelConfiguration} is present.
- * @since 1.4
+ * Post process a newly retrieved connection. Useful for decorating or executing initialization commands on a new
+ * connection. This implementation simply returns the connection.
+ *
+ * @param connection the jedis connection.
+ * @return processed connection.
+ * @since 3.2
*/
- public boolean isRedisSentinelAware() {
- return RedisConfiguration.isSentinelConfiguration(configuration);
+ protected JedisClusterConnection postProcessConnection(JedisClusterConnection connection) {
+ return connection;
}
- /**
- * @return true when {@link RedisClusterConfiguration} is present.
- * @since 2.0
- */
- public boolean isRedisClusterAware() {
- return RedisConfiguration.isClusterConfiguration(configuration);
+ @Override
+ public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
+ return EXCEPTION_TRANSLATION.translate(ex);
}
@Override
@@ -1078,6 +1116,5 @@ public Duration getConnectTimeout() {
public void setConnectTimeout(Duration connectTimeout) {
this.connectTimeout = connectTimeout;
}
-
}
}
diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java
index 9d50e0f056..30f1761807 100644
--- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java
+++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java
@@ -100,17 +100,45 @@
*/
public class LettuceConnection extends AbstractRedisConnection {
- private final Log LOGGER = LogFactory.getLog(getClass());
+ private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION =
+ new FallbackExceptionTranslationStrategy(LettuceExceptionConverter.INSTANCE);
static final RedisCodec CODEC = ByteArrayCodec.INSTANCE;
- private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new FallbackExceptionTranslationStrategy(
- LettuceExceptionConverter.INSTANCE);
private static final TypeHints typeHints = new TypeHints();
+ private static class LettuceTransactionResultConverter extends TransactionResultConverter {
+
+ public LettuceTransactionResultConverter(Queue> txResults,
+ Converter exceptionConverter) {
+
+ super(txResults, exceptionConverter);
+ }
+
+ @Override
+ public List convert(List execResults) {
+ // Lettuce Empty list means null (watched variable modified)
+ return execResults.isEmpty() ? null : super.convert(execResults);
+ }
+ }
+
+ // refers only to main connection as pubsub happens on a different one
+ private boolean convertPipelineAndTxResults = true;
+ private boolean isClosed = false;
+ private boolean isMulti = false;
+ private boolean isPipelined = false;
+
+ private int dbIndex;
+ private final int defaultDbIndex;
+ private final long timeout;
+
+ private final LettuceConnectionProvider connectionProvider;
+
+ private volatile @Nullable LettuceSubscription subscription;
+
private final LettuceGeoCommands geoCommands = new LettuceGeoCommands(this);
private final LettuceHashCommands hashCommands = new LettuceHashCommands(this);
- private final LettuceHyperLogLogCommands hllCommands = new LettuceHyperLogLogCommands(this);
+ private final LettuceHyperLogLogCommands hyperLogLogCommands = new LettuceHyperLogLogCommands(this);
private final LettuceKeyCommands keyCommands = new LettuceKeyCommands(this);
private final LettuceListCommands listCommands = new LettuceListCommands(this);
private final LettuceScriptingCommands scriptingCommands = new LettuceScriptingCommands(this);
@@ -120,66 +148,21 @@ public class LettuceConnection extends AbstractRedisConnection {
private final LettuceStringCommands stringCommands = new LettuceStringCommands(this);
private final LettuceZSetCommands zSetCommands = new LettuceZSetCommands(this);
- private final int defaultDbIndex;
- private int dbIndex;
-
- private final LettuceConnectionProvider connectionProvider;
- private final @Nullable StatefulConnection asyncSharedConn;
- private @Nullable StatefulConnection asyncDedicatedConn;
-
- private final long timeout;
-
- // refers only to main connection as pubsub happens on a different one
- private boolean isClosed = false;
- private boolean isMulti = false;
- private boolean isPipelined = false;
private @Nullable List> ppline;
- private @Nullable PipeliningFlushState flushState;
- private final Queue> txResults = new LinkedList<>();
- private volatile @Nullable LettuceSubscription subscription;
- /** flag indicating whether the connection needs to be dropped or not */
- private boolean convertPipelineAndTxResults = true;
- private PipeliningFlushPolicy pipeliningFlushPolicy = PipeliningFlushPolicy.flushEachCommand();
- LettuceResult, ?> newLettuceResult(Future> resultHolder) {
- return newLettuceResult(resultHolder, (val) -> val);
- }
-
- LettuceResult newLettuceResult(Future resultHolder, Converter converter) {
-
- return LettuceResultBuilder. forResponse(resultHolder).mappedWith(converter)
- .convertPipelineAndTxResults(convertPipelineAndTxResults).build();
- }
+ private final Log LOGGER = LogFactory.getLog(getClass());
- LettuceResult newLettuceResult(Future resultHolder, Converter converter,
- Supplier defaultValue) {
+ private PipeliningFlushPolicy pipeliningFlushPolicy = PipeliningFlushPolicy.flushEachCommand();
- return LettuceResultBuilder. forResponse(resultHolder).mappedWith(converter)
- .convertPipelineAndTxResults(convertPipelineAndTxResults).defaultNullTo(defaultValue).build();
- }
+ private @Nullable PipeliningFlushState pipeliningFlushState;
- LettuceResult newLettuceStatusResult(Future resultHolder) {
- return LettuceResultBuilder. forResponse(resultHolder).buildStatusResult();
- }
+ private final Queue> txResults = new LinkedList<>();
- private class LettuceTransactionResultConverter extends TransactionResultConverter {
- public LettuceTransactionResultConverter(Queue> txResults,
- Converter exceptionConverter) {
- super(txResults, exceptionConverter);
- }
-
- @Override
- public List convert(List execResults) {
- // Lettuce Empty list means null (watched variable modified)
- if (execResults.isEmpty()) {
- return null;
- }
- return super.convert(execResults);
- }
- }
+ private @Nullable StatefulConnection asyncDedicatedConnection;
+ private final @Nullable StatefulConnection asyncSharedConnection;
/**
- * Instantiates a new lettuce connection.
+ * Creates a new {@link LettuceConnection}.
*
* @param timeout The connection timeout (in milliseconds)
* @param client The {@link RedisClient} to use when instantiating a native connection
@@ -189,7 +172,7 @@ public LettuceConnection(long timeout, RedisClient client) {
}
/**
- * Instantiates a new lettuce connection.
+ * Creates a new {@link LettuceConnection}.
*
* @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s. Will not be used
* for transactions or blocking operations
@@ -202,8 +185,10 @@ public LettuceConnection(@Nullable StatefulRedisConnection share
}
/**
- * @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s. Should not be
- * used for transactions or blocking operations.
+ * Creates a new {@link LettuceConnection}.
+ *
+ * @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s.
+ * Should not be used for transactions or blocking operations.
* @param timeout The connection timeout (in milliseconds)
* @param client The {@link RedisClient} to use when making pub/sub connections.
* @param defaultDbIndex The db index to use along with {@link RedisClient} when establishing a dedicated connection.
@@ -213,15 +198,17 @@ public LettuceConnection(@Nullable StatefulRedisConnection share
@Nullable AbstractRedisClient client, int defaultDbIndex) {
this.connectionProvider = new StandaloneConnectionProvider((RedisClient) client, CODEC);
- this.asyncSharedConn = sharedConnection;
+ this.asyncSharedConnection = sharedConnection;
this.timeout = timeout;
this.defaultDbIndex = defaultDbIndex;
this.dbIndex = this.defaultDbIndex;
}
/**
- * @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s. Should not be
- * used for transactions or blocking operations.
+ * Creates a new {@link LettuceConnection}.
+ *
+ * @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s.
+ * Should not be used for transactions or blocking operations.
* @param connectionProvider connection provider to obtain and release native connections.
* @param timeout The connection timeout (in milliseconds)
* @param defaultDbIndex The db index to use along with {@link RedisClient} when establishing a dedicated connection.
@@ -229,12 +216,15 @@ public LettuceConnection(@Nullable StatefulRedisConnection share
*/
public LettuceConnection(@Nullable StatefulRedisConnection sharedConnection,
LettuceConnectionProvider connectionProvider, long timeout, int defaultDbIndex) {
+
this((StatefulConnection) sharedConnection, connectionProvider, timeout, defaultDbIndex);
}
/**
- * @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s. Should not be
- * used for transactions or blocking operations.
+ * Creates a new {@link LettuceConnection}.
+ *
+ * @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s.
+ * Should not be used for transactions or blocking operations.
* @param connectionProvider connection provider to obtain and release native connections.
* @param timeout The connection timeout (in milliseconds)
* @param defaultDbIndex The db index to use along with {@link RedisClient} when establishing a dedicated connection.
@@ -245,17 +235,13 @@ public LettuceConnection(@Nullable StatefulRedisConnection share
Assert.notNull(connectionProvider, "LettuceConnectionProvider must not be null");
- this.asyncSharedConn = sharedConnection;
+ this.asyncSharedConnection = sharedConnection;
this.connectionProvider = connectionProvider;
this.timeout = timeout;
this.defaultDbIndex = defaultDbIndex;
this.dbIndex = this.defaultDbIndex;
}
- protected DataAccessException convertLettuceAccessException(Exception ex) {
- return EXCEPTION_TRANSLATION.translate(ex);
- }
-
@Override
public org.springframework.data.redis.connection.RedisCommands commands() {
return this;
@@ -263,57 +249,61 @@ public org.springframework.data.redis.connection.RedisCommands commands() {
@Override
public RedisGeoCommands geoCommands() {
- return geoCommands;
+ return this.geoCommands;
}
@Override
public RedisHashCommands hashCommands() {
- return hashCommands;
+ return this.hashCommands;
}
@Override
public RedisHyperLogLogCommands hyperLogLogCommands() {
- return hllCommands;
+ return this.hyperLogLogCommands;
}
@Override
public RedisKeyCommands keyCommands() {
- return keyCommands;
+ return this.keyCommands;
}
@Override
public RedisListCommands listCommands() {
- return listCommands;
+ return this.listCommands;
}
@Override
public RedisScriptingCommands scriptingCommands() {
- return scriptingCommands;
+ return this.scriptingCommands;
}
@Override
public RedisSetCommands setCommands() {
- return setCommands;
+ return this.setCommands;
}
@Override
public RedisServerCommands serverCommands() {
- return serverCommands;
+ return this.serverCommands;
}
@Override
public RedisStreamCommands streamCommands() {
- return streamCommands;
+ return this.streamCommands;
}
@Override
public RedisStringCommands stringCommands() {
- return stringCommands;
+ return this.stringCommands;
}
@Override
public RedisZSetCommands zSetCommands() {
- return zSetCommands;
+ return this.zSetCommands;
+ }
+
+ protected DataAccessException convertLettuceAccessException(Exception cause) {
+ return EXCEPTION_TRANSLATION.translate(cause);
}
@Override
@@ -334,23 +324,170 @@ public Object execute(String command, byte[]... args) {
@SuppressWarnings({ "rawtypes", "unchecked" })
public Object execute(String command, @Nullable CommandOutput commandOutputTypeHint, byte[]... args) {
- Assert.hasText(command, "a valid command needs to be specified");
+ Assert.hasText(command, () -> String.format("A valid command [%s] needs to be specified", command));
- String name = command.trim().toUpperCase();
- ProtocolKeyword commandType = getCommandType(name);
+ ProtocolKeyword commandType = getCommandType(command.trim().toUpperCase());
validateCommandIfRunningInTransactionMode(commandType, args);
- CommandArgs cmdArg = new CommandArgs<>(CODEC);
+ CommandArgs commandArguments = new CommandArgs<>(CODEC);
+
if (!ObjectUtils.isEmpty(args)) {
- cmdArg.addKeys(args);
+ commandArguments.addKeys(args);
}
CommandOutput expectedOutput = commandOutputTypeHint != null ? commandOutputTypeHint
: typeHints.getTypeHint(commandType);
- Command cmd = new Command(commandType, expectedOutput, cmdArg);
- return invoke().just(RedisClusterAsyncCommands::dispatch, cmd.getType(), cmd.getOutput(), cmd.getArgs());
+ Command redisCommand = new Command(commandType, expectedOutput, commandArguments);
+
+ return invoke().just(RedisClusterAsyncCommands::dispatch, redisCommand.getType(), redisCommand.getOutput(),
+ redisCommand.getArgs());
+ }
+
+ RedisClusterAsyncCommands getAsyncConnection() {
+
+ if (isQueueing() || isPipelined()) {
+ return getAsyncDedicatedConnection();
+ }
+
+ StatefulConnection sharedConnection = this.asyncSharedConnection;
+
+ if (sharedConnection != null) {
+ if (sharedConnection instanceof StatefulRedisConnection statefulConnection) {
+ return statefulConnection.async();
+ }
+ if (sharedConnection instanceof StatefulRedisClusterConnection statefulClusterConnection) {
+ return statefulClusterConnection.async();
+ }
+ }
+
+ return getAsyncDedicatedConnection();
+ }
+
+ /**
+ * Obtain a {@link LettuceInvoker} to call Lettuce methods using the default {@link #getAsyncConnection() connection}.
+ *
+ * @return the {@link LettuceInvoker}.
+ * @since 2.5
+ */
+ LettuceInvoker invoke() {
+ return invoke(getAsyncConnection());
+ }
+
+ /**
+ * Obtain a {@link LettuceInvoker} to call Lettuce methods using the given {@link RedisClusterAsyncCommands
+ * connection}.
+ *
+ * @param connection the connection to use.
+ * @return the {@link LettuceInvoker}.
+ * @since 2.5
+ */
+ LettuceInvoker invoke(RedisClusterAsyncCommands connection) {
+ return doInvoke(connection, false);
+ }
+
+ /**
+ * Obtain a {@link LettuceInvoker} to call Lettuce methods returning a status response using the default
+ * {@link #getAsyncConnection() connection}. Status responses are not included in transactional and pipeline results.
+ *
+ * @return the {@link LettuceInvoker}.
+ * @since 2.5
+ */
+ LettuceInvoker invokeStatus() {
+ return doInvoke(getAsyncConnection(), true);
+ }
+
+ private LettuceInvoker doInvoke(RedisClusterAsyncCommands connection, boolean statusCommand) {
+
+ if (isPipelined()) {
+
+ return new LettuceInvoker(connection, (future, converter, nullDefault) -> {
+
+ try {
+ if (statusCommand) {
+ pipeline(newLettuceStatusResult(future.get()));
+ } else {
+ pipeline(newLettuceResult(future.get(), converter, nullDefault));
+ }
+ } catch (Exception cause) {
+ throw convertLettuceAccessException(cause);
+ }
+
+ return null;
+ });
+ }
+
+ if (isQueueing()) {
+
+ return new LettuceInvoker(connection, (future, converter, nullDefault) -> {
+
+ try {
+ if (statusCommand) {
+ transaction(newLettuceStatusResult(future.get()));
+ } else {
+ transaction(newLettuceResult(future.get(), converter, nullDefault));
+ }
+ } catch (Exception cause) {
+ throw convertLettuceAccessException(cause);
+ }
+
+ return null;
+ });
+ }
+
+ return new LettuceInvoker(connection, (future, converter, nullDefault) -> {
+
+ try {
+
+ Object result = await(future.get());
+
+ return result != null ? converter.convert(result) : nullDefault.get();
+ } catch (Exception cause) {
+ throw convertLettuceAccessException(cause);
+ }
+ });
+ }
+
+ LettuceResult newLettuceResult(Future resultHolder, Converter converter) {
+
+ return LettuceResultBuilder.forResponse(resultHolder)
+ .mappedWith(converter)
+ .convertPipelineAndTxResults(this.convertPipelineAndTxResults)
+ .build();
+ }
+
+ LettuceResult newLettuceResult(Future resultHolder, Converter converter,
+ Supplier defaultValue) {
+
+ return LettuceResultBuilder.forResponse(resultHolder)
+ .mappedWith(converter)
+ .convertPipelineAndTxResults(this.convertPipelineAndTxResults)
+ .defaultNullTo(defaultValue)
+ .build();
+ }
+
+ LettuceResult newLettuceStatusResult(Future resultHolder) {
+ return LettuceResultBuilder.forResponse(resultHolder).buildStatusResult();
+ }
+
+ void pipeline(LettuceResult, ?> result) {
+
+ PipeliningFlushState pipeliningFlushState = this.pipeliningFlushState;
+
+ if (pipeliningFlushState != null) {
+ pipeliningFlushState.onCommand(getOrCreateDedicatedConnection());
+ }
+
+ if (isQueueing()) {
+ transaction(result);
+ } else {
+ this.ppline.add(result);
+ }
+ }
+
+ void transaction(FutureResult> result) {
+ this.txResults.add(result);
}
@Override
@@ -366,57 +503,60 @@ public void close() {
try {
reset();
- } catch (RuntimeException e) {
- LOGGER.debug("Failed to reset connection during close", e);
+ } catch (RuntimeException cause) {
+ LOGGER.debug("Failed to reset connection during close", cause);
}
}
private void reset() {
- if (asyncDedicatedConn != null) {
+ if (this.asyncDedicatedConnection != null) {
try {
if (customizedDatabaseIndex()) {
- potentiallySelectDatabase(defaultDbIndex);
+ potentiallySelectDatabase(this.defaultDbIndex);
}
- connectionProvider.release(asyncDedicatedConn);
- asyncDedicatedConn = null;
- } catch (RuntimeException ex) {
- throw convertLettuceAccessException(ex);
+ this.connectionProvider.release(this.asyncDedicatedConnection);
+ this.asyncDedicatedConnection = null;
+ } catch (RuntimeException cause) {
+ throw convertLettuceAccessException(cause);
}
}
LettuceSubscription subscription = this.subscription;
- if (subscription != null) {
- if (subscription.isAlive()) {
- subscription.doClose();
- }
- this.subscription = null;
+
+ if (isAlive(subscription)) {
+ subscription.doClose();
}
+ this.subscription = null;
this.dbIndex = defaultDbIndex;
}
@Override
public boolean isClosed() {
- return isClosed && !isSubscribed();
+ return this.isClosed && !isSubscribed();
}
@Override
public RedisClusterAsyncCommands getNativeConnection() {
LettuceSubscription subscription = this.subscription;
- return (subscription != null && subscription.isAlive() ? subscription.getNativeConnection().async()
- : getAsyncConnection());
+
+ return isAlive(subscription) ? subscription.getNativeConnection().async() : getAsyncConnection();
+ }
+
+ private boolean isAlive(@Nullable LettuceSubscription subscription) {
+ return subscription != null && subscription.isAlive();
}
@Override
public boolean isQueueing() {
- return isMulti;
+ return this.isMulti;
}
@Override
public boolean isPipelined() {
- return isPipelined;
+ return this.isPipelined;
}
@Override
@@ -425,8 +565,8 @@ public void openPipeline() {
if (!isPipelined) {
isPipelined = true;
ppline = new ArrayList<>();
- flushState = this.pipeliningFlushPolicy.newPipeline();
- flushState.onOpen(this.getOrCreateDedicatedConnection());
+ pipeliningFlushState = this.pipeliningFlushPolicy.newPipeline();
+ pipeliningFlushState.onOpen(this.getOrCreateDedicatedConnection());
}
}
@@ -437,17 +577,19 @@ public List closePipeline() {
return Collections.emptyList();
}
- flushState.onClose(this.getOrCreateDedicatedConnection());
- flushState = null;
+ pipeliningFlushState.onClose(this.getOrCreateDedicatedConnection());
+ pipeliningFlushState = null;
isPipelined = false;
+
List> futures = new ArrayList<>(ppline.size());
+
for (LettuceResult, ?> result : ppline) {
futures.add(result.getResultHolder());
}
try {
- boolean done = LettuceFutures.awaitAll(timeout, TimeUnit.MILLISECONDS,
- futures.toArray(new RedisFuture[futures.size()]));
+
+ boolean done = LettuceFutures.awaitAll(timeout, TimeUnit.MILLISECONDS, futures.toArray(new RedisFuture[0]));
List results = new ArrayList<>(futures.size());
@@ -458,25 +600,29 @@ public List closePipeline() {
if (result.getResultHolder().getOutput().hasError()) {
- Exception err = new InvalidDataAccessApiUsageException(result.getResultHolder().getOutput().getError());
+ Exception exception = new InvalidDataAccessApiUsageException(result.getResultHolder()
+ .getOutput().getError());
+
// remember only the first error
if (problem == null) {
- problem = err;
+ problem = exception;
}
- results.add(err);
+
+ results.add(exception);
} else if (!result.isStatus()) {
try {
results.add(result.conversionRequired() ? result.convert(result.get()) : result.get());
- } catch (DataAccessException e) {
+ } catch (DataAccessException cause) {
if (problem == null) {
- problem = e;
+ problem = cause;
}
- results.add(e);
+ results.add(cause);
}
}
}
}
+
ppline.clear();
if (problem != null) {
@@ -488,8 +634,8 @@ public List closePipeline() {
}
throw new RedisPipelineException(new QueryTimeoutException("Redis command timed out"));
- } catch (Exception e) {
- throw new RedisPipelineException(e);
+ } catch (Exception cause) {
+ throw new RedisPipelineException(cause);
}
}
@@ -505,15 +651,17 @@ public String ping() {
@Override
public void discard() {
+
isMulti = false;
+
try {
if (isPipelined()) {
pipeline(newLettuceStatusResult(getAsyncDedicatedRedisCommands().discard()));
return;
}
getDedicatedRedisCommands().discard();
- } catch (Exception ex) {
- throw convertLettuceAccessException(ex);
+ } catch (Exception cause) {
+ throw convertLettuceAccessException(cause);
} finally {
txResults.clear();
}
@@ -534,18 +682,21 @@ public List exec() {
LettuceTransactionResultConverter resultConverter = new LettuceTransactionResultConverter(
new LinkedList<>(txResults), exceptionConverter);
- pipeline(newLettuceResult(exec,
- source -> resultConverter.convert(LettuceConverters.transactionResultUnwrapper().convert(source))));
+ pipeline(newLettuceResult(exec, source ->
+ resultConverter.convert(LettuceConverters.transactionResultUnwrapper().convert(source))));
+
return null;
}
TransactionResult transactionResult = getDedicatedRedisCommands().exec();
+
List results = LettuceConverters.transactionResultUnwrapper().convert(transactionResult);
+
return convertPipelineAndTxResults
? new LettuceTransactionResultConverter(txResults, exceptionConverter).convert(results)
: results;
- } catch (Exception ex) {
- throw convertLettuceAccessException(ex);
+ } catch (Exception cause) {
+ throw convertLettuceAccessException(cause);
} finally {
txResults.clear();
}
@@ -553,25 +704,28 @@ public List exec() {
@Override
public void multi() {
+
if (isQueueing()) {
return;
}
+
isMulti = true;
+
try {
if (isPipelined()) {
getAsyncDedicatedRedisCommands().multi();
return;
}
getDedicatedRedisCommands().multi();
- } catch (Exception ex) {
- throw convertLettuceAccessException(ex);
+ } catch (Exception cause) {
+ throw convertLettuceAccessException(cause);
}
}
@Override
public void select(int dbIndex) {
- if (asyncSharedConn != null) {
+ if (asyncSharedConnection != null) {
throw new InvalidDataAccessApiUsageException("Selecting a new database not supported due to shared connection;"
+ " Use separate ConnectionFactorys to work with multiple databases");
}
@@ -595,16 +749,18 @@ public void unwatch() {
return;
}
getDedicatedRedisCommands().unwatch();
- } catch (Exception ex) {
- throw convertLettuceAccessException(ex);
+ } catch (Exception cause) {
+ throw convertLettuceAccessException(cause);
}
}
@Override
public void watch(byte[]... keys) {
+
if (isQueueing()) {
throw new InvalidDataAccessApiUsageException("WATCH is not supported when a transaction is active");
}
+
try {
if (isPipelined()) {
pipeline(newLettuceStatusResult(getAsyncDedicatedRedisCommands().watch(keys)));
@@ -615,8 +771,8 @@ public void watch(byte[]... keys) {
return;
}
getDedicatedRedisCommands().watch(keys);
- } catch (Exception ex) {
- throw convertLettuceAccessException(ex);
+ } catch (Exception cause) {
+ throw convertLettuceAccessException(cause);
}
}
@@ -631,12 +787,13 @@ public Long publish(byte[] channel, byte[] message) {
@Override
public Subscription getSubscription() {
- return subscription;
+ return this.subscription;
}
@Override
public boolean isSubscribed() {
- return (subscription != null && subscription.isAlive());
+ Subscription subscription = getSubscription();
+ return subscription != null && subscription.isAlive();
}
@Override
@@ -649,10 +806,10 @@ public void pSubscribe(MessageListener listener, byte[]... patterns) {
}
try {
- subscription = initSubscription(listener);
- subscription.pSubscribe(patterns);
- } catch (Exception ex) {
- throw convertLettuceAccessException(ex);
+ this.subscription = initSubscription(listener);
+ this.subscription.pSubscribe(patterns);
+ } catch (Exception cause) {
+ throw convertLettuceAccessException(cause);
}
}
@@ -666,21 +823,21 @@ public void subscribe(MessageListener listener, byte[]... channels) {
}
try {
- subscription = initSubscription(listener);
- subscription.subscribe(channels);
- } catch (Exception ex) {
- throw convertLettuceAccessException(ex);
+ this.subscription = initSubscription(listener);
+ this.subscription.subscribe(channels);
+ } catch (Exception cause) {
+ throw convertLettuceAccessException(cause);
}
}
@SuppressWarnings("unchecked")
- T failsafeReadScanValues(List> source, @SuppressWarnings("rawtypes") Converter converter) {
+ T failsafeReadScanValues(List> source, @SuppressWarnings("rawtypes") @Nullable Converter converter) {
try {
return (T) (converter != null ? converter.convert(source) : source);
- } catch (IndexOutOfBoundsException e) {
- // ignore this one
- }
+ } catch (IndexOutOfBoundsException ignore) {
+ }
+
return null;
}
@@ -720,7 +877,8 @@ protected StatefulRedisPubSubConnection switchToPubSub() {
checkSubscription();
reset();
- return connectionProvider.getConnection(StatefulRedisPubSubConnection.class);
+
+ return this.connectionProvider.getConnection(StatefulRedisPubSubConnection.class);
}
/**
@@ -734,128 +892,8 @@ protected StatefulRedisPubSubConnection switchToPubSub() {
*/
protected LettuceSubscription doCreateSubscription(MessageListener listener,
StatefulRedisPubSubConnection connection, LettuceConnectionProvider connectionProvider) {
- return new LettuceSubscription(listener, connection, connectionProvider);
- }
-
- void pipeline(LettuceResult, ?> result) {
-
- if (flushState != null) {
- flushState.onCommand(getOrCreateDedicatedConnection());
- }
-
- if (isQueueing()) {
- transaction(result);
- } else {
- ppline.add(result);
- }
- }
- /**
- * Obtain a {@link LettuceInvoker} to call Lettuce methods using the default {@link #getAsyncConnection() connection}.
- *
- * @return the {@link LettuceInvoker}.
- * @since 2.5
- */
- LettuceInvoker invoke() {
- return invoke(getAsyncConnection());
- }
-
- /**
- * Obtain a {@link LettuceInvoker} to call Lettuce methods using the given {@link RedisClusterAsyncCommands
- * connection}.
- *
- * @param connection the connection to use.
- * @return the {@link LettuceInvoker}.
- * @since 2.5
- */
- LettuceInvoker invoke(RedisClusterAsyncCommands connection) {
- return doInvoke(connection, false);
- }
-
- /**
- * Obtain a {@link LettuceInvoker} to call Lettuce methods returning a status response using the default
- * {@link #getAsyncConnection() connection}. Status responses are not included in transactional and pipeline results.
- *
- * @return the {@link LettuceInvoker}.
- * @since 2.5
- */
- LettuceInvoker invokeStatus() {
- return doInvoke(getAsyncConnection(), true);
- }
-
- private LettuceInvoker doInvoke(RedisClusterAsyncCommands connection, boolean statusCommand) {
-
- if (isPipelined()) {
-
- return new LettuceInvoker(connection, (future, converter, nullDefault) -> {
-
- try {
- if (statusCommand) {
- pipeline(newLettuceStatusResult(future.get()));
- } else {
- pipeline(newLettuceResult(future.get(), converter, nullDefault));
- }
- } catch (Exception ex) {
- throw convertLettuceAccessException(ex);
- }
- return null;
- });
- }
-
- if (isQueueing()) {
-
- return new LettuceInvoker(connection, (future, converter, nullDefault) -> {
- try {
- if (statusCommand) {
- transaction(newLettuceStatusResult(future.get()));
- } else {
- transaction(newLettuceResult(future.get(), converter, nullDefault));
- }
-
- } catch (Exception ex) {
- throw convertLettuceAccessException(ex);
- }
- return null;
- });
- }
-
- return new LettuceInvoker(connection, (future, converter, nullDefault) -> {
-
- try {
-
- Object result = await(future.get());
-
- if (result == null) {
- return nullDefault.get();
- }
-
- return converter.convert(result);
- } catch (Exception ex) {
- throw convertLettuceAccessException(ex);
- }
- });
- }
-
- void transaction(FutureResult> result) {
- txResults.add(result);
- }
-
- RedisClusterAsyncCommands getAsyncConnection() {
-
- if (isQueueing() || isPipelined()) {
- return getAsyncDedicatedConnection();
- }
-
- if (asyncSharedConn != null) {
-
- if (asyncSharedConn instanceof StatefulRedisConnection) {
- return ((StatefulRedisConnection) asyncSharedConn).async();
- }
- if (asyncSharedConn instanceof StatefulRedisClusterConnection) {
- return ((StatefulRedisClusterConnection) asyncSharedConn).async();
- }
- }
- return getAsyncDedicatedConnection();
+ return new LettuceSubscription(listener, connection, connectionProvider);
}
protected RedisClusterCommands getConnection() {
@@ -864,13 +902,12 @@ protected RedisClusterCommands getConnection() {
return getDedicatedConnection();
}
- if (asyncSharedConn != null) {
-
- if (asyncSharedConn instanceof StatefulRedisConnection) {
- return ((StatefulRedisConnection) asyncSharedConn).sync();
+ if (asyncSharedConnection != null) {
+ if (asyncSharedConnection instanceof StatefulRedisConnection statefulConnection) {
+ return statefulConnection.sync();
}
- if (asyncSharedConn instanceof StatefulRedisClusterConnection) {
- return ((StatefulRedisClusterConnection) asyncSharedConn).sync();
+ if (asyncSharedConnection instanceof StatefulRedisClusterConnection statefulClusterConnection) {
+ return statefulClusterConnection.sync();
}
}
@@ -881,15 +918,16 @@ RedisClusterCommands getDedicatedConnection() {
StatefulConnection connection = getOrCreateDedicatedConnection();
- if (connection instanceof StatefulRedisConnection) {
- return ((StatefulRedisConnection) connection).sync();
+ if (connection instanceof StatefulRedisConnection statefulConnection) {
+ return statefulConnection.sync();
}
- if (connection instanceof StatefulRedisClusterConnection) {
- return ((StatefulRedisClusterConnection) connection).sync();
+ if (connection instanceof StatefulRedisClusterConnection statefulClusterConnection) {
+ return statefulClusterConnection.sync();
}
- throw new IllegalStateException(
- String.format("%s is not a supported connection type", connection.getClass().getName()));
+ String message = String.format("%s is not a supported connection type", connection.getClass().getName());
+
+ throw new IllegalStateException(message);
}
protected RedisClusterAsyncCommands getAsyncDedicatedConnection() {
@@ -900,24 +938,25 @@ protected RedisClusterAsyncCommands getAsyncDedicatedConnection(
StatefulConnection connection = getOrCreateDedicatedConnection();
- if (connection instanceof StatefulRedisConnection) {
- return ((StatefulRedisConnection) connection).async();
+ if (connection instanceof StatefulRedisConnection statefulConnection) {
+ return statefulConnection.async();
}
- if (asyncDedicatedConn instanceof StatefulRedisClusterConnection) {
- return ((StatefulRedisClusterConnection) connection).async();
+ if (asyncDedicatedConnection instanceof StatefulRedisClusterConnection statefulClusterConnection) {
+ return statefulClusterConnection.async();
}
- throw new IllegalStateException(
- String.format("%s is not a supported connection type", connection.getClass().getName()));
+ String message = String.format("%s is not a supported connection type", connection.getClass().getName());
+
+ throw new IllegalStateException(message);
}
@SuppressWarnings("unchecked")
protected StatefulConnection doGetAsyncDedicatedConnection() {
- StatefulConnection connection = connectionProvider.getConnection(StatefulConnection.class);
+ StatefulConnection connection = getConnectionProvider().getConnection(StatefulConnection.class);
if (customizedDatabaseIndex()) {
- potentiallySelectDatabase(dbIndex);
+ potentiallySelectDatabase(this.dbIndex);
}
return connection;
@@ -927,14 +966,15 @@ protected StatefulConnection doGetAsyncDedicatedConnection() {
protected boolean isActive(RedisNode node) {
StatefulRedisSentinelConnection connection = null;
+
try {
connection = getConnection(node);
return connection.sync().ping().equalsIgnoreCase("pong");
- } catch (Exception e) {
+ } catch (Exception cause) {
return false;
} finally {
if (connection != null) {
- connectionProvider.release(connection);
+ getConnectionProvider().release(connection);
}
}
}
@@ -943,23 +983,24 @@ protected boolean isActive(RedisNode node) {
protected RedisSentinelConnection getSentinelConnection(RedisNode sentinel) {
StatefulRedisSentinelConnection connection = getConnection(sentinel);
+
return new LettuceSentinelConnection(connection);
}
LettuceConnectionProvider getConnectionProvider() {
- return connectionProvider;
+ return this.connectionProvider;
}
@SuppressWarnings("unchecked")
private StatefulRedisSentinelConnection getConnection(RedisNode sentinel) {
- return ((TargetAware) connectionProvider).getConnection(StatefulRedisSentinelConnection.class,
+ return ((TargetAware) getConnectionProvider()).getConnection(StatefulRedisSentinelConnection.class,
getRedisURI(sentinel));
}
@Nullable
private T await(RedisFuture cmd) {
- if (isMulti) {
+ if (this.isMulti) {
return null;
}
@@ -972,24 +1013,23 @@ private T await(RedisFuture cmd) {
private StatefulConnection getOrCreateDedicatedConnection() {
- if (asyncDedicatedConn == null) {
- asyncDedicatedConn = doGetAsyncDedicatedConnection();
+ if (this.asyncDedicatedConnection == null) {
+ this.asyncDedicatedConnection = doGetAsyncDedicatedConnection();
}
- return asyncDedicatedConn;
+ return this.asyncDedicatedConnection;
}
- @SuppressWarnings("unchecked")
private RedisCommands getDedicatedRedisCommands() {
- return (RedisCommands) getDedicatedConnection();
+ return (RedisCommands) getDedicatedConnection();
}
- @SuppressWarnings("unchecked")
private RedisAsyncCommands getAsyncDedicatedRedisCommands() {
- return (RedisAsyncCommands) getAsyncDedicatedConnection();
+ return (RedisAsyncCommands) getAsyncDedicatedConnection();
}
private void checkSubscription() {
+
if (isSubscribed()) {
throw new RedisSubscribedConnectionException(
"Connection already subscribed; use the connection Subscription to cancel or add new channels");
@@ -1001,7 +1041,12 @@ private LettuceSubscription initSubscription(MessageListener listener) {
}
private RedisURI getRedisURI(RedisNode node) {
- return RedisURI.Builder.redis(node.getHost(), node.getPort()).build();
+ return RedisURI.Builder.redis(node.getHost(), getPort(node)).build();
+ }
+
+ private int getPort(RedisNode node) {
+ Integer port = node.getPort();
+ return port != null ? port : RedisURI.DEFAULT_REDIS_PORT;
}
private boolean customizedDatabaseIndex() {
@@ -1009,8 +1054,9 @@ private boolean customizedDatabaseIndex() {
}
private void potentiallySelectDatabase(int dbIndex) {
- if (asyncDedicatedConn instanceof StatefulRedisConnection) {
- ((StatefulRedisConnection) asyncDedicatedConn).sync().select(dbIndex);
+
+ if (asyncDedicatedConnection instanceof StatefulRedisConnection statefulConnection) {
+ statefulConnection.sync().select(dbIndex);
}
}
@@ -1025,14 +1071,16 @@ private void validateCommandIfRunningInTransactionMode(ProtocolKeyword cmd, byte
}
}
- private void validateCommand(ProtocolKeyword cmd, @Nullable byte[]... args) {
+ private void validateCommand(ProtocolKeyword command, @Nullable byte[]... args) {
+
+ RedisCommand redisCommand = RedisCommand.failsafeCommandLookup(command.name());
- RedisCommand redisCommand = RedisCommand.failsafeCommandLookup(cmd.name());
if (!RedisCommand.UNKNOWN.equals(redisCommand) && redisCommand.requiresArguments()) {
try {
redisCommand.validateArgumentCount(args != null ? args.length : 0);
- } catch (IllegalArgumentException e) {
- throw new InvalidDataAccessApiUsageException(String.format("Validation failed for %s command", cmd), e);
+ } catch (IllegalArgumentException cause) {
+ String message = String.format("Validation failed for %s command", command);
+ throw new InvalidDataAccessApiUsageException(message, cause);
}
}
}
@@ -1041,7 +1089,7 @@ private static ProtocolKeyword getCommandType(String name) {
try {
return CommandType.valueOf(name);
- } catch (IllegalArgumentException e) {
+ } catch (IllegalArgumentException cause) {
return new CustomCommandType(name);
}
}
@@ -1059,7 +1107,7 @@ static class TypeHints {
@SuppressWarnings("rawtypes") //
private static final Map, Constructor> CONSTRUCTORS = new ConcurrentHashMap<>();
- {
+ static {
// INTEGER
COMMAND_OUTPUT_TYPE_MAPPING.put(BITCOUNT, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(BITOP, IntegerOutput.class);
@@ -1219,7 +1267,7 @@ static class TypeHints {
/**
* Returns the {@link CommandOutput} mapped for given {@link CommandType} or {@link ByteArrayOutput} as default.
*
- * @param type
+ * @param type {@link ProtocolKeyword} used to lookup the type hint.
* @return {@link ByteArrayOutput} as default when no matching {@link CommandOutput} available.
*/
@SuppressWarnings("rawtypes")
@@ -1230,28 +1278,33 @@ public CommandOutput getTypeHint(ProtocolKeyword type) {
/**
* Returns the {@link CommandOutput} mapped for given {@link CommandType} given {@link CommandOutput} as default.
*
- * @param type
- * @return
+ * @param type {@link ProtocolKeyword} used to lookup the type hint.
+ * @return the {@link CommandOutput} mapped for given {@link CommandType} given {@link CommandOutput} as default.
*/
@SuppressWarnings("rawtypes")
- public CommandOutput getTypeHint(ProtocolKeyword type, CommandOutput defaultType) {
+ public CommandOutput getTypeHint(@Nullable ProtocolKeyword type, CommandOutput defaultType) {
if (type == null || !COMMAND_OUTPUT_TYPE_MAPPING.containsKey(type)) {
return defaultType;
}
- CommandOutput, ?, ?> outputType = instanciateCommandOutput(COMMAND_OUTPUT_TYPE_MAPPING.get(type));
+
+ CommandOutput, ?, ?> outputType = instantiateCommandOutput(COMMAND_OUTPUT_TYPE_MAPPING.get(type));
+
return outputType != null ? outputType : defaultType;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
- private CommandOutput, ?, ?> instanciateCommandOutput(Class extends CommandOutput> type) {
+ private CommandOutput, ?, ?> instantiateCommandOutput(Class extends CommandOutput> type) {
Assert.notNull(type, "Cannot create instance for 'null' type.");
+
Constructor constructor = CONSTRUCTORS.get(type);
+
if (constructor == null) {
constructor = (Constructor) ClassUtils.getConstructorIfAvailable(type, RedisCodec.class);
CONSTRUCTORS.put(type, constructor);
}
+
return BeanUtils.instantiateClass(constructor, CODEC);
}
}
@@ -1315,7 +1368,7 @@ public interface PipeliningFlushState {
/**
* Callback if the pipeline gets opened.
*
- * @param connection
+ * @param connection Lettuce {@link StatefulConnection}.
* @see #openPipeline()
*/
void onOpen(StatefulConnection, ?> connection);
@@ -1323,7 +1376,7 @@ public interface PipeliningFlushState {
/**
* Callback for each issued Redis command.
*
- * @param connection
+ * @param connection Lettuce {@link StatefulConnection}.
* @see #pipeline(LettuceResult)
*/
void onCommand(StatefulConnection, ?> connection);
@@ -1331,7 +1384,7 @@ public interface PipeliningFlushState {
/**
* Callback if the pipeline gets closed.
*
- * @param connection
+ * @param connection Lettuce {@link StatefulConnection}.
* @see #closePipeline()
*/
void onClose(StatefulConnection, ?> connection);
@@ -1432,45 +1485,40 @@ public void onClose(StatefulConnection, ?> connection) {
/**
* @since 2.3.8
*/
- static class CustomCommandType implements ProtocolKeyword {
-
- private final String name;
-
- CustomCommandType(String name) {
- this.name = name;
- }
+ record CustomCommandType(String name) implements ProtocolKeyword {
@Override
public byte[] getBytes() {
- return name.getBytes(StandardCharsets.US_ASCII);
+ return name().getBytes(StandardCharsets.US_ASCII);
}
@Override
public String name() {
- return name;
+ return this.name;
}
@Override
- public boolean equals(@Nullable Object o) {
+ public boolean equals(@Nullable Object obj) {
- if (this == o) {
+ if (this == obj) {
return true;
}
- if (!(o instanceof CustomCommandType)) {
+
+ if (!(obj instanceof CustomCommandType that)) {
return false;
}
- CustomCommandType that = (CustomCommandType) o;
- return ObjectUtils.nullSafeEquals(name, that.name);
+
+ return ObjectUtils.nullSafeEquals(this.name(), that.name());
}
@Override
public int hashCode() {
- return ObjectUtils.nullSafeHashCode(name);
+ return ObjectUtils.nullSafeHashCode(name());
}
@Override
public String toString() {
- return name;
+ return name();
}
}
}
diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java
index 6bb2a3bd39..f0daa57ce2 100644
--- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java
+++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java
@@ -49,6 +49,7 @@
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
+import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.ExceptionTranslationStrategy;
@@ -56,7 +57,6 @@
import org.springframework.data.redis.RedisConnectionFailureException;
import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.RedisConfiguration.ClusterConfiguration;
-import org.springframework.data.redis.connection.RedisConfiguration.DomainSocketConfiguration;
import org.springframework.data.redis.connection.RedisConfiguration.WithDatabaseIndex;
import org.springframework.data.redis.connection.RedisConfiguration.WithPassword;
import org.springframework.data.util.Optionals;
@@ -114,24 +114,39 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new PassThroughExceptionTranslationStrategy(
LettuceExceptionConverter.INSTANCE);
+ private boolean validateConnection = false;
+ private boolean shareNativeConnection = true;
+ private boolean eagerInitialization = false;
+ private boolean convertPipelineAndTxResults = true;
+
+ private int phase = 0; // in between min and max values
+
+ private @Nullable AbstractRedisClient client;
+
+ private final AtomicReference state = new AtomicReference<>(State.CREATED);
+
+ private @Nullable ClusterCommandExecutor clusterCommandExecutor;
+
+ private @Nullable AsyncTaskExecutor executor;
+
+ private final LettuceClientConfiguration clientConfiguration;
+
+ private @Nullable LettuceConnectionProvider connectionProvider;
+ private @Nullable LettuceConnectionProvider reactiveConnectionProvider;
+
private final Log log = LogFactory.getLog(getClass());
/** Synchronization monitor for the shared Connection */
private final Object connectionMonitor = new Object();
- private final LettuceClientConfiguration clientConfiguration;
-
- private RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration("localhost", 6379);
+ private PipeliningFlushPolicy pipeliningFlushPolicy = PipeliningFlushPolicy.flushEachCommand();
private @Nullable RedisConfiguration configuration;
- private int phase = 0; // in between min and max values
- private boolean validateConnection = false;
- private boolean shareNativeConnection = true;
- private boolean eagerInitialization = false;
- private boolean convertPipelineAndTxResults = true;
+ private RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration("localhost", 6379);
- private PipeliningFlushPolicy pipeliningFlushPolicy = PipeliningFlushPolicy.flushEachCommand();
+ private @Nullable SharedConnection connection;
+ private @Nullable SharedConnection reactiveConnection;
/**
* Lifecycle state of this factory.
@@ -140,15 +155,6 @@ enum State {
CREATED, STARTING, STARTED, STOPPING, STOPPED, DESTROYED;
}
- private final AtomicReference state = new AtomicReference<>(State.CREATED);
-
- private @Nullable AbstractRedisClient client;
- private @Nullable LettuceConnectionProvider connectionProvider;
- private @Nullable LettuceConnectionProvider reactiveConnectionProvider;
- private @Nullable SharedConnection connection;
- private @Nullable SharedConnection reactiveConnection;
- private @Nullable ClusterCommandExecutor clusterCommandExecutor;
-
/**
* Constructs a new {@link LettuceConnectionFactory} instance with default settings.
*/
@@ -159,31 +165,24 @@ public LettuceConnectionFactory() {
/**
* Constructs a new {@link LettuceConnectionFactory} instance with default settings.
*/
- public LettuceConnectionFactory(RedisStandaloneConfiguration configuration) {
- this(configuration, new MutableLettuceClientConfiguration());
+ public LettuceConnectionFactory(String host, int port) {
+ this(new RedisStandaloneConfiguration(host, port), new MutableLettuceClientConfiguration());
}
/**
* Constructs a new {@link LettuceConnectionFactory} instance given {@link LettuceClientConfiguration}.
*
- * @param clientConfig must not be {@literal null}
+ * @param clientConfiguration must not be {@literal null}
* @since 2.0
*/
- private LettuceConnectionFactory(LettuceClientConfiguration clientConfig) {
+ private LettuceConnectionFactory(LettuceClientConfiguration clientConfiguration) {
- Assert.notNull(clientConfig, "LettuceClientConfiguration must not be null");
+ Assert.notNull(clientConfiguration, "LettuceClientConfiguration must not be null");
- this.clientConfiguration = clientConfig;
+ this.clientConfiguration = clientConfiguration;
this.configuration = this.standaloneConfig;
}
- /**
- * Constructs a new {@link LettuceConnectionFactory} instance with default settings.
- */
- public LettuceConnectionFactory(String host, int port) {
- this(new RedisStandaloneConfiguration(host, port), new MutableLettuceClientConfiguration());
- }
-
/**
* Constructs a new {@link LettuceConnectionFactory} instance using the given {@link RedisSocketConfiguration}.
*
@@ -195,13 +194,21 @@ public LettuceConnectionFactory(RedisConfiguration redisConfiguration) {
}
/**
- * Constructs a new {@link LettuceConnectionFactory} instance using the given {@link RedisSentinelConfiguration}.
+ * Constructs a new {@link LettuceConnectionFactory} instance using the given
+ * {@link RedisStaticMasterReplicaConfiguration} and {@link LettuceClientConfiguration}.
*
- * @param sentinelConfiguration must not be {@literal null}.
- * @since 1.6
+ * @param redisConfiguration must not be {@literal null}.
+ * @param clientConfiguration must not be {@literal null}.
+ * @since 2.1
*/
- public LettuceConnectionFactory(RedisSentinelConfiguration sentinelConfiguration) {
- this(sentinelConfiguration, new MutableLettuceClientConfiguration());
+ public LettuceConnectionFactory(RedisConfiguration redisConfiguration,
+ LettuceClientConfiguration clientConfiguration) {
+
+ this(clientConfiguration);
+
+ Assert.notNull(redisConfiguration, "RedisConfiguration must not be null");
+
+ this.configuration = redisConfiguration;
}
/**
@@ -216,39 +223,31 @@ public LettuceConnectionFactory(RedisClusterConfiguration clusterConfiguration)
}
/**
- * Constructs a new {@link LettuceConnectionFactory} instance using the given {@link RedisStandaloneConfiguration} and
+ * Constructs a new {@link LettuceConnectionFactory} instance using the given {@link RedisClusterConfiguration} and
* {@link LettuceClientConfiguration}.
*
- * @param standaloneConfig must not be {@literal null}.
- * @param clientConfig must not be {@literal null}.
+ * @param clusterConfiguration must not be {@literal null}.
+ * @param clientConfiguration must not be {@literal null}.
* @since 2.0
*/
- public LettuceConnectionFactory(RedisStandaloneConfiguration standaloneConfig,
- LettuceClientConfiguration clientConfig) {
+ public LettuceConnectionFactory(RedisClusterConfiguration clusterConfiguration,
+ LettuceClientConfiguration clientConfiguration) {
- this(clientConfig);
+ this(clientConfiguration);
- Assert.notNull(standaloneConfig, "RedisStandaloneConfiguration must not be null");
+ Assert.notNull(clusterConfiguration, "RedisClusterConfiguration must not be null");
- this.standaloneConfig = standaloneConfig;
- this.configuration = this.standaloneConfig;
+ this.configuration = clusterConfiguration;
}
/**
- * Constructs a new {@link LettuceConnectionFactory} instance using the given
- * {@link RedisStaticMasterReplicaConfiguration} and {@link LettuceClientConfiguration}.
+ * Constructs a new {@link LettuceConnectionFactory} instance using the given {@link RedisSentinelConfiguration}.
*
- * @param redisConfiguration must not be {@literal null}.
- * @param clientConfig must not be {@literal null}.
- * @since 2.1
+ * @param sentinelConfiguration must not be {@literal null}.
+ * @since 1.6
*/
- public LettuceConnectionFactory(RedisConfiguration redisConfiguration, LettuceClientConfiguration clientConfig) {
-
- this(clientConfig);
-
- Assert.notNull(redisConfiguration, "RedisConfiguration must not be null");
-
- this.configuration = redisConfiguration;
+ public LettuceConnectionFactory(RedisSentinelConfiguration sentinelConfiguration) {
+ this(sentinelConfiguration, new MutableLettuceClientConfiguration());
}
/**
@@ -256,13 +255,13 @@ public LettuceConnectionFactory(RedisConfiguration redisConfiguration, LettuceCl
* {@link LettuceClientConfiguration}.
*
* @param sentinelConfiguration must not be {@literal null}.
- * @param clientConfig must not be {@literal null}.
+ * @param clientConfiguration must not be {@literal null}.
* @since 2.0
*/
public LettuceConnectionFactory(RedisSentinelConfiguration sentinelConfiguration,
- LettuceClientConfiguration clientConfig) {
+ LettuceClientConfiguration clientConfiguration) {
- this(clientConfig);
+ this(clientConfiguration);
Assert.notNull(sentinelConfiguration, "RedisSentinelConfiguration must not be null");
@@ -270,21 +269,29 @@ public LettuceConnectionFactory(RedisSentinelConfiguration sentinelConfiguration
}
/**
- * Constructs a new {@link LettuceConnectionFactory} instance using the given {@link RedisClusterConfiguration} and
+ * Constructs a new {@link LettuceConnectionFactory} instance with default settings.
+ */
+ public LettuceConnectionFactory(RedisStandaloneConfiguration configuration) {
+ this(configuration, new MutableLettuceClientConfiguration());
+ }
+
+ /**
+ * Constructs a new {@link LettuceConnectionFactory} instance using the given {@link RedisStandaloneConfiguration} and
* {@link LettuceClientConfiguration}.
*
- * @param clusterConfiguration must not be {@literal null}.
- * @param clientConfig must not be {@literal null}.
+ * @param standaloneConfiguration must not be {@literal null}.
+ * @param clientConfiguration must not be {@literal null}.
* @since 2.0
*/
- public LettuceConnectionFactory(RedisClusterConfiguration clusterConfiguration,
- LettuceClientConfiguration clientConfig) {
+ public LettuceConnectionFactory(RedisStandaloneConfiguration standaloneConfiguration,
+ LettuceClientConfiguration clientConfiguration) {
- this(clientConfig);
+ this(clientConfiguration);
- Assert.notNull(clusterConfiguration, "RedisClusterConfiguration must not be null");
+ Assert.notNull(standaloneConfiguration, "RedisStandaloneConfiguration must not be null");
- this.configuration = clusterConfiguration;
+ this.standaloneConfig = standaloneConfiguration;
+ this.configuration = this.standaloneConfig;
}
/**
@@ -298,11 +305,12 @@ public LettuceConnectionFactory(RedisClusterConfiguration clusterConfiguration,
* @param redisUri the connection URI in the format of a {@link RedisURI}.
* @return an appropriate {@link RedisConfiguration} instance representing the Redis URI.
* @since 2.5.3
+ * @see #createRedisConfiguration(RedisURI)
* @see RedisURI
*/
public static RedisConfiguration createRedisConfiguration(String redisUri) {
- Assert.hasText(redisUri, "RedisURI must not be null and not empty");
+ Assert.hasText(redisUri, "RedisURI must not be null or empty");
return createRedisConfiguration(RedisURI.create(redisUri));
}
@@ -335,392 +343,112 @@ public static RedisConfiguration createRedisConfiguration(RedisURI redisUri) {
return LettuceConverters.createRedisStandaloneConfiguration(redisUri);
}
- @Override
- public void start() {
-
- State current = state
- .getAndUpdate(state -> State.CREATED.equals(state) || State.STOPPED.equals(state) ? State.STARTING : state);
-
- if (State.CREATED.equals(current) || State.STOPPED.equals(current)) {
-
- this.client = createClient();
-
- this.connectionProvider = new ExceptionTranslatingConnectionProvider(createConnectionProvider(client, CODEC));
- this.reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider(
- createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC));
-
- if (isClusterAware()) {
-
- this.clusterCommandExecutor = new ClusterCommandExecutor(
- new LettuceClusterTopologyProvider((RedisClusterClient) client),
- new LettuceClusterConnection.LettuceClusterNodeResourceProvider(this.connectionProvider),
- EXCEPTION_TRANSLATION);
- }
-
- state.set(State.STARTED);
+ ClusterCommandExecutor getRequiredClusterCommandExecutor() {
- if (getEagerInitialization() && getShareNativeConnection()) {
- initConnection();
- }
+ if (this.clusterCommandExecutor == null) {
+ throw new IllegalStateException("ClusterCommandExecutor not initialized");
}
- }
-
- @Override
- public void stop() {
-
- if (state.compareAndSet(State.STARTED, State.STOPPING)) {
-
- resetConnection();
-
- dispose(connectionProvider);
- connectionProvider = null;
-
- dispose(reactiveConnectionProvider);
- reactiveConnectionProvider = null;
-
- if (client != null) {
- try {
- Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod();
- Duration timeout = clientConfiguration.getShutdownTimeout();
+ return this.clusterCommandExecutor;
+ }
- client.shutdown(quietPeriod.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS);
- client = null;
- } catch (Exception e) {
+ /**
+ * Configures the {@link AsyncTaskExecutor executor} used to execute commands asynchronously across the cluster.
+ *
+ * @param executor {@link AsyncTaskExecutor executor} used to execute commands asynchronously across the cluster.
+ * @since 3.2
+ */
+ public void setExecutor(AsyncTaskExecutor executor) {
- if (log.isWarnEnabled()) {
- log.warn(ClassUtils.getShortName(client.getClass()) + " did not shut down gracefully.", e);
- }
- }
- }
- }
+ Assert.notNull(executor, "AsyncTaskExecutor must not be null");
- state.set(State.STOPPED);
+ this.executor = executor;
}
- @Override
- public int getPhase() {
- return phase;
+ /**
+ * Returns the current host.
+ *
+ * @return the host.
+ */
+ public String getHostName() {
+ return RedisConfiguration.getHostOrElse(configuration, standaloneConfig::getHostName);
}
/**
- * Specify the lifecycle phase for pausing and resuming this executor. The default is {@code 0}.
+ * Sets the hostname.
*
- * @since 3.2
- * @see SmartLifecycle#getPhase()
+ * @param hostName the hostname to set.
+ * @deprecated since 2.0, configure the hostname using {@link RedisStandaloneConfiguration}.
*/
- public void setPhase(int phase) {
- this.phase = phase;
+ @Deprecated
+ public void setHostName(String hostName) {
+ standaloneConfig.setHostName(hostName);
}
- @Override
- public boolean isRunning() {
- return State.STARTED.equals(state.get());
+ /**
+ * Returns the current port.
+ *
+ * @return the port.
+ */
+ public int getPort() {
+ return RedisConfiguration.getPortOrElse(configuration, standaloneConfig::getPort);
}
- @Override
- public void afterPropertiesSet() {
- if (isAutoStartup()) {
- start();
- }
+ /**
+ * Sets the port.
+ *
+ * @param port the port to set.
+ * @deprecated since 2.0, configure the port using {@link RedisStandaloneConfiguration}.
+ */
+ @Deprecated
+ public void setPort(int port) {
+ standaloneConfig.setPort(port);
}
- @Override
- public void destroy() {
+ /**
+ * Configures the flushing policy when using pipelining. If not set, defaults to
+ * {@link PipeliningFlushPolicy#flushEachCommand() flush on each command}.
+ *
+ * @param pipeliningFlushPolicy the flushing policy to control when commands get written to the Redis connection.
+ * @see LettuceConnection#openPipeline()
+ * @see StatefulRedisConnection#flushCommands()
+ * @since 2.3
+ */
+ public void setPipeliningFlushPolicy(PipeliningFlushPolicy pipeliningFlushPolicy) {
- stop();
- client = null;
- if (clusterCommandExecutor != null) {
+ Assert.notNull(pipeliningFlushPolicy, "PipeliningFlushingPolicy must not be null");
- try {
- clusterCommandExecutor.destroy();
- } catch (Exception ex) {
- log.warn("Cannot properly close cluster command executor", ex);
- }
- }
- state.set(State.DESTROYED);
+ this.pipeliningFlushPolicy = pipeliningFlushPolicy;
}
- private void dispose(@Nullable LettuceConnectionProvider connectionProvider) {
-
- if (connectionProvider instanceof DisposableBean) {
- try {
- ((DisposableBean) connectionProvider).destroy();
- } catch (Exception e) {
-
- if (log.isWarnEnabled()) {
- log.warn(connectionProvider + " did not shut down gracefully.", e);
- }
- }
- }
+ /**
+ * Returns the connection timeout (in milliseconds).
+ *
+ * @return connection timeout.
+ */
+ public long getTimeout() {
+ return getClientTimeout();
}
- @Override
- public RedisConnection getConnection() {
-
- assertStarted();
-
- if (isClusterAware()) {
- return getClusterConnection();
- }
+ /**
+ * Sets the connection timeout (in milliseconds).
+ *
+ * @param timeout the timeout.
+ * @deprecated since 2.0, configure the timeout using {@link LettuceClientConfiguration}.
+ * @throws IllegalStateException if {@link LettuceClientConfiguration} is immutable.
+ */
+ @Deprecated
+ public void setTimeout(long timeout) {
+ getMutableConfiguration().setTimeout(Duration.ofMillis(timeout));
+ }
- LettuceConnection connection = doCreateLettuceConnection(getSharedConnection(), connectionProvider, getTimeout(),
- getDatabase());
- connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults);
- return connection;
- }
-
- @Override
- public RedisClusterConnection getClusterConnection() {
-
- assertStarted();
-
- if (!isClusterAware()) {
- throw new InvalidDataAccessApiUsageException("Cluster is not configured");
- }
-
- RedisClusterClient clusterClient = (RedisClusterClient) client;
-
- StatefulRedisClusterConnection sharedConnection = getSharedClusterConnection();
-
- LettuceClusterTopologyProvider topologyProvider = new LettuceClusterTopologyProvider(clusterClient);
- return doCreateLettuceClusterConnection(sharedConnection, connectionProvider, topologyProvider,
- clusterCommandExecutor, clientConfiguration.getCommandTimeout());
- }
-
- /**
- * Customization hook for {@link LettuceConnection} creation.
- *
- * @param sharedConnection the shared {@link StatefulRedisConnection} if {@link #getShareNativeConnection()} is
- * {@literal true}; {@literal null} otherwise.
- * @param connectionProvider the {@link LettuceConnectionProvider} to release connections.
- * @param timeout command timeout in {@link TimeUnit#MILLISECONDS}.
- * @param database database index to operate on.
- * @return the {@link LettuceConnection}.
- * @throws IllegalArgumentException if a required parameter is {@literal null}.
- * @since 2.2
- */
- protected LettuceConnection doCreateLettuceConnection(
- @Nullable StatefulRedisConnection sharedConnection, LettuceConnectionProvider connectionProvider,
- long timeout, int database) {
-
- LettuceConnection connection = new LettuceConnection(sharedConnection, connectionProvider, timeout, database);
- connection.setPipeliningFlushPolicy(this.pipeliningFlushPolicy);
-
- return connection;
- }
-
- /**
- * Customization hook for {@link LettuceClusterConnection} creation.
- *
- * @param sharedConnection the shared {@link StatefulRedisConnection} if {@link #getShareNativeConnection()} is
- * {@literal true}; {@literal null} otherwise.
- * @param connectionProvider the {@link LettuceConnectionProvider} to release connections.
- * @param topologyProvider the {@link ClusterTopologyProvider}.
- * @param clusterCommandExecutor the {@link ClusterCommandExecutor} to release connections.
- * @param commandTimeout command timeout {@link Duration}.
- * @return the {@link LettuceConnection}.
- * @throws IllegalArgumentException if a required parameter is {@literal null}.
- * @since 2.2
- */
- protected LettuceClusterConnection doCreateLettuceClusterConnection(
- @Nullable StatefulRedisClusterConnection sharedConnection,
- LettuceConnectionProvider connectionProvider, ClusterTopologyProvider topologyProvider,
- ClusterCommandExecutor clusterCommandExecutor, Duration commandTimeout) {
-
- LettuceClusterConnection connection = new LettuceClusterConnection(sharedConnection, connectionProvider,
- topologyProvider, clusterCommandExecutor, commandTimeout);
- connection.setPipeliningFlushPolicy(this.pipeliningFlushPolicy);
-
- return connection;
- }
-
- @Override
- public LettuceReactiveRedisConnection getReactiveConnection() {
-
- assertStarted();
-
- if (isClusterAware()) {
- return getReactiveClusterConnection();
- }
-
- return getShareNativeConnection()
- ? new LettuceReactiveRedisConnection(getSharedReactiveConnection(), reactiveConnectionProvider)
- : new LettuceReactiveRedisConnection(reactiveConnectionProvider);
- }
-
- @Override
- public LettuceReactiveRedisClusterConnection getReactiveClusterConnection() {
-
- assertStarted();
-
- if (!isClusterAware()) {
- throw new InvalidDataAccessApiUsageException("Cluster is not configured");
- }
-
- RedisClusterClient client = (RedisClusterClient) this.client;
-
- return getShareNativeConnection()
- ? new LettuceReactiveRedisClusterConnection(getSharedReactiveConnection(), reactiveConnectionProvider, client)
- : new LettuceReactiveRedisClusterConnection(reactiveConnectionProvider, client);
- }
-
- /**
- * Initialize the shared connection if {@link #getShareNativeConnection() native connection sharing} is enabled and
- * reset any previously existing connection.
- */
- public void initConnection() {
-
- resetConnection();
-
- if (isClusterAware()) {
- getSharedClusterConnection();
- } else {
- getSharedConnection();
- }
-
- getSharedReactiveConnection();
- }
-
- /**
- * Reset the underlying shared Connection, to be reinitialized on next access.
- */
- public void resetConnection() {
-
- Optionals.toStream(Optional.ofNullable(connection), Optional.ofNullable(reactiveConnection))
- .forEach(SharedConnection::resetConnection);
-
- synchronized (this.connectionMonitor) {
-
- this.connection = null;
- this.reactiveConnection = null;
- }
- }
-
- /**
- * Validate the shared connections and reinitialize if invalid.
- */
- public void validateConnection() {
-
- assertStarted();
-
- getOrCreateSharedConnection().validateConnection();
- getOrCreateSharedReactiveConnection().validateConnection();
- }
-
- private SharedConnection getOrCreateSharedConnection() {
-
- synchronized (this.connectionMonitor) {
-
- if (this.connection == null) {
- this.connection = new SharedConnection<>(connectionProvider);
- }
-
- return this.connection;
- }
- }
-
- private SharedConnection getOrCreateSharedReactiveConnection() {
-
- synchronized (this.connectionMonitor) {
-
- if (this.reactiveConnection == null) {
- this.reactiveConnection = new SharedConnection<>(reactiveConnectionProvider);
- }
-
- return this.reactiveConnection;
- }
- }
-
- @Override
- public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
- return EXCEPTION_TRANSLATION.translate(ex);
- }
-
- /**
- * Returns the current host.
- *
- * @return the host.
- */
- public String getHostName() {
- return RedisConfiguration.getHostOrElse(configuration, standaloneConfig::getHostName);
- }
-
- /**
- * Sets the hostname.
- *
- * @param hostName the hostname to set.
- * @deprecated since 2.0, configure the hostname using {@link RedisStandaloneConfiguration}.
- */
- @Deprecated
- public void setHostName(String hostName) {
- standaloneConfig.setHostName(hostName);
- }
-
- /**
- * Returns the current port.
- *
- * @return the port.
- */
- public int getPort() {
- return RedisConfiguration.getPortOrElse(configuration, standaloneConfig::getPort);
- }
-
- /**
- * Sets the port.
- *
- * @param port the port to set.
- * @deprecated since 2.0, configure the port using {@link RedisStandaloneConfiguration}.
- */
- @Deprecated
- public void setPort(int port) {
- standaloneConfig.setPort(port);
- }
-
- /**
- * Configures the flushing policy when using pipelining. If not set, defaults to
- * {@link PipeliningFlushPolicy#flushEachCommand() flush on each command}.
- *
- * @param pipeliningFlushPolicy the flushing policy to control when commands get written to the Redis connection.
- * @see LettuceConnection#openPipeline()
- * @see StatefulRedisConnection#flushCommands()
- * @since 2.3
- */
- public void setPipeliningFlushPolicy(PipeliningFlushPolicy pipeliningFlushPolicy) {
-
- Assert.notNull(pipeliningFlushPolicy, "PipeliningFlushingPolicy must not be null");
-
- this.pipeliningFlushPolicy = pipeliningFlushPolicy;
- }
-
- /**
- * Returns the connection timeout (in milliseconds).
- *
- * @return connection timeout.
- */
- public long getTimeout() {
- return getClientTimeout();
- }
-
- /**
- * Sets the connection timeout (in milliseconds).
- *
- * @param timeout the timeout.
- * @deprecated since 2.0, configure the timeout using {@link LettuceClientConfiguration}.
- * @throws IllegalStateException if {@link LettuceClientConfiguration} is immutable.
- */
- @Deprecated
- public void setTimeout(long timeout) {
- getMutableConfiguration().setTimeout(Duration.ofMillis(timeout));
- }
-
- /**
- * Returns whether to use SSL.
- *
- * @return use of SSL.
- */
- public boolean isUseSsl() {
- return clientConfiguration.isUseSsl();
+ /**
+ * Returns whether to use SSL.
+ *
+ * @return use of SSL.
+ */
+ public boolean isUseSsl() {
+ return clientConfiguration.isUseSsl();
}
/**
@@ -809,7 +537,7 @@ public void setValidateConnection(boolean validateConnection) {
* @return native connection shared.
*/
public boolean getShareNativeConnection() {
- return shareNativeConnection;
+ return this.shareNativeConnection;
}
/**
@@ -832,7 +560,7 @@ public void setShareNativeConnection(boolean shareNativeConnection) {
* @since 2.2
*/
public boolean getEagerInitialization() {
- return eagerInitialization;
+ return this.eagerInitialization;
}
/**
@@ -865,7 +593,6 @@ public void setDatabase(int index) {
Assert.isTrue(index >= 0, "invalid DB index (a positive index required)");
if (RedisConfiguration.isDatabaseIndexAware(configuration)) {
-
((WithDatabaseIndex) configuration).setDatabase(index);
return;
}
@@ -932,191 +659,509 @@ public AbstractRedisClient getRequiredNativeClient() {
Assert.state(client != null, "Client not yet initialized; Did you forget to call initialize the bean");
- return client;
- }
+ return client;
+ }
+
+ @Nullable
+ private String getRedisUsername() {
+ return RedisConfiguration.getUsernameOrElse(configuration, standaloneConfig::getUsername);
+ }
+
+ /**
+ * Returns the password used for authenticating with the Redis server.
+ *
+ * @return password for authentication or {@literal null} if not set.
+ */
+ @Nullable
+ public String getPassword() {
+ return getRedisPassword().map(String::new).orElse(null);
+ }
+
+ private RedisPassword getRedisPassword() {
+ return RedisConfiguration.getPasswordOrElse(configuration, standaloneConfig::getPassword);
+ }
+
+ /**
+ * Sets the password used for authenticating with the Redis server.
+ *
+ * @param password the password to set
+ * @deprecated since 2.0, configure the password using {@link RedisStandaloneConfiguration},
+ * {@link RedisSentinelConfiguration} or {@link RedisClusterConfiguration}.
+ */
+ @Deprecated
+ public void setPassword(String password) {
+
+ if (RedisConfiguration.isAuthenticationAware(configuration)) {
+ ((WithPassword) configuration).setPassword(password);
+ return;
+ }
+
+ standaloneConfig.setPassword(RedisPassword.of(password));
+ }
+
+ /**
+ * Returns the shutdown timeout for shutting down the RedisClient (in milliseconds).
+ *
+ * @return shutdown timeout.
+ * @since 1.6
+ */
+ public long getShutdownTimeout() {
+ return clientConfiguration.getShutdownTimeout().toMillis();
+ }
+
+ /**
+ * Sets the shutdown timeout for shutting down the RedisClient (in milliseconds).
+ *
+ * @param shutdownTimeout the shutdown timeout.
+ * @since 1.6
+ * @deprecated since 2.0, configure the shutdown timeout using {@link LettuceClientConfiguration}.
+ * @throws IllegalStateException if {@link LettuceClientConfiguration} is immutable.
+ */
+ @Deprecated
+ public void setShutdownTimeout(long shutdownTimeout) {
+ getMutableConfiguration().setShutdownTimeout(Duration.ofMillis(shutdownTimeout));
+ }
+
+ /**
+ * Get the {@link ClientResources} to reuse infrastructure.
+ *
+ * @return {@literal null} if not set.
+ * @since 1.7
+ */
+ @Nullable
+ public ClientResources getClientResources() {
+ return clientConfiguration.getClientResources().orElse(null);
+ }
+
+ /**
+ * Sets the {@link ClientResources} to reuse the client infrastructure.
+ * Set to {@literal null} to not share resources.
+ *
+ * @param clientResources can be {@literal null}.
+ * @since 1.7
+ * @deprecated since 2.0, configure {@link ClientResources} using {@link LettuceClientConfiguration}.
+ * @throws IllegalStateException if {@link LettuceClientConfiguration} is immutable.
+ */
+ @Deprecated
+ public void setClientResources(ClientResources clientResources) {
+ getMutableConfiguration().setClientResources(clientResources);
+ }
+
+ /**
+ * @return the {@link LettuceClientConfiguration}.
+ * @since 2.0
+ */
+ public LettuceClientConfiguration getClientConfiguration() {
+ return this.clientConfiguration;
+ }
+
+ /**
+ * @return the {@link RedisStandaloneConfiguration}.
+ * @since 2.0
+ */
+ public RedisStandaloneConfiguration getStandaloneConfiguration() {
+ return this.standaloneConfig;
+ }
+
+ /**
+ * @return the {@link RedisSocketConfiguration} or {@literal null} if not set.
+ * @since 2.1
+ */
+ @Nullable
+ public RedisSocketConfiguration getSocketConfiguration() {
+ return isDomainSocketAware() ? (RedisSocketConfiguration) this.configuration : null;
+ }
+
+ /**
+ * @return the {@link RedisSentinelConfiguration}, may be {@literal null}.
+ * @since 2.0
+ */
+ @Nullable
+ public RedisSentinelConfiguration getSentinelConfiguration() {
+ return isRedisSentinelAware() ? (RedisSentinelConfiguration) this.configuration : null;
+ }
+
+ /**
+ * @return the {@link RedisClusterConfiguration}, may be {@literal null}.
+ * @since 2.0
+ */
+ @Nullable
+ public RedisClusterConfiguration getClusterConfiguration() {
+ return isClusterAware() ? (RedisClusterConfiguration) this.configuration : null;
+ }
+
+ /**
+ * Specifies if pipelined results should be converted to the expected data type. If {@code false}, results of
+ * {@link LettuceConnection#closePipeline()} and {LettuceConnection#exec()} will be of the type returned by the
+ * Lettuce driver.
+ *
+ * @return {@code true} to convert pipeline and transaction results; {@code false} otherwise.
+ */
+ @Override
+ public boolean getConvertPipelineAndTxResults() {
+ return convertPipelineAndTxResults;
+ }
+
+ /**
+ * Specifies if pipelined and transaction results should be converted to the expected data type. If {@code false},
+ * results of {@link LettuceConnection#closePipeline()} and {LettuceConnection#exec()} will be of the type returned by
+ * the Lettuce driver.
+ *
+ * @param convertPipelineAndTxResults {@code true} to convert pipeline and transaction results; {@code false}
+ * otherwise.
+ */
+ public void setConvertPipelineAndTxResults(boolean convertPipelineAndTxResults) {
+ this.convertPipelineAndTxResults = convertPipelineAndTxResults;
+ }
+
+ /**
+ * @return true when {@link RedisStaticMasterReplicaConfiguration} is present.
+ * @since 2.1
+ */
+ private boolean isStaticMasterReplicaAware() {
+ return RedisConfiguration.isStaticMasterReplicaConfiguration(configuration);
+ }
+
+ /**
+ * @return true when {@link RedisSentinelConfiguration} is present.
+ * @since 1.5
+ */
+ public boolean isRedisSentinelAware() {
+ return RedisConfiguration.isSentinelConfiguration(configuration);
+ }
+
+ /**
+ * @return true when {@link RedisSocketConfiguration} is present.
+ * @since 2.1
+ */
+ private boolean isDomainSocketAware() {
+ return RedisConfiguration.isDomainSocketConfiguration(configuration);
+ }
+
+ /**
+ * @return true when {@link RedisClusterConfiguration} is present.
+ * @since 1.7
+ */
+ public boolean isClusterAware() {
+ return RedisConfiguration.isClusterConfiguration(configuration);
+ }
+
+ @Override
+ public void start() {
+
+ State current = this.state.getAndUpdate(state -> isCreatedOrStopped(state) ? State.STARTING : state);
+
+ if (isCreatedOrStopped(current)) {
+
+ AbstractRedisClient client = createClient();
+ this.client = client;
+ LettuceConnectionProvider connectionProvider = new ExceptionTranslatingConnectionProvider(
+ createConnectionProvider(this.client, CODEC));
+ this.connectionProvider = connectionProvider;
+ this.reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider(
+ createConnectionProvider(this.client, LettuceReactiveRedisConnection.CODEC));
+
+ if (isClusterAware()) {
+ this.clusterCommandExecutor = createClusterCommandExecutor((RedisClusterClient) client, connectionProvider);
+ }
+
+ this.state.set(State.STARTED);
+
+ if (getEagerInitialization() && getShareNativeConnection()) {
+ initConnection();
+ }
+ }
+ }
+
+ private static boolean isCreatedOrStopped(@Nullable State state) {
+ return State.CREATED.equals(state) || State.STOPPED.equals(state);
+ }
+
+ private ClusterCommandExecutor createClusterCommandExecutor(RedisClusterClient client,
+ LettuceConnectionProvider connectionProvider) {
+
+ return new ClusterCommandExecutor(new LettuceClusterTopologyProvider(client),
+ new LettuceClusterConnection.LettuceClusterNodeResourceProvider(connectionProvider), EXCEPTION_TRANSLATION,
+ this.executor);
+ }
+
+ @Override
+ public void stop() {
+
+ if (state.compareAndSet(State.STARTED, State.STOPPING)) {
+
+ resetConnection();
+
+ dispose(connectionProvider);
+ connectionProvider = null;
+
+ dispose(reactiveConnectionProvider);
+ reactiveConnectionProvider = null;
+
+ if (client != null) {
+
+ try {
+ Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod();
+ Duration timeout = clientConfiguration.getShutdownTimeout();
+
+ client.shutdown(quietPeriod.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS);
+ client = null;
+ } catch (Exception cause) {
+ if (log.isWarnEnabled()) {
+ log.warn(ClassUtils.getShortName(client.getClass()) + " did not shut down gracefully.", cause);
+ }
+ }
+ }
+ }
+
+ state.set(State.STOPPED);
+ }
+
+ @Override
+ public int getPhase() {
+ return phase;
+ }
+
+ /**
+ * Specify the lifecycle phase for pausing and resuming this executor. The default is {@code 0}.
+ *
+ * @since 3.2
+ * @see SmartLifecycle#getPhase()
+ */
+ public void setPhase(int phase) {
+ this.phase = phase;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return State.STARTED.equals(this.state.get());
+ }
+
+ @Override
+ public void afterPropertiesSet() {
+ if (isAutoStartup()) {
+ start();
+ }
+ }
+
+ @Override
+ public void destroy() {
+
+ stop();
+ this.client = null;
+
+ ClusterCommandExecutor clusterCommandExecutor = this.clusterCommandExecutor;
+
+ if (clusterCommandExecutor != null) {
+ try {
+ clusterCommandExecutor.destroy();
+ this.clusterCommandExecutor = null;
+ } catch (Exception cause) {
+ log.warn("Cannot properly close cluster command executor", cause);
+ }
+ }
+
+ this.state.set(State.DESTROYED);
+ }
+
+ private void dispose(@Nullable LettuceConnectionProvider connectionProvider) {
+
+ if (connectionProvider instanceof DisposableBean disposableBean) {
+ try {
+ disposableBean.destroy();
+ } catch (Exception cause) {
+ if (log.isWarnEnabled()) {
+ log.warn(connectionProvider + " did not shut down gracefully.", cause);
+ }
+ }
+ }
+ }
+
+ @Override
+ public RedisConnection getConnection() {
+
+ assertStarted();
+
+ if (isClusterAware()) {
+ return getClusterConnection();
+ }
+
+ LettuceConnection connection = doCreateLettuceConnection(getSharedConnection(), connectionProvider, getTimeout(),
+ getDatabase());
+
+ connection.setConvertPipelineAndTxResults(this.convertPipelineAndTxResults);
+
+ return connection;
+ }
+
+ @Override
+ public RedisClusterConnection getClusterConnection() {
+
+ assertStarted();
+
+ if (!isClusterAware()) {
+ throw new InvalidDataAccessApiUsageException("Cluster is not configured");
+ }
+
+ RedisClusterClient clusterClient = (RedisClusterClient) client;
+
+ StatefulRedisClusterConnection sharedConnection = getSharedClusterConnection();
+
+ LettuceClusterTopologyProvider topologyProvider = new LettuceClusterTopologyProvider(clusterClient);
- @Nullable
- private String getRedisUsername() {
- return RedisConfiguration.getUsernameOrElse(configuration, standaloneConfig::getUsername);
+ return doCreateLettuceClusterConnection(sharedConnection, this.connectionProvider, topologyProvider,
+ getRequiredClusterCommandExecutor(), this.clientConfiguration.getCommandTimeout());
}
/**
- * Returns the password used for authenticating with the Redis server.
+ * Customization hook for {@link LettuceConnection} creation.
*
- * @return password for authentication or {@literal null} if not set.
+ * @param sharedConnection the shared {@link StatefulRedisConnection} if {@link #getShareNativeConnection()} is
+ * {@literal true}; {@literal null} otherwise.
+ * @param connectionProvider the {@link LettuceConnectionProvider} to release connections.
+ * @param timeout command timeout in {@link TimeUnit#MILLISECONDS}.
+ * @param database database index to operate on.
+ * @return the {@link LettuceConnection}.
+ * @throws IllegalArgumentException if a required parameter is {@literal null}.
+ * @since 2.2
*/
- @Nullable
- public String getPassword() {
- return getRedisPassword().map(String::new).orElse(null);
- }
+ protected LettuceConnection doCreateLettuceConnection(
+ @Nullable StatefulRedisConnection sharedConnection, LettuceConnectionProvider connectionProvider,
+ long timeout, int database) {
- private RedisPassword getRedisPassword() {
- return RedisConfiguration.getPasswordOrElse(configuration, standaloneConfig::getPassword);
+ LettuceConnection connection = new LettuceConnection(sharedConnection, connectionProvider, timeout, database);
+
+ connection.setPipeliningFlushPolicy(this.pipeliningFlushPolicy);
+
+ return connection;
}
/**
- * Sets the password used for authenticating with the Redis server.
+ * Customization hook for {@link LettuceClusterConnection} creation.
*
- * @param password the password to set
- * @deprecated since 2.0, configure the password using {@link RedisStandaloneConfiguration},
- * {@link RedisSentinelConfiguration} or {@link RedisClusterConfiguration}.
+ * @param sharedConnection the shared {@link StatefulRedisConnection} if {@link #getShareNativeConnection()} is
+ * {@literal true}; {@literal null} otherwise.
+ * @param connectionProvider the {@link LettuceConnectionProvider} to release connections.
+ * @param topologyProvider the {@link ClusterTopologyProvider}.
+ * @param clusterCommandExecutor the {@link ClusterCommandExecutor} to release connections.
+ * @param commandTimeout command timeout {@link Duration}.
+ * @return the {@link LettuceConnection}.
+ * @throws IllegalArgumentException if a required parameter is {@literal null}.
+ * @since 2.2
*/
- @Deprecated
- public void setPassword(String password) {
+ protected LettuceClusterConnection doCreateLettuceClusterConnection(
+ @Nullable StatefulRedisClusterConnection