diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Reactive.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Reactive.java new file mode 100644 index 00000000000..9fdd8958cd1 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Reactive.java @@ -0,0 +1,79 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.annotation; + +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import org.springframework.core.annotation.AliasFor; +import org.springframework.integration.reactive.BackpressureType; + +import reactor.core.publisher.BufferOverflowStrategy; + +/** + * Provides backpressure options for the Messaging annotations for + * reactive endpoints. + *

+ * It is an analogue of the XML {@code } element. + *

+ * Non-reference attributes support Property Placeholder resolutions. + * + * @author Artem Bilan + * + * @since 5.0 + * + * @see reactor.core.publisher.Flux#onBackpressureBuffer + * @see reactor.core.publisher.Flux#onBackpressureDrop + * @see reactor.core.publisher.Flux#onBackpressureError + * @see reactor.core.publisher.Flux#onBackpressureLatest + */ +@Target({}) +@Retention(RetentionPolicy.RUNTIME) +public @interface Reactive { + + /** + * @return The {@link BackpressureType} to use. + */ + @AliasFor("backpressure") + BackpressureType value() default BackpressureType.NONE; + + /** + * @return The {@link BackpressureType} to use. + */ + @AliasFor("value") + BackpressureType backpressure() default BackpressureType.NONE; + + /** + * @return The {@link java.util.function.Consumer} bean name, called as a callback on backpressure. + */ + String consumer() default ""; + + /** + * @return The {@link BufferOverflowStrategy} which is used + * in case of {@link BackpressureType#BUFFER} for the {@link #backpressure()}. + */ + BufferOverflowStrategy bufferOverflowStrategy() default BufferOverflowStrategy.ERROR; + + /** + * @return the maximum buffer backlog size before immediate error + * in case of {@link BackpressureType#BUFFER} for the {@link #backpressure()}. + * Defaults to {@link Integer#MIN_VALUE} meaning {@code unbounded}. + */ + String bufferMaxSize() default "-2147483648"; + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java index 4750d7e84a4..fa16d5137df 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java @@ -115,7 +115,17 @@ * ({@link org.springframework.integration.scheduling.PollerMetadata}). * This attribute is an {@code array} just to allow an empty default (no poller). * Only one {@link Poller} element is allowed. + * Mutually exclusive with {@link #reactive()}. */ Poller[] poller() default {}; + /** + * @return the {@link Reactive} options for a reactive endpoint. + * This attribute is an {@code array} just to allow an empty default (not reactive). + * Only one {@link Reactive} element is allowed. + * Mutually exclusive with {@link #poller()}. + * @since 5.0 + */ + Reactive[] reactive() default { }; + } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java index 83d7891a831..2c5486eda4a 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java @@ -38,7 +38,6 @@ import org.springframework.integration.channel.ChannelInterceptorAware; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.FixedSubscriberChannel; -import org.springframework.integration.channel.MessageChannelReactiveUtils; import org.springframework.integration.channel.ReactiveChannel; import org.springframework.integration.channel.interceptor.WireTap; import org.springframework.integration.config.ConsumerEndpointFactoryBean; @@ -68,6 +67,7 @@ import org.springframework.integration.handler.MessageTriggerAction; import org.springframework.integration.handler.MethodInvokingMessageProcessor; import org.springframework.integration.handler.ServiceActivatingHandler; +import org.springframework.integration.reactive.MessageChannelReactiveUtils; import org.springframework.integration.router.AbstractMappingMessageRouter; import org.springframework.integration.router.AbstractMessageRouter; import org.springframework.integration.router.ExpressionEvaluatingRouter; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveConsumer.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveConsumer.java index d927d3bb7b9..c8c96919d74 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveConsumer.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveConsumer.java @@ -23,8 +23,8 @@ import org.reactivestreams.Subscription; import org.springframework.context.Lifecycle; -import org.springframework.integration.channel.MessageChannelReactiveUtils; import org.springframework.integration.channel.MessagePublishingErrorHandler; +import org.springframework.integration.reactive.MessageChannelReactiveUtils; import org.springframework.integration.support.channel.BeanFactoryChannelResolver; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -36,6 +36,7 @@ import reactor.core.Exceptions; import reactor.core.Receiver; import reactor.core.Trackable; +import reactor.core.publisher.Flux; import reactor.core.publisher.Operators; @@ -49,7 +50,7 @@ public class ReactiveConsumer extends AbstractEndpoint { private final Lifecycle lifecycleDelegate; - private volatile Publisher> publisher; + private volatile Flux> publisher; private ErrorHandler errorHandler; @@ -67,7 +68,7 @@ public ReactiveConsumer(MessageChannel inputChannel, Subscriber> subs Assert.notNull(subscriber, "'subscriber' must not be null"); Publisher messagePublisher = MessageChannelReactiveUtils.toPublisher(inputChannel); - this.publisher = (Publisher>) messagePublisher; + this.publisher = Flux.from((Publisher>) messagePublisher); this.subscriber = new Operators.SubscriberAdapter, Message>(subscriber) { @@ -104,7 +105,8 @@ protected void doStart() { if (this.lifecycleDelegate != null) { this.lifecycleDelegate.start(); } - this.publisher.subscribe(this.subscriber); + this.publisher + .subscribe(this.subscriber); } @Override diff --git a/spring-integration-core/src/main/java/org/springframework/integration/reactive/BackpressureType.java b/spring-integration-core/src/main/java/org/springframework/integration/reactive/BackpressureType.java new file mode 100644 index 00000000000..6ada0a5dd23 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/reactive/BackpressureType.java @@ -0,0 +1,36 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.reactive; + +/** + * The {@link reactor.core.publisher.Flux} backpressure strategy type. + * + * @author Artem Bilan + * + * @since 5.0 + * + * @see org.springframework.integration.annotation.Reactive + * @see reactor.core.publisher.Flux#onBackpressureBuffer + * @see reactor.core.publisher.Flux#onBackpressureDrop + * @see reactor.core.publisher.Flux#onBackpressureError + * @see reactor.core.publisher.Flux#onBackpressureLatest + */ +public enum BackpressureType { + + BUFFER, DROP, ERROR, LATEST, NONE + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/MessageChannelReactiveUtils.java b/spring-integration-core/src/main/java/org/springframework/integration/reactive/MessageChannelReactiveUtils.java similarity index 98% rename from spring-integration-core/src/main/java/org/springframework/integration/channel/MessageChannelReactiveUtils.java rename to spring-integration-core/src/main/java/org/springframework/integration/reactive/MessageChannelReactiveUtils.java index 506c64bfa79..ddc42024413 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/MessageChannelReactiveUtils.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/reactive/MessageChannelReactiveUtils.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.integration.channel; +package org.springframework.integration.reactive; import java.util.Iterator; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/reactive/package-info.java b/spring-integration-core/src/main/java/org/springframework/integration/reactive/package-info.java new file mode 100644 index 00000000000..47e3d360fbb --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/reactive/package-info.java @@ -0,0 +1,4 @@ +/** + * Provides utility classes for Reactive Streams integration. + */ +package org.springframework.integration.reactive; diff --git a/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveChannelTests.java b/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveChannelTests.java index 5baa1ff17ec..cf9a3a00faa 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveChannelTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveChannelTests.java @@ -36,7 +36,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; -import org.springframework.integration.channel.MessageChannelReactiveUtils; +import org.springframework.integration.reactive.MessageChannelReactiveUtils; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.channel.ReactiveChannel; import org.springframework.integration.config.EnableIntegration; diff --git a/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java b/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java index 1d4dd14b3e9..50b703257b3 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java @@ -83,6 +83,7 @@ import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.annotation.Poller; import org.springframework.integration.annotation.Publisher; +import org.springframework.integration.annotation.Reactive; import org.springframework.integration.annotation.Role; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.annotation.Transformer; @@ -109,6 +110,7 @@ import org.springframework.integration.history.MessageHistory; import org.springframework.integration.history.MessageHistoryConfigurer; import org.springframework.integration.json.JsonPropertyAccessor; +import org.springframework.integration.reactive.BackpressureType; import org.springframework.integration.scheduling.PollerMetadata; import org.springframework.integration.support.MessageBuilder; import org.springframework.integration.support.MutableMessageBuilder; @@ -138,6 +140,7 @@ import org.springframework.util.ClassUtils; import org.springframework.util.MultiValueMap; +import reactor.core.publisher.BufferOverflowStrategy; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -987,7 +990,11 @@ public AtomicReference asyncAnnotationProcessThread() { } @Bean - @ServiceActivator(inputChannel = "sendAsyncChannel") + @ServiceActivator(inputChannel = "sendAsyncChannel", + reactive = @Reactive( + backpressure = BackpressureType.BUFFER, + bufferOverflowStrategy = BufferOverflowStrategy.ERROR, + bufferMaxSize = "100")) @Role("foo") public MessageHandler sendAsyncHandler() { return new MessageHandler() {