# Write backpressure #25
@smaldini and I were discussing backpressure, and a concern brought up was the default behavior of RxJava.

### Vertical vs Horizontal Bounded Buffers

I think of buffers in a streaming system in two directions: vertical and horizontal. Vertical buffering is the pub/sub relationship and the one we generally think of. For example:

```java
Observable.range(0, 10000).observeOn(Schedulers.io()).subscribe(observer);
```

This goes from "top" to "bottom" and has a "vertical" buffer inside `observeOn`. All "vertical" buffers in RxJava are bounded. This is the Reactive Streams backpressure semantics: no producer can overwhelm a consumer.

Horizontal buffering comes when composing multiple streams. For example:

```java
Observable s1 = Observable.range(0, 10000);
Observable s2 = Observable.range(0, 10000);
Observable.merge(s1, s2).observeOn(Schedulers.io()).subscribe(observer);
```

In this case we have bounded vertical buffering on both `s1` and `s2`, while `merge` subscribes to both of them concurrently, so buffering also happens "horizontally" across the streams.

### Deadlock

If I artificially bound the horizontal buffering, a system can deadlock. Here is an example showing how:

```java
Observable s1 = Observable.never(); // simulate a very latent or empty stream that never completes
Observable s2 = Observable.never();
...
Observable s10 = Observable.just(1);
Observable.merge(s1, s2, ..., s10).take(1).observeOn(Schedulers.io()).subscribe(observer);
```

If `merge` capped the number of concurrent subscriptions below 10, then `s10` (the only stream with data) would never be subscribed to, `take(1)` would never receive an item, and the pipeline would silently deadlock.

This behaves the same if I structure it as an `Observable<Observable<?>>`:

```java
Observable s1 = Observable.never(); // simulate a very latent or empty stream that never completes
Observable s2 = Observable.never();
...
Observable s10 = Observable.just(1);
Observable<Observable<?>> os = Observable.from(s1, s2, ..., s10);
Observable.merge(os).take(1).observeOn(Schedulers.io()).subscribe(observer);
```

Even though we can use backpressure on the outer stream `os`, limiting how many of the inner Observables are subscribed to would deadlock in exactly the same way.

### Starvation

A similar issue arises with long-lived streams: fairness and starvation.

```java
Observable s1 = Observable.interval(1, TimeUnit.SECONDS).map(i -> a());
Observable s2 = Observable.interval(1, TimeUnit.SECONDS).map(i -> b());
...
Observable s10 = Observable.interval(1, TimeUnit.SECONDS).map(i -> j());
Observable.merge(s1, s2, ..., s10).take(1).observeOn(Schedulers.io()).subscribe(observer);
```

If concurrency is limited to 8, we will receive data from functions `a()` through `h()` only; `i()` and `j()` will be starved because their streams are never subscribed to.

### Developer Choice

For these two reasons we can not artificially limit the horizontal buffering. If a developer merges many streams, all of them are subscribed to.

There are, however, use cases where this results in horizontal buffer bloat, primarily when trying to do computationally driven parallel processing such as this:

```java
Observable.range(0, 50000000)
    .window(500)
    .flatMap(work -> {
        return work.observeOn(Schedulers.computation()).map(item -> {
            // simulate computational work
            try {
                Thread.sleep(1);
            } catch (Exception e) {
            }
            return item + " processed " + Thread.currentThread();
        });
    }).toBlocking().forEach(System.out::println);
```

In this example it will suffer horizontal buffer bloat: it subscribes to thousands of windowed Observables concurrently and starts queueing them all up on the computation scheduler. It will run through the 50,000,000 items very quickly, turn them into 100,000 windows of 500 items each, and enqueue all of them. An infinite stream would be far worse.

In this case we leave it to the developer to specify that they want to limit concurrency, using the overload that takes a max-concurrency argument:

```java
Observable.range(0, 50000000)
    .window(500)
    .flatMap(work -> {
        return work.observeOn(Schedulers.computation()).map(item -> {
            // simulate computational work
            try {
                Thread.sleep(1);
            } catch (Exception e) {
            }
            return item + " processed " + Thread.currentThread();
        });
    }, 8).toBlocking().forEach(System.out::println);
```

This will now only process 8 windowed Observables at a time and correctly emit backpressure upstream, so the 50,000,000 items are emitted only as downstream processes them and requests the next chunk.

The upside is that backpressure composes here and does what is wanted. The downside is that the backpressure only really composes vertically, not horizontally. Horizontal backpressure needs to be decided upon by the developer. I am not aware of an approach that automatically solves horizontal backpressure in stream-composition use cases without risking deadlock or starvation. We determined that deadlock (especially the silent async variety) and starvation are far more damaging and difficult to debug than the rare cases where manual concurrency limits are needed, and thus we have chosen what we consider the safest default behavior.

### Application to IO Writes

The context above is shared because I see IO writes the same way: we have to subscribe to all of them and allow buffering per connection.
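The deadlock described earlier (bounded horizontal concurrency plus `take(1)`) can be demonstrated without RxJava at all. The following is a minimal sketch using only `java.util.concurrent`; the `Source`, `never`, and `just` names are illustrative stand-ins, not any real library API. A "merge" limited to one concurrent source gets stuck on a source that never emits, so the source that actually has data is never reached:

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedMergeDeadlock {
    // A "source" hands items to a consumer queue when subscribed.
    interface Source {
        void subscribe(BlockingQueue<Integer> out) throws InterruptedException;
    }

    // Emits nothing and never completes (analogous to Observable.never()).
    static Source never() {
        return out -> new CountDownLatch(1).await();
    }

    // Emits a single value (analogous to Observable.just(value)).
    static Source just(int value) {
        return out -> out.put(value);
    }

    public static void main(String[] args) throws Exception {
        List<Source> sources = List.of(never(), just(1));
        BlockingQueue<Integer> results = new LinkedBlockingQueue<>();

        // "Horizontal concurrency" artificially limited to 1:
        // sources are subscribed to one at a time, in order.
        ExecutorService merge = Executors.newFixedThreadPool(1);
        for (Source s : sources) {
            merge.submit(() -> { s.subscribe(results); return null; });
        }

        // take(1): wait briefly for the first item. The never() source occupies
        // the single subscription slot forever, so just(1) is never subscribed
        // to and nothing ever arrives -- a silent deadlock.
        Integer first = results.poll(500, TimeUnit.MILLISECONDS);
        System.out.println("received: " + first); // prints "received: null"
        merge.shutdownNow();
    }
}
```

With an unbounded (default) merge, both sources would be subscribed to immediately and `just(1)` would deliver its item right away, which is exactly the trade-off argued for above.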
This issue intends to discuss how backpressure would be implemented for writes to a `Connection`.
### Key considerations
There have been some discussions around the same in issue #21, specifically in the comments there.
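As a starting point for that discussion, here is a minimal sketch of what bounded write buffering per connection could look like. This is a hypothetical model, not the RxNetty API: the `Connection` class, `tryWrite`, and `flushOne` names are invented for illustration. The idea mirrors the "bounded vertical buffer" argument above, applied to writes: pending writes are capped, and a full buffer signals the producer to slow down.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class WriteBackpressureSketch {
    // Hypothetical connection: pending writes are capped at a high-water mark;
    // tryWrite() reports whether the caller should apply backpressure upstream.
    static class Connection {
        private final BlockingQueue<String> pendingWrites;

        Connection(int highWaterMark) {
            this.pendingWrites = new ArrayBlockingQueue<>(highWaterMark);
        }

        // Non-blocking write attempt: false means "buffer full, slow down".
        boolean tryWrite(String msg) {
            return pendingWrites.offer(msg);
        }

        // Simulate the event loop flushing one pending write to the socket.
        String flushOne() {
            return pendingWrites.poll();
        }
    }

    public static void main(String[] args) {
        Connection conn = new Connection(2);
        System.out.println(conn.tryWrite("a")); // true
        System.out.println(conn.tryWrite("b")); // true
        System.out.println(conn.tryWrite("c")); // false: buffer full, backpressure
        conn.flushOne();                        // event loop drains one write
        System.out.println(conn.tryWrite("c")); // true: demand restored
    }
}
```

In an Rx-based design, the `false` result would instead surface as suspended demand on the `Observable` being written, so the producer is paused rather than polled, but the bounded-buffer core is the same.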