-
Notifications
You must be signed in to change notification settings - Fork 13
Publisher<Void> vs Promise #31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
Given the consensus under reactive-streams/reactive-streams-jvm#268 towards a Promise that is not a Reactive Streams Publisher, it makes sense to at least define the semantics of what using |
To illustrate further, in the current Reactor sample, if read backpressure is added, no demand is generated since Reactor operators do not generate demand by themselves unless the sample is written as follows in which case TcpServer<ByteBuf, ByteBuf> transport = Netty4TcpServer.<ByteBuf, ByteBuf>create(0);
ReactorTcpServer.create(transport)
.start(connection -> {
connection.flatMap(inByteBuf -> {
String text = "Hello " + inByteBuf.toString(Charset.defaultCharset());
ByteBuf outByteBuf = Unpooled.buffer().writeBytes(text.getBytes());
return connection.writeWith(Streams.just(outByteBuf));
}).consume();
return Streams.never();
});
The assumption is that no demand is requested on the `Publisher<Void>` returned from the handler. |
I think I see what is causing the confusion here. Let me start with the Tcp Echo example that is present for RxJava as well as reactor. The code in the example really is wrong, ideally, one should write the echo server as: (RxJava example) RxTcpServer.create(transport)
.startAndAwait(connection -> connection.write(connection.map(bb -> {
String msgStr = "Hello " + bb.toString(defaultCharset());
return Unpooled.buffer().writeBytes(msgStr.getBytes());
}))); with an additional construct on the write to instruct a flush on each write, something like: RxTcpServer.create(transport)
.startAndAwait(connection -> connection.writeAndFlushOnEach(connection.map(bb -> {
String msgStr = "Hello " + bb.toString(defaultCharset());
return Unpooled.buffer().writeBytes(msgStr.getBytes());
}))); However, we could not agree on the flush semantics in our discussions in issue #8 and hence I did not add the method to flush on each write (or similar). When writing the example like this and returning the result of writes from the connection handler, the demand is generated by the writer on the channel which drives the demand on the read. The Coming to the In the reactive streams issue linked here, I see it more towards introducing a 0 or 1 item |
This exposes something we hadn't noticed before in TcpConnectionImpl: @Override
public Publisher<Void> write(final Publisher<? extends W> data) {
return new Publisher<Void>() {
@Override
public void subscribe(Subscriber<? super Void> s) {
nettyChannel.write(data).addListener(new FutureToSubscriberBridge(s));
}
};
} So if you subscribed twice, the write would be performed twice? Can you elaborate on the intent here? In terms of the example, I see your point about how else it might have been written in which case demand is generated by the write side. That's perfectly valid but it's also possible for a TcpHandler to return Publisher that's not connected to a write. For example noticing "quit" as input and completing rather than echoing: ReactorTcpServer.create(transport)
.start(connection -> {
Promise<Void> promise = Promises.prepare();
connection.flatMap(inByteBuf -> {
String input = inByteBuf.toString(Charset.defaultCharset()).trim();
if ("quit".equalsIgnoreCase(input)) {
promise.onComplete();
return promise;
} else {
String text = "Hello " + inByteBuf.toString(Charset.defaultCharset());
ByteBuf outByteBuf = Unpooled.buffer().writeBytes(text.getBytes());
return connection.writeWith(Streams.just(outByteBuf));
}
}).consume();
return promise;
}); I'm not sure why I got the opposite impression of where the Reactive Streams discussion of Promise was headed. Ben's last comment was he doen't like the "is a" relationship of a Promise to a Publisher. Then Victor commented about a (new) spec and "converters for RS Publisher" which I took to mean that such a Promise would not be a RS Publisher. Regardless, I think the question around what the use |
Yep, the write will be performed as many times, the returned Coming to why Are there use cases, where replaying the write is required? Yes, I have scenarios where I use this nature of RxNetty writes, eg: for a TCP connection that sends scheduled heartbeats till the connection is alive. For such cases, you can create and keep this lazy write
Sure, we should discuss this. At this point, it appears to me that what we want is more of a disambiguation that the subscriber of the returned entity ( |
After some discussion we generally agree with this (for now at least) and yes it needs to be documented on TcpConnection and the TcpHandler. That said are there some differences between the two? If the Publisher from the write method acts as a "cold publisher" what about the Publisher from TcpHandler? That seems more like a "hot publisher" that may not triggered by the subscribe, nor repeated for that matter. I can't imagine why replaying the same handler logic, on the same connection, would make sense and in any case the connection should be closed on complete. It is true there should only ever be 1 subscriber (the implementation) but the semantics are worth clarifying in any case. Looking at you example here, a subscribe is necessary to trigger the write. At the same time, Publisher returned from a handler may only be used as a signal to end processing from the server side as I showed with the "quit" string above in which case handling begins even before the handle method returns. In other words it won't be triggered by a subscribe on the returned Publisher and it won't be repeated either. Does that make sense or are we saying a handler should not start processing until there is a subscriber? Regarding the Publisher returned from a write, we think it's a bit surprising that each time a Subscriber is added the write is repeated. We would prefer a more explicit call to write again if you want to repeat the write. |
We should discuss the appropriateness of using a
Publisher<Void>
rather than an actualPromise
. This is related to reactive-streams/reactive-streams-jvm#268 which suggests that a standardizedPromise
type should be created. In that discussion the issue is brought up that aPromise
should not be aPublisher
directly (though it could be converted to one).We are using a
Publisher<Void>
in several places as a stand-in for a "real"Promise
but we treat it differently than we do other publishers in that we don't callonSubscribe
and we don't wait for aSubscription.request
to do work. We expect theSubscriber
passed to thatPublisher
to buffer theonComplete
event until its ready to handle it if it isn't ready immediately.There are two issues here that may or may not be separate:
Publisher
and that deals withonComplete | onError
as always terminal events with defined and well-known semantics?Promise
type under the reactive-streams umbrella, should we define what the semantics of aPublisher<Void>
are since they seem to be different than any other publishers and have to be treated differently as a result?The text was updated successfully, but these errors were encountered: