-
Notifications
You must be signed in to change notification settings - Fork 534
Structured concurrency and cancellation #482
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
Your observation can be summarised with “Reactive Streams is not a resource management framework”, which is true as per the mission statement: Reactive Streams only concerns itself with transmitting unbounded streams of data over asynchronous boundaries. How these streams are generated or consumed is of no concern to this communication protocol. In particular, signalling Cancellation in the same vein only transmits the intent of dropping all future stream elements. How the Publisher reacts to that is left intentionally vague. I recommend using a resource-managed entity like an Actor for your Publisher such that you can control its lifecycle and resource usage. |
This level of resource-awareness is not part of the specification or part of the design goals. (In contrast, this was somewhat considered with The general workaround is to bring the computation to the resource, like an actor (or event loop) that can correlate the subscriber's actions with the resource's lifecycle, as @rkuhn mentioned |
Thanks for the detailed response, @rkuhn and @akarnokd. Unfortunately, I cannot use any resource management framework. I will describe my use-case just to give an additional data-point for your consideration, not expecting a solution or anything to be fixed. I am working on Kotlin's It allows to do things like this:
The consumer now can do the following:
Now, taking into account the popularity of RS and similarities between
|
@qwwdfsad The Reactive Streams interfaces are not designed for, or intended as, an end-user API—it is more of an SPI to facilitate spec-conformant interoperability. |
Well, the same question applies to |
The general question is "How to maintain structured concurrency between publisher and subscriber in the face of cancellation?"
One of the ideas is to have the ability to merge concurrent (and potentially parallel) control flow.
For example, consider the finite
Publisher
that creates a resource per subscriber. It could be a thread, a socket or anything else that requires explicit close that may take indefinitely long period of time.In a basic scenario (let's say subscriber request is always unbounded), a subscriber receives all the data, then the
Publisher
(or itsSubscription
, to be more precise) closes the underlying resource (again: it may take indefinite time!) and only then invokesSubscriber.onComplete
.After receiving
onComplete
event, subscriber knows for sure that all resources associated with its subscription are properly released at this point (aka control flow is merged back into a single point).In a more advanced scenario,
Subscriber
cancels its subscription in the middle of the stream. E.g. it does not care about the rest of the elements because a corresponding user's session was terminated or for whatever reason.At this point, the protocol explicitly allows omitting
onComplete
:onSubscribe onNext* (onError | onComplete)?
(also,untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals
only confirms my suspicions).Meaning that, after the call to
Subscription.cancel
, the subscriber cannot be sure that the corresponding resource is properly terminated, neither it has the API to be notified, creating a potentially unbounded memory leak (e.g. in the situations when the new subscription is immediately established as soon as the previous one is complete or cancelled) or an unbounded resource consumption without an ability to throttle it at subscriber level.Could you please elaborate whether it is designed behaviour, grey area of the specification or I am just missing something and subscription termination can be properly detected?
The text was updated successfully, but these errors were encountered: