Rules 3.10 and 3.11 contradict 1.3 and 3.2 and place an unnecessary burden on the subscriber #272
Comments
I don't understand your conclusion that the invocation of The intent of this is that the Just because synchronous invocations are permitted does not mean rule 1.3 is defeated. Synchronous invocations are permitted as long as 1.3 is maintained. Permitting synchronous invocations permits simple and performant implementations when the use case (typically a small amount of data) applies.
I don't yet understand the contradiction so do not see the reason for these changes. Additionally ... Alternative 1 is not wanted. We specifically do not want to force async scheduling if a I don't understand how alternative 2 can be implemented since it is not straightforward or even feasible for a More importantly though, we want to allow it to synchronously invoke
I can see how it can be considered vague, but that is by design to allow the We can elaborate on the wording if that would help, but the principle of allowing synchronous invocation is one we consciously decided upon after much discussion and after several years of implementing code like this and learning that allowing synchronous invocation is desirable.
Re-reading these a few times, the only thing I can discern is that 3.10 and 3.11 take into account the unsubscribed state, whereas 3.2 does not. Is that your concern? I don't see how 1.3 contradicts. Nothing about 3.2/3.10/3.11 breaks the rule about sequential execution. As stated above, permitting synchronous invocation does not shed the burden of the
The burden you state is about "timing". But this is on the If you still feel I'm missing the contradiction and burden, can you please provide unit tests or sample code demonstrating the problems and burdens? I ask because I am struggling to understand the stated problem. |
I think the difference between how I read it and how you read it is that you consider "concurrently" in rule 1.3 to only mean that the method cannot be concurrently called by more than one thread, while I read it to mean as any execution which would cause
No, but it might. The footnote for rule 3.3 explicitly demonstrates an example where rule 1.3 is violated by having
That is only so if you accept the second proposed alternative to 3.10/3.11. As it stands, concurrent execution is allowed. If it weren't, rule 3.3 (limit on recursion depth) would not have been necessary. The question is: is the sequence described in that footnote allowed or not? If it is allowed, then it clearly contradicts 1.3.
Right, but the same efficiency could be attained by looping in
:) No, it's on the publisher because and as it stands, there's a burden on the subscriber to call
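Sure:
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class MySubscriber implements Subscriber<Integer> {
    Subscription sn;
    public void onSubscribe(Subscription s) {
        this.sn = s;
        s.request(1);
    }
    public void onNext(Integer x) {
        sn.request(1); // may synchronously re-enter onNext with the next element
        System.out.println(x);
    }
    public void onError(Throwable t) {
        // required by the Subscriber interface; omitted in the original snippet
        t.printStackTrace();
    }
    public void onComplete() {
        System.out.println("Done!");
    }
}
This subscriber might print its elements in the reverse order, only because The semantics of this subscriber's behavior are then dependent on whether or not the subscription chooses to call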
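The reordering described here can be reproduced with a small, self-contained sketch. It is not code from the thread: it substitutes the JDK's `java.util.concurrent.Flow` interfaces (Java 9+) for `org.reactivestreams` so that it runs without dependencies, and `RecursiveCounterSubscription`, `RequestFirstSubscriber` and `ReverseOrderDemo` are hypothetical names. The subscription is assumed to emit synchronously from inside `request`, as 3.10 permits, and the subscriber records its output in a list instead of printing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical subscription: emits 1..limit by calling onNext on the caller's stack.
// Stack depth grows with limit, so this is only suitable for tiny demos.
final class RecursiveCounterSubscription implements Flow.Subscription {
    private final Flow.Subscriber<? super Integer> subscriber;
    private final int limit;
    private int next = 1;
    private boolean done;

    RecursiveCounterSubscription(Flow.Subscriber<? super Integer> subscriber, int limit) {
        this.subscriber = subscriber;
        this.limit = limit;
    }

    @Override
    public void request(long n) {
        for (long i = 0; i < n && !done; i++) {
            if (next > limit) {
                done = true;
                subscriber.onComplete();
            } else {
                subscriber.onNext(Integer.valueOf(next++)); // may re-enter request()
            }
        }
    }

    @Override
    public void cancel() {
        done = true;
    }
}

// Same shape as MySubscriber: request(1) is called before the element is handled.
final class RequestFirstSubscriber implements Flow.Subscriber<Integer> {
    final List<String> log = new ArrayList<>();
    private Flow.Subscription sn;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        this.sn = s;
        s.request(1);
    }

    @Override
    public void onNext(Integer x) {
        sn.request(1);              // the next element is delivered before x is logged
        log.add(String.valueOf(x));
    }

    @Override
    public void onError(Throwable t) {
        log.add("error");
    }

    @Override
    public void onComplete() {
        log.add("Done!");
    }
}

final class ReverseOrderDemo {
    static List<String> run(int limit) {
        RequestFirstSubscriber subscriber = new RequestFirstSubscriber();
        subscriber.onSubscribe(new RecursiveCounterSubscription(subscriber, limit));
        return subscriber.log;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints [Done!, 3, 2, 1]
    }
}
```

With a subscription that emitted from its own thread instead, the same subscriber would log 1, 2, 3, Done!, which is the dependence on the subscription's behavior being discussed.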
I believe it's both feasible and performant because of 2.7 ("A Subscriber MUST ensure that all calls on its Subscription take place from the same thread or provide for respective external synchronization."):
class MySubscription implements Subscription { // Subscription is not generic
    private final Subscriber<Integer> subscriber;
    private boolean withinOnNext; // no need for volatile or synchronization due to total ordering
    private long pending; // ditto
    // ...
    public void request(long n) {
        pending += n;
        if (withinOnNext)
            return; // reentrant call: the loop further up the stack picks up the demand
        // rule 2.7 ensures there will be no asynchronous calls to request while we're here:
        while (pending > 0) {
            pending--; // decrement only when an element is emitted, so pending never goes negative
            signalNext();
        }
    }
    private void signalNext() {
        withinOnNext = true;
        subscriber.onNext(ThreadLocalRandom.current().nextInt());
        withinOnNext = false;
    }
}
So I've trampolined rather than recursed, and I can't think of a case where this can't be done. Is there? |
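As a way to check the trampolining claim, here is a runnable sketch of the same idea. It is an illustration under assumptions, not the commenter's implementation: it uses the JDK's `java.util.concurrent.Flow` interfaces (Java 9+) in place of `org.reactivestreams`, emits a finite counter instead of random numbers so the result is predictable, and the names `TrampolinedCounterSubscription`, `RecordingSubscriber` and `TrampolineDemo` are hypothetical. It also ignores non-positive `request` arguments and demand overflow for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical subscription: a reentrant request() only records demand;
// the outermost request() call drains it in a loop (a trampoline).
final class TrampolinedCounterSubscription implements Flow.Subscription {
    private final Flow.Subscriber<? super Integer> subscriber;
    private final int limit;
    private int next = 1;
    private long pending;      // plain fields: rule 2.7 is assumed to serialize calls
    private boolean draining;
    private boolean done;

    TrampolinedCounterSubscription(Flow.Subscriber<? super Integer> subscriber, int limit) {
        this.subscriber = subscriber;
        this.limit = limit;
    }

    @Override
    public void request(long n) {
        if (done || n <= 0) {
            return;            // simplification: a real subscription must reject n <= 0
        }
        pending += n;
        if (draining) {
            return;            // called from within onNext: let the outer loop emit
        }
        draining = true;
        try {
            while (pending > 0 && !done) {
                pending--;
                if (next > limit) {
                    done = true;
                    subscriber.onComplete();
                } else {
                    subscriber.onNext(Integer.valueOf(next++));
                }
            }
        } finally {
            draining = false;
        }
    }

    @Override
    public void cancel() {
        done = true;
    }
}

// Requests before handling the element, and tracks how deeply onNext is nested.
final class RecordingSubscriber implements Flow.Subscriber<Integer> {
    final List<String> log = new ArrayList<>();
    int maxDepth;
    private int depth;
    private Flow.Subscription sn;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        this.sn = s;
        s.request(1);
    }

    @Override
    public void onNext(Integer x) {
        depth++;
        maxDepth = Math.max(maxDepth, depth);
        sn.request(1);
        log.add(String.valueOf(x));
        depth--;
    }

    @Override
    public void onError(Throwable t) {
        log.add("error");
    }

    @Override
    public void onComplete() {
        log.add("Done!");
    }
}

final class TrampolineDemo {
    static RecordingSubscriber run(int limit) {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        subscriber.onSubscribe(new TrampolinedCounterSubscription(subscriber, limit));
        return subscriber;
    }

    public static void main(String[] args) {
        RecordingSubscriber s = run(3);
        System.out.println(s.log + " maxDepth=" + s.maxDepth); // prints [1, 2, 3, Done!] maxDepth=1
    }
}
```

Because `onNext` is never nested, the subscriber sees elements in order no matter where in `onNext` it calls `request`.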
@pron, the current specification is good enough and I was able to start rewriting RxJava based on it. Creating a fluent-API is well within the allowed realm. However, I've come to the conclusion that almost all reasonable To see the need for the conservative approach, i.e., the need for fully thread-safe and reentrant-safe I've blogged about For 'real' trouble, one can look at hot observables ( |
I'm not saying it isn't, just that it places an unnecessary burden on the subscriber, that can be lifted without hurting performance. As it stands, the spec implicitly (and, IMO, unnecessarily) mandates that the subscriber call
I am not sure I understand your example, but if the producer (or subscription) has its own thread then there's absolutely no reason for it to ride on the subscriber's thread and call |
@pron I agree with your analysis that allowing (bounded) recursion places restrictions on when the Subscriber may call |
Ah, but I addressed that point precisely. I think -- and correct me if I'm wrong -- that there are two ways such a subscriber-publisher pair can interact, and only two:
The spec could then allow a recursion of depth 1, i.e.
Even though I believe rule 2.7 removes any need for this optimization, as it allows both trampolining (either in
as that is the implication of allowing recursion of depth > 1 in (This means two rules -- this one, which is rather burdensome, and 3.3 (limit on recursion depth), which is rather vague -- required to allow an optimization that is probably made unnecessary by rule 2.7) |
I've created a rudimentary implementation for what I believe to be a fully generic thread-less publisher. It handles all possible use cases for such a publisher allowed by the spec, it allows recursion of depth 1 only, and avoids all problems in the use-case I provided ( Is there any purpose served by allowing deeper recursion that is not served by this implementation? |
Few remarks:
|
@akarnokd Thanks! Fixed, except for 2. You are, of course, correct: The |
Apologies, I haven't had enough cycles to keep up with this Issue, is there something that must be fixed or can we close it? |
Hi. Both the spec itself would be simpler (rule 3.3 could then be dropped, and 1.3 would hold true without need for additional caveats), as well as remove a burden from the subscribers, as their behavior would no longer be determined by the timing of the call to If, OTOH, there are uses for arbitrary recursion depth not addressed by the linked publisher, then two clarifications must be made in order for the spec to be consistent:
|
I don't understand how this is to be considered concurrently. Could you elaborate what you mean?
That's a no go.
Perhaps the fix is to recommend any and all synchronous Publishers and Subscribers to appropriately guard for reentrancy. |
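One possible reading of "guard for reentrancy" on the subscriber side is sketched below. This is an assumption about what such a guard could look like, not something the spec or this thread prescribes: a single-threaded wrapper (hypothetical name `ReentrancyGuard`) that queues any signal arriving while another signal is still being handled and replays the queue in arrival order. The sketch uses the JDK's `java.util.concurrent.Flow` interfaces (Java 9+); `SyncCounterSubscription`, `InnerSubscriber` and `GuardDemo` are likewise hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical wrapper; assumes all signals arrive on one thread.
final class ReentrancyGuard<T> implements Flow.Subscriber<T> {
    private final Flow.Subscriber<? super T> downstream;
    private final ArrayDeque<Runnable> queue = new ArrayDeque<>();
    private boolean delivering;

    ReentrancyGuard(Flow.Subscriber<? super T> downstream) {
        this.downstream = downstream;
    }

    private void deliver(Runnable signal) {
        queue.add(signal);
        if (delivering) {
            return; // reentrant call: the loop below, further up the stack, runs it
        }
        delivering = true;
        try {
            Runnable r;
            while ((r = queue.poll()) != null) {
                r.run();
            }
        } finally {
            delivering = false;
        }
    }

    @Override public void onSubscribe(Flow.Subscription s) { deliver(() -> downstream.onSubscribe(s)); }
    @Override public void onNext(T item) { deliver(() -> downstream.onNext(item)); }
    @Override public void onError(Throwable t) { deliver(() -> downstream.onError(t)); }
    @Override public void onComplete() { deliver(downstream::onComplete); }
}

// Hypothetical synchronous source: emits 1..limit from inside request().
final class SyncCounterSubscription implements Flow.Subscription {
    private final Flow.Subscriber<? super Integer> subscriber;
    private final int limit;
    private int next = 1;
    private boolean done;

    SyncCounterSubscription(Flow.Subscriber<? super Integer> subscriber, int limit) {
        this.subscriber = subscriber;
        this.limit = limit;
    }

    @Override
    public void request(long n) {
        for (long i = 0; i < n && !done; i++) {
            if (next > limit) {
                done = true;
                subscriber.onComplete();
            } else {
                subscriber.onNext(Integer.valueOf(next++));
            }
        }
    }

    @Override public void cancel() { done = true; }
}

// Calls request before handling the element, like MySubscriber in this thread.
final class InnerSubscriber implements Flow.Subscriber<Integer> {
    final List<String> log = new ArrayList<>();
    private Flow.Subscription sn;

    @Override public void onSubscribe(Flow.Subscription s) { this.sn = s; s.request(1); }
    @Override public void onNext(Integer x) { sn.request(1); log.add(String.valueOf(x)); }
    @Override public void onError(Throwable t) { log.add("error"); }
    @Override public void onComplete() { log.add("Done!"); }
}

final class GuardDemo {
    static List<String> run(int limit) {
        InnerSubscriber inner = new InnerSubscriber();
        ReentrancyGuard<Integer> guard = new ReentrancyGuard<>(inner);
        guard.onSubscribe(new SyncCounterSubscription(guard, limit));
        return inner.log;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints [1, 2, 3, Done!]
    }
}
```

The trade-off is that the source's call to `onNext` returns before the element has actually been handled, so the guard only suits cases where that deferral is acceptable.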
One thing we might want to remember: «Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.» The point is transporting data elements across an asynchronous boundary, and none of the points above would be an issue if we were talking about that. Synchronous execution is a quirky special case, maybe we need to be more explicit about that. |
Excellent, but as it stands, the spec already places this requirement without making it explicit. To make matters very simple, let me state my proposal clearly. Rules 3.10 and 3.11 should be changed to limit the allowed recursion depth to one, namely: 3.10 While the 3.11 While the The generic thread-less publisher I linked to demonstrates that any thread-less publisher can be easily made to abide by these rules without loss to any functionality or performance. Making this change would also render rule 3.3 superfluous so it can be removed. |
Absolutely! As it turns out, limiting recursion depth to 1 takes care of all requirements that make that special case necessary, and would render any additional clarification unnecessary (in fact, it would remove a whole rule pertaining to synchronous execution). Placing this inconsequential restriction on the publisher would let all subscribers ignore this special case altogether (as they wouldn't sense any different behavior even in the synchronous case). |
I'll have to think some more about it, but currently, given the wording of 3.10 and 3.11, your MySubscriber works as designed (reading the spec it is clear that you may get onNext called within onSubscribe if you call subscription.request there, and you are a synchronous Subscriber, which you chose for yourself). Given the timing of this discussion (post 1.0), I'd be wary of making any changes that break currently valid implementations. |
To clarify further: not making this change now, will break subscribers that call |
3.10 and 3.11 clearly state that Subscription.request MAY synchronously call methods on the Subscriber. As such, if a Subscriber decides it wants to be synchronous, it needs to deal with this fact, don't you agree?
It will most definitely break existing synchronous Publishers.
Do note that the spec is clear what to expect from a Subscriber PoV, and also, that Synchronous Publishers and Subscribers are not the focal point of the initiative, I have a hard time convincing myself that breaking Publishers post 1.0 for a minor simplification has the right cost-benefit ratio—I'd love to hear what the other @reactive-streams/contributors think about the proposal. A compromise which is non breaking and non-invasive would be to explain the intricacies of synchronous Publishers and Subscribers more clearly. |
@pron So, from my point of view there's nothing >>wrong<< with your proposal—it's an interesting simplification—it's that it has an unfortunate timing and (regrettably) does not fix a spec bug per se (which would make it much easier to justify breaking changes). |
Why don’t we include it with RECOMMENDED status? That would not break anything while still simplifying life for future implementors. |
@rkuhn RECOMMENDED is equivalent to adding to the documentation as |
EDIT: Sorry, I've misunderstood -- yes, you are right. I still think it makes the synchronous case more complicated than need be.
The spec (3.10-11) allows multiple calls to |
@pron What you are overlooking is that the intended use of Subscriber is to dispatch the processing of all signals asynchronously, which makes this point moot. There is no reentrancy once you do that. |
@rkuhn You are probably right, which is why I think it's good that as someone not involved with writing the spec, I can only guess its intentions but know what it actually says :) As the spec stands, it certainly devotes quite a few rules to deal with synchronous behavior -- even if it is just a special case. Indeed, if it is a special case, it is best to simplify it and make it as much of a non-issue for subscribers. In that case, it also stands to reason that simplifying the spec now would not be too onerous for publisher implementors. Also, even if the subscriber does choose to process events by dispatching them to some other worker thread, rules 3.10-11 mean it should still be careful with instruction ordering. For example:
class MyAsyncSubscriber implements Subscriber<Integer> {
    Subscription sn;
    ExecutorService exec = Executors.newSingleThreadExecutor();
    public void onSubscribe(Subscription s) {
        this.sn = s;
        s.request(1);
    }
    public void onNext(Integer x) {
        sn.request(1);
        exec.submit(() -> doSomething(x)); // may throw a RejectedExecutionException depending on the subscription's behavior
    }
    public void onError(Throwable t) {
        exec.shutdown(); // required by the Subscriber interface; omitted in the original snippet
    }
    public void onComplete() {
        exec.shutdown(); // might be called before the last submit
    }
}
This may not only submit tasks in an unexpected order (which may matter to someone), it will also fail if So I think what you are saying is that the burden rules 3.10-11 place on the subscriber is not as onerous as I made it out to be, as the burden in this case is not "must not call |
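The ordering concern can be sidestepped by handing the element off first and calling `request` as the last statement of `onNext`. The sketch below illustrates that ordering under assumptions of my own: it uses the JDK's `java.util.concurrent.Flow` interfaces (Java 9+) instead of `org.reactivestreams`, replaces `doSomething` with appending to a list, and pairs the subscriber with a hypothetical synchronous source (`SyncFiniteSubscription`) that calls `onNext` and `onComplete` from inside `request`. `SubmitFirstSubscriber` and `SubmitFirstDemo` are also hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;

// Hypothetical synchronous source: emits 1..limit, then onComplete, inside request().
final class SyncFiniteSubscription implements Flow.Subscription {
    private final Flow.Subscriber<? super Integer> subscriber;
    private final int limit;
    private int next = 1;
    private boolean done;

    SyncFiniteSubscription(Flow.Subscriber<? super Integer> subscriber, int limit) {
        this.subscriber = subscriber;
        this.limit = limit;
    }

    @Override
    public void request(long n) {
        for (long i = 0; i < n && !done; i++) {
            if (next > limit) {
                done = true;
                subscriber.onComplete();
            } else {
                subscriber.onNext(Integer.valueOf(next++));
            }
        }
    }

    @Override public void cancel() { done = true; }
}

final class SubmitFirstSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> processed = new CopyOnWriteArrayList<>();
    private final ExecutorService exec = Executors.newSingleThreadExecutor();
    private Flow.Subscription sn;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        this.sn = s;
        s.request(1);
    }

    @Override
    public void onNext(Integer x) {
        exec.execute(() -> processed.add(x)); // hand the element off first ...
        sn.request(1);                        // ... then request more, as the last statement
    }

    @Override public void onError(Throwable t) { exec.shutdown(); }

    @Override public void onComplete() { exec.shutdown(); }

    boolean awaitTermination() {
        try {
            return exec.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

final class SubmitFirstDemo {
    static List<Integer> run(int limit) {
        SubmitFirstSubscriber subscriber = new SubmitFirstSubscriber();
        subscriber.onSubscribe(new SyncFiniteSubscription(subscriber, limit));
        subscriber.awaitTermination();
        return new ArrayList<>(subscriber.processed);
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints [1, 2, 3]
    }
}
```

With the two statements in `onNext` swapped back to the order in `MyAsyncSubscriber`, this same source would reach `onComplete` (and shut the executor down) before the last element is handed off, and that hand-off would be rejected.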
Yes, that is indeed what I meant. We cannot reasonably remove the obligation of the Subscriber to handle reentrant execution of its methods from within calls to |
OK. Although, it might be worthwhile to figure out if there are any existing publishers at all that take advantage of arbitrary-depth recursion. Looking at the list of implementations, I can guess that you would find them only in RxJava (all other implementations seem 100% IO related and unlikely to make use of recursion). If there aren't any (or only very few), you might be inclined to change the spec to simplify the synchronous case. Whatever you decide, thank you (all) for considering this. |
What I propose is the following:
The end result of this proposal is that it doesn't break any current implementations, makes the spec clearer without changing its semantics and makes implementations of synchronous Publishers and Subscribers more straightforward as the documentation will more clearly articulate what needs consideration. Does that sound like an acceptable middle road? |
Sure. Quite reasonable. I do have to ask, though, if it wouldn't be acceptable to say that while keeping recursion depth to 1 is only recommended, this may become a requirement in a future revision? Specs do sometimes tighten their requirements, thus making formerly compliant implementations non-compliant with the new version. I don't think it's reasonable to assume that if you conform with a standard specification, you will remain compliant for all future versions even if you change nothing. Most standards don't make that promise. In this particular case, tightening the requirements will only "negatively" affect a very small number of publisher implementations, and "positively" affect a potentially very large number of subscriber implementations. In any case, your decision is perfectly acceptable. |
I think recommending implementors to keep depth to 1 as a future-proofing strategy is a great suggestion. |
@viktorklang what about adding your foot note then ? |
@smaldini Open a PR and I'll be more than happy to review :) (I am swamped at the moment :S) |
I just saw this issue and I have to say, this kind of confusion is what I had attempted to communicate in #230 @rkuhn 's quote is the money quote: That's exactly right, and I would go further and say synchronous notification ( |
@nomagic I disagree and will try to argue why from my iPhone. Also it allows interesting threading setup like asynchronously dispatching onSubscribe+request only (mitigate slow publisher) versus dispatching all signals (mitigate slow subscriber). Hope it makes sense, just wanted to weigh in my opinion. |
@smaldini
should be forbidden and would be blamed on the We already have built-in mechanisms for backpressure in the Stream world, e.g. pull-based mechanisms like
is worth the attendant confusion by users of the framework, viz. the implementers of
is good enough. |
I don't see what the problem with calling Here is an If you want to prevent any request from |
That's also the way we solve it, it's not really hard and avoids an async hop. Plus iterator is a bit all or nothing, request is fine grained and allows various behaviour like adapted prefetch. I think the key point is that subscriber is location transparent basically.
|
@akarnokd Longer Answer:
Here's the stack (grows down):
I'm taking into account your recursion limitation within
I realize there is zero chance that the spec will be changed; but there isn't a font big or bold enough to explain in plain English that the above behavior is possible (although I don't really know what to do about it). --End Rant |
Closing due to inactivity, please reopen if you are also willing to open a PR about adding a recommendation to keep stack depth growth to 1. |
Rule 1.3 states that "onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled sequentially (no concurrent notifications)", while 3.10 (and similarly 3.11) says, "While the Subscription is not cancelled, Subscription.request(long n) MAY synchronously call onNext on this (or other) subscriber(s)". Combined with 3.2 ("The Subscription MUST allow the Subscriber to call Subscription.request synchronously from within onNext or onSubscribe") we get that onNext et al. may certainly get called concurrently with other calls to onNext etc., in contradiction to rule 1.3. Rule 3.3 (regarding recursion depth) with its accompanying footnote makes this contradiction abundantly clear.

Moreover, rules 3.10 and 3.11 place a burden on the subscriber as to the exact timing of the call to request, the same burden (although without need for thread synchronization) that rule 1.3 wishes to prevent. For example, request must not be called before onNext handles the element.

Rules 3.10 and 3.11 presumably exist only so a publisher would not necessarily require a separate worker thread to issue onNext, and instead could "ride" on the subscriber's thread. However, given rule 3.3 ("Subscription.request MUST place an upper bound on possible synchronous recursion between Publisher and Subscriber"), this only makes sense in two alternative scenarios:

In alternative 1, rules 3.10/3.11 allow a simple publisher, publishing one (or a mere few) elements to not require a separate worker thread to issue onNext. This, however, seems unnecessary, as this behavior is already possible with the pattern demonstrated in the following diagram (lines are steps within the same thread, while indentation signifies stack depth), initiated on the subscriber's thread:

A simple loop employing simple, non-volatile and unsynchronized variables, within Publisher.subscribe would then achieve the same desired effect (which presumably necessitated rules 3.10 and 3.11 in the first place) without any loss of generality or performance, and without placing an onerous burden on the publisher, thus shifting a tricky, subtle, and possibly confusing requirement from the subscriber to a rather simple requirement on the publisher (in line with the rest of the spec that favors placing most behavior requirements on the publisher and as few as possible on the subscriber).

Another alternative (alternative 2) is that rules 3.10 and 3.11 have been made to facilitate synchronous calls to subscriber's methods from within request in cases where the call to request does not occur within onNext, onSubscribe etc., but on some worker thread managed by the subscriber, in which case those rules are indeed necessary for the publisher to ride on the subscriber thread. However, catering to that specific scenario alone would satisfy this requirement and would also lift the burden from the subscriber regarding the timing of the call to request within onNext et al. Doing so is simple and does not require any special synchronization mechanism due to rule 2.7 ("A Subscriber MUST ensure that all calls on its Subscription take place from the same thread or provide for respective external synchronization.").

I therefore propose that rules 3.10 and 3.11 be changed, under alternative 1, to state the exact opposite of their current wording:

3.10 Subscription.request(long n) MUST NOT synchronously call onNext on this (or other) subscriber(s).

3.11 Subscription.request(long n) MUST NOT synchronously call onComplete or onError on this (or other) subscriber(s).

OR (alternative 2):

3.10 While the Subscription is not cancelled, Subscription.request(long n) MAY synchronously call onNext on this (or other) subscriber(s), but MUST NOT do so if Subscription.request(long n) has been synchronously called from within onNext or onSubscribe.

3.11 While the Subscription is not cancelled, Subscription.request(long n) MAY synchronously call onComplete or onError on this (or other) subscriber(s), but MUST NOT do so if Subscription.request(long n) has been synchronously called from within onNext or onSubscribe.

Either of these alternatives would lift the contradiction, remove any burden on the subscriber regarding the exact timing of the call to request, and render (the rather vague) rule 3.3 superfluous, thus simplifying the specification. This change would not break any existing subscriber (as behavior restrictions are only lifted), and, at most, require only minor changes to existing publisher implementations.