Consequences of #46 #230

Closed
nomagic opened this issue Mar 3, 2015 · 18 comments

@nomagic

nomagic commented Mar 3, 2015

Hello,

I spent some considerable time and care reading the conversation (issue #46); but I'm still uncertain about the consequences for user code (by which in this case I mean #Subscribers).

As I understand it, when implementing a Subscriber (and to keep it simple I'm just focusing on #Subscriber.onNext(T) ), you can make no assumption about whether onNext() will be invoked synchronously by the Producer, or whether it will be invoked asynchronously.

And the recommendation is that you should consider whether the Subscriber does "heavy" processing of T or not (and no, I'm not here to quibble about the meaning of heavy).
If "heavy", then Subscriber.onNext() should take care to "schedule" the processing to be done asynchronously, e.g.

void onNext(T t)
      this.workQueue.enqueue(t)
      signal() // notify the processing thread/pool that there's work
      . . . .

of course there are other ways to do it.
Is that the conclusion that should be drawn (Rule 2.2 sure reads like that)?

I don't see how "heavy" ? async : sync is a proper rule.

It depends on the "sync-ness" of the onNext() call itself: if it's async, then why would you ever "double up" on async and schedule the processing asynchronously, regardless of its heaviness (that's sheer gratuitous overhead on any platform)?
Maybe I'm misinterpreting the conclusion, but that strikes me as unworkable.

@akarnokd
Contributor

akarnokd commented Mar 3, 2015

Such rules imply a scheduling dimension not covered by the spec. In RxJava 1.0, we let the developer decide where to put the consumption of values, via observeOn/subscribeOn. The second thing is that if the onXXX methods are sequential with respect to each other, that avoids having to use defensive synchronization everywhere.

Writing such a work scheduler is difficult: you need to prevent reordering, allow onErrors to cut ahead, and even avoid thread-hopping (to reduce thread migration and cold caches).
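
For readers following along: one common way to keep elements in order while processing them on a shared pool is a queue-drain loop guarded by an atomic counter. The sketch below covers only the "prevent reordering" part, and it assumes onNext calls arrive one at a time. The names `OrderedHandoff`, `offer` and `OrderedHandoffDemo` are hypothetical and belong to neither Reactive Streams nor RxJava. It does not let errors cut ahead, and it does not recover if the processing function throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Processes elements one at a time, in arrival order, on a shared pool. */
final class OrderedHandoff<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger(); // "work in progress" counter
    private final Executor pool;
    private final Consumer<T> func;

    OrderedHandoff(Executor pool, Consumer<T> func) {
        this.pool = pool;
        this.func = func;
    }

    /** Meant to be called from onNext: enqueue, and start a drain only if none is active. */
    void offer(T item) {
        queue.add(item);
        if (wip.getAndIncrement() == 0) {
            pool.execute(this::drain);
        }
    }

    /** At most one drain runs at a time, so func never sees concurrent or reordered calls. */
    private void drain() {
        int missed = 1;
        do {
            T item;
            while ((item = queue.poll()) != null) {
                func.accept(item);
            }
            // Subtract the offers accounted for; a non-zero result means more arrived meanwhile.
            missed = wip.addAndGet(-missed);
        } while (missed != 0);
    }
}

final class OrderedHandoffDemo {
    /** Pushes 0..n-1 through a 4-thread pool and returns the order in which they were processed. */
    static List<Integer> run(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Integer> seen = new ArrayList<>(); // only touched by the serialized drain
        CountDownLatch done = new CountDownLatch(n);
        OrderedHandoff<Integer> handoff = new OrderedHandoff<>(pool, i -> {
            seen.add(i);
            done.countDown();
        });
        for (int i = 0; i < n; i++) {
            handoff.offer(i);
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return seen;
    }
}
```

Because a new drain task is submitted only on the 0-to-1 transition of the counter, a burst of elements is usually handled by a single pool task rather than one task per element, which also limits thread-hopping.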

@nomagic
Author

nomagic commented Mar 3, 2015

@akarnokd: Szia. I don't understand your response, except to say that the prescription of a total ordering of onNext() calls does not determine whether onNext() calls are asynchronous or not (sure, if asynchronous, then it's trickier, but still doable). And my question was restricted to the reactive streams API (which has no observe/subscribe(On) methods)

Assume I define a couple of helper classes:

abstract AsyncSubscriber(func: (T) -> void) implements Subscriber<T>
. . . .
void onNext(T)
    this.pool.submit(() -> func(T))

abstract SyncSubscriber(func: (T) -> void) implements Subscriber<T>
void onNext(T)
    func(T)

And a user has a reference to some Producer and has written a lambda to process T's (and therefore knows its "heaviness").

Which of the following should the user write?

producer.subscribe(new AsyncSubscriber(lambda)) or
producer.subscribe(new SyncSubscriber(lambda))

AFAICT, this reactive streams spec states that it depends only on the "heaviness" of the lambda; but it seems to me that to be useful, the best choice ineluctably depends on whether producer invokes onNext() synchronously or asynchronously;
if asynchronously ==> use SyncSubscriber no matter what the lambda does
if synchronously, then choose based on what the lambda does (i.e. how heavy it is)

The rub being that there is nothing in the spec that allows for a user to know/determine which one of those it is or to affect it dynamically (which I presume is what Rx's observeOn does).

Isn't this a problem?
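
For concreteness, here is one possible Java rendering of the two helper classes. It is a sketch under several assumptions: it uses the JDK's `java.util.concurrent.Flow` interfaces (Java 9+) as a stand-in for `org.reactivestreams`, it requests an unbounded number of elements and so sidesteps backpressure entirely, and the demo calls `onNext` directly instead of going through a real Publisher. `SubscriberChoiceDemo` and the thread name `subscriber-worker` are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Runs the user's function on whatever thread calls onNext. */
final class SyncSubscriber<T> implements Flow.Subscriber<T> {
    private final Consumer<T> func;

    SyncSubscriber(Consumer<T> func) { this.func = func; }

    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(T item) { func.accept(item); }
    @Override public void onError(Throwable t) { t.printStackTrace(); }
    @Override public void onComplete() { }
}

/** Hands each element to a worker, so onNext returns to its caller quickly. */
final class AsyncSubscriber<T> implements Flow.Subscriber<T> {
    private final Consumer<T> func;
    private final ExecutorService worker;

    AsyncSubscriber(Consumer<T> func, ExecutorService worker) {
        this.func = func;
        this.worker = worker;
    }

    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(T item) { worker.execute(() -> func.accept(item)); }
    @Override public void onError(Throwable t) { t.printStackTrace(); }
    @Override public void onComplete() { }
}

final class SubscriberChoiceDemo {
    /** Returns the name of the thread on which the user's function ran for one element. */
    static String threadThatRan(boolean async) {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "subscriber-worker"));
        String[] seen = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        Consumer<String> func = s -> {
            seen[0] = Thread.currentThread().getName();
            done.countDown();
        };
        Flow.Subscriber<String> sub;
        if (async) {
            sub = new AsyncSubscriber<>(func, worker);
        } else {
            sub = new SyncSubscriber<>(func);
        }
        sub.onNext("tick"); // shortcut: stands in for a Publisher signalling on the current thread
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        worker.shutdown();
        return seen[0];
    }
}
```

With `SyncSubscriber` the function runs on the thread that called `onNext`, whichever thread that is; with `AsyncSubscriber` it always runs on the worker, even when the caller was already on a thread of its own.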

@viktorklang
Contributor

It's not about whether the Publisher invokes onNext synchronously or asynchronously; it is about whether the Subscriber's onNext method does the processing of the element synchronously or asynchronously.
I really recommend having a look at the examples: https://github.com/reactive-streams/reactive-streams-jvm/tree/master/examples/src/main/java/org/reactivestreams/example/unicast

Does that answer your question?

@nomagic
Author

nomagic commented Mar 14, 2015

"Does that answer your question?" No.

I don't understand your response;
the link you provided is an example of an asynchronous Subscriber (in 250 lines), the example I provided is as well (in 3 lines).
Both obviously satisfy "Subscriber's onNext method results in the processing of the element asynchronously" - no question.

I don't know if you looked through the long discussion (#46) - the original position that @rkuhn elaborated (AFAIUI, he can speak for himself), is essentially the one I'm echoing here; where exactly is the "asynchronous boundary" between Publishers and Subscribers and which one of the two is responsible for ensuring it?

AFAICT, this spec's deliberate response is ===> crickets. Both your link and my simplified example put the onus on the Subscriber (class), but doing that is extremely inefficient (gratuitously), if the Producer implementation has assumed the onus itself - is that clear?

So I'll repeat my previous question:
I'm a user.
I get a reference to a Producer.
I know what my lambda is (that's the part I'm in charge of) - maybe it does sys.out, maybe it's input into my linear gradient descent algorithm - that's my concern.
Which of the two options should I choose?

producer.subscribe(new AsyncSubscriber(lambda)) or
producer.subscribe(new SyncSubscriber(lambda))

@rkuhn
Member

rkuhn commented Mar 14, 2015

The reasoning behind the rule is that if you run the processing synchronously you steal the processing resources that were allocated for the Publisher. Let’s say that the Publisher is running on a dedicated thread (pool) in order to either compute or obtain the elements it produces. The request(N) method will just let it know that it can subsequently call onNext once the elements become available, but running the gradient descent algo for the next 3 seconds on its dedicated thread will then keep it from coming up with new values or it might miss data coming in from a continuous source—or it might just make querying an external source less efficient.

So the answer is: use only those resources that you know were properly allocated to you. If you control the Publisher then you may conclude that synchronous onNext is appropriate, but if you don’t control it then you should not make such assumptions.

@viktorklang
Contributor

@rkuhn 👍

@nomagic
Author

nomagic commented Mar 14, 2015

@rkuhn: But that doesn't address my question.

Before continuing, I'd like to make sure that I'm not misunderstanding something. I'd appreciate it if both @viktorklang and @rkuhn would answer this question directly:
Is the following skeleton code legal according to this spec:

class Ticker<Tick> implements Producer<Tick>
    Executor onNextPool
    Subscriber sub

    // Ticker runs on its own thread
    run()
    {
        Tick t = . . .
        this.onNextPool.execute(() -> this.sub.onNext(t))
    }

According to my understanding, this is completely legal (I'm leaving out the backpressure part of course here as it's not relevant to my question).

This should be a yes/no

@viktorklang
Contributor

@nomagic Your code is unfortunately a bit too incomplete for a yes/no answer.
If you submit multiple this.onNextPool.execute(() -> this.sub.onNext(t)) without the previous having run to completion and the pool has more than 1 Thread, and/or you issue other signals concurrently, then you are violating rule 1.3
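
To make the rule 1.3 concern concrete, here is a small Java sketch of a Ticker-like emitter plus a probe Subscriber that records whether `onNext` calls ever overlap. It is an illustration under assumptions, not the spec's example code: it uses the JDK's `java.util.concurrent.Flow` interfaces (Java 9+) instead of `org.reactivestreams`, it ignores `request(n)` just as the skeleton above does, and `OverlapProbe` and `TickerSketch` are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Subscriber that records the highest number of onNext calls in flight at once. */
final class OverlapProbe implements Flow.Subscriber<Integer> {
    private final AtomicInteger inFlight = new AtomicInteger();
    final AtomicInteger maxInFlight = new AtomicInteger();
    final AtomicInteger received = new AtomicInteger();

    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }

    @Override public void onNext(Integer tick) {
        int now = inFlight.incrementAndGet();
        maxInFlight.accumulateAndGet(now, Math::max);
        received.incrementAndGet();
        inFlight.decrementAndGet();
    }

    @Override public void onError(Throwable t) { t.printStackTrace(); }
    @Override public void onComplete() { }
}

final class TickerSketch {
    /** Signals n ticks to sub through onNextPool, then shuts the pool down and waits for it. */
    static void emit(int n, ExecutorService onNextPool, Flow.Subscriber<Integer> sub) {
        for (int i = 0; i < n; i++) {
            final int tick = i;
            onNextPool.execute(() -> sub.onNext(tick));
        }
        onNextPool.shutdown();
        try {
            onNextPool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Passing `Executors.newSingleThreadExecutor()` as the pool keeps the signals serial, so `maxInFlight` stays at 1. With a fixed pool of several threads, nothing in this emitter prevents two `onNext` calls from running at the same time, which is the situation rule 1.3 forbids.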

@nomagic
Author

nomagic commented Mar 14, 2015

@viktorklang Fair enough, but is there anything that proscribes a Producer from invoking Subscriber#onNext(T) asynchronously? That's the relevant question (I realize there are other invariants, but they're not relevant here, I think)

@viktorklang
Contributor

@nomagic As long as you obey 1.3 and make sure that you take care of safe publication, then I think it is alright. The question, as a followup, would be "Invoke asynchronous in relation to what?"
If you look at AsyncPublisher, it runs asynchronously and as such signals asynchronously.

@rkuhn
Member

rkuhn commented Mar 14, 2015

@nomagic I answered the question from the inverse viewpoint. Turning it around, it reads: rule 2.2 means that a Publisher can assume that calling onNext is “cheap” (as defined by the context in which the application is developed). This has deliberately been introduced so that defensively pushing this invocation into a ThreadPool is unnecessary. The code sample you show is therefore unnecessarily complex, albeit legal (barring the other concerns that you deliberately left out).

@nomagic
Author

nomagic commented Mar 14, 2015

@rkuhn : Yes, everything you wrote in your initial response makes sense given the predicate that there are no Producers implemented like Ticker above, i.e. that Producers invoke onNext(T) synchronously. And that was my concern, because in such a case as Ticker, I can never imagine doing

ticker.subscribe(new AsyncSubscriber(lambda))

no matter what the lambda does, be it do a println or prove Fermat's Last Theorem.
And I interpreted the spec (and the conclusion of #46) to suggest it's 50/50 on what Producer implementations do. That was my lament/concern (and I think echoes your original post in #46 - I am/was agreeing with you) - the deliberate ambiguity/silence/"flexibility" of the spec is invidious IMO.

It seems that you and @viktorklang interpret the intent/spirit of the spec to be such that users should expect that Producers will invoke onNext(T) synchronously (with enough wiggle room for exceptional cases - Ticker might be such a case, given its purpose).
I might suggest that there be a corollary added to the Producer rules, reading something like:
"It is RECOMMENDED that Producers signal Subscribers synchronously . . . " - I think that would hit home with users the intent/spirit that you have.

I might also suggest that 2.2 be modified to read:
"If a Subscriber suspects that its processing of signals will negatively impact its Publisher's responsivity, it is RECOMMENDED that it asynchronously process [replacing "dispatches"] its signals."
On first reading, I interpreted "dispatches its signals" as referring to the Subscriber invoking Subscription.request(int) and Subscription.cancel(), the only signals that a Subscriber can truly dispatch.

I prefer that it be made explicit that Ticker-like implementations be disallowed, but . . .

Anyway, thanks to both @rkuhn and @viktorklang, this has been helpful (to me at least)

@viktorklang
Contributor

@nomagic

> i.e. that Producers invoke onNext(T) synchronously.

onNext is a signal, it is equivalent to sending the message T to its Subscriber and as such it can and should be executed synchronously. There is a natural asynchrony between the sending and the reception of a message.

> And that was my concern, because in such a case as Ticker, I can never imagine doing
> ticker.subscribe(new AsyncSubscriber(lambda))
>
> the deliberate ambiguity/silence/"flexibility" of the spec is invidious IMO.

How so? See 2.2, 2.7 and 2.11

> It seems that you and @viktorklang interpret the intent/spirit of the spec to be such that users should expect that Producers will invoke onNext(T) synchronously (with enough wiggle room for exceptional cases - Ticker might be such a case, given its purpose).

Yes, refer to the rules I mentioned above.

> "It is RECOMMENDED that Producers signal Subscribers synchronously . . . " - I think that would hit home with users the intent/spirit that you have.

That is implied by:

"If a Subscriber suspects that its processing of signals will negatively impact its Publisher's responsivity, it is RECOMMENDED that it asynchronously dispatches its signals."

"dispatches its signals" is intended to mean "the invocations of its signals" It would definitely be wrong to change it to "process" since the processing of the signal is, at least to me, the act of doing the work associated with the signal.

I'm open to clarifying this rule by changing to invocations if that would help,
do you think it would?

@nomagic
Author

nomagic commented Mar 15, 2015

@viktorklang
Hmmm, something is getting lost in translation, but I'm at a loss as to where the confusion rests.
If Subscriber#onNext runs in the same thread as the Producer's thread that "computes/obtains" the next T, then message delivery/signal (dispatch) is synchronous. (obviously Ticker does not satisfy this).
lambda represents the onNext signal processing (in a SyncSubscriber, the lambda runs in the same thread as Subscriber#onNext; in an AsyncSubscriber, the lambda runs in a different thread than Subscriber#onNext).

"onNext is a signal, it is equivalent to sending the message T to its Subscriber and as such it can and should be executed synchronously. There is a natural asynchrony between the sending and the reception of a message." Was that just a typo? Those two sentences are in contradiction.

For me "message reception" == entry into the Subscriber#onNext method.

"How so"?
Are you objecting to the "ambiguity/silence/"flexibility"" part or the "invidious" part?

Rule 2.2 should mean: if the lambda takes a long time (linear gradient descent), then Subscriber#onNext should ensure/schedule the lambda to run in a separate thread (i.e. the processing of the signal should run in a different thread than the receipt of the signal/message). I'm pretty sure we're on the same page wrt that. What I don't see is what "signal" Subscriber#onNext is dispatching; do you mean that

AsyncSubscriber:
    onNext(T)
    {
        pool.submit(() -> func(T)) // signal dispatch??????????
    }

constitutes a "signal dispatch"? Maybe that's the confusion? To me, the only signals the Subscriber class "dispatches" are request and cancel.

In the end, I think all of us agree that the onNext signal processing (the lambda) should run on a different thread (i.e. asynchronously) from the Producer's "compute/obtain" thread when the lambda is "heavy", but that can happen in 2 ways:

  1. The Producer takes charge and signals the Subscriber asynchronously (see Ticker) - both you and @rkuhn have indicated that this infringes the spirit of the spec, but conceded that it is allowed
  2. The Subscriber takes charge and schedules the lambda (in body of onNext) on a separate thread (see the 250 and 3 line AsyncSubscriber respectively).

My concern is that due to the "ambiguity/silence/"flexibility"" of the spec, both 1 and 2 end up happening and that's very wasteful, i.e. bad, through no fault of users, but of the spec itself.

Compare to Erlang, where message delivery/receipt is always asynchronous, no exceptions, no mystery, etc.
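
The cost of options 1 and 2 both happening can be made visible by counting thread hand-offs. The Java sketch below is only an illustration: `CountingExecutor` and `DoubleHopDemo` are hypothetical names, the "producer" is a plain loop, the "subscriber" is a function, and backpressure is ignored. It counts how many tasks get pushed onto another thread for each combination of asynchronous producer and asynchronous subscriber.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

/** Executor wrapper that counts how many tasks were handed to the underlying pool. */
final class CountingExecutor implements Executor {
    final AtomicInteger handoffs = new AtomicInteger();
    private final Executor delegate;

    CountingExecutor(Executor delegate) { this.delegate = delegate; }

    @Override public void execute(Runnable task) {
        handoffs.incrementAndGet();
        delegate.execute(task);
    }
}

final class DoubleHopDemo {
    /** Total thread hand-offs used to deliver and process n elements. */
    static int handoffs(boolean producerAsync, boolean subscriberAsync, int n) {
        ExecutorService producerPool = Executors.newSingleThreadExecutor();
        ExecutorService subscriberPool = Executors.newSingleThreadExecutor();
        CountingExecutor producerSide = new CountingExecutor(producerPool);
        CountingExecutor subscriberSide = new CountingExecutor(subscriberPool);
        CountDownLatch processed = new CountDownLatch(n);

        IntConsumer lambda = t -> processed.countDown(); // the user's work
        final IntConsumer onNext;
        if (subscriberAsync) {
            onNext = t -> subscriberSide.execute(() -> lambda.accept(t)); // option 2
        } else {
            onNext = lambda;
        }

        for (int i = 0; i < n; i++) {
            final int tick = i;
            if (producerAsync) {
                producerSide.execute(() -> onNext.accept(tick)); // option 1, Ticker-style
            } else {
                onNext.accept(tick);
            }
        }
        try {
            processed.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        producerPool.shutdown();
        subscriberPool.shutdown();
        return producerSide.handoffs.get() + subscriberSide.handoffs.get();
    }
}
```

For 100 elements, either side going asynchronous alone costs 100 hand-offs, and both together cost 200. The second hundred buys no additional isolation for the producer's compute thread.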

@nomagic
Author

nomagic commented Mar 15, 2015

Oops - replaced all the Subscription#onNext references with Subscriber#onNext

@nomagic
Author

nomagic commented Mar 15, 2015

Stripping all of the cruft away, I'm suggesting 2.2 read:

  1. If processing (of T) will be slow (linear gradient descent), then Subscriber should process T on another thread.

as opposed to currently:

  2. If processing (of T) will be slow (linear gradient descent), then Subscriber should dispatch signal on another thread.

Does anyone think 2 is clearer than 1?

@viktorklang
Contributor

I'd even go as far as: unless otherwise warranted, and the processing is near-instant, the Subscriber should process the signal on another thread.

EDIT: s/dispatch/process

@viktorklang
Contributor

@nomagic Is there something left to do here or shall we close?
