-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Rename ReactiveChannel #2135
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Rename ReactiveChannel #2135
Conversation
- as discussed last week TODO: - should we take the channel outside of the `AbstractMessageChannel` hierarchy? - avoid blocking interceptors - we would lose channel metrics though - rename `ReactiveConsumer` ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally speaking I don't see reason to rename anything at all. Even when SF comes up with the solution on the matter, we still can live with our because of different packages. OTOH we can always deprecate our in favor of that from SF.
Right now I really don't care how the Reactive channel is implemented. My idea was to make it based on the Porcessor
only to let end-user to provide some customization anyway, like we have with the ExecutorChannel
or QueueChannel
.
Yes, I think we should ditch interceptors and to be honest we won't have to much from the metrics. The hard work in the channel is done in the processor
anyway.
So, in this part I agree: we don't need AbstractMessageChannel
here.
That's all from my side to this PR.
I'm trying now to do something a bit different if we have stepped to this subject.
Will share soon...
|
||
void subscribeTo(Publisher<Message<?>> publisher); | ||
void subscribeTo(Flux<Message<?>> publisher); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why to be restricted just only to Flux
?
What's wrong with the more general contract?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes using Publisher for input arguments and returning Flux/Mono (where one is already used internally) is a pattern we follow in Spring Framework APIs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see reason to return something because ReactiveChannel
is a Piublisher
per se already.
this(DirectProcessor.create()); | ||
} | ||
|
||
public ReactiveChannel(FluxProcessor<Message<?>, Message<?>> processor) { | ||
public FluxMessageChannel(FluxProcessor<Message<?>, Message<?>> processor) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I missed somehow discussion.
Would you mind explaining to me one more time?
So, what is wrong to request just raw Processor
and wrap()
internally if that is still required?
Thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would be the point of creating a MessageChannel
for Mono<Message<?>>
- by definition, in an integration flow, we are wiring a producer and consumer together with a pipe to transfer messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's still isn't clear.
I talk about Processor
,but you mention Mono
somehow...
Sorry, I don't follow
@@ -132,19 +132,19 @@ public ExecutorChannelSpec executor(String id, Executor executor) { | |||
} | |||
|
|||
|
|||
public ReactiveChannelSpec reactive() { | |||
public FluxMessageChannelSpec reactive() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, it isn't reactive()
anymore?
flux()
then?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed no need to worry too much about future conflicts especially if promoting anything to spring-messaging which requires a special process anyway.
That said my objection was more to the very general nature of the word "Reactive". Flux works better IMO in describing what's unique and in that sense FluxProcessor might be even better. This name could co-exist next to other "reactive" implementations within Spring Integration in the future based on a different reactive library or Reactor version and in that sense perhaps "Reactor" is the better prefix for the channel name.
Does ReactiveSubscriberChannel need to be an interface or just a method (e.g. add(Publisher<?>)
on what is currently ReactiveChannel? There are no other implementations of it at the moment and it could always be added later.
ReactiveConsumer looks like it is Reactive Streams based so the name could be something like ReactiveStreamsEndpoint. It does have some non-essential dependencies on Reactor but probably not an issue if Reactor is a required dependency for reactive support.
|
||
void subscribeTo(Publisher<Message<?>> publisher); | ||
void subscribeTo(Flux<Message<?>> publisher); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes using Publisher for input arguments and returning Flux/Mono (where one is already used internally) is a pattern we follow in Spring Framework APIs.
The problem with direct method on the class is proxying. We definitely need an interface to overcome JDK Proxy nature. Well, I even won't mind if we just make this |
OK. I have just played with something like
and it doesn't work as expected. Looks like I went wrong route in the implementation. 😢 I don't have any ideas how to be right now... |
As I see it, there are two scenarios we have discussed...
The latter is what (I believe) @rstoyanchev was considering in the stomp world (a flux for each websocket). |
OK. I can merge it as is and I can go ahead with the next iteration about Any other thoughts? |
Simple polishing for your confirmation: artembilan@9e87705.
|
I am fine with the polish; thanks.
The problem is that the calling thread is supposed to be non-blocking - giving them the ability to add arbitrary interceptors will possibly (perhaps inadvertently) block the thread, breaking the contract. |
Well, one option on that side is have wire-tap channel as reactive (Flux) as well. |
Merged as ca23176 |
TODO:
AbstractMessageChannel
hierarchy?ReactiveConsumer
?