Skip to content

Commit 7f2315e

Browse files
committed
GH-3005: Fix SimpleMLC.killOrRestart for closed AC
Fixes: #3005 Issue link: #3005 The `SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.killOrRestart()` is also called during application context shutdown. At this moment we cannot emit events into an application context. Otherwise, it fails with: ``` Exception in thread "rabbitListenerExecutor1" org.springframework.beans.factory.BeanCreationNotAllowedException: Error creating bean with name 'refreshEventListener': Singleton bean creation not allowed while singletons of this factory are in destruction (Do not request a bean from a BeanFactory in a destroy method implementation!) ``` * Introduce `ObservableListenerContainer.isApplicationContextClosed()` and call it as additional condition in the `SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.killOrRestart()` before trying to emit `AsyncConsumerStoppedEvent` The fix for `3.1.x` requires a slightly different approach via `ContextClosedEvent` # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/ObservableListenerContainer.java
1 parent 88c04bb commit 7f2315e

File tree

2 files changed

+14
-7
lines changed

2 files changed

+14
-7
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/ObservableListenerContainer.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 the original author or authors.
2+
* Copyright 2023-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,19 +24,22 @@
2424
import org.springframework.beans.factory.DisposableBean;
2525
import org.springframework.context.ApplicationContext;
2626
import org.springframework.context.ApplicationContextAware;
27+
import org.springframework.context.ConfigurableApplicationContext;
2728
import org.springframework.lang.Nullable;
2829
import org.springframework.util.ClassUtils;
2930

3031
/**
3132
* @author Gary Russell
33+
* @author Artem Bilan
34+
*
3235
* @since 3.0.5
3336
*
3437
*/
3538
public abstract class ObservableListenerContainer extends RabbitAccessor
3639
implements MessageListenerContainer, ApplicationContextAware, BeanNameAware, DisposableBean {
3740

3841
private static final boolean MICROMETER_PRESENT = ClassUtils.isPresent(
39-
"io.micrometer.core.instrument.MeterRegistry", AbstractMessageListenerContainer.class.getClassLoader());
42+
"io.micrometer.core.instrument.MeterRegistry", AbstractMessageListenerContainer.class.getClassLoader());
4043

4144
private ApplicationContext applicationContext;
4245

@@ -119,6 +122,11 @@ protected void checkObservation() {
119122
}
120123
}
121124

125+
protected boolean isApplicationContextClosed() {
126+
return this.applicationContext instanceof ConfigurableApplicationContext configurableCtx
127+
&& configurableCtx.isClosed();
128+
}
129+
122130
@Override
123131
public void setBeanName(String beanName) {
124132
this.beanName = beanName;

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
6262
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
6363
import org.springframework.amqp.support.ConsumerTagStrategy;
64+
import org.springframework.context.ApplicationEventPublisher;
6465
import org.springframework.core.log.LogMessage;
6566
import org.springframework.jmx.export.annotation.ManagedMetric;
6667
import org.springframework.jmx.support.MetricType;
@@ -798,7 +799,6 @@ protected void adjustConsumers(int deltaArg) {
798799
}
799800
}
800801

801-
802802
/**
803803
* Start up to delta consumers, limited by {@link #setMaxConcurrentConsumers(int)}.
804804
* @param delta the consumers to add.
@@ -863,7 +863,6 @@ private void considerAddingAConsumer() {
863863
}
864864
}
865865

866-
867866
private void considerStoppingAConsumer(BlockingQueueConsumer consumer) {
868867
this.consumersLock.lock();
869868
try {
@@ -1266,7 +1265,6 @@ private final class AsyncMessageProcessingConsumer implements Runnable {
12661265

12671266
private boolean failedExclusive;
12681267

1269-
12701268
AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {
12711269
this.consumer = consumer;
12721270
this.start = new CountDownLatch(1);
@@ -1531,8 +1529,9 @@ private void killOrRestart(boolean aborted) {
15311529
try {
15321530
this.consumer.stop();
15331531
SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
1534-
if (getApplicationEventPublisher() != null) {
1535-
getApplicationEventPublisher().publishEvent(
1532+
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
1533+
if (applicationEventPublisher != null && !isApplicationContextClosed()) {
1534+
applicationEventPublisher.publishEvent(
15361535
new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
15371536
}
15381537
}

0 commit comments

Comments
 (0)