Rule 3.1 vs asynchronous calls #228

Closed
akarnokd opened this issue Mar 2, 2015 · 6 comments
@akarnokd
Contributor

akarnokd commented Mar 2, 2015

§3.1: Subscription.request and Subscription.cancel MUST only be called inside of its Subscriber context. A Subscription represents the unique relationship between a Subscriber and a Publisher.

"Subscriber context" feels a bit vague. While reimplementing RxJava to be natively j.u.c.Flow (and RS compliant), I found that most non-trivial Subscriptions (i.e., those that do more than wrap another Subscription or apply a function to the requested amount) either have to serialize access to the request() and cancel() methods or be completely thread-safe.

For example, consider a takeUntil() operator which lets onNext events through until a specified time has elapsed. The timeout is most likely triggered from another thread, in which case Subscription.cancel needs to be invoked asynchronously.

If I interpret §3.1 strictly, one could only set a terminal flag on the operator's Subscriber to indicate it should drop any further values and cancel the subscription from its onNext method the next time a value comes from upstream. This approach, however, delays the upstream cancellation indefinitely and requires serializing access to the downstream's Subscriber anyway.
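
The strict reading described above can be sketched roughly as follows. This is only an illustration written against java.util.concurrent.Flow; the class name DeferredCancelSubscriber and its timeout() method are hypothetical and do not come from RxJava or the Reactive Streams examples. The timer thread only flips a flag, and the actual cancel() happens inside onNext, so upstream is not cancelled until another value arrives.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical operator subscriber; names are illustrative only.
final class DeferredCancelSubscriber<T> implements Flow.Subscriber<T> {
    private final Flow.Subscriber<? super T> downstream;
    // Written by the timer thread, read on the upstream's signalling thread.
    private final AtomicBoolean expired = new AtomicBoolean();
    private Flow.Subscription upstream;
    private boolean done; // only touched from the onXXX methods

    DeferredCancelSubscriber(Flow.Subscriber<? super T> downstream) {
        this.downstream = downstream;
    }

    /** May be called from any thread: sets the flag, never touches upstream. */
    void timeout() {
        expired.set(true);
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        downstream.onSubscribe(s);
    }

    @Override public void onNext(T item) {
        if (done) {
            return;
        }
        if (expired.get()) {
            // The cancellation only happens here, when the next value arrives.
            done = true;
            upstream.cancel();
            downstream.onComplete();
            return;
        }
        downstream.onNext(item);
    }

    @Override public void onError(Throwable t) {
        if (!done) {
            done = true;
            downstream.onError(t);
        }
    }

    @Override public void onComplete() {
        if (!done) {
            done = true;
            downstream.onComplete();
        }
    }
}
```

If upstream never emits again after timeout() is called, neither cancel() nor the downstream onComplete() is ever reached in this sketch, which is the indefinite delay mentioned above.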

I'd say Subscription.request and Subscription.cancel MUST be called in a serialized fashion with respect to each other.

Given the rest of the rules, I'd also say the most basic/efficient, rule-abiding Subscription implementation is built around a volatile long counter: a negative value means cancelled, zero means nothing requested yet, and a positive value is a pending request.
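
A minimal sketch of such a counter, under some assumptions: it uses an AtomicLong rather than a bare volatile field so that the read-modify-write steps are atomic, the class name RequestCounter and its methods are hypothetical, and it simply throws on a non-positive request instead of reporting the problem the way the specification asks.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: one counter encodes the whole Subscription state.
//   negative -> cancelled, zero -> nothing requested yet, positive -> pending demand
final class RequestCounter {
    static final long CANCELLED = -1L;

    private final AtomicLong state = new AtomicLong();

    /** Adds n to the pending demand; returns the previous demand, or CANCELLED. */
    long request(long n) {
        if (n <= 0) {
            // Simplification for this sketch only.
            throw new IllegalArgumentException("n must be positive but was " + n);
        }
        for (;;) {
            long current = state.get();
            if (current < 0) {
                return CANCELLED;
            }
            long next = current + n;
            if (next < 0) {
                next = Long.MAX_VALUE; // cap on overflow
            }
            if (state.compareAndSet(current, next)) {
                return current;
            }
        }
    }

    /** Consumes one unit of demand; false if nothing is pending or if cancelled. */
    boolean tryConsume() {
        for (;;) {
            long current = state.get();
            if (current <= 0) {
                return false;
            }
            if (state.compareAndSet(current, current - 1)) {
                return true;
            }
        }
    }

    /** Safe to call from any thread; later request() calls become no-ops. */
    void cancel() {
        state.set(CANCELLED);
    }

    boolean isCancelled() {
        return state.get() < 0;
    }

    /** Pending demand, reported as zero once cancelled. */
    long pending() {
        return Math.max(0L, state.get());
    }
}
```

Because every transition goes through a single atomic variable, request() and cancel() can race with each other in this sketch without additional locking.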

@viktorklang
Contributor

Hi David,

Have you had a look at the example implementations of publishers and subscribers?

Personally I have not seen any need for what you describe. Is it an artifact of how you have decided to implement the spec?

Cheers,


@akarnokd
Contributor Author

akarnokd commented Mar 2, 2015

The examples are a bit hard to read with all those validations (I've introduced a Conformance class to keep all behavior and text in the same place).

What I could make out from the examples is that they are intertwined with scheduling the events on an Executor; in Rx*, we have this factored out into observeOn.

The other thing I've noticed is that SyncSubscriber doesn't signal an error if subscription != null in onSubscribe; it just ignores the fact, but rule §2.12 mandates a single call to onSubscribe. onNext, onError and onComplete ignore subscription being null, but §1.9 mandates it being non-null. Besides, throw null; can't convey a reference to any rule in its message.

@viktorklang
Contributor

On 2 Mar 2015 22:01, "David Karnok" wrote:

> The examples are a bit hard to read with all those validations (I've introduced a Conformance class to keep all behavior and text in the same place).

I'm surprised that they are hard to read given the considerable amount of comments :-(

> What I could make out from the examples is that it is intertwined with scheduling the events on an Executor, we have this factored out in Rx* into observeOn.

Not intertwined: the assumption is that the Publisher and the Subscriber are their own asynchronous entities. Without a means of execution, they are simply calling-thread parasites :-)

> The other thing I've noticed is that SyncSubscriber doesn't error if subscription != null just ignores the fact, but rule §2.12 mandates a single call to onSubscribe. onNext, onError and onComplete ignore subscription being null, but §1.9 mandates it being non-null.

Interesting. I'll have a look when I am back from my vacation. Thanks for noticing!

> Besides throw null; can't convey a reference to any rules in its message.

Mandating verbatim exception messages is hard because of internationalisation, though.

@viktorklang
Contributor

> The other thing I've noticed is that SyncSubscriber doesn't error if subscription != null just ignores the fact, but rule §2.12 mandates a single call to onSubscribe. onNext, onError and onComplete ignore subscription being null,

2.12: "Subscriber.onSubscribe MUST be called at most once for a given Subscriber (based on object equality)."

This does not say that subsequent calls be ignored.

> but §1.9 mandates it being non-null.

Yes, did you see: https://github.com/reactive-streams/reactive-streams-jvm/blob/master/examples/src/main/java/org/reactivestreams/example/unicast/SyncSubscriber.java#L21

> Besides throw null; can't convey a reference to any rules in its message.

The stack trace will point to what's null, but adding a message is certainly possible; we need to keep in mind that it is an example.
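
For illustration, here is one way the checks discussed above could look in a subscriber written against java.util.concurrent.Flow. This is a sketch, not the SyncSubscriber from the examples: the class name CheckedSubscriber and the message texts are hypothetical, and cancelling an extra Subscription is only one possible reaction to a second onSubscribe call.

```java
import java.util.Objects;
import java.util.concurrent.Flow;

// Hypothetical subscriber; names and messages are illustrative only.
final class CheckedSubscriber<T> implements Flow.Subscriber<T> {
    private Flow.Subscription subscription;
    private int received;

    @Override public void onSubscribe(Flow.Subscription s) {
        // A message names the offending argument, unlike a bare "throw null;".
        Objects.requireNonNull(s, "onSubscribe: Subscription must not be null");
        if (subscription != null) {
            // Already subscribed: reject the extra Subscription, keep the first one.
            s.cancel();
            return;
        }
        subscription = s;
        s.request(1);
    }

    @Override public void onNext(T item) {
        Objects.requireNonNull(item, "onNext: element must not be null");
        received++;
        if (subscription != null) {
            subscription.request(1);
        }
    }

    @Override public void onError(Throwable t) {
        Objects.requireNonNull(t, "onError: Throwable must not be null");
    }

    @Override public void onComplete() {
        // Nothing to release in this sketch.
    }

    int received() {
        return received;
    }
}
```

Objects.requireNonNull still throws a NullPointerException, so the stack trace stays as informative as before; the message only adds the context that throw null; cannot carry.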

The above aside, is there anything here that is unaddressed? If so, please let us know!

@akarnokd
Contributor Author

That's all for now.

@viktorklang
Contributor

Thanks for raising these questions; we clearly have more to do to communicate how things are dealt with.
