diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Reactive.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Reactive.java new file mode 100644 index 00000000000..9fdd8958cd1 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Reactive.java @@ -0,0 +1,79 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.annotation; + +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import org.springframework.core.annotation.AliasFor; +import org.springframework.integration.reactive.BackpressureType; + +import reactor.core.publisher.BufferOverflowStrategy; + +/** + * Provides backpressure options for the Messaging annotations for + * reactive endpoints. + *
+ * It is an analogue of the XML {@code
+ * Non-reference attributes support Property Placeholder resolutions.
+ *
+ * @author Artem Bilan
+ *
+ * @since 5.0
+ *
+ * @see reactor.core.publisher.Flux#onBackpressureBuffer
+ * @see reactor.core.publisher.Flux#onBackpressureDrop
+ * @see reactor.core.publisher.Flux#onBackpressureError
+ * @see reactor.core.publisher.Flux#onBackpressureLatest
+ */
+@Target({})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Reactive {
+
+ /**
+ * @return The {@link BackpressureType} to use.
+ */
+ @AliasFor("backpressure")
+ BackpressureType value() default BackpressureType.NONE;
+
+ /**
+ * @return The {@link BackpressureType} to use.
+ */
+ @AliasFor("value")
+ BackpressureType backpressure() default BackpressureType.NONE;
+
+ /**
+ * @return The {@link java.util.function.Consumer} bean name, called as a callback on backpressure.
+ */
+ String consumer() default "";
+
+ /**
+ * @return The {@link BufferOverflowStrategy} which is used
+ * in case of {@link BackpressureType#BUFFER} for the {@link #backpressure()}.
+ */
+ BufferOverflowStrategy bufferOverflowStrategy() default BufferOverflowStrategy.ERROR;
+
+ /**
+ * @return the maximum buffer backlog size before immediate error
+ * in case of {@link BackpressureType#BUFFER} for the {@link #backpressure()}.
+ * Defaults to {@link Integer#MIN_VALUE} meaning {@code unbounded}.
+ */
+ String bufferMaxSize() default "-2147483648";
+
+}
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java
index 4750d7e84a4..fa16d5137df 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java
@@ -115,7 +115,17 @@
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
* This attribute is an {@code array} just to allow an empty default (no poller).
* Only one {@link Poller} element is allowed.
+ * Mutually exclusive with {@link #reactive()}.
*/
Poller[] poller() default {};
+ /**
+ * @return the {@link Reactive} options for a reactive endpoint.
+ * This attribute is an {@code array} just to allow an empty default (not reactive).
+ * Only one {@link Reactive} element is allowed.
+ * Mutually exclusive with {@link #poller()}.
+ * @since 5.0
+ */
+ Reactive[] reactive() default { };
+
}
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java
index 83d7891a831..2c5486eda4a 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java
@@ -38,7 +38,6 @@
import org.springframework.integration.channel.ChannelInterceptorAware;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.FixedSubscriberChannel;
-import org.springframework.integration.channel.MessageChannelReactiveUtils;
import org.springframework.integration.channel.ReactiveChannel;
import org.springframework.integration.channel.interceptor.WireTap;
import org.springframework.integration.config.ConsumerEndpointFactoryBean;
@@ -68,6 +67,7 @@
import org.springframework.integration.handler.MessageTriggerAction;
import org.springframework.integration.handler.MethodInvokingMessageProcessor;
import org.springframework.integration.handler.ServiceActivatingHandler;
+import org.springframework.integration.reactive.MessageChannelReactiveUtils;
import org.springframework.integration.router.AbstractMappingMessageRouter;
import org.springframework.integration.router.AbstractMessageRouter;
import org.springframework.integration.router.ExpressionEvaluatingRouter;
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveConsumer.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveConsumer.java
index d927d3bb7b9..c8c96919d74 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveConsumer.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveConsumer.java
@@ -23,8 +23,8 @@
import org.reactivestreams.Subscription;
import org.springframework.context.Lifecycle;
-import org.springframework.integration.channel.MessageChannelReactiveUtils;
import org.springframework.integration.channel.MessagePublishingErrorHandler;
+import org.springframework.integration.reactive.MessageChannelReactiveUtils;
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
@@ -36,6 +36,7 @@
import reactor.core.Exceptions;
import reactor.core.Receiver;
import reactor.core.Trackable;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;
@@ -49,7 +50,7 @@ public class ReactiveConsumer extends AbstractEndpoint {
private final Lifecycle lifecycleDelegate;
- private volatile Publisher