
Support Requesting Infinite (firehose) #62


Closed

benjchristensen opened this issue Jun 11, 2014 · 44 comments

@benjchristensen
Contributor

I suggest modifying the spec to allow signaling desire for unrestricted emission of data.

This was brought up at https://github.com/reactive-streams/reactive-streams/pull/61/files#r13451851

Currently the spec permits only positive numbers passed to request(n). I suggest accepting a sentinel such as request(-1) to mean "send as much as you want as fast as you want".

The two use cases I'm aware of where this would be valuable are:

1) Time Based Processors

If a Processor is doing sampling, throttling, windowing, debouncing or other such time-based work, then calling request(n) doesn't make sense. It wants to receive everything without any artificial delays.

2) Synchronous Processing

If my Subscriber is processing synchronously (natural backpressure: not blocking threads, just CPU-bound), then there is no need for the request(n) back-and-forth or the overhead of tracking this counter.

In both of the above cases the current spec requires doing something like request(Integer.MAX_VALUE), decrementing on every onNext and invoking request again occasionally. This adds unnecessary overhead. It also means the Publisher/Subscription has no hint that it could optimize and just firehose the data. This affects the implementation: a Publisher could have a "fast path" for cases where request(-1) is invoked that doesn't involve queuing, pausing or bookkeeping at all.
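Concretely, the bookkeeping that the current spec forces on such a Subscriber looks roughly like this (a sketch only, not from any library; the names and batch size are made up):

// Sketch of the demand bookkeeping the current spec forces on a Subscriber
// that really wants everything: request a huge batch up front, count every
// delivery, and replenish before the outstanding demand runs dry.
public class FirehoseSubscriber<T> implements Subscriber<T> {
    private static final int BATCH = Integer.MAX_VALUE; // illustrative batch size
    private Subscription subscription;
    private int received;

    @Override
    public void onSubscribe(Subscription s) {
        subscription = s;
        s.request(BATCH); // pretend-unbounded initial demand
    }

    @Override
    public void onNext(T element) {
        process(element);          // the actual work
        if (++received == BATCH) { // counter touched on every single element
            received = 0;
            subscription.request(BATCH); // replenish the demand
        }
    }

    @Override
    public void onError(Throwable t) { /* ... */ }

    @Override
    public void onComplete() { /* ... */ }

    private void process(T element) { /* application logic */ }
}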

@jbrisbin

I want to +1 this, but with a tiny tweak: the number should be any negative number, not just -1. This would cover cases like using indexOf, which might return a negative number smaller than -1.

@benjchristensen
Contributor Author

That change is fine with me.

The spec would also need to state what to do if a positive number is seen first and then a negative one. I think that should be considered illegal. It's either always positive, or a single negative request.

@alexandru

I would like this change. While playing with the contract I also felt the need for it: sending an infinite-size request would complicate the publisher's logic a little, but having to request Int.MaxValue in a subscriber and keep track of how many events came in is so annoying.

@alexandru

Is there consensus on this? I want to know if I can rely on it.

@viktorklang
Contributor

Sorry for the delayed response; a lot of things were happening last week!

I have multiple issues with the proposal:

  1. Magic numbers are bad, especially when they can easily be produced by Subscriber implementations doing poor demand tracking.
  2. The performance argument falls short, since if the Publisher is always slower than the Subscriber, the added "overhead" of tracking demand won't matter.
  3. It only makes sense for synchronous subscribers, which is not really a part of the goal of the project:
    "Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure on the JVM." (emphasis mine)
  4. If tracking the demand is hard for Subscriber implementors, it is super easy to add a wrapper that will keep track of that for them (see the sketch below).
  5. Which leaves the performance argument, and to be honest, is this really a measurable issue? (remember, all async will need the demand tracking anyway)
  6. Semantics: What does it mean if I ask for 10, then infinite, then 10?
  7. Semantics: What does it mean if I ask for infinite then infinite?
  8. Semantics: Does this proposal mean that 0 is the only illegal demand?

The spec already supports Long.MAX_VALUE in demand; can someone provide me with a real scenario where this wouldn't suffice? (especially given that we can replenish it infinitely)
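For point 4, a rough sketch of such a wrapper (illustrative names; it assumes the wrapped Subscriber never calls request(..) itself):

// All demand tracking lives in this one reusable wrapper instead of in every
// Subscriber implementation. Sketch only; batch size and names are made up.
public final class AutoRequestingSubscriber<T> implements Subscriber<T> {
    private final Subscriber<T> delegate;
    private final int batchSize;
    private Subscription subscription;
    private int outstanding;

    public AutoRequestingSubscriber(Subscriber<T> delegate, int batchSize) {
        this.delegate = delegate;
        this.batchSize = batchSize;
    }

    @Override
    public void onSubscribe(Subscription s) {
        subscription = s;
        outstanding = batchSize;
        delegate.onSubscribe(s);
        s.request(batchSize);
    }

    @Override
    public void onNext(T element) {
        delegate.onNext(element);
        if (--outstanding == 0) { // replenish before demand is exhausted
            outstanding = batchSize;
            subscription.request(batchSize);
        }
    }

    @Override
    public void onError(Throwable t) { delegate.onError(t); }

    @Override
    public void onComplete() { delegate.onComplete(); }
}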

@alexandru

@viktorklang I do agree that it makes the API confusing and that may be reason enough to drop the proposal.

@viktorklang
Contributor

@reactive-streams/contributors — please chime in.

@tmontgomery

I think the use case can be supported, in essence, with @viktorklang's suggestion of Long.MAX_VALUE, which is, IMO, clearer. But I would suggest that the wording be expanded to include what a negative demand means, i.e. a negative value could implicitly mean Long.MAX_VALUE and get a similar effect.
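Something along these lines (a sketch; addDemandAndDrain stands in for whatever bounded-demand path the Publisher already has):

@Override
public void request(long n) {
    // negative demand is read as "everything", i.e. implicitly Long.MAX_VALUE
    long demand = (n < 0) ? Long.MAX_VALUE : n;
    addDemandAndDrain(demand); // existing bounded-demand logic, unchanged
}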

@viktorklang
Contributor

@tmontgomery Thanks for the feedback! I'd love to hear your thoughts on the issues I listed earlier, if you have time!

@benjchristensen
Contributor Author

  2. The performance argument falls short since if the Publisher is always slower than the Subscriber, the added "overhead" of tracking demand won't matter

Yes, but what about use cases where that isn't the case? Why force overhead when it's not needed?

  3. It only makes sense for synchronous subscribers, which is not really a part of the goal of the project:

A temporal operator can be fully async and not care about backpressure. It is saying "send me everything you have asynchronously ... I'll handle dropping, buffering, throttling, sampling, etc on my own".

  4. If tracking the demand is hard for Subscriber implementors, it is super easy to add a wrapper that will keep track of that for them.

Overhead.

  5. Which leaves the performance argument, and to be honest, is this really a measurable issue? (remember, all async will need the demand tracking anyway)

Here are performance numbers from tests I've done while implementing this:

Simple (but slower)

Benchmark                                             (size)   Mode   Samples        Score  Score error    Units
r.o.OperatorRangePerf.rangeWithBackpressureRequest         1  thrpt         5 25305229.669   365192.400    ops/s
r.o.OperatorRangePerf.rangeWithBackpressureRequest      1000  thrpt         5    55866.431     1740.223    ops/s
r.o.OperatorRangePerf.rangeWithBackpressureRequest   1000000  thrpt         5       58.004        3.260    ops/s
        @Override
        public void request(int n) {
            int _c = REQUESTED_UPDATER.getAndAdd(this, n);
            if (_c == 0) {
                while (index <= end) {
                    if (o.isUnsubscribed()) {
                        return;
                    }
                    o.onNext(index++);
                    if (REQUESTED_UPDATER.decrementAndGet(this) == 0) {
                        // we're done emitting the number requested so return
                        return;
                    }
                }
                o.onCompleted();
            }
        }

Optimized (more complicated code)

Benchmark                                             (size)   Mode   Samples        Score  Score error    Units
r.o.OperatorRangePerf.rangeWithBackpressureRequest         1  thrpt         5 25605731.105   840897.434    ops/s
r.o.OperatorRangePerf.rangeWithBackpressureRequest      1000  thrpt         5   169525.120     6061.041    ops/s
r.o.OperatorRangePerf.rangeWithBackpressureRequest   1000000  thrpt         5      174.192        5.519    ops/s
r.o.OperatorRangePerf.rangeWithoutBackpressure             1  thrpt         5 38173004.870  1401113.698    ops/s
r.o.OperatorRangePerf.rangeWithoutBackpressure          1000  thrpt         5   201785.698     6240.267    ops/s
r.o.OperatorRangePerf.rangeWithoutBackpressure       1000000  thrpt         5      209.788        9.367    ops/s
        @Override
        public void request(int n) {
            if (n < 0) {
                // fast-path without backpressure
                for (int i = index; i <= end; i++) {
                    if (o.isUnsubscribed()) {
                        return;
                    }
                    o.onNext(i);
                }
                o.onCompleted();
            } else if (n > 0) {
                // backpressure is requested
                int _c = REQUESTED_UPDATER.getAndAdd(this, n);
                if (_c == 0) {
                    while (true) {
                        /*
                         * This complicated logic is done to avoid touching the volatile `index` and `requested` values
                         * during the loop itself. If they are touched during the loop the performance is impacted significantly.
                         */
                        int numLeft = start + (end - index);
                        int e = Math.min(numLeft, requested);
                        boolean completeOnFinish = numLeft < requested;
                        int stopAt = e + index;
                        for (int i = index; i < stopAt; i++) {
                            if (o.isUnsubscribed()) {
                                return;
                            }
                            o.onNext(i);
                        }
                        index += e;
                        if (completeOnFinish) {
                            o.onCompleted();
                            return;
                        }
                        if (REQUESTED_UPDATER.addAndGet(this, -e) == 0) {
                            // we're done emitting the number requested so return
                            return;
                        }
                    }
                }
            }
        }

Note how the no-backpressure version hits 209m onNext/second, optimized backpressure 174m onNext/second and simple backpressure 58m onNext/second.

And this is an optimal case where only a single request for the entire size (1,000,000) occurs. The need for request to use a volatile makes touching it in a tight loop expensive, hence the "optimized" version that performs far better than the simple one.

What should I do differently here to achieve the performance of 209m onNext/second without a "no-backpressure" option? Long.MAX_VALUE doesn't solve that, as I still need to go through the backpressure logic.

  6. Semantics: What does it mean if I ask for 10, then infinite, then 10?

Asking for infinite anytime after a non-infinite value would be illegal.

  7. Semantics: What does it mean if I ask for infinite then infinite?

Illegal or no-op.

  8. Semantics: Does this proposal mean that 0 is the only illegal demand?

Illegal or no-op.

@tmontgomery

I like that separation of concerns. That is actually quite clarifying and should be worked in.

BTW, my answer to the performance argument of whether the value should be Long.MAX_VALUE is "hells nah!". But that leaves the problem of whether to prevent that value, and if so, what the cap should be. I tend to favor allowing large values but recommending against them, with the impact made more explicit, i.e. recommend a value and explain that the desire to use a reasonable value is to reduce jitter, etc.

@viktorklang
Contributor

Hi Ben,

Thanks for the reply!

Just for my understanding, does the omission of a reply to point number 1 mean that you agree or disagree?

  2. The performance argument falls short since if the Publisher is always slower than the Subscriber, the added "overhead" of tracking demand won't matter
    Yes, but what about use cases where that isn't the case? Why force overhead when it's not needed?

The same argument can be used against the proposal of the feature: why incur the overhead of implementing the feature for all Publishers when it's not needed?

  3. It only makes sense for synchronous subscribers, which is not really a part of the goal of the project:
    A temporal operator can be fully async and not care about backpressure. It is saying "send me everything you have asynchronously ... I'll handle dropping, buffering, throttling, sampling, etc on my own".

See my point number 2 (not my answer to your answer of point number 2)

  4. If tracking the demand is hard for Subscriber implementors, it is super easy to add a wrapper that will keep track of that for them.
    Overhead.

Just to clarify my viewpoint: I am against trading correctness for performance; back-pressure is, if not the main reason, one of the main reasons for doing Reactive Streams. I remain unconvinced that the 20% performance gain is worth the overhead of implementing it for all Publishers, as well as removing a chance to detect errors in demand tracking (negative demand requested would be a sign of an arithmetic bug instead of a request for infinite elements, as proposed).

  5. Which leaves the performance argument, and to be honest, is this really a measurable issue? (remember, all async will need the demand tracking anyway)
    Here are performance numbers from tests I've done while implementing this:

Regarding the benchmark, my first impression is that the optimized backpressure has excellent performance. Safety will never come for free, but the cost seems quite OK in order to not complicate every single Publisher that will ever be written by having them implement both back-pressure AND fast-path behavior. And if REQUESTED_UPDATER is indeed an Atomic*Updater from the JDK, you are already willing to sacrifice 10-15% performance compared to using Unsafe.

And this is an optimal case where only a single request for the entire size (1,000,000) occurs. The need for request to use a volatile makes it so touching it in a tight loop is expensive, hence the "optimized" version that performs far better than the simple one.

Would you mind elaborating on the statement above? I'm not following its implications.

What should I do differently here to achieve the performance of 209m onNext/second without a "no-backpressure" option? Long.MAX_VALUE doesn't solve that, as I still need to go through the backpressure logic.

How will you deal with megamorphic call sites due to different implementations of Subscriber for the same Publisher? (That I fear will kill your 209m bench result)

  6. Semantics: What does it mean if I ask for 10, then infinite, then 10?
    Asking for infinite anytime after a non-infinite value would be illegal.

  7. Semantics: What does it mean if I ask for infinite then infinite?
    Illegal or no-op.

  8. Semantics: Does this proposal mean that 0 is the only illegal demand?
    Illegal or no-op.

Since infinite demand needs to be supported by both publisher and subscriber (it requires the progress of the publisher to be suspended until the progress of the subscriber has finished), since demand tracking (correctness) currently works with quite high performance, if I might say so, and since it only affects a subset of a subset of situations, it sounds like something that should lie outside of the defined contract.

It'll take more to convince me, and the currently proposed solution leaves a lot to be desired as per my previous argumentation.

What are the thoughts of the @reactive-streams/contributors ?

@viktorklang
Contributor

I like that separation of concerns. That is actually quite clarifying and should be worked in.

Sorry, you lost me there, what specifically do you refer to? :)

But that leaves the problem of whether to prevent that value, and if so, what the cap should be. I tend to favor allowing large values but recommending against them, with the impact made more explicit, i.e. recommend a value and explain that the desire to use a reasonable value is to reduce jitter, etc.

The issue is that, as demand is defined, it is a signal of the max number of elements the receiver is willing to buffer; one has to, as they say, be careful not to bite off more than one can chew.

@jbrisbin

We don't want to sacrifice clarity of purpose, but performance is one of my chief concerns at all times. If I am profiling an application, it's not helpful to see the library eating up all my CPU cycles so I can't tell where the bottlenecks in my own application are.

@viktorklang performance is a driving concern for everything we do in Reactor and I suppose our definition of "good enough" is quite different. On my MacBook Pro I can get JDK 8 to iterate about 200M ops/sec which is what @benjchristensen is showing above. That's the benchmark I work against. I know a single thread on my machine won't work faster than that in a tight loop. 50M events/sec may sound like "good enough" but when it's possible to do 200M/sec it's clearly a very expensive scenario and that cost should be justified by something concrete.

Keep in mind that many of us see a thread boundary as just as significant as a network boundary and while others might be focusing on using Reactive Streams between hosts, we're more focused on using it between threads and needing to process over 100M events per second is not unrealistic. Whatever we can do in the spec to facilitate that (i.e. by supporting a fast path if the implementation chooses to implement it) is worth the effort spent now to work through the implications.

What I don't want to see is automatically excluding a class of applications that, while numerically a minority, are actually very important kinds of applications which run systems that people depend on to work at extremely high volumes. Telco and financial apps are very demanding and if we are to use Reactive Streams in those domains then we cannot dismiss any improvement we can make that will get us closer to 200M events/sec.

I don't see that a zero request is meaningful but I'm not sure it rises to the level of "illegal" in my mind. If we leave the implementor to decide whether negative values are significant or not, then requesting zero items would result in an immediate onComplete() call. The implementation can decide whether it wants to treat negative values in the same way or not. That way we're not forcing either party to act one way or the other. If you use a particular implementation that has a "fast path" then the spec assumes you know what you're doing and are assuming any associated risks.

@viktorklang
Contributor

Hi Jon!

We don't want to sacrifice clarity of purpose, but performance is one of my chief concerns at all times. If I am profiling an application, it's not helpful to see the library eating up all my CPU cycles so I can't tell where the bottlenecks in my own application are.

I think we all(?) agree that we don't want to create something that creates an unreasonable tax on performance, but I think 187m compared to 209m is pretty good? So the question is our definition of unreasonable.

Considering that this is an extremely synthetic benchmark with only one flow going, and that concurrent flows will exist in a range from a few to a few million at the same time in the same VM, one has to weigh the scalability tax against the performance tax.

@viktorklang performance is a driving concern for everything we do in Reactor and I suppose our definition of "good enough" is quite different. On my MacBook Pro I can get JDK 8 to iterate about 200M ops/sec which is what @benjchristensen is showing above. That's the benchmark I work against. I know a single thread on my machine won't work faster than that in a tight loop. 50M events/sec may sound like "good enough" but when it's possible to do 200M/sec it's clearly a very expensive scenario and that cost should be justified by something concrete.

Again, I thought we were talking about 187m vs 209m. Also, you need to take into account the penalties of megamorphic calls, thread scheduling artifacts etc.

Keep in mind that many of us see a thread boundary as just as significant as a network boundary and while others might be focusing on using Reactive Streams between hosts, we're more focused on using it between threads and needing to process over 100M events per second is not unrealistic. Whatever we can do in the spec to facilitate that (i.e. by supporting a fast path if the implementation chooses to implement it) is worth the effort spent now to work through the implications.

Please keep in mind that we are talking about the interop standard; if you know that you're connecting a Reactor Publisher to a Reactor Subscriber, you are of course free to perform any and all optimizations you deem needed.

What I don't want to see is automatically excluding a class of applications that, while numerically a minority, are actually very important kinds of applications which run systems that people depend on to work at extremely high volumes. Telco and financial apps are very demanding and if we are to use Reactive Streams in those domains then we cannot dismiss any improvement we can make that will get us closer to 200M events/sec.

I do hope that it does not come across as automatically excluding anything, I am merely counterarguing a proposal, which has to be expected? (If it does not pass internal scrutiny, how will it ever be able to pass external scrutiny).

I don't see that a zero request is meaningful but I'm not sure it rises to the level of "illegal" in my mind.

Illegal as in "spec violation".

If we leave the implementor to decide whether negative values are significant or not, then requesting zero items would result in an immediate onComplete() call. The implementation can decide whether it wants to treat negative values in the same way or not. That way we're not forcing either party to act one way or the other. If you use a particular implementation that has a "fast path" then the spec assumes you know what you're doing and are assuming any associated risks.

I don't think we even need to do that: since the Publisher can know in subscribe whether it is a Subscriber that allows a fast path, it is fully possible to employ those kinds of optimizations without burdening the spec IMO (please feel encouraged to challenge me on that!).
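Roughly something like this (a sketch only; the marker interface and helper names are made up):

// The fast path is negotiated at subscribe time via a library-level marker
// interface, so the spec-level Subscription contract stays untouched.
interface UnboundedCapableSubscriber<T> extends Subscriber<T> { }

public void subscribe(Subscriber<? super Integer> s) {
    if (s instanceof UnboundedCapableSubscriber) {
        s.onSubscribe(NOOP_SUBSCRIPTION); // hypothetical no-op Subscription
        for (int i = start; i <= end; i++) {
            s.onNext(i); // firehose: no demand bookkeeping at all
        }
        s.onComplete();
    } else {
        // normal spec-compliant path with demand tracking
        s.onSubscribe(new DemandTrackingSubscription(s)); // hypothetical class
    }
}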

I'd love to hear your take on my arguments (the numbered list) as well.

Cheers,

@benjchristensen
Contributor Author

does the omission of a reply to point number 1 mean that you agree or disagree?

I skipped it because it is a subjective opinion that I generally agree with, but in this case it works against the higher priority of performance. I tried to keep my response focused on the objective reasons why I'm pursuing support for fast-path.

What I don't want to see is automatically excluding a class of applications that, while numerically a minority, are actually very important kinds of applications which run systems that people depend on to work at extremely high volumes.

This is my perspective as well. If the Reactive Streams spec is to be more than just a bridge between systems, but actually adopted at the core of libraries, it should not impede high performance.

@viktorklang
Contributor

On Jul 10, 2014 6:18 PM, "Ben Christensen" [email protected] wrote:

does the omission of a reply to point number 1 mean that you agree or disagree?

I skipped it because it is a subjective opinion that I generally agree with, but in this case it works against the higher priority of performance. I tried to keep my response focused on the objective reasons why I'm pursuing support for fast-path.

Isn't the relative importance of different characteristics also a matter of subjective opinion?

I addressed the fast-path concern in my reply to Jon: a solution that wouldn't complicate the spec nor incur any overhead for the non-fast-path-enabled Publishers. Please have a look.

What I don't want to see is automatically excluding a class of applications that, while numerically a minority, are actually very important kinds of applications which run systems that people depend on to work at extremely high volumes.

This is my perspective as well. If the Reactive Streams spec is to be more than just a bridge between systems, but actually adopted at the core of libraries, it should not impede high performance.

Define high-performance, otherwise we're going to have a hard time knowing when things are good enough. (Again, my suggested solution wouldn't impair perf nor burden the spec.)



@jbrisbin

First off, I want to thank @viktorklang and @benjchristensen for taking the time to argue their points thoroughly. We might not agree on the particulars but it's worthwhile to kick this around a little, even at the risk of dragging out something that ultimately results in impasse. :)

I think we all(?) agree that we don't want to create something that creates an unreasonable tax on performance, but I think 187m compared to 209m is pretty good?

In a reasonably large server with 16 threads (1 per CPU) operating at peak load, that difference is over 300M events per second. Not bad? I suppose. Pretty good? Sure. Acceptable? Not if it can be made better.

So the question is our definition of unreasonable.

If we can manage it so that we don't impact performance at all, that would be even better, no? :)

When you are as demanding as we have to be to tease out every last bit of performance as humanly possible, "good enough" doesn't cut it.

Considering that this is an extremely synthetic benchmark having only one flow going, considering that concurrent flows will exist in a range from a few to a few million at the same time in the same VM one has to consider the scalability tax vs the performance tax.

Maybe I'm wrong, but I think this actually argues in favor of "less is more" and that the relatively involved machinery needed to achieve the "optimized" version of bounded publication would be overall less scalable than the version that has none of that.

At any rate, the validity of the benchmark is in its usefulness in comparing one thing to another not in its reflection of a "real world" scenario (whatever those are!). The real application will never achieve the throughput rate of a tight loop JMH benchmark, true. But the point is that there really is a difference between the optimal fast path and the marginally optimal fast path and even if the difference is not as pronounced in a real application, even if the throughput is say only 1/4th that of the benchmark, that's still 80M events per second difference. That's not insignificant if each of those events represents a real world action (a Trade, a dropped call, a change notification that informs an AI algorithm, a sensor reading, etc...).

Please keep in mind that we are talking about the interop standard, if you know that you're connecting a Reactor Publisher to a Reactor Subscriber, you are of course free to perform any and all optimizations you deem needed.

Certainly it would be possible to do that, but if there's a way to communicate this intent directly without an instanceof check, then that would be better IMO. Following that line of thought, interop is precisely the reason it would make sense to recognize < 1 values as infinity. Every library would speak that language without instanceof checks or some other internal mechanism that only worked with that library.

I think it's worth noting that although Reactive Streams was initially targeted at interop, it naturally lends itself to use within the library and is widely applicable (a good thing!). But that means a much more demanding environment than if this was simply a way for one library to talk to another; it's really more than that and these specifications help applications work internally on a small scale with the same efficiencies as those that operate on a larger scale. There's no need to impede that if it's possible to allow that to happen. Akka Streams does not need to concern itself at all with this stuff if it doesn't want to. As long as the spec leaves the door open for this behavior, then it's free to take path "A" which might be "ignore values < 1 as meaningless".

FWIW I don't think Long.MAX_VALUE will work because it doesn't do anything different than if the value was 100 or 10 or whatever.

IMO there are only two possible decisions: "> 0" means give me that amount; "< 1" (0, -1, -100, whatever... makes no difference) means give me whatever, I don't care how many. The spec would simply let the implementation decide whether values < 1 were "infinity", "meaningless" as in no-op and immediate onComplete(), or "meaningless" as in onError(), slap wrist, bad program. As long as it's documented, it doesn't seem necessary IMO to be more strict than that.

It's impossible for the spec to know, after all, whether infinity is "incorrect" or not. Only the implementation will know that.

@viktorklang
Contributor

On Thu, Jul 10, 2014 at 7:17 PM, Jon Brisbin [email protected] wrote:

First off, I want to thank @viktorklang and @benjchristensen for taking the time to argue their points thoroughly. We might not agree on the particulars, but it's worthwhile IMO to kick this around a little, even at the risk of dragging out something that ultimately results in impasse. :)

Exactly, this is how things should be done :)

I think we all(?) agree that we don't want to create something that creates an unreasonable tax on performance, but I think 187m compared to 209m is pretty good?

In a reasonably large server with 16 threads (1 per CPU) operating at peak load, that difference is over 300M events per second. Not bad? I suppose. Pretty good? Sure. Acceptable? Not if it can be made better.

Which is why I am leaning towards having it as an implementation concern—so as to not burden all implementations of all Publishers.

So the question is our definition of unreasonable.

If we can manage it so that we don't impact performance at all, that would be even better, no? :)

When you are as demanding as we have to be to tease out every last bit of performance as humanly possible, "good enough" doesn't cut it.

But if we take that line of reasoning to the extreme: can we afford having Subscribers and Publishers being interfaces? The cost of megamorphic callsites will be a high tax for when things could be monomorphic; can we afford to use Atomic*FieldUpdaters (as per Ben's example?); can we afford not to hand-tune assembler or offload to the GPU? Where does it stop?

Considering that this is an extremely synthetic benchmark with only one flow going, and that concurrent flows will exist in a range from a few to a few million at the same time in the same VM, one has to weigh the scalability tax against the performance tax.

Maybe I'm wrong, but I think this actually argues in favor of "less is more" and that the relatively involved machinery needed to achieve the "optimized" version of bounded publication would be overall less scalable than the version that has none of that.

I'm not sure I follow, could you elaborate on what you mean?

At any rate, the validity of the benchmark is in its usefulness in comparing one thing to another, not in its reflection of a "real world" scenario (whatever those are!). The real application will never achieve the throughput rate of a tight-loop JMH benchmark, true. But the point is that there really is a difference between the optimal fast path and the marginally optimal fast path, and even if the difference is not as pronounced in a real application, even if the throughput is say only 1/4th that of the benchmark, that's still 80M events per second difference. That's not insignificant if each of those events represents a real world action (a Trade, a dropped call, a change notification that informs an AI algorithm, a sensor reading, etc...).

All of these would be I/O bound though :)

Please keep in mind that we are talking about the interop standard; if you know that you're connecting a Reactor Publisher to a Reactor Subscriber, you are of course free to perform any and all optimizations you deem needed.

Certainly it would be possible to do that, but if there's a way to communicate this intent directly without an instanceof check, then that would be better IMO.

My concern is also the laziness of developers: if it is much "easier" to not care about back-pressure (not needing to track demand) by just calling "gimmeh everything", we're back at square one where systems fail in production with OOMEs (which is exactly what this collaboration was trying to alleviate).

Following that line of thought, interop is precisely the reason it would make sense to recognize < 1 values as infinity. Every library would speak that language without instanceof checks or some other internal mechanism that only worked with that library.

The "only" issue with doing the instanceof check is that you couldn't mix-n-match the high-performance Subscribers between libraries. (Which is not to say that it isn't a serious problem.) But it would be trading off the safety of implementations (being able to detect bugs) for performance, which is something I'd consider not only hacky but also quite unfair (the minority of Subscribers gaining from the sacrifice of the majority—assuming we all agree that the high-perf-sync-Subscriber indeed is a minority).

I think it's worth noting that although Reactive Streams was initially targeted at interop, it naturally lends itself to use within the library and is widely applicable (a good thing!).

Absolutely!

But that means a much more demanding environment than if this was simply a way for one library to talk to another; it's really more than that, and these specifications help applications work internally on a small scale with the same efficiencies as those that operate on a larger scale. There's no need to impede that if it's possible to allow that to happen. Akka Streams does not need to concern itself at all with this stuff if it doesn't want to. As long as the spec leaves the door open for this behavior, then it's free to take path "A" which might be "ignore values < 1 as meaningless".

I know, but just so I am clear, I'm not talking from the Akka Streams perspective in this discussion; I am solely focusing on the spec and common APIs while being concerned about everyone that tries to implement it.

FWIW I don't think Long.MAX_VALUE will work because it doesn't do anything different than if the value was 100 or 10 or whatever.

How'd you mean? Even if you could push 10 items per nanosecond it'd still take 29.2 years to exhaust Long.MaxValue; I'm pretty sure the app would be restarted more frequently than that, wouldn't you agree? :)

IMO there are only two possible decisions: "> 0" means give me that amount; "< 1" (0, -1, -100, whatever... makes no difference) means give me whatever, I don't care how many. The spec would simply let the implementation decide whether values < 1 were "infinity", "meaningless" as in no-op and immediate onComplete(), or "meaningless" as in onError(), slap wrist, bad program. As long as it's documented, it doesn't seem necessary IMO to be more strict than that.

It's impossible for the spec to know, after all, whether infinity is "incorrect" or not. Only the implementation will know that.



Cheers,

@viktorklang
Contributor

Dear @reactive-streams/contributors,

So having thought about it some more, here's a proposal that would solve most, if not all, of my concerns while still offering the backpressure escape hatch for maximum performance under restricted situations.

It exhibits the following properties:

  • Does not use magic numbers
  • It has a distinct and deterring name—to signal that it is to be used by those who know how to deal with the consequences of using it
  • It has clear semantics (described further down)
  • Does not pollute the implementation of request(n) by needing to parry against a magic value / avoids branching
  • Is obvious to the implementor that it is a behavior that needs to be considered: it is a compilation error not to implement it
  • It allows the implementor to implement it by issuing an onError(new UnsupportedOperationException("Unbounded streaming is not supported by this Publisher"))
  • While still allowing for maximum throughput for the subset of situations where it is safe to do so

public interface Subscription {
    /**
     * WARNING: The use of this method may lead to OutOfMemoryErrors if not used under the following conditions:
     *   - The Publisher does not produce elements faster than the Subscriber can process them fully
     *   - TODO add other cases/elaborate
     *
     * Signals that the Subscriber is willing and able to receive any and all elements
     * that the Publisher can publish at a rate determined at the sole discretion of the Publisher.
     * Calling this method more than once has the same effect as only calling it once.
     */
    public void unsafeRequestUnbounded();
}

Semantics:
request(n) + unsafeRequestUnbounded() == n + all you have
request(n) + unsafeRequestUnbounded() + unsafeRequestUnbounded() == n + all you have and then some
request(n) + unsafeRequestUnbounded() + unsafeRequestUnbounded() + request(n) == n + all you have + then some + n
unsafeRequestUnbounded() + request(n) == all you have + n
unsafeRequestUnbounded() + request(n) + request(n) == all you have + n + n
unsafeRequestUnbounded() + request(n) + request(n) + unsafeRequestUnbounded() == all you have + n + n and then some
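For illustration, a Publisher that cannot safely firehose would implement it like this (a sketch; everything except the two Subscription methods is made up):

final class BoundedOnlySubscription implements Subscription {
    private final Subscriber<?> subscriber;

    BoundedOnlySubscription(Subscriber<?> subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public void request(long n) {
        // the usual bounded-demand path, free of magic-value branches
    }

    @Override
    public void unsafeRequestUnbounded() {
        subscriber.onError(new UnsupportedOperationException(
            "Unbounded streaming is not supported by this Publisher"));
    }

    @Override
    public void cancel() { /* ... */ }
}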

Feedback?

@sirthias

If adding a "request unbounded" mechanism to the interop API really does have a positive cost/benefit ratio (which I doubt, for largely the same reasons @viktorklang listed), then I would also prefer moving it into a separate method as suggested by Viktor.
request(n) and requestUnbounded are two different things which require different code paths in all implementations. Rather than funneling both through the request(n) API and branching out the special case internally, separating the two cases properly does appear much cleaner.
Better API, better implementation, better perf (no more special-value check).

@jbrisbin

I think I would prefer requestAll() since it's considerably shorter to type and produces less code noise (IMO). I think the base idea that it should have its own code path that is distinct from request(n) is fine.

As to when is "enough" performance improvement enough? Not to be trite, but never. ;) Even if those cases aren't the everyday ones and yes a minority of people need that, etc... I hear this all the time of course. :) But the people that need that kind of extreme performance are frankly rather obsessive about it because the amount of data passing through the system becomes a real burden to the architecture and you start to see where your weak points are.

There's usually a hard limit to what you can improve, of course. But in most cases the limit is much less hard than we think, and we only know that because we hammer on it.

@viktorklang
Contributor

On Fri, Jul 11, 2014 at 4:24 PM, Jon Brisbin [email protected] wrote:

I think I would prefer requestAll() since it's considerably shorter to type and produces less code noise (IMO).

requestAll() does not convey the implications of calling it. As it should be used infrequently, by a subset of a subset of all use-cases, and the person using it would have to both understand the implications and know what they're doing, an explicit and discouraging name is more than warranted.

I think the base idea that it should have its own code path that is distinct from request(n) is fine.

As to when is "enough" performance improvement enough? Not to be trite, but never.

But if that is indeed your opinion, then you surely cannot accept that we're using interfaces, right? Are you willing to pay for megamorphic callsites? Or even the overhead of not having everything directly inlined? (note that this is tongue in cheek, but I'm holding you to your word)

;) Even if those cases aren't the everyday ones and yes a minority of people need that, etc... I hear this all the time of course. :) But the people that need that kind of extreme performance are frankly rather obsessive about it, because the amount of data passing through the system becomes a real burden to the architecture and you start to see where your weak points are.

There's usually a hard limit to what you can improve, of course. But in most cases the limit is much less hard than we think, and we only know that because we hammer on it.



Cheers,

@jbrisbin

If we're trying to discourage its use then maybe we should call it requestEverythingButOnlyIfYouReallyKnowWhatYouAreDoingAndAbsolveReactiveStreamsFromAnyAndAllConsequencesOfSaidCall()

or similar. ;) If you provide a public method, people are going to see it in their IDEs and they're going to call it. But it's going to get really noisy in the code if the method name is intentionally long-winded to discourage its use. When we're writing some very sensitive code that has to use this because of the amount of recursive dispatching we do, and without this we get some serious trampolining issues, I don't want to be penalized for being in that situation by having to dirty my code with references to a method I'm not supposed to call except under dire circumstances. I'm not calling it because I'm lazy and don't want to use request(n). I'm calling it because I have to. If you need more concrete details, @smaldini can explain why this particular method will be of real, measurable benefit to our use case in particular.
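To make the trampolining point concrete, here is a contrived sketch (not our actual code): against a synchronous Publisher that emits from inside request(..), re-requesting from onNext grows the stack by one request/onNext frame pair per element unless somebody inserts a trampoline.

final class OneAtATimeSubscriber implements Subscriber<Integer> {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription s) {
        subscription = s;
        s.request(1); // a synchronous Publisher emits before this call returns
    }

    @Override
    public void onNext(Integer element) {
        // still inside the Publisher's request(1) stack frame at this point
        subscription.request(1); // recursion: one level deeper per element
    }

    @Override
    public void onError(Throwable t) { }

    @Override
    public void onComplete() { }
}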

@viktorklang As to the cost of polymorphism: there are some great blogs about that topic [1] [2]. People actually do deal with that issue and some people actually do try to work around those issues to get better performance. You were trying to be hyperbolic to make a point, I know, but it's actually something people worry about in these circles.

You can't win every performance battle, like I alluded to. Some things you have to leave alone. But the number of those things is actually a lot less than what it sometimes seems. Am I personally content with the cost of us using interfaces and the performance "penalty" associated? Honestly, not terribly. But that cost is understandable to get at the larger issue of helping with the bounded cases which are going to predominate (not to mention the fact that we have to pay that penalty ourselves... again in the interest of a larger goal). I'm not going to plant a flag on that issue because it has more fundamental costs associated with it that don't suit the other use cases. But am I personally concerned about that stuff? You bet. But this issue seems like low-hanging fruit to me and the cost to the spec to support this use case seems very negligible.

What I want to make sure we keep in mind is that there are people using this spec (Reactor being a key user, of course) that need ridiculously high performance because of the kinds of apps we're targeting and that it's going to help the spec be better to have included this class of apps as a first-class citizen.

Those cases are a numeric minority for sure, but they pay handsomely for the focus on performance, so I'm quite concerned with it if our customers are. ;)

[1] - http://www.insightfullogic.com/blog/2014/may/12/fast-and-megamorphic-what-influences-method-invoca/
[2] - http://mechanical-sympathy.blogspot.co.uk/2012/04/invoke-interface-optimisations.html

@sirthias

+1 for unsafeRequestUnbounded

@benjchristensen
Contributor Author

@viktorklang Adding a new method rather than conditional logic within request(n) works for me.

However, I'd rather just name it request(), requestAll(), requestUnbounded() or something like that, not with unsafe in the name.

Both the bounded and unbounded use cases have a valid place, in the same library, for different operators and uses. We should not put a negative connotation on one of them.

public interface Subscription {
    public void request(int n);
    public void request()/requestAll()/requestUnbounded();
    public void cancel();
}

Take the use cases at the beginning of this discussion as examples. Implementing an operator with them should not make someone feel they are doing something "unsafe" just because it makes sense to request an unbounded stream.

Besides that, I could be lazy and call request(Long.MAX_VALUE) and run out of memory just as easily as by calling requestUnbounded().

As it should be used infrequently, by a subset of a subset of all use-cases

I disagree that it is so small a use case. I started this discussion because it occurs often. This API works very well for both asynchronous and synchronous boundaries (a very good thing), and many use cases are just fine with a firehose of data (even when async), and several require it (anything with temporal requirements such as sampling, windowing, debouncing, etc., or when multicasting a hot stream).

So, I'm +1 for your proposal to add a separate method, but -1 on a name involving unsafe.

@viktorklang
Contributor

On Fri, Jul 11, 2014 at 4:59 PM, Jon Brisbin [email protected] wrote:

If we're trying to discourage its use then maybe we should call it requestEverythingButOnlyIfYouReallyKnowWhatYouAreDoingAndAbsolveReactiveStreamsFromAnyAndAllConsequencesOfSaidCall()

or similar. ;) If you provide a public method, people are going to see it in their IDEs and they're going to call it.

Absolutely, and if it has a clear name that signals that they are doing something potentially dangerous, it encourages them to understand the implications, rather than an innocent-looking name that inspires assumptions rather than understanding.

But it's going to get really noisy in the code if the method name is intentionally long-winded to discourage its use.

You are of course at liberty to provide a final static utility method named x that takes a Subscription and calls the long-winded method, if you so wish.

When we're writing some very sensitive code that has to use this because of the amount of recursive dispatching we do, and without this we get some serious trampolining issues, I don't want to be penalized for being in that situation by having to dirty my code with references to a method I'm not supposed to call except under dire circumstances.

Could you show me an example of these serious trampolining issues?

I'm not calling it because I'm lazy and don't want to use request(n). I'm calling it because I have to. If you need more concrete details, @smaldini can explain why this particular method will be of real, measurable benefit to our use case in particular.

That'd be extremely welcome!

@viktorklang As to the cost of polymorphism: there are some great blogs about that topic. People actually do deal with that issue and some people actually do try to work around those issues to get better performance. You were trying to be hyperbolic to make a point, I know, but it's actually something people worry about in these circles.

Absolutely, and my point is that if you need to draw a line somewhere, where do you draw that line?

You can't win every performance battle, like I alluded to. Some things you have to leave alone. But the number of those things is actually a lot less than what it sometimes seems. Am I personally content with the cost of us using interfaces and the performance "penalty" associated? Honestly, not terribly. But that cost is understandable to get at the larger issue of helping with the bounded cases which are going to predominate (not to mention the fact that we have to pay that penalty ourselves... again in the interest of a larger goal). I'm not going to plant a flag on that issue because it has more fundamental costs associated with it that don't suit the other use cases. But am I personally concerned about that stuff? You bet. But this issue seems like low-hanging fruit to me and the cost to the spec to support this use case seems very negligible.

I sympathize with your performance concerns. A lot of time has been invested in researching the spec so that it can be implemented without allocations, so that consumers can resize buffers without locks, etc., so I appreciate the focus; we don't want to create something that will not perform. But as you say, we also need to be prepared to make sacrifices, such as the cost of megamorphic callsites due to the use of interfaces, in order to get something that is mutually beneficial.

What I want to make sure we keep in mind is that there are people using this spec (Reactor being a key user, of course) that need ridiculously high performance because of the kinds of apps we're targeting and that it's going to help the spec be better to have included this class of apps as a first-class citizen.

I sincerely hope my behavior in this discussion shows that I take the matter seriously, given that I have suggested more than one solution to this perceived performance issue. And I must be honest, I do find it slightly frustrating that while I am trying to address the concerns of others, it seems to me that my concerns aren't being taken seriously. Keep in mind that I am still unconvinced that the feature bears its weight.

Those cases are a numeric minority for sure, but they pay handsomely for the focus on performance, so I'm quite concerned with it if our customers are. ;)

Yes, so why the concern over the name? My initial thought was to call it 'requestOOME()' but given that I try to move things forward I was trying to be balanced.



Cheers,

@rkuhn
Member

rkuhn commented Jul 12, 2014

Since most of the discussion on this issue is focused on performance (which means throughput in this case), and since there was an abundance of words and a lack of actual measurements, I have performed some. You can find the logs and code here.

What the benchmark does is produce elements as quickly as possible to a Subscriber, dropping them whenever there is momentarily no demand. The Subscriber transfers the elements to another thread that busy-spins emptying a bounded queue (a high-performance single-producer single-consumer implementation). Elements are dropped if the queue is momentarily full (the Subscriber side is sketched below).

The benchmark is rather unrealistic in a fashion that favors low-latency mechanisms since no allocations and no processing of the elements are done; any differences found will be an upper bound on what can be expected in a real-world use-case. The benchmark was performed on a current MBP 2.3GHz quad-core machine with JDK 1.8.0-b132; for the JMH settings see the source code.
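In shape, the Subscriber side looks like this (the real code is in the linked repo; this sketch uses made-up names):

final class HandoffSubscriber implements Subscriber<Integer> {
    private final java.util.Queue<Integer> queue; // bounded SPSC queue, drained by a busy-spinning consumer thread
    private final int queueSize;
    private long dropped;

    HandoffSubscriber(java.util.Queue<Integer> queue, int queueSize) {
        this.queue = queue;
        this.queueSize = queueSize;
    }

    @Override
    public void onSubscribe(Subscription s) {
        s.request(queueSize); // initial request covering the queue size; replenishment depends on the scheme
    }

    @Override
    public void onNext(Integer element) {
        if (!queue.offer(element)) {
            dropped++; // queue momentarily full: the element is lost
        }
    }

    @Override
    public void onError(Throwable t) { }

    @Override
    public void onComplete() { }
}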

The first set of results is given for a bounded request scheme where batchSize elements are requested at a time and the queue has size 2 * batchSize; an initial request covering the queue size is made at onSubscribe time.

[info] # Run complete. Total time: 00:04:45
[info] 
[info] Benchmark       (batchSize)    Score  Score error    Units Losses
[info] r.Bounded.push             1  99,961       17,522    ns/op    39%
[info] r.Bounded.push            10  56,032        5,065    ns/op     6%
[info] r.Bounded.push           100  29,090        6,632    ns/op    12%
[info] r.Bounded.push          1000  26,511        4,636    ns/op    42%
[info] r.Bounded.push         10000  25,217        7,190    ns/op    43%
[info] r.Bounded.push        100000  25,514        6,651    ns/op    40%

There are two types of throughput to measure: the Score column shows the average time it takes for the Publisher to push one element, which in this case maxes out around 40M/s. The other part is how many elements reach the Subscriber, where we see that roughly 40% are lost due to the absence of demand.

The second set of results pertains to the case of an unbounded request scheme, implemented here using request(-1) to keep the diff minimal. The batchSize parameter controls only the queue size in this case (again 2 * batchSize for each run):

[info] # Run complete. Total time: 00:04:45
[info] 
[info] Benchmark          (batchSize)   Score  Score error    Units Losses
[info] r.Unbounded.push             1  34,944        5,705    ns/op    75%
[info] r.Unbounded.push            10  35,077        8,282    ns/op    49%
[info] r.Unbounded.push           100  28,814        3,724    ns/op    43%
[info] r.Unbounded.push          1000  24,019        6,088    ns/op    41%
[info] r.Unbounded.push         10000  24,058        4,808    ns/op    49%
[info] r.Unbounded.push        100000  21,856        3,454    ns/op    58%

We can see here that the throughput for the Publisher is the same within the error bars, but the actual stream throughput—i.e. how much arrives at the Subscriber—is lower, the losses are higher.

The size of the queue does not play a role beyond 1000 elements, and bigger queues do not make sense in a larger system anyway due to the associated space overhead; hence I am convinced that we are talking about an effect that is negligible at best. The consequences in terms of inter-operability between different Publishers and Subscribers outweigh the benefits of this proposal by far.

(NB: the above benchmarks are not relevant in my book because they only measure the case where the elements to be produced exist already, which allows sending them within the request implementation; my benchmark actually considers an independent producer of data.)

(NB: the above result should not be surprising since demand caching means that synchronization between Publisher and Subscriber is reduced by the batchSize factor and therefore quickly has no impact anymore—no volatile reads or writes in the fast path for demand propagation. Performance is dominated by the actual queueing; in a real-world application it would be dominated by the actual processing way beyond this.)


Besides the performance point, I strongly object to this proposal due to the second use-case mentioned in the original description above. In the case of a synchronous Subscriber it is highly problematic to make an unbounded request, since then the whole chain of processing steps that follows will need to use the very same scheme; bounded asynchronous streaming is impossible without message losses after this point. In other words, an unbounded request scheme does not compose well if used in the context of synchronous flow-control.

@benjchristensen
Contributor Author

lack of actual measurements

I posted performance numbers done using JMH several days ago: #62 (comment)

What the benchmark does is to produce elements as quickly as possible to a Subscriber, dropping them whenever there is momentarily no demand.

This example is not the type where the performance gains are shown, as per your benchmarks. I have already specified, at the start of this conversation, the use cases where an infinite request is preferred, and an unbounded queue is not one of them (that is what we're all trying to avoid).

In the case of a synchronous Subscriber it is highly problematic to make an unbounded request, since then the whole chain of processing steps that follows will need to use the very same scheme

I don't see how it breaks composition. If I request Long.MAX_VALUE and then don't comply with the spec when my child requests less, then sure, I break composition, but that's because I am not complying with the spec. For example, I can have an operator stuck in the middle of a chain that requests Long.MAX_VALUE, then ignores requests from its children and firehoses them, all without any changes... just by breaking the spec. But that has nothing to do with infinite requests; it's just that the operator is breaking the spec and ignoring requests from its children.

The issue of composition is compliance with the requests an operator receives, even if it requests an infinite stream upwards, which is exactly what a temporal operator can do (or Long.MAX_VALUE). It can request infinite up and then support backpressure requests down. Thus, I don't see how this discussion affects composition; it is only about performance optimizations.
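For illustration (a sketch only, not actual library code; timer wiring and cancellation are elided), such a temporal operator might look like:

// A sampling Processor: firehoses upstream, honors demand downstream.
final class SampleProcessor<T> implements Processor<T, T> {
    private final java.util.concurrent.atomic.AtomicLong downstreamDemand =
            new java.util.concurrent.atomic.AtomicLong();
    private volatile T latest;
    private volatile Subscriber<? super T> downstream;

    @Override
    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE); // effectively infinite: time decides what survives
    }

    @Override
    public void onNext(T element) {
        latest = element; // overwrite; no buffering, no further upstream requests
    }

    // invoked by a timer on every sampling tick (scheduling elided)
    void emitSample() {
        T sample = latest;
        if (sample != null && downstreamDemand.get() > 0) {
            downstreamDemand.decrementAndGet();
            downstream.onNext(sample); // downstream demand is respected
        }
    }

    @Override
    public void subscribe(Subscriber<? super T> s) {
        downstream = s;
        s.onSubscribe(new Subscription() {
            @Override public void request(long n) { downstreamDemand.addAndGet(n); }
            @Override public void cancel() { /* elided */ }
        });
    }

    @Override
    public void onError(Throwable t) { downstream.onError(t); }

    @Override
    public void onComplete() { downstream.onComplete(); }
}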

@benjchristensen
Contributor Author

To try and bring this to conclusion, I see the following paths:

  1. Leave things as they are.

  2. Support negative numbers as a "magic number" to request unbounded emission.

  3. Support a new method such as requestAll() to request unbounded emission.

  4. Leave things as they are ... with Long.MAX_VALUE called out as an optional marker for fast path execution.

Since items 2 and 3 are contentious and do make things cluttered, I am okay if we do (1) and leave things as they are, but would prefer (4), adding some kind of wording about optionally using Long.MAX_VALUE as a fast path.

I think that practically speaking it is okay for us to fast path request(Long.MAX_VALUE) since I can't see any application ever actually emitting that many values.

As mentioned earlier, even if emitting 10 onNext per nanosecond, it would take 29.2 years to drain Long.MAX_VALUE. More practically, a single Producer can't even emit 1 per nanosecond (292 years). Our systems today are at best doing a few hundred million per second, which means ~1461 years.

So even if our systems improve dramatically and are doing 10 onNext per nanosecond, that's 29 years; more realistically it's 250-1000+ years that a single thread would have to run to drain Long.MAX_VALUE.
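
A quick sanity check of those figures (a throwaway sketch; the rates are the assumed throughputs from above, not measurements):

    public class DrainTime {
        public static void main(String[] args) {
            double max = Long.MAX_VALUE;                // 2^63 - 1 ≈ 9.22e18
            double secondsPerYear = 365.25 * 24 * 3600; // ≈ 3.16e7
            double[] ratesPerSec = { 1e10, 1e9, 2e8 };  // 10/ns, 1/ns, 200M/s
            for (double rate : ratesPerSec) {
                System.out.printf("%.0e onNext/sec -> %,.0f years%n",
                        rate, max / rate / secondsPerYear);
            }
            // prints roughly 29, 292 and 1,461 years respectively
        }
    }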

Considering this, can we agree on allowing implementations to fast-path request(Long.MAX_VALUE) if they wish to?

The benefit of this is that it is completely optional to provide a fast path, and code that doesn't will just work, even if receiving Long.MAX_VALUE.

I can then write code like this:

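        // (Assumed context not shown in this snippet: `o` is the downstream
        // Subscriber, `index` and `requested` are volatile long fields on this
        // Subscription, `end` is the last value to emit, and REQUESTED_UPDATER
        // is an AtomicLongFieldUpdater over `requested`.)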
        @Override
        public void request(long n) {
            if (n == Long.MAX_VALUE) {
                // fast-path without backpressure
                for (long i = index; i <= end; i++) {
                    if (o.isUnsubscribed()) {
                        return;
                    }
                    o.onNext((int) i);
                }
                o.onCompleted();
            } else if (n > 0) {
                // backpressure is requested
                long _c = REQUESTED_UPDATER.getAndAdd(this, n);
                if (_c == 0) {
                    while (true) {
                        /*
                         * This complicated logic is done to avoid touching the volatile `index` and `requested` values
                         * during the loop itself. If they are touched during the loop the performance is impacted significantly.
                         */
                        long r = requested;
                        long idx = index;
                        long numLeft = end - idx + 1;
                        long e = Math.min(numLeft, r);
                        boolean completeOnFinish = numLeft <= r;
                        long stopAt = e + idx;
                        for (long i = idx; i < stopAt; i++) {
                            if (o.isUnsubscribed()) {
                                return;
                            }
                            o.onNext((int) i);
                        }
                        index = stopAt;

                        if (completeOnFinish) {
                            o.onCompleted();
                            return;
                        }
                        if (REQUESTED_UPDATER.addAndGet(this, -e) == 0) {
                            // we're done emitting the number requested so return
                            return;
                        }
                    }
                }
            }
        }

or like this:

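        // (Same assumed fields as in the previous example.)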
        @Override
        public void request(long n) {
            long _c = REQUESTED_UPDATER.getAndAdd(this, n);
            if (_c == 0) {
                while (true) {
                    /*
                     * This complicated logic is done to avoid touching the volatile `index` and `requested` values
                     * during the loop itself. If they are touched during the loop the performance is impacted significantly.
                     */
                    long r = requested;
                    long idx = index;
                    long numLeft = end - idx + 1;
                    long e = Math.min(numLeft, r);
                    boolean completeOnFinish = numLeft <= r;
                    long stopAt = e + idx;
                    for (long i = idx; i < stopAt; i++) {
                        if (o.isUnsubscribed()) {
                            return;
                        }
                        o.onNext((int) i);
                    }
                    index = stopAt;

                    if (completeOnFinish) {
                        o.onCompleted();
                        return;
                    }
                    if (REQUESTED_UPDATER.addAndGet(this, -e) == 0) {
                        // we're done emitting the number requested so return
                        return;
                    }
                }
            }
        }

and both work, but the first is optimized.

The type of wording could be like this:

A Producer MAY optimize calls to Subscription.request(long n) where n == Long.MAX_VALUE to emit infinitely.

@rkuhn
Copy link
Member

rkuhn commented Jul 13, 2014

Considering this, can we agree with allowing implementations to fast path request(Long.MAX_VALUE) if they wish to?
The benefit of this is that it is completely optional to provide a fast path, and code that doesn't will just work, even if receiving Long.MAX_VALUE.

[snip]

The type of wording could be like this:

A Producer MAY optimize calls to Subscription.request(long n) where n == Long.MAX_VALUE to emit infinitely.

Yes, we can agree on such wording. But I remain unconvinced that any real use-case will ever benefit from it; emitting a contiguous sequence of integers is not representative of any high-volume workload I can imagine, and I have demonstrated that temporal operators also do not benefit measurably from unbounded requests.

Beware that requesting Long.MAX_VALUE will not be special in any other way, meaning that a conforming Subscription will terminate with onError if there was previously outstanding demand (as per 3.17), since the total demand would then overflow.

@benjchristensen
Copy link
Contributor Author

Beware that requesting Long.MAX_VALUE will not be special in any other way,

Yes, that's as I would expect. It is still a "reactive stream" and must fit within the spec. It is just saying that it is requesting Long.MAX_VALUE which can be optimized to mean "unbounded".

Thanks @rkuhn, glad we can agree on this.

@benjchristensen
Copy link
Contributor Author

Apologies to @rkuhn for my earlier comment about the code example using unbounded queues; it does not (I misread the code), it uses bounded queues. However, the point I was trying to make was that in the use cases I'm exploring, queueing is not typically going to occur (bounded or unbounded) when request(Long.MAX_VALUE) is used. If queueing is being done, it would generally make more sense to use a bounded queue and request the correct size; otherwise the point of request(n) is being defeated.

@viktorklang
Copy link
Contributor

So the proposal is to change request(n) to take a long and amend the spec to allow interpreting an outstanding demand of exactly 2^63-1 as "effectively unbounded"?

@viktorklang
Copy link
Contributor

Is this what we mean?

#82

@alexandru
Copy link

@viktorklang after thinking about it, I do agree with most of your concerns and arguments against this feature, however I don't agree with this one:

My concern is also the laziness of developers, if it is much "easier" to not care about back-pressure (not needing to track demand) by just calling "gimmeh everything" we're back at square one where systems fail in production with OOMEs (which is exactly what this collaboration was trying to alleviate).

I don't see that as a problem. Also, there are cases in which the developer has no control over the data-source and the data-source has no means of implementing back-pressure - say, a client that exposes a listener-based interface with no way to signal demand. I have encountered such instances many times while working on real-world apps.

In such instances, the library itself needs to deal with it: by throwing a buffer-overflow exception, by dropping events on the floor, by doing nothing (in which case the process may indeed fail with an OOME), etc. What it does by default is up to the library implementation, tunable by the user.
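
For illustration, a minimal sketch of such an adapter (entirely my own: `PushSource` is a made-up stand-in for any listener-style API, the buffer size of 1024 is arbitrary, and the drain loop glosses over concurrent-drain coordination). It buffers pushed events in a bounded queue, drops on overflow, and emits only as much as was requested:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.Consumer;

    import org.reactivestreams.Publisher;
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;

    // Hypothetical listener-style source with no demand signal of its own.
    interface PushSource<T> {
        void onMessage(Consumer<T> listener);
    }

    // Adapter: bounded buffer with a drop-on-overflow policy; elements are
    // delivered downstream only to the extent of the signalled demand.
    final class DroppingPublisher<T> implements Publisher<T> {
        private final PushSource<T> source;
        private final ArrayBlockingQueue<T> buffer = new ArrayBlockingQueue<>(1024);

        DroppingPublisher(PushSource<T> source) { this.source = source; }

        @Override public void subscribe(Subscriber<? super T> s) {
            AtomicLong demand = new AtomicLong();
            source.onMessage(msg -> {
                buffer.offer(msg); // returns false when full: event is dropped
                drain(s, demand);
            });
            s.onSubscribe(new Subscription() {
                @Override public void request(long n) {
                    demand.addAndGet(n);
                    drain(s, demand);
                }
                @Override public void cancel() { /* stop listening */ }
            });
        }

        private void drain(Subscriber<? super T> s, AtomicLong demand) {
            T next;
            while (demand.get() > 0 && (next = buffer.poll()) != null) {
                demand.decrementAndGet();
                s.onNext(next);
            }
        }
    }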

@viktorklang
Copy link
Contributor

Hi Alexandru,

@viktorklang after thinking about it, I do agree with most of your concerns and arguments against this feature, however I don't agree with this one:

My concern is also the laziness of developers, if it is much "easier" to not care about back-pressure (not needing to track demand) by just calling "gimmeh everything" we're back at square one where systems fail in production with OOMEs (which is exactly what this collaboration was trying to alleviate).

I don't see that as a problem

How so? (My argument being that keeping track of demand is "harder"/"more work" than just punting and calling requestOOME().)

Also, there are cases in which the developer has no control over the data-source and the data-source has no means of implementing back-pressure - say, a client that exposes a listener-based interface with no way to signal demand. I have encountered such instances many times while working on real-world apps.

This doesn't relate to request(n); you're talking about a data source, i.e. a Publisher (we are talking about Subscribers).

In such instances, the library itself needs to deal with it: by throwing a buffer-overflow exception, by dropping events on the floor, by doing nothing (in which case the process may indeed fail with an OOME), etc. What it does by default is up to the library implementation, tunable by the user.

See previous answer.

Did you see #82?

@alexandru
Copy link

@viktorklang I submitted the comment too early by mistake :)

My point is that you can't really protect the user from OOMEs; it is the responsibility of the library to make it easy/intuitive enough for the user to deal with them. And there are really a lot of cases in which the producer is definitely slower than the consumer(s), making it a non-issue.

The laziness of developers shouldn't, in my opinion, be a concern here; after all, people can always request Long.MaxValue and expect to never receive more events than that, or break the contract in a myriad of other ways. But the clarity of the spec / the API is another issue, as clarity is needed for users wanting to do the right thing.

So I agree with your points. I also agree with the concerns on performance. I personally prefer clarity of the API over performance concerns, but I think you folks will be able to make good decisions on this issue. I'm just saying that wanting to stop people from willingly shooting themselves in the foot is a slightly different concern than wanting to make it easy for people to do the right thing, from an API perspective.

@viktorklang
Copy link
Contributor

Hi @alexandru,

My point is that you can't really protect the user from OOMEs -

You can make an effort to make it easier to do the right thing than the wrong thing, don't you agree? "There's no way to prevent programmers from creating bugs, so why have type systems?" is false logic.

it is the responsibility of the library to make it easy/intuitive enough for the user to deal with them, and there are really a lot of cases in which the producer is definitely slower than the consumer(s), making it a non-issue.

Au contraire! The producer is not slower than the consumer at all points, so your bugs will show up in production rather than in the compiler or tests, which creates real damage (downtime, delays). Also, one man's producer is another man's consumer, making the important distinction that back pressure needs to work across the entire chain.

The laziness of developers shouldn't, in my opinion, be a concern here; after all, people can always request Long.MaxValue and expect to never receive more events than that, or break the contract in a myriad of ways,

What constitutes breakage is exactly what we're tasked to solve. But just because a developer can willfully destroy the entire codebase, litter it with System.exit(-1), use floats for monetary amounts, or what have you (malicious issues), it doesn't mean that we shouldn't make it unambiguous and straightforward to know what to use, to avoid inadvertent issues. Because frankly, when something does not work as expected, one has to be able to find out reasonably easily what one did wrong; corrective maintenance is seldom a value enhancer.

but the clarity of the spec / the API is another issue, as clarity is needed for users wanting to do the right thing.

No they can't, because it is not yet legal to request Long.MAX_VALUE (the argument is currently an int), and as Roland's experiment shows, for any push across an asynchronous boundary a batch value of > 1000 yields little to no further performance gain, meaning that the cap should perhaps be closer to a short than an int.

It seems, however, that we can reach consensus and move forward with widening to long and allowing the interpretation of 2^63-1 to mean effectively unbounded. But of course this depends on the vote of the @reactive-streams/contributors.

@benjchristensen
Copy link
Contributor Author

So the proposal is to change request(n) to take a long and amend the spec to allow interpreting an outstanding demand of exactly 2^63-1 as "effectively unbounded"?

Yes.

@jbrisbin
Copy link

Fine with me.

@tmontgomery
Copy link

As I mentioned, I'm fine with using Long.MAX_VALUE (or 2^63 - 1) to effectively give unbounded behavior. What I take from the discussion is some observations based on implementations today, which I think might be somewhat short-sighted. To do some math: what happens if you use a short or an int?

I think what we can learn from TCP's flow control is not to make the flow-control (aka request) window too small.

It is not unrealistic to envision a long RTT between sender and receiver (let's keep it terrestrial and say 100 ms), which implies a large request value to keep the pipe full. Let's say 100-byte objects: at 200M ops/sec, that is roughly 1.9GB per RTT, and the RTT implies a window of roughly 20M objects. Clearly, a short is too small for that use case. And with long-haul networks at 100+ Gbps, an int is really close to being too small for today's possibilities.
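
The arithmetic, for the record (a quick sketch using the assumed figures from above):

    public class WindowMath {
        public static void main(String[] args) {
            double rttSeconds   = 0.100;        // assumed terrestrial long-haul RTT
            long   opsPerSecond = 200_000_000L; // assumed producer throughput
            long   bytesPerObj  = 100;

            long objectsPerRtt = (long) (opsPerSecond * rttSeconds); // 20,000,000
            long bytesPerRtt   = objectsPerRtt * bytesPerObj;        // ~2e9 bytes

            System.out.printf("window: %,d objects (~%.1f GiB) per RTT%n",
                    objectsPerRtt, bytesPerRtt / (1024.0 * 1024 * 1024));
            // Short.MAX_VALUE = 32,767: hopelessly small as a request window here.
            // Integer.MAX_VALUE ≈ 2.1e9: only ~100x headroom over 20M objects.
        }
    }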

@rkuhn
Copy link
Member

rkuhn commented Jul 14, 2014

Good point, Todd.

Viktor’s implementation in #82 is very slightly different from the code samples discussed previously in that it focuses on outstanding demand instead of making a certain request value “magic”, which I think is more consistent and allows all the same usages.

Thanks, Ben, for raising this issue!

@viktorklang
Copy link
Contributor

Solved

@ktoso ktoso mentioned this issue Jul 31, 2014
ktoso added a commit to ktoso/reactive-streams that referenced this issue Jul 31, 2014