Skip to content

Commit 656b7a3

Browse files
committed
INT-4186: Add @reactive to Messaging Annotations
JIRA: https://jira.spring.io/browse/INT-4186 Just a PoC how it would look from the end-user perspective * Add `@Reactive` annotations' option alongside with `@Poller` to allow to use `ReactiveConsumer` underneath with provided backpressure strategy
1 parent 55e7752 commit 656b7a3

File tree

9 files changed

+146
-8
lines changed

9 files changed

+146
-8
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2017 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.annotation;
18+
19+
import java.lang.annotation.Retention;
20+
import java.lang.annotation.RetentionPolicy;
21+
import java.lang.annotation.Target;
22+
23+
import org.springframework.core.annotation.AliasFor;
24+
import org.springframework.integration.reactive.BackpressureType;
25+
26+
import reactor.core.publisher.BufferOverflowStrategy;
27+
28+
/**
29+
* Provides backpressure options for the Messaging annotations for
30+
* reactive endpoints.
31+
* <p>
32+
* It is an analogue of the XML {@code <reactive/>} element.
33+
* <p>
34+
* Non-reference attributes support Property Placeholder resolutions.
35+
*
36+
* @author Artem Bilan
37+
*
38+
* @since 5.0
39+
*
40+
* @see reactor.core.publisher.Flux#onBackpressureBuffer
41+
* @see reactor.core.publisher.Flux#onBackpressureDrop
42+
* @see reactor.core.publisher.Flux#onBackpressureError
43+
* @see reactor.core.publisher.Flux#onBackpressureLatest
44+
*/
45+
@Target({})
46+
@Retention(RetentionPolicy.RUNTIME)
47+
public @interface Reactive {
48+
49+
/**
50+
* @return The {@link BackpressureType} to use.
51+
*/
52+
@AliasFor("backpressure")
53+
BackpressureType value() default BackpressureType.NONE;
54+
55+
/**
56+
* @return The {@link BackpressureType} to use.
57+
*/
58+
@AliasFor("value")
59+
BackpressureType backpressure() default BackpressureType.NONE;
60+
61+
/**
62+
* @return The {@link java.util.function.Consumer} bean name, called as a callback on backpressure.
63+
*/
64+
String consumer() default "";
65+
66+
/**
67+
* @return The {@link BufferOverflowStrategy} which is used
68+
* in case of {@link BackpressureType#BUFFER} for the {@link #backpressure()}.
69+
*/
70+
BufferOverflowStrategy bufferOverflowStrategy() default BufferOverflowStrategy.ERROR;
71+
72+
/**
73+
* @return the maximum buffer backlog size before immediate error
74+
* in case of {@link BackpressureType#BUFFER} for the {@link #backpressure()}.
75+
* Defaults to {@link Integer#MIN_VALUE} meaning {@code unbounded}.
76+
*/
77+
String bufferMaxSize() default "-2147483648";
78+
79+
}

spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,17 @@
115115
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
116116
* This attribute is an {@code array} just to allow an empty default (no poller).
117117
* Only one {@link Poller} element is allowed.
118+
* Mutually exclusive with {@link #reactive()}.
118119
*/
119120
Poller[] poller() default {};
120121

122+
/**
123+
* @return the {@link Reactive} options for a reactive endpoint.
124+
* This attribute is an {@code array} just to allow an empty default (not reactive).
125+
* Only one {@link Reactive} element is allowed.
126+
* Mutually exclusive with {@link #poller()}.
127+
* @since 5.0
128+
*/
129+
Reactive[] reactive() default { };
130+
121131
}

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.springframework.integration.channel.ChannelInterceptorAware;
3939
import org.springframework.integration.channel.DirectChannel;
4040
import org.springframework.integration.channel.FixedSubscriberChannel;
41-
import org.springframework.integration.channel.MessageChannelReactiveUtils;
4241
import org.springframework.integration.channel.ReactiveChannel;
4342
import org.springframework.integration.channel.interceptor.WireTap;
4443
import org.springframework.integration.config.ConsumerEndpointFactoryBean;
@@ -68,6 +67,7 @@
6867
import org.springframework.integration.handler.MessageTriggerAction;
6968
import org.springframework.integration.handler.MethodInvokingMessageProcessor;
7069
import org.springframework.integration.handler.ServiceActivatingHandler;
70+
import org.springframework.integration.reactive.MessageChannelReactiveUtils;
7171
import org.springframework.integration.router.AbstractMappingMessageRouter;
7272
import org.springframework.integration.router.AbstractMessageRouter;
7373
import org.springframework.integration.router.ExpressionEvaluatingRouter;

spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveConsumer.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
import org.reactivestreams.Subscription;
2424

2525
import org.springframework.context.Lifecycle;
26-
import org.springframework.integration.channel.MessageChannelReactiveUtils;
2726
import org.springframework.integration.channel.MessagePublishingErrorHandler;
27+
import org.springframework.integration.reactive.MessageChannelReactiveUtils;
2828
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
2929
import org.springframework.messaging.Message;
3030
import org.springframework.messaging.MessageChannel;
@@ -36,6 +36,7 @@
3636
import reactor.core.Exceptions;
3737
import reactor.core.Receiver;
3838
import reactor.core.Trackable;
39+
import reactor.core.publisher.Flux;
3940
import reactor.core.publisher.Operators;
4041

4142

@@ -49,7 +50,7 @@ public class ReactiveConsumer extends AbstractEndpoint {
4950

5051
private final Lifecycle lifecycleDelegate;
5152

52-
private volatile Publisher<Message<?>> publisher;
53+
private volatile Flux<Message<?>> publisher;
5354

5455
private ErrorHandler errorHandler;
5556

@@ -67,7 +68,7 @@ public ReactiveConsumer(MessageChannel inputChannel, Subscriber<Message<?>> subs
6768
Assert.notNull(subscriber, "'subscriber' must not be null");
6869

6970
Publisher<?> messagePublisher = MessageChannelReactiveUtils.toPublisher(inputChannel);
70-
this.publisher = (Publisher<Message<?>>) messagePublisher;
71+
this.publisher = Flux.from((Publisher<Message<?>>) messagePublisher);
7172

7273
this.subscriber = new Operators.SubscriberAdapter<Message<?>, Message<?>>(subscriber) {
7374

@@ -104,7 +105,8 @@ protected void doStart() {
104105
if (this.lifecycleDelegate != null) {
105106
this.lifecycleDelegate.start();
106107
}
107-
this.publisher.subscribe(this.subscriber);
108+
this.publisher
109+
.subscribe(this.subscriber);
108110
}
109111

110112
@Override
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2017 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.reactive;
18+
19+
/**
20+
* The {@link reactor.core.publisher.Flux} backpressure strategy type.
21+
*
22+
* @author Artem Bilan
23+
*
24+
* @since 5.0
25+
*
26+
* @see org.springframework.integration.annotation.Reactive
27+
* @see reactor.core.publisher.Flux#onBackpressureBuffer
28+
* @see reactor.core.publisher.Flux#onBackpressureDrop
29+
* @see reactor.core.publisher.Flux#onBackpressureError
30+
* @see reactor.core.publisher.Flux#onBackpressureLatest
31+
*/
32+
public enum BackpressureType {
33+
34+
BUFFER, DROP, ERROR, LATEST, NONE
35+
36+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.integration.channel;
17+
package org.springframework.integration.reactive;
1818

1919
import java.util.Iterator;
2020

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/**
2+
* Provides utility classes for Reactive Streams integration.
3+
*/
4+
package org.springframework.integration.reactive;

spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveChannelTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import org.springframework.context.annotation.Bean;
3737
import org.springframework.context.annotation.Configuration;
3838
import org.springframework.integration.annotation.ServiceActivator;
39-
import org.springframework.integration.channel.MessageChannelReactiveUtils;
39+
import org.springframework.integration.reactive.MessageChannelReactiveUtils;
4040
import org.springframework.integration.channel.QueueChannel;
4141
import org.springframework.integration.channel.ReactiveChannel;
4242
import org.springframework.integration.config.EnableIntegration;

spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
import org.springframework.integration.annotation.MessagingGateway;
8484
import org.springframework.integration.annotation.Poller;
8585
import org.springframework.integration.annotation.Publisher;
86+
import org.springframework.integration.annotation.Reactive;
8687
import org.springframework.integration.annotation.Role;
8788
import org.springframework.integration.annotation.ServiceActivator;
8889
import org.springframework.integration.annotation.Transformer;
@@ -109,6 +110,7 @@
109110
import org.springframework.integration.history.MessageHistory;
110111
import org.springframework.integration.history.MessageHistoryConfigurer;
111112
import org.springframework.integration.json.JsonPropertyAccessor;
113+
import org.springframework.integration.reactive.BackpressureType;
112114
import org.springframework.integration.scheduling.PollerMetadata;
113115
import org.springframework.integration.support.MessageBuilder;
114116
import org.springframework.integration.support.MutableMessageBuilder;
@@ -138,6 +140,7 @@
138140
import org.springframework.util.ClassUtils;
139141
import org.springframework.util.MultiValueMap;
140142

143+
import reactor.core.publisher.BufferOverflowStrategy;
141144
import reactor.core.publisher.Flux;
142145
import reactor.core.publisher.Mono;
143146

@@ -987,7 +990,11 @@ public AtomicReference<Thread> asyncAnnotationProcessThread() {
987990
}
988991

989992
@Bean
990-
@ServiceActivator(inputChannel = "sendAsyncChannel")
993+
@ServiceActivator(inputChannel = "sendAsyncChannel",
994+
reactive = @Reactive(
995+
backpressure = BackpressureType.BUFFER,
996+
bufferOverflowStrategy = BufferOverflowStrategy.ERROR,
997+
bufferMaxSize = "100"))
991998
@Role("foo")
992999
public MessageHandler sendAsyncHandler() {
9931000
return new MessageHandler() {

0 commit comments

Comments
 (0)