Skip to content

Commit 0605332

Browse files
committed
INT-4444: Introduce @Reactive & reactive()
JIRA: https://jira.spring.io/browse/INT-4444 Right now the high-level API creates a `ReactiveStreamsConsumer` only when the input channel is a `Publisher<?>` impl or target handler is a `ReactiveMessageHandler` * Add `@Reactive[] reactive()` attribute to messaging annotations * Add `ConsumerEndpointSpec.reactive()` Both options point to the same `ConsumerEndpointFactoryBean.setReactiveCustomizer()` making the target endpoint always as a `ReactiveStreamsConsumer` independently of the input channel and target handler * Use the `Function` to customize a source `Flux` from the channel * Test and document a new feature
1 parent 9761c7f commit 0605332

22 files changed

+481
-95
lines changed

spring-integration-core/src/main/java/org/springframework/integration/annotation/Aggregator.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -104,8 +104,16 @@
104104
* @return the {@link Poller} options for a polled endpoint
105105
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
106106
* This attribute is an {@code array} just to allow an empty default (no poller).
107-
* Only one {@link Poller} element is allowed.
107+
* Mutually exclusive with {@link #reactive()}.
108108
*/
109109
Poller[] poller() default { };
110110

111+
/**
112+
* @return the {@link Reactive} options for a consumer endpoint.
113+
* This attribute is an {@code array} just to allow an empty default (not reactive).
114+
* Only one {@link Reactive} element is allowed.
115+
* Mutually exclusive with {@link #poller()}.
116+
*/
117+
Reactive[] reactive() default { };
118+
111119
}

spring-integration-core/src/main/java/org/springframework/integration/annotation/BridgeFrom.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -69,8 +69,16 @@
6969
* @return the {@link Poller} options for a polled endpoint
7070
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
7171
* This attribute is an {@code array} just to allow an empty default (no poller).
72-
* Only one {@link Poller} element is allowed.
72+
* Mutually exclusive with {@link #reactive()}.
7373
*/
7474
Poller[] poller() default { };
7575

76+
/**
77+
* @return the {@link Reactive} options for a consumer endpoint.
78+
* This attribute is an {@code array} just to allow an empty default (not reactive).
79+
* Only one {@link Reactive} element is allowed.
80+
* Mutually exclusive with {@link #poller()}.
81+
*/
82+
Reactive[] reactive() default { };
83+
7684
}

spring-integration-core/src/main/java/org/springframework/integration/annotation/BridgeTo.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -50,8 +50,8 @@
5050
public @interface BridgeTo {
5151

5252
/**
53-
* @return the outbound channel name to send the message to
54-
* {@link org.springframework.integration.handler.BridgeHandler}.
53+
* @return the outbound channel name to send the message to for the
54+
* {@link org.springframework.integration.handler.BridgeHandler} reply.
5555
* Optional: when omitted the message is sent to the {@code reply-channel}
5656
* in its headers (if present - an exception is thrown otherwise).
5757
*/
@@ -73,11 +73,19 @@
7373
String phase() default "";
7474

7575
/**
76-
* @return the {@link org.springframework.integration.annotation.Poller} options for a polled endpoint
76+
* @return the {@link Poller} options for a polled endpoint
7777
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
7878
* This attribute is an {@code array} just to allow an empty default (no poller).
79-
* Only one {@link org.springframework.integration.annotation.Poller} element is allowed.
79+
* Mutually exclusive with {@link #reactive()}.
8080
*/
8181
Poller[] poller() default { };
8282

83+
/**
84+
* @return the {@link Reactive} options for a consumer endpoint.
85+
* This attribute is an {@code array} just to allow an empty default (not reactive).
86+
* Only one {@link Reactive} element is allowed.
87+
* Mutually exclusive with {@link #poller()}.
88+
*/
89+
Reactive[] reactive() default { };
90+
8391
}

spring-integration-core/src/main/java/org/springframework/integration/annotation/Filter.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -38,6 +38,7 @@
3838
* @author Mark Fisher
3939
* @author Gary Russell
4040
* @author Artem Bilan
41+
*
4142
* @since 2.0
4243
*/
4344
@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@@ -126,8 +127,17 @@
126127
* @return the {@link Poller} options for a polled endpoint
127128
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
128129
* This attribute is an {@code array} just to allow an empty default (no poller).
129-
* Only one {@link Poller} element is allowed.
130+
* Mutually exclusive with {@link #reactive()}.
130131
*/
131132
Poller[] poller() default { };
132133

134+
/**
135+
* @return the {@link Reactive} options for a consumer endpoint.
136+
* This attribute is an {@code array} just to allow an empty default (not reactive).
137+
* Only one {@link Reactive} element is allowed.
138+
* Mutually exclusive with {@link #poller()}.
139+
*/
140+
Reactive[] reactive() default { };
141+
142+
133143
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.annotation;
18+
19+
import java.lang.annotation.Retention;
20+
import java.lang.annotation.RetentionPolicy;
21+
import java.lang.annotation.Target;
22+
23+
/**
24+
* Provides a reactive configuration options for the consumer endpoint making
25+
* any input channel as a reactive stream source of data.
26+
*
27+
* @author Artem Bilan
28+
*
29+
* @since 5.5
30+
*/
31+
@Target({})
32+
@Retention(RetentionPolicy.RUNTIME)
33+
public @interface Reactive {
34+
35+
/**
36+
* @return the function bean name to be used in the
37+
* {@link reactor.core.publisher.Flux#transform} on the input channel
38+
* {@link reactor.core.publisher.Flux}.
39+
*/
40+
String value() default "";
41+
42+
}

spring-integration-core/src/main/java/org/springframework/integration/annotation/Router.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -150,8 +150,16 @@
150150
* @return the {@link Poller} options for a polled endpoint
151151
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
152152
* This attribute is an {@code array} just to allow an empty default (no poller).
153-
* Only one {@link Poller} element is allowed.
153+
* Mutually exclusive with {@link #reactive()}.
154154
*/
155155
Poller[] poller() default { };
156156

157+
/**
158+
* @return the {@link Reactive} options for a consumer endpoint.
159+
* This attribute is an {@code array} just to allow an empty default (not reactive).
160+
* Only one {@link Reactive} element is allowed.
161+
* Mutually exclusive with {@link #poller()}.
162+
*/
163+
Reactive[] reactive() default { };
164+
157165
}

spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -121,7 +121,16 @@
121121
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
122122
* This attribute is an {@code array} just to allow an empty default (no poller).
123123
* Only one {@link Poller} element is allowed.
124+
* Mutually exclusive with {@link #reactive()}.
124125
*/
125126
Poller[] poller() default { };
126127

128+
/**
129+
* @return the {@link Reactive} options for a consumer endpoint.
130+
* This attribute is an {@code array} just to allow an empty default (not reactive).
131+
* Only one {@link Reactive} element is allowed.
132+
* Mutually exclusive with {@link #poller()}.
133+
*/
134+
Reactive[] reactive() default { };
135+
127136
}

spring-integration-core/src/main/java/org/springframework/integration/annotation/Splitter.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -115,8 +115,16 @@
115115
* @return the {@link Poller} options for a polled endpoint
116116
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
117117
* This attribute is an {@code array} just to allow an empty default (no poller).
118-
* Only one {@link Poller} element is allowed.
118+
* Mutually exclusive with {@link #reactive()}.
119119
*/
120120
Poller[] poller() default { };
121121

122+
/**
123+
* @return the {@link Reactive} options for a consumer endpoint.
124+
* This attribute is an {@code array} just to allow an empty default (not reactive).
125+
* Only one {@link Reactive} element is allowed.
126+
* Mutually exclusive with {@link #poller()}.
127+
*/
128+
Reactive[] reactive() default { };
129+
122130
}

spring-integration-core/src/main/java/org/springframework/integration/annotation/Transformer.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -91,8 +91,16 @@
9191
* @return the {@link Poller} options for a polled endpoint
9292
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
9393
* This attribute is an {@code array} just to allow an empty default (no poller).
94-
* Only one {@link Poller} element is allowed.
94+
* Mutually exclusive with {@link #reactive()}.
9595
*/
9696
Poller[] poller() default { };
9797

98+
/**
99+
* @return the {@link Reactive} options for a consumer endpoint.
100+
* This attribute is an {@code array} just to allow an empty default (not reactive).
101+
* Only one {@link Reactive} element is allowed.
102+
* Mutually exclusive with {@link #poller()}.
103+
*/
104+
Reactive[] reactive() default { };
105+
98106
}

0 commit comments

Comments
 (0)