Skip to content

Fix redundant disposal in LettucePoolingConnectionProvider by implementing SmartLifecycle #3100 #3164

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
* @author Chris Bono
* @author John Blum
* @author Zhian Chen
* @author UHyeon Jeong
*/
public class LettuceConnectionFactory implements RedisConnectionFactory, ReactiveRedisConnectionFactory,
InitializingBean, DisposableBean, SmartLifecycle {
Expand Down Expand Up @@ -979,6 +980,9 @@ public void stop() {
dispose(reactiveConnectionProvider);
reactiveConnectionProvider = null;

dispose(clusterCommandExecutor);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In these changes, ClusterCommandExecutor is being disposed after disposing the connection provider and so there's no behavioral change. Shouldn't clusterCommandExecutor be disposed (and thus its shared connection returned to the pool) before disposing connectionProvider?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You’re right that there’s no behavioral change in ClusterCommandExecutor itself. However, despite other resources at LettuceConnectionFactory are disposed at stop, only ClusterCommandExecutor disposed at destroy method. I was concerned that if we leave the dispose call in destroy, it could be executed twice in the future, as LettucePoolingConnectionProvider did. Since the stop method uses an AtomicReference guard, this change ensures safe and idempotent disposal.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I believe the order of disposing connectionProvider doesn’t affect behavior in this case.

Below are the previous implementations of LettucePoolingConnectionProvider#destroy, #release As you can see, this method fully clears all pools and references on the first invocation. If it were called again, it could potentially throw exceptions or behave unexpectedly due to already-released resources.

    @Override
	public void destroy() throws Exception {

		List<CompletableFuture<?>> futures = new ArrayList<>();
		if (!poolRef.isEmpty() || !asyncPoolRef.isEmpty()) {
			log.warn("LettucePoolingConnectionProvider contains unreleased connections");
		}

		if (!inProgressAsyncPoolRef.isEmpty()) {

			log.warn("LettucePoolingConnectionProvider has active connection retrievals");
			inProgressAsyncPoolRef.forEach((k, v) -> futures.add(k.thenApply(StatefulConnection::closeAsync)));
		}

		if (!poolRef.isEmpty()) {

			poolRef.forEach((connection, pool) -> pool.returnObject(connection));
			poolRef.clear();
		}

		if (!asyncPoolRef.isEmpty()) {

			asyncPoolRef.forEach((connection, pool) -> futures.add(pool.release(connection)));
			asyncPoolRef.clear();
		}

		pools.forEach((type, pool) -> pool.close());

		CompletableFuture
				.allOf(futures.stream().map(it -> it.exceptionally(LettuceFutureUtils.ignoreErrors()))
						.toArray(CompletableFuture[]::new)) //
				.thenCompose(ignored -> {

					CompletableFuture[] poolClose = asyncPools.values().stream().map(AsyncPool::closeAsync)
							.map(it -> it.exceptionally(LettuceFutureUtils.ignoreErrors())).toArray(CompletableFuture[]::new);

					return CompletableFuture.allOf(poolClose);
				}) //
				.thenRun(() -> {
					asyncPoolRef.clear();
					inProgressAsyncPoolRef.clear();
				}) //
				.join();

		pools.clear();
	}    
    @Override
	public void release(StatefulConnection<?, ?> connection) {

		GenericObjectPool<StatefulConnection<?, ?>> pool = poolRef.remove(connection);

		if (pool == null) {

			AsyncPool<StatefulConnection<?, ?>> asyncPool = asyncPoolRef.remove(connection);

			if (asyncPool == null) {
				throw new PoolException("Returned connection " + connection
						+ " was either previously returned or does not belong to this connection provider");
			}

			discardIfNecessary(connection);
			asyncPool.release(connection).join();
			return;
		}

		discardIfNecessary(connection);
		pool.returnObject(connection);
	}

To prevent this, the disposal logic was moved to under the state safe guard, ensuring it’s executed only once and making the lifecycle handling more robust.

clusterCommandExecutor = null;

if (client != null) {
try {
Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod();
Expand Down Expand Up @@ -1012,20 +1016,7 @@ public void afterPropertiesSet() {

@Override
public void destroy() {

stop();
this.client = null;

ClusterCommandExecutor clusterCommandExecutor = this.clusterCommandExecutor;

if (clusterCommandExecutor != null) {
try {
clusterCommandExecutor.destroy();
this.clusterCommandExecutor = null;
} catch (Exception ex) {
log.warn("Cannot properly close cluster command executor", ex);
}
}

this.state.set(State.DESTROYED);
}
Expand All @@ -1043,6 +1034,16 @@ private void dispose(@Nullable LettuceConnectionProvider connectionProvider) {
}
}

private void dispose(@Nullable ClusterCommandExecutor commandExecutor) {
if (commandExecutor != null) {
try {
commandExecutor.destroy();
} catch (Exception ex) {
log.warn("Cannot properly close cluster command executor", ex);
}
}
}

@Override
public RedisConnection getConnection() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.data.redis.connection.PoolException;
import org.springframework.util.Assert;

Expand All @@ -56,13 +58,16 @@
* @author Mark Paluch
* @author Christoph Strobl
* @author Asmir Mustafic
* @author UHyeon Jeong
* @since 2.0
* @see #getConnection(Class)
*/
class LettucePoolingConnectionProvider implements LettuceConnectionProvider, RedisClientProvider, DisposableBean {
class LettucePoolingConnectionProvider implements LettuceConnectionProvider, RedisClientProvider, DisposableBean,
SmartLifecycle {

private static final Log log = LogFactory.getLog(LettucePoolingConnectionProvider.class);

private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);
private final LettuceConnectionProvider connectionProvider;
private final GenericObjectPoolConfig<StatefulConnection<?, ?>> poolConfig;
private final Map<StatefulConnection<?, ?>, GenericObjectPool<StatefulConnection<?, ?>>> poolRef = new ConcurrentHashMap<>(
Expand All @@ -76,6 +81,10 @@ class LettucePoolingConnectionProvider implements LettuceConnectionProvider, Red
private final Map<Class<?>, AsyncPool<StatefulConnection<?, ?>>> asyncPools = new ConcurrentHashMap<>(32);
private final BoundedPoolConfig asyncPoolConfig;

enum State {
CREATED, STARTING, STARTED, STOPPING, STOPPED, DESTROYED;
}

LettucePoolingConnectionProvider(LettuceConnectionProvider connectionProvider,
LettucePoolingClientConfiguration clientConfiguration) {

Expand Down Expand Up @@ -206,39 +215,51 @@ public CompletableFuture<Void> releaseAsync(StatefulConnection<?, ?> connection)

@Override
public void destroy() throws Exception {
stop();
state.set(State.DESTROYED);
}

List<CompletableFuture<?>> futures = new ArrayList<>();
if (!poolRef.isEmpty() || !asyncPoolRef.isEmpty()) {
log.warn("LettucePoolingConnectionProvider contains unreleased connections");
}

if (!inProgressAsyncPoolRef.isEmpty()) {
@Override
public void start() {
state.set(State.STARTED);
}

log.warn("LettucePoolingConnectionProvider has active connection retrievals");
inProgressAsyncPoolRef.forEach((k, v) -> futures.add(k.thenApply(StatefulConnection::closeAsync)));
}
@Override
public void stop() {
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
List<CompletableFuture<?>> futures = new ArrayList<>();
if (!poolRef.isEmpty() || !asyncPoolRef.isEmpty()) {
log.warn("LettucePoolingConnectionProvider contains unreleased connections");
}

if (!poolRef.isEmpty()) {
if (!inProgressAsyncPoolRef.isEmpty()) {

poolRef.forEach((connection, pool) -> pool.returnObject(connection));
poolRef.clear();
}
log.warn("LettucePoolingConnectionProvider has active connection retrievals");
inProgressAsyncPoolRef.forEach((k, v) -> futures.add(k.thenApply(StatefulConnection::closeAsync)));
}

if (!asyncPoolRef.isEmpty()) {
if (!poolRef.isEmpty()) {

asyncPoolRef.forEach((connection, pool) -> futures.add(pool.release(connection)));
asyncPoolRef.clear();
}
poolRef.forEach((connection, pool) -> pool.returnObject(connection));
poolRef.clear();
}

if (!asyncPoolRef.isEmpty()) {

asyncPoolRef.forEach((connection, pool) -> futures.add(pool.release(connection)));
asyncPoolRef.clear();
}

pools.forEach((type, pool) -> pool.close());
pools.forEach((type, pool) -> pool.close());

CompletableFuture
CompletableFuture
.allOf(futures.stream().map(it -> it.exceptionally(LettuceFutureUtils.ignoreErrors()))
.toArray(CompletableFuture[]::new)) //
.toArray(CompletableFuture[]::new)) //
.thenCompose(ignored -> {

CompletableFuture[] poolClose = asyncPools.values().stream().map(AsyncPool::closeAsync)
.map(it -> it.exceptionally(LettuceFutureUtils.ignoreErrors())).toArray(CompletableFuture[]::new);
.map(it -> it.exceptionally(LettuceFutureUtils.ignoreErrors())).toArray(CompletableFuture[]::new);

return CompletableFuture.allOf(poolClose);
}) //
Expand All @@ -248,6 +269,18 @@ public void destroy() throws Exception {
}) //
.join();

pools.clear();
pools.clear();
}
state.set(State.STOPPED);
}

@Override
public boolean isRunning() {
return State.STARTED.equals(this.state.get());
}

@Override
public boolean isAutoStartup() {
return true;
}
}