diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java index d920a31c14b..b5da2bd2b30 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java @@ -30,9 +30,13 @@ import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessagingException; import org.springframework.messaging.support.ErrorMessage; +import org.springframework.messaging.support.GenericMessage; import org.springframework.util.Assert; import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; + /** * A support class for producer endpoints that provides a setter for the * output channel and a convenience method for sending Messages. @@ -58,6 +62,12 @@ public abstract class MessageProducerSupport extends AbstractEndpoint implements private volatile boolean shouldTrack = false; + private boolean reactive; + + private Flux> flux; + + private volatile FluxSink> sink; + protected MessageProducerSupport() { this.setPhase(Integer.MAX_VALUE / 2); } @@ -151,6 +161,14 @@ public final void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrat this.errorMessageStrategy = errorMessageStrategy; } + public boolean isReactive() { + return this.reactive; + } + + public void setReactive(boolean reactive) { + this.reactive = reactive; + } + protected MessagingTemplate getMessagingTemplate() { return this.messagingTemplate; } @@ -182,6 +200,13 @@ protected void onInit() { */ @Override protected void doStart() { + if (this.reactive && this.flux == null) { + this.flux = + Flux.>create(emitter -> this.sink = emitter, FluxSink.OverflowStrategy.IGNORE) + .publish() + .autoConnect(); + this.messagingTemplate.send(getOutputChannel(), new GenericMessage<>(this.flux)); + } } /** @@ -199,12 +224,17 @@ protected void sendMessage(Message message) { if (this.shouldTrack) { message = MessageHistory.write(message, this, this.getMessageBuilderFactory()); } - try { - this.messagingTemplate.send(getOutputChannel(), message); + if (this.flux != null) { + this.sink.next(message); } - catch (RuntimeException e) { - if (!sendErrorMessageIfNecessary(message, e)) { - throw e; + else { + try { + this.messagingTemplate.send(getOutputChannel(), message); + } + catch (RuntimeException e) { + if (!sendErrorMessageIfNecessary(message, e)) { + throw e; + } } } } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/endpoint/ReactiveMessageProducerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/endpoint/ReactiveMessageProducerTests.java new file mode 100644 index 00000000000..cb527d4c953 --- /dev/null +++ b/spring-integration-core/src/test/java/org/springframework/integration/endpoint/ReactiveMessageProducerTests.java @@ -0,0 +1,108 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.endpoint; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import reactor.core.publisher.Flux; + +/** + * @author Gary Russell + * @since 5.1 + * + */ +@SpringJUnitConfig +@DirtiesContext +public class ReactiveMessageProducerTests { + + @Autowired + public Config config; + + @Test + public void test() throws Exception { + for (int i = 0; i < 5; i++) { + this.config.producer().produce(); + } + assertThat(this.config.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.config.received.get(0).getPayload()).isEqualTo("FOO"); + } + + @Configuration + @EnableIntegration + public static class Config { + + private final List> received = new ArrayList<>(); + + private final CountDownLatch latch = new CountDownLatch(5); + + @Bean + public MyProducer producer() { + MyProducer producer = new MyProducer(); + producer.setReactive(true); + producer.setOutputChannelName("in"); + return producer; + } + + @ServiceActivator(inputChannel = "in", outputChannel = "out") + public Flux> handle1(Flux> flux) { + return flux.map(m -> MessageBuilder.withPayload(((String) m.getPayload()).toUpperCase()).build()); + } + + @ServiceActivator(inputChannel = "out") + public void handle2(final Flux> flux) { + flux.map(m -> { + this.received.add(m); + System.out.println(m); + latch.countDown(); + return m; + }) + .subscribe(); + } + + } + + private static class MyProducer extends MessageProducerSupport { + + MyProducer() { + super(); + } + + void produce() { + sendMessage(new GenericMessage<>("foo")); + } + + } + +}