Should a TcpConnection provide a flush semantic? #8
IMO this should be handled by the |
In that case, the flush boundaries will be defined by the receiver of the content (the reactive-ipc server infrastructure in this case). However, the use case for providing flush semantics is for the producer to define the flush boundaries. IOW, I as an invoker of |
I think "batch writes" is a reasonable feature to give applications (the Publisher in this case) control over the buffering of writes, because they may be able to anticipate cases where that would be beneficial. However, isn't some control over flushing necessary simply because servers have a write buffer? For example, if I'm sending a STOMP message over TCP, then once the full frame is written I want any pending writes flushed as soon as possible, not when the buffer fills up. |
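The write-buffer point can be illustrated with nothing but the JDK. This is a minimal sketch, not anything from reactive-ipc: a ByteArrayOutputStream stands in for the socket, the class name FrameFlushSketch is made up, and the frame content is only an example. It shows that a small frame sits in the buffer until an explicit flush pushes it out.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class FrameFlushSketch {

    /** Writes one frame and returns {bytes visible before flush, bytes visible after flush}. */
    static int[] writeFrame(String frame) {
        // Stand-in for the physical connection (assumption: a real socket stream behaves alike).
        ByteArrayOutputStream socket = new ByteArrayOutputStream();
        BufferedOutputStream buffered = new BufferedOutputStream(socket, 8192);
        try {
            buffered.write(frame.getBytes(StandardCharsets.UTF_8));
            int before = socket.size(); // the frame is smaller than the buffer, so nothing is out yet
            buffered.flush();           // explicit flush at the frame boundary
            int after = socket.size();
            return new int[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // A STOMP-style frame ends with a NUL byte.
        int[] sizes = writeFrame("SEND\ndestination:/queue/a\n\nhello\0");
        System.out.println("before flush: " + sizes[0] + ", after flush: " + sizes[1]);
    }
}
```

Without the flush call the frame would only leave the buffer once later writes filled it, which is the delay the comment wants to avoid.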
My only concern with disconnecting flush from The We have implemented a batch write in Reactor by exposing a Subscriber<T> writeBatch(); |
The write method currently has no Javadoc. Do you mean for that to be called once providing a Publisher for outgoing data? It sounds more like you intend for it to be called any number of times since you refer to managing multiple write subscribers. Wouldn't it make more sense to have a single Publisher for output? Just like there is one Publisher for input.
So if I have to write and flush 5 times I would call What I was thinking in terms of flushing is that it would simply translate to a call to Netty's |
Any of the @reactive-ipc/owners are welcome to change, tweak, or completely rewrite and reorganize the current code. It's just a sketch to get discussion going. |
@jbrisbin is it so that you think the proposal is to provide a separate Observable<Void> writeAndFlush(Observable<W> msgs, Func1<W, Boolean> flushSelector); I think what you are saying is that the flush is always on
@rstoyanchev It's a good point that the underlying implementation should be able to |
In the ZMQ API you can choose to always send a IMO both ZMQ |
Part of the problem here is that Reactive Streams has no concept of "checkpointing". We can't send an interrupt down a pipeline. There's either the next chunk of data or there's a terminal complete. In order to flush a stream we have to be able to checkpoint it in some way without completing it to demarcate data boundaries. If we use Without using
|
The next chunk of data (or the next item): there is a Buffer abstraction to represent it in the current code. Couldn't that be used not only as a container for a byte buffer but also to express additional write semantics such as a flush hint? |
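To make the flush-hint idea concrete, here is a minimal sketch under stated assumptions. OutboundBuffer and its flushAfter field are hypothetical names, not the Buffer type in the current code, and the "transport" only records the size of each flushed batch instead of touching a socket.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class FlushHintSketch {

    /** Hypothetical item type: a payload plus a hint that a flush should follow it. */
    static final class OutboundBuffer {
        final byte[] payload;
        final boolean flushAfter;

        OutboundBuffer(String text, boolean flushAfter) {
            this.payload = text.getBytes(StandardCharsets.UTF_8);
            this.flushAfter = flushAfter;
        }
    }

    /** Simulates the transport: returns the byte size of every batch that would be flushed. */
    static List<Integer> write(List<OutboundBuffer> items) {
        List<Integer> flushedBatchSizes = new ArrayList<>();
        int pending = 0;
        for (OutboundBuffer item : items) {
            pending += item.payload.length;
            if (item.flushAfter) {          // the producer marked a boundary on this item
                flushedBatchSizes.add(pending);
                pending = 0;
            }
        }
        if (pending > 0) {                  // assumption: completion flushes whatever is left
            flushedBatchSizes.add(pending);
        }
        return flushedBatchSizes;
    }

    static List<Integer> demo() {
        List<OutboundBuffer> items = new ArrayList<>();
        items.add(new OutboundBuffer("ab", false));
        items.add(new OutboundBuffer("cd", true));
        items.add(new OutboundBuffer("e", false));
        return write(items);
    }
}
```

The trade-off is that every item has to be wrapped in the carrier type, even when the producer has nothing to say about flushing.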
@rstoyanchev That was my thought. I wanted to see what others had to say before suggesting that since I seem to be playing from a different score here. |
I have two problems with this approach:
In order to explain 1 above, let us consider this simple server that writes a range of integers to every connection. Such a server can be written as:
RxTcpServer.create(transport)
.startAndAwait(connection -> connection.write(Observable.<Integer>range(1, 100)));
Now, let us consider a protocol in which the client requests for Now, I know that my data is static (it is a static list of 100), so I would want to write If I was to annotate
RxTcpServer.create(transport)
.startAndAwait(connection -> connection.write(Observable.<Integer>range(1, 100)
.map(/*convert i to Buffer and add flush boundaries*/)));
This means I would have one extra object allocation ( However, if the flushing semantics are decoupled from the source of data, we can layer it in any which way we want. In this case, it will be like:
RxTcpServer.create(transport)
.startAndAwait(connection -> connection.write(Observable.<Integer>range(1, 100),
(count, integer) -> count % n == 0)); |
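The decoupled variant can be sketched without RxJava or a real transport. Everything here is an assumption made for illustration: the FlushSelector interface, the write helper, and the choice to flush any remainder on completion. The selector sees a running count and the item, and the data source stays a plain range of integers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class FlushSelectorSketch {

    /** Hypothetical selector: decides after each written item whether a flush should follow. */
    @FunctionalInterface
    interface FlushSelector<T> {
        boolean shouldFlush(long count, T item);
    }

    /** Simulates a transport write: returns the batches that would each go out in one flush. */
    static <T> List<List<T>> write(Iterable<T> items, FlushSelector<? super T> selector) {
        List<List<T>> flushed = new ArrayList<>();
        List<T> pending = new ArrayList<>();
        long count = 0;
        for (T item : items) {
            pending.add(item);
            if (selector.shouldFlush(++count, item)) {
                flushed.add(pending);
                pending = new ArrayList<>();
            }
        }
        if (!pending.isEmpty()) {   // assumption: completion flushes the remainder
            flushed.add(pending);
        }
        return flushed;
    }

    /** Writes the integers 1..100 and flushes after every n-th item. */
    static List<List<Integer>> demo(int n) {
        List<Integer> range = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
        return write(range, (count, integer) -> count % n == 0);
    }
}
```

The same source can be written with a different n, or a different selector altogether, without changing how the items are produced.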
I have experimented with a different way of flushing, one that provides the same capabilities and on the implementation level is very much the same. Rather than providing a https://github.com/rstoyanchev/reactive-ipc-jvm/tree/tcp-poc The end result is functionally equivalent and much cleaner.
|
@rstoyanchev can you provide an example of how this will work when the publisher is coming from a different component? Maybe taking the same example as I had provided in my previous comment:
In addition, let us assume that the range of integers is coming from a component
public interface IntegersProvider {
Publisher<Integer> provide();
}
So, now the server would be:
RxTcpServer.create(transport)
.startAndAwait(connection -> connection.write(integersProvider.provide())); |
Good point since a Publisher may not know how to flush or there may be reasons to use the same Publisher with different flushing strategies. If the write Subscriber exposes some polymorphic behavior (like flushing) that a Publisher doesn’t know how to use or doesn’t want to, it can be adapted. I’ve experimented with a WritePublisher that can adapt any Publisher for writing purposes with flushing. Obtaining a Publisher for writing becomes:
Note that this is only meant to illustrate an idea. The adapter can expose any number of alternative ways to flush. Instead of the simple flush count above, perhaps some flush trigger syntax (like a cron trigger), for example “5/100,2” could mean “flush after every 5th item for the first 100, every 2nd thereafter”. In general it should be very feasible to satisfy most common needs, especially those involving an item count, through a declarative option like that. For more advanced cases the WritePublisher could still be configured with some flush strategy that accepts the last written item and Flushable but maybe we don’t need that to start. |
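As a rough illustration of such a declarative trigger, the sketch below parses the "5/100,2" example into a predicate over the item count. The grammar and its interpretation are assumptions: counts are 1-based, and "every 2nd thereafter" is read as counting from the item after the limit. FlushTriggerSketch is a made-up name.

```java
import java.util.function.LongPredicate;

class FlushTriggerSketch {

    /**
     * Parses a spec of the form "every/limit,thereafter", e.g. "5/100,2":
     * flush after every 5th item for the first 100 items, every 2nd item after that.
     * The returned predicate takes the 1-based count of items written so far.
     */
    static LongPredicate parse(String spec) {
        String[] parts = spec.split(",");
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected 'every/limit,thereafter': " + spec);
        }
        String[] first = parts[0].split("/");
        if (first.length != 2) {
            throw new IllegalArgumentException("expected 'every/limit,thereafter': " + spec);
        }
        long every = Long.parseLong(first[0].trim());
        long limit = Long.parseLong(first[1].trim());
        long thereafter = Long.parseLong(parts[1].trim());
        if (every <= 0 || thereafter <= 0 || limit < 0) {
            throw new IllegalArgumentException("intervals must be positive: " + spec);
        }
        return count -> count <= limit
                ? count % every == 0
                : (count - limit) % thereafter == 0;
    }

    public static void main(String[] args) {
        LongPredicate trigger = parse("5/100,2");
        for (long count : new long[] {4, 5, 100, 101, 102}) {
            System.out.println(count + " -> flush? " + trigger.test(count));
        }
    }
}
```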
I think it is useful to see what this approach will look like with different adapter layers vis-a-vis let us say providing a simple |
I think that's looking at it from the implementation side too much. The idea with WritePublisher is that it is provided and it can be used to adapt any Publisher for writing to the transport with flushing. It becomes a good place to split out and expose any number of flush-related convenience options at will (declarative flush trigger, simple flush count, flush callback strategy,...) something we'd be hard-pressed to do in TcpConnection for good reasons. I actually liked the result of adding WritePublisher to my own sample and also having a single method for writing on TcpConnection is important because it communicates clearly there is one way to write and that has nothing to do with flushing.
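One way to picture the adapter idea is the sketch below. It is not the WritePublisher from the tcp-poc branch: it uses the JDK's java.util.concurrent.Flow interfaces in place of Reactive Streams, and the Flusher interface, flushEvery, fromList, and RecordingTransport are all hypothetical names. The list-backed publisher is a toy and not a fully spec-compliant implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class WritePublisherSketch {

    /** Hypothetical capability the transport's write subscriber might expose. */
    interface Flusher {
        void flush();
    }

    /** Toy synchronous publisher over a list, used only to drive the example. */
    static <T> Flow.Publisher<T> fromList(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next;
            private boolean done;

            @Override public void request(long n) {
                for (long i = 0; i < n && next < items.size() && !done; i++) {
                    subscriber.onNext(items.get(next++));
                }
                if (!done && next == items.size()) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override public void cancel() {
                done = true;
            }
        });
    }

    /** Adapts any publisher so that a flush follows every n-th item, if the subscriber can flush. */
    static <T> Flow.Publisher<T> flushEvery(Flow.Publisher<T> source, int n) {
        return downstream -> {
            final Runnable flush;
            if (downstream instanceof Flusher) {
                Flusher flusher = (Flusher) downstream;
                flush = flusher::flush;
            } else {
                flush = () -> { };  // subscriber cannot flush: behave like a plain publisher
            }
            source.subscribe(new Flow.Subscriber<T>() {
                private long count;

                @Override public void onSubscribe(Flow.Subscription s) { downstream.onSubscribe(s); }

                @Override public void onNext(T item) {
                    downstream.onNext(item);
                    if (++count % n == 0) {
                        flush.run();
                    }
                }

                @Override public void onError(Throwable t) { downstream.onError(t); }

                @Override public void onComplete() { downstream.onComplete(); }
            });
        };
    }

    /** Stand-in for the transport's write subscriber; records what it is asked to do. */
    static final class RecordingTransport implements Flow.Subscriber<String>, Flusher {
        final List<String> events = new ArrayList<>();

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { events.add("write:" + item); }
        @Override public void onError(Throwable t) { events.add("error"); }
        @Override public void onComplete() { flush(); }  // assumption: completion always flushes
        @Override public void flush() { events.add("flush"); }
    }

    static List<String> demo() {
        RecordingTransport transport = new RecordingTransport();
        flushEvery(fromList(List.of("a", "b", "c", "d", "e")), 2).subscribe(transport);
        return transport.events;
    }
}
```

In this shape the connection keeps a single write method, and the flush policy lives entirely in the adapter wrapped around the source.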
I'm assuming what can be done with RS should be possible with adapters as well but yes, need to do that in general. |
I am sure we can make it work. It is a question of how intuitive or error prone it can be. To explain, let me show how it would look when used with RxJava:
Observable<ByteBuf> stream = Observable.interval(1, TimeUnit.SECONDS)
.map(aLong -> Unpooled.buffer()
.writeBytes(("Interval: " + aLong).getBytes()));
RxTcpServer.create(transport)
.startAndAwait(connection -> connection.write(stream.lift(subscriber -> {
final Flushable flushable = (subscriber instanceof Flushable) ? (Flushable)subscriber : null;
return new Subscriber<ByteBuf>(subscriber) {
private int count;
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onNext(ByteBuf o) {
subscriber.onNext(o);
if (null != flushable && ++count % 2 == 0) {
// flush
}
}
};
})));
(The reason for using lift above is because that is how you can intercept subscription.) If we have
Observable<ByteBuf> stream = Observable.interval(1, TimeUnit.SECONDS)
.map(aLong -> Unpooled.buffer()
.writeBytes(("Interval: " + aLong).getBytes()));
RxTcpServer.create(transport)
.startAndAwait(connection -> {
final AtomicInteger itemCount = new AtomicInteger();
return connection.write(stream.map(bb -> {
if (itemCount.incrementAndGet() % 2 == 0) {
connection.flush();
}
return bb; // map must return the item it passes downstream
}));
});
and when we have the flush selector as proposed in #19, this will look like:
RxTcpServer.create(transport)
.startAndAwait(connection -> connection.write(stream, (count, item) -> count % 2 == 0));
In the above you can see the intuitiveness of the approaches. Moving from intuitiveness to the error-prone nature of the design, let us say, after applying our operator (via
Is there a good reason that the alternative approaches I proposed above are not appropriate? Also, with those approaches do you think the different strategies you mentioned can not be applied without providing overloaded methods on the
Isn’t it so that we are talking about two different ways of writing, viz.,
Whether we push it one level deep by having the |
This is not at all what I meant but I do see the challenge when used with RxJava. A Publisher is a simple interface that's easy to implement to decorate another Publisher transparently. The same is not possible with an Observable so you are going down the path of applying an operator which again is not what I intended at all. I'm not saying that Observable should implement an interface. I understand Observable is about composition and it's functional by nature. However when an Observable is provided to the RxConnection write method, it will no longer be used for its composition API. It will be turned to a Publisher. In other words the write method in RxConnection is nothing but a convenience to spare you from saying toPublisher so I feel there is a way to not lose that convenience and still be able to decorate the Publisher. We'll play around and get back to you.
This is exactly what I'd like to avoid. To me the write(Publisher) method looks like first class citizen and write(Publisher, FlushSelector) like something to use when the other doesn't work. So now I start thinking how can I elegantly use the first method so I get the flush behavior I want? Perhaps I can split into two Publishers, or perhaps if I'm really cool I might think of something along the lines of There should be one way to write and flushing should be expressed as a separate concern. The fact that there is a flush in onComplete is just something that happens. Not another flushing strategy. Even something like this:
which I'll admit looks nice and short but is imperative for the simplest of cases. You're not saying when to flush but how to flush vs something like |
The example I gave was not for decorating
Not really. It just depends on how the implementation uses it. eg: I can transform the passed
It depends on whether we want to provide
Where do you think flush is getting pervasive here? Is it so that every user of an API uses all functionalities of the API? Simple example: Java
AFAIU, we are only saying when to flush, how to flush (how to flush data on the physical socket) is an implementation detail. |
I won't answer inline as we seem to be going in circles. Rather than spending any more time on this, could we go with the |
I agree with Rossen. This discussion is going nowhere and we have places to be and other thorny problems to solve. Please let's start with common ground and make forward momentum to flesh out these other issues. I suspect that as we get further into this we'll see an obvious but good solution. I think right now we're just going to have to agree to disagree on the FlushSelector (which at this point is more of a premature optimization IMO). I'd hate to see this effort miss its goals because we spent a month arguing over a feature not universal to all implementations we know will come soon (ZeroMQ and Java Chronicle to name two) and used only in specific use cases in transports that actually do support it. I think we agree more than we disagree on the other issues and we should focus on those and actually iterate some code. |
This has been partially discussed in issue #3 here and here
Flush semantics aim to reduce the number of system calls needed to write data to a physical connection, and hence provide a way to do "batch writes" in one system call. This is a fairly established semantic across network libraries.
This issue is to discuss whether we need flush semantics as part of a TcpConnection (and also for other transports).
Since the only pure RS way of writing to a connection is via a Publisher, if we provide flush, we will have to connect it to the items emitted by that Publisher, which essentially means that we should get feedback from somewhere, upon each emitted item, on whether this emission should be followed by a flush or not.
One way of doing so would be like what RxNetty does (in an unreleased version) here via a flushSelector like:
The flushSelector function is invoked after every onNext on the stream and if the selector returns true, a flush is invoked. With this we do not need an explicit flush function but the concept is available to the user.
on the stream and if the selector returns true, a flush is invoked. With this we do not need an explicit flush function but the concept is available to the user.The text was updated successfully, but these errors were encountered: