From f9a421df9a05935c0f4973a903bf537dfff82138 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 26 Apr 2017 11:38:13 -0400 Subject: [PATCH] Rename ReactiveChannel - as discussed last week TODO: - should we take the channel outside of the `AbstractMessageChannel` hierarchy? - avoid blocking interceptors - we would lose channel metrics though - rename `ReactiveConsumer` ? --- ...veChannel.java => FluxMessageChannel.java} | 20 +++++++++---------- ...nnel.java => FluxSubscribableChannel.java} | 9 +++++---- .../integration/dsl/Channels.java | 10 +++++----- .../dsl/IntegrationFlowDefinition.java | 4 ++-- .../integration/dsl/IntegrationFlows.java | 13 +++++++----- ...lSpec.java => FluxMessageChannelSpec.java} | 12 +++++------ .../dsl/channel/MessageChannels.java | 12 +++++------ .../AbstractMessageProducingHandler.java | 6 +++--- ...ests.java => FluxMessageChannelTests.java} | 16 +++++++-------- .../reactive/ReactiveConsumerTests.java | 6 +++--- ...tpRequestExecutingMessageHandlerTests.java | 4 ++-- 11 files changed, 58 insertions(+), 54 deletions(-) rename spring-integration-core/src/main/java/org/springframework/integration/channel/{ReactiveChannel.java => FluxMessageChannel.java} (79%) rename spring-integration-core/src/main/java/org/springframework/integration/channel/{ReactiveSubscribableChannel.java => FluxSubscribableChannel.java} (83%) rename spring-integration-core/src/main/java/org/springframework/integration/dsl/channel/{ReactiveChannelSpec.java => FluxMessageChannelSpec.java} (69%) rename spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/{ReactiveChannelTests.java => FluxMessageChannelTests.java} (89%) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/ReactiveChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java similarity index 79% rename from spring-integration-core/src/main/java/org/springframework/integration/channel/ReactiveChannel.java rename to spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java index 1d3d902891c..d332b584239 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/ReactiveChannel.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java @@ -37,8 +37,8 @@ * * @since 5.0 */ -public class ReactiveChannel extends AbstractMessageChannel - implements Publisher>, ReactiveSubscribableChannel { +public class FluxMessageChannel extends AbstractMessageChannel + implements Publisher>, FluxSubscribableChannel { private final List>> subscribers = new ArrayList<>(); @@ -52,11 +52,11 @@ public class ReactiveChannel extends AbstractMessageChannel private volatile boolean upstreamSubscribed; - public ReactiveChannel() { + public FluxMessageChannel() { this(DirectProcessor.create()); } - public ReactiveChannel(FluxProcessor, Message> processor) { + public FluxMessageChannel(FluxProcessor, Message> processor) { Assert.notNull(processor, "'processor' must not be null"); this.processor = processor; this.flux = Flux.from(processor); @@ -73,7 +73,7 @@ protected boolean doSend(Message message, long timeout) { public void subscribe(Subscriber> subscriber) { this.subscribers.add(subscriber); - this.flux.doOnCancel(() -> ReactiveChannel.this.subscribers.remove(subscriber)) + this.flux.doOnCancel(() -> FluxMessageChannel.this.subscribers.remove(subscriber)) .subscribe(subscriber); if (!this.upstreamSubscribed) { @@ -82,7 +82,7 @@ public void subscribe(Subscriber> subscriber) { } @Override - public void subscribeTo(Publisher> publisher) { + public void subscribeTo(Flux> publisher) { this.publishers.add(publisher); if (!this.subscribers.isEmpty()) { doSubscribeTo(publisher); @@ -91,11 +91,11 @@ public void subscribeTo(Publisher> publisher) { private void doSubscribeTo(Publisher> publisher) { Flux.from(publisher) - .doOnSubscribe(s -> ReactiveChannel.this.upstreamSubscribed = true) + .doOnSubscribe(s -> FluxMessageChannel.this.upstreamSubscribed = true) .doOnComplete(() -> { - ReactiveChannel.this.publishers.remove(publisher); - if (ReactiveChannel.this.publishers.isEmpty()) { - ReactiveChannel.this.upstreamSubscribed = false; + FluxMessageChannel.this.publishers.remove(publisher); + if (FluxMessageChannel.this.publishers.isEmpty()) { + FluxMessageChannel.this.upstreamSubscribed = false; } }) .subscribe(this.processor); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/ReactiveSubscribableChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/FluxSubscribableChannel.java similarity index 83% rename from spring-integration-core/src/main/java/org/springframework/integration/channel/ReactiveSubscribableChannel.java rename to spring-integration-core/src/main/java/org/springframework/integration/channel/FluxSubscribableChannel.java index 36352266b9f..756666ea914 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/ReactiveSubscribableChannel.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/FluxSubscribableChannel.java @@ -16,17 +16,18 @@ package org.springframework.integration.channel; -import org.reactivestreams.Publisher; - import org.springframework.messaging.Message; +import reactor.core.publisher.Flux; + /** * @author Artem Bilan + * @author Gary Russell * * @since 5.0 */ -public interface ReactiveSubscribableChannel { +public interface FluxSubscribableChannel { - void subscribeTo(Publisher> publisher); + void subscribeTo(Flux> publisher); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/Channels.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/Channels.java index a67d2af3495..5148fe06d61 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/Channels.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/Channels.java @@ -21,11 +21,11 @@ import org.springframework.integration.dsl.channel.DirectChannelSpec; import org.springframework.integration.dsl.channel.ExecutorChannelSpec; +import org.springframework.integration.dsl.channel.FluxMessageChannelSpec; import org.springframework.integration.dsl.channel.MessageChannels; import org.springframework.integration.dsl.channel.PriorityChannelSpec; import org.springframework.integration.dsl.channel.PublishSubscribeChannelSpec; import org.springframework.integration.dsl.channel.QueueChannelSpec; -import org.springframework.integration.dsl.channel.ReactiveChannelSpec; import org.springframework.integration.dsl.channel.RendezvousChannelSpec; import org.springframework.integration.store.ChannelMessageStore; import org.springframework.integration.store.PriorityCapableChannelMessageStore; @@ -132,19 +132,19 @@ public ExecutorChannelSpec executor(String id, Executor executor) { } - public ReactiveChannelSpec reactive() { + public FluxMessageChannelSpec reactive() { return MessageChannels.reactive(); } - public ReactiveChannelSpec reactive(String id) { + public FluxMessageChannelSpec reactive(String id) { return MessageChannels.reactive(id); } - public ReactiveChannelSpec reactive(FluxProcessor, Message> processor) { + public FluxMessageChannelSpec reactive(FluxProcessor, Message> processor) { return MessageChannels.reactive(processor); } - public ReactiveChannelSpec reactive(String id, FluxProcessor, Message> processor) { + public FluxMessageChannelSpec reactive(String id, FluxProcessor, Message> processor) { return MessageChannels.reactive(id, processor); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java index e86c5954e93..69a5ca75982 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java @@ -38,8 +38,8 @@ import org.springframework.integration.channel.ChannelInterceptorAware; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.FixedSubscriberChannel; +import org.springframework.integration.channel.FluxMessageChannel; import org.springframework.integration.channel.MessageChannelReactiveUtils; -import org.springframework.integration.channel.ReactiveChannel; import org.springframework.integration.channel.interceptor.WireTap; import org.springframework.integration.config.ConsumerEndpointFactoryBean; import org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean; @@ -2550,7 +2550,7 @@ public Publisher> toReactivePublisher() { publisher = MessageChannelReactiveUtils.toPublisher(channelForPublisher); } else { - MessageChannel reactiveChannel = new ReactiveChannel(); + MessageChannel reactiveChannel = new FluxMessageChannel(); publisher = (Publisher>) reactiveChannel; channel(reactiveChannel); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlows.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlows.java index 09e9a781a26..de2eefc411e 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlows.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlows.java @@ -21,7 +21,7 @@ import org.reactivestreams.Publisher; import org.springframework.integration.channel.DirectChannel; -import org.springframework.integration.channel.ReactiveChannel; +import org.springframework.integration.channel.FluxMessageChannel; import org.springframework.integration.core.MessageSource; import org.springframework.integration.dsl.channel.MessageChannelSpec; import org.springframework.integration.dsl.support.FixedSubscriberChannelPrototype; @@ -35,10 +35,13 @@ import org.springframework.messaging.MessageChannel; import org.springframework.util.Assert; +import reactor.core.publisher.Flux; + /** * The central factory for fluent {@link IntegrationFlowBuilder} API. * * @author Artem Bilan + * @author Gary Russell * * @since 5.0 * @@ -299,15 +302,15 @@ protected void onInit() { } /** - * Populate a {@link ReactiveChannel} to the {@link IntegrationFlowBuilder} chain + * Populate a {@link FluxMessageChannel} to the {@link IntegrationFlowBuilder} chain * and subscribe it to the provided {@link Publisher}. * @param publisher the {@link Publisher} to subscribe to. * @return new {@link IntegrationFlowBuilder}. */ - public static IntegrationFlowBuilder from(Publisher> publisher) { - ReactiveChannel reactiveChannel = new ReactiveChannel(); + public static IntegrationFlowBuilder from(Flux> publisher) { + FluxMessageChannel reactiveChannel = new FluxMessageChannel(); reactiveChannel.subscribeTo(publisher); - return from((MessageChannel) reactiveChannel); + return from(reactiveChannel); } private static IntegrationFlowBuilder from(MessagingGatewaySupport inboundGateway, diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/channel/ReactiveChannelSpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/channel/FluxMessageChannelSpec.java similarity index 69% rename from spring-integration-core/src/main/java/org/springframework/integration/dsl/channel/ReactiveChannelSpec.java rename to spring-integration-core/src/main/java/org/springframework/integration/dsl/channel/FluxMessageChannelSpec.java index c347d6d04b8..5f8c2615252 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/channel/ReactiveChannelSpec.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/channel/FluxMessageChannelSpec.java @@ -16,7 +16,7 @@ package org.springframework.integration.dsl.channel; -import org.springframework.integration.channel.ReactiveChannel; +import org.springframework.integration.channel.FluxMessageChannel; import org.springframework.messaging.Message; import reactor.core.publisher.FluxProcessor; @@ -27,14 +27,14 @@ * * @since 5.0 */ -public class ReactiveChannelSpec extends MessageChannelSpec { +public class FluxMessageChannelSpec extends MessageChannelSpec { - ReactiveChannelSpec() { - this.channel = new ReactiveChannel(); + FluxMessageChannelSpec() { + this.channel = new FluxMessageChannel(); } - ReactiveChannelSpec(FluxProcessor, Message> processor) { - this.channel = new ReactiveChannel(processor); + FluxMessageChannelSpec(FluxProcessor, Message> processor) { + this.channel = new FluxMessageChannel(processor); } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/channel/MessageChannels.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/channel/MessageChannels.java index 48391dea4d3..f155ca5defc 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/channel/MessageChannels.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/channel/MessageChannels.java @@ -126,19 +126,19 @@ public static > PublishSubscribeChannel return MessageChannels.publishSubscribe(executor).id(id); } - public static ReactiveChannelSpec reactive() { - return new ReactiveChannelSpec(); + public static FluxMessageChannelSpec reactive() { + return new FluxMessageChannelSpec(); } - public static ReactiveChannelSpec reactive(String id) { + public static FluxMessageChannelSpec reactive(String id) { return reactive().id(id); } - public static ReactiveChannelSpec reactive(FluxProcessor, Message> processor) { - return new ReactiveChannelSpec(processor); + public static FluxMessageChannelSpec reactive(FluxProcessor, Message> processor) { + return new FluxMessageChannelSpec(processor); } - public static ReactiveChannelSpec reactive(String id, FluxProcessor, Message> processor) { + public static FluxMessageChannelSpec reactive(String id, FluxProcessor, Message> processor) { return reactive(processor).id(id); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java index 8f2466ad9ec..401e63ae9f3 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java @@ -24,7 +24,7 @@ import org.reactivestreams.Publisher; import org.springframework.integration.IntegrationMessageHeaderAccessor; -import org.springframework.integration.channel.ReactiveSubscribableChannel; +import org.springframework.integration.channel.FluxSubscribableChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.core.MessagingTemplate; import org.springframework.integration.routingslip.RoutingSlipRouteStrategy; @@ -190,7 +190,7 @@ else if (reply instanceof AbstractIntegrationMessageBuilder) { } if (this.async && (reply instanceof ListenableFuture || reply instanceof Publisher)) { - if (reply instanceof ListenableFuture || !(getOutputChannel() instanceof ReactiveSubscribableChannel)) { + if (reply instanceof ListenableFuture || !(getOutputChannel() instanceof FluxSubscribableChannel)) { ListenableFuture future; if (reply instanceof ListenableFuture) { future = (ListenableFuture) reply; @@ -235,7 +235,7 @@ public void onFailure(Throwable ex) { }); } else { - ((ReactiveSubscribableChannel) getOutputChannel()) + ((FluxSubscribableChannel) getOutputChannel()) .subscribeTo(Flux.from((Publisher) reply) .map(result -> createOutputMessage(result, requestHeaders))); } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveChannelTests.java b/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java similarity index 89% rename from spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveChannelTests.java rename to spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java index eac9b4f7d3d..1161166ee86 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveChannelTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java @@ -36,9 +36,9 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.FluxMessageChannel; import org.springframework.integration.channel.MessageChannelReactiveUtils; import org.springframework.integration.channel.QueueChannel; -import org.springframework.integration.channel.ReactiveChannel; import org.springframework.integration.config.EnableIntegration; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -57,10 +57,10 @@ */ @RunWith(SpringRunner.class) @DirtiesContext -public class ReactiveChannelTests { +public class FluxMessageChannelTests { @Autowired - private MessageChannel reactiveChannel; + private MessageChannel fluxMessageChannel; @Autowired private MessageChannel queueChannel; @@ -69,11 +69,11 @@ public class ReactiveChannelTests { private PollableChannel errorChannel; @Test - public void testReactiveMessageChannel() throws InterruptedException { + public void testFluxMessageChannel() throws InterruptedException { QueueChannel replyChannel = new QueueChannel(); for (int i = 0; i < 10; i++) { - this.reactiveChannel.send(MessageBuilder.withPayload(i).setReplyChannel(replyChannel).build()); + this.fluxMessageChannel.send(MessageBuilder.withPayload(i).setReplyChannel(replyChannel).build()); } for (int i = 0; i < 9; i++) { @@ -116,11 +116,11 @@ public QueueChannel errorChannel() { } @Bean - public MessageChannel reactiveChannel() { - return new ReactiveChannel(); + public MessageChannel fluxMessageChannel() { + return new FluxMessageChannel(); } - @ServiceActivator(inputChannel = "reactiveChannel") + @ServiceActivator(inputChannel = "fluxMessageChannel") public String handle(int payload) { if (payload == 5) { throw new IllegalStateException("intentional"); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveConsumerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveConsumerTests.java index f6512b28f52..8a73265c49d 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveConsumerTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveConsumerTests.java @@ -47,7 +47,7 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.QueueChannel; -import org.springframework.integration.channel.ReactiveChannel; +import org.springframework.integration.channel.FluxMessageChannel; import org.springframework.integration.config.ConsumerEndpointFactoryBean; import org.springframework.integration.endpoint.ReactiveConsumer; import org.springframework.integration.handler.MethodInvokingMessageHandler; @@ -67,7 +67,7 @@ public class ReactiveConsumerTests { @Test public void testReactiveConsumerReactiveChannel() throws InterruptedException { - ReactiveChannel testChannel = new ReactiveChannel(EmitterProcessor.create(false)); + FluxMessageChannel testChannel = new FluxMessageChannel(EmitterProcessor.create(false)); List> result = new LinkedList<>(); CountDownLatch stopLatch = new CountDownLatch(2); @@ -224,7 +224,7 @@ public void testReactiveConsumerPollableChannel() throws InterruptedException { @Test public void testReactiveConsumerViaConsumerEndpointFactoryBean() throws Exception { - ReactiveChannel testChannel = new ReactiveChannel(); + FluxMessageChannel testChannel = new FluxMessageChannel(); List> result = new LinkedList<>(); CountDownLatch stopLatch = new CountDownLatch(3); diff --git a/spring-integration-http/src/test/java/org/springframework/integration/http/outbound/ReactiveHttpRequestExecutingMessageHandlerTests.java b/spring-integration-http/src/test/java/org/springframework/integration/http/outbound/ReactiveHttpRequestExecutingMessageHandlerTests.java index dde40bbf779..c0cc5cf8eee 100644 --- a/spring-integration-http/src/test/java/org/springframework/integration/http/outbound/ReactiveHttpRequestExecutingMessageHandlerTests.java +++ b/spring-integration-http/src/test/java/org/springframework/integration/http/outbound/ReactiveHttpRequestExecutingMessageHandlerTests.java @@ -29,7 +29,7 @@ import org.springframework.http.HttpStatus; import org.springframework.http.client.reactive.ClientHttpConnector; import org.springframework.integration.channel.QueueChannel; -import org.springframework.integration.channel.ReactiveChannel; +import org.springframework.integration.channel.FluxMessageChannel; import org.springframework.integration.http.HttpHeaders; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; @@ -63,7 +63,7 @@ public void testReactiveReturn() throws Throwable { ReactiveHttpRequestExecutingMessageHandler reactiveHandler = new ReactiveHttpRequestExecutingMessageHandler(destinationUri, webClient); - ReactiveChannel ackChannel = new ReactiveChannel(); + FluxMessageChannel ackChannel = new FluxMessageChannel(); reactiveHandler.setOutputChannel(ackChannel); reactiveHandler.handleMessage(MessageBuilder.withPayload("hello, world").build());