Contract Definition – Multicast/Unicast – Simplified Types #41
Conversation
@@ -19,7 +19,7 @@ The latest preview release is available on Maven Central as

Handling streams of data—especially “live” data whose volume is not predetermined—requires special care in an asynchronous system. The most prominent issue is that resource consumption needs to be carefully controlled such that a fast data source does not overwhelm the stream destination. Asynchrony is needed in order to enable the parallel use of computing resources, on collaborating network hosts or multiple CPU cores within a single machine.

The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary—think passing elements on to another thread or thread-pool—while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, backpressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded. The benefits of asynchronous processing would be negated if the communication of backpressure were synchronous (see also the [Reactive Manifesto](http://reactivemanifesto.org/)), therefore care has been taken to mandate fully non-blocking and asynchronous behavior of all aspects of a Reactive Streams implementation.

The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary – think passing elements on to another thread or thread-pool — while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, backpressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded. The benefits of asynchronous processing would be negated if the communication of backpressure were synchronous (see also the [Reactive Manifesto](http://reactivemanifesto.org/)), therefore care has been taken to mandate fully non-blocking and asynchronous behavior of all aspects of a Reactive Streams implementation.
I think "fully non-blocking and asynchronous behavior of all aspects of a Reactive Streams implementation" [emphasis mine] is a little too inclusive given there will be times that actions taken in an implementation are non-blocking but not asynchronous. Shouldn't this language be restricted to the specific components in which this behavior should be indisputable vs times when its allowed to be synchronous as long as its non-blocking?
I agree. I had focused more on the lower parts of the document where I had changed it to clearly state it's about being non-blocking, not async vs sync for much of it.
The only two places where truly async behavior is needed as I see it are:
- the `Subscription.request` method (and possibly `cancel`)
- the need for a `Subscriber` to use `request(n)` in the first place, which implies that somewhere down the chain it is async (buffering).
Experience with prototypes in RxJava has led me to believe the `Subscription.request` method should allow receiving `-1` or some other value to represent infinite demand – just send me as fast as you can.

This is a very valid use case for a variety of things. For example, if I have a `Publisher` that represents a file and it's the `Subscriber`'s choice whether it is consumed synchronously or asynchronously, the `Subscriber` should be able to tell the `Subscription` to send without restriction.

Similarly, if I'm streaming metrics, I can do `request(-1)` and then just sample or throttle, since `request(n)` doesn't really make much sense in that use case. The data is flowing along at whatever rate it does, and I always want to sample whatever the latest value is.
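To make this concrete, here is a minimal sketch of such a subscriber, written against the published `org.reactivestreams` interfaces (the class name is hypothetical, and the `-1` marker is only the proposal discussed here, not part of the existing contract):

```java
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Hypothetical metrics subscriber: asks for unbounded delivery up front using
// the proposed -1 marker and then simply keeps whatever the latest value is.
public class SamplingMetricsSubscriber implements Subscriber<Double> {
    private volatile double latest;

    @Override
    public void onSubscribe(Subscription s) {
        s.request(-1); // proposed "send as fast as you can" marker (an assumption, not spec'd)
    }

    @Override
    public void onNext(Double value) {
        latest = value; // sample only; no buffering and no further request(n) calls
    }

    @Override
    public void onError(Throwable t) { /* handle failure */ }

    @Override
    public void onComplete() { /* stream finished */ }

    public double latest() {
        return latest;
    }
}
```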
I propose to discuss the topic of asynchrony in #46 and open new issues for discussing the other things: we should focus on fixing the README so that it matches the code again ASAP (and then update the website with correct links as well).
> focus on fixing the README so that it matches the code again ASAP (and then update the website with correct links as well).
Agreed, which was my hope with this PR even though I knew it would continue evolving over time.
- The `Publisher.subscribe` method can be called as many times as wanted as long as it is with a different `Subscriber` each time. It is up to the `Publisher` whether underlying streams are shared or not. In other words, a `Publisher` can support multi-subscribe and then choose whether each `Subscription` is unicast or multicast.
- A `Publisher` can refuse subscriptions (calls to `subscribe`) if it is unable or unwilling to serve them (overwhelmed, fronting a single-use data source, etc) and can do so by calling `Subscriber.onError` instead of `Subscriber.onSubscribe` on the `Subscriber` instance calling `subscribe`.
- A `Publisher` should not throw an `Exception`. The only legal way to signal failure (or reject a `Subscription`) is via the `Subscriber.onError` method.
- The `Subscription.request` method must behave asynchronously (separate thread, event loop, trampoline, etc) so as to not cause a StackOverflow since `Subscriber.onNext` -> `Subscription.request` -> `Subscriber.onNext` can recurse infinitely. This allows a `Subscriber` to directly invoke `Subscription.request` and isolate the async responsibility to the `Subscription` instance which has responsibility for scheduling events.
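As a rough illustration of the second and third bullets above (reject by signalling `onError`, never throw from `subscribe`), here is a sketch against the published `org.reactivestreams` interfaces; the single-use semantics and class name are hypothetical:

```java
import java.util.concurrent.atomic.AtomicBoolean;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Hypothetical publisher fronting a single-use data source: it serves exactly
// one Subscriber and rejects later ones via onError instead of throwing.
public class SingleUsePublisher<T> implements Publisher<T> {
    private final AtomicBoolean subscribed = new AtomicBoolean(false);

    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        if (!subscribed.compareAndSet(false, true)) {
            // Rejection path: no onSubscribe, only onError.
            subscriber.onError(new IllegalStateException(
                    "single-use data source already has a Subscriber"));
            return;
        }
        subscriber.onSubscribe(new Subscription() {
            @Override public void request(long n) { /* deliver up to n elements */ }
            @Override public void cancel()        { /* release the underlying resource */ }
        });
    }
}
```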
I think this is a little tricky because, using the RingBuffer as an example, it would be impossible to fulfill this requirement as-is. I would deadlock the thread if I tried to check out another slot in the RingBuffer while in the event thread of the RingBuffer itself. The only solution to this is what we do in Reactor, which is to simulate tail recursion by placing a task to be dispatched onto a `Queue` that is emptied after the current event handling method has returned.

It also doesn't seem optimal to enforce crossing a thread boundary here. When using the RingBuffer I only ever use a single thread, so I can certainly make task execution asynchronous, but I can't make it run in another thread (nor would I want to, since I'm gaining a lot of performance by not context-switching).

If we're going to mandate crossing an asynchronous boundary here, I think it should be fairly explicit what we expect. I don't feel the current text does enough to explain what's expected and why.
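For what it's worth, the "simulated tail recursion" idea described above can be sketched with a tiny single-threaded trampoline (purely illustrative, not taken from Reactor): while one task is being handled, further tasks are queued and run after it returns, so `onNext` -> `request` -> `onNext` never deepens the stack and no thread boundary is crossed.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative trampoline: intended to be called from a single event thread.
public final class Trampoline {
    private final Deque<Runnable> queue = new ArrayDeque<>();
    private boolean draining = false;

    public void execute(Runnable task) {
        queue.add(task);
        if (draining) {
            return; // already inside the drain loop; task runs after the current one returns
        }
        draining = true;
        try {
            Runnable next;
            while ((next = queue.poll()) != null) {
                next.run(); // nested execute() calls only enqueue, they never recurse
            }
        } finally {
            draining = false;
        }
    }
}
```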
This is certainly one of the more tricky things I came across in the implementation. First of all, I am not sure that only allowing an async boundary is sensible. For testing purposes, the flow behavior should be the same synchronously, e.g. a simple RxJava `Observable.from(xxx)` flow or Reactor `Streams.defer(xxx)`. It is certainly optimized towards async, but what that means is we should have a `-1` marker to let the `Subscription` dismiss the backpressure and just fully drain its source. If the publisher detects such capacity on one of its subscriptions, it should never buffer/queue but directly call the subscriber.
> It also doesn't seem optimal to enforce crossing a thread boundary here

By async this does not mean crossing thread boundaries. If the producer and consumer are on the same thread, this can be achieved via trampolining, for example.
I think #46 is a good discussion to continue on this. I won't update this text for now as it feels like it needs further clarification.
> we should have a -1 marker to let the `Subscription` dismiss the backpressure and just fully drain its source

I would definitely like to add this to the contract.
*If N is the total number of demand tokens handed to the Publisher P by a Subscriber S during the time period up to a time T, then the number of onNext calls that are allowed to be performed by P on S before T must be less than or equal to N. The number of pending demand tokens must be tracked by the Producer separately for each of its subscribers.*

*If N is the total number of demand tokens handed to the `Publisher` P by a `Subscriber` S during the time period up to a time T, then the number of `onNext` calls that are allowed to be performed by P on S before T must be less than or equal to N. The number of pending demand tokens must be tracked by the `Producer` separately for each of its subscribers.*
Me no understand. 😕
The producer must check the capacity on the subscription/subscriber before handing over any `onNext`. Until `Subscription#request` is effectively called, the capacity remains null. However, `onSubscribe` will probably generate a `request(xxx)` from the subscriber, which adds the requested number to its capacity and then signals the `Publisher` to broadcast its `onNext` events from its buffer (if any, i.e. if it is a cold/replayable stream).
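A rough sketch of the bookkeeping this implies on the publisher side (names are illustrative, not part of the SPI): keep one outstanding-demand counter per subscriber, add to it from `Subscription.request(n)`, and emit an `onNext` only when a token can be consumed.

```java
import java.util.concurrent.atomic.AtomicLong;

// One instance per Subscriber, owned by the Publisher/Subscription pair.
final class DemandCounter {
    private final AtomicLong pending = new AtomicLong(0);

    // Called from Subscription.request(n): add n demand tokens.
    void add(long n) {
        pending.addAndGet(n);
    }

    // Called before each onNext: consume one token if any demand is outstanding,
    // so the total number of onNext calls never exceeds the total requested.
    boolean tryConsume() {
        while (true) {
            long current = pending.get();
            if (current <= 0) return false; // no capacity: must not call onNext
            if (pending.compareAndSet(current, current - 1)) return true;
        }
    }
}
```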
Please accept my apologies for joining the discussion late and, perhaps more importantly, uninvited. But I'm not sure how multicast (or multi-subscribe for that matter) brings you closer to solving the problem of non-blocking backpressure (which seems to be the unique selling point of this SPI).

Moreover, attempting to solve multicasting in

Further, I'm not entirely convinced that a 'multicaster' (currently

Finally, thank you for all of your work on this. I really enjoyed reading through your discussion in #19.
Not a problem at all, this is open for exactly this purpose, for anyone to jump in and get involved! Thank you for doing so.
It doesn't. It actually has little to do with back pressure other than affecting how a

The issue was whether this spec should require or dictate unicast or multicast behavior.
Agreed, but this spec is not intended to define strategies for handling backpressure, only a mechanism for communicating capacity/demand. Each implementation of a

For example, in Rx there will be mechanisms for choosing buffering, throttling, sampling, unsubscribing, pausing, blocking and probably others. I can't speak to Akka or Reactor or others, but each implementation will choose how to expose or communicate strategies of what to do. Thus, you could implement a `Publisher` that accepts a strategy. For example:

```java
Publisher p = new MyStreamPublisher(Strategy.DROP);
p.subscribe(new MySubscriber());
```

Does this make sense, or is there something that should be adjusted about the contract or interfaces to better achieve these goals?
Fully agreed, @benjchristensen. In Akka we will have combinators which express different schemes of attaching and detaching streams: buffering, dropping, conflation, but also many strategies for fan-in and fan-out.
Thank you both @benjchristensen, @rkuhn for taking the time to respond to my queries and for giving insight into your planned implementation(s). All your responses make complete sense and greatly improved my understanding - which was previously lacking!

Just to test my understanding and for the sake of completeness: I could, hypothetically, overload the subscribe method on my Publisher implementation to accept a strategy at subscription time, e.g.

```scala
trait SomePublisher[T] extends Publisher[T] {
  ...
  def subscribe(subscriber: Subscriber[T], strategy: Strategy): Unit
}
```

Finally, to answer your question, no I cannot envisage a better way of achieving these goals! In fact, I'd go so far as to say it's the most elegant mechanism I've seen to date for handling back pressure in an asynchronous manner. The team I work with and I are excited to try out some of the implementations. Hopefully at some point we'll get an opportunity to contribute back.
If the `Subscription` owns the state, it should be aware that a given subscriber has no capacity and stop the recursion at that stage by buffering; alternatively, if it has capacity it doesn't need to buffer and can call `onNext`, which will recurse until completion. This behavior is the current Reactor one and passes the actual TCK – would that be correct?

Of course, if `Subscriber.onNext` dispatches asynchronously as its first action this also prevents StackOverflow, but it doesn't prevent the code from being synchronous-friendly for e.g. a testing environment. Reactor can use a RingBuffer dispatcher (event-loop-like) or a Synchronous Dispatcher without distinction on the `Subscriber` side for now. If the async rule is enforced I will probably restrict the dispatching to the RingBuffer.
The following code could be used by an implementation:

```scala
trait SomePublisher[T] extends Publisher[T] {
  ...
  def subscribe(subscriber: Subscriber[T], strategy: Strategy): Unit
  def subscribe(subscriber: Subscriber[T]): Unit
}
```

However, since it doesn't comply with the interface, it would not work for interop, as the normal `subscribe(subscriber)` signature is the only one other implementations know how to call.

Alternatively, the strategy could be provided when constructing the `Publisher`. For example:

```scala
new SomePublisher(Strategy.DROP).subscribe(subscriber)
```
The current TCK was built against a moving target and the contract definition before we started discussing things, so I have no idea what the TCK is validating.

If all the

The only thing that really needs to be "enforced" is that a StackOverflow can't occur, and that means the

Can you explain more what you mean by this?
This has no issue running synchronously:

```java
Stream<Integer> stream = Streams.defer(list).filter(integer -> true).map(integer -> integer).flatMap(Streams::defer);
stream.subscribe(new Subscriber(){
    //...
    public void onSubscribe(Subscription subscription){
        subscription.request(1000); //will stop after 5 and calls onComplete since list has only 5 elements
    }
});
```

Current subscription code (WIP, e.g. the buffer queue especially):

If the subscriber is a Reactor one (a subject like MapManyAction, MapAction, etc.), it will dispatch asynchronously inside `mapManyAction.onNext`, but in that case it will be synchronous – unless we decide that some of the asynchronous boundary needs to move to the publisher, so that `Publisher.subscribe` has to run asynchronously?
@benjchristensen Sorry, not my finest hour.

```scala
val p = new SomePublisher()
val s1 = new SomeSubscriber(Strategy.DROP) // alternatively, new DroppingSubscriber()
val s2 = new SomeSubscriber(Strategy.RESAMPLE) // or, new ResamplingSubscriber()
p.subscribe(s1)
p.subscribe(s2)
```

Just to provide some context, the reason I'm pursuing this line of questioning is that for some of our work we've had one 'publisher' and many 'subscribers', each with different overflow strategies.
@mattroberts297 What about moving this logic into the `Subscription` rather than the `Subscriber`? Each subscription is the unique state between a given publisher and subscriber, therefore you can do all kinds of buffering/dropping/short-circuiting etc. from there without impacting other subscribers.
@smaldini `Subscription` feels like a sensible place to put this logic. But I think an implementation may decide to hide the `Subscription` and just expose the `Publisher` and `Subscriber`. What are your thoughts?
#### Comparison with related technologies ####
why is this section removed?
I had a few problems with it while revising this:
- It's not comprehensive across all possible technologies or solutions, particularly when you leave the JVM world and try to take into account everything going on in JavaScript, Go, Dart, .NET, etc.
- The "related technologies" are moving targets (such as the fact that Rx is working with this initiative to support all of this), so baking any statements into this document requires ongoing upkeep.
- This document and project should focus on what this new specification and contract is, not what other projects are.
In short, comparisons should live elsewhere, not in the specification and contract documentation which should be timeless.
yes, this is a good point; we should probably open the wiki for that purpose
#49 created.
```scala
val p = new SomePublisher()
val s1 = new SomeSubscriber(Strategy.DROP) // alternatively, new DroppingSubscriber()
val s2 = new SomeSubscriber(Strategy.RESAMPLE) // or, new ResamplingSubscriber()
p.subscribe(s1)
p.subscribe(s2)
```

I can envision an implementation like that. How I'd proceed would depend on whether the publisher was "hot" or "cold". Considering the RESAMPLE strategy I'm assuming it's "hot" (a stream of stock prices or metrics, for example). In that case I would expect both the Strategy.DROP and Strategy.RESAMPLE subscribers to call `request(-1)`.
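A rough Java sketch of what such a Strategy.DROP subscriber could look like (all names here are illustrative, and the unbounded request stands in for the `-1` idea above): it takes effectively unbounded demand from the hot publisher, hands elements to a slow consumer through a small bounded queue, and drops whatever arrives while that queue is full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Hypothetical dropping subscriber for a hot stream of values.
public class DroppingSubscriber<T> implements Subscriber<T> {
    private final BlockingQueue<T> buffer = new ArrayBlockingQueue<>(128);

    @Override
    public void onSubscribe(Subscription s) {
        startSlowConsumer();       // drains `buffer` on its own thread
        s.request(Long.MAX_VALUE); // effectively unbounded demand
    }

    @Override
    public void onNext(T element) {
        buffer.offer(element); // non-blocking: returns false and drops when the queue is full
    }

    @Override
    public void onError(Throwable t) { /* ... */ }

    @Override
    public void onComplete() { /* ... */ }

    private void startSlowConsumer() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    process(buffer.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    private void process(T element) { /* expensive per-element work */ }
}
```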
Based on discussion at #41
I have updated based on feedback. I know there are things still to discuss and change, but can we merge this so that the README is in sync with the code and then let other pull requests and issues be used for further changes? (Note that I'll be quiet for a few days starting in a few hours ... baby is expected tonight/tomorrow).
Implementation-driven subscription API?

```java
publisher.subscribe(subscriber, Subscription.DROP)
```

Subscribers can implement their own strategy too, but having everything managed at the subscription level just makes things simpler. At subscription time you are aware of the buffer size; at request time you are aware of the capacity and can signal. I don't think it makes a difference in the end whether the subscriber or the subscription handles it, but leaving this open allows libraries not built on the Actor model to optimize the message passing, e.g. through a RingBuffer implementation. ATM we have a duplicate queue (one for missing capacity in `Subscription`, one for dispatching in `Subscriber`/`reactor.rx.action.Action`). I'm just trying to find a simple middle-ground solution but am happy with consensus.

NB: Awesome news Ben! Your wife, yourself and Ben 2.0 take care!
- The `Publisher.subscribe` method can be called as many times as wanted as long as it is with a different `Subscriber` each time. It is up to the `Publisher` whether underlying streams are shared or not. In other words, a `Publisher` can support multi-subscribe and then choose whether each `Subscription` is unicast or multicast.
- A `Publisher` can reject calls to its `subscribe` method if it is unable or unwilling to serve them (e.g. because it is overwhelmed or bounded by a finite number of underlying resources, etc...). It does this by calling `onError` on the `Subscriber` passed to `Publisher.subscribe` instead of calling `onSubscribe`.
- A `Publisher` should not throw an `Exception`. The only legal way to signal failure (or reject a `Subscription`) is via the `Subscriber.onError` method.
- The `Subscription.request` method must behave asynchronously (separate thread, event loop, trampoline, etc) so as to not cause a StackOverflow since `Subscriber.onNext` -> `Subscription.request` -> `Subscriber.onNext` can recurse infinitely. This allows a `Subscriber` to directly invoke `Subscription.request` and isolate the async responsibility to the `Subscription` instance which has responsibility for scheduling events.
This change forecloses the decision in #46; I would strongly prefer to keep the current wording (requiring fully asynchronous behavior of all specified methods) until we have properly settled that point.
The wording of this README can and will change as we continue progressing.
This pull request adds far more detail than currently exists and brings us closer to agreement, so I suggest merging this and moving forward instead of continuing to bikeshed details that can be modified afterwards and holding up everything else.
The problem I see with that is that we can't possibly know or define all strategies up front, so we'd basically have to use an empty marker interface, and it would be useless for interop and therefore not very useful for this project.
We don't need to add that to the project, nor necessarily go the dropping-subscriber route IMO. It should be up to the impl.

Sent from my iPhone
Updates the README to match the simplified types from #40 and updates the contract based on the unicast/multicast discussions (#19) and closed PR #37.