Skip to content

Commit 5a178de

Browse files
committed
Fix AMQP module for ListenableFuture deprecation
* Realign API for `CompletableFuture` * Fix for Spring AMQP classes extracted to top level
1 parent 5572c21 commit 5a178de

File tree

4 files changed

+78
-81
lines changed

4 files changed

+78
-81
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AbstractAmqpOutboundEndpoint.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.HashMap;
2222
import java.util.Map;
2323
import java.util.UUID;
24+
import java.util.concurrent.CompletableFuture;
2425
import java.util.concurrent.ScheduledFuture;
2526

2627
import org.springframework.amqp.core.MessageDeliveryMode;
@@ -51,7 +52,6 @@
5152
import org.springframework.messaging.MessageChannel;
5253
import org.springframework.util.Assert;
5354
import org.springframework.util.StringUtils;
54-
import org.springframework.util.concurrent.SettableListenableFuture;
5555

5656
/**
5757
* A base {@link AbstractReplyProducingMessageHandler} extension for AMQP message handlers.
@@ -703,7 +703,7 @@ public Message<?> getMessage() {
703703
}
704704

705705
@Override
706-
public SettableListenableFuture<Confirm> getFuture() {
706+
public CompletableFuture<Confirm> getFuture() {
707707
if (this.userData instanceof CorrelationData) {
708708
return ((CorrelationData) this.userData).getFuture();
709709
}

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AsyncAmqpOutboundGateway.java

Lines changed: 63 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,11 +16,13 @@
1616

1717
package org.springframework.integration.amqp.outbound;
1818

19+
import java.util.function.BiConsumer;
20+
1921
import org.springframework.amqp.core.AmqpMessageReturnedException;
2022
import org.springframework.amqp.core.AmqpReplyTimeoutException;
2123
import org.springframework.amqp.core.ReturnedMessage;
2224
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
23-
import org.springframework.amqp.rabbit.AsyncRabbitTemplate.RabbitMessageFuture;
25+
import org.springframework.amqp.rabbit.RabbitMessageFuture;
2426
import org.springframework.amqp.rabbit.connection.CorrelationData;
2527
import org.springframework.amqp.rabbit.connection.CorrelationData.Confirm;
2628
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@@ -33,7 +35,6 @@
3335
import org.springframework.messaging.MessageHandlingException;
3436
import org.springframework.messaging.MessagingException;
3537
import org.springframework.util.Assert;
36-
import org.springframework.util.concurrent.ListenableFutureCallback;
3738

3839
/**
3940
* An outbound gateway where the sending thread is released immediately and the reply
@@ -91,13 +92,13 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
9192
generateRoutingKey(requestMessage), amqpMessage);
9293
CorrelationData correlationData = generateCorrelationData(requestMessage);
9394
if (correlationData != null && future.getConfirm() != null) {
94-
future.getConfirm().addCallback(new CorrelationCallback(correlationData, future));
95+
future.getConfirm().whenComplete(new CorrelationCallback(correlationData, future));
9596
}
96-
future.addCallback(new FutureCallback(requestMessage, correlationData));
97+
future.whenComplete(new FutureCallback(requestMessage, correlationData));
9798
return null;
9899
}
99100

100-
private final class FutureCallback implements ListenableFutureCallback<org.springframework.amqp.core.Message> {
101+
private final class FutureCallback implements BiConsumer<org.springframework.amqp.core.Message, Throwable> {
101102

102103
private final Message<?> requestMessage;
103104

@@ -109,65 +110,65 @@ private final class FutureCallback implements ListenableFutureCallback<org.sprin
109110
}
110111

111112
@Override
112-
public void onSuccess(org.springframework.amqp.core.Message result) {
113-
AbstractIntegrationMessageBuilder<?> replyMessageBuilder = null;
114-
try {
115-
replyMessageBuilder = buildReply(AsyncAmqpOutboundGateway.this.messageConverter, result);
116-
sendOutputs(replyMessageBuilder, this.requestMessage);
117-
}
118-
catch (Exception ex) {
119-
Exception exceptionToLogAndSend = ex;
120-
if (!(ex instanceof MessagingException)) { // NOSONAR
121-
exceptionToLogAndSend = new MessageHandlingException(this.requestMessage,
122-
"failed to handle a message in the [" + AsyncAmqpOutboundGateway.this + ']', ex);
123-
if (replyMessageBuilder != null) {
124-
exceptionToLogAndSend =
125-
new MessagingException(replyMessageBuilder.build(), exceptionToLogAndSend);
113+
public void accept(org.springframework.amqp.core.Message message, Throwable throwable) {
114+
if (throwable == null) {
115+
AbstractIntegrationMessageBuilder<?> replyMessageBuilder = null;
116+
try {
117+
replyMessageBuilder = buildReply(AsyncAmqpOutboundGateway.this.messageConverter, message);
118+
sendOutputs(replyMessageBuilder, this.requestMessage);
119+
}
120+
catch (Exception ex) {
121+
Exception exceptionToLogAndSend = ex;
122+
if (!(ex instanceof MessagingException)) { // NOSONAR
123+
exceptionToLogAndSend = new MessageHandlingException(this.requestMessage,
124+
"failed to handle a message in the [" + AsyncAmqpOutboundGateway.this + ']', ex);
125+
if (replyMessageBuilder != null) {
126+
exceptionToLogAndSend =
127+
new MessagingException(replyMessageBuilder.build(), exceptionToLogAndSend);
128+
}
126129
}
130+
logger.error(exceptionToLogAndSend, () -> "Failed to send async reply: " + message.toString());
131+
sendErrorMessage(this.requestMessage, exceptionToLogAndSend);
127132
}
128-
logger.error(exceptionToLogAndSend, () -> "Failed to send async reply: " + result.toString());
129-
sendErrorMessage(this.requestMessage, exceptionToLogAndSend);
130133
}
131-
}
132-
133-
@Override
134-
public void onFailure(Throwable ex) {
135-
Throwable exceptionToSend = ex;
136-
if (ex instanceof AmqpReplyTimeoutException) {
137-
if (getRequiresReply()) {
138-
exceptionToSend =
139-
new ReplyRequiredException(this.requestMessage, "Timeout on async request/reply", ex);
134+
else {
135+
Throwable exceptionToSend = throwable;
136+
if (throwable instanceof AmqpReplyTimeoutException) {
137+
if (getRequiresReply()) {
138+
exceptionToSend =
139+
new ReplyRequiredException(this.requestMessage, "Timeout on async request/reply",
140+
throwable);
141+
}
142+
else {
143+
logger.debug(() -> "Reply not required and async timeout for " + this.requestMessage);
144+
return;
145+
}
140146
}
141-
else {
142-
logger.debug(() -> "Reply not required and async timeout for " + this.requestMessage);
143-
return;
147+
if (throwable instanceof AmqpMessageReturnedException amre) {
148+
MessageChannel returnChannel = getReturnChannel();
149+
if (returnChannel != null) {
150+
Message<?> returnedMessage = buildReturnedMessage(
151+
new ReturnedMessage(amre.getReturnedMessage(), amre.getReplyCode(), amre.getReplyText(),
152+
amre.getExchange(), amre.getRoutingKey()),
153+
AsyncAmqpOutboundGateway.this.messageConverter);
154+
sendOutput(returnedMessage, returnChannel, true);
155+
}
156+
this.correlationData.setReturned(amre.getReturned());
157+
/*
158+
* Complete the user's future (if present) since the async template will only complete
159+
* once, successfully, or with a failure.
160+
*/
161+
this.correlationData.getFuture().complete(new Confirm(true, null));
144162
}
145-
}
146-
if (ex instanceof AmqpMessageReturnedException) {
147-
AmqpMessageReturnedException amre = (AmqpMessageReturnedException) ex;
148-
MessageChannel returnChannel = getReturnChannel();
149-
if (returnChannel != null) {
150-
Message<?> returnedMessage = buildReturnedMessage(
151-
new ReturnedMessage(amre.getReturnedMessage(), amre.getReplyCode(), amre.getReplyText(),
152-
amre.getExchange(), amre.getRoutingKey()),
153-
AsyncAmqpOutboundGateway.this.messageConverter);
154-
sendOutput(returnedMessage, returnChannel, true);
163+
else {
164+
sendErrorMessage(this.requestMessage, exceptionToSend);
155165
}
156-
this.correlationData.setReturned(amre.getReturned());
157-
/*
158-
* Complete the user's future (if present) since the async template will only complete
159-
* once, successfully, or with a failure.
160-
*/
161-
this.correlationData.getFuture().set(new Confirm(true, null));
162-
}
163-
else {
164-
sendErrorMessage(this.requestMessage, exceptionToSend);
165166
}
166167
}
167168

168169
}
169170

170-
private final class CorrelationCallback implements ListenableFutureCallback<Boolean> {
171+
private final class CorrelationCallback implements BiConsumer<Boolean, Throwable> {
171172

172173
private final CorrelationData correlationData;
173174

@@ -179,19 +180,17 @@ private final class CorrelationCallback implements ListenableFutureCallback<Bool
179180
}
180181

181182
@Override
182-
public void onSuccess(Boolean result) {
183-
try {
184-
handleConfirm(this.correlationData, result, this.replyFuture.getNackCause());
185-
}
186-
catch (Exception e) {
187-
logger.error("Failed to send publisher confirm");
183+
public void accept(Boolean result, Throwable throwable) {
184+
if (result != null) {
185+
try {
186+
handleConfirm(this.correlationData, result, this.replyFuture.getNackCause());
187+
}
188+
catch (Exception e) {
189+
logger.error("Failed to send publisher confirm");
190+
}
188191
}
189192
}
190193

191-
@Override
192-
public void onFailure(Throwable ex) {
193-
}
194-
195194
}
196195

197196
}

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AsyncAmqpGatewayTests.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
2626
import static org.mockito.Mockito.spy;
2727

2828
import java.util.UUID;
29+
import java.util.concurrent.CompletableFuture;
2930
import java.util.concurrent.CountDownLatch;
3031
import java.util.concurrent.TimeUnit;
3132
import java.util.function.Supplier;
@@ -35,7 +36,7 @@
3536

3637
import org.springframework.amqp.core.AmqpReplyTimeoutException;
3738
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
38-
import org.springframework.amqp.rabbit.AsyncRabbitTemplate.RabbitMessageFuture;
39+
import org.springframework.amqp.rabbit.RabbitMessageFuture;
3940
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
4041
import org.springframework.amqp.rabbit.connection.CorrelationData;
4142
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@@ -58,7 +59,6 @@
5859
import org.springframework.messaging.Message;
5960
import org.springframework.messaging.MessagingException;
6061
import org.springframework.messaging.support.ErrorMessage;
61-
import org.springframework.util.concurrent.SettableListenableFuture;
6262

6363
/**
6464
* @author Gary Russell
@@ -198,15 +198,13 @@ void testConfirmsAndReturns() throws Exception {
198198
ackChannel.receive(10000);
199199
ackChannel.purge(null);
200200

201+
RabbitMessageFuture future = mock(RabbitMessageFuture.class);
202+
willReturn("nacknack").given(future).getNackCause();
203+
willReturn(CompletableFuture.completedFuture(false)).given(future).getConfirm();
204+
201205
asyncTemplate = mock(AsyncRabbitTemplate.class);
202-
RabbitMessageFuture future = asyncTemplate.new RabbitMessageFuture(null, null);
203206
willReturn(future).given(asyncTemplate).sendAndReceive(anyString(), anyString(),
204207
any(org.springframework.amqp.core.Message.class));
205-
DirectFieldAccessor dfa = new DirectFieldAccessor(future);
206-
dfa.setPropertyValue("nackCause", "nacknack");
207-
SettableListenableFuture<Boolean> confirmFuture = new SettableListenableFuture<>();
208-
confirmFuture.set(false);
209-
dfa.setPropertyValue("confirm", confirmFuture);
210208
new DirectFieldAccessor(gateway).setPropertyValue("template", asyncTemplate);
211209

212210
message = MessageBuilder.withPayload("buz").setErrorChannel(errorChannel).build();

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/OutboundEndpointTests.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
2323
import static org.mockito.ArgumentMatchers.isNull;
2424
import static org.mockito.BDDMockito.willAnswer;
2525
import static org.mockito.BDDMockito.willDoNothing;
26+
import static org.mockito.BDDMockito.willReturn;
2627
import static org.mockito.Mockito.mock;
2728
import static org.mockito.Mockito.spy;
2829
import static org.mockito.Mockito.verify;
@@ -35,6 +36,7 @@
3536

3637
import org.springframework.amqp.core.Message;
3738
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
39+
import org.springframework.amqp.rabbit.RabbitMessageFuture;
3840
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
3941
import org.springframework.amqp.rabbit.connection.CorrelationData;
4042
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@@ -99,7 +101,7 @@ public void testAsyncDelayExpression() {
99101
new SimpleMessageListenerContainer(connectionFactory), "replyTo"));
100102
amqpTemplate.setTaskScheduler(mock(TaskScheduler.class));
101103
AsyncAmqpOutboundGateway gateway = new AsyncAmqpOutboundGateway(amqpTemplate);
102-
willAnswer(invocation -> amqpTemplate.new RabbitMessageFuture("foo", invocation.getArgument(2)))
104+
willReturn(mock(RabbitMessageFuture.class))
103105
.given(amqpTemplate)
104106
.sendAndReceive(anyString(), anyString(), any(Message.class));
105107
gateway.setExchangeName("foo");
@@ -121,8 +123,7 @@ public void testHeaderMapperWinsAdapter() {
121123
RabbitTemplate amqpTemplate = spy(new RabbitTemplate(connectionFactory));
122124
AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(amqpTemplate);
123125
endpoint.setHeadersMappedLast(true);
124-
final AtomicReference<Message> amqpMessage =
125-
new AtomicReference<Message>();
126+
final AtomicReference<Message> amqpMessage = new AtomicReference<>();
126127
willAnswer(invocation -> {
127128
amqpMessage.set(invocation.getArgument(2));
128129
return null;
@@ -146,8 +147,7 @@ public void testHeaderMapperWinsGateway() {
146147
DefaultAmqpHeaderMapper mapper = DefaultAmqpHeaderMapper.inboundMapper();
147148
mapper.setRequestHeaderNames("*");
148149
endpoint.setHeaderMapper(mapper);
149-
final AtomicReference<Message> amqpMessage =
150-
new AtomicReference<Message>();
150+
final AtomicReference<Message> amqpMessage = new AtomicReference<>();
151151
willAnswer(invocation -> {
152152
amqpMessage.set(invocation.getArgument(2));
153153
return null;

0 commit comments

Comments
 (0)