Skip to content

Refactor Jedis and Lettuce RedisConnectionFactory to SmartLifecycle beans #2627

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>3.2.0-SNAPSHOT</version>
<version>3.2.x-2503-SNAPSHOT</version>

<name>Spring Data Redis</name>
<description>Spring Data module for Redis</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.springframework.data.redis.connection.jedis;

import org.springframework.context.SmartLifecycle;
import redis.clients.jedis.Connection;
import redis.clients.jedis.DefaultJedisClientConfig;
import redis.clients.jedis.HostAndPort;
Expand All @@ -34,6 +35,7 @@
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLParameters;
Expand Down Expand Up @@ -85,7 +87,7 @@
* @see JedisClientConfiguration
* @see Jedis
*/
public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory {
public class JedisConnectionFactory implements RedisConnectionFactory, InitializingBean, DisposableBean, SmartLifecycle {

private final static Log log = LogFactory.getLog(JedisConnectionFactory.class);
private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new PassThroughExceptionTranslationStrategy(
Expand All @@ -104,8 +106,10 @@ public class JedisConnectionFactory implements InitializingBean, DisposableBean,
private @Nullable ClusterTopologyProvider topologyProvider;
private @Nullable ClusterCommandExecutor clusterCommandExecutor;

private boolean initialized;
private boolean destroyed;
enum State {
CREATED, STARTING, STARTED, STOPPING, STOPPED, DESTROYED;
}
private AtomicReference<State> state = new AtomicReference<>(State.CREATED);

/**
* Constructs a new {@link JedisConnectionFactory} instance with default settings (default connection pooling).
Expand Down Expand Up @@ -287,24 +291,65 @@ protected JedisConnection postProcessConnection(JedisConnection connection) {
return connection;
}

public void afterPropertiesSet() {
@Override
public void start() {

clientConfig = createClientConfig(getDatabase(), getRedisUsername(), getRedisPassword());
State current = state.getAndUpdate(state -> {
if (State.CREATED.equals(state) || State.STOPPED.equals(state)) {
return State.STARTING;
}
return state;
});

if (State.CREATED.equals(current) || State.STOPPED.equals(current)) {

if (getUsePool() && !isRedisClusterAware()) {
this.pool = createPool();
if (getUsePool() && !isRedisClusterAware()) {
this.pool = createPool();
}

if (isRedisClusterAware()) {

this.cluster = createCluster();
this.topologyProvider = createTopologyProvider(this.cluster);
this.clusterCommandExecutor = new ClusterCommandExecutor(this.topologyProvider,
new JedisClusterConnection.JedisClusterNodeResourceProvider(this.cluster, this.topologyProvider),
EXCEPTION_TRANSLATION);
}

state.set(State.STARTED);
}
}

if (isRedisClusterAware()) {
@Override
public void stop() {

if (state.compareAndSet(State.STARTED, State.STOPPING)) {
if (getUsePool() && !isRedisClusterAware()) {
this.pool.close();
this.pool = null;
}

this.cluster = createCluster();
this.topologyProvider = createTopologyProvider(this.cluster);
this.clusterCommandExecutor = new ClusterCommandExecutor(this.topologyProvider,
new JedisClusterConnection.JedisClusterNodeResourceProvider(this.cluster, this.topologyProvider),
EXCEPTION_TRANSLATION);
if (isRedisClusterAware()) {
try {
this.clusterCommandExecutor.destroy();
} catch (Exception e) {
throw new RuntimeException(e);
}
this.topologyProvider = null;
this.cluster.close();
}
state.set(State.STOPPED);
}
}

this.initialized = true;
@Override
public boolean isRunning() {
return State.STARTED.equals(state.get());
}

@Override
public void afterPropertiesSet() {
clientConfig = createClientConfig(getDatabase(), getRedisUsername(), getRedisPassword());
}

JedisClientConfig createSentinelClientConfig(SentinelConfiguration sentinelConfiguration) {
Expand Down Expand Up @@ -415,6 +460,8 @@ protected JedisCluster createCluster(RedisClusterConfiguration clusterConfig,

public void destroy() {

state.set(State.STOPPING);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to call stop as we have quite some duplications?


if (getUsePool() && pool != null) {

try {
Expand All @@ -440,7 +487,7 @@ public void destroy() {
}
}

this.destroyed = true;
state.set(State.DESTROYED);
}

public RedisConnection getConnection() {
Expand Down Expand Up @@ -866,8 +913,8 @@ private MutableJedisClientConfiguration getMutableConfiguration() {
}

private void assertInitialized() {
Assert.state(this.initialized, "JedisConnectionFactory was not initialized through afterPropertiesSet()");
Assert.state(!this.destroyed, "JedisConnectionFactory was destroyed and cannot be used anymore");
Assert.state(State.STARTED.equals(state.get()), "JedisConnectionFactory was not initialized through afterPropertiesSet()");
Assert.state(!State.STOPPED.equals(state.get()), "JedisConnectionFactory was destroyed and cannot be used anymore");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to reflect the lifecycle in the messages, as we want to allow starting after stopping, right?

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.ExceptionTranslationStrategy;
Expand Down Expand Up @@ -116,8 +118,8 @@
* @author Andrea Como
* @author Chris Bono
*/
public class LettuceConnectionFactory
implements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory {
public class LettuceConnectionFactory implements RedisConnectionFactory, ReactiveRedisConnectionFactory,
InitializingBean, DisposableBean, SmartLifecycle {

private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new PassThroughExceptionTranslationStrategy(
LettuceExceptionConverter.INSTANCE);
Expand All @@ -144,8 +146,11 @@ public class LettuceConnectionFactory

private @Nullable ClusterCommandExecutor clusterCommandExecutor;

private boolean initialized;
private boolean destroyed;
enum State {
CREATED, STARTING, STARTED, STOPPING, STOPPED, DESTROYED;
}

private AtomicReference<State> state = new AtomicReference<>(State.CREATED);

/**
* Constructs a new {@link LettuceConnectionFactory} instance with default settings.
Expand Down Expand Up @@ -333,58 +338,106 @@ public static RedisConfiguration createRedisConfiguration(RedisURI redisUri) {
return LettuceConverters.createRedisStandaloneConfiguration(redisUri);
}

public void afterPropertiesSet() {
@Override
public void start() {

State current = state.getAndUpdate(state -> {
if (State.CREATED.equals(state) || State.STOPPED.equals(state)) {
return State.STARTING;
}
return state;
});

this.client = createClient();
if (State.CREATED.equals(current) || State.STOPPED.equals(current)) {

this.connectionProvider = new ExceptionTranslatingConnectionProvider(createConnectionProvider(client, CODEC));
this.reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider(
createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC));
this.client = createClient();

if (isClusterAware()) {
this.connectionProvider = new ExceptionTranslatingConnectionProvider(createConnectionProvider(client, CODEC));
this.reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider(
createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC));

this.clusterCommandExecutor = new ClusterCommandExecutor(
new LettuceClusterTopologyProvider((RedisClusterClient) client),
new LettuceClusterConnection.LettuceClusterNodeResourceProvider(this.connectionProvider),
EXCEPTION_TRANSLATION);
}
if (isClusterAware()) {

this.clusterCommandExecutor = new ClusterCommandExecutor(
new LettuceClusterTopologyProvider((RedisClusterClient) client),
new LettuceClusterConnection.LettuceClusterNodeResourceProvider(this.connectionProvider),
EXCEPTION_TRANSLATION);
}

state.set(State.STARTED);

this.initialized = true;
if (getEagerInitialization() && getShareNativeConnection()) {
initConnection();
}
}
}

if (getEagerInitialization() && getShareNativeConnection()) {
initConnection();
@Override
public void stop() {

if (state.compareAndSet(State.STARTED, State.STOPPING)) {
resetConnection();
dispose(connectionProvider);
dispose(reactiveConnectionProvider);
this.client.close();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should consider the shutdown timeout to prevent long shutdown times.

state.set(State.STOPPED);
}
}

@Override
public boolean isRunning() {
return State.STARTED.equals(state.get());
}

@Override
public void afterPropertiesSet() {
// customization hook. initialization happens in start
}

@Override
public void destroy() {

resetConnection();
if (State.STOPPED.equals(state.get())) {
if (clusterCommandExecutor != null) {

if (clusterCommandExecutor != null) {
try {
clusterCommandExecutor.destroy();
} catch (Exception ex) {
log.warn("Cannot properly close cluster command executor", ex);
}
}
} else if (state.compareAndSet(State.STARTED, State.STOPPING)) {

try {
clusterCommandExecutor.destroy();
} catch (Exception ex) {
log.warn("Cannot properly close cluster command executor", ex);
resetConnection();

if (clusterCommandExecutor != null) {

try {
clusterCommandExecutor.destroy();
} catch (Exception ex) {
log.warn("Cannot properly close cluster command executor", ex);
}
}
}

dispose(connectionProvider);
dispose(reactiveConnectionProvider);
dispose(connectionProvider);
dispose(reactiveConnectionProvider);

try {
Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod();
Duration timeout = clientConfiguration.getShutdownTimeout();
client.shutdown(quietPeriod.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (Exception e) {
try {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That code could go into stop and upon destroy it would be neat to reuse what stop does.

Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod();
Duration timeout = clientConfiguration.getShutdownTimeout();
client.shutdown(quietPeriod.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS);
state.set(State.STOPPED);
} catch (Exception e) {

if (log.isWarnEnabled()) {
log.warn((client != null ? ClassUtils.getShortName(client.getClass()) : "LettuceClient")
+ " did not shut down gracefully.", e);
if (log.isWarnEnabled()) {
log.warn((client != null ? ClassUtils.getShortName(client.getClass()) : "LettuceClient")
+ " did not shut down gracefully.", e);
}
}
client = null;
}
state.set(State.DESTROYED);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


this.destroyed = true;
}

private void dispose(LettuceConnectionProvider connectionProvider) {
Expand Down Expand Up @@ -532,8 +585,6 @@ public void initConnection() {
*/
public void resetConnection() {

assertInitialized();

Optionals.toStream(Optional.ofNullable(connection), Optional.ofNullable(reactiveConnection))
.forEach(SharedConnection::resetConnection);

Expand Down Expand Up @@ -1267,8 +1318,12 @@ private RedisClient createBasicClient() {
}

private void assertInitialized() {
Assert.state(this.initialized, "LettuceConnectionFactory was not initialized through afterPropertiesSet()");
Assert.state(!this.destroyed, "LettuceConnectionFactory was destroyed and cannot be used anymore");

Assert.state(State.STARTED.equals(state.get()),
"LettuceConnectionFactory was not initialized through afterPropertiesSet()");
Assert.state(!State.STOPPED.equals(state.get()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should reword the messages to reflect started/stopped state. Having a check for destroyed it good as well.

"LettuceConnectionFactory was destroyed and cannot be used anymore");

}

private static void applyToAll(RedisURI source, Consumer<RedisURI> action) {
Expand Down
Loading