Skip to content

Commit 5572c21

Browse files
authored
Fix deprecations around ListenableFuture (#3865)
* Fix deprecations around ListenableFuture SF has deprecated a `ListenableFuture` and API around it * Migrate to `CompletableFuture` everywhere a `ListenableFuture` has been used * Suppress a deprecation for `ListenableFuture` keeping the functionality until the next version * Resolve deprecations nad removals from the latest Spring for Apache Kafka * Fix documentation for the `ListenableFuture` in favor of `CompletableFuture` NOTE: the AMQP module is left as is until `ListenableFuture` deprecation is resolved in Spring AMQP * * Restore some `ListenableFuture` test for messaging gateway
1 parent 2e9bead commit 5572c21

File tree

23 files changed

+196
-259
lines changed

23 files changed

+196
-259
lines changed

spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,7 +26,6 @@
2626
import java.util.List;
2727
import java.util.Map;
2828
import java.util.Set;
29-
import java.util.concurrent.Callable;
3029
import java.util.concurrent.CompletableFuture;
3130
import java.util.concurrent.Executor;
3231
import java.util.concurrent.Future;
@@ -49,7 +48,6 @@
4948
import org.springframework.beans.factory.FactoryBean;
5049
import org.springframework.core.MethodParameter;
5150
import org.springframework.core.ResolvableType;
52-
import org.springframework.core.task.AsyncListenableTaskExecutor;
5351
import org.springframework.core.task.AsyncTaskExecutor;
5452
import org.springframework.core.task.SimpleAsyncTaskExecutor;
5553
import org.springframework.core.task.support.TaskExecutorAdapter;
@@ -145,10 +143,6 @@ public class GatewayProxyFactoryBean extends AbstractEndpoint
145143

146144
private boolean asyncExecutorExplicitlySet;
147145

148-
private Class<?> asyncSubmitType;
149-
150-
private Class<?> asyncSubmitListenableType;
151-
152146
private volatile boolean initialized;
153147

154148
private Map<String, GatewayMethodMetadata> methodMetadataMap;
@@ -468,15 +462,6 @@ protected void onInit() {
468462
new ProxyFactory(this.serviceInterface, this);
469463
gatewayProxyFactory.addAdvice(new DefaultMethodInvokingMethodInterceptor());
470464
this.serviceProxy = gatewayProxyFactory.getProxy(this.beanClassLoader);
471-
if (this.asyncExecutor != null) {
472-
Callable<String> task = () -> null;
473-
Future<String> submitType = this.asyncExecutor.submit(task);
474-
this.asyncSubmitType = submitType.getClass();
475-
if (this.asyncExecutor instanceof AsyncListenableTaskExecutor) {
476-
submitType = ((AsyncListenableTaskExecutor) this.asyncExecutor).submitListenable(task);
477-
this.asyncSubmitListenableType = submitType.getClass();
478-
}
479-
}
480465
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(beanFactory);
481466
this.initialized = true;
482467
}
@@ -511,6 +496,7 @@ public Object getObject() {
511496

512497
@Override
513498
@Nullable
499+
@SuppressWarnings("deprecation")
514500
public Object invoke(final MethodInvocation invocation) throws Throwable { // NOSONAR
515501
final Class<?> returnType;
516502
MethodInvocationGateway gateway = this.gatewayMap.get(invocation.getMethod());
@@ -522,15 +508,16 @@ public Object invoke(final MethodInvocation invocation) throws Throwable { // NO
522508
}
523509
if (this.asyncExecutor != null && !Object.class.equals(returnType)) {
524510
Invoker invoker = new Invoker(invocation);
525-
if (returnType.isAssignableFrom(this.asyncSubmitType)) {
511+
if (Future.class.equals(returnType)) {
526512
return this.asyncExecutor.submit(invoker::get);
527513
}
528-
else if (returnType.isAssignableFrom(this.asyncSubmitListenableType)) {
529-
return ((AsyncListenableTaskExecutor) this.asyncExecutor).submitListenable(invoker::get);
530-
}
531514
else if (CompletableFuture.class.equals(returnType)) { // exact
532515
return CompletableFuture.supplyAsync(invoker, this.asyncExecutor);
533516
}
517+
else if (org.springframework.util.concurrent.ListenableFuture.class.equals(returnType)) {
518+
return ((org.springframework.core.task.AsyncListenableTaskExecutor) this.asyncExecutor)
519+
.submitListenable(invoker::get);
520+
}
534521
else if (Future.class.isAssignableFrom(returnType)) {
535522
logger.debug(() -> "AsyncTaskExecutor submit*() return types are incompatible with the method return " +
536523
"type; running on calling thread; the downstream flow must return the required Future: "

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import org.springframework.util.Assert;
5252
import org.springframework.util.ObjectUtils;
5353
import org.springframework.util.StringUtils;
54-
import org.springframework.util.concurrent.ListenableFuture;
5554

5655
import reactor.core.publisher.Flux;
5756
import reactor.core.publisher.Mono;
@@ -106,9 +105,9 @@ public void setOutputChannelName(String outputChannelName) {
106105
}
107106

108107
/**
109-
* Allow async replies. If the handler reply is a {@link ListenableFuture}, send
110-
* the output when it is satisfied rather than sending the future as the result.
111-
* Ignored for return types other than {@link ListenableFuture}.
108+
* Allow async replies. If the handler reply is a {@link CompletableFuture} or {@link Publisher},
109+
* send the output when it is satisfied rather than sending the future as the result.
110+
* Ignored for return types other than {@link CompletableFuture} or {@link Publisher}.
112111
* @param async true to allow.
113112
* @since 4.3
114113
*/
@@ -299,6 +298,7 @@ else if (reply instanceof AbstractIntegrationMessageBuilder<?>) {
299298
return replyChannel;
300299
}
301300

301+
@SuppressWarnings("deprecation")
302302
private void doProduceOutput(Message<?> requestMessage, MessageHeaders requestHeaders, Object reply,
303303
@Nullable Object replyChannelArg) {
304304

@@ -307,7 +307,7 @@ private void doProduceOutput(Message<?> requestMessage, MessageHeaders requestHe
307307
replyChannel = getOutputChannel();
308308
}
309309

310-
if (this.async && (reply instanceof ListenableFuture<?>
310+
if (this.async && (reply instanceof org.springframework.util.concurrent.ListenableFuture<?>
311311
|| reply instanceof CompletableFuture<?>
312312
|| reply instanceof Publisher<?>)) {
313313

@@ -351,13 +351,14 @@ else if (reply instanceof AbstractIntegrationMessageBuilder) {
351351
return builder;
352352
}
353353

354+
@SuppressWarnings("deprecation")
354355
private void asyncNonReactiveReply(Message<?> requestMessage, Object reply, @Nullable Object replyChannel) {
355356
CompletableFuture<?> future;
356357
if (reply instanceof CompletableFuture<?>) {
357358
future = (CompletableFuture<?>) reply;
358359
}
359-
else if (reply instanceof ListenableFuture<?>) {
360-
future = ((ListenableFuture<?>) reply).completable();
360+
else if (reply instanceof org.springframework.util.concurrent.ListenableFuture<?>) {
361+
future = ((org.springframework.util.concurrent.ListenableFuture<?>) reply).completable();
361362
}
362363
else {
363364
Mono<?> reactiveReply;

spring-integration-core/src/test/java/org/springframework/integration/config/annotation/AnnotatedEndpointActivationTests.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,10 +17,12 @@
1717
package org.springframework.integration.config.annotation;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2021

21-
import org.junit.Before;
22-
import org.junit.Test;
23-
import org.junit.runner.RunWith;
22+
import java.util.concurrent.CompletableFuture;
23+
24+
import org.junit.jupiter.api.BeforeEach;
25+
import org.junit.jupiter.api.Test;
2426

2527
import org.springframework.beans.factory.annotation.Autowired;
2628
import org.springframework.beans.factory.annotation.Qualifier;
@@ -33,9 +35,7 @@
3335
import org.springframework.messaging.PollableChannel;
3436
import org.springframework.messaging.support.GenericMessage;
3537
import org.springframework.test.annotation.DirtiesContext;
36-
import org.springframework.test.context.junit4.SpringRunner;
37-
import org.springframework.util.concurrent.ListenableFuture;
38-
import org.springframework.util.concurrent.SettableListenableFuture;
38+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
3939

4040
/**
4141
* @author Dave Syer
@@ -44,7 +44,7 @@
4444
* @author Artem Bilan
4545
* @author Yilin Wei
4646
*/
47-
@RunWith(SpringRunner.class)
47+
@SpringJUnitConfig
4848
@DirtiesContext
4949
public class AnnotatedEndpointActivationTests {
5050

@@ -72,7 +72,7 @@ public class AnnotatedEndpointActivationTests {
7272
// them will get the message.
7373
private static volatile int count = 0;
7474

75-
@Before
75+
@BeforeEach
7676
public void resetCount() {
7777
count = 0;
7878
}
@@ -108,11 +108,12 @@ public void sendAndReceiveImplicitInputChannel() {
108108
assertThat(count).isEqualTo(1);
109109
}
110110

111-
@Test(expected = MessageDeliveryException.class)
111+
@Test
112112
@DirtiesContext
113113
public void stopContext() {
114114
applicationContext.stop();
115-
this.input.send(new GenericMessage<>("foo"));
115+
assertThatExceptionOfType(MessageDeliveryException.class)
116+
.isThrownBy(() -> this.input.send(new GenericMessage<>("foo")));
116117
}
117118

118119
@Test
@@ -159,9 +160,9 @@ public String process(String message) {
159160
private static class AnnotatedEndpoint3 {
160161

161162
@ServiceActivator(inputChannel = "inputAsync", outputChannel = "outputAsync", async = "true")
162-
public ListenableFuture<String> process(String message) {
163-
SettableListenableFuture<String> future = new SettableListenableFuture<>();
164-
future.set(message);
163+
public CompletableFuture<String> process(String message) {
164+
CompletableFuture<String> future = new CompletableFuture<>();
165+
future.complete(message);
165166
return future;
166167
}
167168

spring-integration-core/src/test/java/org/springframework/integration/config/xml/GatewayParserTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -59,7 +59,6 @@
5959
import org.springframework.messaging.SubscribableChannel;
6060
import org.springframework.messaging.support.ChannelInterceptor;
6161
import org.springframework.messaging.support.GenericMessage;
62-
import org.springframework.scheduling.annotation.AsyncResult;
6362
import org.springframework.test.annotation.DirtiesContext;
6463
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
6564

@@ -403,7 +402,7 @@ private void startResponder(final PollableChannel requestChannel, final MessageC
403402
.setCorrelationId(request.getHeaders().getId()).build();
404403
Object payload = null;
405404
if (request.getPayload().equals("futureSync")) {
406-
payload = new AsyncResult<Message<?>>(reply);
405+
payload = CompletableFuture.completedFuture(reply);
407406
}
408407
else if (request.getPayload().equals("flowCompletable")) {
409408
payload = CompletableFuture.<String>completedFuture("SYNC_COMPLETABLE");
@@ -448,7 +447,7 @@ public void setBeanName(String beanName) {
448447
}
449448

450449
@Override
451-
@SuppressWarnings({ "rawtypes", "unchecked" })
450+
@SuppressWarnings("unchecked")
452451
public <T> Future<T> submit(Callable<T> task) {
453452
try {
454453
Future<?> result = super.submit(task);
@@ -462,7 +461,8 @@ public <T> Future<T> submit(Callable<T> task) {
462461
modifiedMessage = MessageBuilder.fromMessage(message)
463462
.setHeader("executor", this.beanName).build();
464463
}
465-
return new AsyncResult(modifiedMessage);
464+
465+
return (Future<T>) CompletableFuture.completedFuture(modifiedMessage);
466466
}
467467
catch (Exception e) {
468468
throw new IllegalStateException("unexpected exception in testExecutor", e);

spring-integration-core/src/test/java/org/springframework/integration/gateway/AsyncGatewayTests.java

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
2121
import static org.mockito.Mockito.mock;
2222

2323
import java.time.Duration;
24+
import java.util.concurrent.CompletableFuture;
2425
import java.util.concurrent.CountDownLatch;
2526
import java.util.concurrent.ExecutionException;
2627
import java.util.concurrent.Future;
@@ -39,8 +40,6 @@
3940
import org.springframework.messaging.PollableChannel;
4041
import org.springframework.messaging.support.ChannelInterceptor;
4142
import org.springframework.messaging.support.MessageBuilder;
42-
import org.springframework.util.concurrent.ListenableFuture;
43-
import org.springframework.util.concurrent.ListenableFutureCallback;
4443

4544
import reactor.core.publisher.Mono;
4645

@@ -105,22 +104,15 @@ public void listenableFutureWithMessageReturned() throws Exception {
105104
proxyFactory.setBeanFactory(mock(BeanFactory.class));
106105
proxyFactory.afterPropertiesSet();
107106
TestEchoService service = (TestEchoService) proxyFactory.getObject();
108-
ListenableFuture<Message<?>> f = service.returnMessageListenable("foo");
107+
CompletableFuture<Message<?>> f = service.returnMessageListenable("foo");
109108
long start = System.currentTimeMillis();
110109
final AtomicReference<Message<?>> result = new AtomicReference<>();
111110
final CountDownLatch latch = new CountDownLatch(1);
112-
f.addCallback(new ListenableFutureCallback<Message<?>>() {
113-
114-
@Override
115-
public void onSuccess(Message<?> msg) {
116-
result.set(msg);
111+
f.whenComplete((message, throwable) -> {
112+
if (throwable == null) {
113+
result.set(message);
117114
latch.countDown();
118115
}
119-
120-
@Override
121-
public void onFailure(Throwable t) {
122-
}
123-
124116
});
125117
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
126118
long elapsed = System.currentTimeMillis() - start;
@@ -300,7 +292,7 @@ private static void startResponder(final PollableChannel requestChannel) {
300292
String header = (String) input.getHeaders().get("method");
301293
if (header != null && header.startsWith("returnCustomFuture")) {
302294
reply = MessageBuilder.withPayload(new CustomFuture(payload,
303-
(Thread) input.getHeaders().get("thread")))
295+
(Thread) input.getHeaders().get("thread")))
304296
.copyHeaders(input.getHeaders())
305297
.build();
306298
}
@@ -317,7 +309,7 @@ private interface TestEchoService {
317309

318310
Future<?> returnSomething(String s);
319311

320-
ListenableFuture<Message<?>> returnMessageListenable(String s);
312+
CompletableFuture<Message<?>> returnMessageListenable(String s);
321313

322314
@Gateway(headers = @GatewayHeader(name = "method", expression = "#gatewayMethod.name"))
323315
CustomFuture returnCustomFuture(String s);

0 commit comments

Comments
 (0)