Skip to content

Commit ca1174f

Browse files
committed
GH-3014: Add request/reply support into RabbitAmqpMessageListenerAdapter
Fixes: #3014
1 parent 1741803 commit ca1174f

File tree

7 files changed

+208
-26
lines changed

7 files changed

+208
-26
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,7 @@ project('spring-rabbitmq-client') {
481481
api "com.rabbitmq.client:amqp-client:$rabbitmqAmqpClientVersion"
482482

483483
testApi project(':spring-rabbit-junit')
484+
testApi 'io.projectreactor:reactor-core'
484485

485486
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-databind'
486487

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/AbstractAdaptableMessageListener.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,10 @@ public void setBeforeSendReplyPostProcessors(MessagePostProcessor... beforeSendR
216216
beforeSendReplyPostProcessors.length);
217217
}
218218

219+
public MessagePostProcessor @Nullable [] getBeforeSendReplyPostProcessors() {
220+
return this.beforeSendReplyPostProcessors;
221+
}
222+
219223
/**
220224
* Set a {@link RetryTemplate} to use when sending replies.
221225
* @param retryTemplate the template.
@@ -369,7 +373,7 @@ protected void handleResult(InvocationResult resultArg, Message request, @Nullab
369373
/**
370374
* Handle the given result object returned from the listener method, sending a
371375
* response message back.
372-
* @param resultArg the result object to handle (never <code>null</code>)
376+
* @param resultArg the result object to handle
373377
* @param request the original request message
374378
* @param channel the Rabbit channel to operate on (maybe <code>null</code>)
375379
* @param source the source data for the method invocation - e.g.
@@ -383,7 +387,7 @@ protected void handleResult(InvocationResult resultArg, Message request, @Nullab
383387
protected void handleResult(@Nullable InvocationResult resultArg, Message request,
384388
@Nullable Channel channel, @Nullable Object source) {
385389

386-
if (channel != null && resultArg != null) {
390+
if (resultArg != null) {
387391
if (resultArg.getReturnValue() instanceof CompletableFuture<?> completable) {
388392
if (!this.isManualAck) {
389393
this.logger.warn("Container AcknowledgeMode must be MANUAL for a Future<?> return type; "
@@ -413,13 +417,9 @@ else if (monoPresent && MonoHandler.isMono(resultArg.getReturnValue())) {
413417
doHandleResult(resultArg, request, channel, source);
414418
}
415419
}
416-
else if (this.logger.isWarnEnabled()) {
417-
this.logger.warn("Listener method returned result [" + resultArg
418-
+ "]: not generating response message for it because no Rabbit Channel given");
419-
}
420420
}
421421

422-
private void asyncSuccess(InvocationResult resultArg, Message request, Channel channel,
422+
private void asyncSuccess(InvocationResult resultArg, Message request, @Nullable Channel channel,
423423
@Nullable Object source, @Nullable Object deferredResult) {
424424

425425
if (deferredResult == null) {
@@ -458,8 +458,9 @@ protected void basicAck(Message request, @Nullable Channel channel) {
458458
}
459459
}
460460

461-
protected void asyncFailure(Message request, Channel channel, Throwable t, @Nullable Object source) {
461+
protected void asyncFailure(Message request, @Nullable Channel channel, Throwable t, @Nullable Object source) {
462462
this.logger.error("Future, Mono, or suspend function was completed with an exception for " + request, t);
463+
Assert.notNull(channel, "'channel' must not be null.");
463464
try {
464465
channel.basicNack(request.getMessageProperties().getDeliveryTag(), false,
465466
ContainerUtils.shouldRequeue(this.defaultRequeueRejected, t, this.logger));
@@ -469,7 +470,7 @@ protected void asyncFailure(Message request, Channel channel, Throwable t, @Null
469470
}
470471
}
471472

472-
protected void doHandleResult(InvocationResult resultArg, Message request, Channel channel,
473+
protected void doHandleResult(InvocationResult resultArg, Message request, @Nullable Channel channel,
473474
@Nullable Object source) {
474475

475476
if (this.logger.isDebugEnabled()) {
@@ -500,12 +501,13 @@ protected void doHandleResult(InvocationResult resultArg, Message request, Chann
500501
/**
501502
* Build a Rabbit message to be sent as response based on the given result object.
502503
* @param channel the Rabbit Channel to operate on.
504+
* Can be null if implementation does not support AMQP 0.9.1.
503505
* @param result the content of the message, as returned from the listener method.
504506
* @param genericType the generic type to populate type headers.
505507
* @return the Rabbit <code>Message</code> (never <code>null</code>).
506508
* @see #setMessageConverter
507509
*/
508-
protected Message buildMessage(Channel channel, @Nullable Object result, @Nullable Type genericType) {
510+
protected Message buildMessage(@Nullable Channel channel, @Nullable Object result, @Nullable Type genericType) {
509511
MessageConverter converter = getMessageConverter();
510512
if (converter != null && !(result instanceof Message)) {
511513
return convert(result, genericType, converter);
@@ -633,7 +635,8 @@ private Address evaluateReplyTo(Message request, @Nullable Object source, @Nulla
633635
* @see #postProcessResponse(Message, Message)
634636
* @see #setReplyPostProcessor(ReplyPostProcessor)
635637
*/
636-
protected void sendResponse(Channel channel, Address replyTo, Message messageIn) {
638+
protected void sendResponse(@Nullable Channel channel, Address replyTo, Message messageIn) {
639+
Assert.notNull(channel, "'channel' must not be null.");
637640
Message message = messageIn;
638641
if (this.beforeSendReplyPostProcessors != null) {
639642
for (MessagePostProcessor postProcessor : this.beforeSendReplyPostProcessors) {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ public void onMessage(org.springframework.amqp.core.Message amqpMessage, @Nullab
169169
}
170170

171171
@Override
172-
protected void asyncFailure(org.springframework.amqp.core.Message request, Channel channel, Throwable t,
172+
protected void asyncFailure(org.springframework.amqp.core.Message request, @Nullable Channel channel, Throwable t,
173173
@Nullable Object source) {
174174

175175
try {
@@ -183,7 +183,7 @@ protected void asyncFailure(org.springframework.amqp.core.Message request, Chann
183183
super.asyncFailure(request, channel, t, source);
184184
}
185185

186-
private void handleException(org.springframework.amqp.core.Message amqpMessage, @Nullable Channel channel,
186+
protected void handleException(org.springframework.amqp.core.Message amqpMessage, @Nullable Channel channel,
187187
@Nullable Message<?> message, ListenerExecutionFailedException e) throws Exception { // NOSONAR
188188

189189
if (this.errorHandler != null) {
@@ -307,7 +307,7 @@ private String createMessagingErrorMessage(Object payload) {
307307
* @see #setMessageConverter
308308
*/
309309
@Override
310-
protected org.springframework.amqp.core.Message buildMessage(Channel channel, @Nullable Object result,
310+
protected org.springframework.amqp.core.Message buildMessage(@Nullable Channel channel, @Nullable Object result,
311311
@Nullable Type genericType) {
312312

313313
MessageConverter converter = getMessageConverter();

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/adapter/MessageListenerAdapterTests.java

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.atomic.AtomicReference;
2525

2626
import com.rabbitmq.client.Channel;
27+
import org.jspecify.annotations.Nullable;
2728
import org.junit.jupiter.api.BeforeEach;
2829
import org.junit.jupiter.api.Test;
2930
import reactor.core.publisher.Mono;
@@ -33,7 +34,6 @@
3334
import org.springframework.amqp.core.Message;
3435
import org.springframework.amqp.core.MessageProperties;
3536
import org.springframework.amqp.support.SendRetryContextAccessor;
36-
import org.springframework.amqp.support.converter.SimpleMessageConverter;
3737
import org.springframework.aop.framework.ProxyFactory;
3838
import org.springframework.retry.RetryPolicy;
3939
import org.springframework.retry.policy.SimpleRetryPolicy;
@@ -67,8 +67,15 @@ public class MessageListenerAdapterTests {
6767
public void init() {
6868
this.messageProperties = new MessageProperties();
6969
this.messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
70-
this.adapter = new MessageListenerAdapter();
71-
this.adapter.setMessageConverter(new SimpleMessageConverter());
70+
this.adapter = new MessageListenerAdapter() {
71+
72+
@Override
73+
protected void doHandleResult(InvocationResult resultArg, Message request, @Nullable Channel channel,
74+
@Nullable Object source) {
75+
76+
}
77+
78+
};
7279
}
7380

7481
@Test
@@ -77,7 +84,7 @@ class ExtendedListenerAdapter extends MessageListenerAdapter {
7784

7885
@Override
7986
protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
80-
return new Object[] { extractedMessage, channel, message };
87+
return new Object[] {extractedMessage, channel, message};
8188
}
8289

8390
}
@@ -131,7 +138,15 @@ public String myPojoMessageMethod(String input) {
131138
}
132139

133140
}
134-
this.adapter = new MessageListenerAdapter(new Delegate(), "myPojoMessageMethod");
141+
this.adapter = new MessageListenerAdapter(new Delegate(), "myPojoMessageMethod") {
142+
143+
@Override
144+
protected void doHandleResult(InvocationResult resultArg, Message request, @Nullable Channel channel,
145+
@Nullable Object source) {
146+
147+
}
148+
149+
};
135150
this.adapter.onMessage(new Message("foo".getBytes(), messageProperties), null);
136151
assertThat(called.get()).isTrue();
137152
}
@@ -146,7 +161,7 @@ public void testExplicitListenerMethod() throws Exception {
146161

147162
@Test
148163
public void testMappedListenerMethod() throws Exception {
149-
Map<String, String> map = new HashMap<String, String>();
164+
Map<String, String> map = new HashMap<>();
150165
map.put("foo", "handle");
151166
map.put("bar", "notDefinedOnInterface");
152167
this.adapter.setDefaultListenerMethod("anotherHandle");
@@ -186,6 +201,7 @@ public void testJdkProxyListener() throws Exception {
186201

187202
@Test
188203
public void testReplyRetry() throws Exception {
204+
this.adapter = new MessageListenerAdapter();
189205
this.adapter.setDefaultListenerMethod("handle");
190206
this.adapter.setDelegate(this.simpleService);
191207
RetryPolicy retryPolicy = new SimpleRetryPolicy(2);
@@ -210,7 +226,7 @@ public void testReplyRetry() throws Exception {
210226
this.adapter.onMessage(message, channel);
211227
assertThat(this.simpleService.called).isEqualTo("handle");
212228
assertThat(replyMessage.get()).isNotNull();
213-
assertThat(new String(replyMessage.get().getBody())).isEqualTo("processedfoo");
229+
assertThat(new String(replyMessage.get().getBody())).isEqualTo("processed foo");
214230
assertThat(replyAddress.get()).isNotNull();
215231
assertThat(replyAddress.get().getExchangeName()).isEqualTo("foo");
216232
assertThat(replyAddress.get().getRoutingKey()).isEqualTo("bar");
@@ -224,7 +240,7 @@ class Delegate {
224240
@SuppressWarnings("unused")
225241
public CompletableFuture<String> myPojoMessageMethod(String input) {
226242
CompletableFuture<String> future = new CompletableFuture<>();
227-
future.complete("processed" + input);
243+
future.complete("processed " + input);
228244
return future;
229245
}
230246

@@ -270,18 +286,18 @@ public static class SimpleService implements Service {
270286
@Override
271287
public String handle(String input) {
272288
called = "handle";
273-
return "processed" + input;
289+
return "processed " + input;
274290
}
275291

276292
@Override
277293
public String anotherHandle(String input) {
278294
called = "anotherHandle";
279-
return "processed" + input;
295+
return "processed " + input;
280296
}
281297

282298
public String notDefinedOnInterface(String input) {
283299
called = "notDefinedOnInterface";
284-
return "processed" + input;
300+
return "processed " + input;
285301
}
286302

287303
}

spring-rabbitmq-client/src/main/java/org/springframework/amqp/rabbitmq/client/listener/RabbitAmqpListenerContainer.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ public class RabbitAmqpListenerContainer implements MessageListenerContainer, Be
9999

100100
private @Nullable MessageListener proxy;
101101

102+
private boolean asyncReplies;
103+
102104
private ErrorHandler errorHandler = new ConditionalRejectingErrorHandler();
103105

104106
private @Nullable Collection<MessagePostProcessor> afterReceivePostProcessors;
@@ -255,6 +257,10 @@ public String getListenerId() {
255257
@Override
256258
public void setupMessageListener(MessageListener messageListener) {
257259
this.messageListener = messageListener;
260+
this.asyncReplies = messageListener.isAsyncReplies();
261+
if (this.messageListener instanceof RabbitAmqpMessageListenerAdapter rabbitAmqpMessageListenerAdapter) {
262+
rabbitAmqpMessageListenerAdapter.setConnectionFactory(this.connectionFactory);
263+
}
258264
this.proxy = this.messageListener;
259265
if (!ObjectUtils.isEmpty(this.adviceChain)) {
260266
ProxyFactory factory = new ProxyFactory(messageListener);
@@ -276,6 +282,11 @@ public void afterPropertiesSet() {
276282
Assert.state(this.queues != null, "At least one queue has to be provided for consuming.");
277283
Assert.state(this.messageListener != null, "The 'messageListener' must be provided.");
278284

285+
if (this.asyncReplies && this.autoSettle) {
286+
LOG.info("Enforce MANUAL settlement for async replies.");
287+
this.autoSettle = false;
288+
}
289+
279290
this.messageListener.containerAckMode(this.autoSettle ? AcknowledgeMode.AUTO : AcknowledgeMode.MANUAL);
280291
if (this.messageListener instanceof RabbitAmqpMessageListenerAdapter adapter
281292
&& this.afterReceivePostProcessors != null) {

0 commit comments

Comments
 (0)