diff --git a/build.gradle b/build.gradle index f04257269a0..1a61b5f3c15 100644 --- a/build.gradle +++ b/build.gradle @@ -84,6 +84,7 @@ ext { junit4Version = '4.13.2' junitJupiterVersion = '5.9.0' jythonVersion = '2.7.3' + kotlinCoroutinesVersion = '1.6.4' kryoVersion = '5.3.0' lettuceVersion = '6.2.0.RELEASE' log4jVersion = '2.19.0' @@ -168,6 +169,7 @@ allprojects { mavenBom "org.apache.camel:camel-bom:$camelVersion" mavenBom "org.testcontainers:testcontainers-bom:$testcontainersVersion" mavenBom "org.apache.groovy:groovy-bom:$groovyVersion" + mavenBom "org.jetbrains.kotlinx:kotlinx-coroutines-bom:$kotlinCoroutinesVersion" } } @@ -541,7 +543,7 @@ project('spring-integration-core') { } optionalApi "io.github.resilience4j:resilience4j-ratelimiter:$resilience4jVersion" optionalApi "org.apache.avro:avro:$avroVersion" - optionalApi 'org.jetbrains.kotlin:kotlin-stdlib-jdk8' + optionalApi 'org.jetbrains.kotlinx:kotlinx-coroutines-reactor' testImplementation "org.aspectj:aspectjweaver:$aspectjVersion" testImplementation "org.hamcrest:hamcrest-core:$hamcrestVersion" diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/DefaultConfiguringBeanFactoryPostProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/config/DefaultConfiguringBeanFactoryPostProcessor.java index 54b16f68aee..83fb6708d95 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/DefaultConfiguringBeanFactoryPostProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/DefaultConfiguringBeanFactoryPostProcessor.java @@ -45,6 +45,7 @@ import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.context.IntegrationProperties; import org.springframework.integration.handler.LoggingHandler; +import org.springframework.integration.handler.support.IntegrationMessageHandlerMethodFactory; import org.springframework.integration.json.JsonPathUtils; import org.springframework.integration.support.DefaultMessageBuilderFactory; import org.springframework.integration.support.SmartLifecycleRoleController; @@ -462,10 +463,10 @@ private void registerListMessageHandlerMethodFactory() { } private static BeanDefinitionBuilder createMessageHandlerMethodFactoryBeanDefinition(boolean listCapable) { - return BeanDefinitionBuilder.genericBeanDefinition(MessageHandlerMethodFactoryCreatingFactoryBean.class, - () -> new MessageHandlerMethodFactoryCreatingFactoryBean(listCapable)) + return BeanDefinitionBuilder.genericBeanDefinition(IntegrationMessageHandlerMethodFactory.class, + () -> new IntegrationMessageHandlerMethodFactory(listCapable)) .addConstructorArgValue(listCapable) - .addPropertyReference("argumentResolverMessageConverter", + .addPropertyReference("messageConverter", IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayMethodInboundMessageMapper.java b/spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayMethodInboundMessageMapper.java index 89371812203..aa1d80b6e11 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayMethodInboundMessageMapper.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayMethodInboundMessageMapper.java @@ -44,6 +44,7 @@ import org.springframework.integration.support.AbstractIntegrationMessageBuilder; import org.springframework.integration.support.DefaultMessageBuilderFactory; import org.springframework.integration.support.MessageBuilderFactory; +import org.springframework.integration.util.CoroutinesUtils; import org.springframework.integration.util.MessagingAnnotationUtils; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; @@ -289,6 +290,9 @@ public Message toMessage(MethodArgsHolder holder, @Nullable Map returnType; - MethodInvocationGateway gateway = this.gatewayMap.get(invocation.getMethod()); + Method method = invocation.getMethod(); + Class returnType; + MethodInvocationGateway gateway = this.gatewayMap.get(method); if (gateway != null) { returnType = gateway.returnType; } else { - returnType = invocation.getMethod().getReturnType(); + returnType = method.getReturnType(); } if (this.asyncExecutor != null && !Object.class.equals(returnType)) { Invoker invoker = new Invoker(invocation); @@ -524,7 +528,7 @@ else if (Future.class.isAssignableFrom(returnType)) { + returnType.getSimpleName()); } } - if (Mono.class.isAssignableFrom(returnType)) { + if (Mono.class.isAssignableFrom(returnType) || KotlinDetector.isSuspendingFunction(method)) { return doInvoke(invocation, false); } else { @@ -534,8 +538,7 @@ else if (Future.class.isAssignableFrom(returnType)) { @Nullable protected Object doInvoke(MethodInvocation invocation, boolean runningOnCallerThread) throws Throwable { // NOSONAR - Method method = invocation.getMethod(); - if (AopUtils.isToStringMethod(method)) { + if (AopUtils.isToStringMethod(invocation.getMethod())) { return "gateway proxy for service interface [" + this.serviceInterface + "]"; } try { @@ -575,16 +578,29 @@ private Object invokeGatewayMethod(MethodInvocation invocation, boolean runningO else { response = sendOrSendAndReceive(invocation, gateway, shouldReturnMessage, !oneWay); } - return response(gateway.returnType, shouldReturnMessage, response); + + Object continuation = null; + if (gateway.isSuspendingFunction) { + for (Object argument : invocation.getArguments()) { + if (argument != null && CoroutinesUtils.isContinuation(argument)) { + continuation = argument; + break; + } + } + } + + return response(gateway.returnType, shouldReturnMessage, response, continuation); } @Nullable - private Object response(Class returnType, boolean shouldReturnMessage, @Nullable Object response) { + private Object response(Class returnType, boolean shouldReturnMessage, + @Nullable Object response, @Nullable Object continuation) { + if (shouldReturnMessage) { return response; } else { - return response != null ? convert(response, returnType) : null; + return response != null ? convert(response, returnType, continuation) : null; } } @@ -627,7 +643,7 @@ private Object sendOrSendAndReceive(MethodInvocation invocation, MethodInvocatio Object[] args = invocation.getArguments(); if (shouldReply) { - if (gateway.isMonoReturn) { + if (gateway.isMonoReturn || gateway.isSuspendingFunction) { Mono> messageMono = gateway.sendAndReceiveMessageReactive(args); if (!shouldReturnMessage) { return messageMono.map(Message::getPayload); @@ -641,7 +657,7 @@ private Object sendOrSendAndReceive(MethodInvocation invocation, MethodInvocatio } } else { - if (gateway.isMonoReturn) { + if (gateway.isMonoReturn || gateway.isSuspendingFunction) { return Mono.fromRunnable(() -> gateway.send(args)); } else { @@ -1013,17 +1029,28 @@ protected void doStop() { this.gatewayMap.values().forEach(MethodInvocationGateway::stop); } - @SuppressWarnings("unchecked") @Nullable - private T convert(Object source, Class expectedReturnType) { + @SuppressWarnings("unchecked") + private T convert(Object source, Class expectedReturnType, @Nullable Object continuation) { + if (continuation != null) { + return CoroutinesUtils.monoAwaitSingleOrNull((Mono) source, continuation); + } if (Future.class.isAssignableFrom(expectedReturnType)) { return (T) source; } if (Mono.class.isAssignableFrom(expectedReturnType)) { return (T) source; } - if (getConversionService() != null) { - return getConversionService().convert(source, expectedReturnType); + + + return doConvert(source, expectedReturnType); + } + + @Nullable + private T doConvert(Object source, Class expectedReturnType) { + ConversionService conversionService = getConversionService(); + if (conversionService != null) { + return conversionService.convert(source, expectedReturnType); } else { return this.typeConverter.convertIfNecessary(source, expectedReturnType); @@ -1050,6 +1077,8 @@ private static final class MethodInvocationGateway extends MessagingGatewaySuppo private boolean pollable; + private boolean isSuspendingFunction; + MethodInvocationGateway(GatewayMethodInboundMessageMapper messageMapper) { setRequestMapper(messageMapper); } @@ -1088,6 +1117,7 @@ void setupReturnType(Class serviceInterface, Method method) { this.expectMessage = hasReturnParameterizedWithMessage(resolvableType); } this.isVoidReturn = isVoidReturnType(resolvableType); + this.isSuspendingFunction = KotlinDetector.isSuspendingFunction(method); } private boolean hasReturnParameterizedWithMessage(ResolvableType resolvableType) { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java index f7e0bffc0dd..4aea97325a9 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java @@ -31,8 +31,10 @@ import org.reactivestreams.Publisher; import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.core.ReactiveAdapter; import org.springframework.core.ReactiveAdapterRegistry; +import org.springframework.core.convert.ConversionService; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel; import org.springframework.integration.context.IntegrationContextUtils; @@ -188,8 +190,7 @@ public Collection getNotPropagatedHeaders() { /** * Add header patterns ("xxx*", "*xxx", "*xxx*" or "xxx*yyy") * that will NOT be copied from the inbound message if - * {@link #shouldCopyRequestHeaders()} is true, instead of overwriting the existing - * set. + * {@link #shouldCopyRequestHeaders()} is true, instead of overwriting the existing set. * @param headers the headers to not propagate from the inbound message. * @since 4.3.10 * @see #setNotPropagatedHeaders(String...) @@ -308,21 +309,24 @@ private void doProduceOutput(Message requestMessage, MessageHeaders requestHe replyChannel = getOutputChannel(); } - if (this.async && (reply instanceof org.springframework.util.concurrent.ListenableFuture - || reply instanceof CompletableFuture - || reply instanceof Publisher)) { + ReactiveAdapter reactiveAdapter = null; - if (reply instanceof Publisher && - replyChannel instanceof ReactiveStreamsSubscribableChannel) { + if (this.async && + (reply instanceof org.springframework.util.concurrent.ListenableFuture + || reply instanceof CompletableFuture + || (reactiveAdapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(null, reply)) != null)) { - ((ReactiveStreamsSubscribableChannel) replyChannel) + if (replyChannel instanceof ReactiveStreamsSubscribableChannel reactiveStreamsSubscribableChannel) { + Publisher reactiveReply = toPublisherReply(reply, reactiveAdapter); + reactiveStreamsSubscribableChannel .subscribeTo( - Flux.from((Publisher) reply) + Flux.from(reactiveReply) .doOnError((ex) -> sendErrorMessage(requestMessage, ex)) .map(result -> createOutputMessage(result, requestHeaders))); } else { - asyncNonReactiveReply(requestMessage, reply, replyChannel); + CompletableFuture futureReply = toFutureReply(reply, reactiveAdapter); + futureReply.whenComplete(new ReplyFutureCallback(requestMessage, replyChannel)); } } else { @@ -330,6 +334,43 @@ private void doProduceOutput(Message requestMessage, MessageHeaders requestHe } } + private static Publisher toPublisherReply(Object reply, @Nullable ReactiveAdapter reactiveAdapter) { + if (reactiveAdapter != null) { + return reactiveAdapter.toPublisher(reply); + } + else { + return Mono.fromFuture(toCompletableFuture(reply)); + } + } + + private static CompletableFuture toFutureReply(Object reply, @Nullable ReactiveAdapter reactiveAdapter) { + if (reactiveAdapter != null) { + Mono reactiveReply; + Publisher publisher = reactiveAdapter.toPublisher(reply); + if (reactiveAdapter.isMultiValue()) { + reactiveReply = Mono.just(publisher); + } + else { + reactiveReply = Mono.from(publisher); + } + + return reactiveReply.publishOn(Schedulers.boundedElastic()).toFuture(); + } + else { + return toCompletableFuture(reply); + } + } + + @SuppressWarnings("deprecation") + private static CompletableFuture toCompletableFuture(Object reply) { + if (reply instanceof CompletableFuture) { + return (CompletableFuture) reply; + } + else { + return ((org.springframework.util.concurrent.ListenableFuture) reply).completable(); + } + } + private AbstractIntegrationMessageBuilder addRoutingSlipHeader(Object reply, List routingSlip, AtomicInteger routingSlipIndex) { @@ -352,30 +393,6 @@ else if (reply instanceof AbstractIntegrationMessageBuilder) { return builder; } - @SuppressWarnings("deprecation") - private void asyncNonReactiveReply(Message requestMessage, Object reply, @Nullable Object replyChannel) { - CompletableFuture future; - if (reply instanceof CompletableFuture) { - future = (CompletableFuture) reply; - } - else if (reply instanceof org.springframework.util.concurrent.ListenableFuture) { - future = ((org.springframework.util.concurrent.ListenableFuture) reply).completable(); - } - else { - Mono reactiveReply; - ReactiveAdapter adapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(null, reply); - if (adapter != null && adapter.isMultiValue()) { - reactiveReply = Mono.just(reply); - } - else { - reactiveReply = Mono.from((Publisher) reply); - } - - future = reactiveReply.publishOn(Schedulers.boundedElastic()).toFuture(); - } - future.whenComplete(new ReplyFutureCallback(requestMessage, replyChannel)); - } - private Object getOutputChannelFromRoutingSlip(Object reply, Message requestMessage, List routingSlip, AtomicInteger routingSlipIndex) { @@ -444,7 +461,7 @@ else if (output instanceof AbstractIntegrationMessageBuilder) { * null, and it must be an instance of either String or {@link MessageChannel}. * @param output the output object to send * @param replyChannelArg the 'replyChannel' value from the original request - * @param useArgChannel - use the replyChannel argument (must not be null), not + * @param useArgChannel use the replyChannel argument (must not be null), not * the configured output channel. */ protected void sendOutput(Object output, @Nullable Object replyChannelArg, boolean useArgChannel) { @@ -522,6 +539,22 @@ protected Object resolveErrorChannel(final MessageHeaders requestHeaders) { return errorChannel; } + protected void setupMessageProcessor(MessageProcessor processor) { + if (processor instanceof AbstractMessageProcessor abstractMessageProcessor) { + ConversionService conversionService = getConversionService(); + if (conversionService != null) { + abstractMessageProcessor.setConversionService(conversionService); + } + } + BeanFactory beanFactory = getBeanFactory(); + if (processor instanceof BeanFactoryAware beanFactoryAware && beanFactory != null) { + beanFactoryAware.setBeanFactory(beanFactory); + } + if (!this.async && processor instanceof MethodInvokingMessageProcessor methodInvokingMessageProcessor) { + this.async = methodInvokingMessageProcessor.isAsync(); + } + } + private final class ReplyFutureCallback implements BiConsumer { private final Message requestMessage; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/MethodInvokingMessageProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/MethodInvokingMessageProcessor.java index 44406304928..95aa253145e 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/handler/MethodInvokingMessageProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/MethodInvokingMessageProcessor.java @@ -102,6 +102,10 @@ public boolean isRunning() { return this.delegate.isRunning(); } + public boolean isAsync() { + return this.delegate.isAsync(); + } + @Override @Nullable @SuppressWarnings("unchecked") diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/ServiceActivatingHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/ServiceActivatingHandler.java index fa3e4978105..3e144fdd87e 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/handler/ServiceActivatingHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/ServiceActivatingHandler.java @@ -18,9 +18,7 @@ import java.lang.reflect.Method; -import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.context.Lifecycle; -import org.springframework.core.convert.ConversionService; import org.springframework.integration.IntegrationPattern; import org.springframework.integration.IntegrationPatternType; import org.springframework.integration.annotation.ServiceActivator; @@ -69,15 +67,7 @@ public IntegrationPatternType getIntegrationPatternType() { @Override protected void doInit() { - if (this.processor instanceof AbstractMessageProcessor) { - ConversionService conversionService = getConversionService(); - if (conversionService != null) { - ((AbstractMessageProcessor) this.processor).setConversionService(conversionService); - } - } - if (this.processor instanceof BeanFactoryAware && this.getBeanFactory() != null) { - ((BeanFactoryAware) this.processor).setBeanFactory(this.getBeanFactory()); - } + setupMessageProcessor(this.processor); } @Override diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/support/ContinuationHandlerMethodArgumentResolver.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/support/ContinuationHandlerMethodArgumentResolver.java new file mode 100644 index 00000000000..8522d24e07f --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/support/ContinuationHandlerMethodArgumentResolver.java @@ -0,0 +1,45 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.handler.support; + +import org.springframework.core.MethodParameter; +import org.springframework.integration.util.CoroutinesUtils; +import org.springframework.messaging.Message; +import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver; + +import reactor.core.publisher.Mono; + +/** + * No-op resolver for method arguments of type {@link kotlin.coroutines.Continuation}. + * + * @author Artem Bilan + * + * @since 6.0 + */ +public class ContinuationHandlerMethodArgumentResolver implements HandlerMethodArgumentResolver { + + @Override + public boolean supportsParameter(MethodParameter parameter) { + return CoroutinesUtils.isContinuationType(parameter.getParameterType()); + } + + @Override + public Object resolveArgument(MethodParameter parameter, Message message) { + return Mono.empty(); + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/support/IntegrationInvocableHandlerMethod.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/support/IntegrationInvocableHandlerMethod.java new file mode 100644 index 00000000000..130ec9a066a --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/support/IntegrationInvocableHandlerMethod.java @@ -0,0 +1,49 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.handler.support; + +import java.lang.reflect.Method; + +import org.springframework.core.CoroutinesUtils; +import org.springframework.core.KotlinDetector; +import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; + +/** + * An {@link InvocableHandlerMethod} extension for Spring Integration requirements. + * + * @author Artem Bilan + * + * @since 6.0 + */ +public class IntegrationInvocableHandlerMethod extends InvocableHandlerMethod { + + public IntegrationInvocableHandlerMethod(Object bean, Method method) { + super(bean, method); + } + + @Override + protected Object doInvoke(Object... args) throws Exception { + Method method = getBridgedMethod(); + if (KotlinDetector.isSuspendingFunction(method)) { + return CoroutinesUtils.invokeSuspendingFunction(method, getBean(), args); + } + else { + return super.doInvoke(args); + } + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/MessageHandlerMethodFactoryCreatingFactoryBean.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/support/IntegrationMessageHandlerMethodFactory.java similarity index 50% rename from spring-integration-core/src/main/java/org/springframework/integration/config/MessageHandlerMethodFactoryCreatingFactoryBean.java rename to spring-integration-core/src/main/java/org/springframework/integration/handler/support/IntegrationMessageHandlerMethodFactory.java index 9e5b208f802..0ae125b2dbc 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/MessageHandlerMethodFactoryCreatingFactoryBean.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/support/IntegrationMessageHandlerMethodFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,88 +14,95 @@ * limitations under the License. */ -package org.springframework.integration.config; +package org.springframework.integration.handler.support; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; -import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.BeanInitializationException; -import org.springframework.beans.factory.FactoryBean; import org.springframework.beans.factory.InitializingBean; -import org.springframework.integration.handler.support.CollectionArgumentResolver; -import org.springframework.integration.handler.support.MapArgumentResolver; -import org.springframework.integration.handler.support.PayloadExpressionArgumentResolver; -import org.springframework.integration.handler.support.PayloadsArgumentResolver; +import org.springframework.core.KotlinDetector; import org.springframework.integration.support.NullAwarePayloadArgumentResolver; import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; -import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory; import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver; +import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite; +import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; /** - * The {@link FactoryBean} for creating integration-specific {@link MessageHandlerMethodFactory} instance. - * It adds these custom {@link HandlerMethodArgumentResolver}s in the order: - *
    - *
  • {@link PayloadExpressionArgumentResolver}; - *
  • {@link NullAwarePayloadArgumentResolver}; - *
  • {@link PayloadsArgumentResolver}; - *
  • {@link CollectionArgumentResolver} if {@link #listCapable} is true; - *
  • {@link MapArgumentResolver}. - *
+ * Extension of the {@link DefaultMessageHandlerMethodFactory} for Spring Integration requirements. * - * @author Artyem Bilan + * @author Artem Bilan * - * @since 5.5.7 + * @since 6.0 */ -class MessageHandlerMethodFactoryCreatingFactoryBean - implements FactoryBean, BeanFactoryAware { +public class IntegrationMessageHandlerMethodFactory extends DefaultMessageHandlerMethodFactory { + + private final HandlerMethodArgumentResolverComposite argumentResolvers = + new HandlerMethodArgumentResolverComposite(); private final boolean listCapable; - private MessageConverter argumentResolverMessageConverter; + private MessageConverter messageConverter; private BeanFactory beanFactory; - MessageHandlerMethodFactoryCreatingFactoryBean(boolean listCapable) { + public IntegrationMessageHandlerMethodFactory() { + this(false); + } + + public IntegrationMessageHandlerMethodFactory(boolean listCapable) { this.listCapable = listCapable; } - public void setArgumentResolverMessageConverter(MessageConverter argumentResolverMessageConverter) { - this.argumentResolverMessageConverter = argumentResolverMessageConverter; + @Override + public void setMessageConverter(MessageConverter messageConverter) { + super.setMessageConverter(messageConverter); + this.messageConverter = messageConverter; } @Override - public void setBeanFactory(BeanFactory beanFactory) throws BeansException { + public void setBeanFactory(BeanFactory beanFactory) { + super.setBeanFactory(beanFactory); this.beanFactory = beanFactory; } @Override - public Class getObjectType() { - return MessageHandlerMethodFactory.class; + public void afterPropertiesSet() { + setCustomArgumentResolvers(buildArgumentResolvers(this.listCapable)); + super.afterPropertiesSet(); + } + + @Override + protected List initArgumentResolvers() { + List resolvers = super.initArgumentResolvers(); + this.argumentResolvers.addResolvers(resolvers); + return resolvers; } @Override - public MessageHandlerMethodFactory getObject() { - DefaultMessageHandlerMethodFactory handlerMethodFactory = new DefaultMessageHandlerMethodFactory(); - handlerMethodFactory.setBeanFactory(this.beanFactory); - handlerMethodFactory.setMessageConverter(this.argumentResolverMessageConverter); - handlerMethodFactory.setCustomArgumentResolvers(buildArgumentResolvers(this.listCapable)); - handlerMethodFactory.afterPropertiesSet(); - return handlerMethodFactory; + public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) { + InvocableHandlerMethod handlerMethod = new IntegrationInvocableHandlerMethod(bean, method); + handlerMethod.setMessageMethodArgumentResolvers(this.argumentResolvers); + return handlerMethod; } private List buildArgumentResolvers(boolean listCapable) { List resolvers = new ArrayList<>(); resolvers.add(new PayloadExpressionArgumentResolver()); - resolvers.add(new NullAwarePayloadArgumentResolver(this.argumentResolverMessageConverter)); + resolvers.add(new NullAwarePayloadArgumentResolver(this.messageConverter)); resolvers.add(new PayloadsArgumentResolver()); if (listCapable) { resolvers.add(new CollectionArgumentResolver(true)); } resolvers.add(new MapArgumentResolver()); + if (KotlinDetector.isKotlinPresent()) { + resolvers.add(new ContinuationHandlerMethodArgumentResolver()); + } + for (HandlerMethodArgumentResolver resolver : resolvers) { if (resolver instanceof BeanFactoryAware) { ((BeanFactoryAware) resolver).setBeanFactory(this.beanFactory); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/support/MessagingMethodInvokerHelper.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/support/MessagingMethodInvokerHelper.java index c525a48f59a..01711466755 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/handler/support/MessagingMethodInvokerHelper.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/support/MessagingMethodInvokerHelper.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -49,6 +49,7 @@ import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.context.Lifecycle; import org.springframework.context.expression.StandardBeanExpressionResolver; +import org.springframework.core.KotlinDetector; import org.springframework.core.LocalVariableTableParameterNameDiscoverer; import org.springframework.core.MethodParameter; import org.springframework.core.ParameterNameDiscoverer; @@ -73,13 +74,13 @@ import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.core.Pausable; import org.springframework.integration.support.MutableMessage; -import org.springframework.integration.support.NullAwarePayloadArgumentResolver; import org.springframework.integration.support.converter.ConfigurableCompositeMessageConverter; import org.springframework.integration.support.json.JsonObjectMapper; import org.springframework.integration.support.json.JsonObjectMapperProvider; import org.springframework.integration.support.management.ManageableLifecycle; import org.springframework.integration.util.AbstractExpressionEvaluator; import org.springframework.integration.util.AnnotatedMethodFilter; +import org.springframework.integration.util.CoroutinesUtils; import org.springframework.integration.util.FixedMethodFilter; import org.springframework.integration.util.MessagingAnnotationUtils; import org.springframework.integration.util.UniqueMethodFilter; @@ -91,9 +92,7 @@ import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; -import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory; -import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver; import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException; import org.springframework.util.Assert; @@ -119,7 +118,6 @@ * @author Gary Russell * @author Artem Bilan * @author Trung Pham - * * @since 2.0 */ public class MessagingMethodInvokerHelper extends AbstractExpressionEvaluator implements ManageableLifecycle { @@ -162,8 +160,7 @@ public class MessagingMethodInvokerHelper extends AbstractExpressionEvaluator im SPEL_COMPILERS.put(SpelCompilerMode.MIXED, EXPRESSION_PARSER_MIXED); } - private MessageHandlerMethodFactory messageHandlerMethodFactory = - new DefaultMessageHandlerMethodFactory(); + private MessageHandlerMethodFactory messageHandlerMethodFactory; private final Object targetObject; @@ -491,7 +488,7 @@ private Object processInternal(ParametersWrapper parameters) { private synchronized void initialize() { if (isProvidedMessageHandlerFactoryBean()) { - LOGGER.info("Overriding default instance of MessageHandlerMethodFactory with provided one."); + LOGGER.trace("Overriding default instance of MessageHandlerMethodFactory with the one provided."); this.messageHandlerMethodFactory = getBeanFactory() .getBean( @@ -521,7 +518,6 @@ private boolean isProvidedMessageHandlerFactoryBean() { * This should not be needed in production but we have many tests * that don't run in an application context. */ - private void initializeHandler(HandlerMethod candidate) { ExpressionParser parser; if (candidate.useSpelInvoker == null) { @@ -547,35 +543,13 @@ private void configureLocalMessageHandlerFactory() { messageConverter.setBeanFactory(beanFactory); messageConverter.afterPropertiesSet(); - List customArgumentResolvers = new LinkedList<>(); - PayloadExpressionArgumentResolver payloadExpressionArgumentResolver = new PayloadExpressionArgumentResolver(); - PayloadsArgumentResolver payloadsArgumentResolver = new PayloadsArgumentResolver(); - - customArgumentResolvers.add(payloadExpressionArgumentResolver); - customArgumentResolvers.add(new NullAwarePayloadArgumentResolver(messageConverter)); - customArgumentResolvers.add(payloadsArgumentResolver); - - CollectionArgumentResolver collectionArgumentResolver = null; - - if (this.canProcessMessageList) { - collectionArgumentResolver = new CollectionArgumentResolver(true); - customArgumentResolvers.add(collectionArgumentResolver); - } - - MapArgumentResolver mapArgumentResolver = new MapArgumentResolver(); - customArgumentResolvers.add(mapArgumentResolver); - payloadExpressionArgumentResolver.setBeanFactory(beanFactory); - payloadsArgumentResolver.setBeanFactory(beanFactory); - mapArgumentResolver.setBeanFactory(beanFactory); - if (collectionArgumentResolver != null) { - collectionArgumentResolver.setBeanFactory(beanFactory); - } - - DefaultMessageHandlerMethodFactory localHandlerMethodFactory = - (DefaultMessageHandlerMethodFactory) this.messageHandlerMethodFactory; + IntegrationMessageHandlerMethodFactory localHandlerMethodFactory = + new IntegrationMessageHandlerMethodFactory(this.canProcessMessageList); localHandlerMethodFactory.setMessageConverter(messageConverter); - localHandlerMethodFactory.setCustomArgumentResolvers(customArgumentResolvers); + localHandlerMethodFactory.setBeanFactory(beanFactory); localHandlerMethodFactory.afterPropertiesSet(); + + this.messageHandlerMethodFactory = localHandlerMethodFactory; } @Nullable @@ -802,7 +776,7 @@ private HandlerMethod obtainHandlerMethodIfAny(Method methodToProcess) { AopUtils.selectInvocableMethod(methodToProcess, ClassUtils.getUserClass(this.targetObject))); } catch (Exception ex) { - LOGGER.debug(ex, "Method [" + methodToProcess + "] is not eligible for Message handling."); + LOGGER.debug(ex, "Method [" + methodToProcess + "] is not eligible for Message handling."); return null; } @@ -828,9 +802,9 @@ private boolean isMethodEligible(Method methodToProcess) { private boolean isPausableMethod(Method pausableMethod) { Class declaringClass = pausableMethod.getDeclaringClass(); boolean pausable = (Pausable.class.isAssignableFrom(declaringClass) - || Lifecycle.class.isAssignableFrom(declaringClass)) + || Lifecycle.class.isAssignableFrom(declaringClass)) && ReflectionUtils.findMethod(Pausable.class, pausableMethod.getName(), - pausableMethod.getParameterTypes()) != null; + pausableMethod.getParameterTypes()) != null; if (pausable) { this.logger.trace(() -> pausableMethod + " is not considered a candidate method unless explicitly requested"); } @@ -870,7 +844,7 @@ private void populateHandlerMethod(Map, HandlerMethod> candidateMethods if (handlerMethod1.isMessageMethod()) { if (fallbackMessageMethods.containsKey(targetParameterType)) { // we need to check for duplicate type matches, - // but only if we end up falling back + // but only if we end up falling back, // and we'll only keep track of the first one ambiguousFallbackMessageGenericType.compareAndSet(null, targetParameterType); } @@ -910,7 +884,6 @@ private void findSingleSpecificMethodOnInterfacesIfProxy(Map, HandlerMe Map, HandlerMethod> candidateMethods) { if (AopUtils.isAopProxy(this.targetObject)) { final AtomicReference targetMethod = new AtomicReference<>(); - final AtomicReference> targetClass = new AtomicReference<>(); Class[] interfaces = ((Advised) this.targetObject).getProxiedInterfaces(); for (Class clazz : interfaces) { ReflectionUtils.doWithMethods(clazz, method1 -> { @@ -920,7 +893,6 @@ private void findSingleSpecificMethodOnInterfacesIfProxy(Map, HandlerMe } else { targetMethod.set(method1); - targetClass.set(clazz); } }, method12 -> method12.getName().equals(this.methodName)); } @@ -1047,6 +1019,14 @@ private static boolean isMethodDefinedOnObjectClass(Method method) { && method.getParameterTypes().length == 0)); } + public boolean isAsync() { + if (this.handlerMethodsList.size() == 1) { + Method methodToCheck = this.handlerMethodsList.get(0).values().iterator().next().method; + return KotlinDetector.isSuspendingFunction(methodToCheck); + } + return false; + } + /** * Helper class for generating and exposing metadata for a candidate handler method. The metadata includes the SpEL * expression and the expected payload type. @@ -1079,7 +1059,7 @@ private static class HandlerMethod { // The number of times InvocableHandlerMethod was attempted and failed - enables us to eventually // give up trying to call it when it just doesn't seem to be possible. - // Switching to spelOnly afterwards forever. + // Switching to 'spelOnly' afterwards forever. private volatile int failedAttempts = 0; HandlerMethod(Method method, boolean canProcessMessageList) { @@ -1191,6 +1171,9 @@ else if (Map.class.isAssignableFrom(parameterType)) { populateMapParameterForExpression(sb, parameterType); return true; } + else if (CoroutinesUtils.isContinuationType(parameterType)) { + sb.append("null"); + } else { sb.append("payload"); setExclusiveTargetParameterType(parameterTypeDescriptor, methodParameter); @@ -1370,7 +1353,7 @@ public static class ParametersWrapper { /** * SpEL Function to retrieve a required header. * @param headers the headers. - * @param header the header name + * @param header the header name * @return the header * @throws IllegalArgumentException if the header does not exist */ diff --git a/spring-integration-core/src/main/java/org/springframework/integration/splitter/AbstractMessageProcessingSplitter.java b/spring-integration-core/src/main/java/org/springframework/integration/splitter/AbstractMessageProcessingSplitter.java index 6dacd405656..846ba9ac352 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/splitter/AbstractMessageProcessingSplitter.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/splitter/AbstractMessageProcessingSplitter.java @@ -18,67 +18,57 @@ import java.util.Collection; -import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.context.Lifecycle; -import org.springframework.core.convert.ConversionService; -import org.springframework.integration.handler.AbstractMessageProcessor; import org.springframework.integration.handler.MessageProcessor; import org.springframework.integration.support.management.ManageableLifecycle; import org.springframework.messaging.Message; import org.springframework.util.Assert; /** - * Base class for Message Splitter implementations that delegate to a - * {@link MessageProcessor} instance. + * Base class for Message Splitter implementations that delegate to a {@link MessageProcessor} instance. * * @author Mark Fisher * @author Artem Bilan + * * @since 2.0 */ -abstract class AbstractMessageProcessingSplitter extends AbstractMessageSplitter - implements ManageableLifecycle { +abstract class AbstractMessageProcessingSplitter extends AbstractMessageSplitter implements ManageableLifecycle { - private final MessageProcessor> messageProcessor; + private final MessageProcessor> processor; protected AbstractMessageProcessingSplitter(MessageProcessor> expressionEvaluatingMessageProcessor) { Assert.notNull(expressionEvaluatingMessageProcessor, "messageProcessor must not be null"); - this.messageProcessor = expressionEvaluatingMessageProcessor; + this.processor = expressionEvaluatingMessageProcessor; } @Override protected void doInit() { - ConversionService conversionService = getConversionService(); - if (conversionService != null && this.messageProcessor instanceof AbstractMessageProcessor) { - ((AbstractMessageProcessor) this.messageProcessor).setConversionService(conversionService); - } - if (this.messageProcessor instanceof BeanFactoryAware && this.getBeanFactory() != null) { - ((BeanFactoryAware) this.messageProcessor).setBeanFactory(this.getBeanFactory()); - } + setupMessageProcessor(this.processor); } @Override protected final Object splitMessage(Message message) { - return this.messageProcessor.processMessage(message); + return this.processor.processMessage(message); } @Override public void start() { - if (this.messageProcessor instanceof Lifecycle) { - ((Lifecycle) this.messageProcessor).start(); + if (this.processor instanceof Lifecycle lifecycle) { + lifecycle.start(); } } @Override public void stop() { - if (this.messageProcessor instanceof Lifecycle) { - ((Lifecycle) this.messageProcessor).stop(); + if (this.processor instanceof Lifecycle lifecycle) { + lifecycle.stop(); } } @Override public boolean isRunning() { - return !(this.messageProcessor instanceof Lifecycle) || ((Lifecycle) this.messageProcessor).isRunning(); + return !(this.processor instanceof Lifecycle) || ((Lifecycle) this.processor).isRunning(); } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/util/CoroutinesUtils.java b/spring-integration-core/src/main/java/org/springframework/integration/util/CoroutinesUtils.java new file mode 100644 index 00000000000..4d9a4566334 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/util/CoroutinesUtils.java @@ -0,0 +1,57 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.util; + +import org.springframework.core.KotlinDetector; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +import reactor.core.publisher.Mono; + +/** + * Additional utilities for working with Kotlin Coroutines. + * + * @author Artem Bilan + * + * @since 6.0 + * + * @see org.springframework.core.CoroutinesUtils + */ +public final class CoroutinesUtils { + + public static boolean isContinuation(Object candidate) { + return isContinuationType(candidate.getClass()); + } + + public static boolean isContinuationType(Class candidate) { + return KotlinDetector.isKotlinPresent() && kotlin.coroutines.Continuation.class.isAssignableFrom(candidate); + } + + @Nullable + @SuppressWarnings("unchecked") + public static T monoAwaitSingleOrNull(Mono source, Object continuation) { + Assert.state(isContinuation(continuation), () -> + "The 'continuation' must be an instance of 'kotlin.coroutines.Continuation', but it is: " + + continuation.getClass()); + return (T) kotlinx.coroutines.reactor.MonoKt.awaitSingleOrNull( + source, (kotlin.coroutines.Continuation) continuation); + } + + private CoroutinesUtils() { + } + +} diff --git a/spring-integration-core/src/test/kotlin/org/springframework/integration/function/FunctionsTests.kt b/spring-integration-core/src/test/kotlin/org/springframework/integration/function/FunctionsTests.kt index 82fad5b4950..46b98f1b461 100644 --- a/spring-integration-core/src/test/kotlin/org/springframework/integration/function/FunctionsTests.kt +++ b/spring-integration-core/src/test/kotlin/org/springframework/integration/function/FunctionsTests.kt @@ -17,27 +17,23 @@ package org.springframework.integration.function import assertk.assertThat -import assertk.assertions.containsAll -import assertk.assertions.isEqualTo -import assertk.assertions.isNotNull -import assertk.assertions.isTrue -import assertk.assertions.size +import assertk.assertions.* +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Qualifier import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import org.springframework.integration.annotation.EndpointId -import org.springframework.integration.annotation.InboundChannelAdapter -import org.springframework.integration.annotation.Poller -import org.springframework.integration.annotation.ServiceActivator -import org.springframework.integration.annotation.Transformer +import org.springframework.integration.annotation.* import org.springframework.integration.channel.DirectChannel +import org.springframework.integration.channel.FluxMessageChannel import org.springframework.integration.channel.QueueChannel import org.springframework.integration.config.EnableIntegration import org.springframework.integration.dsl.integrationFlow import org.springframework.integration.endpoint.SourcePollingChannelAdapter import org.springframework.integration.gateway.GatewayProxyFactoryBean +import org.springframework.integration.handler.ServiceActivatingHandler import org.springframework.messaging.Message import org.springframework.messaging.MessageChannel import org.springframework.messaging.PollableChannel @@ -46,8 +42,10 @@ import org.springframework.messaging.support.GenericMessage import org.springframework.messaging.support.MessageBuilder import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.junit.jupiter.SpringJUnitConfig +import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.test.StepVerifier +import java.time.Duration import java.util.* import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit @@ -88,8 +86,8 @@ class FunctionsTests { val replyChannel = QueueChannel() val message = MessageBuilder.withPayload("foo") - .setReplyChannel(replyChannel) - .build() + .setReplyChannel(replyChannel) + .build() this.functionServiceChannel.send(message) @@ -98,8 +96,8 @@ class FunctionsTests { val payload = receive?.payload assertThat(payload) - .isNotNull() - .isEqualTo("FOO") + .isNotNull() + .isEqualTo("FOO") } @Test @@ -139,8 +137,8 @@ class FunctionsTests { val mono = this.monoFunction.apply("test") StepVerifier.create(mono.map(Message<*>::getPayload).cast(String::class.java)) - .expectNext("TEST") - .verifyComplete() + .expectNext("TEST") + .verifyComplete() val gateways = this.monoFunctionGateway.gateways assertThat(gateways).size().isEqualTo(3) @@ -148,8 +146,97 @@ class FunctionsTests { assertThat(methodNames).containsAll("apply", "andThen", "compose") } + @Autowired + private lateinit var suspendServiceChannel: MessageChannel + + @Test + fun `verify suspend function`() { + val replyChannel = FluxMessageChannel() + val testPayload = "test coroutine" + val stepVerifier = + StepVerifier.create(Flux.from(replyChannel).map(Message<*>::getPayload).cast(String::class.java)) + .expectNext(testPayload.uppercase()) + .thenCancel() + .verifyLater() + + suspendServiceChannel.send( + MessageBuilder.withPayload(testPayload) + .setReplyChannel(replyChannel) + .build() + ) + + stepVerifier.verify(Duration.ofSeconds(10)) + } + + @Autowired + private lateinit var flowServiceChannel: MessageChannel + + @Test + fun `verify flow function`() { + val replyChannel = FluxMessageChannel() + val testPayload = "test flow" + val stepVerifier = + StepVerifier.create(Flux.from(replyChannel).map(Message<*>::getPayload).cast(String::class.java)) + .expectNext("$testPayload #1", "$testPayload #2", "$testPayload #3") + .thenCancel() + .verifyLater() + + flowServiceChannel.send( + MessageBuilder.withPayload(testPayload) + .setReplyChannel(replyChannel) + .build() + ) + + stepVerifier.verify(Duration.ofSeconds(10)) + } + + @Autowired + private lateinit var syncFlowServiceChannel: MessageChannel + + @Test + fun `verify sync flow function reply`() { + val replyChannel = QueueChannel() + val testPayload = "test flow" + + syncFlowServiceChannel.send( + MessageBuilder.withPayload(testPayload) + .setReplyChannel(replyChannel) + .build() + ) + + val receive = replyChannel.receive(10_000) + + val payload = receive?.payload + + assertThat(payload) + .isNotNull() + .isInstanceOf(Flow::class) + + runBlocking { + val strings = (payload as Flow).toList() + assertThat(strings).containsExactly("Sync $testPayload #1", "Sync $testPayload #2", "Sync $testPayload #3") + } + } + + @Autowired + private lateinit var suspendRequestChannel: DirectChannel + + @Autowired + private lateinit var suspendFunGateway: SuspendFunGateway + + @Test + fun `suspend gateway`() { + suspendRequestChannel.subscribe(ServiceActivatingHandler { m -> m.payload.toString().uppercase() }) + + runBlocking { + val reply = suspendFunGateway.suspendGateway("test suspend gateway") + assertThat(reply).isEqualTo("TEST SUSPEND GATEWAY") + } + } + @Configuration @EnableIntegration + @IntegrationComponentScan class Config { @Bean @@ -171,8 +258,10 @@ class FunctionsTests { fun counterChannel() = DirectChannel() @Bean - @InboundChannelAdapter(value = "counterChannel", autoStartup = "false", - poller = Poller(fixedRate = "10", maxMessagesPerPoll = "1")) + @InboundChannelAdapter( + value = "counterChannel", autoStartup = "false", + poller = Poller(fixedRate = "10", maxMessagesPerPoll = "1") + ) @EndpointId("kotlinSupplierChannelAdapter") fun kotlinSupplier(): () -> String { return { "baz" } @@ -180,16 +269,43 @@ class FunctionsTests { @Bean fun flowFromSupplier() = - integrationFlow({ "" }, { poller { it.fixedDelay(10).maxMessagesPerPoll(1) } }) { - transform { "blank" } - channel { queue("fromSupplierQueue") } - } + integrationFlow({ "" }, { poller { it.fixedDelay(10).maxMessagesPerPoll(1) } }) { + transform { "blank" } + channel { queue("fromSupplierQueue") } + } @Bean fun monoFunctionGateway() = - integrationFlow({ proxyDefaultMethods(true) }) { - handle({ p, _ -> Mono.just(p).map(String::uppercase) }) { async(true) } + integrationFlow({ proxyDefaultMethods(true) }) { + handle({ p, _ -> Mono.just(p).map(String::uppercase) }) { async(true) } + } + + + @ServiceActivator(inputChannel = "suspendServiceChannel") + suspend fun suspendServiceFunction(payload: String) = payload.uppercase() + + @ServiceActivator(inputChannel = "flowServiceChannel", async = "true") + fun flowServiceFunction(payload: String) = + flow { + for (i in 1..3) { + emit("$payload #$i") } + } + + @ServiceActivator(inputChannel = "syncFlowServiceChannel") + fun syncFlowServiceFunction(payload: String) = + (1..3).asFlow() + .map { "Sync $payload #$it" } + + @Bean + fun suspendRequestChannel() = DirectChannel() + + } + + @MessagingGateway(defaultRequestChannel = "suspendRequestChannel") + interface SuspendFunGateway { + + suspend fun suspendGateway(payload: String): String } diff --git a/spring-integration-groovy/src/test/java/org/springframework/integration/groovy/config/GroovySplitterTests.java b/spring-integration-groovy/src/test/java/org/springframework/integration/groovy/config/GroovySplitterTests.java index a8dc75243df..1a0ea52ab92 100644 --- a/spring-integration-groovy/src/test/java/org/springframework/integration/groovy/config/GroovySplitterTests.java +++ b/spring-integration-groovy/src/test/java/org/springframework/integration/groovy/config/GroovySplitterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,8 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat; -import org.junit.Test; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -32,16 +31,14 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; /** * @author Mark Fisher * @author Artem Bilan * @since 2.0 */ -@ContextConfiguration -@RunWith(SpringJUnit4ClassRunner.class) +@SpringJUnitConfig public class GroovySplitterTests { @Autowired @@ -82,10 +79,10 @@ public void inlineScript() { public void testInt2433VerifyRiddingOfMessageProcessorsWrapping() { assertThat(this.groovySplitterMessageHandler instanceof MethodInvokingSplitter).isTrue(); @SuppressWarnings("rawtypes") - MessageProcessor messageProcessor = TestUtils.getPropertyValue(this.groovySplitterMessageHandler, - "messageProcessor", MessageProcessor.class); + MessageProcessor messageProcessor = + TestUtils.getPropertyValue(this.groovySplitterMessageHandler, "processor", MessageProcessor.class); //before it was MethodInvokingMessageProcessor - assertThat(messageProcessor instanceof GroovyScriptExecutingMessageProcessor).isTrue(); + assertThat(messageProcessor).isInstanceOf(GroovyScriptExecutingMessageProcessor.class); } } diff --git a/src/reference/asciidoc/functions-support.adoc b/src/reference/asciidoc/functions-support.adoc index dfce0d14488..86d9428c884 100644 --- a/src/reference/asciidoc/functions-support.adoc +++ b/src/reference/asciidoc/functions-support.adoc @@ -110,32 +110,3 @@ public IntegrationFlow supplierFlow() { ==== This function support is useful when used together with the https://cloud.spring.io/spring-cloud-function/[Spring Cloud Function] framework, where we have a function catalog and can refer to its member functions from an integration flow definition. - -[[kotlin-functions-support]] -==== Kotlin Lambdas - -The Framework also has been improved to support Kotlin lambdas for functions, so now you can use a combination of the Kotlin language and Spring Integration flow definitions: - -==== -[source, java] ----- -@Bean -@Transformer(inputChannel = "functionServiceChannel") -fun kotlinFunction(): (String) -> String { - return { it.toUpperCase() } -} - -@Bean -@ServiceActivator(inputChannel = "messageConsumerServiceChannel") -fun kotlinConsumer(): (Message) -> Unit { - return { print(it) } -} - -@Bean -@InboundChannelAdapter(value = "counterChannel", - poller = [Poller(fixedRate = "10", maxMessagesPerPoll = "1")]) -fun kotlinSupplier(): () -> String { - return { "baz" } -} ----- -==== diff --git a/src/reference/asciidoc/gateway.adoc b/src/reference/asciidoc/gateway.adoc index b172b49660f..619dc20f217 100644 --- a/src/reference/asciidoc/gateway.adoc +++ b/src/reference/asciidoc/gateway.adoc @@ -771,6 +771,8 @@ mono.subscribe(invoice -> handleInvoice(invoice)); The calling thread continues, with `handleInvoice()` being called when the flow completes. +Also see <<./kotlin-functions.adoc#kotlin-coroutines,Kotlin Coroutines>> for more information. + ===== Downstream Flows Returning an Asynchronous Type As mentioned in the <> section above, if you wish some downstream component to return a message with an async payload (`Future`, `Mono`, and others), you must explicitly set the async executor to `null` (or `""` when using XML configuration). diff --git a/src/reference/asciidoc/index-single.adoc b/src/reference/asciidoc/index-single.adoc index 0ae67197e7c..cb773feb104 100644 --- a/src/reference/asciidoc/index-single.adoc +++ b/src/reference/asciidoc/index-single.adoc @@ -37,6 +37,8 @@ include::./kotlin-dsl.adoc[] include::./system-management.adoc[] +include::./reactive-streams.adoc[] + include::./endpoint-summary.adoc[] include::./amqp.adoc[] diff --git a/src/reference/asciidoc/kotlin-functions.adoc b/src/reference/asciidoc/kotlin-functions.adoc new file mode 100644 index 00000000000..14455f0d6cb --- /dev/null +++ b/src/reference/asciidoc/kotlin-functions.adoc @@ -0,0 +1,88 @@ +[[kotlin-functions-support]] +=== Kotlin Support + +The Framework also has been improved to support Kotlin lambdas for functions, so now you can use a combination of the Kotlin language and Spring Integration flow definitions: + +==== +[source, kotlin] +---- +@Bean +@Transformer(inputChannel = "functionServiceChannel") +fun kotlinFunction(): (String) -> String { + return { it.toUpperCase() } +} + +@Bean +@ServiceActivator(inputChannel = "messageConsumerServiceChannel") +fun kotlinConsumer(): (Message) -> Unit { + return { print(it) } +} + +@Bean +@InboundChannelAdapter(value = "counterChannel", + poller = Poller(fixedRate = "10", maxMessagesPerPoll = "1")) +fun kotlinSupplier(): () -> String { + return { "baz" } +} +---- +==== + +[[kotlin-coroutines]] +==== Kotlin Coroutines + +Starting with version 6.0, Spring Integration provides support for https://kotlinlang.org/docs/coroutines-guide.html[Kotlin Coroutines]. +Now `suspend` functions and `kotlinx.coroutines.Deferred` & `kotlinx.coroutines.flow.Flow` return types can be used for service methods: + +==== +[source, kotlin] +---- +@ServiceActivator(inputChannel = "suspendServiceChannel", outputChannel = "resultChannel") +suspend fun suspendServiceFunction(payload: String) = payload.uppercase() + +@ServiceActivator(inputChannel = "flowServiceChannel", outputChannel = "resultChannel", async = "true") +fun flowServiceFunction(payload: String) = + flow { + for (i in 1..3) { + emit("$payload #$i") + } + } +---- +==== + +The framework treats them as Reactive Streams interactions and uses `ReactiveAdapterRegistry` to convert to respective `Mono` and `Flux` reactor types. +Such a function reply is processed then in the reply channel, if it is a `ReactiveStreamsSubscribableChannel`, or as a result of `CompletableFuture` in the respective callback. + +NOTE: The functions with `Flow` result are not `async` by default on the `@ServiceActivator`, so `Flow` instance is produced as a reply message payload. +It is the target application's responsibility to process this object as a coroutine or convert it to `Flux`, respectively. + +The `@MessagingGateway` interface methods also can be marked with a `suspend` modifier when declared in Kotlin. +The framework utilizes a `Mono` internally to perform request-reply using the downstream flow. +Such a `Mono` result is processed by the `MonoKt.awaitSingleOrNull()` API internally to fulfil a `kotlin.coroutines.Continuation` argument fo the called `suspend` function of the gateway: + +==== +[source, kotlin] +---- +@MessagingGateway(defaultRequestChannel = "suspendRequestChannel") +interface SuspendFunGateway { + + suspend fun suspendGateway(payload: String): String + +} +---- +==== + +This method has to be called as a coroutine according to Kotlin language requirements: + +==== +[source, kotlin] +---- +@Autowired +private lateinit var suspendFunGateway: SuspendFunGateway + +fun someServiceMethod() { + runBlocking { + val reply = suspendFunGateway.suspendGateway("test suspend gateway") + } +} +---- +==== diff --git a/src/reference/asciidoc/messaging-endpoints.adoc b/src/reference/asciidoc/messaging-endpoints.adoc index 374b7b37a38..5c9d7e77d0b 100644 --- a/src/reference/asciidoc/messaging-endpoints.adoc +++ b/src/reference/asciidoc/messaging-endpoints.adoc @@ -19,4 +19,6 @@ include::./handler-advice.adoc[] include::./logging-adapter.adoc[] include::./functions-support.adoc[] + +include::./kotlin-functions.adoc[] // BE SURE TO PRECEDE ALL include:: with a blank line - see https://asciidoctor.org/docs/user-manual/#include-partitioning diff --git a/src/reference/asciidoc/reactive-streams.adoc b/src/reference/asciidoc/reactive-streams.adoc index ca86fa3643f..6e8133f0d93 100644 --- a/src/reference/asciidoc/reactive-streams.adoc +++ b/src/reference/asciidoc/reactive-streams.adoc @@ -49,6 +49,8 @@ With a `ReactiveStreamsSubscribableChannel` for the `outputChannel`, there is no See <<./service-activator.adoc#async-service-activator,Asynchronous Service Activator>> for more information. +Also see <<./kotlin-functions.adoc#kotlin-coroutines,Kotlin Coroutines>> for more information. + === `FluxMessageChannel` and `ReactiveStreamsConsumer` The `FluxMessageChannel` is a combined implementation of `MessageChannel` and `Publisher>`. diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 5f07220e256..873bb96c96c 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -79,6 +79,12 @@ See <<./scripting.adoc#scripting,Scripting Support>> for more information. The Apache Cassandra Spring Integration Extensions project has been migrated as the `spring-integration-cassandra` module. See <<./cassandra.adoc#cassandra,Apache Cassandra Support>> for more information. +[[x6.0-kotlin-coroutines]] +==== Kotlin Coroutines + +Kotlin Coroutines support has been introduced to the framework. + +See <<./kotlin-functions.adoc#kotlin-coroutines,Kotlin Coroutines>> for more information. [[x6.0-general]] === General Changes