Skip to content

Commit 6c7a563

Browse files
committed
spring-projectsGH-3902: Add Kotlin Coroutines Support
Fixes spring-projects#3902 * Add `isAsync()` propagation from the `MessagingMethodInvokerHelper` to the `AbstractMessageProducingHandler` to set into its `async` property. The logic is based on a `CompletableFuture`, `Publisher` or Kotlin `suspend` return types of the POJO method * Introduce `IntegrationMessageHandlerMethodFactory` and `IntegrationInvocableHandlerMethod` to extend the logic to newly introduced `ContinuationHandlerMethodArgumentResolver` and call for Kotlin suspend functions. * Remove `MessageHandlerMethodFactoryCreatingFactoryBean` since its logic now is covered with the `IntegrationMessageHandlerMethodFactory` * Kotlin suspend functions are essentially reactive, so use `CoroutinesUtils.invokeSuspendingFunction()` and existing logic in the `AbstractMessageProducingHandler` to deal with `Publisher` reply
1 parent fb1c89f commit 6c7a563

11 files changed

+233
-112
lines changed

build.gradle

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
buildscript {
2-
ext.kotlinVersion = '1.7.10'
2+
ext.kotlinVersion = '1.7.20'
33
ext.isCI = System.getenv('GITHUB_ACTION') || System.getenv('bamboo_buildKey')
44
repositories {
55
mavenCentral()
@@ -80,6 +80,7 @@ ext {
8080
junit4Version = '4.13.2'
8181
junitJupiterVersion = '5.9.0'
8282
jythonVersion = '2.7.3'
83+
kotlinCoroutinesVersion = '1.6.4'
8384
kryoVersion = '5.3.0'
8485
lettuceVersion = '6.2.0.RELEASE'
8586
log4jVersion = '2.19.0'
@@ -164,6 +165,7 @@ allprojects {
164165
mavenBom "io.micrometer:micrometer-tracing-bom:$micrometerTracingVersion"
165166
mavenBom "org.apache.camel:camel-bom:$camelVersion"
166167
mavenBom "org.testcontainers:testcontainers-bom:$testcontainersVersion"
168+
mavenBom "org.jetbrains.kotlinx:kotlinx-coroutines-bom:$kotlinCoroutinesVersion"
167169
}
168170

169171
}
@@ -526,7 +528,7 @@ project('spring-integration-core') {
526528
}
527529
optionalApi "io.github.resilience4j:resilience4j-ratelimiter:$resilience4jVersion"
528530
optionalApi "org.apache.avro:avro:$avroVersion"
529-
optionalApi 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
531+
optionalApi 'org.jetbrains.kotlinx:kotlinx-coroutines-reactor'
530532

531533
testImplementation "org.aspectj:aspectjweaver:$aspectjVersion"
532534
testImplementation "org.hamcrest:hamcrest-core:$hamcrestVersion"

spring-integration-core/src/main/java/org/springframework/integration/config/DefaultConfiguringBeanFactoryPostProcessor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.springframework.integration.context.IntegrationContextUtils;
4646
import org.springframework.integration.context.IntegrationProperties;
4747
import org.springframework.integration.handler.LoggingHandler;
48+
import org.springframework.integration.handler.support.IntegrationMessageHandlerMethodFactory;
4849
import org.springframework.integration.json.JsonPathUtils;
4950
import org.springframework.integration.support.DefaultMessageBuilderFactory;
5051
import org.springframework.integration.support.SmartLifecycleRoleController;
@@ -462,10 +463,10 @@ private void registerListMessageHandlerMethodFactory() {
462463
}
463464

464465
private static BeanDefinitionBuilder createMessageHandlerMethodFactoryBeanDefinition(boolean listCapable) {
465-
return BeanDefinitionBuilder.genericBeanDefinition(MessageHandlerMethodFactoryCreatingFactoryBean.class,
466-
() -> new MessageHandlerMethodFactoryCreatingFactoryBean(listCapable))
466+
return BeanDefinitionBuilder.genericBeanDefinition(IntegrationMessageHandlerMethodFactory.class,
467+
() -> new IntegrationMessageHandlerMethodFactory(listCapable))
467468
.addConstructorArgValue(listCapable)
468-
.addPropertyReference("argumentResolverMessageConverter",
469+
.addPropertyReference("messageConverter",
469470
IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME);
470471
}
471472

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,10 @@
3131
import org.reactivestreams.Publisher;
3232

3333
import org.springframework.beans.factory.BeanFactory;
34+
import org.springframework.beans.factory.BeanFactoryAware;
3435
import org.springframework.core.ReactiveAdapter;
3536
import org.springframework.core.ReactiveAdapterRegistry;
37+
import org.springframework.core.convert.ConversionService;
3638
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3739
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
3840
import org.springframework.integration.context.IntegrationContextUtils;
@@ -522,6 +524,22 @@ protected Object resolveErrorChannel(final MessageHeaders requestHeaders) {
522524
return errorChannel;
523525
}
524526

527+
protected void setupMessageProcessor(MessageProcessor<?> processor) {
528+
if (processor instanceof AbstractMessageProcessor<?> abstractMessageProcessor) {
529+
ConversionService conversionService = getConversionService();
530+
if (conversionService != null) {
531+
abstractMessageProcessor.setConversionService(conversionService);
532+
}
533+
}
534+
BeanFactory beanFactory = getBeanFactory();
535+
if (processor instanceof BeanFactoryAware beanFactoryAware && beanFactory != null) {
536+
beanFactoryAware.setBeanFactory(beanFactory);
537+
}
538+
if (!this.async && processor instanceof MethodInvokingMessageProcessor<?> methodInvokingMessageProcessor) {
539+
this.async = methodInvokingMessageProcessor.isAsync();
540+
}
541+
}
542+
525543
private final class ReplyFutureCallback implements BiConsumer<Object, Throwable> {
526544

527545
private final Message<?> requestMessage;

spring-integration-core/src/main/java/org/springframework/integration/handler/MethodInvokingMessageProcessor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ public boolean isRunning() {
102102
return this.delegate.isRunning();
103103
}
104104

105+
public boolean isAsync() {
106+
return this.delegate.isAsync();
107+
}
108+
105109
@Override
106110
@Nullable
107111
@SuppressWarnings("unchecked")

spring-integration-core/src/main/java/org/springframework/integration/handler/ServiceActivatingHandler.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@
1818

1919
import java.lang.reflect.Method;
2020

21-
import org.springframework.beans.factory.BeanFactoryAware;
2221
import org.springframework.context.Lifecycle;
23-
import org.springframework.core.convert.ConversionService;
2422
import org.springframework.integration.IntegrationPattern;
2523
import org.springframework.integration.IntegrationPatternType;
2624
import org.springframework.integration.annotation.ServiceActivator;
@@ -69,15 +67,7 @@ public IntegrationPatternType getIntegrationPatternType() {
6967

7068
@Override
7169
protected void doInit() {
72-
if (this.processor instanceof AbstractMessageProcessor) {
73-
ConversionService conversionService = getConversionService();
74-
if (conversionService != null) {
75-
((AbstractMessageProcessor<?>) this.processor).setConversionService(conversionService);
76-
}
77-
}
78-
if (this.processor instanceof BeanFactoryAware && this.getBeanFactory() != null) {
79-
((BeanFactoryAware) this.processor).setBeanFactory(this.getBeanFactory());
80-
}
70+
setupMessageProcessor(this.processor);
8171
}
8272

8373
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.handler.support;
18+
19+
import org.springframework.core.MethodParameter;
20+
import org.springframework.messaging.Message;
21+
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
22+
23+
import reactor.core.publisher.Mono;
24+
25+
/**
26+
* No-op resolver for method arguments of type {@link kotlin.coroutines.Continuation}.
27+
*
28+
* @author Artem Bilan
29+
*
30+
* @since 6.0
31+
*/
32+
public class ContinuationHandlerMethodArgumentResolver implements HandlerMethodArgumentResolver {
33+
34+
@Override
35+
public boolean supportsParameter(MethodParameter parameter) {
36+
return "kotlin.coroutines.Continuation".equals(parameter.getParameterType().getName());
37+
}
38+
39+
@Override
40+
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
41+
return Mono.empty();
42+
}
43+
44+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.handler.support;
18+
19+
import java.lang.reflect.Method;
20+
21+
import org.springframework.core.CoroutinesUtils;
22+
import org.springframework.core.KotlinDetector;
23+
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
24+
25+
/**
26+
* An {@link InvocableHandlerMethod} extension for Spring Integration requirements.
27+
*
28+
* @author Artem Bilan
29+
*
30+
* @since 6.0
31+
*/
32+
public class IntegrationInvocableHandlerMethod extends InvocableHandlerMethod {
33+
34+
public IntegrationInvocableHandlerMethod(Object bean, Method method) {
35+
super(bean, method);
36+
}
37+
38+
@Override
39+
protected Object doInvoke(Object... args) throws Exception {
40+
Method method = getBridgedMethod();
41+
if (KotlinDetector.isSuspendingFunction(method)) {
42+
return CoroutinesUtils.invokeSuspendingFunction(method, getBean(), args);
43+
}
44+
else {
45+
return super.doInvoke(args);
46+
}
47+
}
48+
49+
}
Lines changed: 41 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -14,88 +14,92 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.integration.config;
17+
package org.springframework.integration.handler.support;
1818

19+
import java.lang.reflect.Method;
1920
import java.util.ArrayList;
2021
import java.util.List;
2122

22-
import org.springframework.beans.BeansException;
2323
import org.springframework.beans.factory.BeanFactory;
2424
import org.springframework.beans.factory.BeanFactoryAware;
2525
import org.springframework.beans.factory.BeanInitializationException;
26-
import org.springframework.beans.factory.FactoryBean;
2726
import org.springframework.beans.factory.InitializingBean;
28-
import org.springframework.integration.handler.support.CollectionArgumentResolver;
29-
import org.springframework.integration.handler.support.MapArgumentResolver;
30-
import org.springframework.integration.handler.support.PayloadExpressionArgumentResolver;
31-
import org.springframework.integration.handler.support.PayloadsArgumentResolver;
3227
import org.springframework.integration.support.NullAwarePayloadArgumentResolver;
3328
import org.springframework.messaging.converter.MessageConverter;
3429
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
35-
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
3630
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
31+
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite;
32+
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
3733

3834
/**
39-
* The {@link FactoryBean} for creating integration-specific {@link MessageHandlerMethodFactory} instance.
40-
* It adds these custom {@link HandlerMethodArgumentResolver}s in the order:
41-
* <ul>
42-
* <li>{@link PayloadExpressionArgumentResolver};
43-
* <li>{@link NullAwarePayloadArgumentResolver};
44-
* <li>{@link PayloadsArgumentResolver};
45-
* <li>{@link CollectionArgumentResolver} if {@link #listCapable} is true;
46-
* <li>{@link MapArgumentResolver}.
47-
* </ul>
35+
* Extension of the {@link DefaultMessageHandlerMethodFactory} for Spring Integration requirements.
4836
*
49-
* @author Artyem Bilan
37+
* @author Artem Bilan
5038
*
51-
* @since 5.5.7
39+
* @since 6.0
5240
*/
53-
class MessageHandlerMethodFactoryCreatingFactoryBean
54-
implements FactoryBean<MessageHandlerMethodFactory>, BeanFactoryAware {
41+
public class IntegrationMessageHandlerMethodFactory extends DefaultMessageHandlerMethodFactory {
42+
43+
private final HandlerMethodArgumentResolverComposite argumentResolvers =
44+
new HandlerMethodArgumentResolverComposite();
5545

5646
private final boolean listCapable;
5747

58-
private MessageConverter argumentResolverMessageConverter;
48+
private MessageConverter messageConverter;
5949

6050
private BeanFactory beanFactory;
6151

62-
MessageHandlerMethodFactoryCreatingFactoryBean(boolean listCapable) {
52+
public IntegrationMessageHandlerMethodFactory() {
53+
this(false);
54+
}
55+
56+
public IntegrationMessageHandlerMethodFactory(boolean listCapable) {
6357
this.listCapable = listCapable;
6458
}
6559

66-
public void setArgumentResolverMessageConverter(MessageConverter argumentResolverMessageConverter) {
67-
this.argumentResolverMessageConverter = argumentResolverMessageConverter;
60+
@Override
61+
public void setMessageConverter(MessageConverter messageConverter) {
62+
super.setMessageConverter(messageConverter);
63+
this.messageConverter = messageConverter;
6864
}
6965

7066
@Override
71-
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
67+
public void setBeanFactory(BeanFactory beanFactory) {
68+
super.setBeanFactory(beanFactory);
7269
this.beanFactory = beanFactory;
7370
}
7471

7572
@Override
76-
public Class<?> getObjectType() {
77-
return MessageHandlerMethodFactory.class;
73+
public void afterPropertiesSet() {
74+
setCustomArgumentResolvers(buildArgumentResolvers(this.listCapable));
75+
super.afterPropertiesSet();
7876
}
7977

8078
@Override
81-
public MessageHandlerMethodFactory getObject() {
82-
DefaultMessageHandlerMethodFactory handlerMethodFactory = new DefaultMessageHandlerMethodFactory();
83-
handlerMethodFactory.setBeanFactory(this.beanFactory);
84-
handlerMethodFactory.setMessageConverter(this.argumentResolverMessageConverter);
85-
handlerMethodFactory.setCustomArgumentResolvers(buildArgumentResolvers(this.listCapable));
86-
handlerMethodFactory.afterPropertiesSet();
87-
return handlerMethodFactory;
79+
protected List<HandlerMethodArgumentResolver> initArgumentResolvers() {
80+
List<HandlerMethodArgumentResolver> resolvers = super.initArgumentResolvers();
81+
this.argumentResolvers.addResolvers(resolvers);
82+
return resolvers;
83+
}
84+
85+
@Override
86+
public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {
87+
InvocableHandlerMethod handlerMethod = new IntegrationInvocableHandlerMethod(bean, method);
88+
handlerMethod.setMessageMethodArgumentResolvers(this.argumentResolvers);
89+
return handlerMethod;
8890
}
8991

9092
private List<HandlerMethodArgumentResolver> buildArgumentResolvers(boolean listCapable) {
9193
List<HandlerMethodArgumentResolver> resolvers = new ArrayList<>();
9294
resolvers.add(new PayloadExpressionArgumentResolver());
93-
resolvers.add(new NullAwarePayloadArgumentResolver(this.argumentResolverMessageConverter));
95+
resolvers.add(new NullAwarePayloadArgumentResolver(this.messageConverter));
9496
resolvers.add(new PayloadsArgumentResolver());
9597
if (listCapable) {
9698
resolvers.add(new CollectionArgumentResolver(true));
9799
}
98100
resolvers.add(new MapArgumentResolver());
101+
resolvers.add(new ContinuationHandlerMethodArgumentResolver());
102+
99103
for (HandlerMethodArgumentResolver resolver : resolvers) {
100104
if (resolver instanceof BeanFactoryAware) {
101105
((BeanFactoryAware) resolver).setBeanFactory(this.beanFactory);

0 commit comments

Comments
 (0)