-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Refactor Jedis and Lettuce RedisConnectionFactory
to SmartLifecycle
beans
#2627
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ | |
*/ | ||
package org.springframework.data.redis.connection.jedis; | ||
|
||
import org.springframework.context.SmartLifecycle; | ||
import redis.clients.jedis.Connection; | ||
import redis.clients.jedis.DefaultJedisClientConfig; | ||
import redis.clients.jedis.HostAndPort; | ||
|
@@ -34,6 +35,7 @@ | |
import java.util.LinkedHashSet; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
|
||
import javax.net.ssl.HostnameVerifier; | ||
import javax.net.ssl.SSLParameters; | ||
|
@@ -85,7 +87,7 @@ | |
* @see JedisClientConfiguration | ||
* @see Jedis | ||
*/ | ||
public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory { | ||
public class JedisConnectionFactory implements RedisConnectionFactory, InitializingBean, DisposableBean, SmartLifecycle { | ||
|
||
private final static Log log = LogFactory.getLog(JedisConnectionFactory.class); | ||
private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new PassThroughExceptionTranslationStrategy( | ||
|
@@ -104,8 +106,10 @@ public class JedisConnectionFactory implements InitializingBean, DisposableBean, | |
private @Nullable ClusterTopologyProvider topologyProvider; | ||
private @Nullable ClusterCommandExecutor clusterCommandExecutor; | ||
|
||
private boolean initialized; | ||
private boolean destroyed; | ||
enum State { | ||
CREATED, STARTING, STARTED, STOPPING, STOPPED, DESTROYED; | ||
} | ||
private AtomicReference<State> state = new AtomicReference<>(State.CREATED); | ||
|
||
/** | ||
* Constructs a new {@link JedisConnectionFactory} instance with default settings (default connection pooling). | ||
|
@@ -287,24 +291,65 @@ protected JedisConnection postProcessConnection(JedisConnection connection) { | |
return connection; | ||
} | ||
|
||
public void afterPropertiesSet() { | ||
@Override | ||
public void start() { | ||
|
||
clientConfig = createClientConfig(getDatabase(), getRedisUsername(), getRedisPassword()); | ||
State current = state.getAndUpdate(state -> { | ||
if (State.CREATED.equals(state) || State.STOPPED.equals(state)) { | ||
return State.STARTING; | ||
} | ||
return state; | ||
}); | ||
|
||
if (State.CREATED.equals(current) || State.STOPPED.equals(current)) { | ||
|
||
if (getUsePool() && !isRedisClusterAware()) { | ||
this.pool = createPool(); | ||
if (getUsePool() && !isRedisClusterAware()) { | ||
this.pool = createPool(); | ||
} | ||
|
||
if (isRedisClusterAware()) { | ||
|
||
this.cluster = createCluster(); | ||
this.topologyProvider = createTopologyProvider(this.cluster); | ||
this.clusterCommandExecutor = new ClusterCommandExecutor(this.topologyProvider, | ||
new JedisClusterConnection.JedisClusterNodeResourceProvider(this.cluster, this.topologyProvider), | ||
EXCEPTION_TRANSLATION); | ||
} | ||
|
||
state.set(State.STARTED); | ||
} | ||
} | ||
|
||
if (isRedisClusterAware()) { | ||
@Override | ||
public void stop() { | ||
|
||
if (state.compareAndSet(State.STARTED, State.STOPPING)) { | ||
if (getUsePool() && !isRedisClusterAware()) { | ||
this.pool.close(); | ||
this.pool = null; | ||
} | ||
|
||
this.cluster = createCluster(); | ||
this.topologyProvider = createTopologyProvider(this.cluster); | ||
this.clusterCommandExecutor = new ClusterCommandExecutor(this.topologyProvider, | ||
new JedisClusterConnection.JedisClusterNodeResourceProvider(this.cluster, this.topologyProvider), | ||
EXCEPTION_TRANSLATION); | ||
if (isRedisClusterAware()) { | ||
try { | ||
this.clusterCommandExecutor.destroy(); | ||
} catch (Exception e) { | ||
throw new RuntimeException(e); | ||
} | ||
this.topologyProvider = null; | ||
this.cluster.close(); | ||
} | ||
state.set(State.STOPPED); | ||
} | ||
} | ||
|
||
this.initialized = true; | ||
@Override | ||
public boolean isRunning() { | ||
return State.STARTED.equals(state.get()); | ||
} | ||
|
||
@Override | ||
public void afterPropertiesSet() { | ||
clientConfig = createClientConfig(getDatabase(), getRedisUsername(), getRedisPassword()); | ||
} | ||
|
||
JedisClientConfig createSentinelClientConfig(SentinelConfiguration sentinelConfiguration) { | ||
|
@@ -415,6 +460,8 @@ protected JedisCluster createCluster(RedisClusterConfiguration clusterConfig, | |
|
||
public void destroy() { | ||
|
||
state.set(State.STOPPING); | ||
|
||
if (getUsePool() && pool != null) { | ||
|
||
try { | ||
|
@@ -440,7 +487,7 @@ public void destroy() { | |
} | ||
} | ||
|
||
this.destroyed = true; | ||
state.set(State.DESTROYED); | ||
} | ||
|
||
public RedisConnection getConnection() { | ||
|
@@ -866,8 +913,8 @@ private MutableJedisClientConfiguration getMutableConfiguration() { | |
} | ||
|
||
private void assertInitialized() { | ||
Assert.state(this.initialized, "JedisConnectionFactory was not initialized through afterPropertiesSet()"); | ||
Assert.state(!this.destroyed, "JedisConnectionFactory was destroyed and cannot be used anymore"); | ||
Assert.state(State.STARTED.equals(state.get()), "JedisConnectionFactory was not initialized through afterPropertiesSet()"); | ||
Assert.state(!State.STOPPED.equals(state.get()), "JedisConnectionFactory was destroyed and cannot be used anymore"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We might want to reflect the lifecycle in the messages, as we want to allow starting after stopping, right? |
||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,11 +26,13 @@ | |
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.CompletionStage; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.function.Consumer; | ||
import java.util.stream.Collectors; | ||
|
||
import org.springframework.beans.factory.DisposableBean; | ||
import org.springframework.beans.factory.InitializingBean; | ||
import org.springframework.context.SmartLifecycle; | ||
import org.springframework.dao.DataAccessException; | ||
import org.springframework.dao.InvalidDataAccessApiUsageException; | ||
import org.springframework.data.redis.ExceptionTranslationStrategy; | ||
|
@@ -116,8 +118,8 @@ | |
* @author Andrea Como | ||
* @author Chris Bono | ||
*/ | ||
public class LettuceConnectionFactory | ||
implements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory { | ||
public class LettuceConnectionFactory implements RedisConnectionFactory, ReactiveRedisConnectionFactory, | ||
InitializingBean, DisposableBean, SmartLifecycle { | ||
|
||
private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new PassThroughExceptionTranslationStrategy( | ||
LettuceExceptionConverter.INSTANCE); | ||
|
@@ -144,8 +146,11 @@ public class LettuceConnectionFactory | |
|
||
private @Nullable ClusterCommandExecutor clusterCommandExecutor; | ||
|
||
private boolean initialized; | ||
private boolean destroyed; | ||
enum State { | ||
CREATED, STARTING, STARTED, STOPPING, STOPPED, DESTROYED; | ||
} | ||
|
||
private AtomicReference<State> state = new AtomicReference<>(State.CREATED); | ||
|
||
/** | ||
* Constructs a new {@link LettuceConnectionFactory} instance with default settings. | ||
|
@@ -333,58 +338,106 @@ public static RedisConfiguration createRedisConfiguration(RedisURI redisUri) { | |
return LettuceConverters.createRedisStandaloneConfiguration(redisUri); | ||
} | ||
|
||
public void afterPropertiesSet() { | ||
@Override | ||
public void start() { | ||
|
||
State current = state.getAndUpdate(state -> { | ||
if (State.CREATED.equals(state) || State.STOPPED.equals(state)) { | ||
return State.STARTING; | ||
} | ||
return state; | ||
}); | ||
|
||
this.client = createClient(); | ||
if (State.CREATED.equals(current) || State.STOPPED.equals(current)) { | ||
|
||
this.connectionProvider = new ExceptionTranslatingConnectionProvider(createConnectionProvider(client, CODEC)); | ||
this.reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider( | ||
createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC)); | ||
this.client = createClient(); | ||
|
||
if (isClusterAware()) { | ||
this.connectionProvider = new ExceptionTranslatingConnectionProvider(createConnectionProvider(client, CODEC)); | ||
this.reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider( | ||
createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC)); | ||
|
||
this.clusterCommandExecutor = new ClusterCommandExecutor( | ||
new LettuceClusterTopologyProvider((RedisClusterClient) client), | ||
new LettuceClusterConnection.LettuceClusterNodeResourceProvider(this.connectionProvider), | ||
EXCEPTION_TRANSLATION); | ||
} | ||
if (isClusterAware()) { | ||
|
||
this.clusterCommandExecutor = new ClusterCommandExecutor( | ||
new LettuceClusterTopologyProvider((RedisClusterClient) client), | ||
new LettuceClusterConnection.LettuceClusterNodeResourceProvider(this.connectionProvider), | ||
EXCEPTION_TRANSLATION); | ||
} | ||
|
||
state.set(State.STARTED); | ||
|
||
this.initialized = true; | ||
if (getEagerInitialization() && getShareNativeConnection()) { | ||
initConnection(); | ||
} | ||
} | ||
} | ||
|
||
if (getEagerInitialization() && getShareNativeConnection()) { | ||
initConnection(); | ||
@Override | ||
public void stop() { | ||
|
||
if (state.compareAndSet(State.STARTED, State.STOPPING)) { | ||
resetConnection(); | ||
dispose(connectionProvider); | ||
dispose(reactiveConnectionProvider); | ||
this.client.close(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should consider the shutdown timeout to prevent long shutdown times. |
||
state.set(State.STOPPED); | ||
} | ||
} | ||
|
||
@Override | ||
public boolean isRunning() { | ||
return State.STARTED.equals(state.get()); | ||
} | ||
|
||
@Override | ||
public void afterPropertiesSet() { | ||
// customization hook. initialization happens in start | ||
} | ||
|
||
@Override | ||
public void destroy() { | ||
|
||
resetConnection(); | ||
if (State.STOPPED.equals(state.get())) { | ||
if (clusterCommandExecutor != null) { | ||
|
||
if (clusterCommandExecutor != null) { | ||
try { | ||
clusterCommandExecutor.destroy(); | ||
} catch (Exception ex) { | ||
log.warn("Cannot properly close cluster command executor", ex); | ||
} | ||
} | ||
} else if (state.compareAndSet(State.STARTED, State.STOPPING)) { | ||
|
||
try { | ||
clusterCommandExecutor.destroy(); | ||
} catch (Exception ex) { | ||
log.warn("Cannot properly close cluster command executor", ex); | ||
resetConnection(); | ||
|
||
if (clusterCommandExecutor != null) { | ||
|
||
try { | ||
clusterCommandExecutor.destroy(); | ||
} catch (Exception ex) { | ||
log.warn("Cannot properly close cluster command executor", ex); | ||
} | ||
} | ||
} | ||
|
||
dispose(connectionProvider); | ||
dispose(reactiveConnectionProvider); | ||
dispose(connectionProvider); | ||
dispose(reactiveConnectionProvider); | ||
|
||
try { | ||
Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod(); | ||
Duration timeout = clientConfiguration.getShutdownTimeout(); | ||
client.shutdown(quietPeriod.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS); | ||
} catch (Exception e) { | ||
try { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That code could go into |
||
Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod(); | ||
Duration timeout = clientConfiguration.getShutdownTimeout(); | ||
client.shutdown(quietPeriod.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS); | ||
state.set(State.STOPPED); | ||
} catch (Exception e) { | ||
|
||
if (log.isWarnEnabled()) { | ||
log.warn((client != null ? ClassUtils.getShortName(client.getClass()) : "LettuceClient") | ||
+ " did not shut down gracefully.", e); | ||
if (log.isWarnEnabled()) { | ||
log.warn((client != null ? ClassUtils.getShortName(client.getClass()) : "LettuceClient") | ||
+ " did not shut down gracefully.", e); | ||
} | ||
} | ||
client = null; | ||
} | ||
state.set(State.DESTROYED); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
|
||
this.destroyed = true; | ||
} | ||
|
||
private void dispose(LettuceConnectionProvider connectionProvider) { | ||
|
@@ -532,8 +585,6 @@ public void initConnection() { | |
*/ | ||
public void resetConnection() { | ||
|
||
assertInitialized(); | ||
|
||
Optionals.toStream(Optional.ofNullable(connection), Optional.ofNullable(reactiveConnection)) | ||
.forEach(SharedConnection::resetConnection); | ||
|
||
|
@@ -1267,8 +1318,12 @@ private RedisClient createBasicClient() { | |
} | ||
|
||
private void assertInitialized() { | ||
Assert.state(this.initialized, "LettuceConnectionFactory was not initialized through afterPropertiesSet()"); | ||
Assert.state(!this.destroyed, "LettuceConnectionFactory was destroyed and cannot be used anymore"); | ||
|
||
Assert.state(State.STARTED.equals(state.get()), | ||
"LettuceConnectionFactory was not initialized through afterPropertiesSet()"); | ||
Assert.state(!State.STOPPED.equals(state.get()), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should reword the messages to reflect started/stopped state. Having a check for destroyed it good as well. |
||
"LettuceConnectionFactory was destroyed and cannot be used anymore"); | ||
|
||
} | ||
|
||
private static void applyToAll(RedisURI source, Consumer<RedisURI> action) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense to call
stop
as we have quite some duplications?