Skip to content

Commit 564b749

Browse files
committed
* Rework reactive() attribute of messaging annotations ot a single @Reactive value
with default as `@Reactive(ValueConstants.DEFAULT_NONE)` * Fix language in docs * Fix `MessagingAnnotationUtils.resolveAttribute()` to use `requiredType.isInstance()` instead of comparing classes since annotation instances are `Proxy` at runtime
1 parent 3846084 commit 564b749

File tree

12 files changed

+57
-62
lines changed

12 files changed

+57
-62
lines changed

spring-integration-core/src/main/java/org/springframework/integration/annotation/Aggregator.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.lang.annotation.RetentionPolicy;
2323
import java.lang.annotation.Target;
2424

25+
import org.springframework.messaging.handler.annotation.ValueConstants;
26+
2527
/**
2628
* Indicates that a method is capable of aggregating messages.
2729
* <p>
@@ -109,11 +111,10 @@
109111
Poller[] poller() default { };
110112

111113
/**
112-
* @return the {@link Reactive} options for a consumer endpoint.
113-
* This attribute is an {@code array} just to allow an empty default (not reactive).
114-
* Only one {@link Reactive} element is allowed.
114+
* @return the {@link Reactive} marker for a consumer endpoint.
115115
* Mutually exclusive with {@link #poller()}.
116+
* @since 5.5
116117
*/
117-
Reactive[] reactive() default { };
118+
Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE);
118119

119120
}

spring-integration-core/src/main/java/org/springframework/integration/annotation/BridgeFrom.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.lang.annotation.RetentionPolicy;
2323
import java.lang.annotation.Target;
2424

25+
import org.springframework.messaging.handler.annotation.ValueConstants;
26+
2527
/**
2628
* Messaging Annotation to mark a {@link org.springframework.context.annotation.Bean}
2729
* method for a {@link org.springframework.messaging.MessageChannel} to produce a
@@ -74,11 +76,10 @@
7476
Poller[] poller() default { };
7577

7678
/**
77-
* @return the {@link Reactive} options for a consumer endpoint.
78-
* This attribute is an {@code array} just to allow an empty default (not reactive).
79-
* Only one {@link Reactive} element is allowed.
79+
* @return the {@link Reactive} marker for a consumer endpoint.
8080
* Mutually exclusive with {@link #poller()}.
81+
* @since 5.5
8182
*/
82-
Reactive[] reactive() default { };
83+
Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE);
8384

8485
}

spring-integration-core/src/main/java/org/springframework/integration/annotation/BridgeTo.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.lang.annotation.RetentionPolicy;
2323
import java.lang.annotation.Target;
2424

25+
import org.springframework.messaging.handler.annotation.ValueConstants;
26+
2527
/**
2628
* Messaging Annotation to mark a {@link org.springframework.context.annotation.Bean}
2729
* method for a {@link org.springframework.messaging.MessageChannel} to produce a
@@ -81,11 +83,10 @@
8183
Poller[] poller() default { };
8284

8385
/**
84-
* @return the {@link Reactive} options for a consumer endpoint.
85-
* This attribute is an {@code array} just to allow an empty default (not reactive).
86-
* Only one {@link Reactive} element is allowed.
86+
* @return the {@link Reactive} marker for a consumer endpoint.
8787
* Mutually exclusive with {@link #poller()}.
88+
* @since 5.5
8889
*/
89-
Reactive[] reactive() default { };
90+
Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE);
9091

9192
}

spring-integration-core/src/main/java/org/springframework/integration/annotation/Filter.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.lang.annotation.RetentionPolicy;
2323
import java.lang.annotation.Target;
2424

25+
import org.springframework.messaging.handler.annotation.ValueConstants;
26+
2527
/**
2628
* Indicates that a method is capable of playing the role of a Message Filter.
2729
* <p>
@@ -132,12 +134,10 @@
132134
Poller[] poller() default { };
133135

134136
/**
135-
* @return the {@link Reactive} options for a consumer endpoint.
136-
* This attribute is an {@code array} just to allow an empty default (not reactive).
137-
* Only one {@link Reactive} element is allowed.
137+
* @return the {@link Reactive} marker for a consumer endpoint.
138138
* Mutually exclusive with {@link #poller()}.
139+
* @since 5.5
139140
*/
140-
Reactive[] reactive() default { };
141-
141+
Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE);
142142

143143
}

spring-integration-core/src/main/java/org/springframework/integration/annotation/Reactive.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.lang.annotation.Target;
2222

2323
/**
24-
* Provides a reactive configuration options for the consumer endpoint making
24+
* Provides reactive configuration options for the consumer endpoint making
2525
* any input channel as a reactive stream source of data.
2626
*
2727
* @author Artem Bilan

spring-integration-core/src/main/java/org/springframework/integration/annotation/Router.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.lang.annotation.RetentionPolicy;
2323
import java.lang.annotation.Target;
2424

25+
import org.springframework.messaging.handler.annotation.ValueConstants;
26+
2527
/**
2628
* Indicates that a method is capable of resolving to a channel or channel name
2729
* based on a message, message header(s), or both.
@@ -155,11 +157,10 @@
155157
Poller[] poller() default { };
156158

157159
/**
158-
* @return the {@link Reactive} options for a consumer endpoint.
159-
* This attribute is an {@code array} just to allow an empty default (not reactive).
160-
* Only one {@link Reactive} element is allowed.
160+
* @return the {@link Reactive} marker for a consumer endpoint.
161161
* Mutually exclusive with {@link #poller()}.
162+
* @since 5.5
162163
*/
163-
Reactive[] reactive() default { };
164+
Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE);
164165

165166
}

spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.lang.annotation.RetentionPolicy;
2323
import java.lang.annotation.Target;
2424

25+
import org.springframework.messaging.handler.annotation.ValueConstants;
26+
2527
/**
2628
* Indicates that a method is capable of handling a message or message payload.
2729
* <p>
@@ -126,11 +128,10 @@
126128
Poller[] poller() default { };
127129

128130
/**
129-
* @return the {@link Reactive} options for a consumer endpoint.
130-
* This attribute is an {@code array} just to allow an empty default (not reactive).
131-
* Only one {@link Reactive} element is allowed.
131+
* @return the {@link Reactive} marker for a consumer endpoint.
132132
* Mutually exclusive with {@link #poller()}.
133+
* @since 5.5
133134
*/
134-
Reactive[] reactive() default { };
135+
Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE);
135136

136137
}

spring-integration-core/src/main/java/org/springframework/integration/annotation/Splitter.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.lang.annotation.RetentionPolicy;
2323
import java.lang.annotation.Target;
2424

25+
import org.springframework.messaging.handler.annotation.ValueConstants;
26+
2527
/**
2628
* Indicates that a method is capable of splitting a single message or message
2729
* payload to produce multiple messages or payloads.
@@ -120,11 +122,10 @@
120122
Poller[] poller() default { };
121123

122124
/**
123-
* @return the {@link Reactive} options for a consumer endpoint.
124-
* This attribute is an {@code array} just to allow an empty default (not reactive).
125-
* Only one {@link Reactive} element is allowed.
125+
* @return the {@link Reactive} marker for a consumer endpoint.
126126
* Mutually exclusive with {@link #poller()}.
127+
* @since 5.5
127128
*/
128-
Reactive[] reactive() default { };
129+
Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE);
129130

130131
}

spring-integration-core/src/main/java/org/springframework/integration/annotation/Transformer.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.lang.annotation.RetentionPolicy;
2323
import java.lang.annotation.Target;
2424

25+
import org.springframework.messaging.handler.annotation.ValueConstants;
26+
2527
/**
2628
* Indicates that a method is capable of transforming a message, message header,
2729
* or message payload.
@@ -96,11 +98,10 @@
9698
Poller[] poller() default { };
9799

98100
/**
99-
* @return the {@link Reactive} options for a consumer endpoint.
100-
* This attribute is an {@code array} just to allow an empty default (not reactive).
101-
* Only one {@link Reactive} element is allowed.
101+
* @return the {@link Reactive} marker for a consumer endpoint.
102102
* Mutually exclusive with {@link #poller()}.
103+
* @since 5.5
103104
*/
104-
Reactive[] reactive() default { };
105+
Reactive reactive() default @Reactive(ValueConstants.DEFAULT_NONE);
105106

106107
}

spring-integration-core/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
import org.springframework.messaging.SubscribableChannel;
8484
import org.springframework.messaging.core.DestinationResolutionException;
8585
import org.springframework.messaging.core.DestinationResolver;
86+
import org.springframework.messaging.handler.annotation.ValueConstants;
8687
import org.springframework.scheduling.Trigger;
8788
import org.springframework.scheduling.support.CronTrigger;
8889
import org.springframework.scheduling.support.PeriodicTrigger;
@@ -376,41 +377,34 @@ protected AbstractEndpoint createEndpoint(MessageHandler handler, @SuppressWarni
376377
protected AbstractEndpoint doCreateEndpoint(MessageHandler handler, MessageChannel inputChannel,
377378
List<Annotation> annotations) {
378379

379-
AbstractEndpoint endpoint;
380-
381-
382380
Poller[] pollers = MessagingAnnotationUtils.resolveAttribute(annotations, "poller", Poller[].class);
383-
Reactive[] reactive = MessagingAnnotationUtils.resolveAttribute(annotations, "reactive", Reactive[].class);
381+
Reactive reactive = MessagingAnnotationUtils.resolveAttribute(annotations, "reactive", Reactive.class);
382+
boolean reactiveProvided = reactive != null && !ValueConstants.DEFAULT_NONE.equals(reactive.value());
384383

385-
Assert.state(ObjectUtils.isEmpty(reactive) || ObjectUtils.isEmpty(pollers),
384+
Assert.state(!reactiveProvided || ObjectUtils.isEmpty(pollers),
386385
"The 'poller' and 'reactive' are mutually exclusive.");
387386

388-
if (inputChannel instanceof Publisher ||
389-
handler instanceof ReactiveMessageHandlerAdapter ||
390-
!ObjectUtils.isEmpty(reactive)) {
391-
392-
endpoint = reactiveStreamsConsumer(inputChannel, handler, reactive);
387+
if (inputChannel instanceof Publisher || handler instanceof ReactiveMessageHandlerAdapter || reactiveProvided) {
388+
return reactiveStreamsConsumer(inputChannel, handler, reactiveProvided ? reactive : null);
393389
}
394390
else if (inputChannel instanceof SubscribableChannel) {
395391
Assert.state(ObjectUtils.isEmpty(pollers), () ->
396392
"A '@Poller' should not be specified for Annotation-based " +
397393
"endpoint, since '" + inputChannel + "' is a SubscribableChannel (not pollable).");
398-
endpoint = new EventDrivenConsumer((SubscribableChannel) inputChannel, handler);
394+
return new EventDrivenConsumer((SubscribableChannel) inputChannel, handler);
399395
}
400396
else if (inputChannel instanceof PollableChannel) {
401-
endpoint = pollingConsumer(inputChannel, handler, pollers);
397+
return pollingConsumer(inputChannel, handler, pollers);
402398
}
403399
else {
404400
throw new IllegalArgumentException("Unsupported 'inputChannel' type: '"
405401
+ inputChannel.getClass().getName() + "'. " +
406402
"Must be one of 'SubscribableChannel', 'PollableChannel' or 'ReactiveStreamsSubscribableChannel'");
407403
}
408-
409-
return endpoint;
410404
}
411405

412406
private ReactiveStreamsConsumer reactiveStreamsConsumer(MessageChannel channel, MessageHandler handler,
413-
Reactive[] reactives) {
407+
Reactive reactive) {
414408

415409
ReactiveStreamsConsumer reactiveStreamsConsumer;
416410
if (handler instanceof ReactiveMessageHandlerAdapter) {
@@ -421,10 +415,7 @@ private ReactiveStreamsConsumer reactiveStreamsConsumer(MessageChannel channel,
421415
reactiveStreamsConsumer = new ReactiveStreamsConsumer(channel, handler);
422416
}
423417

424-
if (!ObjectUtils.isEmpty(reactives)) {
425-
Assert.state(reactives.length == 1,
426-
"The 'reactive' for an Annotation-based endpoint can have only one '@Reactive'.");
427-
Reactive reactive = reactives[0];
418+
if (reactive != null) {
428419
String functionBeanName = reactive.value();
429420
if (StringUtils.hasText(functionBeanName)) {
430421
@SuppressWarnings("unchecked")
@@ -434,7 +425,6 @@ private ReactiveStreamsConsumer reactiveStreamsConsumer(MessageChannel channel,
434425
}
435426
}
436427

437-
438428
return reactiveStreamsConsumer;
439429
}
440430

spring-integration-core/src/main/java/org/springframework/integration/util/MessagingAnnotationUtils.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -61,11 +61,9 @@ public final class MessagingAnnotationUtils {
6161
@SuppressWarnings("unchecked")
6262
public static <T> T resolveAttribute(List<Annotation> annotations, String name, Class<T> requiredType) {
6363
for (Annotation annotation : annotations) {
64-
if (annotation != null) {
65-
Object value = AnnotationUtils.getValue(annotation, name);
66-
if (value != null && value.getClass() == requiredType && hasValue(value)) {
67-
return (T) value;
68-
}
64+
Object value = AnnotationUtils.getValue(annotation, name);
65+
if (requiredType.isInstance(value) && hasValue(value)) {
66+
return (T) value;
6967
}
7068
}
7169
return null;

src/reference/asciidoc/dsl.adoc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,8 +265,8 @@ The `PollerSpec` is a `FactoryBean` that generates the `PollerMetadata` object f
265265
[[java-dsl-reactive]]
266266
=== The `reactive()` Endpoint
267267

268-
Starting with version 5.5, the `ConsumerEndpointSpec` provide a `reactive()` configuration property with an optional customizer `Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>`.
269-
This option makes the target endpoint as a `ReactiveStreamsConsumer` instance independently of the input channel type, which is turned to the `Flux` via `IntegrationReactiveUtils.messageChannelToFlux()`.
268+
Starting with version 5.5, the `ConsumerEndpointSpec` provides a `reactive()` configuration property with an optional customizer `Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>`.
269+
This option configures the target endpoint as a `ReactiveStreamsConsumer` instance, independently of the input channel type, which is converted to a `Flux` via `IntegrationReactiveUtils.messageChannelToFlux()`.
270270
The provided function is used from the `Flux.transform()` operator to customize (`publishOn()`, `log()`, `doOnNext()` etc.) a reactive stream source from the input channel.
271271

272272
The following example demonstrates how to change the publishing thread from the input channel independently of the final subscriber and producer to that `DirectChannel`:

0 commit comments

Comments
 (0)