@@ -20,8 +20,8 @@ class GuideReactiveTest : ReactiveTestBase() {
20
20
21
21
# Guide to reactive streams with coroutines
22
22
23
- This guide explains key differences between Kotlin coroutines and reactive streams and shows
24
- how they can be used together for greater good. Prior familiarity with basic coroutine concepts
23
+ This guide explains the key differences between Kotlin coroutines and reactive streams and shows
24
+ how they can be used together for the greater good. Prior familiarity with the basic coroutine concepts
25
25
that are covered in [ Guide to kotlinx.coroutines] ( ../docs/coroutines-guide.md ) is not required,
26
26
but is a big plus. If you are familiar with reactive streams, you may find this guide
27
27
a better introduction into the world of coroutines.
@@ -130,18 +130,18 @@ Again:
130
130
131
131
<!-- - TEST -->
132
132
133
- Notice, how "Begin" line was printed just once, because [ produce] _ coroutine builder_ , when it is executed,
133
+ Notice how the "Begin" line was printed just once, because the [ produce] _ coroutine builder_ , when it is executed,
134
134
launches one coroutine to produce a stream of elements. All the produced elements are consumed
135
135
with [ ReceiveChannel.consumeEach] [ consumeEach ]
136
136
extension function. There is no way to receive the elements from this
137
- channel again. The channel is closed when the producer coroutine is over and the attempt to receive
137
+ channel again. The channel is closed when the producer coroutine is over and an attempt to receive
138
138
from it again cannot receive anything.
139
139
140
- Let us rewrite this code using [ publish] coroutine builder from ` kotlinx-coroutines-reactive ` module
140
+ Let us rewrite this code using the [ publish] coroutine builder from ` kotlinx-coroutines-reactive ` module
141
141
instead of [ produce] from ` kotlinx-coroutines-core ` module. The code stays the same,
142
- but where ` source ` used to have [ ReceiveChannel] type, it now has reactive streams
142
+ but where ` source ` used to have the [ ReceiveChannel] type, it now has the reactive streams'
143
143
[ Publisher] ( https://www.reactive-streams.org/reactive-streams-1.0.0-javadoc/org/reactivestreams/Publisher.html )
144
- type, where [ consumeEach] was used to _ consume_ elements from the channel,
144
+ type, and where [ consumeEach] was used to _ consume_ elements from the channel,
145
145
now [ collect] [ org.reactivestreams.Publisher.collect ] is used to _ collect_ elements from the publisher.
146
146
147
147
<!-- - INCLUDE
@@ -198,11 +198,11 @@ functional concept. While the channel _is_ a stream of elements, the reactive st
198
198
elements is produced. It becomes the actual stream of elements when _ collected_ . Each collector may receive the same or
199
199
a different stream of elements, depending on how the corresponding implementation of ` Publisher ` works.
200
200
201
- The [ publish] coroutine builder, that is used in the above example, does not launch a coroutine,
202
- but every [ collect] [ org.reactivestreams.Publisher.collect ] invocation launches a coroutine .
203
- We have two of them in this code and that is why we see "Begin" printed twice.
201
+ The [ publish] coroutine builder used in the above example does not launch a coroutine,
202
+ but every [ collect] [ org.reactivestreams.Publisher.collect ] invocation does .
203
+ There are two of them here and that is why we see "Begin" printed twice.
204
204
205
- In Rx lingo this is called a _ cold_ publisher . Many standard Rx operators produce cold streams, too. We can collect
205
+ In Rx lingo, this kind of publisher is called _ cold_ . Many standard Rx operators produce cold streams, too. We can collect
206
206
them from a coroutine, and every collector gets the same stream of elements.
207
207
208
208
> Note that we can replicate the same behaviour that we saw with channels by using Rx
@@ -212,10 +212,10 @@ method with it.
212
212
213
213
### Subscription and cancellation
214
214
215
- An example in the previous section uses ` source.collect { ... } ` to collect all elements.
216
- Instead of collecting elements , we can open a channel using [ openSubscription] [ org.reactivestreams.Publisher.openSubscription ]
217
- and iterate over it, so that we have more finer-grained control on our iteration,
218
- for example using ` break ` , as shown below:
215
+ In the second example from the previous section, ` source.collect { ... } ` was used to collect all elements.
216
+ Instead, we can open a channel using [ openSubscription] [ org.reactivestreams.Publisher.openSubscription ]
217
+ and iterate over it. In this way, we can have finer-grained control over our iteration
218
+ ( using ` break ` , for example) , as shown below:
219
219
220
220
<!-- - INCLUDE
221
221
import io.reactivex.*
@@ -256,8 +256,8 @@ Finally
256
256
<!-- - TEST -->
257
257
258
258
With an explicit ` openSubscription ` we should [ cancel] [ ReceiveChannel.cancel ] the corresponding
259
- subscription to unsubscribe from the source, but there is no need to call ` cancel ` explicitly -- under the hood
260
- [ consume] does that for us.
259
+ subscription to unsubscribe from the source, but there is no need to call ` cancel ` explicitly --
260
+ [ consume] does that for us under the hood .
261
261
The installed
262
262
[ doFinally] ( https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#doFinally(io.reactivex.functions.Action) )
263
263
listener prints "Finally" to confirm that the subscription is actually being closed. Note that "OnComplete"
@@ -300,9 +300,9 @@ Finally
300
300
301
301
<!-- - TEST -->
302
302
303
- Notice, how "OnComplete" and "Finally" are printed before the last element "5". It happens because our ` main ` function in this
304
- example is a coroutine that we start with [ runBlocking] coroutine builder.
305
- Our main coroutine receives on the flowable using ` source.collect { ... } ` expression.
303
+ Notice how "OnComplete" and "Finally" are printed before the last element "5". It happens because our ` main ` function in this
304
+ example is a coroutine that we start with the [ runBlocking] coroutine builder.
305
+ Our main coroutine receives on the flowable using the ` source.collect { ... } ` expression.
306
306
The main coroutine is _ suspended_ while it waits for the source to emit an item.
307
307
When the last item is emitted by ` Flowable.range(1, 5) ` it
308
308
_ resumes_ the main coroutine, which gets dispatched onto the main thread to print this
@@ -313,14 +313,14 @@ _resumes_ the main coroutine, which gets dispatched onto the main thread to prin
313
313
Backpressure is one of the most interesting and complex aspects of reactive streams. Coroutines can
314
314
_ suspend_ and they provide a natural answer to handling backpressure.
315
315
316
- In Rx Java 2.x a backpressure-capable class is called
316
+ In Rx Java 2.x, the backpressure-capable class is called
317
317
[ Flowable] ( https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html ) .
318
- In the following example we use [ rxFlowable] coroutine builder from ` kotlinx-coroutines-rx2 ` module to define a
318
+ In the following example, we use [ rxFlowable] coroutine builder from ` kotlinx-coroutines-rx2 ` module to define a
319
319
flowable that sends three integers from 1 to 3.
320
- It prints a message to the output before invocation of
320
+ It prints a message to the output before invocation of the
321
321
suspending [ send] [ SendChannel.send ] function, so that we can study how it operates.
322
322
323
- The integers are generated in the context of the main thread, but subscription is shifted
323
+ The integers are generated in the context of the main thread, but the subscription is shifted
324
324
to another thread using Rx
325
325
[ observeOn] ( https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#observeOn(io.reactivex.Scheduler,%20boolean,%20int) )
326
326
operator with a buffer of size 1.
@@ -370,14 +370,14 @@ Complete
370
370
371
371
<!-- - TEST -->
372
372
373
- We see here how producer coroutine puts the first element in the buffer and is suspended while trying to send another
374
- one. Only after consumer processes the first item, producer sends the second one and resumes, etc.
373
+ We see here how the producer coroutine puts the first element in the buffer and is suspended while trying to send another
374
+ one. Only after the consumer processes the first item, the producer sends the second one and resumes, etc.
375
375
376
376
377
377
### Rx Subject vs BroadcastChannel
378
378
379
379
RxJava has a concept of [ Subject] ( https://github.com/ReactiveX/RxJava/wiki/Subject ) which is an object that
380
- effectively broadcasts elements to all its subscribers. The matching concept in coroutines world is called a
380
+ effectively broadcasts elements to all its subscribers. The matching concept in the coroutines world is called a
381
381
[ BroadcastChannel] . There is a variety of subjects in Rx with
382
382
[ BehaviorSubject] ( https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/subjects/BehaviorSubject.html ) being
383
383
the one used to manage state:
@@ -445,15 +445,15 @@ four
445
445
446
446
<!-- - TEST -->
447
447
448
- Here we use [ Dispatchers.Unconfined] coroutine context to launch consuming coroutine with the same behaviour as subscription in Rx.
448
+ Here we use the [ Dispatchers.Unconfined] coroutine context to launch a consuming coroutine with the same behavior as subscription in Rx.
449
449
It basically means that the launched coroutine is going to be immediately executed in the same thread that
450
450
is emitting elements. Contexts are covered in more details in a [ separate section] ( #coroutine-context ) .
451
451
452
452
The advantage of coroutines is that it is easy to get conflation behavior for single-threaded UI updates.
453
453
A typical UI application does not need to react to every state change. Only the most recent state is relevant.
454
454
A sequence of back-to-back updates to the application state needs to get reflected in UI only once,
455
455
as soon as the UI thread is free. For the following example we are going to simulate this by launching
456
- consuming coroutine in the context of the main thread and use [ yield] function to simulate a break in the
456
+ a consuming coroutine in the context of the main thread and use the [ yield] function to simulate a break in the
457
457
sequence of updates and to release the main thread:
458
458
459
459
<!-- - INCLUDE
@@ -475,21 +475,21 @@ fun main() = runBlocking<Unit> {
475
475
subject.onNext(" three" )
476
476
subject.onNext(" four" )
477
477
yield () // yield the main thread to the launched coroutine <--- HERE
478
- subject.onComplete() // now complete subject's sequence to cancel consumer, too
478
+ subject.onComplete() // now complete the subject's sequence to cancel the consumer, too
479
479
}
480
480
```
481
481
482
482
> You can get full code [ here] ( kotlinx-coroutines-rx2/test/guide/example-reactive-basic-08.kt ) .
483
483
484
- Now coroutine process (prints) only the most recent update:
484
+ Now the coroutine processes (prints) only the most recent update:
485
485
486
486
``` text
487
487
four
488
488
```
489
489
490
490
<!-- - TEST -->
491
491
492
- The corresponding behavior in a pure coroutines world is implemented by [ ConflatedBroadcastChannel]
492
+ The corresponding behavior in the pure coroutines world is implemented by [ ConflatedBroadcastChannel]
493
493
that provides the same logic on top of coroutine channels directly,
494
494
without going through the bridge to the reactive streams:
495
495
@@ -511,7 +511,7 @@ fun main() = runBlocking<Unit> {
511
511
broadcast.offer(" three" )
512
512
broadcast.offer(" four" )
513
513
yield () // yield the main thread to the launched coroutine
514
- broadcast.close() // now close broadcast channel to cancel consumer, too
514
+ broadcast.close() // now close the broadcast channel to cancel the consumer, too
515
515
}
516
516
```
517
517
@@ -528,10 +528,10 @@ four
528
528
Another implementation of [ BroadcastChannel] is ` ArrayBroadcastChannel ` with an array-based buffer of
529
529
a specified ` capacity ` . It can be created with ` BroadcastChannel(capacity) ` .
530
530
It delivers every event to every
531
- subscriber since the moment the corresponding subscription is open . It corresponds to
531
+ subscriber as soon as their corresponding subscriptions are opened . It corresponds to
532
532
[ PublishSubject] ( https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/subjects/PublishSubject.html ) in Rx.
533
533
The capacity of the buffer in the constructor of ` ArrayBroadcastChannel ` controls the numbers of elements
534
- that can be sent before the sender is suspended waiting for receiver to receive those elements.
534
+ that can be sent before the sender is suspended waiting for a receiver to receive those elements.
535
535
536
536
## Operators
537
537
@@ -545,13 +545,13 @@ Coroutines and channels are designed to provide an opposite experience. There ar
545
545
but processing streams of elements is extremely simple and back-pressure is supported automatically
546
546
without you having to explicitly think about it.
547
547
548
- This section shows coroutine-based implementation of several reactive stream operators.
548
+ This section shows a coroutine-based implementation of several reactive stream operators.
549
549
550
550
### Range
551
551
552
552
Let's roll out own implementation of
553
553
[ range] ( https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#range(int,%20int) )
554
- operator for reactive streams ` Publisher ` interface. The asynchronous clean-slate implementation of this operator for
554
+ operator for the reactive streams' ` Publisher ` interface. The asynchronous clean-slate implementation of this operator for
555
555
reactive streams is explained in
556
556
[ this blog post] ( https://akarnokd.blogspot.ru/2017/03/java-9-flow-api-asynchronous-integer.html ) .
557
557
It takes a lot of code.
@@ -569,11 +569,11 @@ fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = pu
569
569
}
570
570
```
571
571
572
- In this code ` CoroutineScope ` and ` context ` are used instead of an ` Executor ` and all the backpressure aspects are taken care
572
+ Here, ` CoroutineScope ` and ` context ` are used instead of an ` Executor ` and all the backpressure aspects are taken care
573
573
of by the coroutines machinery. Note that this implementation depends only on the small reactive streams library
574
- that defines ` Publisher ` interface and its friends.
574
+ that defines the ` Publisher ` interface and its friends.
575
575
576
- It is straightforward to use from a coroutine:
576
+ Using it from a coroutine is straightforward :
577
577
578
578
``` kotlin
579
579
fun main () = runBlocking<Unit > {
@@ -644,7 +644,7 @@ fun main() = runBlocking<Unit> {
644
644
645
645
> You can get full code [ here] ( kotlinx-coroutines-rx2/test/guide/example-reactive-operators-02.kt ) .
646
646
647
- It is not hard to see, that the result is going to be:
647
+ It is not hard to see that the result is going to be:
648
648
649
649
``` text
650
650
2 is even
@@ -657,10 +657,10 @@ It is not hard to see, that the result is going to be:
657
657
658
658
Let's implement our own version of
659
659
[ takeUntil] ( https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#takeUntil(org.reactivestreams.Publisher) )
660
- operator. It is quite a [ tricky one ] ( https://akarnokd.blogspot.ru/2015/05/pitfalls-of-operator-implementations.html )
661
- to implement, because of the need to track and manage subscription to two streams .
660
+ operator. It is quite [ tricky] ( https://akarnokd.blogspot.ru/2015/05/pitfalls-of-operator-implementations.html )
661
+ as subscriptions to two streams need to be tracked and managed .
662
662
We need to relay all the elements from the source stream until the other stream either completes or
663
- emits anything. However, we have [ select] expression to rescue us in coroutines implementation:
663
+ emits anything. However, we have the [ select] expression to rescue us in the coroutines implementation:
664
664
665
665
<!-- - INCLUDE
666
666
import kotlinx.coroutines.channels.*
@@ -732,7 +732,7 @@ There are always at least two ways for processing multiple streams of data with
732
732
[ select] was shown in the previous example. The other way is just to launch multiple coroutines. Let
733
733
us implement
734
734
[ merge] ( https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#merge(org.reactivestreams.Publisher) )
735
- operator using the later approach:
735
+ operator using the latter approach:
736
736
737
737
<!-- - INCLUDE
738
738
import kotlinx.coroutines.*
@@ -751,16 +751,16 @@ fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = GlobalScope.p
751
751
}
752
752
```
753
753
754
- Notice, the use of
754
+ Notice the use of
755
755
[ coroutineContext] ( https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines/coroutine-context.html )
756
756
in the invocation of [ launch] coroutine builder. It is used to refer
757
757
to the context of the enclosing ` publish ` coroutine. This way, all the coroutines that are
758
758
being launched here are [ children] ( ../docs/coroutines-guide.md#children-of-a-coroutine ) of the ` publish `
759
759
coroutine and will get cancelled when the ` publish ` coroutine is cancelled or is otherwise completed.
760
- Moreover, since parent coroutine waits until all children are complete, this implementation fully
760
+ Moreover, since the parent coroutine waits until all the children are complete, this implementation fully
761
761
merges all the received streams.
762
762
763
- For a test, let us start with ` rangeWithInterval ` function from the previous example and write a
763
+ For a test, let us start with the ` rangeWithInterval ` function from the previous example and write a
764
764
producer that sends its results twice with some delay:
765
765
766
766
<!-- - INCLUDE
@@ -810,7 +810,7 @@ And the results should be:
810
810
811
811
All the example operators that are shown in the previous section have an explicit
812
812
[ CoroutineContext] ( https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines/-coroutine-context/ )
813
- parameter. In Rx world it roughly corresponds to
813
+ parameter. In the Rx world it roughly corresponds to
814
814
a [ Scheduler] ( https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Scheduler.html ) .
815
815
816
816
### Threads with Rx
@@ -988,13 +988,13 @@ The resulting messages are going to be printed in the main thread:
988
988
### Unconfined context
989
989
990
990
Most Rx operators do not have any specific thread (scheduler) associated with them and are working
991
- in whatever thread that they happen to be invoked in . We've seen it on the example of ` subscribe ` operator
991
+ in whatever thread they happen to be invoked. We've seen it in the example with the ` subscribe ` operator
992
992
in the [ threads with Rx] ( #threads-with-rx ) section.
993
993
994
994
In the world of coroutines, [ Dispatchers.Unconfined] context serves a similar role. Let us modify our previous example,
995
995
but instead of iterating over the source ` Flowable ` from the ` runBlocking ` coroutine that is confined
996
- to the main thread, we launch a new coroutine in ` Dispatchers.Unconfined ` context, while the main coroutine
997
- simply waits its completion using [ Job.join] :
996
+ to the main thread, we launch a new coroutine in the ` Dispatchers.Unconfined ` context, while the main coroutine
997
+ simply waits for its completion using [ Job.join] :
998
998
999
999
<!-- - INCLUDE
1000
1000
import io.reactivex.*
@@ -1024,7 +1024,7 @@ fun main() = runBlocking<Unit> {
1024
1024
> You can get full code [ here] ( kotlinx-coroutines-rx2/test/guide/example-reactive-context-05.kt ) .
1025
1025
1026
1026
Now, the output shows that the code of the coroutine is executing in the Rx computation thread pool, just
1027
- like our initial example using Rx ` subscribe ` operator.
1027
+ like our initial example using the Rx ` subscribe ` operator.
1028
1028
1029
1029
``` text
1030
1030
1 on thread RxComputationThreadPool-1
@@ -1034,14 +1034,14 @@ like our initial example using Rx `subscribe` operator.
1034
1034
1035
1035
<!-- - TEST LINES_START -->
1036
1036
1037
- Note that [ Dispatchers.Unconfined] context shall be used with care. It may improve the overall performance on certain tests,
1037
+ Note that the [ Dispatchers.Unconfined] context should be used with care. It may improve the overall performance on certain tests,
1038
1038
due to the increased stack-locality of operations and less scheduling overhead, but it also produces deeper stacks
1039
1039
and makes it harder to reason about asynchronicity of the code that is using it.
1040
1040
1041
1041
If a coroutine sends an element to a channel, then the thread that invoked the
1042
- [ send] [ SendChannel.send ] may start executing the code of a coroutine with [ Dispatchers.Unconfined] dispatcher.
1042
+ [ send] [ SendChannel.send ] may start executing the code of the coroutine with the [ Dispatchers.Unconfined] dispatcher.
1043
1043
The original producer coroutine that invoked ` send ` is paused until the unconfined consumer coroutine hits its next
1044
- suspension point. This is very similar to a lock-step single-threaded ` onNext ` execution in Rx world in the absense
1044
+ suspension point. This is very similar to a lock-step single-threaded ` onNext ` execution in the Rx world in the absense
1045
1045
of thread-shifting operators. It is a normal default for Rx, because operators are usually doing very small chunks
1046
1046
of work and you have to combine many operators for a complex processing. However, this is unusual with coroutines,
1047
1047
where you can have an arbitrary complex processing in a coroutine. Usually, you only need to chain stream-processing
0 commit comments