Race condition leads to exceptions when the Spring context is being shut down and there is a Redis stream listener #2262

Closed
lrozek opened this issue Feb 16, 2022 · 3 comments
Labels
status: duplicate (a duplicate of another issue)

Comments

lrozek commented Feb 16, 2022

There is a race condition between the following method calls: RedisConnectionFactory(LettuceConnectionFactory)#destroy, StreamPollTask#doLoop and DefaultStreamMessageListenerContainer#stop.

When DefaultStreamMessageListenerContainer#stop is called as a result of the Spring context shutting down, it does not wait until StreamPollTask#doLoop has finished, meaning that the Redis connection is still in use. Later on, the Redis connection is closed by RedisConnectionFactory#destroy while StreamPollTask#doLoop is still running its last iteration.
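
To make the ordering problem concrete, here is a minimal sketch in plain java.util.concurrent. It does not use Spring Data Redis or Lettuce at all: FakeConnection, PollTask and stopThenClose are hypothetical names made up for this illustration, and the sketch only shows the general idea of a stop that waits for the last loop iteration before the connection is closed. It is not a proposal for how the container should actually be changed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustration only: FakeConnection and PollTask are hypothetical stand-ins,
// not Spring Data Redis or Lettuce types.
class GracefulPollShutdown {

    /** Stand-in for a shared connection that fails once it has been closed. */
    static final class FakeConnection {
        private volatile boolean closed;

        void read() throws InterruptedException {
            Thread.sleep(20); // simulate a blocking read
            if (closed) {
                throw new IllegalStateException("Connection closed");
            }
        }

        void close() {
            closed = true;
        }
    }

    /** Stand-in for a polling loop that can be cancelled and awaited. */
    static final class PollTask implements Runnable {
        private final FakeConnection connection;
        private final CountDownLatch finished = new CountDownLatch(1);
        private volatile boolean active = true;
        private volatile Throwable failure;

        PollTask(FakeConnection connection) {
            this.connection = connection;
        }

        @Override
        public void run() {
            try {
                while (active) {
                    connection.read();
                }
            } catch (Throwable t) {
                failure = t; // record what the last iteration ran into
            } finally {
                finished.countDown(); // signal that the loop has really ended
            }
        }

        void cancel() {
            active = false;
        }

        boolean awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
            return finished.await(timeout, unit);
        }

        Throwable failure() {
            return failure;
        }
    }

    /** Cancels the task, waits for its last iteration, and only then closes the connection. */
    static Throwable stopThenClose() {
        FakeConnection connection = new FakeConnection();
        PollTask task = new PollTask(connection);
        new Thread(task, "poll-task").start();
        try {
            Thread.sleep(50); // let a few iterations run
            task.cancel();
            if (!task.awaitCompletion(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("poll task did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // Closing before awaitCompletion() would let the final read() observe a closed connection.
        connection.close();
        return task.failure();
    }

    public static void main(String[] args) {
        System.out.println("failure after ordered shutdown: " + stopThenClose());
    }
}
```

In this sketch the connection is closed only after the latch has been counted down, so the last read can no longer run against a closed connection. Moving connection.close() in front of awaitCompletion() recreates the kind of interleaving described above.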

This might lead to the following exceptions, depending on what has already finished in LettuceConnectionFactory#destroy. The easiest way to reproduce them is to step through LettuceConnectionFactory#destroy and then resume StreamPollTask#readRecords.

2022-02-16 10:44:27.782  WARN 25768 --- [cTaskExecutor-1] io.lettuce.core.RedisChannelHandler      : Connection is already closed
2022-02-16 10:44:27.787 ERROR 25768 --- [cTaskExecutor-1] ageListenerContainer$LoggingErrorHandler : Unexpected error occurred in scheduled task.

org.springframework.data.redis.RedisSystemException: Redis exception; nested exception is io.lettuce.core.RedisException: Connection closed
	at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:74) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:41) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.PassThroughExceptionTranslationStrategy.translate(PassThroughExceptionTranslationStrategy.java:44) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.FallbackExceptionTranslationStrategy.translate(FallbackExceptionTranslationStrategy.java:42) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceConnection.convertLettuceAccessException(LettuceConnection.java:272) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceConnection.await(LettuceConnection.java:1063) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceConnection.lambda$doInvoke$4(LettuceConnection.java:920) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceInvoker$Synchronizer.invoke(LettuceInvoker.java:673) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceInvoker$DefaultManyInvocationSpec.toList(LettuceInvoker.java:618) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.xRead(LettuceStreamCommands.java:323) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$null$5(DefaultStreamMessageListenerContainer.java:263) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:223) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:190) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:177) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getReadFunction$6(DefaultStreamMessageListenerContainer.java:262) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.stream.StreamPollTask.readRecords(StreamPollTask.java:166) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:147) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:132) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: io.lettuce.core.RedisException: Connection closed
	at io.lettuce.core.protocol.DefaultEndpoint.notifyDrainQueuedCommands(DefaultEndpoint.java:679) ~[lettuce-core-6.1.6.RELEASE.jar!/:6.1.6.RELEASE]
	at io.lettuce.core.protocol.CommandHandler.channelInactive(CommandHandler.java:358) ~[lettuce-core-6.1.6.RELEASE.jar!/:6.1.6.RELEASE]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.lettuce.core.protocol.RedisHandshakeHandler.channelInactive(RedisHandshakeHandler.java:89) ~[lettuce-core-6.1.6.RELEASE.jar!/:6.1.6.RELEASE]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.lettuce.core.ChannelGroupListener.channelInactive(ChannelGroupListener.java:69) ~[lettuce-core-6.1.6.RELEASE.jar!/:6.1.6.RELEASE]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[netty-common-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) ~[netty-common-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.73.Final.jar!/:4.1.73.Final]
	... 1 common frames omitted
2022-02-16 10:49:23.331  WARN 26174 --- [cTaskExecutor-1] io.netty.channel.AbstractChannel         : Force-closing a channel whose registration task was not accepted by an event loop: [id: 0x985b4d05]

java.util.concurrent.RejectedExecutionException: event executor terminated
	at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923) ~[netty-common-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:350) ~[netty-common-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:343) ~[netty-common-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:825) ~[netty-common-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:815) ~[netty-common-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:483) ~[netty-transport-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:87) ~[netty-transport-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:81) ~[netty-transport-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.channel.MultithreadEventLoopGroup.register(MultithreadEventLoopGroup.java:86) ~[netty-transport-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:323) ~[netty-transport-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:155) ~[netty-transport-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:139) ~[netty-transport-4.1.73.Final.jar:4.1.73.Final]
	at io.lettuce.core.AbstractRedisClient.initializeChannelAsync0(AbstractRedisClient.java:409) ~[lettuce-core-6.1.6.RELEASE.jar:6.1.6.RELEASE]
	at io.lettuce.core.AbstractRedisClient.lambda$initializeChannelAsync$2(AbstractRedisClient.java:391) ~[lettuce-core-6.1.6.RELEASE.jar:6.1.6.RELEASE]
	at reactor.core.publisher.LambdaMonoSubscriber.onNext(LambdaMonoSubscriber.java:171) ~[reactor-core-3.4.14.jar:3.4.14]
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210) ~[reactor-core-3.4.14.jar:3.4.14]
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) ~[reactor-core-3.4.14.jar:3.4.14]
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854) ~[reactor-core-3.4.14.jar:3.4.14]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.14.jar:3.4.14]
	at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:62) ~[reactor-core-3.4.14.jar:3.4.14]
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.14.jar:3.4.14]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4400) ~[reactor-core-3.4.14.jar:3.4.14]
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4515) ~[reactor-core-3.4.14.jar:3.4.14]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4371) ~[reactor-core-3.4.14.jar:3.4.14]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4307) ~[reactor-core-3.4.14.jar:3.4.14]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4279) ~[reactor-core-3.4.14.jar:3.4.14]
	at io.lettuce.core.AbstractRedisClient.initializeChannelAsync(AbstractRedisClient.java:386) ~[lettuce-core-6.1.6.RELEASE.jar:6.1.6.RELEASE]
	at io.lettuce.core.RedisClient.connectStatefulAsync(RedisClient.java:325) ~[lettuce-core-6.1.6.RELEASE.jar:6.1.6.RELEASE]
	at io.lettuce.core.RedisClient.connectStandaloneAsync(RedisClient.java:287) ~[lettuce-core-6.1.6.RELEASE.jar:6.1.6.RELEASE]
	at io.lettuce.core.RedisClient.connect(RedisClient.java:216) ~[lettuce-core-6.1.6.RELEASE.jar:6.1.6.RELEASE]
	at org.springframework.data.redis.connection.lettuce.StandaloneConnectionProvider.lambda$getConnection$1(StandaloneConnectionProvider.java:115) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at java.base/java.util.Optional.orElseGet(Optional.java:362) ~[na:na]
	at org.springframework.data.redis.connection.lettuce.StandaloneConnectionProvider.getConnection(StandaloneConnectionProvider.java:115) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.getConnection(LettuceConnectionFactory.java:1595) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$SharedConnection.getNativeConnection(LettuceConnectionFactory.java:1383) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$SharedConnection.getConnection(LettuceConnectionFactory.java:1366) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getSharedConnection(LettuceConnectionFactory.java:1093) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getConnection(LettuceConnectionFactory.java:421) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.core.RedisConnectionUtils.fetchConnection(RedisConnectionUtils.java:193) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.core.RedisConnectionUtils.doGetConnection(RedisConnectionUtils.java:144) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:105) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:210) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:190) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:177) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getReadFunction$6(DefaultStreamMessageListenerContainer.java:262) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.stream.StreamPollTask.readRecords(StreamPollTask.java:166) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:147) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:132) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
2022-02-16 10:54:17.735 ERROR 26812 --- [cTaskExecutor-1] ageListenerContainer$LoggingErrorHandler : Unexpected error occurred in scheduled task.

java.lang.IllegalStateException: LettuceConnectionFactory was destroyed and cannot be used anymore
	at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-5.3.15.jar:5.3.15]
	at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.assertInitialized(LettuceConnectionFactory.java:1263) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getConnection(LettuceConnectionFactory.java:414) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.core.RedisConnectionUtils.fetchConnection(RedisConnectionUtils.java:193) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.core.RedisConnectionUtils.doGetConnection(RedisConnectionUtils.java:144) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:105) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:210) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:190) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:177) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getReadFunction$6(DefaultStreamMessageListenerContainer.java:262) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.stream.StreamPollTask.readRecords(StreamPollTask.java:166) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:147) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:132) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

lrozek (Author) commented Feb 16, 2022

I will add a repro project in a minute

mp911de (Member) commented Feb 16, 2022

This is essentially a duplicate of #2261

mp911de closed this as completed on Feb 16, 2022
mp911de added the "status: duplicate" label and removed the "status: waiting-for-triage" label on Feb 16, 2022

lrozek (Author) commented Feb 16, 2022
