Skip to content

Commit 8b2cc61

Browse files
authored
GH-1422: @RabbitListener: Fix Broker-Named Queues
Resolves #1422 Broker-named queues did not work with `@RabbitListener` because the BPP only passed the name of the queue bean, not the bean itself, into the endpoint. Support bean injection; fall back to the previous behavior if a mixture of beans and names are encountered (the containers don't support both types of configuration). Add a note to the javadoc to indicate that broker-named queues are not supported via `queuesToDeclare` and `bindings` properties; such queues must be declared as discrete beans. * Docs. **cherry-pick to `2.4.x` & `2.3.x`**
1 parent 49bc52d commit 8b2cc61

File tree

5 files changed

+88
-26
lines changed

5 files changed

+88
-26
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/RabbitListener.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2020 the original author or authors.
2+
* Copyright 2014-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -148,6 +148,8 @@
148148
* application context, the queue will be declared on the broker with default
149149
* binding (default exchange with the queue name as the routing key).
150150
* Mutually exclusive with {@link #bindings()} and {@link #queues()}.
151+
* NOTE: Broker-named queues cannot be declared this way, they must be defined
152+
* as beans (with an empty string for the name).
151153
* @return the queue(s) to declare.
152154
* @see org.springframework.amqp.rabbit.listener.MessageListenerContainer
153155
* @since 2.0
@@ -186,6 +188,8 @@
186188
* Array of {@link QueueBinding}s providing the listener's queue names, together
187189
* with the exchange and optional binding information.
188190
* Mutually exclusive with {@link #queues()} and {@link #queuesToDeclare()}.
191+
* NOTE: Broker-named queues cannot be declared this way, they must be defined
192+
* as beans (with an empty string for the name).
189193
* @return the bindings.
190194
* @see org.springframework.amqp.rabbit.listener.MessageListenerContainer
191195
* @since 1.5

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/RabbitListenerAnnotationBeanPostProcessor.java

Lines changed: 55 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2021 the original author or authors.
2+
* Copyright 2014-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -414,7 +414,19 @@ protected Collection<Declarable> processListener(MethodRabbitListenerEndpoint en
414414
endpoint.setBean(bean);
415415
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
416416
endpoint.setId(getEndpointId(rabbitListener));
417-
endpoint.setQueueNames(resolveQueues(rabbitListener, declarables));
417+
List<Object> resolvedQueues = resolveQueues(rabbitListener, declarables);
418+
if (!resolvedQueues.isEmpty()) {
419+
if (resolvedQueues.get(0) instanceof String) {
420+
endpoint.setQueueNames(resolvedQueues.stream()
421+
.map(o -> (String) o)
422+
.collect(Collectors.toList()).toArray(new String[0]));
423+
}
424+
else {
425+
endpoint.setQueues(resolvedQueues.stream()
426+
.map(o -> (Queue) o)
427+
.collect(Collectors.toList()).toArray(new Queue[0]));
428+
}
429+
}
418430
endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
419431
endpoint.setBeanFactory(this.beanFactory);
420432
endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
@@ -625,58 +637,85 @@ private String getEndpointId(RabbitListener rabbitListener) {
625637
}
626638
}
627639

628-
private String[] resolveQueues(RabbitListener rabbitListener, Collection<Declarable> declarables) {
640+
private List<Object> resolveQueues(RabbitListener rabbitListener, Collection<Declarable> declarables) {
629641
String[] queues = rabbitListener.queues();
630642
QueueBinding[] bindings = rabbitListener.bindings();
631643
org.springframework.amqp.rabbit.annotation.Queue[] queuesToDeclare = rabbitListener.queuesToDeclare();
632-
List<String> result = new ArrayList<String>();
644+
List<String> queueNames = new ArrayList<String>();
645+
List<Queue> queueBeans = new ArrayList<Queue>();
633646
if (queues.length > 0) {
634647
for (int i = 0; i < queues.length; i++) {
635-
resolveAsString(resolveExpression(queues[i]), result, true, "queues");
648+
resolveQueues(queues[i], queueNames, queueBeans);
636649
}
637650
}
651+
if (!queueNames.isEmpty()) {
652+
// revert to the previous behavior of just using the name when there is mixture of String and Queue
653+
queueBeans.forEach(qb -> queueNames.add(qb.getName()));
654+
queueBeans.clear();
655+
}
638656
if (queuesToDeclare.length > 0) {
639657
if (queues.length > 0) {
640658
throw new BeanInitializationException(
641659
"@RabbitListener can have only one of 'queues', 'queuesToDeclare', or 'bindings'");
642660
}
643661
for (int i = 0; i < queuesToDeclare.length; i++) {
644-
result.add(declareQueue(queuesToDeclare[i], declarables));
662+
queueNames.add(declareQueue(queuesToDeclare[i], declarables));
645663
}
646664
}
647665
if (bindings.length > 0) {
648666
if (queues.length > 0 || queuesToDeclare.length > 0) {
649667
throw new BeanInitializationException(
650668
"@RabbitListener can have only one of 'queues', 'queuesToDeclare', or 'bindings'");
651669
}
652-
return registerBeansForDeclaration(rabbitListener, declarables);
653-
}
654-
return result.toArray(new String[result.size()]);
670+
return Arrays.stream(registerBeansForDeclaration(rabbitListener, declarables))
671+
.map(s -> (Object) s)
672+
.collect(Collectors.toList());
673+
}
674+
return queueNames.isEmpty()
675+
? queueBeans.stream()
676+
.map(s -> (Object) s)
677+
.collect(Collectors.toList())
678+
: queueNames.stream()
679+
.map(s -> (Object) s)
680+
.collect(Collectors.toList());
681+
682+
}
683+
684+
private void resolveQueues(String queue, List<String> result, List<Queue> queueBeans) {
685+
resolveAsStringOrQueue(resolveExpression(queue), result, queueBeans, "queues");
655686
}
656687

657688
@SuppressWarnings("unchecked")
658-
private void resolveAsString(Object resolvedValue, List<String> result, boolean canBeQueue, String what) {
689+
private void resolveAsStringOrQueue(Object resolvedValue, List<String> names, @Nullable List<Queue> queues,
690+
String what) {
691+
659692
Object resolvedValueToUse = resolvedValue;
660693
if (resolvedValue instanceof String[]) {
661694
resolvedValueToUse = Arrays.asList((String[]) resolvedValue);
662695
}
663-
if (canBeQueue && resolvedValueToUse instanceof Queue) {
664-
result.add(((Queue) resolvedValueToUse).getName());
696+
if (queues != null && resolvedValueToUse instanceof Queue) {
697+
if (!names.isEmpty()) {
698+
// revert to the previous behavior of just using the name when there is mixture of String and Queue
699+
names.add(((Queue) resolvedValueToUse).getName());
700+
}
701+
else {
702+
queues.add((Queue) resolvedValueToUse);
703+
}
665704
}
666705
else if (resolvedValueToUse instanceof String) {
667-
result.add((String) resolvedValueToUse);
706+
names.add((String) resolvedValueToUse);
668707
}
669708
else if (resolvedValueToUse instanceof Iterable) {
670709
for (Object object : (Iterable<Object>) resolvedValueToUse) {
671-
resolveAsString(object, result, canBeQueue, what);
710+
resolveAsStringOrQueue(object, names, queues, what);
672711
}
673712
}
674713
else {
675714
throw new IllegalArgumentException(String.format(
676715
"@RabbitListener."
677716
+ what
678717
+ " can't resolve '%s' as a String[] or a String "
679-
+ (canBeQueue ? "or a Queue" : ""),
718+
+ (queues != null ? "or a Queue" : ""),
680719
resolvedValue));
681720
}
682721
}
@@ -776,7 +815,7 @@ private void registerBindings(QueueBinding binding, String queueName, String exc
776815
final int length = binding.key().length;
777816
routingKeys = new ArrayList<>();
778817
for (int i = 0; i < length; ++i) {
779-
resolveAsString(resolveExpression(binding.key()[i]), routingKeys, false, "@QueueBinding.key");
818+
resolveAsStringOrQueue(resolveExpression(binding.key()[i]), routingKeys, null, "@QueueBinding.key");
780819
}
781820
}
782821
final Map<String, Object> bindingArguments = resolveArguments(binding.arguments());

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIntegrationTests.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2021 the original author or authors.
2+
* Copyright 2014-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -60,6 +60,7 @@
6060
import org.springframework.amqp.core.MessagePostProcessor;
6161
import org.springframework.amqp.core.MessageProperties;
6262
import org.springframework.amqp.core.MessagePropertiesBuilder;
63+
import org.springframework.amqp.core.QueueBuilder;
6364
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
6465
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
6566
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerEndpoint;
@@ -74,6 +75,7 @@
7475
import org.springframework.amqp.rabbit.junit.LogLevels;
7576
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
7677
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition;
78+
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
7779
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
7880
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
7981
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
@@ -1017,6 +1019,13 @@ void messagePropertiesParam() {
10171019
})).isEqualTo("foo, myProp=bar");
10181020
}
10191021

1022+
@Test
1023+
void listenerWithBrokerNamedQueue() {
1024+
AbstractMessageListenerContainer container =
1025+
(AbstractMessageListenerContainer) this.registry.getListenerContainer("brokerNamed");
1026+
assertThat(container.getQueueNames()[0]).startsWith("amq.gen");
1027+
}
1028+
10201029
interface TxService {
10211030

10221031
@Transactional
@@ -1419,6 +1428,10 @@ public String mpArgument(String payload, MessageProperties props) {
14191428
return payload + ", myProp=" + props.getHeader("myProp");
14201429
}
14211430

1431+
@RabbitListener(id = "brokerNamed", queues = "#{@brokerNamed}")
1432+
void brokerNamed(String in) {
1433+
}
1434+
14221435
}
14231436

14241437
public static class JsonObject {
@@ -2010,6 +2023,11 @@ public ReplyPostProcessor echoPrefixHeader() {
20102023
};
20112024
}
20122025

2026+
@Bean
2027+
org.springframework.amqp.core.Queue brokerNamed() {
2028+
return QueueBuilder.nonDurable("").autoDelete().exclusive().build();
2029+
}
2030+
20132031
}
20142032

20152033
@RabbitListener(bindings = @QueueBinding

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/RabbitListenerAnnotationBeanPostProcessorTests.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -187,9 +187,9 @@ public void multipleQueuesTestBean() {
187187
RabbitListenerContainerTestFactory factory = context.getBean(RabbitListenerContainerTestFactory.class);
188188
assertThat(factory.getListenerContainers().size()).as("one container should have been registered").isEqualTo(1);
189189
RabbitListenerEndpoint endpoint = factory.getListenerContainers().get(0).getEndpoint();
190-
final Iterator<String> iterator = ((AbstractRabbitListenerEndpoint) endpoint).getQueueNames().iterator();
191-
assertThat(iterator.next()).isEqualTo("testQueue");
192-
assertThat(iterator.next()).isEqualTo("secondQueue");
190+
final Iterator<Queue> iterator = ((AbstractRabbitListenerEndpoint) endpoint).getQueues().iterator();
191+
assertThat(iterator.next().getName()).isEqualTo("testQueue");
192+
assertThat(iterator.next().getName()).isEqualTo("secondQueue");
193193

194194
context.close();
195195
}
@@ -218,9 +218,9 @@ public void propertyResolvingToExpressionTestBean() {
218218
RabbitListenerContainerTestFactory factory = context.getBean(RabbitListenerContainerTestFactory.class);
219219
assertThat(factory.getListenerContainers().size()).as("one container should have been registered").isEqualTo(1);
220220
RabbitListenerEndpoint endpoint = factory.getListenerContainers().get(0).getEndpoint();
221-
final Iterator<String> iterator = ((AbstractRabbitListenerEndpoint) endpoint).getQueueNames().iterator();
222-
assertThat(iterator.next()).isEqualTo("testQueue");
223-
assertThat(iterator.next()).isEqualTo("secondQueue");
221+
final Iterator<Queue> iterator = ((AbstractRabbitListenerEndpoint) endpoint).getQueues().iterator();
222+
assertThat(iterator.next().getName()).isEqualTo("testQueue");
223+
assertThat(iterator.next().getName()).isEqualTo("secondQueue");
224224

225225
context.close();
226226
}

src/reference/asciidoc/amqp.adoc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2340,7 +2340,8 @@ public class MyService {
23402340

23412341
In the first example, a queue `myQueue` is declared automatically (durable) together with the exchange, if needed,
23422342
and bound to the exchange with the routing key.
2343-
In the second example, an anonymous (exclusive, auto-delete) queue is declared and bound.
2343+
In the second example, an anonymous (exclusive, auto-delete) queue is declared and bound; the queue name is created by the framework using the `Base64UrlNamingStrategy`.
2344+
You cannot declare broker-named queues using this technique; they need to be declared as bean definitions; see <<containers-and-broker-named-queues>>.
23442345
Multiple `QueueBinding` entries can be provided, letting the listener listen to multiple queues.
23452346
In the third example, a queue with the name retrieved from property `my.queue` is declared, if necessary, with the default binding to the default exchange using the queue name as the routing key.
23462347

0 commit comments

Comments
 (0)