Skip to content

Commit e9f2346

Browse files
authored
INT-4444: Introduce @Reactive & reactive() (#3503)
* INT-4444: Introduce `@Reactive` & `reactive()` JIRA: https://jira.spring.io/browse/INT-4444 Right now the high-level API creates a `ReactiveStreamsConsumer` only when the input channel is a `Publisher<?>` impl or target handler is a `ReactiveMessageHandler` * Add `@Reactive[] reactive()` attribute to messaging annotations * Add `ConsumerEndpointSpec.reactive()` Both options point to the same `ConsumerEndpointFactoryBean.setReactiveCustomizer()` making the target endpoint always as a `ReactiveStreamsConsumer` independently of the input channel and target handler * Use the `Function` to customize a source `Flux` from the channel * Test and document a new feature * * Fix links in docs * * Fix `ReactiveStreamsTests` * * Rework `reactive()` attribute of messaging annotations ot a single `@Reactive` value with default as `@Reactive(ValueConstants.DEFAULT_NONE)` * Fix language in docs * Fix `MessagingAnnotationUtils.resolveAttribute()` to use `requiredType.isInstance()` instead of comparing classes since annotation instances are `Proxy` at runtime
1 parent f154088 commit e9f2346

23 files changed

+448
-98
lines changed

spring-integration-core/src/main/java/org/springframework/integration/annotation/Aggregator.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
2222
import java.lang.annotation.RetentionPolicy;
2323
import java.lang.annotation.Target;
2424

25+
import org.springframework.messaging.handler.annotation.ValueConstants;
26+
2527
/**
2628
* Indicates that a method is capable of aggregating messages.
2729
* <p>
@@ -104,8 +106,15 @@
104106
* @return the {@link Poller} options for a polled endpoint
105107
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
106108
* This attribute is an {@code array} just to allow an empty default (no poller).
107-
* Only one {@link Poller} element is allowed.
109+
* Mutually exclusive with {@link #reactive()}.
108110
*/
109111
Poller[] poller() default { };
110112

113+
/**
114+
* @return the {@link Reactive} marker for a consumer endpoint.
115+
* Mutually exclusive with {@link #poller()}.
116+
* @since 5.5
117+
*/
118+
Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE);
119+
111120
}

spring-integration-core/src/main/java/org/springframework/integration/annotation/BridgeFrom.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
2222
import java.lang.annotation.RetentionPolicy;
2323
import java.lang.annotation.Target;
2424

25+
import org.springframework.messaging.handler.annotation.ValueConstants;
26+
2527
/**
2628
* Messaging Annotation to mark a {@link org.springframework.context.annotation.Bean}
2729
* method for a {@link org.springframework.messaging.MessageChannel} to produce a
@@ -69,8 +71,15 @@
6971
* @return the {@link Poller} options for a polled endpoint
7072
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
7173
* This attribute is an {@code array} just to allow an empty default (no poller).
72-
* Only one {@link Poller} element is allowed.
74+
* Mutually exclusive with {@link #reactive()}.
7375
*/
7476
Poller[] poller() default { };
7577

78+
/**
79+
* @return the {@link Reactive} marker for a consumer endpoint.
80+
* Mutually exclusive with {@link #poller()}.
81+
* @since 5.5
82+
*/
83+
Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE);
84+
7685
}

spring-integration-core/src/main/java/org/springframework/integration/annotation/BridgeTo.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
2222
import java.lang.annotation.RetentionPolicy;
2323
import java.lang.annotation.Target;
2424

25+
import org.springframework.messaging.handler.annotation.ValueConstants;
26+
2527
/**
2628
* Messaging Annotation to mark a {@link org.springframework.context.annotation.Bean}
2729
* method for a {@link org.springframework.messaging.MessageChannel} to produce a
@@ -50,8 +52,8 @@
5052
public @interface BridgeTo {
5153

5254
/**
53-
* @return the outbound channel name to send the message to
54-
* {@link org.springframework.integration.handler.BridgeHandler}.
55+
* @return the outbound channel name to send the message to for the
56+
* {@link org.springframework.integration.handler.BridgeHandler} reply.
5557
* Optional: when omitted the message is sent to the {@code reply-channel}
5658
* in its headers (if present - an exception is thrown otherwise).
5759
*/
@@ -73,11 +75,18 @@
7375
String phase() default "";
7476

7577
/**
76-
* @return the {@link org.springframework.integration.annotation.Poller} options for a polled endpoint
78+
* @return the {@link Poller} options for a polled endpoint
7779
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
7880
* This attribute is an {@code array} just to allow an empty default (no poller).
79-
* Only one {@link org.springframework.integration.annotation.Poller} element is allowed.
81+
* Mutually exclusive with {@link #reactive()}.
8082
*/
8183
Poller[] poller() default { };
8284

85+
/**
86+
* @return the {@link Reactive} marker for a consumer endpoint.
87+
* Mutually exclusive with {@link #poller()}.
88+
* @since 5.5
89+
*/
90+
Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE);
91+
8392
}

spring-integration-core/src/main/java/org/springframework/integration/annotation/Filter.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
2222
import java.lang.annotation.RetentionPolicy;
2323
import java.lang.annotation.Target;
2424

25+
import org.springframework.messaging.handler.annotation.ValueConstants;
26+
2527
/**
2628
* Indicates that a method is capable of playing the role of a Message Filter.
2729
* <p>
@@ -38,6 +40,7 @@
3840
* @author Mark Fisher
3941
* @author Gary Russell
4042
* @author Artem Bilan
43+
*
4144
* @since 2.0
4245
*/
4346
@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@@ -126,8 +129,15 @@
126129
* @return the {@link Poller} options for a polled endpoint
127130
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
128131
* This attribute is an {@code array} just to allow an empty default (no poller).
129-
* Only one {@link Poller} element is allowed.
132+
* Mutually exclusive with {@link #reactive()}.
130133
*/
131134
Poller[] poller() default { };
132135

136+
/**
137+
* @return the {@link Reactive} marker for a consumer endpoint.
138+
* Mutually exclusive with {@link #poller()}.
139+
* @since 5.5
140+
*/
141+
Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE);
142+
133143
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.annotation;
18+
19+
import java.lang.annotation.Retention;
20+
import java.lang.annotation.RetentionPolicy;
21+
import java.lang.annotation.Target;
22+
23+
/**
24+
* Provides reactive configuration options for the consumer endpoint making
25+
* any input channel as a reactive stream source of data.
26+
*
27+
* @author Artem Bilan
28+
*
29+
* @since 5.5
30+
*/
31+
@Target({})
32+
@Retention(RetentionPolicy.RUNTIME)
33+
public @interface Reactive {
34+
35+
/**
36+
* @return the function bean name to be used in the
37+
* {@link reactor.core.publisher.Flux#transform} on the input channel
38+
* {@link reactor.core.publisher.Flux}.
39+
*/
40+
String value() default "";
41+
42+
}

spring-integration-core/src/main/java/org/springframework/integration/annotation/Router.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
2222
import java.lang.annotation.RetentionPolicy;
2323
import java.lang.annotation.Target;
2424

25+
import org.springframework.messaging.handler.annotation.ValueConstants;
26+
2527
/**
2628
* Indicates that a method is capable of resolving to a channel or channel name
2729
* based on a message, message header(s), or both.
@@ -150,8 +152,15 @@
150152
* @return the {@link Poller} options for a polled endpoint
151153
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
152154
* This attribute is an {@code array} just to allow an empty default (no poller).
153-
* Only one {@link Poller} element is allowed.
155+
* Mutually exclusive with {@link #reactive()}.
154156
*/
155157
Poller[] poller() default { };
156158

159+
/**
160+
* @return the {@link Reactive} marker for a consumer endpoint.
161+
* Mutually exclusive with {@link #poller()}.
162+
* @since 5.5
163+
*/
164+
Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE);
165+
157166
}

spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
2222
import java.lang.annotation.RetentionPolicy;
2323
import java.lang.annotation.Target;
2424

25+
import org.springframework.messaging.handler.annotation.ValueConstants;
26+
2527
/**
2628
* Indicates that a method is capable of handling a message or message payload.
2729
* <p>
@@ -121,7 +123,15 @@
121123
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
122124
* This attribute is an {@code array} just to allow an empty default (no poller).
123125
* Only one {@link Poller} element is allowed.
126+
* Mutually exclusive with {@link #reactive()}.
124127
*/
125128
Poller[] poller() default { };
126129

130+
/**
131+
* @return the {@link Reactive} marker for a consumer endpoint.
132+
* Mutually exclusive with {@link #poller()}.
133+
* @since 5.5
134+
*/
135+
Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE);
136+
127137
}

spring-integration-core/src/main/java/org/springframework/integration/annotation/Splitter.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
2222
import java.lang.annotation.RetentionPolicy;
2323
import java.lang.annotation.Target;
2424

25+
import org.springframework.messaging.handler.annotation.ValueConstants;
26+
2527
/**
2628
* Indicates that a method is capable of splitting a single message or message
2729
* payload to produce multiple messages or payloads.
@@ -115,8 +117,15 @@
115117
* @return the {@link Poller} options for a polled endpoint
116118
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
117119
* This attribute is an {@code array} just to allow an empty default (no poller).
118-
* Only one {@link Poller} element is allowed.
120+
* Mutually exclusive with {@link #reactive()}.
119121
*/
120122
Poller[] poller() default { };
121123

124+
/**
125+
* @return the {@link Reactive} marker for a consumer endpoint.
126+
* Mutually exclusive with {@link #poller()}.
127+
* @since 5.5
128+
*/
129+
Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE);
130+
122131
}

spring-integration-core/src/main/java/org/springframework/integration/annotation/Transformer.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
2222
import java.lang.annotation.RetentionPolicy;
2323
import java.lang.annotation.Target;
2424

25+
import org.springframework.messaging.handler.annotation.ValueConstants;
26+
2527
/**
2628
* Indicates that a method is capable of transforming a message, message header,
2729
* or message payload.
@@ -91,8 +93,15 @@
9193
* @return the {@link Poller} options for a polled endpoint
9294
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
9395
* This attribute is an {@code array} just to allow an empty default (no poller).
94-
* Only one {@link Poller} element is allowed.
96+
* Mutually exclusive with {@link #reactive()}.
9597
*/
9698
Poller[] poller() default { };
9799

100+
/**
101+
* @return the {@link Reactive} marker for a consumer endpoint.
102+
* Mutually exclusive with {@link #poller()}.
103+
* @since 5.5
104+
*/
105+
Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE);
106+
98107
}

0 commit comments

Comments
 (0)