-
Notifications
You must be signed in to change notification settings - Fork 534
Bounded publisher onComplete signal timing (related to rule 1.5) #543
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
Request affects onNexts only. Sources are free to signal onComplete or onError without requests or without fulfilling requests fully. |
So would the publisher implementation be free to decide how it should handle that? It seems like that it something that would be agreed upon so that any implementation of subscriber could have an expectation of what the publisher should do. I've encountered a bug in a project where a subscriber is requesting only 1 item on a publisher that it knows only has one item. Most libraries will call onComplete after emitting the one item but some do not. I'm trying to figure out if it is the responsibility of the subscriber to request more or the publisher to signal onComplete when it emits the one item. |
I see. There are cases indeed when the source can't determine it should complete because there won't be further items unless it actively tries to find out. This can happen in intermediate publishers. For example, a list of items is filtered. You know the filter will match the one item in the middle so you request 1. None of the rest will match but the source is not allowed to scan ahead until more is requested. You won't get oncomplete until another request. If you know there is only 1 item, cancel after 1 onNext and execute your onComplete logic right there. |
That makes sense, thank you! In that case, should subscribers always call .cancel() when they expect only 1 item, given you don't know the implementation of your publisher, just that it has one 1 item? public class OneItemSubscriber implements Subscriber<Integer> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Integer item) {
System.out.println("Received item: " + item);
subscription.request(1);
subscription.cancel();
onComplete(); // or perhaps inline the onComplete here?
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Completed");
}
} |
I've been looking at a few different implementations of subscribers and something has been ambiguous to me. When a bounded publisher of n items receives a request for n items, should it signal onComplete? Is it the job of the subscriber to request n+1 to receive a completion signal, or is it the job of the subscriber to signal onComplete when it runs out of items?
It feels like the former should be correct but that means to receive a completion state, onNext should always call request, or at least request(2) in the onSubscribe, which also doesn't seem quite right.
Is there any specification guideline for this that I somehow missed?
The text was updated successfully, but these errors were encountered: