diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Aggregator.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Aggregator.java
index 4f21d762f7e..5fa70a2660e 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Aggregator.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Aggregator.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import org.springframework.messaging.handler.annotation.ValueConstants;
+
/**
* Indicates that a method is capable of aggregating messages.
*
@@ -104,8 +106,15 @@
* @return the {@link Poller} options for a polled endpoint
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
* This attribute is an {@code array} just to allow an empty default (no poller).
- * Only one {@link Poller} element is allowed.
+ * Mutually exclusive with {@link #reactive()}.
*/
Poller[] poller() default { };
+ /**
+ * @return the {@link Reactive} marker for a consumer endpoint.
+ * Mutually exclusive with {@link #poller()}.
+ * @since 5.5
+ */
+ Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE);
+
}
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/BridgeFrom.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/BridgeFrom.java
index 430b928c413..ba72f9df353 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/BridgeFrom.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/BridgeFrom.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014-2019 the original author or authors.
+ * Copyright 2014-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import org.springframework.messaging.handler.annotation.ValueConstants;
+
/**
* Messaging Annotation to mark a {@link org.springframework.context.annotation.Bean}
* method for a {@link org.springframework.messaging.MessageChannel} to produce a
@@ -69,8 +71,15 @@
* @return the {@link Poller} options for a polled endpoint
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
* This attribute is an {@code array} just to allow an empty default (no poller).
- * Only one {@link Poller} element is allowed.
+ * Mutually exclusive with {@link #reactive()}.
*/
Poller[] poller() default { };
+ /**
+ * @return the {@link Reactive} marker for a consumer endpoint.
+ * Mutually exclusive with {@link #poller()}.
+ * @since 5.5
+ */
+ Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE);
+
}
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/BridgeTo.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/BridgeTo.java
index 507836af05e..b1d9687059c 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/BridgeTo.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/BridgeTo.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014-2019 the original author or authors.
+ * Copyright 2014-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import org.springframework.messaging.handler.annotation.ValueConstants;
+
/**
* Messaging Annotation to mark a {@link org.springframework.context.annotation.Bean}
* method for a {@link org.springframework.messaging.MessageChannel} to produce a
@@ -50,8 +52,8 @@
public @interface BridgeTo {
/**
- * @return the outbound channel name to send the message to
- * {@link org.springframework.integration.handler.BridgeHandler}.
+ * @return the outbound channel name to send the message to for the
+ * {@link org.springframework.integration.handler.BridgeHandler} reply.
* Optional: when omitted the message is sent to the {@code reply-channel}
* in its headers (if present - an exception is thrown otherwise).
*/
@@ -73,11 +75,18 @@
String phase() default "";
/**
- * @return the {@link org.springframework.integration.annotation.Poller} options for a polled endpoint
+ * @return the {@link Poller} options for a polled endpoint
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
* This attribute is an {@code array} just to allow an empty default (no poller).
- * Only one {@link org.springframework.integration.annotation.Poller} element is allowed.
+ * Mutually exclusive with {@link #reactive()}.
*/
Poller[] poller() default { };
+ /**
+ * @return the {@link Reactive} marker for a consumer endpoint.
+ * Mutually exclusive with {@link #poller()}.
+ * @since 5.5
+ */
+ Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE);
+
}
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Filter.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Filter.java
index 5501ead166f..de1e0c0e81b 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Filter.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Filter.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import org.springframework.messaging.handler.annotation.ValueConstants;
+
/**
* Indicates that a method is capable of playing the role of a Message Filter.
*
@@ -38,6 +40,7 @@
* @author Mark Fisher
* @author Gary Russell
* @author Artem Bilan
+ *
* @since 2.0
*/
@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@@ -126,8 +129,15 @@
* @return the {@link Poller} options for a polled endpoint
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
* This attribute is an {@code array} just to allow an empty default (no poller).
- * Only one {@link Poller} element is allowed.
+ * Mutually exclusive with {@link #reactive()}.
*/
Poller[] poller() default { };
+ /**
+ * @return the {@link Reactive} marker for a consumer endpoint.
+ * Mutually exclusive with {@link #poller()}.
+ * @since 5.5
+ */
+ Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE);
+
}
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Reactive.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Reactive.java
new file mode 100644
index 00000000000..b2d0e907cd2
--- /dev/null
+++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Reactive.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2021 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.annotation;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Provides reactive configuration options for the consumer endpoint making
+ * any input channel as a reactive stream source of data.
+ *
+ * @author Artem Bilan
+ *
+ * @since 5.5
+ */
+@Target({})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Reactive {
+
+ /**
+ * @return the function bean name to be used in the
+ * {@link reactor.core.publisher.Flux#transform} on the input channel
+ * {@link reactor.core.publisher.Flux}.
+ */
+ String value() default "";
+
+}
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Router.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Router.java
index 13c84a8cec1..1baf4025740 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Router.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Router.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import org.springframework.messaging.handler.annotation.ValueConstants;
+
/**
* Indicates that a method is capable of resolving to a channel or channel name
* based on a message, message header(s), or both.
@@ -150,8 +152,15 @@
* @return the {@link Poller} options for a polled endpoint
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
* This attribute is an {@code array} just to allow an empty default (no poller).
- * Only one {@link Poller} element is allowed.
+ * Mutually exclusive with {@link #reactive()}.
*/
Poller[] poller() default { };
+ /**
+ * @return the {@link Reactive} marker for a consumer endpoint.
+ * Mutually exclusive with {@link #poller()}.
+ * @since 5.5
+ */
+ Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE);
+
}
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java
index 9b577e61851..86be941aab7 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import org.springframework.messaging.handler.annotation.ValueConstants;
+
/**
* Indicates that a method is capable of handling a message or message payload.
*
@@ -121,7 +123,15 @@
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
* This attribute is an {@code array} just to allow an empty default (no poller).
* Only one {@link Poller} element is allowed.
+ * Mutually exclusive with {@link #reactive()}.
*/
Poller[] poller() default { };
+ /**
+ * @return the {@link Reactive} marker for a consumer endpoint.
+ * Mutually exclusive with {@link #poller()}.
+ * @since 5.5
+ */
+ Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE);
+
}
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Splitter.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Splitter.java
index ad7b69fabcd..78cfcbf7f81 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Splitter.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Splitter.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import org.springframework.messaging.handler.annotation.ValueConstants;
+
/**
* Indicates that a method is capable of splitting a single message or message
* payload to produce multiple messages or payloads.
@@ -115,8 +117,15 @@
* @return the {@link Poller} options for a polled endpoint
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
* This attribute is an {@code array} just to allow an empty default (no poller).
- * Only one {@link Poller} element is allowed.
+ * Mutually exclusive with {@link #reactive()}.
*/
Poller[] poller() default { };
+ /**
+ * @return the {@link Reactive} marker for a consumer endpoint.
+ * Mutually exclusive with {@link #poller()}.
+ * @since 5.5
+ */
+ Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE);
+
}
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Transformer.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Transformer.java
index e31b02e6976..a93595ed0a4 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Transformer.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Transformer.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import org.springframework.messaging.handler.annotation.ValueConstants;
+
/**
* Indicates that a method is capable of transforming a message, message header,
* or message payload.
@@ -91,8 +93,15 @@
* @return the {@link Poller} options for a polled endpoint
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
* This attribute is an {@code array} just to allow an empty default (no poller).
- * Only one {@link Poller} element is allowed.
+ * Mutually exclusive with {@link #reactive()}.
*/
Poller[] poller() default { };
+ /**
+ * @return the {@link Reactive} marker for a consumer endpoint.
+ * Mutually exclusive with {@link #poller()}.
+ * @since 5.5
+ */
+ Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE);
+
}
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java b/spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java
index acc1a70a819..3ae0d8692d4 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2020 the original author or authors.
+ * Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,10 +17,11 @@
package org.springframework.integration.config;
import java.util.List;
+import java.util.function.Function;
import org.aopalliance.aop.Advice;
-import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.reactivestreams.Publisher;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.framework.ProxyFactory;
@@ -35,6 +36,7 @@
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.SmartLifecycle;
+import org.springframework.core.log.LogAccessor;
import org.springframework.integration.channel.FixedSubscriberChannel;
import org.springframework.integration.context.IntegrationObjectSupport;
import org.springframework.integration.endpoint.AbstractEndpoint;
@@ -46,6 +48,8 @@
import org.springframework.integration.handler.advice.HandleMessageAdvice;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.support.channel.ChannelResolverUtils;
+import org.springframework.lang.Nullable;
+import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.PollableChannel;
@@ -57,6 +61,8 @@
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
+import reactor.core.publisher.Flux;
+
/**
* The {@link FactoryBean} implementation for {@link AbstractEndpoint} population.
@@ -76,7 +82,7 @@ public class ConsumerEndpointFactoryBean
implements FactoryBean, BeanFactoryAware, BeanNameAware, BeanClassLoaderAware,
InitializingBean, SmartLifecycle, DisposableBean {
- private static final Log LOGGER = LogFactory.getLog(ConsumerEndpointFactoryBean.class);
+ private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(ConsumerEndpointFactoryBean.class));
private final Object initializationMonitor = new Object();
@@ -90,6 +96,9 @@ public class ConsumerEndpointFactoryBean
private PollerMetadata pollerMetadata;
+ @Nullable
+ private Function super Flux>, ? extends Publisher>> reactiveCustomizer;
+
private Boolean autoStartup;
private int phase = 0;
@@ -140,6 +149,12 @@ public void setPollerMetadata(PollerMetadata pollerMetadata) {
this.pollerMetadata = pollerMetadata;
}
+ public void setReactiveCustomizer(
+ @Nullable Function super Flux>, ? extends Publisher>> reactiveCustomizer) {
+
+ this.reactiveCustomizer = reactiveCustomizer;
+ }
+
/**
* Specify the {@link DestinationResolver} strategy to use.
* The default is a BeanFactoryChannelResolver.
@@ -192,7 +207,7 @@ public void setTaskScheduler(TaskScheduler taskScheduler) {
@Override
public void afterPropertiesSet() {
if (this.beanName == null) {
- LOGGER.error("The MessageHandler [" + this.handler + "] will be created without a 'componentName'. " +
+ LOGGER.error(() -> "The MessageHandler [" + this.handler + "] will be created without a 'componentName'. " +
"Consider specifying the 'beanName' property on this ConsumerEndpointFactoryBean.");
}
else {
@@ -228,11 +243,9 @@ private void populateComponentNameIfAny() {
}
}
}
- catch (Exception e) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Could not set component name for handler "
- + this.handler + " for " + this.beanName + " :" + e.getMessage());
- }
+ catch (Exception ex) {
+ LOGGER.debug(() -> "Could not set component name for handler "
+ + this.handler + " for " + this.beanName + " :" + ex.getMessage());
}
}
@@ -297,20 +310,27 @@ private void initializeEndpoint() {
channel = this.inputChannel;
}
Assert.state(channel != null, "one of inputChannelName or inputChannel is required");
- if (channel instanceof SubscribableChannel) {
+
+ Assert.state(this.reactiveCustomizer == null || this.pollerMetadata == null,
+ "The 'pollerMetadata' and 'reactiveCustomizer' are mutually exclusive.");
+
+ if (channel instanceof Publisher ||
+ this.handler instanceof ReactiveMessageHandlerAdapter ||
+ this.reactiveCustomizer != null) {
+
+ reactiveStreamsConsumer(channel);
+ }
+ else if (channel instanceof SubscribableChannel) {
eventDrivenConsumer(channel);
}
else if (channel instanceof PollableChannel) {
pollingConsumer(channel);
}
else {
- if (this.handler instanceof ReactiveMessageHandlerAdapter) {
- this.endpoint = new ReactiveStreamsConsumer(channel,
- ((ReactiveMessageHandlerAdapter) this.handler).getDelegate());
- }
- else {
- this.endpoint = new ReactiveStreamsConsumer(channel, this.handler);
- }
+ throw new IllegalArgumentException("Unsupported 'inputChannel' type: '"
+ + channel.getClass().getName() + "'. " +
+ "Must be one of 'SubscribableChannel', 'PollableChannel' " +
+ "or 'ReactiveStreamsSubscribableChannel'");
}
this.endpoint.setBeanName(this.beanName);
this.endpoint.setBeanFactory(this.beanFactory);
@@ -324,14 +344,27 @@ else if (channel instanceof PollableChannel) {
}
}
+ private void reactiveStreamsConsumer(MessageChannel channel) {
+ ReactiveStreamsConsumer reactiveStreamsConsumer;
+ if (this.handler instanceof ReactiveMessageHandlerAdapter) {
+ reactiveStreamsConsumer = new ReactiveStreamsConsumer(channel,
+ ((ReactiveMessageHandlerAdapter) this.handler).getDelegate());
+ }
+ else {
+ reactiveStreamsConsumer = new ReactiveStreamsConsumer(channel, this.handler);
+ }
+
+ reactiveStreamsConsumer.setReactiveCustomizer(this.reactiveCustomizer);
+
+ this.endpoint = reactiveStreamsConsumer;
+ }
+
private void eventDrivenConsumer(MessageChannel channel) {
Assert.isNull(this.pollerMetadata,
() -> "A poller should not be specified for endpoint '" + this.beanName
+ "', since '" + channel + "' is a SubscribableChannel (not pollable).");
this.endpoint = new EventDrivenConsumer((SubscribableChannel) channel, this.handler);
- if (LOGGER.isWarnEnabled()
- && Boolean.FALSE.equals(this.autoStartup)
- && channel instanceof FixedSubscriberChannel) {
+ if (Boolean.FALSE.equals(this.autoStartup) && channel instanceof FixedSubscriberChannel) {
LOGGER.warn("'autoStartup=\"false\"' has no effect when using a FixedSubscriberChannel");
}
}
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java
index 9e809a7141c..88a9940f840 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2020 the original author or authors.
+ * Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -51,6 +51,7 @@
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.annotation.IdempotentReceiver;
import org.springframework.integration.annotation.Poller;
+import org.springframework.integration.annotation.Reactive;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.MessagePublishingErrorHandler;
import org.springframework.integration.config.IntegrationConfigUtils;
@@ -75,12 +76,14 @@
import org.springframework.integration.util.ClassUtils;
import org.springframework.integration.util.MessagingAnnotationUtils;
import org.springframework.lang.Nullable;
+import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.core.DestinationResolutionException;
import org.springframework.messaging.core.DestinationResolver;
+import org.springframework.messaging.handler.annotation.ValueConstants;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.scheduling.support.PeriodicTrigger;
@@ -89,6 +92,8 @@
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
+import reactor.core.publisher.Flux;
+
/**
* Base class for Method-level annotation post-processors.
*
@@ -291,7 +296,7 @@ public boolean shouldCreateEndpoint(Method method, List annotations)
boolean createEndpoint = StringUtils.hasText(inputChannel);
if (!createEndpoint && beanAnnotationAware()) {
boolean isBean = AnnotatedElementUtils.isAnnotated(method, Bean.class.getName());
- Assert.isTrue(!isBean, "A channel name in '" + getInputChannelAttribute() + "' is required when " +
+ Assert.isTrue(!isBean, () -> "A channel name in '" + getInputChannelAttribute() + "' is required when " +
this.annotationType + " is used on '@Bean' methods.");
}
return createEndpoint;
@@ -362,7 +367,7 @@ protected AbstractEndpoint createEndpoint(MessageHandler handler, @SuppressWarni
throw e;
}
}
- Assert.notNull(inputChannel, "failed to resolve inputChannel '" + inputChannelName + "'");
+ Assert.notNull(inputChannel, () -> "failed to resolve inputChannel '" + inputChannelName + "'");
endpoint = doCreateEndpoint(handler, inputChannel, annotations);
}
@@ -371,35 +376,66 @@ protected AbstractEndpoint createEndpoint(MessageHandler handler, @SuppressWarni
protected AbstractEndpoint doCreateEndpoint(MessageHandler handler, MessageChannel inputChannel,
List annotations) {
- AbstractEndpoint endpoint;
- if (inputChannel instanceof PollableChannel) {
- PollingConsumer pollingConsumer = new PollingConsumer((PollableChannel) inputChannel, handler);
- configurePollingEndpoint(pollingConsumer, annotations);
- endpoint = pollingConsumer;
+
+ Poller[] pollers = MessagingAnnotationUtils.resolveAttribute(annotations, "poller", Poller[].class);
+ Reactive reactive = MessagingAnnotationUtils.resolveAttribute(annotations, "reactive", Reactive.class);
+ boolean reactiveProvided = reactive != null && !ValueConstants.DEFAULT_NONE.equals(reactive.value());
+
+ Assert.state(!reactiveProvided || ObjectUtils.isEmpty(pollers),
+ "The 'poller' and 'reactive' are mutually exclusive.");
+
+ if (inputChannel instanceof Publisher || handler instanceof ReactiveMessageHandlerAdapter || reactiveProvided) {
+ return reactiveStreamsConsumer(inputChannel, handler, reactiveProvided ? reactive : null);
+ }
+ else if (inputChannel instanceof SubscribableChannel) {
+ Assert.state(ObjectUtils.isEmpty(pollers), () ->
+ "A '@Poller' should not be specified for Annotation-based " +
+ "endpoint, since '" + inputChannel + "' is a SubscribableChannel (not pollable).");
+ return new EventDrivenConsumer((SubscribableChannel) inputChannel, handler);
+ }
+ else if (inputChannel instanceof PollableChannel) {
+ return pollingConsumer(inputChannel, handler, pollers);
}
else {
- Poller[] pollers = MessagingAnnotationUtils.resolveAttribute(annotations, "poller", Poller[].class);
- Assert.state(ObjectUtils.isEmpty(pollers), "A '@Poller' should not be specified for Annotation-based " +
- "endpoint, since '" + inputChannel + "' is a SubscribableChannel (not pollable).");
- if (inputChannel instanceof Publisher) {
- if (handler instanceof ReactiveMessageHandlerAdapter) {
- endpoint = new ReactiveStreamsConsumer(inputChannel,
- ((ReactiveMessageHandlerAdapter) handler).getDelegate());
- }
- else {
- endpoint = new ReactiveStreamsConsumer(inputChannel, handler);
- }
- }
- else {
- endpoint = new EventDrivenConsumer((SubscribableChannel) inputChannel, handler);
+ throw new IllegalArgumentException("Unsupported 'inputChannel' type: '"
+ + inputChannel.getClass().getName() + "'. " +
+ "Must be one of 'SubscribableChannel', 'PollableChannel' or 'ReactiveStreamsSubscribableChannel'");
+ }
+ }
+
+ private ReactiveStreamsConsumer reactiveStreamsConsumer(MessageChannel channel, MessageHandler handler,
+ Reactive reactive) {
+
+ ReactiveStreamsConsumer reactiveStreamsConsumer;
+ if (handler instanceof ReactiveMessageHandlerAdapter) {
+ reactiveStreamsConsumer = new ReactiveStreamsConsumer(channel,
+ ((ReactiveMessageHandlerAdapter) handler).getDelegate());
+ }
+ else {
+ reactiveStreamsConsumer = new ReactiveStreamsConsumer(channel, handler);
+ }
+
+ if (reactive != null) {
+ String functionBeanName = reactive.value();
+ if (StringUtils.hasText(functionBeanName)) {
+ @SuppressWarnings("unchecked")
+ Function super Flux>, ? extends Publisher>> reactiveCustomizer =
+ this.beanFactory.getBean(functionBeanName, Function.class);
+ reactiveStreamsConsumer.setReactiveCustomizer(reactiveCustomizer);
}
}
- return endpoint;
+
+ return reactiveStreamsConsumer;
}
- protected void configurePollingEndpoint(AbstractPollingEndpoint pollingEndpoint, List annotations) {
+ private PollingConsumer pollingConsumer(MessageChannel inputChannel, MessageHandler handler, Poller[] pollers) {
+ PollingConsumer pollingConsumer = new PollingConsumer((PollableChannel) inputChannel, handler);
+ configurePollingEndpoint(pollingConsumer, pollers);
+ return pollingConsumer;
+ }
+
+ protected void configurePollingEndpoint(AbstractPollingEndpoint pollingEndpoint, Poller[] pollers) {
PollerMetadata pollerMetadata;
- Poller[] pollers = MessagingAnnotationUtils.resolveAttribute(annotations, "poller", Poller[].class);
if (!ObjectUtils.isEmpty(pollers)) {
Assert.state(pollers.length == 1,
"The 'poller' for an Annotation-based endpoint can have only one '@Poller'.");
@@ -516,7 +552,7 @@ protected String generateHandlerBeanName(String originalBeanName, Method method)
name = baseName;
int count = 1;
while (this.beanFactory.containsBean(name)) {
- name = baseName + "#" + (++count);
+ name = baseName + '#' + (++count);
}
}
return name + IntegrationConfigUtils.HANDLER_ALIAS_SUFFIX;
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/InboundChannelAdapterAnnotationPostProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/InboundChannelAdapterAnnotationPostProcessor.java
index 831d59ee093..31bcc5671c5 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/InboundChannelAdapterAnnotationPostProcessor.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/InboundChannelAdapterAnnotationPostProcessor.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014-2019 the original author or authors.
+ * Copyright 2014-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.integration.annotation.InboundChannelAdapter;
+import org.springframework.integration.annotation.Poller;
import org.springframework.integration.config.IntegrationConfigUtils;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.endpoint.MethodInvokingMessageSource;
@@ -64,7 +65,7 @@ public Object postProcess(Object bean, String beanName, Method method, List messageSource = null;
+ MessageSource> messageSource;
try {
messageSource = createMessageSource(bean, beanName, method);
}
@@ -80,7 +81,8 @@ public Object postProcess(Object bean, String beanName, Method method, List>, ? extends Publisher>> reactiveCustomizer) {
+ this.endpointFactoryBean.setReactiveCustomizer(reactiveCustomizer);
+ return _this();
+ }
+
@Override
public S role(String role) {
this.endpointFactoryBean.setRole(role);
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java
index 0b7901c25d8..c509ca3f596 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 the original author or authors.
+ * Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
package org.springframework.integration.endpoint;
import java.util.function.Consumer;
+import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
@@ -68,6 +69,9 @@ public class ReactiveStreamsConsumer extends AbstractEndpoint implements Integra
@Nullable
private final Lifecycle lifecycleDelegate;
+ @Nullable
+ private Function super Flux>, ? extends Publisher>> reactiveCustomizer;
+
private ErrorHandler errorHandler;
private volatile Disposable subscription;
@@ -126,6 +130,12 @@ public void setErrorHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}
+ public void setReactiveCustomizer(
+ @Nullable Function super Flux>, ? extends Publisher>> reactiveCustomizer) {
+
+ this.reactiveCustomizer = reactiveCustomizer;
+ }
+
@Override
public MessageChannel getInputChannel() {
return this.inputChannel;
@@ -163,16 +173,21 @@ protected void doStart() {
this.lifecycleDelegate.start();
}
+ Flux> fluxFromChannel = Flux.from(this.publisher);
+ if (this.reactiveCustomizer != null) {
+ fluxFromChannel = fluxFromChannel.transform(this.reactiveCustomizer);
+ }
+
if (this.reactiveMessageHandler != null) {
this.subscription =
- Flux.from(this.publisher)
+ fluxFromChannel
.flatMap(this.reactiveMessageHandler::handleMessage)
.onErrorContinue((ex, data) -> this.errorHandler.handleError(ex))
.subscribe();
}
else if (this.subscriber != null) {
this.subscription =
- Flux.from(this.publisher)
+ fluxFromChannel
.subscribeWith(new SubscriberDecorator(this.subscriber, this.errorHandler));
}
}
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/util/MessagingAnnotationUtils.java b/spring-integration-core/src/main/java/org/springframework/integration/util/MessagingAnnotationUtils.java
index e9c639a5335..b5190da2c71 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/util/MessagingAnnotationUtils.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/util/MessagingAnnotationUtils.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014-2019 the original author or authors.
+ * Copyright 2014-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -61,11 +61,9 @@ public final class MessagingAnnotationUtils {
@SuppressWarnings("unchecked")
public static T resolveAttribute(List annotations, String name, Class requiredType) {
for (Annotation annotation : annotations) {
- if (annotation != null) {
- Object value = AnnotationUtils.getValue(annotation, name);
- if (value != null && value.getClass() == requiredType && hasValue(value)) {
- return (T) value;
- }
+ Object value = AnnotationUtils.getValue(annotation, name);
+ if (requiredType.isInstance(value) && hasValue(value)) {
+ return (T) value;
}
}
return null;
diff --git a/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveStreamsConsumerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveStreamsConsumerTests.java
index 43276627363..ff2782a632d 100644
--- a/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveStreamsConsumerTests.java
+++ b/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveStreamsConsumerTests.java
@@ -33,6 +33,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -336,4 +337,36 @@ public void testReactiveStreamsConsumerFluxMessageChannelReactiveMessageHandler(
testChannel.destroy();
}
+ @Test
+ public void testReactiveCustomizer() throws Exception {
+ DirectChannel testChannel = new DirectChannel();
+
+ AtomicReference> spied = new AtomicReference<>();
+ AtomicReference> result = new AtomicReference<>();
+ CountDownLatch stopLatch = new CountDownLatch(1);
+
+ MessageHandler messageHandler = m -> {
+ result.set(m);
+ stopLatch.countDown();
+ };
+
+ ConsumerEndpointFactoryBean endpointFactoryBean = new ConsumerEndpointFactoryBean();
+ endpointFactoryBean.setBeanFactory(mock(ConfigurableBeanFactory.class));
+ endpointFactoryBean.setInputChannel(testChannel);
+ endpointFactoryBean.setHandler(messageHandler);
+ endpointFactoryBean.setBeanName("reactiveConsumer");
+ endpointFactoryBean.setReactiveCustomizer(flux -> flux.doOnNext(spied::set));
+ endpointFactoryBean.afterPropertiesSet();
+ endpointFactoryBean.start();
+
+ Message> testMessage = new GenericMessage<>("test");
+ testChannel.send(testMessage);
+
+ assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
+ endpointFactoryBean.stop();
+
+ assertThat(result.get()).isSameAs(testMessage);
+ assertThat(spied.get()).isSameAs(testMessage);
+ }
+
}
diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java
index 9f08fb25582..d77a4f2eb51 100644
--- a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java
+++ b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014-2020 the original author or authors.
+ * Copyright 2014-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,6 +21,8 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -48,6 +50,7 @@
import org.springframework.integration.annotation.Filter;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
+import org.springframework.integration.annotation.Reactive;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Splitter;
@@ -81,6 +84,7 @@
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.test.StepVerifier;
@@ -145,10 +149,12 @@ public class MessagingAnnotationsWithBeanAnnotationTests {
@Autowired
private MessageChannel messageConsumerServiceChannel;
+ @Autowired
+ private CountDownLatch reactiveCustomizerLatch;
+
@Test
- public void testMessagingAnnotationsFlow() {
+ public void testMessagingAnnotationsFlow() throws InterruptedException {
Stream.of(this.sourcePollingChannelAdapters).forEach(AbstractEndpoint::start);
- //this.sourcePollingChannelAdapter.start();
for (int i = 0; i < 10; i++) {
Message> receive = this.discardChannel.receive(10000);
assertThat(receive).isNotNull();
@@ -164,6 +170,9 @@ public void testMessagingAnnotationsFlow() {
"'messagingAnnotationsWithBeanAnnotationTests.ContextConfiguration.filter.filter.handler'");
}
+
+ assertThat(reactiveCustomizerLatch.await(10, TimeUnit.SECONDS)).isTrue();
+
for (Message> message : this.collector) {
assertThat(((Integer) message.getPayload()) % 2).isNotEqualTo(0);
MessageHistory messageHistory = MessageHistory.read(message);
@@ -336,7 +345,17 @@ public MessageChannel splitterChannel() {
}
@Bean
- @Splitter(inputChannel = "splitterChannel")
+ public CountDownLatch reactiveCustomizerLatch() {
+ return new CountDownLatch(10);
+ }
+
+ @Bean
+ public Function, Flux>> reactiveCustomizer(CountDownLatch reactiveCustomizerLatch) {
+ return flux -> flux.doOnNext(data -> reactiveCustomizerLatch.countDown());
+ }
+
+ @Bean
+ @Splitter(inputChannel = "splitterChannel", reactive = @Reactive("reactiveCustomizer"))
public MessageHandler splitter() {
DefaultMessageSplitter defaultMessageSplitter = new DefaultMessageSplitter();
defaultMessageSplitter.setOutputChannelName("serviceChannel");
diff --git a/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java b/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java
index 004127f2ea9..994cd27e1b6 100644
--- a/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java
+++ b/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014-2020 the original author or authors.
+ * Copyright 2014-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -80,6 +80,7 @@
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.Publisher;
+import org.springframework.integration.annotation.Reactive;
import org.springframework.integration.annotation.Role;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
@@ -105,6 +106,7 @@
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.endpoint.MethodInvokingMessageSource;
import org.springframework.integration.endpoint.PollingConsumer;
+import org.springframework.integration.endpoint.ReactiveStreamsConsumer;
import org.springframework.integration.expression.SpelPropertyAccessorRegistrar;
import org.springframework.integration.gateway.GatewayProxyFactoryBean;
import org.springframework.integration.handler.ServiceActivatingHandler;
@@ -270,6 +272,9 @@ public class EnableIntegrationTests {
@Autowired
private MessageChannel bridgeToInput;
+ @Autowired
+ private AbstractEndpoint reactiveBridge;
+
@Autowired
private PollableChannel bridgeToOutput;
@@ -655,6 +660,7 @@ public void testBridgeAnnotations() {
assertThat(testMessage).isSameAs(receive);
assertThat(this.metaBridgeOutput.receive(10)).isNull();
+ assertThat(this.reactiveBridge).isInstanceOf(ReactiveStreamsConsumer.class);
this.bridgeToInput.send(testMessage);
receive = this.bridgeToOutput.receive(10_000);
assertThat(receive).isNotNull();
@@ -881,7 +887,7 @@ public AtomicInteger fbInterceptorCounter() {
@Bean
@GlobalChannelInterceptor
public FactoryBean ciFactoryBean() {
- return new AbstractFactoryBean() {
+ return new AbstractFactoryBean<>() {
@Override
public Class> getObjectType() {
@@ -889,7 +895,7 @@ public Class> getObjectType() {
}
@Override
- protected ChannelInterceptor createInstance() throws Exception {
+ protected ChannelInterceptor createInstance() {
return new ChannelInterceptor() {
@Override
@@ -933,7 +939,8 @@ public QueueChannel bridgeToOutput() {
}
@Bean
- @BridgeTo("bridgeToOutput")
+ @BridgeTo(value = "bridgeToOutput", reactive = @Reactive)
+ @EndpointId("reactiveBridge")
public MessageChannel bridgeToInput() {
return new DirectChannel();
}
@@ -1320,7 +1327,7 @@ public String transform(Message message) {
assertThat(message.getHeaders().get("foo")).isEqualTo("FOO");
assertThat(message.getHeaders()).containsKey("calledMethod");
assertThat(message.getHeaders().get("calledMethod")).isEqualTo("echo");
- return this.handle(message.getPayload()) + Arrays.asList(new Throwable().getStackTrace()).toString();
+ return handle(message.getPayload()) + Arrays.asList(new Throwable().getStackTrace()).toString();
}
@Transformer(inputChannel = "gatewayChannel2")
@@ -1330,7 +1337,7 @@ public String transform2(Message message) {
assertThat(message.getHeaders().get("foo")).isEqualTo("FOO");
assertThat(message.getHeaders()).containsKey("calledMethod");
assertThat(message.getHeaders().get("calledMethod")).isEqualTo("echo2");
- return this.handle(message.getPayload()) + "2" + Arrays.asList(new Throwable().getStackTrace()).toString();
+ return handle(message.getPayload()) + "2" + Arrays.asList(new Throwable().getStackTrace()).toString();
}
@MyInboundChannelAdapter1
diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java
index b916d3e7cca..7fd62aa144e 100644
--- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java
+++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2019 the original author or authors.
+ * Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -46,6 +46,8 @@
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.context.IntegrationFlowContext;
+import org.springframework.integration.endpoint.AbstractEndpoint;
+import org.springframework.integration.endpoint.ReactiveStreamsConsumer;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
@@ -53,6 +55,7 @@
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
/**
@@ -73,6 +76,9 @@ public class ReactiveStreamsTests {
@Qualifier("pollableReactiveFlow")
private Publisher> pollablePublisher;
+ @Autowired
+ private AbstractEndpoint reactiveTransformer;
+
@Autowired
@Qualifier("reactiveStreamsMessageSource")
private Lifecycle messageSource;
@@ -109,12 +115,13 @@ void testReactiveFlow() throws Exception {
this.messageSource.start();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
String[] strings = results.toArray(new String[0]);
- assertThat(strings).isEqualTo(new String[] { "A", "B", "C", "D", "E", "F" });
+ assertThat(strings).isEqualTo(new String[]{ "A", "B", "C", "D", "E", "F" });
this.messageSource.stop();
}
@Test
void testPollableReactiveFlow() throws Exception {
+ assertThat(this.reactiveTransformer).isInstanceOf(ReactiveStreamsConsumer.class);
this.inputChannel.send(new GenericMessage<>("1,2,3,4,5"));
CountDownLatch latch = new CountDownLatch(6);
@@ -216,9 +223,7 @@ void singleChannelFlowTest() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Flux.from(this.singleChannelFlow)
.map(m -> m.getPayload().toUpperCase())
- .subscribe(p -> {
- latch.countDown();
- });
+ .subscribe(p -> latch.countDown());
this.singleChannel.send(new GenericMessage<>("foo"));
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
}
@@ -228,9 +233,7 @@ void fixedSubscriberChannelFlowTest() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Flux.from(this.fixedSubscriberChannelFlow)
.map(m -> m.getPayload().toUpperCase())
- .subscribe(p -> {
- latch.countDown();
- });
+ .subscribe(p -> latch.countDown());
this.fixedSubscriberChannel.send(new GenericMessage<>("bar"));
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
}
@@ -258,7 +261,8 @@ public Publisher> pollableReactiveFlow() {
return IntegrationFlows
.from("inputChannel")
.split(s -> s.delimiters(","))
- .transform(Integer::parseInt)
+ .transform(Integer::parseInt,
+ e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())).id("reactiveTransformer"))
.channel(MessageChannels.queue())
.log()
.toReactivePublisher();
diff --git a/src/reference/asciidoc/configuration.adoc b/src/reference/asciidoc/configuration.adoc
index 85de3e5ee8f..99620000a0c 100644
--- a/src/reference/asciidoc/configuration.adoc
+++ b/src/reference/asciidoc/configuration.adoc
@@ -471,6 +471,38 @@ Starting with version 4.3.3, the `@Poller` annotation has the `errorChannel` att
This attribute plays the same role as `error-channel` in the `` XML component.
See <<./endpoint.adoc#endpoint-namespace,Endpoint Namespace Support>> for more information.
+The `poller()` attribute on the messaging annotations is mutually exclusive with the `reactive()` attribute.
+See next section for more information.
+
+[[configuration-using-reactive-annotation]]
+==== Using `@Reactive` Annotation
+
+The `ReactiveStreamsConsumer` has been around since version 5.0, but it was applied only when an input channel for the endpoint is a `FluxMessageChannel` (or any `org.reactivestreams.Publisher` implementation).
+Starting with version 5.3, its instance is also created by the framework when the target message handler is a `ReactiveMessageHandler` independently of the input channel type.
+The `@Reactive` sub-annotation (similar to mentioned above `@Poller`) has been introduced for all the messaging annotations starting with version 5.5.
+It accepts an optional `Function super Flux>, ? extends Publisher>>` bean reference and, independently of the input channel type and message handler, turns the target endpoint into the `ReactiveStreamsConsumer` instance.
+The function is used from the `Flux.transform()` operator to apply some customization (`publishOn()`, `doOnNext()`, `log()`, `retry()` etc.) on a reactive stream source from the input channel.
+
+The following example demonstrates how to change the publishing thread from the input channel independently of the final subscriber and producer to that `DirectChannel`:
+
+====
+[source,java]
+----
+@Bean
+public Function, Flux>> publishOnCustomizer() {
+ return flux -> flux.publishOn(Schedulers.parallel());
+}
+
+@ServiceActivator(inputChannel = "directChannel", reactive = @Reactive("publishOnCustomizer"))
+public void handleReactive(String payload) {
+ ...
+}
+----
+====
+
+The `reactive()` attribute on the messaging annotations is mutually exclusive with the `poller()` attribute.
+See <> and <<./reactive-streams.adoc#reactive-streams, Reactive Streams Support>> for more information.
+
==== Using the `@InboundChannelAdapter` Annotation
Version 4.0 introduced the `@InboundChannelAdapter` method-level annotation.
diff --git a/src/reference/asciidoc/dsl.adoc b/src/reference/asciidoc/dsl.adoc
index c1642a2ac4f..a8fce52b317 100644
--- a/src/reference/asciidoc/dsl.adoc
+++ b/src/reference/asciidoc/dsl.adoc
@@ -262,6 +262,31 @@ See https://docs.spring.io/spring-integration/api/org/springframework/integratio
IMPORTANT: If you use the DSL to construct a `PollerSpec` as a `@Bean`, do not call the `get()` method in the bean definition.
The `PollerSpec` is a `FactoryBean` that generates the `PollerMetadata` object from the specification and initializes all of its properties.
+[[java-dsl-reactive]]
+=== The `reactive()` Endpoint
+
+Starting with version 5.5, the `ConsumerEndpointSpec` provides a `reactive()` configuration property with an optional customizer `Function super Flux>, ? extends Publisher>>`.
+This option configures the target endpoint as a `ReactiveStreamsConsumer` instance, independently of the input channel type, which is converted to a `Flux` via `IntegrationReactiveUtils.messageChannelToFlux()`.
+The provided function is used from the `Flux.transform()` operator to customize (`publishOn()`, `log()`, `doOnNext()` etc.) a reactive stream source from the input channel.
+
+The following example demonstrates how to change the publishing thread from the input channel independently of the final subscriber and producer to that `DirectChannel`:
+
+====
+[source,java]
+----
+@Bean
+public IntegrationFlow reactiveEndpointFlow() {
+ return IntegrationFlows
+ .from("inputChannel")
+ .transform(Integer::parseInt,
+ e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())))
+ .get();
+}
+----
+====
+
+See <<./reactive-streams.adoc#reactive-streams, Reactive Streams Support>> for more information.
+
[[java-dsl-endpoints]]
=== DSL and Endpoint Configuration
diff --git a/src/reference/asciidoc/reactive-streams.adoc b/src/reference/asciidoc/reactive-streams.adoc
index 7d926012607..66602935918 100644
--- a/src/reference/asciidoc/reactive-streams.adoc
+++ b/src/reference/asciidoc/reactive-streams.adoc
@@ -62,10 +62,14 @@ A consumer for the `FluxMessageChannel` must be a `org.reactivestreams.Subscribe
Fortunately, all of the `MessageHandler` implementations in Spring Integration also implement a `CoreSubscriber` from project Reactor.
And thanks to a `ReactiveStreamsConsumer` implementation in between, the whole integration flow configuration is left transparent for target developers.
In this case, the flow behavior is changed from an imperative push model to a reactive pull model.
-A `ReactiveStreamsConsumer` can also be used to turn any `MessageChannel` into a reactive source using `MessageChannelReactiveUtils`, making an integration flow partially reactive.
+A `ReactiveStreamsConsumer` can also be used to turn any `MessageChannel` into a reactive source using `IntegrationReactiveUtils`, making an integration flow partially reactive.
See <<./channel.adoc#flux-message-channel,`FluxMessageChannel`>> for more information.
+Starting with version 5.5, the `ConsumerEndpointSpec` introduces a `reactive()` option to make the endpoint in the flow as a `ReactiveStreamsConsumer` independently of the input channel.
+The optional `Function super Flux>, ? extends Publisher>>` can be provided to customise a source `Flux` from the input channel via `Flux.transform()` operation, e.g. with the `publishOn()`, `doOnNext()`, `retry()` etc.
+This functionality is represented as a `@Reactive` sub-annotation for all the messaging annotation (`@ServiceActivator`, `@Splitter` etc.) via their `reactive()` attribute.
+
=== Source Polling Channel Adapter
Usually, the `SourcePollingChannelAdapter` relies on the task which is initiated by the `TaskScheduler`.
diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc
index d8e15648067..6e4d32b3893 100644
--- a/src/reference/asciidoc/whats-new.adoc
+++ b/src/reference/asciidoc/whats-new.adoc
@@ -30,6 +30,10 @@ An `AbstractPollingEndpoint` (source polling channel adapter and polling consume
It can be changed to different value later on, e.g. via a Control Bus.
See <<./endpoint.adoc#endpoint-pollingconsumer,Polling Consumer>> for more information.
+The `ConsumerEndpointFactoryBean` now accept a `reactiveCustomizer` `Function` to any input channel as reactive stream source and use a `ReactiveStreamsConsumer` underneath.
+This is covered as a `ConsumerEndpointSpec.reactive()` option in Java DSL and as a `@Reactive` nested annotation for the messaging annotations.
+See <<./reactive-streams.adoc#reactive-streams,Reactive Streams Support>> for more information.
+
[[x5.5-amqp]]
==== AMQP Changes