2.5 Why isn't it required to cancel the active Subscription? #317
Comments
Even if they are both cancelled, the In Rsc/Reactor, we choose to cancel the incoming second In theory, you could switch the |
@akarnokd nails the reply here. As for backstory: in the beginning there was a proposal for every signal to carry the Subscription as a parameter. This meant not only that the Subscriber could be immutable but also that you could do demuxing. The massive downside of that proposal is that, given that Reactive Streams is a communications protocol, it would mean that for every element passed into
Thanks, that makes sense. But then why not change rule 1.9 to something like the following?
It seems like this wouldn't be asking too much of the Publisher, while at the same time guaranteeing that a Subscriber could properly fend off an intruding Publisher as explained by @akarnokd.
How would the Publisher know that the Subscription was cancelled during the execution of onSubscribe?
Maybe I'm missing something, but wouldn't the Publisher only have to check this in case it wants to send an onComplete/onError signal in its

    class SubscriptionDecorator implements Subscription {
        private final Subscription delegate;
        private volatile boolean cancelled;

        SubscriptionDecorator(Subscription delegate) {
            this.delegate = delegate;
        }

        boolean isCancelled() {
            return cancelled;
        }

        public void cancel() {
            if (!cancelled) {
                delegate.cancel();
                this.cancelled = true;
            }
        }

        public void request(long n) {
            if (!cancelled) {
                delegate.request(n);
            }
        }
    }

and then a Publisher which completes immediately could be implemented as:

    class EmptyPublisher<T> implements Publisher<T> {
        public void subscribe(Subscriber<? super T> s) {
            SubscriptionDecorator decorator = new SubscriptionDecorator(...);
            s.onSubscribe(decorator);
            if (!decorator.isCancelled()) {
                s.onComplete();
            }
        }
    }
Yep, that's how it usually looks. (You should swap the cancelled = true assignment with the call to the delegate, so the flag is set before delegate.cancel() runs; this avoids reentrancy problems.)
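A minimal sketch of the suggested reordering, for illustration only. It assumes the JDK's java.util.concurrent.Flow interfaces as a stand-in for org.reactivestreams, and the class name CancelTrackingSubscription is hypothetical. Using an AtomicBoolean instead of a volatile field is a further assumption, not something proposed in the thread; it makes the check-and-set a single step.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical decorator, written against java.util.concurrent.Flow (an assumption).
final class CancelTrackingSubscription implements Flow.Subscription {
    private final Flow.Subscription delegate;
    private final AtomicBoolean cancelled = new AtomicBoolean();

    CancelTrackingSubscription(Flow.Subscription delegate) {
        this.delegate = delegate;
    }

    boolean isCancelled() {
        return cancelled.get();
    }

    @Override
    public void cancel() {
        // Set the flag first: if delegate.cancel() calls back into cancel(),
        // the nested call sees the flag already set and returns immediately.
        if (cancelled.compareAndSet(false, true)) {
            delegate.cancel();
        }
    }

    @Override
    public void request(long n) {
        // Requests made after cancellation are dropped.
        if (!cancelled.get()) {
            delegate.request(n);
        }
    }
}
```

With this ordering, a delegate whose cancel() re-enters the decorator is invoked only once.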
Closing this issue, since the original question was answered and I've created follow-up issue #325 for the proposal to change rule 1.9.
Rule 2.5 states: A Subscriber MUST call Subscription.cancel() on the given Subscription after an onSubscribe signal if it already has an active Subscription.
Why doesn't it additionally state that the Subscriber MUST call Subscription.cancel() on the active Subscription as well?
Say I have Subscriber S, with an active Subscription for Publisher P1. Now if Publisher P2 invokes onSubscribe on S:
So I believe that, as soon as onSubscribe is invoked on a Subscriber with an active Subscription, the Subscriber is broken and both the new and active Subscription must immediately be cancelled.
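To make the behaviour that rule 2.5 actually requires concrete, here is a rough sketch of a Subscriber that cancels only the incoming Subscription and leaves the active one untouched. It assumes java.util.concurrent.Flow as a stand-in for org.reactivestreams. The class name SingleSubscriptionSubscriber and the request(1) demand strategy are hypothetical choices for illustration.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical subscriber sketching rule 2.5 with java.util.concurrent.Flow (an assumption).
final class SingleSubscriptionSubscriber<T> implements Flow.Subscriber<T> {
    private final AtomicReference<Flow.Subscription> active = new AtomicReference<>();

    @Override
    public void onSubscribe(Flow.Subscription s) {
        if (!active.compareAndSet(null, s)) {
            // A Subscription is already active: cancel only the incoming one.
            // The active Subscription is deliberately left alone.
            s.cancel();
            return;
        }
        s.request(1);
    }

    @Override
    public void onNext(T item) {
        // Ask for the next element from the one Subscription that was accepted.
        Flow.Subscription s = active.get();
        if (s != null) {
            s.request(1);
        }
    }

    @Override
    public void onError(Throwable t) {
        // Terminal signal; nothing to do in this sketch.
    }

    @Override
    public void onComplete() {
        // Terminal signal; nothing to do in this sketch.
    }
}
```

In the scenario from the question, the Subscription from P1 keeps working and the one handed over by P2 is cancelled straight away.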