Multiplexing TCP Writes #21
Quick question: why is multiplexing not something handled by the composition layer? Wouldn't it be easier in the end to address a single Publisher writer, or even a Processor writer (that propagates write acks to its subscribers)?
The second reason we have found is:

The first can be perceived as aesthetic, but it also affects usability, since it is surprising to people not to be able to write more than once without composing to a single Publisher. The second reason is more functionally concrete. If the second didn't exist, I wouldn't argue strongly for the first.
I think the question is also "what benefit would we get by disallowing multiple writes?". If we have a reason for that, we will be able to better discuss which is more important: allowing or disallowing multiple writes. Control over flush per write is one such consideration. IMO, disallowing multiple writes is a big assumption and we may be impeding requirements of many protocols built on top of TCP.
I may not follow then; a publisher can publish chunked data T then complete. Flush is orthogonal to that and actually could be mapped to other reactive stream behavior (e.g. on subscription.request) or directly in the writer method, defaulting to flush on complete. Besides, write() can accept any form of Publisher and could make use of a "flushing" Publisher if detected (e.g. FlushPublisher extending Publisher). There are many more issues to consider in merging multiple writes, specifically concurrency and ordering. But again, I might not understand; I just don't want to create a publisher per chunk write 👍 and I don't see the need for write multiplexing at the IO level, since the writer is anyway a Subscriber that only accepts non-concurrent onNext signals. Unless we want to implement some kind of Thread Stealing / Serialized Subscriber... It's easy enough to use combinatory operations from Rx or another RS impl like Reactor to leave that decision to the user; e.g. https://gist.github.com/smaldini/4c85515c210503b2092e#file-gistfile1-java-L4 creates windows of things to accumulate in a ByteBuf (Reactor Buffer), merged in flatMap.
Also, if multiple writers for a single connection are allowed, how is Subscription#request meant to work? Are we splitting the demand, or deduping? I am assuming the writer uses the Reactive Streams contract to propagate backpressure, assisted by Subscriber.onSubscribe(Subscription), which is intended to have a single upstream publisher.
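To make the demand-splitting question concrete, here is a minimal sketch (all names hypothetical, not from any proposal in this thread) of the naive policy where a merging writer divides its downstream demand evenly across its upstream publishers:

```java
// Hypothetical sketch: how a multiplexing writer might split the demand it
// receives from the transport across N upstream publishers. A real
// implementation would rebalance as streams complete or stall.
final class DemandSplitter {

    /**
     * Splits totalDemand across `streams` upstreams as evenly as possible;
     * the first (totalDemand % streams) upstreams absorb the remainder.
     */
    static long[] split(long totalDemand, int streams) {
        long[] perStream = new long[streams];
        long base = totalDemand / streams;
        long remainder = totalDemand % streams;
        for (int i = 0; i < streams; i++) {
            perStream[i] = base + (i < remainder ? 1 : 0);
        }
        return perStream;
    }
}
```

This is only the even-split case discussed below; a smarter policy could weight the split by how much each stream has pending.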
I'll let @NiteshKant provide the detailed answer, but simply put, the answer for backpressured multiplexed writes is the same as for a stream merge.
In order to explain the usecase, let me take a more concrete example. Let us assume we are creating a factorial server that multiplexes multiple factorial requests on the same connection. Each request has an id. If we do not provide multiple writes, the code will look like this:

connection.write(connection.flatMap(req -> calculateFactorial(req)));

The above is absolutely fine until we get into errors from write. So, let us assume that one of the factorial calculations caused an encoder error (overflow, malformed response, etc.). What should be the state of write after this? Should we continue or should we abort the rest of the writes? If we continue, how does the producer of the malformed response know that the response was not sent? If we abort, one malformed response aborts other independent results. If we were giving feedback on the write of each stream, one could easily rectify the error (if possible) as:

// I am assuming that the handler is returning a Publisher<Void> here; otherwise, this requires a subscribe here, which is another issue I will file.
connection.flatMap(req -> connection.write(calculateFactorial(req))
    .onErrorResumeNext(t -> {
        if (t instanceof EncoderException) {
            // corrected response.
            return connection.write(Observable.just(req));
        } else {
            return Observable.error(t);
        }
    }));
Re: Concurrency. The assumption will be that the networking library supports concurrent writes (I do not know any mature library that does not). If not, then yes, we will have to introduce concurrency control to funnel the writes.

Re: Ordering. Ordering is totally governed by the protocol and it is not required by all protocols. If we provide feedback on writes (return a Publisher<Void> per write), ordering can be composed by the protocol where required.
Totally, the publisher will be per stream. In the factorial example above, it is per request.
My usecase above provides a reason for multiplexing. That a channel is sequential should not mean that I can not have multiple writers. Multiple writes are mainly about the nature of the processing of data, as opposed to the implementation detail of concurrency handling on the channel.
As @benjchristensen mentioned, this is the same usecase as a stream merge. The scenario is that there are multiple streams to write. Whether the streams are merged at the composition layer or at the channel, the backpressure semantics do not change. You will have to split the demand upstream, as we can not predict which stream will give the next item on request.
This is very, very useful information. Thank you for taking the time to lay these specifics out. It has helped our understanding of your goals considerably. @rstoyanchev is going to provide some specific feedback as a result of our discussion today on this and #22. I think we'll spend more time talking about #23 tomorrow.
Indeed the examples and use cases here and in #22 have helped us understand the motivation for writing with multiple Publishers. It makes sense to have ReactiveIPC support multiple independent output streams (multiplexing) while managing concurrency and back-pressure across those streams, rather than leaving that as a problem to be solved separately. It also makes sense to allow logical streams to be split up (batching) in order to provide a natural opportunity for flushing, and for composing and sequencing I/O operations at boundaries meaningful to the layers above (e.g. an Object serialized as a sequence of data chunks).

That said, we still have concerns with the proposal. The main concern is that multiplexing and splitting a stream into batches are two completely different use cases, but the API does not in any way recognize that. For example, when multiplexing it is perfectly okay, and indeed expected, to call write multiple times concurrently; for batches of one logical stream it is not.

Not only that, but if concurrent writing of batches is allowed, it becomes very hard for the implementation to split back-pressure requests fairly. Instead of splitting write capacity across several multiplexed logical streams, with concurrent batch writes there is no way to tell which logical stream is unfairly writing too much.

The second concern is that leaving the handling of concurrency to the transport layer is problematic. We should not assume that the underlying transport allows concurrent writes (e.g. Servlet HTTP response, Java standard WebSocket, ZeroMQ, Chronicle, Kafka?), which could lead to different implementations (some using a queue for serialization, others relying on the transport), so it shouldn't be up to the transport layer to do this. If it is exposed as a feature of ReactiveIPC, it should work the same everywhere and it should be customizable the same way. On the point of merging back-pressure, reactive streams does not have such a concept, so it needs to be defined first, and it should be possible to customize it.
A simple thought might be for each Publisher to be given some (fixed) write capacity, or a Publisher may get more if others aren't writing as much; but then how do you manage that as more concurrent Publishers are added? Unfortunately we don't have an alternative proposal yet. This simply summarizes the concerns we have.
I think one of the things that was hanging me up is this: a ZeroMQ library would need to be able to express the standard patterns like REQ/REP. Expressed as Groovy this would look like:

Publisher<Header> headers
Publisher<Body> body
connection.request {
Publishers.concat(headers, body)
}.subscribe { resp ->
// handle response
}

In this case there are no write confirmations, because the ZMQ implementation doesn't provide that information. There's also no flushing like in Netty.
PR #19 does not suggest implementing batching via multiple writes. I am seeing batching here only for the purpose of flushing, and the flush semantics were based on the FlushSelector.
I will answer this ignoring batching, as it is not the usecase for multiple writes. With multiple writes, the problem is really a stream-merging problem and honoring backpressure. If we are merging multiple streams, the demand must be split across them, as discussed above.
I did not quite understand the issue with multiple transports making sure that concurrent writes are supported. In fact, I believe it is better done at the transport layer. We can create adapters (using a common serialization technique) to adapt transports that do not support concurrent writes. If we did it in the ReactiveIPC core irrespective of what the underlying transport does, we would be introducing an unnecessary queuing point.
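The adapter idea mentioned here can be sketched concretely. Below is a minimal, hypothetical serializing writer (none of these names come from ReactiveIPC) that funnels writes from any thread through a single-threaded executor, for transports whose write path is not thread-safe:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical adapter: serializes concurrent writes for a transport that
// does not support them. Submission order is preserved because a single
// worker thread drains the executor's queue.
final class SerializingWriter<T> {
    private final ExecutorService single = Executors.newSingleThreadExecutor();
    private final Consumer<T> transportWrite; // the transport's non-thread-safe write

    SerializingWriter(Consumer<T> transportWrite) {
        this.transportWrite = transportWrite;
    }

    /** May be called from any thread; completes once the item is handed to the transport. */
    CompletableFuture<Void> write(T item) {
        return CompletableFuture.runAsync(() -> transportWrite.accept(item), single);
    }

    void shutdown() {
        single.shutdown();
    }
}
```

A transport like Netty, which handles concurrent writes natively, would skip such an adapter entirely, which is the division of labour argued for in this comment.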
Okay, forget the term batching for a moment. Here is what I meant. In the simplest case there is one Publisher (e.g. read a file and write it), while for multiplexing there are multiple Publishers (e.g. HTTP/2 push reading files and writing each with a separate Publisher). Each of those is a logical sequential stream as per the RS spec. #22 points to something very different, where we split a single logical stream into multiple Publishers so we can flush and compose. However, we are now entering an entirely different mode of operation on the connection, where concurrent writes should probably be illegal, and furthermore where writing again before the previous Publisher has completed should also be illegal. In the HTTP/2 push example, I could choose to split a file into multiple Publishers, perhaps to let the browser re-prioritize which files are more important. Clearly I can't just call write in a loop.
To be clear, we agree fully with the motivation for #21 and #22. It's more a question of what the API should look like. Perhaps something along the lines of providing a MultiplexWriter to the connection, to make it clear that you have one or more concurrent multiplex streams, each of which can be split up further but is sequential and driven by demand.
We disagree here. We see this as a clear point of variation to be exposed by the API for extension. If the merging of multiple write streams is a core feature of ReactiveIPC, then the way it distributes back-pressure is a key part of that mechanism and a key point of interest. Yes, in the simplest case you can ask for 1 item from each Publisher, but you could also take into account how much each Publisher has to write before making a decision, along the lines of Jon's DemandCalculator. Or, in the HTTP/2 push example again, you might have specific clues available about how to prioritize the write streams.
Right, we don't want to impose a queuing mechanism on the Netty transport, which is perfectly capable of handling concurrent writes. However, we need to recognize that some transports will need such a mechanism, and provide it so that it doesn't have to be re-implemented. Also, since merging, buffering and back-pressure are very closely related, as much as possible we see it as a shared mechanism that simply lets transports like Netty do concurrent writes.
I think there is a misunderstanding about the intent of issue #22. Splitting a single logical stream into multiple writes was not the intent; the intent is composition of writes:

// this writes A and, if it succeeds, then writes B
connection.write(publisherA).concatWith(connection.write(publisherB))

// this writes A and B concurrently
connection.write(publisherA).mergeWith(connection.write(publisherB))
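The behavioural difference between the two compositions can be sketched with CompletableFuture standing in for Publisher<Void> (an illustration only, not the proposed API; class and method names are made up):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustration: "concat" chains writes so each starts only after the
// previous one succeeds; "merge" starts them all and waits for all.
final class WriteComposition {

    static CompletableFuture<Void> concatLike(List<Runnable> writes) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (Runnable w : writes) {
            chain = chain.thenRun(w); // sequential: a failure short-circuits the rest
        }
        return chain;
    }

    static CompletableFuture<Void> mergeLike(List<Runnable> writes) {
        CompletableFuture<?>[] all = writes.stream()
                .map(CompletableFuture::runAsync) // concurrent: all writes start immediately
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(all);
    }
}
```

With concat-like composition an encoder error in A prevents B from being written at all; with merge-like composition B proceeds independently, matching the error-isolation argument made earlier in the thread.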
I think it will be a bad idea to split a single file into multiple chunks. A single source of data should be a single Publisher.
As I mentioned earlier, multiple-writes support is primarily driven by disjoint processing of data, as opposed to achieving parallelization of writes. What I read in your comment here is basically a concat usecase.
How would we know how much a Publisher has to write?
Perhaps the cause of confusion is Jon's proposal of a DemandCalculator. I think we should discuss write backpressure in a separate issue; I will create one.
A good API should guide, and shouldn't be open to misunderstanding. However, I think it comes down to more than a question of interpretation.
The 3rd example under #22 shows two writes that are part of one logical stream. They happen to be sequential in this example, but that may not be the case.
Ideally perhaps, but not for us to say. There may be a reason to write some data from a file, then flush, then do something, then write some more.
We are not trying to achieve parallelization of writes; on the contrary, it's the very problem we are trying to point out. It boils down to this: for multiplexing, concurrent writes are expected; for batches of one logical stream, they should be illegal.
I think the idea is that it provides hints about how much a Publisher wants to write, so the name is probably a little misleading; but the main point is that write back-pressure should be a point of variation. Thanks for creating #25.
Based on the example under #22, I might want to send multiple protocol messages with flushing.
Or I might have a server-side component that's simply writing a stream of protocol messages and wants to flush after each message. Clearly I can't call write once per message.
I did not follow this; which part of the example above are you referring to?
Sure, isn't that the reason why we have the FlushSelector?
@rstoyanchev I realize from the discussions here that issue #22 did not do justice to explaining what Example 3 in the code sample was trying to achieve, so I have provided an explanation in this comment. Specifically, I think your comments mixing flush with the sample there are conflating the usecase. We can possibly discuss flush semantics in a different issue if there is confusion. Flush is an optimization, and the only connection with multiplexed writes is the ability to give control over flushing per write.
The intent was not to split a single logical stream into multiple writes. All the writes (protocol request, handshake and replies) in the sample are totally disjoint; they are wired together using write composition to follow the protocol. If the intent of the sample is not clear, we can discuss that first in that issue.
@jbrisbin apologies for replying late to your comment.
This is something that I have thought about in RxNetty with respect to the names. One argument is being pedantic in expressing that the operation is not eager, and the other argument is that the fact that an operation returns a Publisher already implies laziness.
I have not at all played with ZMQ, but if this is the case, it makes me feel that the ZMQ abstractions are incorrect and not suitable to be used in this kind of low-level library that provides insight into all networking operations.
@NiteshKant the example is clear. I asked a what-if question that modified your example a little; I never claimed it was the exact use case you had in mind. Suppose I want to write objectA through objectZ with flushing after each. The FlushSelector should be required only when that's the kind of strategy that fits the way you think about flushing (i.e. by counting written chunks or bytes). Overall, I should be able to stick to one intuitive flushing model. I shouldn't be forced to alternate between write(Publisher) and write(Publisher, FlushSelector) within a few lines of code that do very similar things, and where using FlushSelector is so much more work.
Depends on whether ordering is important or not. If ordering is important, you can do:

write(A).concatWith(B)....concatWith(Z)

Of course, this means that you create a Publisher per object.
I don't think we can say that is illegal; it totally depends on the protocol you are writing. If ordering/sequential writes are desired, then sure, it is illegal.
Guess that is the disconnect. This is the contract for the FlushSelector:

/**
* A function that is used for determining when a flush has to be invoked on the underlying channel.
*
* @param <W> Type of items emitted by the stream using this selector.
*/
interface FlushSelector<W> {
    /**
     * Selects whether flush should be invoked on the channel.
     *
     * @param count The index of this item. Starts with 1.
     * @param lastWrittenItem Item which was last written before calling this selector.
     *
     * @return {@code true} if flush should be invoked.
     */
    boolean select(long count, W lastWrittenItem);
}

Based on this, the usage looks like:

RxTcpServer.create(transport)
           .intercept(log())
           .startAndAwait(connection -> connection.write(connection.map(bb -> {
               String msg = "Hello " + bb.toString(defaultCharset());
               return Unpooled.buffer().writeBytes(msg.getBytes());
           }), (count, item) -> true));

Notice that the flush selector (the lambda (count, item) -> true) returns true for every item, i.e. it flushes after every write.
Maybe we should discuss flush semantics in issue #8, where we were discussing this some time back.
Yep, there is a clear disconnect right here! When I say publisherA I don't mean a Publisher emitting ObjectA. My expectation is that the ReactiveIPC layer does the minimal possible: its input and output work with byte buffers, and object conversion is left as a separate concern. So when I say publisherA, I mean a Publisher emitting all the data chunks for ObjectA. I realize now your intent is to pass objects down to the Netty channel and rely on codecs? That means the application is coupled to the transport. I see a conversion mechanism independent of the transport as more useful.
Let us discuss this first. I have created issue #27 for it.
I think the discussion digressed in this issue, but we seem to agree in general that we should support multiplexing writes on a channel. I am closing this issue; if there are any disagreements, please re-open.
A TcpConnection should support multiple writes to allow multiplexing. I suggest also that flush semantics need to be supported per writer. For example:
Both A and B can onNext on the same TCP connection, and then it depends on the protocol to order the messages if required. For example, when implementing an HTTP/2-like protocol, where there can be multiple streams on the same connection and every stream frame is independent, one can write the items without the need for ordering across streams.

This could instead be done by forcing our APIs to only allow a single Publisher to be given to a connection, but then we should not have a write method that can be called more than once.

I suggest that write(Publisher<T> p) (or something matching this signature) exist, and that it must support multiple concurrent writers/publishers so that multiplexing is embraced.

I also suggest that each writer needs to be able to control flush semantics independently of other writers. Continuing the example above, if "A" and "B" are both writing, but "A" doesn't care when it flushes and "B" does, "B" should be able to trigger a flush, which will of course flush messages from both "A" and "B", but is only triggered by "B". Or "A" and "B" could have different logic as to when they need to flush, and both are permitted independently to trigger flush semantics.

One approach to this is overloading the write method to support each writer providing its own control for flushing, so if "A" does not care but "B" does, it could be like this:
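The code sample that originally followed did not survive in this copy. As a hedged sketch (interface and method names assumed, not an actual ReactiveIPC API), the overloads described could look like this, using java.util.concurrent.Flow for the Publisher type:

```java
import java.util.concurrent.Flow.Publisher;

// Hypothetical sketch of the overloads described above: "A" uses the plain
// write and never triggers a flush itself; "B" passes a FlushSelector and
// flushes on its own schedule.
interface FlushSelector<W> {
    boolean select(long count, W lastWrittenItem);
}

interface WritableConnection<W> {

    /** Writer supplies its own flush control ("B" in the example above). */
    Publisher<Void> write(Publisher<W> data, FlushSelector<W> flushSelector);

    /** Writer does not care about flushing ("A"); defer to other writers or flush-on-complete. */
    default Publisher<Void> write(Publisher<W> data) {
        return write(data, (count, item) -> false);
    }
}
```

With such a shape, "A" calls write(publisherA) while "B" calls write(publisherB, selector), and a flush triggered by "B" still drains anything "A" has buffered, as the issue text describes.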