Skip to content

Commit e6e7408

Browse files
committed
INT-4186: Add @Reactive and @ChannelListener
JIRA: https://jira.spring.io/browse/INT-4186 Just a PoC how it would look from the end-user perspective * Add `@ChannelListener` which is fully analogue of similar solution in other places - `@RabbitListener`, `@KafkaListener`, `@JmsListener`. In other words it has ability to subscriber to several destinations providing common options for target endpoints The reply will be processed via `@SendTo` or `replyChannel` header. This functionality is fully similar to the `@StreamListener` * Add `@Reactive` annotations' option alongside with `@Poller` to allow to use `ReactiveConsumer` underneath with provided backpressure strategy
1 parent 129ebdc commit e6e7408

File tree

9 files changed

+329
-7
lines changed

9 files changed

+329
-7
lines changed
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Copyright 2017 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.annotation;
18+
19+
import java.lang.annotation.Documented;
20+
import java.lang.annotation.ElementType;
21+
import java.lang.annotation.Retention;
22+
import java.lang.annotation.RetentionPolicy;
23+
import java.lang.annotation.Target;
24+
25+
import org.springframework.core.annotation.AliasFor;
26+
import org.springframework.messaging.handler.annotation.MessageMapping;
27+
28+
/**
29+
* Annotation that marks a method to be a listener to inputs channels.
30+
*
31+
* Annotated methods are allowed to have flexible signatures, which determine how the
32+
* method is invoked and how their return results are processed.
33+
*
34+
* <h3>Individual message handler mode</h3>
35+
*
36+
* Non declarative methods are treated as message handler based, and are invoked for each
37+
* incoming message received from that target. In this case, the method can have a
38+
* flexible signature, as described by {@link MessageMapping}.
39+
*
40+
* If the method returns a {@link org.springframework.messaging.Message}, the result will
41+
* be automatically sent to an output channel
42+
* (see {@link org.springframework.messaging.handler.annotation.SendTo}), as follows:
43+
* <ul>
44+
* <li>A result of the type {@link org.springframework.messaging.Message} will be sent
45+
* as-is</li>
46+
* <li>All other results will become the payload of a
47+
* {@link org.springframework.messaging.Message}</li>
48+
* </ul>
49+
*
50+
* The output channel where the return message is sent is determined by consulting
51+
* in the following order:
52+
* <ul>
53+
* <li>The {@link org.springframework.messaging.MessageHeaders} of the resulting
54+
* message.</li>
55+
* <li>The value set on the
56+
* {@link org.springframework.messaging.handler.annotation.SendTo} annotation, if
57+
* present</li>
58+
* </ul>
59+
*
60+
* An example of individual message handler signature is as follows:
61+
*
62+
* <pre>
63+
* {@code
64+
* @ChannelListener("inputChannel")
65+
* @SendTo("outputChannel")
66+
* public String convert(String input) {
67+
* return input.toUppercase();
68+
* }
69+
* }
70+
* </pre>
71+
*
72+
* @author Marius Bogoevici
73+
* @author Ilayaperumal Gopinathan
74+
* @author Artem Bilan
75+
*
76+
* @since 5.0
77+
* @see org.springframework.messaging.handler.annotation.SendTo
78+
*/
79+
@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
80+
@Retention(RetentionPolicy.RUNTIME)
81+
@MessageMapping
82+
@Documented
83+
public @interface ChannelListener {
84+
85+
/**
86+
* The names of the message channels that the method subscribes to.
87+
*/
88+
@AliasFor("channel")
89+
String[] value() default { };
90+
91+
/**
92+
* The names of the message channels that the method subscribes to.
93+
*/
94+
@AliasFor("value")
95+
String[] channel() default { };
96+
97+
/**
98+
* Specify a "chain" of {@code Advice} beans that will "wrap" the message handler.
99+
* Only the handler is advised, not the downstream flow.
100+
* @return the advice chain.
101+
*/
102+
String[] adviceChain() default { };
103+
104+
/**
105+
* The {@link org.springframework.context.SmartLifecycle} {@code autoStartup} option.
106+
* Can be specified as 'property placeholder', e.g. {@code ${foo.autoStartup}}.
107+
* Defaults to {@code true}.
108+
* @return the auto startup {@code boolean} flag.
109+
*/
110+
String autoStartup() default "";
111+
112+
/**
113+
* Specify a {@link org.springframework.context.SmartLifecycle} {@code phase} option.
114+
* Defaults {@code 0} for {@link org.springframework.integration.endpoint.PollingConsumer}
115+
* and {@code Integer.MIN_VALUE} for {@link org.springframework.integration.endpoint.EventDrivenConsumer}.
116+
* Can be specified as 'property placeholder', e.g. {@code ${foo.phase}}.
117+
* @return the {@code SmartLifecycle} phase.
118+
*/
119+
String phase() default "";
120+
121+
/**
122+
* @return the {@link Poller} options for a polled endpoint
123+
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
124+
* This attribute is an {@code array} just to allow an empty default (no poller).
125+
* Only one {@link Poller} element is allowed.
126+
* Mutually exclusive with {@link #reactive()}.
127+
*/
128+
Poller[] poller() default { };
129+
130+
/**
131+
* @return the {@link Reactive} options for a reactive endpoint.
132+
* This attribute is an {@code array} just to allow an empty default (not reactive).
133+
* Only one {@link Reactive} element is allowed.
134+
* Mutually exclusive with {@link #poller()}.
135+
*/
136+
Reactive[] reactive() default { };
137+
138+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2017 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.annotation;
18+
19+
import java.lang.annotation.Retention;
20+
import java.lang.annotation.RetentionPolicy;
21+
import java.lang.annotation.Target;
22+
23+
import org.springframework.core.annotation.AliasFor;
24+
import org.springframework.integration.reactive.BackpressureType;
25+
26+
import reactor.core.publisher.BufferOverflowStrategy;
27+
28+
/**
29+
* Provides backpressure options for the Messaging annotations for
30+
* reactive endpoints.
31+
* <p>
32+
* It is an analogue of the XML {@code <reactive/>} element.
33+
* <p>
34+
* Non-reference attributes support Property Placeholder resolutions.
35+
*
36+
* @author Artem Bilan
37+
*
38+
* @since 5.0
39+
*
40+
* @see reactor.core.publisher.Flux#onBackpressureBuffer
41+
* @see reactor.core.publisher.Flux#onBackpressureDrop
42+
* @see reactor.core.publisher.Flux#onBackpressureError
43+
* @see reactor.core.publisher.Flux#onBackpressureLatest
44+
*/
45+
@Target({})
46+
@Retention(RetentionPolicy.RUNTIME)
47+
public @interface Reactive {
48+
49+
/**
50+
* @return The {@link BackpressureType} to use.
51+
*/
52+
@AliasFor("backpressure")
53+
BackpressureType value() default BackpressureType.NONE;
54+
55+
/**
56+
* @return The {@link BackpressureType} to use.
57+
*/
58+
@AliasFor("value")
59+
BackpressureType backpressure() default BackpressureType.NONE;
60+
61+
/**
62+
* @return The {@link java.util.function.Consumer} bean name, called as a callback on backpressure.
63+
*/
64+
String consumer() default "";
65+
66+
/**
67+
* @return The {@link BufferOverflowStrategy} which is used
68+
* in case of {@link BackpressureType#BUFFER} for the {@link #backpressure()}.
69+
*/
70+
BufferOverflowStrategy bufferOverflowStrategy() default BufferOverflowStrategy.ERROR;
71+
72+
/**
73+
* @return the maximum buffer backlog size before immediate error
74+
* in case of {@link BackpressureType#BUFFER} for the {@link #backpressure()}.
75+
* Defaults to {@link Integer#MIN_VALUE} meaning {@code unbounded}.
76+
*/
77+
String bufferMaxSize() default "-2147483648";
78+
79+
}

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import org.springframework.integration.channel.ChannelInterceptorAware;
3838
import org.springframework.integration.channel.DirectChannel;
3939
import org.springframework.integration.channel.FixedSubscriberChannel;
40-
import org.springframework.integration.channel.MessageChannelReactiveUtils;
40+
import org.springframework.integration.reactive.MessageChannelReactiveUtils;
4141
import org.springframework.integration.channel.ReactiveChannel;
4242
import org.springframework.integration.channel.interceptor.WireTap;
4343
import org.springframework.integration.config.ConsumerEndpointFactoryBean;

spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveConsumer.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.reactivestreams.Subscriber;
2323
import org.reactivestreams.Subscription;
2424

25-
import org.springframework.integration.channel.MessageChannelReactiveUtils;
25+
import org.springframework.integration.reactive.MessageChannelReactiveUtils;
2626
import org.springframework.integration.channel.MessagePublishingErrorHandler;
2727
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
2828
import org.springframework.messaging.Message;
@@ -34,6 +34,7 @@
3434
import reactor.core.Exceptions;
3535
import reactor.core.Receiver;
3636
import reactor.core.Trackable;
37+
import reactor.core.publisher.Flux;
3738
import reactor.core.publisher.Operators;
3839

3940

@@ -45,7 +46,7 @@ public class ReactiveConsumer extends AbstractEndpoint {
4546

4647
private final Operators.SubscriberAdapter<Message<?>, Message<?>> subscriber;
4748

48-
private volatile Publisher<Message<?>> publisher;
49+
private volatile Flux<Message<?>> publisher;
4950

5051
private ErrorHandler errorHandler;
5152

@@ -60,7 +61,7 @@ public ReactiveConsumer(MessageChannel inputChannel, Subscriber<Message<?>> subs
6061
Assert.notNull(subscriber);
6162

6263
Publisher<?> messagePublisher = MessageChannelReactiveUtils.toPublisher(inputChannel);
63-
this.publisher = (Publisher<Message<?>>) messagePublisher;
64+
this.publisher = Flux.from((Publisher<Message<?>>) messagePublisher);
6465

6566
this.subscriber = new Operators.SubscriberAdapter<Message<?>, Message<?>>(subscriber) {
6667

@@ -93,7 +94,8 @@ protected void onInit() throws Exception {
9394

9495
@Override
9596
protected void doStart() {
96-
this.publisher.subscribe(this.subscriber);
97+
this.publisher
98+
.subscribe(this.subscriber);
9799
}
98100

99101
@Override
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2017 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.reactive;
18+
19+
/**
20+
* The {@link reactor.core.publisher.Flux} backpressure strategy type.
21+
*
22+
* @author Artem Bilan
23+
*
24+
* @since 5.0
25+
*
26+
* @see org.springframework.integration.annotation.Reactive
27+
* @see reactor.core.publisher.Flux#onBackpressureBuffer
28+
* @see reactor.core.publisher.Flux#onBackpressureDrop
29+
* @see reactor.core.publisher.Flux#onBackpressureError
30+
* @see reactor.core.publisher.Flux#onBackpressureLatest
31+
*/
32+
public enum BackpressureType {
33+
34+
BUFFER, DROP, ERROR, LATEST, NONE
35+
36+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.integration.channel;
17+
package org.springframework.integration.reactive;
1818

1919
import java.util.Iterator;
2020

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/**
2+
* Provides utility classes for Reactive Streams integration.
3+
*/
4+
package org.springframework.integration.reactive;

spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveChannelTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import org.springframework.context.annotation.Bean;
3737
import org.springframework.context.annotation.Configuration;
3838
import org.springframework.integration.annotation.ServiceActivator;
39-
import org.springframework.integration.channel.MessageChannelReactiveUtils;
39+
import org.springframework.integration.reactive.MessageChannelReactiveUtils;
4040
import org.springframework.integration.channel.QueueChannel;
4141
import org.springframework.integration.channel.ReactiveChannel;
4242
import org.springframework.integration.config.EnableIntegration;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2017 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.channellistener;
18+
19+
import org.junit.runner.RunWith;
20+
21+
import org.springframework.context.annotation.ComponentScan;
22+
import org.springframework.context.annotation.Configuration;
23+
import org.springframework.integration.annotation.ChannelListener;
24+
import org.springframework.integration.annotation.Reactive;
25+
import org.springframework.integration.config.EnableIntegration;
26+
import org.springframework.integration.reactive.BackpressureType;
27+
import org.springframework.stereotype.Service;
28+
import org.springframework.test.annotation.DirtiesContext;
29+
import org.springframework.test.context.junit4.SpringRunner;
30+
31+
import reactor.core.publisher.BufferOverflowStrategy;
32+
33+
/**
34+
* @author Artem Bilan
35+
*
36+
* @since 5.0
37+
*/
38+
@RunWith(SpringRunner.class)
39+
@DirtiesContext
40+
public class ChannelListenerTests {
41+
42+
@Configuration
43+
@ComponentScan
44+
@EnableIntegration
45+
public static class ContextConfiguration {
46+
47+
}
48+
49+
@Service
50+
static class ChannelListenerTestService {
51+
52+
@ChannelListener(channel = "simpleChannel",
53+
reactive = @Reactive(
54+
backpressure = BackpressureType.BUFFER,
55+
bufferOverflowStrategy = BufferOverflowStrategy.ERROR,
56+
bufferMaxSize = "100"))
57+
public String simpleListenerTest(String payload) {
58+
return payload.toUpperCase();
59+
}
60+
61+
}
62+
63+
}

0 commit comments

Comments
 (0)