Skip to content

Rename ReactiveChannel #2135

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed

Conversation

garyrussell
Copy link
Contributor

  • as discussed last week

TODO:

  • should we take the channel outside of the AbstractMessageChannel hierarchy?
    • avoid blocking interceptors
    • we would lose channel metrics though
  • rename ReactiveConsumer ?

- as discussed last week

TODO:
- should we take the channel outside of the `AbstractMessageChannel` hierarchy?
  - avoid blocking interceptors
  - we would lose channel metrics though
- rename `ReactiveConsumer` ?
Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally speaking I don't see reason to rename anything at all. Even when SF comes up with the solution on the matter, we still can live with our because of different packages. OTOH we can always deprecate our in favor of that from SF.

Right now I really don't care how the Reactive channel is implemented. My idea was to make it based on the Porcessor only to let end-user to provide some customization anyway, like we have with the ExecutorChannel or QueueChannel.

Yes, I think we should ditch interceptors and to be honest we won't have to much from the metrics. The hard work in the channel is done in the processor anyway.

So, in this part I agree: we don't need AbstractMessageChannel here.

That's all from my side to this PR.

I'm trying now to do something a bit different if we have stepped to this subject.
Will share soon...


void subscribeTo(Publisher<Message<?>> publisher);
void subscribeTo(Flux<Message<?>> publisher);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why to be restricted just only to Flux?
What's wrong with the more general contract?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes using Publisher for input arguments and returning Flux/Mono (where one is already used internally) is a pattern we follow in Spring Framework APIs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see reason to return something because ReactiveChannel is a Piublisher per se already.

this(DirectProcessor.create());
}

public ReactiveChannel(FluxProcessor<Message<?>, Message<?>> processor) {
public FluxMessageChannel(FluxProcessor<Message<?>, Message<?>> processor) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I missed somehow discussion.
Would you mind explaining to me one more time?
So, what is wrong to request just raw Processor and wrap() internally if that is still required?
Thanks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be the point of creating a MessageChannel for Mono<Message<?>> - by definition, in an integration flow, we are wiring a producer and consumer together with a pipe to transfer messages.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's still isn't clear.
I talk about Processor ,but you mention Mono somehow...
Sorry, I don't follow

@@ -132,19 +132,19 @@ public ExecutorChannelSpec executor(String id, Executor executor) {
}


public ReactiveChannelSpec reactive() {
public FluxMessageChannelSpec reactive() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, it isn't reactive() anymore?
flux() then?

Copy link

@rstoyanchev rstoyanchev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed no need to worry too much about future conflicts especially if promoting anything to spring-messaging which requires a special process anyway.

That said my objection was more to the very general nature of the word "Reactive". Flux works better IMO in describing what's unique and in that sense FluxProcessor might be even better. This name could co-exist next to other "reactive" implementations within Spring Integration in the future based on a different reactive library or Reactor version and in that sense perhaps "Reactor" is the better prefix for the channel name.

Does ReactiveSubscriberChannel need to be an interface or just a method (e.g. add(Publisher<?>) on what is currently ReactiveChannel? There are no other implementations of it at the moment and it could always be added later.

ReactiveConsumer looks like it is Reactive Streams based so the name could be something like ReactiveStreamsEndpoint. It does have some non-essential dependencies on Reactor but probably not an issue if Reactor is a required dependency for reactive support.


void subscribeTo(Publisher<Message<?>> publisher);
void subscribeTo(Flux<Message<?>> publisher);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes using Publisher for input arguments and returning Flux/Mono (where one is already used internally) is a pattern we follow in Spring Framework APIs.

@artembilan
Copy link
Member

Does ReactiveSubscriberChannel need to be an interface or just a method (e.g. add(Publisher<?>) on what is currently ReactiveChannel?

The problem with direct method on the class is proxying. We definitely need an interface to overcome JDK Proxy nature.

Well, I even won't mind if we just make this ReactiveChannel as Subscriber and feed it into those external Publsihers instead.
How does that sound?

@artembilan
Copy link
Member

OK. I have just played with something like

@Override
public boolean send(Message<?> message) {
	Mono.just(message)
			.subscribe(this.processor);
	return true;
}

and it doesn't work as expected.
Just after consuming the value from one such a Mono, the processor is terminated.
Any other sources aren't reachable any more.

Looks like I went wrong route in the implementation. 😢

I don't have any ideas how to be right now...

@garyrussell
Copy link
Contributor Author

As I see it, there are two scenarios we have discussed...

  • A channel that uses a FluxProcessor<Message<?>> to use SI concepts to establish a connection between a message emitter and consumer.
  • Using a Message<Flux<?>> over any type of channel enabling the receiver of the message to subcsribe.

The latter is what (I believe) @rstoyanchev was considering in the stomp world (a flux for each websocket).
The former is what (I thought) you were trying to achieve.

@artembilan
Copy link
Member

OK.
After several hours of debating inside my head with myself and hitting the wall with any attempts do not complete an internal processor I think I'm fine with such a renaming.

I can merge it as is and I can go ahead with the next iteration about ReactiveStreamsConsumer renaming, some polishing for the reactive() methods.
And the main next idea for the investigation is really about making FluxMessageChannel (maybe really just FluxChannel BTW ?) as direct Subscriber implementation leaving an existing sink for processor bridging.

Any other thoughts?

@artembilan
Copy link
Member

Simple polishing for your confirmation: artembilan@9e87705.

  • I didn't rename ReactiveConsumer yet. Let's do that in the next iteration! I won't mind for agreed ReactiveStreamsConsumer
  • I didn't remove AbstractMessageChannel extension from the FluxMessageChannel. I don't see how ChannelInterceptors support there may be harmful...
    I didn't ditch yet FluxSubscribableChannel interface in favor of direct Subscriber impl. Let it be the next iteration!

@garyrussell
Copy link
Contributor Author

I am fine with the polish; thanks.

I don't see how ChannelInterceptors support there may be harmful.

wiretap->channel->someBlockingEndpoint

The problem is that the calling thread is supposed to be non-blocking - giving them the ability to add arbitrary interceptors will possibly (perhaps inadvertently) block the thread, breaking the contract.

@artembilan
Copy link
Member

Well, one option on that side is have wire-tap channel as reactive (Flux) as well.
Another option is like what I suggest in the #2031, when we can provide properties to configure .publishOn()/subscribeOn() to shift blocking code to the separate Thread, as recommended by Reactive team. If I understand that correctly. of course...

@artembilan
Copy link
Member

Merged as ca23176

@artembilan artembilan closed this May 1, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants