Skip to content

Document if Publisher.subscribe is allowed to block #393

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
guenhter opened this issue Sep 28, 2017 · 22 comments
Closed

Document if Publisher.subscribe is allowed to block #393

guenhter opened this issue Sep 28, 2017 · 22 comments

Comments

@guenhter
Copy link

Hi,

I'm using reactive-streams and wrote a Publisher for ZeroMq.
One of the first things is stumbled over was, if the method Publisher.subscribe(...) must return and emit the results async, or if the method can block.

I looked up the javadoc of the method, but it isn't documented. Would be great to make a statement what is expected by that method.

@viktorklang
Copy link
Contributor

@guenhter Did your publisher pass the TCK?

@akarnokd
Copy link
Contributor

I think it is unavoidable to have blocking code inside subscribe since we have to interact with non-reactive and often blocking code if we have to expose them as reactive in some way. Some reactive libraries have specific operators that move a possibly blocking subscribe call to a background thread:

Flowable.fromCallable(() -> {
    Thread.sleep(1000);
    return 1;
})
.subscribeOn(Schedulers.io())
.subscribe(System.out::println);

The main use case is to free up the main thread. You can inline the two so your subscribe implementation starts a task in some thread pool which can "freely" block.

The TCK can't and doesn't detect if your subscribe blocks as from the TCK's perspective, they appear as synchronous sources.

@viktorklang
Copy link
Contributor

viktorklang commented Sep 28, 2017

@akarnokd I guess it depends on the definition of "blocking" used. I think it would be surprising to users if they had to assume/do defensive backgrounding of subscribe calls as a general practice. I think it is fair to assume that "work is done" within subscribe, and that the method takes time to execute, but stalling the calling thread perpetually?

EDIT: The TCK comment was to make sure that all the other bits and pieces are already verified.

@akarnokd
Copy link
Contributor

akarnokd commented Sep 28, 2017

It happens all the time on Android with network or file resources being streamed. Most underlying components use blocking calls (i.e., FileInputStream.read() type of blocking).

@viktorklang
Copy link
Contributor

viktorklang commented Sep 28, 2017

@akarnokd Well, technically referencing new Class causes it to be loaded, which could be a URLClassLoader which could technically block forever. I'm just saying that the general case one should assume that calling subscribe is safe (i.e. calling thread is not blocked perpetually).

@rkuhn
Copy link
Member

rkuhn commented Sep 28, 2017

@guenhter Thanks for pointing out an omission: for Subscription.request and Subscription.cancel we have documented the intention of non-obstructive behavior, perhaps that should also be added to Publisher.subscribe. Since all information flow between Publisher and Subscriber is asynchronous, it should not be a problem to construct a Subscription without involving blocking ømq APIs—asynchronous failures can easily be signalled using onError.

@guenhter
Copy link
Author

guenhter commented Sep 28, 2017

@rkuhn ømq was just an example. I guess it should be seen more general. If e.g. we have a Socket or some other API which reads data in a blocking fashion is it allowed to do the following

class FooPublisher : Publisher<Int> {
    //...
    override fun subscribe(sub: Subscriber<in Int>) {
        while (running) {
            val number = receiver.readNext()  // this call can block until the next element arrives (or close is called)
            sub.onNext(number)
        }
    }
}

@rkuhn
Copy link
Member

rkuhn commented Sep 28, 2017

@guenhter Please read the spec in full: that example code violates many of the Reactive Streams rules, including not respecting back-pressure.

If creating a certain Socket is a blocking operation, then it would be prudent to not create the Socket in .subscribe() but defer it to another thread (pool). The idea of Reactive Streams is to have Publisher and Subscriber use different resources, especially threads.

@viktorklang
Copy link
Contributor

viktorklang commented Sep 28, 2017

@guenhter @rkuhn Deferring blocking operations to a pool provided to, or managed by, the Publisher would be the way to do it.

@akarnokd
Copy link
Contributor

akarnokd commented Sep 28, 2017

I think if it comes to amending the spec, this expectation should be listed as SHOULD. Mandating to go asynchronous when subscribing may be an overkill, especially when interfacing with a legacy (gen 0) reactive source such as GUI events. Most GUI frameworks have the requirement that attaching a listener happens on the GUI thread and whenever this happens in the user code, the execution is already on the GUI thread.

RxJava users face this often and there are two practices suggested to them: 1) specify in the method javadoc returning a Publisher that it is blocking and subscribing to it should happen on a background thread or 2) the method expects an Executor-like parameter which will be tasked to block. Users may not like getting blocked but our experience indicates they don't like getting their execution tossed around to arbitrary threads without much control either.

@rkuhn
Copy link
Member

rkuhn commented Sep 28, 2017

@viktorklang I think there is a good point to this issue that needs to be addressed: we have not documented appropriately what the expectations for Publisher.subscribe() are in terms of how long it may suspend the caller thread.

@rkuhn rkuhn reopened this Sep 28, 2017
@viktorklang
Copy link
Contributor

@rkuhn Yes, I agree. Adding an advisory Responsivity clause is both properly backwards compatible, as well as guiding implementations as to what the desired properties should be.

@viktorklang
Copy link
Contributor

@akarnokd Switching it to a MUST would be backwards incompatible, a SHOULD or RECOMMENDED might be more appropriate.

@viktorklang
Copy link
Contributor

@rkuhn @akarnokd There are implications though: for sync Pub-Sub pairs.

@viktorklang
Copy link
Contributor

Oops, I accidentally pressed the lovely "Close and comment".

@akarnokd
Copy link
Contributor

@viktorklang I'm glad to hear it.

@guenhter
Copy link
Author

guenhter commented Oct 3, 2017

In org.reactivestreams:reactive-streams-examples:1.0.1 there are some great examples of Publisher-classes but sadly, they are all async. When the documentation is enhanced (is it?), it would be helpful to add just one sync example of the Publisher.

@viktorklang
Copy link
Contributor

@guenhter Yep, a sync Publisher would be great to add to the examples!

@akarnokd
Copy link
Contributor

akarnokd commented Oct 3, 2017

@guenhter done: #395

@guenhter
Copy link
Author

guenhter commented Oct 3, 2017

Wow! This was fast. Thanks.

@viktorklang
Copy link
Contributor

Merged!

@adamw
Copy link

adamw commented Nov 20, 2024

The example is there, but I think the specification changes are missing?

I spent some considerable time today trying to figure out if Publisher.subscribe can block (until all elements are pushed to the subscriber, in my case). It seemed unlikely, but then the other non-blocking methods are explicitly required to be non-obstructing, which made me think again ;)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants