Rules 3.10 and 3.11 contradict 1.3 and 3.2 and place an unnecessary burden on the subscriber #272

Closed
pron opened this issue Jun 6, 2015 · 38 comments

Comments

@pron

pron commented Jun 6, 2015

Rule 1.3 states that "onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled sequentially (no concurrent notifications)", while 3.10 (and similarly 3.11) says, "While the Subscription is not cancelled, Subscription.request(long n) MAY synchronously call onNext on this (or other) subscriber(s)". Combined with 3.2 ("The Subscription MUST allow the Subscriber to call Subscription.request synchronously from within onNext or onSubscribe") we get that onNext et al. may certainly get called concurrently with other calls to onNext etc., in contradiction to rule 1.3. Rule 3.3 (regarding recursion depth) with its accompanying footnote makes this contradiction abundantly clear.

Moreover, rules 3.10 and 3.11 place a burden on the subscriber as to the exact timing of the call to request, the same burden (although without need for thread synchronization) that rule 1.3 wishes to prevent. For example, request must not be called before onNext handles the element.

Rules 3.10 and 3.11 presumably exist only so a publisher would not necessarily require a separate worker thread to issue onNext, and instead could "ride" on the subscriber's thread. However, given rule 3.3 ("Subscription.request MUST place an upper bound on possible synchronous recursion between Publisher and Subscriber"), this only makes sense in two alternative scenarios:

In alternative 1, rules 3.10/3.11 allow a simple publisher, publishing one (or a mere few) elements to not require a separate worker thread to issue onNext. This, however, seems unnecessary, as this behavior is already possible with the pattern demonstrated in the following diagram (lines are steps within the same thread, while indentation signifies stack depth), initiated on the subscriber's thread:

Publisher.subscribe
        Subscriber.onSubscribe
                Subscription.request
        Subscriber.onNext
                Subscription.request
        ... etc.

A simple loop employing simple, non-volatile and unsynchronized variables, within Publisher.subscribe would then achieve the same desired effect (which presumably necessitated rules 3.10 and 3.11 in the first place) without any loss of generality or performance, and without placing an onerous burden on the publisher, thus shifting a tricky, subtle, and possibly confusing requirement from the subscriber to a rather simple requirement on the publisher (in line with the rest of the spec that favors placing most behavior requirements on the publisher and as few as possible on the subscriber).
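The loop described above could look roughly like the following. This is only a minimal sketch under stated assumptions: `LoopingPublisher` and its `demo()` helper are hypothetical names, and `java.util.concurrent.Flow` (JDK 9+) is used as a dependency-free stand-in for the `org.reactivestreams` interfaces. It covers only a subscriber that requests from within its callbacks, and it omits validation of `n` and overflow handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical class: a thread-less publisher that emits from a loop in subscribe().
final class LoopingPublisher implements Flow.Publisher<Integer> {
    private final List<Integer> items;

    LoopingPublisher(List<Integer> items) {
        this.items = items;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        // Plain, non-volatile state: everything below runs on the caller's thread.
        final long[] pending = {0};
        final boolean[] cancelled = {false};

        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                pending[0] += n; // record demand only; never signal from here
            }

            @Override
            public void cancel() {
                cancelled[0] = true;
            }
        });

        int next = 0;
        // Each onNext may call request(), which only bumps the counter,
        // so the stack never grows beyond subscribe -> onNext -> request.
        while (!cancelled[0] && pending[0] > 0 && next < items.size()) {
            pending[0]--;
            subscriber.onNext(items.get(next++));
        }
        if (!cancelled[0] && next == items.size()) {
            subscriber.onComplete();
        }
    }

    // Drives the publisher with a subscriber that requests before it records.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        new LoopingPublisher(List.of(1, 2, 3)).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1);
            }

            @Override
            public void onNext(Integer x) {
                subscription.request(1); // requested before the element is handled
                log.add(String.valueOf(x));
            }

            @Override
            public void onError(Throwable t) {
                log.add("error");
            }

            @Override
            public void onComplete() {
                log.add("Done!");
            }
        });
        return log; // [1, 2, 3, Done!]
    }
}
```

Because `request` never signals, the position of the `request` call inside `onNext` has no effect on the order in which elements are observed. If demand runs out before the items do, this sketch simply stops; nothing restarts the loop.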

Another alternative (alternative 2) is that rules 3.10 and 3.11 have been made to facilitate synchronous calls to subscriber's methods from within request in cases where the call to request does not occur within onNext, onSubscribe etc., but on some worker thread managed by the subscriber, in which case those rules are indeed necessary for the publisher to ride on the subscriber thread. However, catering to that specific scenario alone would satisfy this requirement and would also lift the burden from the subscriber regarding the timing of the call to request within onNext et al. Doing so is simple and does not require any special synchronization mechanism due to rule 2.7 ("A Subscriber MUST ensure that all calls on its Subscription take place from the same thread or provide for respective external synchronization.").

I therefore propose that rules 3.10 and 3.11 be changed,

under alternative 1, to state the exact opposite of their current wording:


3.10 Subscription.request(long n) MUST NOT synchronously call onNext on this (or other) subscriber(s).

3.11 Subscription.request(long n) MUST NOT synchronously call onComplete or onError on this (or other) subscriber(s).


OR (alternative 2):


3.10 While the Subscription is not cancelled, Subscription.request(long n) MAY synchronously call onNext on this (or other) subscriber(s), but MUST NOT do so if Subscription.request(long n) has been synchronously called from within onNext or onSubscribe.

3.11 While the Subscription is not cancelled, Subscription.request(long n) MAY synchronously call onComplete or onError on this (or other) subscriber(s), but MUST NOT do so if Subscription.request(long n) has been synchronously called from within onNext or onSubscribe.


Either of these alternatives would lift the contradiction, remove any burden on the subscriber regarding the exact timing of the call to request, and render (the rather vague) rule 3.3 superfluous, thus simplifying the specification. This change would not break any existing subscriber (as behavior restrictions are only lifted), and, at most, require only minor changes to existing publisher implementations.

@pron pron changed the title Rules 3.10 and 3.11 contradict 1.3 and 3.2 and cause other problems Rules 3.10 and 3.11 contradict 1.3 and 3.2 and place an unnecessary burden on the subscriber Jun 6, 2015
@benjchristensen
Contributor

we get that onNext et al. may certainly get called concurrently with other calls to onNext

I don't understand your conclusion that the invocation of request necessitates concurrent invocation of onNext. A synchronous invocation of request does not mean that request will concurrently invoke onNext.

The intent of this is that the Subscriber does not have the burden of asynchronously scheduling or serializing calls to request. It is the burden of the Publisher to handle both async and sync calls to request and to concern itself with scheduling, callstack recursion, serialization etc whereas the Subscriber can ignore all of those things.

Just because synchronous invocations are permitted does not mean rule 1.3 is defeated. Synchronous invocations are permitted as long as 1.3 is maintained.

Permitting synchronous invocations permits simple and performant implementations when the use case (typically a small amount of data) applies.

Either of these alternatives would lift the contradiction

I don't yet understand the contradiction so do not see the reason for these changes. Additionally ...

Alternative 1 is not wanted. We specifically do not want to force async scheduling if a Publisher has good reason to just synchronously emit (such as a single item from an in-memory cache).

I don't understand how alternative 2 can be implemented since it is not straightforward or even feasible for a Subscription to reliably know if it was called synchronously or asynchronously.

More importantly though, we want to allow it to synchronously invoke onNext/onComplete/onError from a synchronous invocation of request if the Subscription/Publisher decides that is the most efficient approach for a small number of events (such as a single item).

render (the rather vague) rule 3.3 superfluous

I can see how it can be considered vague, but that is by design to allow the Subscription/Publisher to make its own design decision while warning implementors to be sure they don't allow unbounded recursion. If an infinite stream is being emitted then certainly async scheduling will be required. If it is a finite but large stream, then it is also likely wanted. If however it is a finite and small stream then synchronous execution may be the correct approach and recursion, even if it occurs, is not a problem.

We can elaborate on the wording if that would help, but the principle of allowing synchronous invocation is one we consciously decided upon after much discussion and after several years of implementing code like this and learning that allowing synchronous invocation is desirable.

Rules 3.10 and 3.11 contradict 1.3 and 3.2

Re-reading these a few times, the only thing I can discern is that 3.10 and 3.11 take into account the unsubscribed state, whereas 3.2 does not. Is that your concern?

I don't see how 1.3 contradicts. Nothing about 3.2/3.10/3.11 breaks the rule about sequential execution. As stated above, permitting synchronous invocation does not shed the burden of the Publisher/Subscription to ensure sequential emission.

place an unnecessary burden on the subscriber

The burden you state is about "timing". But this is on the Subscription, not the Subscriber. Since the Subscription is created and emitted by the Publisher, it is the burden of the Publisher and not the Subscriber. Even that requirement of the Subscription is just a conditional check, and one that is permitted to have race conditions as per 2.8. It is a very light burden and one borne by the Subscription, not the Subscriber.

If you still feel I'm missing the contradiction and burden, can you please provide unit tests or sample code demonstrating the problems and burdens? I ask because I am struggling to understand the stated problem.

@pron
Author

pron commented Jun 8, 2015

I think the difference between how I read it and how you read it is that you consider "concurrently" in rule 1.3 to only mean that the method cannot be concurrently called by more than one thread, while I read it to mean any execution which would cause onMethod to concurrently be on a stack more than once (even if there is only one thread and one stack), i.e. any situation which would cause a ConcurrentModificationException if we were dealing with iterators rather than streams.

A synchronous invocation of request does not mean that request will concurrently invoke onNext.

No, but it might. The footnote for rule 3.3 explicitly demonstrates an example where rule 1.3 is violated by having onNext concurrently called.

Synchronous invocations are permitted as long as 1.3 is maintained.

That is only so if you accept the second proposed alternative to 3.10/3.11. As it stands, concurrent execution is allowed. If it weren't, rule 3.3 (limit on recursion depth) would not have been necessary.

The question is: is the sequence described in that footnote allowed or not? If it is allowed, then it clearly contradicts 1.3.

More importantly though, we want to allow it to synchronously invoke onNext/onComplete/onError from a synchronous invocation of request if the Subscription/Publisher decides that is the most efficient approach for a small number of events (such as a single item).

Right, but the same efficiency could be attained by looping in Publisher.subscribe or as in the example provided at the end of my comment.

The burden you state is about "timing". But this is on the Subscription, not the Subscriber

:) No, it's on the publisher because request might synchronously call onNext.

and as it stands, there's a burden on the subscriber to call request only at very specific locations within onNext.

can you please provide unit tests or sample code demonstrating the problems and burdens?

Sure:

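import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class MySubscriber implements Subscriber<Integer> {
    Subscription sn;

    public void onSubscribe(Subscription s) {
        this.sn = s;
        s.request(1);
    }

    public void onNext(Integer x) {
        sn.request(1); // requested before x is handled
        System.out.println(x);
    }

    public void onError(Throwable t) {
        t.printStackTrace(); // required by the interface; not relevant to the example
    }

    public void onComplete() {
        System.out.println("Done!");
    }
}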

This subscriber might print its elements in the reverse order, only because request is called before rather than after println. It may also print Done! before any of the elements. In fact, note how a wrong placement of s.request(1) within onSubscribe could have caused an NPE in onNext.

The semantics of this subscriber's behavior is then dependent on whether or not the subscription chooses to call onNext and/or onComplete synchronously from within request. The spec says absolutely nothing about precisely when in the execution the subscriber must call request (nor should it), yet here the subscription behavior may entirely change the subscribers.
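To make the ordering effect concrete, here is a minimal, self-contained sketch. It assumes a deliberately naive subscription that signals synchronously from inside `request` with no reentrancy guard; `ReentrancyDemo` and `RecursiveSubscription` are hypothetical names, and `java.util.concurrent.Flow` (JDK 9+) stands in for the `org.reactivestreams` interfaces so that no dependency is needed. The log replaces `System.out.println` so the order can be inspected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class ReentrancyDemo {
    // Hypothetical, deliberately naive subscription: it signals synchronously
    // from inside request() and does nothing to prevent re-entry.
    static final class RecursiveSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Integer> subscriber;
        private final int last;
        private int next = 1;
        private boolean done;

        RecursiveSubscription(Flow.Subscriber<? super Integer> subscriber, int last) {
            this.subscriber = subscriber;
            this.last = last;
        }

        @Override
        public void request(long n) {
            for (long i = 0; i < n && !done; i++) {
                if (next > last) {
                    done = true;
                    subscriber.onComplete();
                } else {
                    // May re-enter request() before this call returns.
                    subscriber.onNext(Integer.valueOf(next++));
                }
            }
        }

        @Override
        public void cancel() {
            done = true;
        }
    }

    // Same shape as MySubscriber: request(1) is called before the element is handled.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<Integer>() {
            private Flow.Subscription sn;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                sn = s;
                s.request(1);
            }

            @Override
            public void onNext(Integer x) {
                sn.request(1);
                log.add(String.valueOf(x));
            }

            @Override
            public void onError(Throwable t) {
                log.add("error");
            }

            @Override
            public void onComplete() {
                log.add("Done!");
            }
        };
        subscriber.onSubscribe(new RecursiveSubscription(subscriber, 3));
        return log; // [Done!, 3, 2, 1]
    }
}
```

With three elements, the nested calls unwind innermost first, so the completion signal is recorded before any element and the elements appear in reverse.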

I don't understand how alternative 2 can be implemented since it is not straightforward or even feasible for a Subscription to reliably know if it was called synchronously or asynchronously.

I believe it's both feasible and performant because of 2.7 ("A Subscriber MUST ensure that all calls on its Subscription take place from the same thread or provide for respective external synchronization.") as it ensures a proper happens-before ordering on all calls to the subscription. In fact, the spec enforces a complete happens-before relationship on all calls to the API concerning a single subscription. So:

import java.util.concurrent.ThreadLocalRandom;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class MySubscription implements Subscription {
    private final Subscriber<Integer> subscriber;
    private boolean withinOnNext; // no need for volatile or synchronization due to total ordering
    private long pending;         // ditto

    // ...

    public void request(long n) {
        pending += n;
        if (withinOnNext)
            return; // re-entrant call: the loop in the outer frame picks up the new demand

        // rule 2.7 ensures there will be no asynchronous calls to request while we're here:
        while (pending > 0) {
            pending--; // decrement inside the loop so pending never drops below zero
            signalNext();
        }
    }

    private void signalNext() {
        withinOnNext = true;
        subscriber.onNext(ThreadLocalRandom.current().nextInt());
        withinOnNext = false;
    }
}

So I've trampolined rather than recursed, and I can't think of a case where this can't be done. Is there?

@akarnokd
Contributor

akarnokd commented Jun 8, 2015

@pron, the current specification is good enough and I was able to start rewriting RxJava based on it. Creating a fluent-API is well within the allowed realm.

However, I've come to the conclusion that almost all reasonable Subscription implementations should be conservative in their concurrency, but even if request/cancel/onXXX is run synchronously, the request method has to be implemented in a reentrant-safe manner.

To see the need for the conservative approach, i.e., why a fully thread-safe and reentrant-safe request() implementation is required (cancel() might be independently thread-safe but still idempotent), just imagine the subscription driving an observeOn operator where the downstream request() call may grab the production right from the original thread and put it on the observation thread. In other words, request() requires trampolining to be reentrant-safe (which also puts a limit on recursion) and thread-safe counting of requests and productions. This also makes it somewhat irrelevant from where and when request() is called.
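A minimal sketch of such a request() follows, assuming an atomic demand counter whose transition from zero elects the single caller that runs the emission loop. `DrainingSubscription` and `demo()` are hypothetical names, `java.util.concurrent.Flow` (JDK 9+) stands in for the `org.reactivestreams` interfaces, and the handling of a non-positive `n` is simplified. It is an illustration of the general technique, not the RxJava implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical class: request() is safe to call re-entrantly and from any thread.
final class DrainingSubscription implements Flow.Subscription {
    private final Flow.Subscriber<? super Integer> subscriber;
    private final Iterator<Integer> source;
    private final AtomicLong requested = new AtomicLong();
    private volatile boolean cancelled;

    DrainingSubscription(Flow.Subscriber<? super Integer> subscriber, Iterator<Integer> source) {
        this.subscriber = subscriber;
        this.source = source;
    }

    @Override
    public void request(long n) {
        if (n <= 0) {
            // Simplified: a complete implementation would also serialize this signal.
            cancelled = true;
            subscriber.onError(new IllegalArgumentException("n must be positive"));
            return;
        }
        // Add demand atomically, capping at Long.MAX_VALUE on overflow.
        long previous = requested.getAndAccumulate(n, (current, add) -> {
            long sum = current + add;
            return sum < 0 ? Long.MAX_VALUE : sum;
        });
        if (previous != 0) {
            return; // a drain loop is already active (outer frame or another thread)
        }
        drain(n);
    }

    private void drain(long demand) {
        for (;;) {
            long emitted = 0;
            while (emitted != demand) {
                if (cancelled) {
                    return;
                }
                if (!source.hasNext()) {
                    // 'requested' stays non-zero, so later request() calls are no-ops.
                    subscriber.onComplete();
                    return;
                }
                subscriber.onNext(source.next());
                emitted++;
            }
            // Pick up any demand that arrived while emitting.
            demand = requested.addAndGet(-emitted);
            if (demand == 0) {
                return;
            }
        }
    }

    @Override
    public void cancel() {
        cancelled = true;
    }

    // Drives the subscription with a subscriber that requests before it records.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<Integer>() {
            private Flow.Subscription sn;

            @Override public void onSubscribe(Flow.Subscription s) { sn = s; s.request(1); }
            @Override public void onNext(Integer x) { sn.request(1); log.add(String.valueOf(x)); }
            @Override public void onError(Throwable t) { log.add("error"); }
            @Override public void onComplete() { log.add("Done!"); }
        };
        subscriber.onSubscribe(new DrainingSubscription(subscriber, List.of(1, 2, 3).iterator()));
        return log; // [1, 2, 3, Done!]
    }
}
```

A nested call to `request` only increments the counter and returns, so recursion depth stays bounded and the subscriber sees elements in order wherever it places its `request` call.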

I've blogged about Subscriptions (Producers in RxJava's 1.x terminology) and their related issues similar to your cases a few weeks ago. I'm also planning an entry about how to transition from Producers to Subscriptions.

For 'real' trouble, one can look at hot observables (Processors or Subjects) and see that either they need to be attached to a normal backpressure-aware source, or the caller to Processor.onNext() has to have some Subscription ready before onNext to conform to the spec and/or have internal, possibly unbounded buffers. AsyncSubject and ReplaySubject are not really affected, but the rest are. For practical reasons, I've prototyped them so that they don't really expect onSubscribe to be called and don't really care about downstream requests if that would incur extra buffering. The developer has operators to deal with lack of backpressure support as before.

@pron
Author

pron commented Jun 8, 2015

the current specification is good enough and I was able to start rewriting RxJava based on it. Creating a fluent-API is well within the allowed realm.

I'm not saying it isn't, just that it places an unnecessary burden on the subscriber, that can be lifted without hurting performance. As it stands, the spec implicitly (and, IMO, unnecessarily) mandates that the subscriber call request only once it's done processing all previous elements.

just imagine the subscription driving an observeOn operator where the downstream request() call may grab the production right from the original thread and put it on the observation thread.

I am not sure I understand your example, but if the producer (or subscription) has its own thread then there's absolutely no reason for it to ride on the subscriber's thread and call onNext synchronously from within request. As I have guessed and @benjchristensen confirmed, the provision for synchronous calls has been made for cold, short streams only. I believe those can be just as efficient without allowing synchronous calls, or with recursion limited to exactly 1 level (i.e. request may call onNext but only if it itself has not been called by onNext).

@rkuhn
Member

rkuhn commented Jun 8, 2015

@pron I agree with your analysis that allowing (bounded) recursion places restrictions on when the Subscriber may call request(), and I see your point that technically this could be called concurrent signaling—we may want to revisit the formulation of this clause. What I don’t agree with is your claim that disallowing recursion would not inhibit the creation of thread-less Publishers: the call scheme you sketch only works if the Subscriber always requests from each callback, starting in onSubscribe, but this is not mandated. While requesting in onSubscribe might be common, there are reasons to not do it if the Subscriber is still initializing at this point (for example), and onNext() will likely fail to call request() as soon as back-pressure is encountered—which is the key feature of Reactive Streams. Initially I was opposed to any synchronous processing of any of the callbacks, but over time and through long and hard discussions I was convinced that for practical reasons we should allow this optimization, at the cost that you observe.

@pron
Author

pron commented Jun 9, 2015

@rkuhn

the call scheme you sketch only works if the Subscriber always requests from each callback, starting in onSubscribe, but this is not mandated.

Ah, but I addressed that point precisely. I think -- and correct me if I'm wrong -- that there are two ways such a subscriber-publisher pair can interact, and only two:

  1. The subscriber may call request in onSubscribe, in which case a simple trampoline loop in subscribe would "ride" the subscriber's thread.
  2. The subscriber may call request asynchronously at some point. However, rule 2.7 mandates that all such calls must originate on the same thread or appear to do so (with proper synchronization). Therefore a trampoline loop within request that detects this asynchronous usage will be able to efficiently ride the subscriber's thread without requiring any sort of synchronization. The subscriber is not allowed to issue another asynchronous call to request as long as this one hasn't returned (this does not place an additional burden on the subscriber, as this behavior also exists with recursion).

The spec could then allow a recursion of depth 1, i.e. request may call onXXX but only if it itself has not been synchronously called by onXXX. The example I supplied at the end of my long response to @benjchristensen (MySubscription) illustrates how to easily do that. I believe it is entirely threadsafe (thanks to rule 2.7), it is certainly simple enough (especially as its burden is now on the publisher), and its performance is as good as a recursive call. A longer example could cover both cases (EDIT: done. See comment below).

I was convinced that for practical reasons we should allow this optimization, at the cost that you observe.

Even though I believe rule 2.7 removes any need for this optimization, as it allows both trampolining (either in subscribe or request or both) as well as easy detection of recursive calls, at the very least it is necessary to specify another rule saying:


Subscriber MUST NOT call Subscription.request(long n) before all previously received elements have been processed.


as that is the implication of allowing recursion of depth > 1 in request.

(This means two rules -- this one, which is rather burdensome, and 3.3 (limit on recursion depth), which is rather vague -- required to allow an optimization that is probably made unnecessary by rule 2.7)

@pron
Author

pron commented Jun 9, 2015

I've created a rudimentary implementation for what I believe to be a fully generic thread-less publisher. It handles all possible use cases for such a publisher allowed by the spec, it allows recursion of depth 1 only, and avoids all problems in the use-case I provided (MySubscriber). Also, I believe it is simple enough, and just as performant as a recursive solution. It can serve as a basis for a helper class provided alongside the API.

Is there any purpose served by allowing deeper recursion that is not served by this implementation?

@akarnokd
Contributor

akarnokd commented Jun 9, 2015

Few remarks:

  1. In onSubscribe(), you may want to set recursive to false before supply() is called.
  2. You can combine recursive and pending: the transition from 0 to n by pending indicates a supply run can start.
  3. You should validate n in request() to be positive and handle a long overflow.
  4. You don't really need to set recursive back to false after an error or completion event.
  5. I'd check !done in the onXXX methods just in case.

@pron
Author

pron commented Jun 9, 2015

@akarnokd Thanks! Fixed, except for 2. You are, of course, correct: The recursive field is not required. You can tell by the value of pending whether the call is recursive or not (recursive == true iff pending > 0 at the beginning of the request method). But I wanted to make the recursion-detection explicit because this is just for demonstration purposes.

@viktorklang
Contributor

Apologies, I haven't had enough cycles to keep up with this Issue, is there something that must be fixed or can we close it?

@pron
Author

pron commented Jun 24, 2015

Hi.
Well, if this generic thread-less publisher does indeed address all of the possible motivations for allowing arbitrary-depth recursion -- and it does so with no more than one recursive call -- then I see no reason why the spec shouldn't be simplified (I can't think of an example where this publisher wouldn't meet all the needs of a thread-less publisher or would perform any worse than arbitrary-depth recursion).

The spec itself would be simpler (rule 3.3 could then be dropped, and 1.3 would hold true without need for additional caveats), and a burden would be removed from subscribers, as their behavior would no longer be determined by the timing of the call to request. In that case, rules 3.10 and 3.11 should be changed as per my suggestion in "alternative 2".

If, OTOH, there are uses for arbitrary recursion depth not addressed by the linked publisher, then two clarifications must be made in order for the spec to be consistent:

  1. Rule 1.3 would need to clarify that subscriber methods may, in fact, be called concurrently -- if they are called recursively by the subscription.
  2. An additional rule must be added stating that a subscriber must not call request before it has finished processing all previously received elements, and that no computation in any way affected by previous requests may be done following the call to request. As it stands today, conformant subscribers that call request too soon may behave in unexpected ways -- or crash -- depending on exactly how the subscription is implemented (see the MySubscriber example I provided in one of the comments -- it fully abides by the spec, yet it is wrong).

@viktorklang
Contributor

Rule 1.3 would need to clarify that subscriber methods may, in fact, be called concurrently -- if they are called recursively by the subscription.

I don't understand how this is to be considered concurrently. Could you elaborate what you mean?

An additional rule must be added stating that a subscriber must not call request before it has finished processing all previously received elements, and that no computation in any way affected by previous requests may be done following the call to request.

That's a no go. request can be called whenever the Subscriber wants more elements; whether that is to accommodate a resized input buffer or to keep the buffer optimally filled based on downstream demand is at its own discretion.

As it stands today, conformant subscribers that call request too soon may behave in unexpected ways -- or crash -- depending on exactly how the subscription is implemented (see the MySubscriber example I provided in one of the comments -- it fully abides by the spec, yet it is wrong).

Perhaps the fix is to recommend any and all synchronous Publishers and Subscribers to appropriately guard for reentrancy.

@rkuhn
Member

rkuhn commented Jun 24, 2015

One thing we might want to remember: «Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.» The point is transporting data elements across an asynchronous boundary, and none of the points above would be an issue if we were talking about that. Synchronous execution is a quirky special case, maybe we need to be more explicit about that.

@pron
Author

pron commented Jun 24, 2015

That's a no go.

Excellent, but as it stands, the spec already places this requirement without making it explicit. MySubscriber in my example will fail even though it perfectly conforms to the spec because of rules 3.10 and 3.11.

To make matters very simple, let me state my proposal clearly. Rules 3.10 and 3.11 should be changed to limit the allowed recursion depth to one, namely:


3.10 While the Subscription is not cancelled, Subscription.request(long n) MAY synchronously call onNext on this (or other) subscriber(s), but MUST NOT do so if Subscription.request(long n) has been synchronously called from within onNext or onSubscribe.

3.11 While the Subscription is not cancelled, Subscription.request(long n) MAY synchronously call onComplete or onError on this (or other) subscriber(s), but MUST NOT do so if Subscription.request(long n) has been synchronously called from within onNext or onSubscribe.


The generic thread-less publisher I linked to demonstrates that any thread-less publisher can be easily made to abide by these rules without loss to any functionality or performance. Making this change would also render rule 3.3 superfluous so it can be removed.

@pron
Author

pron commented Jun 24, 2015

@rkuhn

Synchronous execution is a quirky special case

Absolutely! As it turns out, limiting recursion depth to 1 takes care of all requirements that make that special case necessary, and would render any additional clarification unnecessary (in fact, it would remove a whole rule pertaining to synchronous execution). Placing this inconsequential restriction on the publisher would let all subscribers ignore this special case altogether (as they wouldn't sense any different behavior even in the synchronous case).

@viktorklang
Contributor

@pron

I'll have to think some more about it, but currently, given the wording of 3.10 and 3.11, your MySubscriber works as designed (reading the spec it is clear that you may get onNext called within onSubscribe if you call subscription.request there, and you are a synchronous Subscriber, which you chose for yourself).

Given the timing of this discussion (post 1.0), I'd be wary of making any changes that break currently valid implementations.

@pron
Author

pron commented Jun 24, 2015

@viktorklang

  1. The spec says nothing about being a synchronous subscriber -- only a synchronous publisher/subscription. Any restriction placed on a subscriber when subscribing to a synchronous source should then be applied to all subscribers, as the subscriber is oblivious to the synchronous nature of the publisher (in fact, the spec allows the publisher to switch from async to sync at any time).
  2. This change would not break any currently valid subscribers, as (implicit) restrictions are only being lifted, and probably few if any existing publishers. Not making it, however, might cause various bugs in future subscribers.

To clarify further: not making this change now, will break subscribers that call request prior to completely finishing any computation relating to previously received elements. That is the implication of allowing arbitrary-depth recursion, but as it is stated now, its (quite severe, IMO) full implications are unclear.

@viktorklang
Contributor

@pron

The spec says nothing about being a synchronous subscriber -- only a synchronous publisher/subscription. Any restriction placed on a subscriber when subscribing to a synchronous source should then be applied to all subscribers, as the subscriber is oblivious to the synchronous nature of the publisher (in fact, the spec allows the publisher to switch from async to sync at any time).

3.10 and 3.11 clearly state that Subscription.request MAY synchronously call methods on the Subscriber. As such, if a Subscriber decides it wants to be synchronous, it needs to deal with this fact, don't you agree?

This change would not break any currently valid subscribers, as (implicit) restrictions are only being lifted, and probably few if any existing publishers. Not making it, however, might cause various bugs in future subscribers.

It will most definitely break existing synchronous Publishers.

To clarify further: not making this change now, will break subscribers that call request prior to completely finishing any computation relating to previously received elements. That is the implication of allowing arbitrary-depth recursion, but as it is stated now, its (quite severe, IMO) full implications are unclear.

Do note that the spec is clear about what to expect from a Subscriber PoV, and also that synchronous Publishers and Subscribers are not the focal point of the initiative. I have a hard time convincing myself that breaking Publishers post 1.0 for a minor simplification has the right cost-benefit ratio. I'd love to hear what the other @reactive-streams/contributors think about the proposal.

A compromise which is non breaking and non-invasive would be to explain the intricacies of synchronous Publishers and Subscribers more clearly.

@viktorklang
Contributor

@pron So, from my point of view there's nothing >>wrong<< with your proposal—it's an interesting simplification—it's that it has an unfortunate timing and (regrettably) does not fix a spec bug per se (which would make it much easier to justify breaking changes).

@rkuhn
Member

rkuhn commented Jun 24, 2015

Why don’t we include it with RECOMMENDED status? That would not break anything while still simplifying life for future implementors.

@viktorklang
Contributor

@rkuhn RECOMMENDED is equivalent to adding to the documentation as Subscribers couldn't count on "at most once"-recursion from the Publisher on subscription.next. Right?

@pron
Author

pron commented Jun 24, 2015

@viktorklang

3.10 and 3.11 clearly state that Subscription.request MAY synchronously call methods on the Subscriber. As such, if a Subscriber decides it wants to be synchronous, it needs to deal with this fact, don't you agree?

No, I do not :) It means that every subscriber must be written in such a way as to support synchronous calls, because any subscription may act in this way at any time (the spec allows it). The implication of this is that the only way for a subscriber to support such behavior on the part of the subscription is if it calls request only once it's completed processing any previous elements. This is a rather severe limitation, one that you yourself -- and rightfully so -- refused to make explicit because you don't want it. However, as things stand, rules 3.10 and 3.11 imply it.

EDIT: Sorry, I've misunderstood -- yes, you are right. I still think it makes the synchronous case more complicated than need be.

does not fix a spec bug per se (which would make it much easier to justify breaking changes)

I would claim that the implicit burden of issuing request only after all previous processing is completed is a bug, because you yourself said that it is absolutely against the intention of this spec to require it. But if you need an actual "unsoundness" bug rather than a "this is more than we bargained for" bug, well, there is one (sort of):

The spec (3.10-11) allows multiple calls to onNext, or onNext and onComplete etc. -- basically any two subscriber methods -- to coexist on the stack at the same time. At least in the Java world, this counts as concurrent calls -- which are prohibited by rule 1.3 (the rule does not explicitly restrict itself to concurrent calls from multiple threads). I say "in the Java world" because Java collections throw a ConcurrentModificationException even when the concurrent modification occurs on the same thread as the iterator traversal. So as far as Java iterators are concerned, concurrent access applies to both cases: concurrent calls on different threads or concurrent nested calls on the same thread.
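
To make the iterator point concrete, here is a minimal sketch (the class and method names are made up for illustration) in which a list is modified from inside its own for-each traversal. Everything runs on one thread, yet the fail-fast iterator of ArrayList still reports a "concurrent" modification:

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

class SameThreadCme {
    // Returns true if modifying the list during a for-each traversal,
    // on the very same thread, raises ConcurrentModificationException.
    static boolean throwsOnSameThread() {
        List<Integer> list = new ArrayList<>(List.of(1, 2, 3));
        try {
            for (Integer x : list) {
                list.add(x + 10); // structural modification nested inside the traversal
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("CME on a single thread: " + throwsOnSameThread());
    }
}
```

The nested modification plays the same role here as a nested onNext or onComplete reached through request: no second thread is involved, but the outer call observes state changed underneath it.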

@rkuhn
Member

rkuhn commented Jun 24, 2015

@pron What you are overlooking is that the intended use of Subscriber is to dispatch the processing of all signals asynchronously, which makes this point moot. There is no reentrancy once you do that.

@pron
Author

pron commented Jun 24, 2015

@rkuhn You are probably right, which is why I think it's good that as someone not involved with writing the spec, I can only guess its intentions but know what it actually says :)

As the spec stands, it certainly devotes quite a few rules to dealing with synchronous behavior -- even if it is just a special case. Indeed, if it is a special case, it is best to simplify it and make it as much of a non-issue for subscribers as possible. In that case, it also stands to reason that simplifying the spec now would not be too onerous for publisher implementors.

Also, even if the subscriber does choose to process events by dispatching them to some other worker thread, rules 3.10-11 mean it should still be careful with instruction ordering. For example:

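import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class MyAsyncSubscriber implements Subscriber<Integer> {
    Subscription sn;
    ExecutorService exec = Executors.newSingleThreadExecutor();

    @Override
    public void onSubscribe(Subscription s) {
        this.sn = s;
        s.request(1);
    }

    @Override
    public void onNext(Integer x) {
        sn.request(1); // may synchronously re-enter onNext or onComplete (rules 3.10-11)
        exec.submit(() -> doSomething(x)); // may throw a RejectedExecutionException depending on the subscription's behavior
    }

    @Override
    public void onError(Throwable t) { // required by the Subscriber interface
        exec.shutdown();
    }

    @Override
    public void onComplete() {
        exec.shutdown(); // might be called before the last submit
    }

    void doSomething(Integer x) { /* placeholder for the actual processing */ }
}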

This may not only submit tasks in an unexpected order (which may matter to someone), it will also fail if request calls onComplete synchronously, resulting in a call to shutdown before the last submit (of course, this subscriber should call request from within the executor, and this one is a bad subscriber as it exerts no backpressure).

So I think what you are saying is that the burden rules 3.10-11 place on the subscriber is not as onerous as I made it out to be, as the burden in this case is not "must not call request before all elements have been processed", but "must not call request before all elements have been submitted for processing"; this burden may indeed be acceptable.
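
A runnable sketch of that difference, under stated assumptions: it uses the JDK's java.util.concurrent.Flow interfaces as a stand-in for org.reactivestreams, and SyncArraySubscription, OrderingSubscriber and OrderingDemo are hypothetical names. The subscription is deliberately naive (it emits on the caller's stack and does not bound recursion as rule 3.3 requires), which is enough to show the hazard on a three-element source:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

class OrderingDemo {
    static OrderingSubscriber run(boolean requestFirst) {
        OrderingSubscriber s = new OrderingSubscriber(requestFirst);
        s.onSubscribe(new SyncArraySubscription(new int[] {1, 2, 3}, s));
        s.awaitDone();
        return s;
    }

    public static void main(String[] args) {
        System.out.println("request first, rejected: " + run(true).rejected);
        OrderingSubscriber fixed = run(false);
        System.out.println("submit first, rejected: " + fixed.rejected
                + ", processed: " + fixed.processed);
    }
}

// Hypothetical synchronous subscription: request() emits on the caller's
// stack and signals onComplete as soon as the array is exhausted.
class SyncArraySubscription implements Flow.Subscription {
    private final int[] items;
    private final Flow.Subscriber<? super Integer> subscriber;
    private int index;
    private boolean done;

    SyncArraySubscription(int[] items, Flow.Subscriber<? super Integer> subscriber) {
        this.items = items;
        this.subscriber = subscriber;
    }

    @Override
    public void request(long n) {
        while (n-- > 0 && index < items.length) {
            subscriber.onNext(items[index++]); // onNext may call request() again
        }
        if (index == items.length && !done) {
            done = true;
            subscriber.onComplete();
        }
    }

    @Override
    public void cancel() {
        done = true;
        index = items.length;
    }
}

class OrderingSubscriber implements Flow.Subscriber<Integer> {
    private final boolean requestFirst;
    private final ExecutorService exec = Executors.newSingleThreadExecutor();
    private Flow.Subscription sn;
    final List<Integer> processed = Collections.synchronizedList(new ArrayList<>());
    volatile boolean rejected;

    OrderingSubscriber(boolean requestFirst) { this.requestFirst = requestFirst; }

    @Override public void onSubscribe(Flow.Subscription s) { sn = s; s.request(1); }

    @Override
    public void onNext(Integer x) {
        if (requestFirst) sn.request(1);   // ordering used by MyAsyncSubscriber
        try {
            exec.submit(() -> processed.add(x));
        } catch (RejectedExecutionException e) {
            rejected = true;               // onComplete already shut the executor down
        }
        if (!requestFirst) sn.request(1);  // request only after the element is submitted
    }

    @Override public void onError(Throwable t) { exec.shutdown(); }
    @Override public void onComplete() { exec.shutdown(); }

    void awaitDone() {
        try {
            exec.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With requestFirst set, the nested request reaches onComplete before any submit runs, so the submissions are rejected. With the submit-then-request ordering, all three elements are handed to the executor before shutdown. Like the original, the second variant still exerts no real backpressure; it only illustrates the ordering constraint.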

@rkuhn
Member

rkuhn commented Jun 24, 2015

Yes, that is indeed what I meant.

We cannot reasonably remove the obligation of the Subscriber to handle reentrant execution of its methods from within calls to request(), that would break the interoperation of such Subscribers with existing Publishers, but this discussion has definitely highlighted that we need to be more explicit about this obligation in the spec—we should also verify that we have these cases fully covered by the TCK.

@pron
Author

pron commented Jun 24, 2015

OK. Although, it might be worthwhile to figure out if there are any existing publishers at all that take advantage of arbitrary-depth recursion. Looking at the list of implementations, I can guess that you would find them only in RxJava (all other implementations seem 100% IO related and unlikely to make use of recursion). If there aren't any (or only very few), you might be inclined to change the spec to simplify the synchronous case.

Whatever you decide, thank you (all) for considering this.

@viktorklang
Contributor

@pron & @rkuhn,

What I propose is the following:

  1. Add a footnote to the "concurrent" mention to clarify that "concurrent" refers to the multiple-threads kind. (I wouldn't call recursive methods concurrent, but I acknowledge your ConcurrentModificationException argument, even if proper concurrent modification detection has to use volatile to make sure that concurrent access is visible/detectable.)
  2. Augment the examples of synchronous Publishers and Subscribers to show the reader more clearly the challenges they'll have to address, likely even recommending keeping recursion to 1 as the upper bound.

The end result of this proposal is that it doesn't break any current implementations, makes the spec clearer without changing its semantics, and makes implementations of synchronous Publishers and Subscribers more straightforward, as the documentation will more clearly articulate what needs consideration.

Does that sound like an acceptable middle road?

@pron
Author

pron commented Jun 24, 2015

Sure. Quite reasonable.

I do have to ask, though, whether it would be acceptable to say that while keeping recursion depth to 1 is only recommended, this may become a requirement in a future revision? Specs do sometimes tighten their requirements, thus making formerly compliant implementations non-compliant with the new version. I don't think it's reasonable to assume that if you conform with a standard specification, you will remain compliant for all future versions even if you change nothing. Most standards don't make that promise. In this particular case, tightening the requirements will only "negatively" affect a very small number of publisher implementations, and "positively" affect a potentially very large number of subscriber implementations.

In any case, your decision is perfectly acceptable.

@viktorklang
Contributor

I think recommending that implementors keep depth to 1 as a future-proofing strategy is a great suggestion.

@smaldini
Contributor

@viktorklang what about adding your footnote then?

@viktorklang
Contributor

@smaldini Open a PR and I'll be more than happy to review :) (I am swamped at the moment :S)

@nomagic

nomagic commented Sep 14, 2015

I just saw this issue and I have to say, this kind of confusion is what I had attempted to communicate in #230

@rkuhn's quote is the money quote:
"One thing we might want to remember: «Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.» The point is transporting data elements across an asynchronous boundary, and none of the points above would be an issue if we were talking about that"

That's exactly right, and I would go further and say synchronous notification (Publisher calling onXX methods in the Subscriber's thread) should be explicitly proscribed; obviously @benjchristensen disagrees, but the very use case he describes (a Publisher only publishing a few elements) obviates backpressure.

@smaldini
Contributor

@nomagic I disagree and will try to argue why from my iPhone.
Backpressure is also information you might want to connect to a publisher regardless of its location (async boundary), for cold-source use cases in particular: DB requests, connection reads, queue-based messaging sources... In these three cases at least, the demand can be used to control in-flight data volume, and therefore apply a form of backpressure beyond thread boundaries. E.g., you might want to read connection input on demand, in a predictable way, and then pass it to onNext. While you can deal with reentrancy by forcing the Publisher to dispatch asynchronously, I think a Processor is actually a more appropriate place for this restriction, as it would be an opt-in decision: do you want an async processor to defend the publisher from slow consumers, or do you want to colocate your pipeline processing? Note that a chain of publisher + N processors + subscriber benefits end to end from that behaviour, where the processors act as message-passing components with their own demand bound to their backlog capacity.

It also allows interesting threading setups, like asynchronously dispatching onSubscribe+request only (to mitigate a slow publisher) versus dispatching all signals (to mitigate a slow subscriber).

Hope it makes sense, just wanted to weigh in my opinion.
Disclaimer: I am past a few gin-tonics, so I may need to clarify my comment tomorrow :)

@nomagic

nomagic commented Sep 15, 2015

@smaldini
My point is that there should be 2 distinct "worlds" -> ReactiveStream and Stream.
In the ReactiveStream "world"

Stack
----------
request(long)
onNext(T)

should be forbidden, and would be blamed on the Publisher, not the Subscriber.

We already have built-in mechanisms for backpressure in the Stream world, e.g. pull-based mechanisms like Iterator.next()
And I don't think the desire for frameworks to provide a

Publisher.of(T[]) : Publisher<T>

is worth the attendant confusion by users of the framework, viz. the implementers of Subscriber.

Stream.of(T[]) : Stream<T>

is good enough.
The effort to unify all access to Collection-type things, where synchronous/asynchronous is an (hidden) implementation detail, behind a common interface is a big mistake IMO.

@akarnokd
Contributor

I don't see what the problem with calling onNext from request is? If it's due to the stack depth, then I don't think that is an issue due to how JITs may inline the whole set of method calls into a single level.

Here is a Publisher implementation for RxJava 2.x that contains such a call and caps the recursion at level 1 (so a request() call from onNext won't reenter the emission loop in request).

If you want to prevent any request from onSubscribe to trigger an emission, then you need extra overhead in the form of an atomic wip counter (work in progress) so until onSubscribe call returns, any requests that were issued are simply remembered and the emission starts after that.
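
For readers who have not seen the pattern, the following is a rough sketch of a recursion-capped emission loop. It is not RxJava's actual code: it is written against the JDK's java.util.concurrent.Flow interfaces, and RangeSubscription, DepthTrackingSubscriber and RangeDemo are hypothetical names. Outstanding demand lives in an AtomicLong, and only the request call that moves it away from zero runs the loop, so a request issued from inside onNext just records demand and returns:

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

class RangeDemo {
    static DepthTrackingSubscriber run(int count) {
        DepthTrackingSubscriber s = new DepthTrackingSubscriber();
        s.onSubscribe(new RangeSubscription(0, count, s));
        return s;
    }

    public static void main(String[] args) {
        DepthTrackingSubscriber s = run(100_000);
        System.out.println("received=" + s.received + " maxDepth=" + s.maxDepth);
    }
}

// Hypothetical synchronous range source with recursion capped at one level.
class RangeSubscription implements Flow.Subscription {
    private final Flow.Subscriber<? super Integer> subscriber;
    private final int end;
    private int index;
    private volatile boolean cancelled;
    private final AtomicLong requested = new AtomicLong();

    RangeSubscription(int start, int count, Flow.Subscriber<? super Integer> subscriber) {
        this.index = start;
        this.end = start + count;
        this.subscriber = subscriber;
    }

    @Override
    public void request(long n) {
        if (n <= 0) { // rule 3.9: non-positive demand is signalled as an error
            cancelled = true;
            subscriber.onError(new IllegalArgumentException("non-positive request: " + n));
            return;
        }
        // Record the demand, saturating at Long.MAX_VALUE.
        long previous = requested.getAndUpdate(r -> r + n < 0 ? Long.MAX_VALUE : r + n);
        if (previous != 0) {
            return; // an emission loop is already running and will pick this demand up
        }
        long demand = n;
        long emitted = 0;
        for (;;) {
            while (emitted != demand && index != end && !cancelled) {
                subscriber.onNext(index++); // a nested request() returns early above
                emitted++;
            }
            if (cancelled) return;
            if (index == end) {
                cancelled = true;
                subscriber.onComplete();
                return;
            }
            demand = requested.addAndGet(-emitted); // demand left after this batch
            if (demand == 0) return;
            emitted = 0;
        }
    }

    @Override
    public void cancel() { cancelled = true; }
}

// Requests one element at a time from inside onNext and records how deeply
// onNext calls end up nested on the stack.
class DepthTrackingSubscriber implements Flow.Subscriber<Integer> {
    private Flow.Subscription subscription;
    private int depth;
    int maxDepth;
    int received;
    boolean completed;

    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }

    @Override
    public void onNext(Integer item) {
        depth++;
        maxDepth = Math.max(maxDepth, depth);
        received++;
        subscription.request(1);
        depth--;
    }

    @Override public void onError(Throwable t) { }
    @Override public void onComplete() { completed = true; }
}
```

In this sketch a request made from onSubscribe still starts emitting before onSubscribe returns; deferring that is the part that needs the extra work-in-progress bookkeeping and is not shown.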

@smaldini
Contributor

That's also the way we solve it; it's not really hard and avoids an async hop.

Plus iterator is a bit all or nothing, request is fine grained and allows various behaviour like adapted prefetch. I think the key point is that subscriber is location transparent basically.

@nomagic

nomagic commented Sep 15, 2015

@akarnokd
So the short answer (to what's wrong) is that (a)synchronicity is not an implementation detail, it's a semantic difference. Agreed?

Longer Answer:
Suppose a user of your RS framework writes the following code:

1 Publisher<T> data = Publisher.of(T[]) // returns PublisherArraySource
2 data.subscribe(T ->  commitlog.write(T); subscription.request(1);  return;)
. . .    //do some other work

Here's the stack (grows down):

Stack  (Thread A)
-----------
 Publisher.subscribe
 Subscriber.onSubscribe
 Subscription.request
 Subscriber.onNext
 Subscriber.onNext
 . . . .
 Subscriber.onComplete

I'm taking into account your recursion limitation within request by eliding the nested request calls from the stack.

  1. How is that asynchronous?
  2. Given that it's not - how could the programmer know that? Not by referring to the spec (which reads maybe, maybe not) - so what, he/she should run a debugger to step into the source code (if it's available) to figure out why line 2 blocked? Surely we can agree that having to do that for every line of code that invokes Publisher.subscribe is something akin to the 5th Circle of Hell

PublisherArraySource is essentially a glorified forEach(T); and if the programmer intended that, they would have used Stream.of(T[]).forEach(...)
I'm not suggesting that your code is wrong; it's allowed by the spec (although my reading is it's frowned upon) - and that's a problem.

I realize there is zero chance that the spec will be changed; but there isn't a font big or bold enough to explain in plain English that the above behavior is possible (although I don't really know what to do about it).
And I understand the arguments for allowing it (though "streaming" arrays isn't one of them), but I think this freedom/flexibility of implementation does more harm than good on balance.

--End Rant

@viktorklang
Contributor

Closing due to inactivity, please reopen if you are also willing to open a PR about adding a recommendation to keep stack depth growth to 1.
