diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Aggregator.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Aggregator.java index 4f21d762f7e..5fa70a2660e 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Aggregator.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Aggregator.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,8 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import org.springframework.messaging.handler.annotation.ValueConstants; + /** * Indicates that a method is capable of aggregating messages. *

@@ -104,8 +106,15 @@ * @return the {@link Poller} options for a polled endpoint * ({@link org.springframework.integration.scheduling.PollerMetadata}). * This attribute is an {@code array} just to allow an empty default (no poller). - * Only one {@link Poller} element is allowed. + * Mutually exclusive with {@link #reactive()}. */ Poller[] poller() default { }; + /** + * @return the {@link Reactive} marker for a consumer endpoint. + * Mutually exclusive with {@link #poller()}. + * @since 5.5 + */ + Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE); + } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/BridgeFrom.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/BridgeFrom.java index 430b928c413..ba72f9df353 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/BridgeFrom.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/BridgeFrom.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 the original author or authors. + * Copyright 2014-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,8 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import org.springframework.messaging.handler.annotation.ValueConstants; + /** * Messaging Annotation to mark a {@link org.springframework.context.annotation.Bean} * method for a {@link org.springframework.messaging.MessageChannel} to produce a @@ -69,8 +71,15 @@ * @return the {@link Poller} options for a polled endpoint * ({@link org.springframework.integration.scheduling.PollerMetadata}). * This attribute is an {@code array} just to allow an empty default (no poller). - * Only one {@link Poller} element is allowed. + * Mutually exclusive with {@link #reactive()}. */ Poller[] poller() default { }; + /** + * @return the {@link Reactive} marker for a consumer endpoint. + * Mutually exclusive with {@link #poller()}. + * @since 5.5 + */ + Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE); + } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/BridgeTo.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/BridgeTo.java index 507836af05e..b1d9687059c 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/BridgeTo.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/BridgeTo.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 the original author or authors. + * Copyright 2014-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,8 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import org.springframework.messaging.handler.annotation.ValueConstants; + /** * Messaging Annotation to mark a {@link org.springframework.context.annotation.Bean} * method for a {@link org.springframework.messaging.MessageChannel} to produce a @@ -50,8 +52,8 @@ public @interface BridgeTo { /** - * @return the outbound channel name to send the message to - * {@link org.springframework.integration.handler.BridgeHandler}. + * @return the outbound channel name to send the message to for the + * {@link org.springframework.integration.handler.BridgeHandler} reply. * Optional: when omitted the message is sent to the {@code reply-channel} * in its headers (if present - an exception is thrown otherwise). */ @@ -73,11 +75,18 @@ String phase() default ""; /** - * @return the {@link org.springframework.integration.annotation.Poller} options for a polled endpoint + * @return the {@link Poller} options for a polled endpoint * ({@link org.springframework.integration.scheduling.PollerMetadata}). * This attribute is an {@code array} just to allow an empty default (no poller). - * Only one {@link org.springframework.integration.annotation.Poller} element is allowed. + * Mutually exclusive with {@link #reactive()}. */ Poller[] poller() default { }; + /** + * @return the {@link Reactive} marker for a consumer endpoint. + * Mutually exclusive with {@link #poller()}. + * @since 5.5 + */ + Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE); + } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Filter.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Filter.java index 5501ead166f..de1e0c0e81b 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Filter.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Filter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,8 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import org.springframework.messaging.handler.annotation.ValueConstants; + /** * Indicates that a method is capable of playing the role of a Message Filter. *

@@ -38,6 +40,7 @@ * @author Mark Fisher * @author Gary Russell * @author Artem Bilan + * * @since 2.0 */ @Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE }) @@ -126,8 +129,15 @@ * @return the {@link Poller} options for a polled endpoint * ({@link org.springframework.integration.scheduling.PollerMetadata}). * This attribute is an {@code array} just to allow an empty default (no poller). - * Only one {@link Poller} element is allowed. + * Mutually exclusive with {@link #reactive()}. */ Poller[] poller() default { }; + /** + * @return the {@link Reactive} marker for a consumer endpoint. + * Mutually exclusive with {@link #poller()}. + * @since 5.5 + */ + Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE); + } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Reactive.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Reactive.java new file mode 100644 index 00000000000..b2d0e907cd2 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Reactive.java @@ -0,0 +1,42 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.annotation; + +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Provides reactive configuration options for the consumer endpoint making + * any input channel as a reactive stream source of data. + * + * @author Artem Bilan + * + * @since 5.5 + */ +@Target({}) +@Retention(RetentionPolicy.RUNTIME) +public @interface Reactive { + + /** + * @return the function bean name to be used in the + * {@link reactor.core.publisher.Flux#transform} on the input channel + * {@link reactor.core.publisher.Flux}. + */ + String value() default ""; + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Router.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Router.java index 13c84a8cec1..1baf4025740 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Router.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Router.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,8 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import org.springframework.messaging.handler.annotation.ValueConstants; + /** * Indicates that a method is capable of resolving to a channel or channel name * based on a message, message header(s), or both. @@ -150,8 +152,15 @@ * @return the {@link Poller} options for a polled endpoint * ({@link org.springframework.integration.scheduling.PollerMetadata}). * This attribute is an {@code array} just to allow an empty default (no poller). - * Only one {@link Poller} element is allowed. + * Mutually exclusive with {@link #reactive()}. */ Poller[] poller() default { }; + /** + * @return the {@link Reactive} marker for a consumer endpoint. + * Mutually exclusive with {@link #poller()}. + * @since 5.5 + */ + Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE); + } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java index 9b577e61851..86be941aab7 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,8 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import org.springframework.messaging.handler.annotation.ValueConstants; + /** * Indicates that a method is capable of handling a message or message payload. *

@@ -121,7 +123,15 @@ * ({@link org.springframework.integration.scheduling.PollerMetadata}). * This attribute is an {@code array} just to allow an empty default (no poller). * Only one {@link Poller} element is allowed. + * Mutually exclusive with {@link #reactive()}. */ Poller[] poller() default { }; + /** + * @return the {@link Reactive} marker for a consumer endpoint. + * Mutually exclusive with {@link #poller()}. + * @since 5.5 + */ + Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE); + } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Splitter.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Splitter.java index ad7b69fabcd..78cfcbf7f81 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Splitter.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Splitter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,8 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import org.springframework.messaging.handler.annotation.ValueConstants; + /** * Indicates that a method is capable of splitting a single message or message * payload to produce multiple messages or payloads. @@ -115,8 +117,15 @@ * @return the {@link Poller} options for a polled endpoint * ({@link org.springframework.integration.scheduling.PollerMetadata}). * This attribute is an {@code array} just to allow an empty default (no poller). - * Only one {@link Poller} element is allowed. + * Mutually exclusive with {@link #reactive()}. */ Poller[] poller() default { }; + /** + * @return the {@link Reactive} marker for a consumer endpoint. + * Mutually exclusive with {@link #poller()}. + * @since 5.5 + */ + Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE); + } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Transformer.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Transformer.java index e31b02e6976..a93595ed0a4 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Transformer.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Transformer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,8 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import org.springframework.messaging.handler.annotation.ValueConstants; + /** * Indicates that a method is capable of transforming a message, message header, * or message payload. @@ -91,8 +93,15 @@ * @return the {@link Poller} options for a polled endpoint * ({@link org.springframework.integration.scheduling.PollerMetadata}). * This attribute is an {@code array} just to allow an empty default (no poller). - * Only one {@link Poller} element is allowed. + * Mutually exclusive with {@link #reactive()}. */ Poller[] poller() default { }; + /** + * @return the {@link Reactive} marker for a consumer endpoint. + * Mutually exclusive with {@link #poller()}. + * @since 5.5 + */ + Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE); + } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java b/spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java index acc1a70a819..3ae0d8692d4 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,10 +17,11 @@ package org.springframework.integration.config; import java.util.List; +import java.util.function.Function; import org.aopalliance.aop.Advice; -import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Publisher; import org.springframework.aop.framework.Advised; import org.springframework.aop.framework.ProxyFactory; @@ -35,6 +36,7 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.SmartLifecycle; +import org.springframework.core.log.LogAccessor; import org.springframework.integration.channel.FixedSubscriberChannel; import org.springframework.integration.context.IntegrationObjectSupport; import org.springframework.integration.endpoint.AbstractEndpoint; @@ -46,6 +48,8 @@ import org.springframework.integration.handler.advice.HandleMessageAdvice; import org.springframework.integration.scheduling.PollerMetadata; import org.springframework.integration.support.channel.ChannelResolverUtils; +import org.springframework.lang.Nullable; +import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.PollableChannel; @@ -57,6 +61,8 @@ import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; + /** * The {@link FactoryBean} implementation for {@link AbstractEndpoint} population. @@ -76,7 +82,7 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAware, BeanNameAware, BeanClassLoaderAware, InitializingBean, SmartLifecycle, DisposableBean { - private static final Log LOGGER = LogFactory.getLog(ConsumerEndpointFactoryBean.class); + private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(ConsumerEndpointFactoryBean.class)); private final Object initializationMonitor = new Object(); @@ -90,6 +96,9 @@ public class ConsumerEndpointFactoryBean private PollerMetadata pollerMetadata; + @Nullable + private Function>, ? extends Publisher>> reactiveCustomizer; + private Boolean autoStartup; private int phase = 0; @@ -140,6 +149,12 @@ public void setPollerMetadata(PollerMetadata pollerMetadata) { this.pollerMetadata = pollerMetadata; } + public void setReactiveCustomizer( + @Nullable Function>, ? extends Publisher>> reactiveCustomizer) { + + this.reactiveCustomizer = reactiveCustomizer; + } + /** * Specify the {@link DestinationResolver} strategy to use. * The default is a BeanFactoryChannelResolver. @@ -192,7 +207,7 @@ public void setTaskScheduler(TaskScheduler taskScheduler) { @Override public void afterPropertiesSet() { if (this.beanName == null) { - LOGGER.error("The MessageHandler [" + this.handler + "] will be created without a 'componentName'. " + + LOGGER.error(() -> "The MessageHandler [" + this.handler + "] will be created without a 'componentName'. " + "Consider specifying the 'beanName' property on this ConsumerEndpointFactoryBean."); } else { @@ -228,11 +243,9 @@ private void populateComponentNameIfAny() { } } } - catch (Exception e) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Could not set component name for handler " - + this.handler + " for " + this.beanName + " :" + e.getMessage()); - } + catch (Exception ex) { + LOGGER.debug(() -> "Could not set component name for handler " + + this.handler + " for " + this.beanName + " :" + ex.getMessage()); } } @@ -297,20 +310,27 @@ private void initializeEndpoint() { channel = this.inputChannel; } Assert.state(channel != null, "one of inputChannelName or inputChannel is required"); - if (channel instanceof SubscribableChannel) { + + Assert.state(this.reactiveCustomizer == null || this.pollerMetadata == null, + "The 'pollerMetadata' and 'reactiveCustomizer' are mutually exclusive."); + + if (channel instanceof Publisher || + this.handler instanceof ReactiveMessageHandlerAdapter || + this.reactiveCustomizer != null) { + + reactiveStreamsConsumer(channel); + } + else if (channel instanceof SubscribableChannel) { eventDrivenConsumer(channel); } else if (channel instanceof PollableChannel) { pollingConsumer(channel); } else { - if (this.handler instanceof ReactiveMessageHandlerAdapter) { - this.endpoint = new ReactiveStreamsConsumer(channel, - ((ReactiveMessageHandlerAdapter) this.handler).getDelegate()); - } - else { - this.endpoint = new ReactiveStreamsConsumer(channel, this.handler); - } + throw new IllegalArgumentException("Unsupported 'inputChannel' type: '" + + channel.getClass().getName() + "'. " + + "Must be one of 'SubscribableChannel', 'PollableChannel' " + + "or 'ReactiveStreamsSubscribableChannel'"); } this.endpoint.setBeanName(this.beanName); this.endpoint.setBeanFactory(this.beanFactory); @@ -324,14 +344,27 @@ else if (channel instanceof PollableChannel) { } } + private void reactiveStreamsConsumer(MessageChannel channel) { + ReactiveStreamsConsumer reactiveStreamsConsumer; + if (this.handler instanceof ReactiveMessageHandlerAdapter) { + reactiveStreamsConsumer = new ReactiveStreamsConsumer(channel, + ((ReactiveMessageHandlerAdapter) this.handler).getDelegate()); + } + else { + reactiveStreamsConsumer = new ReactiveStreamsConsumer(channel, this.handler); + } + + reactiveStreamsConsumer.setReactiveCustomizer(this.reactiveCustomizer); + + this.endpoint = reactiveStreamsConsumer; + } + private void eventDrivenConsumer(MessageChannel channel) { Assert.isNull(this.pollerMetadata, () -> "A poller should not be specified for endpoint '" + this.beanName + "', since '" + channel + "' is a SubscribableChannel (not pollable)."); this.endpoint = new EventDrivenConsumer((SubscribableChannel) channel, this.handler); - if (LOGGER.isWarnEnabled() - && Boolean.FALSE.equals(this.autoStartup) - && channel instanceof FixedSubscriberChannel) { + if (Boolean.FALSE.equals(this.autoStartup) && channel instanceof FixedSubscriberChannel) { LOGGER.warn("'autoStartup=\"false\"' has no effect when using a FixedSubscriberChannel"); } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java index 9e809a7141c..88a9940f840 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,6 +51,7 @@ import org.springframework.core.task.TaskExecutor; import org.springframework.integration.annotation.IdempotentReceiver; import org.springframework.integration.annotation.Poller; +import org.springframework.integration.annotation.Reactive; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.MessagePublishingErrorHandler; import org.springframework.integration.config.IntegrationConfigUtils; @@ -75,12 +76,14 @@ import org.springframework.integration.util.ClassUtils; import org.springframework.integration.util.MessagingAnnotationUtils; import org.springframework.lang.Nullable; +import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.PollableChannel; import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.core.DestinationResolutionException; import org.springframework.messaging.core.DestinationResolver; +import org.springframework.messaging.handler.annotation.ValueConstants; import org.springframework.scheduling.Trigger; import org.springframework.scheduling.support.CronTrigger; import org.springframework.scheduling.support.PeriodicTrigger; @@ -89,6 +92,8 @@ import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; + /** * Base class for Method-level annotation post-processors. * @@ -291,7 +296,7 @@ public boolean shouldCreateEndpoint(Method method, List annotations) boolean createEndpoint = StringUtils.hasText(inputChannel); if (!createEndpoint && beanAnnotationAware()) { boolean isBean = AnnotatedElementUtils.isAnnotated(method, Bean.class.getName()); - Assert.isTrue(!isBean, "A channel name in '" + getInputChannelAttribute() + "' is required when " + + Assert.isTrue(!isBean, () -> "A channel name in '" + getInputChannelAttribute() + "' is required when " + this.annotationType + " is used on '@Bean' methods."); } return createEndpoint; @@ -362,7 +367,7 @@ protected AbstractEndpoint createEndpoint(MessageHandler handler, @SuppressWarni throw e; } } - Assert.notNull(inputChannel, "failed to resolve inputChannel '" + inputChannelName + "'"); + Assert.notNull(inputChannel, () -> "failed to resolve inputChannel '" + inputChannelName + "'"); endpoint = doCreateEndpoint(handler, inputChannel, annotations); } @@ -371,35 +376,66 @@ protected AbstractEndpoint createEndpoint(MessageHandler handler, @SuppressWarni protected AbstractEndpoint doCreateEndpoint(MessageHandler handler, MessageChannel inputChannel, List annotations) { - AbstractEndpoint endpoint; - if (inputChannel instanceof PollableChannel) { - PollingConsumer pollingConsumer = new PollingConsumer((PollableChannel) inputChannel, handler); - configurePollingEndpoint(pollingConsumer, annotations); - endpoint = pollingConsumer; + + Poller[] pollers = MessagingAnnotationUtils.resolveAttribute(annotations, "poller", Poller[].class); + Reactive reactive = MessagingAnnotationUtils.resolveAttribute(annotations, "reactive", Reactive.class); + boolean reactiveProvided = reactive != null && !ValueConstants.DEFAULT_NONE.equals(reactive.value()); + + Assert.state(!reactiveProvided || ObjectUtils.isEmpty(pollers), + "The 'poller' and 'reactive' are mutually exclusive."); + + if (inputChannel instanceof Publisher || handler instanceof ReactiveMessageHandlerAdapter || reactiveProvided) { + return reactiveStreamsConsumer(inputChannel, handler, reactiveProvided ? reactive : null); + } + else if (inputChannel instanceof SubscribableChannel) { + Assert.state(ObjectUtils.isEmpty(pollers), () -> + "A '@Poller' should not be specified for Annotation-based " + + "endpoint, since '" + inputChannel + "' is a SubscribableChannel (not pollable)."); + return new EventDrivenConsumer((SubscribableChannel) inputChannel, handler); + } + else if (inputChannel instanceof PollableChannel) { + return pollingConsumer(inputChannel, handler, pollers); } else { - Poller[] pollers = MessagingAnnotationUtils.resolveAttribute(annotations, "poller", Poller[].class); - Assert.state(ObjectUtils.isEmpty(pollers), "A '@Poller' should not be specified for Annotation-based " + - "endpoint, since '" + inputChannel + "' is a SubscribableChannel (not pollable)."); - if (inputChannel instanceof Publisher) { - if (handler instanceof ReactiveMessageHandlerAdapter) { - endpoint = new ReactiveStreamsConsumer(inputChannel, - ((ReactiveMessageHandlerAdapter) handler).getDelegate()); - } - else { - endpoint = new ReactiveStreamsConsumer(inputChannel, handler); - } - } - else { - endpoint = new EventDrivenConsumer((SubscribableChannel) inputChannel, handler); + throw new IllegalArgumentException("Unsupported 'inputChannel' type: '" + + inputChannel.getClass().getName() + "'. " + + "Must be one of 'SubscribableChannel', 'PollableChannel' or 'ReactiveStreamsSubscribableChannel'"); + } + } + + private ReactiveStreamsConsumer reactiveStreamsConsumer(MessageChannel channel, MessageHandler handler, + Reactive reactive) { + + ReactiveStreamsConsumer reactiveStreamsConsumer; + if (handler instanceof ReactiveMessageHandlerAdapter) { + reactiveStreamsConsumer = new ReactiveStreamsConsumer(channel, + ((ReactiveMessageHandlerAdapter) handler).getDelegate()); + } + else { + reactiveStreamsConsumer = new ReactiveStreamsConsumer(channel, handler); + } + + if (reactive != null) { + String functionBeanName = reactive.value(); + if (StringUtils.hasText(functionBeanName)) { + @SuppressWarnings("unchecked") + Function>, ? extends Publisher>> reactiveCustomizer = + this.beanFactory.getBean(functionBeanName, Function.class); + reactiveStreamsConsumer.setReactiveCustomizer(reactiveCustomizer); } } - return endpoint; + + return reactiveStreamsConsumer; } - protected void configurePollingEndpoint(AbstractPollingEndpoint pollingEndpoint, List annotations) { + private PollingConsumer pollingConsumer(MessageChannel inputChannel, MessageHandler handler, Poller[] pollers) { + PollingConsumer pollingConsumer = new PollingConsumer((PollableChannel) inputChannel, handler); + configurePollingEndpoint(pollingConsumer, pollers); + return pollingConsumer; + } + + protected void configurePollingEndpoint(AbstractPollingEndpoint pollingEndpoint, Poller[] pollers) { PollerMetadata pollerMetadata; - Poller[] pollers = MessagingAnnotationUtils.resolveAttribute(annotations, "poller", Poller[].class); if (!ObjectUtils.isEmpty(pollers)) { Assert.state(pollers.length == 1, "The 'poller' for an Annotation-based endpoint can have only one '@Poller'."); @@ -516,7 +552,7 @@ protected String generateHandlerBeanName(String originalBeanName, Method method) name = baseName; int count = 1; while (this.beanFactory.containsBean(name)) { - name = baseName + "#" + (++count); + name = baseName + '#' + (++count); } } return name + IntegrationConfigUtils.HANDLER_ALIAS_SUFFIX; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/InboundChannelAdapterAnnotationPostProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/InboundChannelAdapterAnnotationPostProcessor.java index 831d59ee093..31bcc5671c5 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/InboundChannelAdapterAnnotationPostProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/InboundChannelAdapterAnnotationPostProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 the original author or authors. + * Copyright 2014-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ import org.springframework.core.annotation.AnnotatedElementUtils; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.integration.annotation.InboundChannelAdapter; +import org.springframework.integration.annotation.Poller; import org.springframework.integration.config.IntegrationConfigUtils; import org.springframework.integration.core.MessageSource; import org.springframework.integration.endpoint.MethodInvokingMessageSource; @@ -64,7 +65,7 @@ public Object postProcess(Object bean, String beanName, Method method, List messageSource = null; + MessageSource messageSource; try { messageSource = createMessageSource(bean, beanName, method); } @@ -80,7 +81,8 @@ public Object postProcess(Object bean, String beanName, Method method, List>, ? extends Publisher>> reactiveCustomizer) { + this.endpointFactoryBean.setReactiveCustomizer(reactiveCustomizer); + return _this(); + } + @Override public S role(String role) { this.endpointFactoryBean.setRole(role); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java index 0b7901c25d8..c509ca3f596 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.integration.endpoint; import java.util.function.Consumer; +import java.util.function.Function; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -68,6 +69,9 @@ public class ReactiveStreamsConsumer extends AbstractEndpoint implements Integra @Nullable private final Lifecycle lifecycleDelegate; + @Nullable + private Function>, ? extends Publisher>> reactiveCustomizer; + private ErrorHandler errorHandler; private volatile Disposable subscription; @@ -126,6 +130,12 @@ public void setErrorHandler(ErrorHandler errorHandler) { this.errorHandler = errorHandler; } + public void setReactiveCustomizer( + @Nullable Function>, ? extends Publisher>> reactiveCustomizer) { + + this.reactiveCustomizer = reactiveCustomizer; + } + @Override public MessageChannel getInputChannel() { return this.inputChannel; @@ -163,16 +173,21 @@ protected void doStart() { this.lifecycleDelegate.start(); } + Flux> fluxFromChannel = Flux.from(this.publisher); + if (this.reactiveCustomizer != null) { + fluxFromChannel = fluxFromChannel.transform(this.reactiveCustomizer); + } + if (this.reactiveMessageHandler != null) { this.subscription = - Flux.from(this.publisher) + fluxFromChannel .flatMap(this.reactiveMessageHandler::handleMessage) .onErrorContinue((ex, data) -> this.errorHandler.handleError(ex)) .subscribe(); } else if (this.subscriber != null) { this.subscription = - Flux.from(this.publisher) + fluxFromChannel .subscribeWith(new SubscriberDecorator(this.subscriber, this.errorHandler)); } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/util/MessagingAnnotationUtils.java b/spring-integration-core/src/main/java/org/springframework/integration/util/MessagingAnnotationUtils.java index e9c639a5335..b5190da2c71 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/util/MessagingAnnotationUtils.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/util/MessagingAnnotationUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 the original author or authors. + * Copyright 2014-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -61,11 +61,9 @@ public final class MessagingAnnotationUtils { @SuppressWarnings("unchecked") public static T resolveAttribute(List annotations, String name, Class requiredType) { for (Annotation annotation : annotations) { - if (annotation != null) { - Object value = AnnotationUtils.getValue(annotation, name); - if (value != null && value.getClass() == requiredType && hasValue(value)) { - return (T) value; - } + Object value = AnnotationUtils.getValue(annotation, name); + if (requiredType.isInstance(value) && hasValue(value)) { + return (T) value; } } return null; diff --git a/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveStreamsConsumerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveStreamsConsumerTests.java index 43276627363..ff2782a632d 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveStreamsConsumerTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveStreamsConsumerTests.java @@ -33,6 +33,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -336,4 +337,36 @@ public void testReactiveStreamsConsumerFluxMessageChannelReactiveMessageHandler( testChannel.destroy(); } + @Test + public void testReactiveCustomizer() throws Exception { + DirectChannel testChannel = new DirectChannel(); + + AtomicReference> spied = new AtomicReference<>(); + AtomicReference> result = new AtomicReference<>(); + CountDownLatch stopLatch = new CountDownLatch(1); + + MessageHandler messageHandler = m -> { + result.set(m); + stopLatch.countDown(); + }; + + ConsumerEndpointFactoryBean endpointFactoryBean = new ConsumerEndpointFactoryBean(); + endpointFactoryBean.setBeanFactory(mock(ConfigurableBeanFactory.class)); + endpointFactoryBean.setInputChannel(testChannel); + endpointFactoryBean.setHandler(messageHandler); + endpointFactoryBean.setBeanName("reactiveConsumer"); + endpointFactoryBean.setReactiveCustomizer(flux -> flux.doOnNext(spied::set)); + endpointFactoryBean.afterPropertiesSet(); + endpointFactoryBean.start(); + + Message testMessage = new GenericMessage<>("test"); + testChannel.send(testMessage); + + assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue(); + endpointFactoryBean.stop(); + + assertThat(result.get()).isSameAs(testMessage); + assertThat(spied.get()).isSameAs(testMessage); + } + } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java index 9f08fb25582..d77a4f2eb51 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2020 the original author or authors. + * Copyright 2014-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,8 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; @@ -48,6 +50,7 @@ import org.springframework.integration.annotation.Filter; import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.integration.annotation.Poller; +import org.springframework.integration.annotation.Reactive; import org.springframework.integration.annotation.Router; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.annotation.Splitter; @@ -81,6 +84,7 @@ import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.test.StepVerifier; @@ -145,10 +149,12 @@ public class MessagingAnnotationsWithBeanAnnotationTests { @Autowired private MessageChannel messageConsumerServiceChannel; + @Autowired + private CountDownLatch reactiveCustomizerLatch; + @Test - public void testMessagingAnnotationsFlow() { + public void testMessagingAnnotationsFlow() throws InterruptedException { Stream.of(this.sourcePollingChannelAdapters).forEach(AbstractEndpoint::start); - //this.sourcePollingChannelAdapter.start(); for (int i = 0; i < 10; i++) { Message receive = this.discardChannel.receive(10000); assertThat(receive).isNotNull(); @@ -164,6 +170,9 @@ public void testMessagingAnnotationsFlow() { "'messagingAnnotationsWithBeanAnnotationTests.ContextConfiguration.filter.filter.handler'"); } + + assertThat(reactiveCustomizerLatch.await(10, TimeUnit.SECONDS)).isTrue(); + for (Message message : this.collector) { assertThat(((Integer) message.getPayload()) % 2).isNotEqualTo(0); MessageHistory messageHistory = MessageHistory.read(message); @@ -336,7 +345,17 @@ public MessageChannel splitterChannel() { } @Bean - @Splitter(inputChannel = "splitterChannel") + public CountDownLatch reactiveCustomizerLatch() { + return new CountDownLatch(10); + } + + @Bean + public Function, Flux> reactiveCustomizer(CountDownLatch reactiveCustomizerLatch) { + return flux -> flux.doOnNext(data -> reactiveCustomizerLatch.countDown()); + } + + @Bean + @Splitter(inputChannel = "splitterChannel", reactive = @Reactive("reactiveCustomizer")) public MessageHandler splitter() { DefaultMessageSplitter defaultMessageSplitter = new DefaultMessageSplitter(); defaultMessageSplitter.setOutputChannelName("serviceChannel"); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java b/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java index 004127f2ea9..994cd27e1b6 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2020 the original author or authors. + * Copyright 2014-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -80,6 +80,7 @@ import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.annotation.Poller; import org.springframework.integration.annotation.Publisher; +import org.springframework.integration.annotation.Reactive; import org.springframework.integration.annotation.Role; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.annotation.Transformer; @@ -105,6 +106,7 @@ import org.springframework.integration.endpoint.EventDrivenConsumer; import org.springframework.integration.endpoint.MethodInvokingMessageSource; import org.springframework.integration.endpoint.PollingConsumer; +import org.springframework.integration.endpoint.ReactiveStreamsConsumer; import org.springframework.integration.expression.SpelPropertyAccessorRegistrar; import org.springframework.integration.gateway.GatewayProxyFactoryBean; import org.springframework.integration.handler.ServiceActivatingHandler; @@ -270,6 +272,9 @@ public class EnableIntegrationTests { @Autowired private MessageChannel bridgeToInput; + @Autowired + private AbstractEndpoint reactiveBridge; + @Autowired private PollableChannel bridgeToOutput; @@ -655,6 +660,7 @@ public void testBridgeAnnotations() { assertThat(testMessage).isSameAs(receive); assertThat(this.metaBridgeOutput.receive(10)).isNull(); + assertThat(this.reactiveBridge).isInstanceOf(ReactiveStreamsConsumer.class); this.bridgeToInput.send(testMessage); receive = this.bridgeToOutput.receive(10_000); assertThat(receive).isNotNull(); @@ -881,7 +887,7 @@ public AtomicInteger fbInterceptorCounter() { @Bean @GlobalChannelInterceptor public FactoryBean ciFactoryBean() { - return new AbstractFactoryBean() { + return new AbstractFactoryBean<>() { @Override public Class getObjectType() { @@ -889,7 +895,7 @@ public Class getObjectType() { } @Override - protected ChannelInterceptor createInstance() throws Exception { + protected ChannelInterceptor createInstance() { return new ChannelInterceptor() { @Override @@ -933,7 +939,8 @@ public QueueChannel bridgeToOutput() { } @Bean - @BridgeTo("bridgeToOutput") + @BridgeTo(value = "bridgeToOutput", reactive = @Reactive) + @EndpointId("reactiveBridge") public MessageChannel bridgeToInput() { return new DirectChannel(); } @@ -1320,7 +1327,7 @@ public String transform(Message message) { assertThat(message.getHeaders().get("foo")).isEqualTo("FOO"); assertThat(message.getHeaders()).containsKey("calledMethod"); assertThat(message.getHeaders().get("calledMethod")).isEqualTo("echo"); - return this.handle(message.getPayload()) + Arrays.asList(new Throwable().getStackTrace()).toString(); + return handle(message.getPayload()) + Arrays.asList(new Throwable().getStackTrace()).toString(); } @Transformer(inputChannel = "gatewayChannel2") @@ -1330,7 +1337,7 @@ public String transform2(Message message) { assertThat(message.getHeaders().get("foo")).isEqualTo("FOO"); assertThat(message.getHeaders()).containsKey("calledMethod"); assertThat(message.getHeaders().get("calledMethod")).isEqualTo("echo2"); - return this.handle(message.getPayload()) + "2" + Arrays.asList(new Throwable().getStackTrace()).toString(); + return handle(message.getPayload()) + "2" + Arrays.asList(new Throwable().getStackTrace()).toString(); } @MyInboundChannelAdapter1 diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java index b916d3e7cca..7fd62aa144e 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -46,6 +46,8 @@ import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.MessageChannels; import org.springframework.integration.dsl.context.IntegrationFlowContext; +import org.springframework.integration.endpoint.AbstractEndpoint; +import org.springframework.integration.endpoint.ReactiveStreamsConsumer; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.GenericMessage; @@ -53,6 +55,7 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; /** @@ -73,6 +76,9 @@ public class ReactiveStreamsTests { @Qualifier("pollableReactiveFlow") private Publisher> pollablePublisher; + @Autowired + private AbstractEndpoint reactiveTransformer; + @Autowired @Qualifier("reactiveStreamsMessageSource") private Lifecycle messageSource; @@ -109,12 +115,13 @@ void testReactiveFlow() throws Exception { this.messageSource.start(); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); String[] strings = results.toArray(new String[0]); - assertThat(strings).isEqualTo(new String[] { "A", "B", "C", "D", "E", "F" }); + assertThat(strings).isEqualTo(new String[]{ "A", "B", "C", "D", "E", "F" }); this.messageSource.stop(); } @Test void testPollableReactiveFlow() throws Exception { + assertThat(this.reactiveTransformer).isInstanceOf(ReactiveStreamsConsumer.class); this.inputChannel.send(new GenericMessage<>("1,2,3,4,5")); CountDownLatch latch = new CountDownLatch(6); @@ -216,9 +223,7 @@ void singleChannelFlowTest() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Flux.from(this.singleChannelFlow) .map(m -> m.getPayload().toUpperCase()) - .subscribe(p -> { - latch.countDown(); - }); + .subscribe(p -> latch.countDown()); this.singleChannel.send(new GenericMessage<>("foo")); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); } @@ -228,9 +233,7 @@ void fixedSubscriberChannelFlowTest() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Flux.from(this.fixedSubscriberChannelFlow) .map(m -> m.getPayload().toUpperCase()) - .subscribe(p -> { - latch.countDown(); - }); + .subscribe(p -> latch.countDown()); this.fixedSubscriberChannel.send(new GenericMessage<>("bar")); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); } @@ -258,7 +261,8 @@ public Publisher> pollableReactiveFlow() { return IntegrationFlows .from("inputChannel") .split(s -> s.delimiters(",")) - .transform(Integer::parseInt) + .transform(Integer::parseInt, + e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())).id("reactiveTransformer")) .channel(MessageChannels.queue()) .log() .toReactivePublisher(); diff --git a/src/reference/asciidoc/configuration.adoc b/src/reference/asciidoc/configuration.adoc index 85de3e5ee8f..99620000a0c 100644 --- a/src/reference/asciidoc/configuration.adoc +++ b/src/reference/asciidoc/configuration.adoc @@ -471,6 +471,38 @@ Starting with version 4.3.3, the `@Poller` annotation has the `errorChannel` att This attribute plays the same role as `error-channel` in the `` XML component. See <<./endpoint.adoc#endpoint-namespace,Endpoint Namespace Support>> for more information. +The `poller()` attribute on the messaging annotations is mutually exclusive with the `reactive()` attribute. +See next section for more information. + +[[configuration-using-reactive-annotation]] +==== Using `@Reactive` Annotation + +The `ReactiveStreamsConsumer` has been around since version 5.0, but it was applied only when an input channel for the endpoint is a `FluxMessageChannel` (or any `org.reactivestreams.Publisher` implementation). +Starting with version 5.3, its instance is also created by the framework when the target message handler is a `ReactiveMessageHandler` independently of the input channel type. +The `@Reactive` sub-annotation (similar to mentioned above `@Poller`) has been introduced for all the messaging annotations starting with version 5.5. +It accepts an optional `Function>, ? extends Publisher>>` bean reference and, independently of the input channel type and message handler, turns the target endpoint into the `ReactiveStreamsConsumer` instance. +The function is used from the `Flux.transform()` operator to apply some customization (`publishOn()`, `doOnNext()`, `log()`, `retry()` etc.) on a reactive stream source from the input channel. + +The following example demonstrates how to change the publishing thread from the input channel independently of the final subscriber and producer to that `DirectChannel`: + +==== +[source,java] +---- +@Bean +public Function, Flux> publishOnCustomizer() { + return flux -> flux.publishOn(Schedulers.parallel()); +} + +@ServiceActivator(inputChannel = "directChannel", reactive = @Reactive("publishOnCustomizer")) +public void handleReactive(String payload) { + ... +} +---- +==== + +The `reactive()` attribute on the messaging annotations is mutually exclusive with the `poller()` attribute. +See <> and <<./reactive-streams.adoc#reactive-streams, Reactive Streams Support>> for more information. + ==== Using the `@InboundChannelAdapter` Annotation Version 4.0 introduced the `@InboundChannelAdapter` method-level annotation. diff --git a/src/reference/asciidoc/dsl.adoc b/src/reference/asciidoc/dsl.adoc index c1642a2ac4f..a8fce52b317 100644 --- a/src/reference/asciidoc/dsl.adoc +++ b/src/reference/asciidoc/dsl.adoc @@ -262,6 +262,31 @@ See https://docs.spring.io/spring-integration/api/org/springframework/integratio IMPORTANT: If you use the DSL to construct a `PollerSpec` as a `@Bean`, do not call the `get()` method in the bean definition. The `PollerSpec` is a `FactoryBean` that generates the `PollerMetadata` object from the specification and initializes all of its properties. +[[java-dsl-reactive]] +=== The `reactive()` Endpoint + +Starting with version 5.5, the `ConsumerEndpointSpec` provides a `reactive()` configuration property with an optional customizer `Function>, ? extends Publisher>>`. +This option configures the target endpoint as a `ReactiveStreamsConsumer` instance, independently of the input channel type, which is converted to a `Flux` via `IntegrationReactiveUtils.messageChannelToFlux()`. +The provided function is used from the `Flux.transform()` operator to customize (`publishOn()`, `log()`, `doOnNext()` etc.) a reactive stream source from the input channel. + +The following example demonstrates how to change the publishing thread from the input channel independently of the final subscriber and producer to that `DirectChannel`: + +==== +[source,java] +---- +@Bean +public IntegrationFlow reactiveEndpointFlow() { + return IntegrationFlows + .from("inputChannel") + .transform(Integer::parseInt, + e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel()))) + .get(); +} +---- +==== + +See <<./reactive-streams.adoc#reactive-streams, Reactive Streams Support>> for more information. + [[java-dsl-endpoints]] === DSL and Endpoint Configuration diff --git a/src/reference/asciidoc/reactive-streams.adoc b/src/reference/asciidoc/reactive-streams.adoc index 7d926012607..66602935918 100644 --- a/src/reference/asciidoc/reactive-streams.adoc +++ b/src/reference/asciidoc/reactive-streams.adoc @@ -62,10 +62,14 @@ A consumer for the `FluxMessageChannel` must be a `org.reactivestreams.Subscribe Fortunately, all of the `MessageHandler` implementations in Spring Integration also implement a `CoreSubscriber` from project Reactor. And thanks to a `ReactiveStreamsConsumer` implementation in between, the whole integration flow configuration is left transparent for target developers. In this case, the flow behavior is changed from an imperative push model to a reactive pull model. -A `ReactiveStreamsConsumer` can also be used to turn any `MessageChannel` into a reactive source using `MessageChannelReactiveUtils`, making an integration flow partially reactive. +A `ReactiveStreamsConsumer` can also be used to turn any `MessageChannel` into a reactive source using `IntegrationReactiveUtils`, making an integration flow partially reactive. See <<./channel.adoc#flux-message-channel,`FluxMessageChannel`>> for more information. +Starting with version 5.5, the `ConsumerEndpointSpec` introduces a `reactive()` option to make the endpoint in the flow as a `ReactiveStreamsConsumer` independently of the input channel. +The optional `Function>, ? extends Publisher>>` can be provided to customise a source `Flux` from the input channel via `Flux.transform()` operation, e.g. with the `publishOn()`, `doOnNext()`, `retry()` etc. +This functionality is represented as a `@Reactive` sub-annotation for all the messaging annotation (`@ServiceActivator`, `@Splitter` etc.) via their `reactive()` attribute. + === Source Polling Channel Adapter Usually, the `SourcePollingChannelAdapter` relies on the task which is initiated by the `TaskScheduler`. diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index d8e15648067..6e4d32b3893 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -30,6 +30,10 @@ An `AbstractPollingEndpoint` (source polling channel adapter and polling consume It can be changed to different value later on, e.g. via a Control Bus. See <<./endpoint.adoc#endpoint-pollingconsumer,Polling Consumer>> for more information. +The `ConsumerEndpointFactoryBean` now accept a `reactiveCustomizer` `Function` to any input channel as reactive stream source and use a `ReactiveStreamsConsumer` underneath. +This is covered as a `ConsumerEndpointSpec.reactive()` option in Java DSL and as a `@Reactive` nested annotation for the messaging annotations. +See <<./reactive-streams.adoc#reactive-streams,Reactive Streams Support>> for more information. + [[x5.5-amqp]] ==== AMQP Changes