Skip to content

Rename ReactiveChannel #2135

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
*
* @since 5.0
*/
public class ReactiveChannel extends AbstractMessageChannel
implements Publisher<Message<?>>, ReactiveSubscribableChannel {
public class FluxMessageChannel extends AbstractMessageChannel
implements Publisher<Message<?>>, FluxSubscribableChannel {

private final List<Subscriber<? super Message<?>>> subscribers = new ArrayList<>();

Expand All @@ -52,11 +52,11 @@ public class ReactiveChannel extends AbstractMessageChannel

private volatile boolean upstreamSubscribed;

public ReactiveChannel() {
public FluxMessageChannel() {
this(DirectProcessor.create());
}

public ReactiveChannel(FluxProcessor<Message<?>, Message<?>> processor) {
public FluxMessageChannel(FluxProcessor<Message<?>, Message<?>> processor) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I missed somehow discussion.
Would you mind explaining to me one more time?
So, what is wrong to request just raw Processor and wrap() internally if that is still required?
Thanks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be the point of creating a MessageChannel for Mono<Message<?>> - by definition, in an integration flow, we are wiring a producer and consumer together with a pipe to transfer messages.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's still isn't clear.
I talk about Processor ,but you mention Mono somehow...
Sorry, I don't follow

Assert.notNull(processor, "'processor' must not be null");
this.processor = processor;
this.flux = Flux.from(processor);
Expand All @@ -73,7 +73,7 @@ protected boolean doSend(Message<?> message, long timeout) {
public void subscribe(Subscriber<? super Message<?>> subscriber) {
this.subscribers.add(subscriber);

this.flux.doOnCancel(() -> ReactiveChannel.this.subscribers.remove(subscriber))
this.flux.doOnCancel(() -> FluxMessageChannel.this.subscribers.remove(subscriber))
.subscribe(subscriber);

if (!this.upstreamSubscribed) {
Expand All @@ -82,7 +82,7 @@ public void subscribe(Subscriber<? super Message<?>> subscriber) {
}

@Override
public void subscribeTo(Publisher<Message<?>> publisher) {
public void subscribeTo(Flux<Message<?>> publisher) {
this.publishers.add(publisher);
if (!this.subscribers.isEmpty()) {
doSubscribeTo(publisher);
Expand All @@ -91,11 +91,11 @@ public void subscribeTo(Publisher<Message<?>> publisher) {

private void doSubscribeTo(Publisher<Message<?>> publisher) {
Flux.from(publisher)
.doOnSubscribe(s -> ReactiveChannel.this.upstreamSubscribed = true)
.doOnSubscribe(s -> FluxMessageChannel.this.upstreamSubscribed = true)
.doOnComplete(() -> {
ReactiveChannel.this.publishers.remove(publisher);
if (ReactiveChannel.this.publishers.isEmpty()) {
ReactiveChannel.this.upstreamSubscribed = false;
FluxMessageChannel.this.publishers.remove(publisher);
if (FluxMessageChannel.this.publishers.isEmpty()) {
FluxMessageChannel.this.upstreamSubscribed = false;
}
})
.subscribe(this.processor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@

package org.springframework.integration.channel;

import org.reactivestreams.Publisher;

import org.springframework.messaging.Message;

import reactor.core.publisher.Flux;

/**
* @author Artem Bilan
* @author Gary Russell
*
* @since 5.0
*/
public interface ReactiveSubscribableChannel {
public interface FluxSubscribableChannel {

void subscribeTo(Publisher<Message<?>> publisher);
void subscribeTo(Flux<Message<?>> publisher);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why to be restricted just only to Flux?
What's wrong with the more general contract?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes using Publisher for input arguments and returning Flux/Mono (where one is already used internally) is a pattern we follow in Spring Framework APIs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see reason to return something because ReactiveChannel is a Piublisher per se already.


}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@

import org.springframework.integration.dsl.channel.DirectChannelSpec;
import org.springframework.integration.dsl.channel.ExecutorChannelSpec;
import org.springframework.integration.dsl.channel.FluxMessageChannelSpec;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.dsl.channel.PriorityChannelSpec;
import org.springframework.integration.dsl.channel.PublishSubscribeChannelSpec;
import org.springframework.integration.dsl.channel.QueueChannelSpec;
import org.springframework.integration.dsl.channel.ReactiveChannelSpec;
import org.springframework.integration.dsl.channel.RendezvousChannelSpec;
import org.springframework.integration.store.ChannelMessageStore;
import org.springframework.integration.store.PriorityCapableChannelMessageStore;
Expand Down Expand Up @@ -132,19 +132,19 @@ public ExecutorChannelSpec executor(String id, Executor executor) {
}


public ReactiveChannelSpec reactive() {
public FluxMessageChannelSpec reactive() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, it isn't reactive() anymore?
flux() then?

return MessageChannels.reactive();
}

public ReactiveChannelSpec reactive(String id) {
public FluxMessageChannelSpec reactive(String id) {
return MessageChannels.reactive(id);
}

public ReactiveChannelSpec reactive(FluxProcessor<Message<?>, Message<?>> processor) {
public FluxMessageChannelSpec reactive(FluxProcessor<Message<?>, Message<?>> processor) {
return MessageChannels.reactive(processor);
}

public ReactiveChannelSpec reactive(String id, FluxProcessor<Message<?>, Message<?>> processor) {
public FluxMessageChannelSpec reactive(String id, FluxProcessor<Message<?>, Message<?>> processor) {
return MessageChannels.reactive(id, processor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
import org.springframework.integration.channel.ChannelInterceptorAware;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.FixedSubscriberChannel;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.channel.MessageChannelReactiveUtils;
import org.springframework.integration.channel.ReactiveChannel;
import org.springframework.integration.channel.interceptor.WireTap;
import org.springframework.integration.config.ConsumerEndpointFactoryBean;
import org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean;
Expand Down Expand Up @@ -2550,7 +2550,7 @@ public <T> Publisher<Message<T>> toReactivePublisher() {
publisher = MessageChannelReactiveUtils.toPublisher(channelForPublisher);
}
else {
MessageChannel reactiveChannel = new ReactiveChannel();
MessageChannel reactiveChannel = new FluxMessageChannel();
publisher = (Publisher<Message<T>>) reactiveChannel;
channel(reactiveChannel);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.reactivestreams.Publisher;

import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.ReactiveChannel;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.channel.MessageChannelSpec;
import org.springframework.integration.dsl.support.FixedSubscriberChannelPrototype;
Expand All @@ -35,10 +35,13 @@
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;

import reactor.core.publisher.Flux;

/**
* The central factory for fluent {@link IntegrationFlowBuilder} API.
*
* @author Artem Bilan
* @author Gary Russell
*
* @since 5.0
*
Expand Down Expand Up @@ -299,15 +302,15 @@ protected void onInit() {
}

/**
* Populate a {@link ReactiveChannel} to the {@link IntegrationFlowBuilder} chain
* Populate a {@link FluxMessageChannel} to the {@link IntegrationFlowBuilder} chain
* and subscribe it to the provided {@link Publisher}.
* @param publisher the {@link Publisher} to subscribe to.
* @return new {@link IntegrationFlowBuilder}.
*/
public static IntegrationFlowBuilder from(Publisher<Message<?>> publisher) {
ReactiveChannel reactiveChannel = new ReactiveChannel();
public static IntegrationFlowBuilder from(Flux<Message<?>> publisher) {
FluxMessageChannel reactiveChannel = new FluxMessageChannel();
reactiveChannel.subscribeTo(publisher);
return from((MessageChannel) reactiveChannel);
return from(reactiveChannel);
}

private static IntegrationFlowBuilder from(MessagingGatewaySupport inboundGateway,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package org.springframework.integration.dsl.channel;

import org.springframework.integration.channel.ReactiveChannel;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.messaging.Message;

import reactor.core.publisher.FluxProcessor;
Expand All @@ -27,14 +27,14 @@
*
* @since 5.0
*/
public class ReactiveChannelSpec extends MessageChannelSpec<ReactiveChannelSpec, ReactiveChannel> {
public class FluxMessageChannelSpec extends MessageChannelSpec<FluxMessageChannelSpec, FluxMessageChannel> {

ReactiveChannelSpec() {
this.channel = new ReactiveChannel();
FluxMessageChannelSpec() {
this.channel = new FluxMessageChannel();
}

ReactiveChannelSpec(FluxProcessor<Message<?>, Message<?>> processor) {
this.channel = new ReactiveChannel(processor);
FluxMessageChannelSpec(FluxProcessor<Message<?>, Message<?>> processor) {
this.channel = new FluxMessageChannel(processor);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -126,19 +126,19 @@ public static <S extends PublishSubscribeChannelSpec<S>> PublishSubscribeChannel
return MessageChannels.<S>publishSubscribe(executor).id(id);
}

public static ReactiveChannelSpec reactive() {
return new ReactiveChannelSpec();
public static FluxMessageChannelSpec reactive() {
return new FluxMessageChannelSpec();
}

public static ReactiveChannelSpec reactive(String id) {
public static FluxMessageChannelSpec reactive(String id) {
return reactive().id(id);
}

public static ReactiveChannelSpec reactive(FluxProcessor<Message<?>, Message<?>> processor) {
return new ReactiveChannelSpec(processor);
public static FluxMessageChannelSpec reactive(FluxProcessor<Message<?>, Message<?>> processor) {
return new FluxMessageChannelSpec(processor);
}

public static ReactiveChannelSpec reactive(String id, FluxProcessor<Message<?>, Message<?>> processor) {
public static FluxMessageChannelSpec reactive(String id, FluxProcessor<Message<?>, Message<?>> processor) {
return reactive(processor).id(id);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.reactivestreams.Publisher;

import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.ReactiveSubscribableChannel;
import org.springframework.integration.channel.FluxSubscribableChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.routingslip.RoutingSlipRouteStrategy;
Expand Down Expand Up @@ -190,7 +190,7 @@ else if (reply instanceof AbstractIntegrationMessageBuilder) {
}

if (this.async && (reply instanceof ListenableFuture<?> || reply instanceof Publisher<?>)) {
if (reply instanceof ListenableFuture<?> || !(getOutputChannel() instanceof ReactiveSubscribableChannel)) {
if (reply instanceof ListenableFuture<?> || !(getOutputChannel() instanceof FluxSubscribableChannel)) {
ListenableFuture<?> future;
if (reply instanceof ListenableFuture<?>) {
future = (ListenableFuture<?>) reply;
Expand Down Expand Up @@ -235,7 +235,7 @@ public void onFailure(Throwable ex) {
});
}
else {
((ReactiveSubscribableChannel) getOutputChannel())
((FluxSubscribableChannel) getOutputChannel())
.subscribeTo(Flux.from((Publisher<?>) reply)
.map(result -> createOutputMessage(result, requestHeaders)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.channel.MessageChannelReactiveUtils;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.channel.ReactiveChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
Expand All @@ -57,10 +57,10 @@
*/
@RunWith(SpringRunner.class)
@DirtiesContext
public class ReactiveChannelTests {
public class FluxMessageChannelTests {

@Autowired
private MessageChannel reactiveChannel;
private MessageChannel fluxMessageChannel;

@Autowired
private MessageChannel queueChannel;
Expand All @@ -69,11 +69,11 @@ public class ReactiveChannelTests {
private PollableChannel errorChannel;

@Test
public void testReactiveMessageChannel() throws InterruptedException {
public void testFluxMessageChannel() throws InterruptedException {
QueueChannel replyChannel = new QueueChannel();

for (int i = 0; i < 10; i++) {
this.reactiveChannel.send(MessageBuilder.withPayload(i).setReplyChannel(replyChannel).build());
this.fluxMessageChannel.send(MessageBuilder.withPayload(i).setReplyChannel(replyChannel).build());
}

for (int i = 0; i < 9; i++) {
Expand Down Expand Up @@ -116,11 +116,11 @@ public QueueChannel errorChannel() {
}

@Bean
public MessageChannel reactiveChannel() {
return new ReactiveChannel();
public MessageChannel fluxMessageChannel() {
return new FluxMessageChannel();
}

@ServiceActivator(inputChannel = "reactiveChannel")
@ServiceActivator(inputChannel = "fluxMessageChannel")
public String handle(int payload) {
if (payload == 5) {
throw new IllegalStateException("intentional");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.channel.ReactiveChannel;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.config.ConsumerEndpointFactoryBean;
import org.springframework.integration.endpoint.ReactiveConsumer;
import org.springframework.integration.handler.MethodInvokingMessageHandler;
Expand All @@ -67,7 +67,7 @@ public class ReactiveConsumerTests {

@Test
public void testReactiveConsumerReactiveChannel() throws InterruptedException {
ReactiveChannel testChannel = new ReactiveChannel(EmitterProcessor.create(false));
FluxMessageChannel testChannel = new FluxMessageChannel(EmitterProcessor.create(false));

List<Message<?>> result = new LinkedList<>();
CountDownLatch stopLatch = new CountDownLatch(2);
Expand Down Expand Up @@ -224,7 +224,7 @@ public void testReactiveConsumerPollableChannel() throws InterruptedException {

@Test
public void testReactiveConsumerViaConsumerEndpointFactoryBean() throws Exception {
ReactiveChannel testChannel = new ReactiveChannel();
FluxMessageChannel testChannel = new FluxMessageChannel();

List<Message<?>> result = new LinkedList<>();
CountDownLatch stopLatch = new CountDownLatch(3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.springframework.http.HttpStatus;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.channel.ReactiveChannel;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.http.HttpHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
Expand Down Expand Up @@ -63,7 +63,7 @@ public void testReactiveReturn() throws Throwable {
ReactiveHttpRequestExecutingMessageHandler reactiveHandler =
new ReactiveHttpRequestExecutingMessageHandler(destinationUri, webClient);

ReactiveChannel ackChannel = new ReactiveChannel();
FluxMessageChannel ackChannel = new FluxMessageChannel();
reactiveHandler.setOutputChannel(ackChannel);
reactiveHandler.handleMessage(MessageBuilder.withPayload("hello, world").build());

Expand Down