Consequences of #46 #230
Such rules imply a scheduling dimension not covered by the spec. In RxJava 1.0, we let the developer decide where to put the consumption of values via observeOn/subscribeOn. The second point is that if the onXXX methods are sequential with respect to each other, you avoid having to use defensive synchronization everywhere. Writing such a work scheduler is difficult: you need to prevent reordering, allow onErrors to cut ahead, and even avoid thread-hopping (to reduce thread migration and cold caches).
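The "prevent reordering" part of such a work scheduler can be sketched with the serial-executor pattern shown in the java.util.concurrent.Executor Javadoc. This is a minimal sketch, assuming Java and an arbitrary backing Executor; it covers only ordering and non-overlap, not the other two requirements mentioned (letting onError cut ahead, avoiding thread hops). SerialExecutor and SerialExecutorDemo are illustrative names, not RxJava classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Runs tasks one at a time, in submission order, on top of a possibly
 * multi-threaded delegate. Each task happens-before the next one, so
 * signals routed through it need no further synchronization.
 */
final class SerialExecutor implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<>();
    private final Executor delegate;
    private Runnable active; // task currently handed to the delegate, or null

    SerialExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized void execute(Runnable r) {
        tasks.add(() -> {
            try {
                r.run();
            } finally {
                scheduleNext(); // pass the baton to the next queued task
            }
        });
        if (active == null) {
            scheduleNext();
        }
    }

    private synchronized void scheduleNext() {
        if ((active = tasks.poll()) != null) {
            delegate.execute(active);
        }
    }
}

final class SerialExecutorDemo {
    /** Pushes 0..n-1 through a SerialExecutor over a pool and returns the order observed. */
    static List<Integer> run(int n, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Executor serial = new SerialExecutor(pool);
        List<Integer> seen = new ArrayList<>(); // unsynchronized on purpose: tasks never overlap
        CountDownLatch done = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            final int value = i;
            serial.execute(() -> {
                seen.add(value);
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return seen;
    }
}
```

Even with four pool threads, SerialExecutorDemo.run(1000, 4) observes 0..999 in order, which is the property that lets onXXX implementations skip defensive synchronization.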
@akarnokd: Hi. I don't understand your response, except to say that prescribing a total ordering of onNext() calls does not determine whether onNext() calls are asynchronous or not (sure, if they are asynchronous it's trickier, but still doable). And my question was restricted to the Reactive Streams API (which has no observeOn/subscribeOn methods). Assume I define a couple of helper classes:

    class AsyncSubscriber<T> implements Subscriber<T> {
        AsyncSubscriber(Consumer<T> func) { . . . }
        public void onNext(T t) {
            this.pool.submit(() -> func.accept(t));
        }
    }
    class SyncSubscriber<T> implements Subscriber<T> {
        SyncSubscriber(Consumer<T> func) { . . . }
        public void onNext(T t) {
            func.accept(t);
        }
    }

And a user has a reference to some Producer and has written a lambda to process T's (and therefore knows its "heaviness"). Which of the following should the user write?

    producer.subscribe(new AsyncSubscriber<>(lambda))

or

    producer.subscribe(new SyncSubscriber<>(lambda))

AFAICT, this Reactive Streams spec states that it depends only on the "heaviness" of the lambda; but it seems to me that, to be useful, the best choice ineluctably depends on whether the producer invokes onNext() synchronously or asynchronously. The rub is that there is nothing in the spec that lets a user know/determine which one it is, or affect it dynamically (which I presume is what Rx's observeOn does). Isn't this a problem?
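A runnable version of the two helper classes, as a sketch under stated assumptions: it swaps in the JDK's java.util.concurrent.Flow interfaces (which mirror the Reactive Streams interfaces) for Subscriber, takes the lambda as a Consumer, requests one element at a time, and exposes a hypothetical `done` future so callers can wait for completion. AsyncSubscriber assumes its executor runs tasks one at a time in FIFO order (for example Executors.newSingleThreadExecutor()), so processing and the terminal signal stay in order.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

/** Runs func on whatever thread calls onNext, i.e. on the Publisher's thread. */
class SyncSubscriber<T> implements Flow.Subscriber<T> {
    final CompletableFuture<Void> done = new CompletableFuture<>(); // hypothetical completion hook
    private final Consumer<? super T> func;
    private Flow.Subscription subscription;

    SyncSubscriber(Consumer<? super T> func) { this.func = func; }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1);                  // keep one element outstanding
    }
    @Override public void onNext(T item) {
        func.accept(item);             // the processing happens inside onNext
        subscription.request(1);
    }
    @Override public void onError(Throwable t) { done.completeExceptionally(t); }
    @Override public void onComplete() { done.complete(null); }
}

/** Returns from onNext immediately and runs func on an executor instead. */
class AsyncSubscriber<T> implements Flow.Subscriber<T> {
    final CompletableFuture<Void> done = new CompletableFuture<>(); // hypothetical completion hook
    private final Consumer<? super T> func;
    private final Executor executor;   // assumption: runs tasks one at a time, in FIFO order
    private Flow.Subscription subscription;

    AsyncSubscriber(Consumer<? super T> func, Executor executor) {
        this.func = func;
        this.executor = executor;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1);
    }
    @Override public void onNext(T item) {
        executor.execute(() -> {       // only the hand-off happens inside onNext
            func.accept(item);
            subscription.request(1);   // ask for the next element once this one is done
        });
    }
    @Override public void onError(Throwable t) { executor.execute(() -> done.completeExceptionally(t)); }
    @Override public void onComplete() { executor.execute(() -> done.complete(null)); }
}
```

Both classes deliver the same elements in the same order; the only difference is which thread runs func, which is exactly the choice the question is about.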
It's not about whether the Publisher invokes onNext synchronously or asynchronously; it's about whether the Subscriber's onNext method processes the element synchronously or asynchronously. Does that answer your question?
"Does that answer your question?" No. I don't understand your response, and I don't know if you looked through the long discussion (#46). The original position that @rkuhn elaborated (AFAIUI; he can speak for himself) is essentially the one I'm echoing here: where exactly is the "asynchronous boundary" between Publishers and Subscribers, and which of the two is responsible for ensuring it? AFAICT, this spec's deliberate response is ===> crickets. Both your link and my simplified example put the onus on the Subscriber (class), but doing that is extremely (and gratuitously) inefficient if the Producer implementation has already assumed the onus itself. Is that clear? So I'll repeat my previous question:

    producer.subscribe(new AsyncSubscriber<>(lambda))

or

    producer.subscribe(new SyncSubscriber<>(lambda))
The reasoning behind the rule is that if you run the processing synchronously, you steal the processing resources that were allocated to the Publisher. Let’s say that the Publisher is running on a dedicated thread (pool) in order to compute or obtain the elements it produces. The … So the answer is: use only those resources that you know were properly allocated to you. If you control the Publisher, then you may conclude that synchronous …
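To make the resource argument concrete: when a Publisher delivers from its own dedicated thread, a Subscriber that processes inside onNext runs its processing on that thread. This is a minimal sketch, assuming java.util.concurrent.SubmissionPublisher as the Publisher and a hypothetical single-thread pool named "publisher-pool"; PublisherThreadDemo is an illustrative name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class PublisherThreadDemo {
    /** Returns the name of the thread on which an inline ("synchronous") onNext did its work. */
    static String processingThreadName() {
        // The resources allocated to the Publisher: one dedicated, named thread.
        ExecutorService publisherPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "publisher-pool"));
        CompletableFuture<String> seen = new CompletableFuture<>();
        try (SubmissionPublisher<String> pub =
                     new SubmissionPublisher<>(publisherPool, Flow.defaultBufferSize())) {
            pub.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
                @Override public void onNext(String item) {
                    // The "processing" happens right here, inside onNext,
                    // so it occupies the Publisher's thread until it returns.
                    seen.complete(Thread.currentThread().getName());
                }
                @Override public void onError(Throwable t) { seen.completeExceptionally(t); }
                @Override public void onComplete() { }
            });
            pub.submit("tick");
        }
        try {
            return seen.join();
        } finally {
            publisherPool.shutdown();
        }
    }
}
```

processingThreadName() returns "publisher-pool": while the element is being processed, that thread cannot compute or obtain the next element.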
@rkuhn 👍
@rkuhn: But that doesn't address my question. Before continuing, I'd like to make sure that I'm not misunderstanding something. I'd appreciate it if both @viktorklang and @rkuhn would answer this question directly:

    class Ticker<Tick> implements Producer<Tick> {
        Executor onNextPool;
        Subscriber<Tick> sub;

        // Ticker runs on its own thread
        public void run() {
            Tick t = . . .;
            this.onNextPool.execute(() -> this.sub.onNext(t));
        }
    }

According to my understanding, this is completely legal (I'm leaving out the backpressure part here, of course, as it's not relevant to my question). This should be a yes/no …
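A runnable approximation of Ticker, as a sketch under stated assumptions: it uses java.util.concurrent.Flow.Publisher in place of Producer, emits a fixed number of Long ticks, and, unlike the question, includes minimal demand handling so it can actually be exercised. Dispatching through a single-threaded onNextPool keeps onNext calls serial, with each one happening-before the next, which is the kind of guarantee rule 1.3 asks for. TickSubscription is a hypothetical helper, and handling of non-positive request(n) (rule 3.9) is omitted.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;

/** Hypothetical Ticker: produces ticks on its own thread, signals them via onNextPool. */
final class Ticker implements Flow.Publisher<Long> {
    private final long ticks;

    Ticker(long ticks) { this.ticks = ticks; }

    @Override
    public void subscribe(Flow.Subscriber<? super Long> sub) {
        // One thread per subscription: onNext calls are serial and each happens-before the next.
        ExecutorService onNextPool = Executors.newSingleThreadExecutor();
        TickSubscription subscription = new TickSubscription();
        onNextPool.execute(() -> sub.onSubscribe(subscription));
        Thread tickThread = new Thread(() -> {
            try {
                for (long t = 0; t < ticks; t++) {
                    if (!subscription.awaitDemand()) {
                        return;                                 // cancelled: send nothing more
                    }
                    final long tick = t;                        // "compute" the tick on this thread
                    onNextPool.execute(() -> sub.onNext(tick)); // ...but signal it asynchronously
                }
                onNextPool.execute(sub::onComplete);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                onNextPool.shutdown();                          // already-queued signals still run
            }
        }, "ticker");
        tickThread.start();
    }

    /** Demand shared between the Subscriber (request/cancel) and the tick thread. */
    private static final class TickSubscription implements Flow.Subscription {
        private long demand;
        private boolean cancelled;

        @Override
        public synchronized void request(long n) {
            if (n <= 0) {
                return; // rule 3.9 would require signalling onError; omitted in this sketch
            }
            demand = demand + n < 0 ? Long.MAX_VALUE : demand + n; // saturate on overflow
            notifyAll();
        }

        @Override
        public synchronized void cancel() {
            cancelled = true;
            notifyAll();
        }

        /** Blocks until there is demand, then consumes one unit; returns false if cancelled. */
        synchronized boolean awaitDemand() throws InterruptedException {
            while (demand == 0 && !cancelled) {
                wait();
            }
            if (cancelled) {
                return false;
            }
            demand--;
            return true;
        }
    }
}
```

A Subscriber that requests Long.MAX_VALUE receives ticks 0 through 4 from new Ticker(5), all on the onNextPool thread and never on the "ticker" thread that produced them.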
@nomagic Your code is unfortunately a bit too incomplete for a yes/no answer.
@viktorklang Fair enough, but is there anything that proscribes a Producer from invoking Subscriber#onNext(T) asynchronously? That's the relevant question (I realize there are other invariants, but I don't think they're relevant here).
@nomagic As long as you obey 1.3 and make sure that you take care of safe publication, I think it is alright. The follow-up question would be: "Invoke asynchronously in relation to what?"
@nomagic I answered the question from the inverse viewpoint. Turned around, it reads: rule 2.2 means that a Publisher can assume that calling …
@rkuhn: Yes, everything you wrote in your initial response makes sense given the premise that there are no Producers implemented like Ticker above, i.e. that Producers invoke onNext(T) synchronously. And that was my concern, because in a case such as Ticker, I can never imagine doing ticker.subscribe(new AsyncSubscriber<>(lambda)), no matter what the lambda does, be it a println or a proof of Fermat's Last Theorem. It seems that you and @viktorklang interpret the intent/spirit of the spec to be that users should expect Producers to invoke onNext(T) synchronously (with enough wiggle room for exceptional cases; Ticker might be such a case, given its purpose). I might also suggest that 2.2 be modified to read: I would prefer that it be made explicit that Ticker-like implementations are disallowed, but . . . Anyway, thanks to both @rkuhn and @viktorklang; this has been helpful (to me at least).
onNext is a signal, it is equivalent to sending the message T to its Subscriber and as such it can and should be executed synchronously. There is a natural asynchrony between the sending and the reception of a message.
How so? See 2.2, 2.7 and 2.11
Yes, refer to the rules I mentioned above.
That is implied by: "If a Subscriber suspects that its processing of signals will negatively impact its Publisher's responsivity, it is RECOMMENDED that it asynchronously dispatches its signals." Here "dispatches its signals" is intended to mean "the invocations of its signals". It would definitely be wrong to change it to "process", since the processing of the signal is, at least to me, the act of doing the work associated with the signal. I'm open to clarifying this rule by changing the wording to "invocations" if that would help, …
@viktorklang "onNext is a signal, it is equivalent to sending the message T to its Subscriber and as such it can and should be executed synchronously. There is a natural asynchrony between the sending and the reception of a message." Was that just a typo? Those two sentences contradict each other. For me, "message reception" == entry into the Subscriber#onNext method. "How so?" Rule 2.2 should mean: if the lambda takes a long time (linear gradient descent), then Subscriber#onNext should ensure/schedule that the lambda runs on a separate thread (i.e. the processing of the signal should run on a different thread than the receipt of the signal/message); I'm pretty sure we're on the same page wrt that. I don't see what "signal" Subscriber#onNext is dispatching. Do you mean that AsyncSubscriber's

    public void onNext(T t) {
        pool.submit(() -> fun.accept(t)); // signal dispatch???
    }

constitutes a "signal dispatch"? Maybe that's the confusion? To me, the Subscriber class only "dispatches" the signals request and cancel. In the end, I think all of us agree that the onNext signal processing (the lambda) should run on a separate thread (i.e. asynchronously) from the Producer's "compute/obtain" thread when the lambda is "heavy", but that can happen in 2 ways:
My concern is that, due to the "ambiguity/silence/flexibility" of the spec, both 1 and 2 end up happening, and that's very wasteful, i.e. bad, through no fault of users but of the spec itself. Compare this to Erlang, where message delivery/receipt is always asynchronous: no exceptions, no mystery, etc.
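The waste described here can be observed directly: when the Publisher already delivers from its own pool and the Subscriber re-dispatches onto another one, each element crosses two thread boundaries. This is a minimal sketch, assuming SubmissionPublisher with a hypothetical "publisher-pool" and a Subscriber that hands off to a hypothetical "subscriber-pool"; DoubleHopDemo is an illustrative name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class DoubleHopDemo {
    /** Returns {thread that received onNext, thread that processed the element}. */
    static String[] threadsForOneElement() {
        ExecutorService publisherPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "publisher-pool"));
        ExecutorService subscriberPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "subscriber-pool"));
        CompletableFuture<String[]> result = new CompletableFuture<>();
        try (SubmissionPublisher<String> pub =
                     new SubmissionPublisher<>(publisherPool, Flow.defaultBufferSize())) {
            pub.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
                @Override public void onNext(String item) {
                    // Hop 1 already happened: the Publisher delivers from its own pool.
                    String receivedOn = Thread.currentThread().getName();
                    // Hop 2: the Subscriber schedules the processing asynchronously again.
                    subscriberPool.execute(() -> result.complete(
                            new String[] {receivedOn, Thread.currentThread().getName()}));
                }
                @Override public void onError(Throwable t) { result.completeExceptionally(t); }
                @Override public void onComplete() { }
            });
            pub.submit("element");
        }
        try {
            return result.join();
        } finally {
            publisherPool.shutdown();
            subscriberPool.shutdown();
        }
    }
}
```

The element goes from the submitting thread to "publisher-pool" and then to "subscriber-pool"; either hop alone would have moved the processing off the submitter, so the second one buys nothing.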
Oops: replaced all the Subscription#onNext references with Subscriber#onNext.
Stripping all of the cruft away, I'm suggesting 2.2 read:
as opposed to currently: Does anyone think 2 is clearer than 1?
I'd even go as far as: unless otherwise warranted and the processing is near-instant, the Subscriber should process the signal on another thread. EDIT: s/dispatch/process
@nomagic Is there something left to do here or shall we close?
Hello,
I spent considerable time and care reading the conversation (issue #46), but I'm still uncertain about the consequences for user code (by which in this case I mean Subscribers).
As I understand it, when implementing a Subscriber (and to keep it simple I'm just focusing on Subscriber.onNext(T)), you can make no assumption about whether onNext() will be invoked synchronously or asynchronously by the Producer.
And the recommendation is that you should consider whether the Subscriber does "heavy" processing of T or not (and no, I'm not here to quibble about the meaning of "heavy").
If "heavy", then Subscriber.onNext() should take care to "schedule" the processing to be done asynchronously, e.g.
Of course there are other ways to do it.
Is that the conclusion that should be drawn (Rule 2.2 sure reads like that)?
I don't see how "heavy" ? async : sync is a proper rule.
It depends on the "sync-ness" of the onNext() call itself, i.e. if it's async, then why would you ever "double up" on async and schedule the processing asynchronously, regardless of its heaviness (that's sheer gratuitous overhead on any platform)?
Maybe I'm misinterpreting the conclusion, but that strikes me as unworkable.