Support Requesting Infinite (firehose) #62
I want to +1 this but with a tiny tweak: the number should be any negative number, not just -1. This would be the case when using things like indexOf, which might return a negative number smaller than -1.

That change is fine with me. The spec would also need to include something stating what to do if a positive number is first seen and then a negative. I think that should be considered illegal. It's either always positive, or a single negative request.

I would like this change. While playing with the contract, I also felt the need for this - sending an infinite size request would complicate the publisher's logic a little, but having to request a large bounded amount and replenish it over and over is overhead too.

Is there consensus on this? I want to know if I can rely on it.
Sorry for the delayed response, a lot of things happened last week! I have multiple issues with the proposal:

Spec already supports Long.MAX_VALUE in demand: can someone provide me with a real scenario where this wouldn't suffice, especially given that we can replenish it indefinitely?
@viktorklang I do agree that it makes the API confusing and that may be reason enough to drop the proposal.

@reactive-streams/contributors — please chime in.

I think the use case can be supported, in essence, with @viktorklang's suggestion of Long.MAX_VALUE, which is, IMO, clearer. But I would suggest that the wording be expanded to include what a negative demand means, i.e. a negative value could implicitly mean Long.MAX_VALUE and get a similar effect.

@tmontgomery Thanks for the feedback! I'd love to hear your thoughts on the issues I listed earlier, if you have time!
Yes, but what about use cases where that isn't the case? Why force overhead when it's not needed?

A temporal operator can be fully async and not care about backpressure. It is saying "send me everything you have asynchronously ... I'll handle dropping, buffering, throttling, sampling, etc. on my own."

Overhead.
Here are performance numbers from tests I've done while implementing this.

Simple (but slower):

```java
@Override
public void request(int n) {
int _c = REQUESTED_UPDATER.getAndAdd(this, n);
if (_c == 0) {
while (index <= end) {
if (o.isUnsubscribed()) {
return;
}
o.onNext(index++);
if (REQUESTED_UPDATER.decrementAndGet(this) == 0) {
// we're done emitting the number requested so return
return;
}
}
o.onCompleted();
}
}
```

Optimized (more complicated code):

```java
@Override
public void request(int n) {
if (n < 0) {
// fast-path without backpressure
for (int i = index; i <= end; i++) {
if (o.isUnsubscribed()) {
return;
}
o.onNext(i);
}
o.onCompleted();
} else if (n > 0) {
// backpressure is requested
int _c = REQUESTED_UPDATER.getAndAdd(this, n);
if (_c == 0) {
while (true) {
/*
* This complicated logic is done to avoid touching the volatile `index` and `requested` values
* during the loop itself. If they are touched during the loop the performance is impacted significantly.
*/
int numLeft = start + (end - index);
int e = Math.min(numLeft, requested);
boolean completeOnFinish = numLeft < requested;
int stopAt = e + index;
for (int i = index; i < stopAt; i++) {
if (o.isUnsubscribed()) {
return;
}
o.onNext(i);
}
index += e;
if (completeOnFinish) {
o.onCompleted();
return;
}
if (REQUESTED_UPDATER.addAndGet(this, -e) == 0) {
// we're done emitting the number requested so return
return;
}
}
}
}
}
```

Note how the no-backpressure version hits 209m onNext/second, optimized backpressure 174m onNext/second, and simple backpressure 58m onNext/second. And this is an optimal case where only a single request for the entire size (1,000,000) occurs. What should I do differently here to achieve the performance of 209m onNext/second without a "no-backpressure" option? Long.MAX_VALUE doesn't solve that, as I still need to go through the backpressure logic.
Asking for infinite anytime after a non-infinite value would be illegal.
Illegal or no-op.
Illegal or no-op.
I like that separation of concerns. That is actually quite clarifying and should be worked in. BTW, I think the performance argument is separate from the question of whether the value should be -1 or Long.MAX_VALUE.
Hi Ben, Thanks for the reply! Just for my understanding, does the omission of a reply to point number 1 mean that you agree or disagree?
The same argument can be used against the proposal of the feature: why incur the overhead of implementing the feature for all Publishers when it's not needed?
See my point number 2 (not my answer to your answer of point number 2)
Just to clarify my viewpoint: I am against trading correctness for performance; back-pressure is, if not the main reason, one of the main reasons for doing Reactive Streams. I remain unconvinced that the 20% performance gain is worth the overhead of implementing it for all Publishers, as well as removing a chance to detect errors in demand tracking (negative demand requested would be a sign of an arithmetic bug rather than a request for infinite elements, as proposed).

Regarding the benchmark, my first impression is that the optimized backpressure has excellent performance. Safety will never come for free, but the cost seems quite OK in order to not complicate every single Publisher that will ever be written by having them implement both back-pressure AND fast-path behavior.
Would you mind elaborating on the statement above? I'm not following its implications.
How will you deal with megamorphic call sites due to different implementations of Subscriber for the same Publisher? (That I fear will kill your 209m bench result)
Since infinite demand needs to be supported by both publisher and subscriber (it requires the progress of the publisher to be suspended until the progress of the subscriber has finished), since demand tracking (correctness) currently works with quite high performance, if I might say so, and since it only affects a subset of a subset of situations, it sounds like something that should lie outside of the defined contract. It'll take more to convince me, and the currently proposed solution leaves a lot to be desired, as per my previous argumentation. What are the thoughts of the @reactive-streams/contributors?
Sorry, you lost me there, what specifically do you refer to? :)
The issue is that, as demand is defined, it is a signal of the max number of elements the receiver is willing to buffer—one has to, as they say, be careful not to bite off more than one can chew.
We don't want to sacrifice clarity of purpose, but performance is one of my chief concerns at all times, and if I am profiling an application, it's not helpful to see the library eating up all my CPU cycles so that I can't tell where the bottlenecks in my own application are.

@viktorklang performance is a driving concern for everything we do in Reactor, and I suppose our definition of "good enough" is quite different. On my MacBook Pro I can get JDK 8 to iterate about 200M ops/sec, which is what @benjchristensen is showing above. That's the benchmark I work against. I know a single thread on my machine won't work faster than that in a tight loop. 50M events/sec may sound like "good enough", but when it's possible to do 200M/sec it's clearly a very expensive scenario, and that cost should be justified by something concrete.

Keep in mind that many of us see a thread boundary as just as significant as a network boundary, and while others might be focusing on using Reactive Streams between hosts, we're more focused on using it between threads, and needing to process over 100M events per second is not unrealistic. Whatever we can do in the spec to facilitate that (i.e. by supporting a fast path if the implementation chooses to implement it) is worth the effort spent now to work through the implications. What I don't want to see is automatically excluding a class of applications that, while numerically a minority, are actually very important kinds of applications which run systems that people depend on to work at extremely high volumes. Telco and financial apps are very demanding, and if we are to use Reactive Streams in those domains then we cannot dismiss any improvement we can make that will get us closer to 200M events/sec.

I don't see that a zero request is meaningful, but I'm not sure it rises to the level of "illegal" in my mind. If we leave the implementor to decide whether negative values are significant or not, then requesting zero items would result in an immediate onComplete.
Hi Jon!
I think we all(?) agree that we don't want to create something that creates an unreasonable tax on performance, but I think 187m compared to 209m is pretty good? So the question is our definition of unreasonable. Considering that this is an extremely synthetic benchmark with only one flow going, and that concurrent flows will exist in a range from a few to a few million at the same time in the same VM, one has to consider the scalability tax vs the performance tax.
Again, I thought we were talking about 187m vs 209m. Also, you need to take into account the penalties of megamorphic calls, thread scheduling artifacts etc.
Please keep in mind that we are talking about the interop standard, if you know that you're connecting a Reactor Publisher to a Reactor Subscriber, you are of course free to perform any and all optimizations you deem needed.
I do hope that it does not come across as automatically excluding anything; I am merely counterarguing a proposal, which has to be expected? (If it does not pass internal scrutiny, how will it ever pass external scrutiny?)
Illegal as in "spec violation".
I don't think we even need to do that: the Publisher can know in "subscribe" whether it is a Subscriber that allows a fast path, so it is fully possible to employ those kinds of optimizations without burdening the spec IMO (please feel encouraged to challenge me on that!). I'd love to hear your take on my arguments (the numbered list) as well. Cheers,
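A minimal sketch of the subscribe-time detection described above. The FirehoseCapable marker interface is hypothetical and not part of any spec, the bounded branch is deliberately naive, and the modern request(long) signature is assumed:

```java
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Hypothetical marker interface: a Subscriber opts in to say
// "I can absorb everything; skip demand accounting for me".
interface FirehoseCapable {}

final class RangePublisher implements Publisher<Integer> {
    private final int start, end;

    RangePublisher(int start, int end) { this.start = start; this.end = end; }

    @Override
    public void subscribe(Subscriber<? super Integer> s) {
        if (s instanceof FirehoseCapable) {
            // Fast path: emit the whole range with no bookkeeping.
            s.onSubscribe(new Subscription() {
                @Override public void request(long n) { } // demand is irrelevant here
                @Override public void cancel() { }        // cancellation elided in this sketch
            });
            for (int i = start; i <= end; i++) s.onNext(i);
            s.onComplete();
        } else {
            // Bounded path: a deliberately naive demand-tracking subscription.
            s.onSubscribe(new Subscription() {
                private int next = start;
                private boolean done;
                @Override public synchronized void request(long n) {
                    if (done) return;
                    for (long i = 0; i < n && next <= end; i++) s.onNext(next++);
                    if (next > end) { done = true; s.onComplete(); }
                }
                @Override public void cancel() { synchronized (this) { done = true; } }
            });
        }
    }
}
```

The point of the sketch is that the choice between paths is made once, in subscribe, so request(n) itself never needs a special value.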
I skipped it because it is a subjective opinion that I generally agree with, but in this case it works against the higher priority of performance. I tried to keep my response focused on the objective reasons why I'm pursuing support for a fast path.

This is my perspective as well. If the Reactive Streams spec is to be more than just a bridge between systems, and is actually adopted at the core of libraries, it should not impede high performance.
On Jul 10, 2014 6:18 PM, "Ben Christensen" wrote:

Isn't the relative importance of different characteristics also a matter of opinion? I addressed the fast-path concern in my reply to Jon, a solution that doesn't require changing the contract.

Define high-performance, otherwise we're going to have a hard time knowing when we've reached it.
First off, I want to thank @viktorklang and @benjchristensen for taking the time to argue their points thoroughly. We might not agree on the particulars but it's worthwhile to kick this around a little, even at the risk of dragging out something that ultimately results in impasse. :)
In a reasonably large server with 16 threads (1 per CPU) operating at peak load, that difference is over 300M events per second. Not bad? I suppose. Pretty good? Sure. Acceptable? Not if it can be made better.
If we can manage it so that we don't impact performance at all, that would be even better, no? :) When you are as demanding as we have to be, teasing out every last bit of performance humanly possible, "good enough" doesn't cut it.

Maybe I'm wrong, but I think this actually argues in favor of "less is more": the relatively involved machinery needed to achieve the "optimized" version of bounded publication would be overall less scalable than the version that has none of that. At any rate, the validity of the benchmark is in its usefulness in comparing one thing to another, not in its reflection of a "real world" scenario (whatever those are!). The real application will never achieve the throughput rate of a tight-loop JMH benchmark, true. But the point is that there really is a difference between the optimal fast path and the marginally optimal fast path, and even if the difference is not as pronounced in a real application, even if the throughput is say only 1/4th that of the benchmark, that's still an 80M events per second difference. That's not insignificant if each of those events represents a real world action (a Trade, a dropped call, a change notification that informs an AI algorithm, a sensor reading, etc...).

Certainly it would be possible to do that, but if there's a way to communicate this intent directly, without an instanceof check, why not use it?

I think it's worth noting that although Reactive Streams was initially targeted at interop, it naturally lends itself to use within the library and is widely applicable (a good thing!). But that means a much more demanding environment than if this was simply a way for one library to talk to another; it's really more than that, and these specifications help applications work internally on a small scale with the same efficiencies as those that operate on a larger scale. There's no need to impede that if it's possible to allow that to happen. Akka Streams does not need to concern itself at all with this stuff if it doesn't want to. As long as the spec leaves the door open for this behavior, then it's free to take path "A", which might be "ignore values < 1 as meaningless".

FWIW, IMO there's only two possible decisions: "> 0" means give me that amount, "< 1" (0, -1, -100, whatever... makes no difference) means give me whatever, I don't care how many. The spec would simply let the implementation decide whether values < 1 were "infinity", or "meaningless" as in no-op and immediate onComplete. It's impossible for the spec to know, after all, whether infinity is "incorrect" or not. Only the implementation will know that.
On Thu, Jul 10, 2014 at 7:17 PM, Jon Brisbin wrote:

Exactly, this is how things should be done :)

My concern is also the laziness of developers: if it is much "easier" to request everything than to track demand, that is what will get used.

The "only" issue with doing the instanceof check is that you couldn't change the decision after subscribing.

Cheers,
Dear @reactive-streams/contributors,

So having thought about it some more, here's a proposal that would solve most, if not all, of my concerns while still offering the backpressure escape hatch for maximum performance under restricted situations.

```java
public interface Subscription {
```

Feedback?
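The method list of the proposal did not survive in this archive; purely as an illustration, a split along the lines being discussed might look like this (all names and semantics here are placeholders, not Viktor's actual text):

```java
public interface Subscription {
    void request(long n);    // bounded demand, exactly as before

    // Explicit, deliberately separate escape hatch. Possible semantics, per the
    // earlier discussion: once this has been called, further request(n) calls
    // are no-ops; calling it after a bounded request is illegal or a no-op.
    void requestUnbounded(); // placeholder name

    void cancel();
}
```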
If adding a "request unbounded" mechanism to the interop API really does have a positive cost/benefit ratio (which I'd doubt, for largely the same reasons @viktorklang listed) then I would also prefer moving it into a separate method as suggested by Viktor.
I think I would prefer a separate method as well. As to when is "enough" performance improvement enough? Not to be trite, but never. ;) Even if those cases aren't the everyday ones and yes, a minority of people need that, etc... I hear this all the time of course. :) But the people that need that kind of extreme performance are frankly rather obsessive about it, because the amount of data passing through the system becomes a real burden to the architecture and you start to see where your weak points are. There's usually a hard limit to what you can improve, of course. But in most cases the limit is much less hard than we think, and we only know that because we hammer on it.
If we're trying to discourage its use then maybe we should give it a deliberately long-winded name, or similar. ;) If you provide a public method, people are going to see it in their IDEs and they're going to call it. But it's going to get really noisy in the code if the method name is intentionally long-winded to discourage its use. When we're writing some very sensitive code that has to use this because of the amount of recursive dispatching we do (without this we get some serious trampolining issues), I don't want to be penalized for being in that situation by having to dirty my code with references to a method I'm not supposed to call except under dire circumstances. I'm not calling it because I'm lazy; I'm calling it because I need it.

@viktorklang As to the cost of polymorphism: there are some great blogs around about that topic [1] [2]. People actually do deal with that issue, and some actually do try to work around it to get better performance. You were trying to be hyperbolic to make a point, I know, but it's actually something people worry about in these circles. You can't win every performance battle, like I alluded to. Some things you have to leave alone. But the number of those things is actually a lot less than it sometimes seems. Am I personally content with the cost of us using interfaces and the performance "penalty" associated? Honestly, not terribly. But that cost is understandable to get at the larger issue of helping with the bounded cases, which are going to predominate (not to mention the fact that we have to pay that penalty ourselves... again in the interest of a larger goal). I'm not going to plant a flag on that issue because it has more fundamental costs associated with it that don't suit the other use cases. But am I personally concerned about that stuff? You bet.

But this issue seems like low-hanging fruit to me, and the cost to the spec to support this use case seems very negligible. What I want to make sure we keep in mind is that there are people using this spec (Reactor being a key user, of course) that need ridiculously high performance because of the kinds of apps we're targeting, and it's going to help the spec to have included this class of apps as a first-class citizen. Those cases are a numeric minority for sure, but they pay handsomely for the focus on performance, so I'm quite concerned with it if our customers are. ;)

[1] - http://www.insightfullogic.com/blog/2014/may/12/fast-and-megamorphic-what-influences-method-invoca/

+1 for a separate method.
@viktorklang Adding a new method rather than putting conditional logic inside request(n) works for me. However, I'd rather just name it something neutral, such as request(), requestAll(), or requestUnbounded(). Both the bounded and unbounded use cases have a valid place, in the same library, for different operators and uses. We should not put a negative connotation on one of them.

```java
public interface Subscription {
    public void request(int n);
    public void request()/requestAll()/requestUnbounded();
    public void cancel();
}
```

Take the use cases at the beginning of this discussion as examples. Implementing an operator with them should not require someone feeling they are doing something "unsafe" just because it makes sense to request an unbounded stream. Besides that, I could be lazy and just call request(Integer.MAX_VALUE) anyway.
I disagree that it is so small a use case. I started this discussion because it occurs often. This API works very well for both asynchronous and synchronous boundaries (a very good thing), and many use cases are just fine with a firehose of data (even when async), and several require it (anything with temporal requirements such as sampling, windowing, debouncing, etc., or when multicasting a hot stream). So, I'm +1 for your proposal to add a separate method, but -1 on a name that implies it is unsafe.
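To illustrate the synchronous use case, here is a sketch of a CPU-bound Subscriber written against the interface variant sketched above. requestUnbounded() stands in for whichever name wins; neither it nor this Subscriber shape is settled spec API:

```java
// Subscription here is the proposed interface sketched above (requestUnbounded()
// is a placeholder name), with a Subscriber shaped like the spec's.
final class SumSubscriber implements Subscriber<Integer> {
    private long sum;

    @Override public void onSubscribe(Subscription s) {
        // Processing below is synchronous and CPU-bound, so emission can never
        // outrun this Subscriber: natural backpressure, no counter needed.
        s.requestUnbounded();
    }

    @Override public void onNext(Integer value) { sum += value; }

    @Override public void onError(Throwable t) { t.printStackTrace(); }

    @Override public void onComplete() { System.out.println("sum = " + sum); }
}
```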
On Fri, Jul 11, 2014 at 4:59 PM, Jon Brisbin wrote:

Absolutely, and if it has a clear name that signals that they are doing something special, that is the point.

You are of course at liberty to provide a final static utility method, named whatever you like, that calls it for you.

Could you show me an example of these serious trampolining issues? That'd be extremely welcome!

Absolutely, and my point is that if you need to draw a line somewhere, it has to be drawn deliberately.

I sympathize with your performance concerns; a lot of time has been spent on performance at our end as well.

I sincerely hope my behavior in this discussion shows that I take these concerns seriously.

Yes, so why the concern over the name? My initial thought was to call it something that makes the trade-off explicit.

Cheers,
Since most of the discussion on this issue is focused on performance (which means throughput in this case), and there was an abundance of words and a lack of actual measurements, I have performed some. You can find the logs and code here. What the benchmark does is produce elements as quickly as possible to a Subscriber, dropping them whenever there is momentarily no demand. The Subscriber transfers the elements to another thread that busy-spins emptying a bounded queue (a high-performance single-producer single-consumer implementation). Elements are dropped if the queue is momentarily full. The benchmark is rather unrealistic in a fashion that favors low-latency mechanisms, since no allocations and no processing of the elements are done; any differences found will be an upper bound on what can be expected in a real-world use case. The benchmark was performed on a current MBP 2.3GHz quad-core machine with JDK 1.8.0-b132; for the JMH settings see the source code. The first set of results is given for a bounded request scheme where the request batch size is varied.

There are two types of throughput to measure: the raw throughput of the Publisher and the actual stream throughput, i.e. how much arrives at the Subscriber. The second set of results pertains to the case of an unbounded request scheme, implemented here using a single effectively infinite request.

We can see here that the throughput for the Publisher is the same within the error bars, but the actual stream throughput—i.e. how much arrives at the Subscriber—is lower; the losses are higher. The size of the queue does not play a role beyond 1000 elements, and bigger queues do not make sense in a larger system anyway due to the associated space overhead. Hence I am convinced that we are talking about an effect that is negligible at best. The consequences in terms of interoperability between different Publishers and Subscribers outweigh the benefits of this proposal by far.

Besides the performance point, I strongly object to this proposal due to the second use case that is mentioned in the original description above. In the case of a synchronous Subscriber it is highly problematic to make an unbounded request, since then the whole chain of processing steps that follows will need to use the very same scheme; bounded asynchronous streaming is impossible without message losses after this point. In other words, an unbounded request scheme does not compose well if used in the context of synchronous flow-control.
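For reference, the hand-off Roland describes has roughly this shape. This sketch substitutes java.util.concurrent.ArrayBlockingQueue for the specialized single-producer single-consumer queue the actual benchmark uses, and the counts are illustrative:

```java
import java.util.concurrent.ArrayBlockingQueue;

final class DroppingHandoff {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<>(1000);
        final long total = 10_000_000L;

        Thread consumer = new Thread(() -> {
            long received = 0;
            while (true) {
                Long v = queue.poll();      // busy-spin, never block
                if (v == null) continue;
                if (v < 0) break;           // negative value is the end marker
                received++;
            }
            System.out.println("received " + received + " of " + total);
        });
        consumer.start();

        long dropped = 0;
        for (long i = 0; i < total; i++) {
            if (!queue.offer(i)) dropped++; // drop when the bounded queue is momentarily full
        }
        while (!queue.offer(-1L)) { }       // make sure the end marker gets through
        consumer.join();
        System.out.println("dropped " + dropped);
    }
}
```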
I posted performance numbers done using JMH several days ago: #62 (comment)
This example is not the type where the performance gains are shown, as per your benchmarks. I have already specified the use cases at the start of this conversation where an infinite request is preferred, and an unbounded queue is not one of them (that is what we're all trying to avoid).

I don't see how it breaks composition. If I request Long.MAX_VALUE and then don't comply with the spec when my child requests less, then sure, I break composition, but that's because I am not complying with the spec. For example, I can have an operator stuck in the middle of a chain that requests Long.MAX_VALUE, then ignores requests from its children and firehoses them, all without any changes ... just by breaking the spec. But that has nothing to do with infinite requests; that's just the operator breaking the spec and ignoring requests from its children. The issue of composition is compliance with the requests it receives, even if it requests an infinite stream upward, which is exactly what a temporal operator can do (with -1 or Long.MAX_VALUE). It can request infinite up and then support backpressure requests down. Thus, I don't see how this discussion affects composition; it is only about performance optimizations.
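A rough sketch of the composition Ben describes: a sampling Processor that requests Long.MAX_VALUE upstream yet honors bounded demand downstream. The drain logic is simplified and racy under contention; a real implementation needs a proper drain loop:

```java
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Requests everything from upstream and samples: only the latest element is
// kept, and it is delivered downstream strictly within downstream's demand.
final class LatestSampleProcessor<T> implements Processor<T, T> {
    private final AtomicReference<T> latest = new AtomicReference<>();
    private final AtomicLong downstreamDemand = new AtomicLong();
    private volatile Subscriber<? super T> downstream;

    @Override public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE); // effectively unbounded: we drop on our own
    }

    @Override public void onNext(T t) {
        latest.set(t);             // sampling: overwrite, keeping only the newest
        drain();
    }

    @Override public void onError(Throwable t) {
        Subscriber<? super T> d = downstream;
        if (d != null) d.onError(t);
    }

    @Override public void onComplete() {
        Subscriber<? super T> d = downstream;
        if (d != null) d.onComplete();
    }

    @Override public void subscribe(Subscriber<? super T> s) {
        downstream = s;
        s.onSubscribe(new Subscription() {
            @Override public void request(long n) {
                downstreamDemand.addAndGet(n);
                drain();
            }
            @Override public void cancel() { downstream = null; }
        });
    }

    private void drain() {
        Subscriber<? super T> d = downstream;
        T t = latest.get();
        // Racy but illustrative: emit one sample per unit of downstream demand.
        if (d != null && t != null && downstreamDemand.get() > 0
                && latest.compareAndSet(t, null)) {
            downstreamDemand.decrementAndGet();
            d.onNext(t);
        }
    }
}
```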
To try and bring this to a conclusion, I see the following paths:

1. Leave things as they are.
2. Add a separate method for requesting an unbounded stream.
3. Accept a sentinel value (such as a negative number) in request(n).
4. Add wording that allows treating Long.MAX_VALUE as an optional fast path.

Since items 2 and 3 are contentious and do make things cluttered, I am okay if we do (1) and leave things as they are, but would prefer (4), adding some kind of wording about optionally using Long.MAX_VALUE as a fast path. I think that practically speaking it is okay for us to fast path Long.MAX_VALUE.

As mentioned earlier, even at extreme emission rates it takes a very long time to exhaust a demand of Long.MAX_VALUE: 2^63-1 is over 9 quintillion, so at 1 billion onNexts per second that is almost 300 years. So, even if our systems improve dramatically and are doing 10 times that, Long.MAX_VALUE is still effectively unbounded.

Considering this, can we agree to allow implementations to fast path Long.MAX_VALUE? The benefit of this is that it is completely optional to provide a fast path, and code that doesn't will just work, even if receiving Long.MAX_VALUE. I can then write code like this:

```java
@Override
public void request(long n) {
if (n == Long.MAX_VALUE) {
// fast-path without backpressure
for (long i = index; i <= end; i++) {
if (o.isUnsubscribed()) {
return;
}
o.onNext((int) i);
}
o.onCompleted();
} else if (n > 0) {
// backpressure is requested
long _c = REQUESTED_UPDATER.getAndAdd(this, n);
if (_c == 0) {
while (true) {
/*
* This complicated logic is done to avoid touching the volatile `index` and `requested` values
* during the loop itself. If they are touched during the loop the performance is impacted significantly.
*/
long r = requested;
long idx = index;
long numLeft = end - idx + 1;
long e = Math.min(numLeft, r);
boolean completeOnFinish = numLeft <= r;
long stopAt = e + idx;
for (long i = idx; i < stopAt; i++) {
if (o.isUnsubscribed()) {
return;
}
o.onNext((int) i);
}
index = stopAt;
if (completeOnFinish) {
o.onCompleted();
return;
}
if (REQUESTED_UPDATER.addAndGet(this, -e) == 0) {
// we're done emitting the number requested so return
return;
}
}
}
}
}
}
```

or like this:

```java
@Override
public void request(long n) {
long _c = REQUESTED_UPDATER.getAndAdd(this, n);
if (_c == 0) {
while (true) {
/*
* This complicated logic is done to avoid touching the volatile `index` and `requested` values
* during the loop itself. If they are touched during the loop the performance is impacted significantly.
*/
long r = requested;
long idx = index;
long numLeft = end - idx + 1;
long e = Math.min(numLeft, r);
boolean completeOnFinish = numLeft <= r;
long stopAt = e + idx;
for (long i = idx; i < stopAt; i++) {
if (o.isUnsubscribed()) {
return;
}
o.onNext((int) i);
}
index = stopAt;
if (completeOnFinish) {
o.onCompleted();
return;
}
if (REQUESTED_UPDATER.addAndGet(this, -e) == 0) {
// we're done emitting the number requested so return
return;
}
}
}
}
```

and both work, but the first is optimized. The wording could be something along the lines of: a demand of Long.MAX_VALUE MAY be treated by the Publisher as a request for an effectively unbounded stream.
[snip]
Yes, we can agree on such wording. But I remain unconvinced that any real use case will ever benefit from it; emitting a contiguous sequence of integers is not representative of any high-volume workload I can imagine, and I have demonstrated that temporal operators also do not benefit measurably from unbounded requests. Beware that requesting Long.MAX_VALUE will not be special in any other way, meaning that a conforming Subscription will terminate with onError if there was previously outstanding demand (as per 3.17).
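A sketch of the demand accounting that rule implies. Detecting the overflow is straightforward; what to do on overflow (onError versus clamping) is exactly the spec question being discussed:

```java
import java.util.concurrent.atomic.AtomicLong;

// Demand bookkeeping with overflow detection: adding Long.MAX_VALUE on top of
// outstanding demand wraps negative, which the caller can translate into onError.
final class DemandCounter {
    private final AtomicLong demand = new AtomicLong();

    /** Returns false when the new total would exceed Long.MAX_VALUE. */
    boolean tryAdd(long n) {
        while (true) {
            long current = demand.get();
            long next = current + n;
            if (next < 0) return false; // overflow past 2^63-1
            if (demand.compareAndSet(current, next)) return true;
        }
    }

    /** Called by the emission loop after delivering `emitted` elements. */
    long consume(long emitted) {
        return demand.addAndGet(-emitted);
    }
}
```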
Yes, that's as I would expect. It is still a "reactive stream" and must fit within the spec. It is just saying that it is requesting Long.MAX_VALUE, which can be optimized to mean "unbounded". Thanks @rkuhn, glad we can agree on this.
Apologies to @rkuhn for my earlier comment about the code example using unbounded queues; it does not. I misread the code. It uses bounded queues. However, the point I was trying to make was that in the use cases I'm exploring, queueing is not typically going to occur (bounded or unbounded) when an effectively unbounded stream is requested.
So the proposal is to change request(n) to take a long and amend the spec wording accordingly?

Is this what we mean?
@viktorklang after thinking about it, I do agree with most of your concerns and arguments against this feature; however, I don't agree with this one:

I don't see that as a problem, and there are also cases in which the developer has no control over the data source, cases in which the data source has no means for implementing back-pressure - like, say, a client that exposes an interface based on listeners with no way to signal demand. I have encountered such instances many times while working on real-world apps. In such instances, the library itself needs to deal with it - by throwing a buffer overflow exception, by dropping events on the floor, by doing nothing in which case the process may indeed fail with an OOME, etc. What it does by default is up to the library implementation, tunable by the user.
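A sketch of the adapter situation Alexandru describes, wrapping a hypothetical listener-based SensorFeed that cannot be paused. Here the chosen strategy is dropping on the floor when demand is exhausted:

```java
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.DoubleConsumer;

// Hypothetical listener-style source that cannot be slowed down or paused.
interface SensorFeed {
    void onReading(DoubleConsumer listener);
}

final class SensorPublisher implements Publisher<Double> {
    private final SensorFeed feed;

    SensorPublisher(SensorFeed feed) { this.feed = feed; }

    @Override public void subscribe(Subscriber<? super Double> s) {
        AtomicLong demand = new AtomicLong();
        s.onSubscribe(new Subscription() {
            @Override public void request(long n) { demand.addAndGet(n); }
            @Override public void cancel() { demand.set(Long.MIN_VALUE); }
        });
        feed.onReading(value -> {
            // The source cannot be throttled; without demand we drop the reading.
            if (demand.get() > 0) {
                demand.decrementAndGet();
                s.onNext(value);
            }
        });
    }
}
```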
On Mon, Jul 14, 2014 at 2:35 PM, Alexandru Nedelcu wrote:

Hi Alexandru,

Did you see #82?

Cheers,
@viktorklang I submitted the comment too early by mistake :) My point is that you can't really protect the user from OOMEs - it is the responsibility of the library to make it easy/intuitive enough for the user to deal with them - and there are really a lot of cases in which the producer is definitely slower than the consumer(s), making it a non-issue. The laziness of developers shouldn't, in my opinion, be a concern here; after all, people can always request Long.MAX_VALUE. So I agree with your points. I also agree with the concerns on performance. I personally prefer clarity of the API over performance issues - but I think you folks will be able to make good decisions on this issue. Just saying that wanting to stop people from willingly shooting themselves in the foot is a slightly different concern than wanting to make it easy for people to do the right thing, from an API perspective.
Hi @alexandru,
You can make an effort to make it easier to do the right thing than the wrong thing, don't you agree? "There's no way to prevent programmers from creating bugs, so why have type systems?" — that's false logic.

Au contraire! At how many points is the producer actually slower than the consumer? Your bugs will show up in production rather than in the compiler or tests, which creates real damage (downtime, delays). Also, one man's producer is another man's consumer, making the important distinction that back pressure needs to work across the entire chain.

What constitutes breakage is exactly what we're tasked to solve. But just because a developer can willfully destroy the entire codebase/litter the codebase with System.exit(-1)/use floats for monetary amounts/whathaveyou—malicious issues, it doesn't mean that we shouldn't make it unambiguous/straightforward to know what to use—to avoid inadvertent issues. Because frankly, when something does not work as expected, one has to be able to figure out reasonably easily what one did wrong—corrective maintenance is seldom a value enhancer.

No they can't, because it's not yet legal to request Long.MAX_VALUE (currently an Int)—and as Roland's experiment shows, for any push across an asynchronous boundary, a batch value of > 1000 yields zero-to-no performance gains, meaning that the cap perhaps should be closer to that figure. It seems, however, that we can reach consensus and move forward with widening request(n) to long.
Yes.

Fine with me.
As I mentioned, I'm fine with using Long.MAX_VALUE (or 2^63 - 1) to effectively give unbounded behavior. What I take from the discussion is some observations based on implementations today. I think this might be somewhat short-sighted. To do some math: what happens if you size the request window only for today's implementations?

I think what we can learn from TCP's flow control is not to make the flow control (aka request) window too small. It is not unrealistic to envision a long RTT between sender and receiver (let's keep it terrestrial and say 100 ms), which implies a large request value to keep the pipe full. Let's say 100-byte objects. At 200M ops/sec, that is roughly 1.9GB per RTT. And the RTT implies roughly a window of 20M objects. Clearly, a small request window would become the bottleneck at these rates.
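Todd's arithmetic, spelled out with his stated assumptions (200M ops/sec, 100 ms RTT, 100-byte objects):

```java
public final class WindowMath {
    public static void main(String[] args) {
        long ratePerSecond = 200_000_000L; // 200M onNext/sec
        double rttSeconds = 0.100;         // 100 ms round trip
        long objectBytes = 100;

        long objectsPerRtt = (long) (ratePerSecond * rttSeconds); // 20,000,000 objects in flight
        long bytesPerRtt = objectsPerRtt * objectBytes;           // 2,000,000,000 bytes
        System.out.printf("window = %,d objects = %.2f GiB per RTT%n",
                objectsPerRtt, bytesPerRtt / (1024.0 * 1024 * 1024)); // ~1.86 GiB, i.e. "roughly 1.9GB"
    }
}
```

The window scales linearly with both rate and RTT, which is the argument for widening request(n) to a long rather than sizing it for today's implementations.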
Good point, Todd. Viktor's implementation in #82 is very slightly different from the code samples discussed previously in that it focuses on outstanding demand instead of making a certain request value "magic", which I think is more consistent and allows all the same usages. Thanks, Ben, for raising this issue!
Migrating to this after discussions at reactive-streams/reactive-streams-jvm#62
Solved |
Refs reactive-streams#62, which changed Subscription API in 718c4d4
I suggest modifying the spec to allow signaling desire for unrestricted emission of data.

This was brought up at https://github.com/reactive-streams/reactive-streams/pull/61/files#r13451851

Currently the spec permits only positive numbers passed to request(n). I suggest accepting a sentinel such as request(-1) to mean "send as much as you want as fast as you want".

The two use cases I'm aware of where this would be valuable are:

1) Time Based Processors

If a Processor is doing sampling, throttling, windowing, debouncing or other such time-based work, then calling request(n) doesn't make sense. It wants to receive everything without any artificial delays.

2) Synchronous Processing

If my Subscriber is processing synchronously (natural backpressure, not blocking threads, just CPU-bound) then there is no need for the request(n) back-and-forth and the overhead of tracking this counter.

In both of the above cases the current spec requires doing something like request(Integer.MAX_VALUE), decrementing on every onNext, and invoking request again occasionally. This adds unnecessary overhead. It also means that the Publisher/Subscription has no hint that it could optimize and just firehose the data. This affects the implementation: a Publisher can have a "fast path" for cases where request(-1) is invoked that doesn't involve queuing, pausing or bookkeeping at all.