diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt index 26821f638d..7191ca10a3 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt @@ -8,6 +8,7 @@ public final class kotlinx/coroutines/reactive/AwaitKt { } public final class kotlinx/coroutines/reactive/ChannelKt { + public static final fun collect (Lorg/reactivestreams/Publisher;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun consumeEach (Lorg/reactivestreams/Publisher;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun openSubscription (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/channels/ReceiveChannel; public static synthetic fun openSubscription$default (Lorg/reactivestreams/Publisher;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel; diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt index 3a99ada4a1..67ef8a131c 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt @@ -12,6 +12,8 @@ public final class kotlinx/coroutines/rx2/RxAwaitKt { } public final class kotlinx/coroutines/rx2/RxChannelKt { + public static final fun collect (Lio/reactivex/MaybeSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun collect (Lio/reactivex/ObservableSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun consumeEach (Lio/reactivex/MaybeSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun consumeEach (Lio/reactivex/ObservableSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun openSubscription (Lio/reactivex/MaybeSource;)Lkotlinx/coroutines/channels/ReceiveChannel; diff --git a/reactive/coroutines-guide-reactive.md b/reactive/coroutines-guide-reactive.md index 771c1b4511..de819df069 100644 --- a/reactive/coroutines-guide-reactive.md +++ b/reactive/coroutines-guide-reactive.md @@ -141,7 +141,8 @@ Let us rewrite this code using [publish] coroutine builder from `kotlinx-corouti instead of [produce] from `kotlinx-coroutines-core` module. The code stays the same, but where `source` used to have [ReceiveChannel] type, it now has reactive streams [Publisher](https://www.reactive-streams.org/reactive-streams-1.0.0-javadoc/org/reactivestreams/Publisher.html) -type. +type, where [consumeEach] was used to _consume_ elements from the channel, +now [collect][org.reactivestreams.Publisher.collect] is used to _collect_ elements from the publisher. - + With an explicit `openSubscription` we should [cancel][ReceiveChannel.cancel] the corresponding -subscription to unsubscribe from the source. There is no need to invoke `cancel` explicitly -- under the hood -`consume` does that for us. +subscription to unsubscribe from the source, but there is no need to call `cancel` explicitly -- under the hood +[consume] does that for us. The installed [doFinally](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#doFinally(io.reactivex.functions.Action)) listener prints "Finally" to confirm that the subscription is actually being closed. Note that "OnComplete" is never printed because we did not consume all of the elements. -We do not need to use an explicit `cancel` either if iteration is performed over all the items that are emitted -by the publisher, because it is being cancelled automatically by `consumeEach`: +We do not need to use an explicit `cancel` either if we `collect` all the elements: ```kotlin @@ -432,7 +432,7 @@ fun main() = runBlocking { subject.onNext("two") // now launch a coroutine to print everything GlobalScope.launch(Dispatchers.Unconfined) { // launch coroutine in unconfined context - subject.consumeEach { println(it) } + subject.collect { println(it) } } subject.onNext("three") subject.onNext("four") @@ -476,7 +476,7 @@ fun main() = runBlocking { subject.onNext("two") // now launch a coroutine to print the most recent update launch { // use the context of the main thread for a coroutine - subject.consumeEach { println(it) } + subject.collect { println(it) } } subject.onNext("three") subject.onNext("four") @@ -584,7 +584,7 @@ It is straightforward to use from a coroutine: ```kotlin fun main() = runBlocking { // Range inherits parent job from runBlocking, but overrides dispatcher with Dispatchers.Default - range(Dispatchers.Default, 1, 5).consumeEach { println(it) } + range(Dispatchers.Default, 1, 5).collect { println(it) } } ``` @@ -623,7 +623,7 @@ fun Publisher.fusedFilterMap( predicate: (T) -> Boolean, // the filter predicate mapper: (T) -> R // the mapper function ) = GlobalScope.publish(context) { - consumeEach { // consume the source stream + collect { // collect the source stream if (predicate(it)) // filter part send(mapper(it)) // map part } @@ -644,7 +644,7 @@ fun CoroutineScope.range(start: Int, count: Int) = publish { fun main() = runBlocking { range(1, 5) .fusedFilterMap(coroutineContext, { it % 2 == 0}, { "$it is even" }) - .consumeEach { println(it) } // print all the resulting strings + .collect { println(it) } // print all the resulting strings } ``` @@ -684,8 +684,8 @@ fun Publisher.takeUntil(context: CoroutineContext, other: Publisher other.openSubscription().consume { // explicitly open channel to Publisher val other = this whileSelect { - other.onReceive { false } // bail out on any received element from `other` - current.onReceive { send(it); true } // resend element from this channel and continue + other.onReceive { false } // bail out on any received element from `other` + current.onReceive { send(it); true } // resend element from this channel and continue } } } @@ -717,7 +717,7 @@ The following code shows how `takeUntil` works: fun main() = runBlocking { val slowNums = rangeWithInterval(200, 1, 10) // numbers with 200ms interval val stop = rangeWithInterval(500, 1, 10) // the first one after 500ms - slowNums.takeUntil(coroutineContext, stop).consumeEach { println(it) } // let's test it + slowNums.takeUntil(coroutineContext, stop).collect { println(it) } // let's test it } ``` @@ -749,9 +749,9 @@ import kotlin.coroutines.* ```kotlin fun Publisher>.merge(context: CoroutineContext) = GlobalScope.publish(context) { - consumeEach { pub -> // for each publisher received on the source channel + collect { pub -> // for each publisher collected launch { // launch a child coroutine - pub.consumeEach { send(it) } // resend all element from this publisher + pub.collect { send(it) } // resend all element from this publisher } } } @@ -792,7 +792,7 @@ The test code is to use `merge` on `testPub` and to display the results: ```kotlin fun main() = runBlocking { - testPub().merge(coroutineContext).consumeEach { println(it) } // print the whole stream + testPub().merge(coroutineContext).collect { println(it) } // print the whole stream } ``` @@ -975,7 +975,7 @@ fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int fun main() = runBlocking { rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3) - .consumeEach { println("$it on thread ${Thread.currentThread().name}") } + .collect { println("$it on thread ${Thread.currentThread().name}") } } ``` @@ -1021,7 +1021,7 @@ fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int fun main() = runBlocking { val job = launch(Dispatchers.Unconfined) { // launch a new coroutine in Unconfined context (without its own thread pool) rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3) - .consumeEach { println("$it on thread ${Thread.currentThread().name}") } + .collect { println("$it on thread ${Thread.currentThread().name}") } } job.join() // wait for our coroutine to complete } @@ -1068,6 +1068,7 @@ coroutines for complex pipelines with fan-in and fan-out between multiple worker [consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html [ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/index.html [ReceiveChannel.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html +[consume]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume.html [SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html [BroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-broadcast-channel/index.html [ConflatedBroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-conflated-broadcast-channel/index.html @@ -1077,7 +1078,7 @@ coroutines for complex pipelines with fan-in and fan-out between multiple worker [publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/kotlinx.coroutines.-coroutine-scope/publish.html -[org.reactivestreams.Publisher.consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/consume-each.html +[org.reactivestreams.Publisher.collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/collect.html [org.reactivestreams.Publisher.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/open-subscription.html diff --git a/reactive/kotlinx-coroutines-reactive/src/Channel.kt b/reactive/kotlinx-coroutines-reactive/src/Channel.kt index f958d72138..d589a0d0c8 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Channel.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Channel.kt @@ -27,10 +27,17 @@ public fun Publisher.openSubscription(request: Int = 0): ReceiveChannel Publisher.consumeEach(action: (T) -> Unit) = + openSubscription().consumeEach(action) + /** * Subscribes to this [Publisher] and performs the specified action for each received element. + * Cancels subscription if any exception happens during collect. */ -public suspend inline fun Publisher.consumeEach(action: (T) -> Unit) = +@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0 +public suspend inline fun Publisher.collect(action: (T) -> Unit) = openSubscription().consumeEach(action) @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") diff --git a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt index 6928011f58..ca1834998e 100644 --- a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt @@ -50,7 +50,7 @@ class IntegrationTest( assertNSE { pub.awaitLast() } assertNSE { pub.awaitSingle() } var cnt = 0 - pub.consumeEach { cnt++ } + pub.collect { cnt++ } assertThat(cnt, IsEqual(0)) } @@ -67,7 +67,7 @@ class IntegrationTest( assertThat(pub.awaitLast(), IsEqual("OK")) assertThat(pub.awaitSingle(), IsEqual("OK")) var cnt = 0 - pub.consumeEach { + pub.collect { assertThat(it, IsEqual("OK")) cnt++ } @@ -125,7 +125,7 @@ class IntegrationTest( private suspend fun checkNumbers(n: Int, pub: Publisher) { var last = 0 - pub.consumeEach { + pub.collect { assertThat(it, IsEqual(++last)) } assertThat(last, IsEqual(n)) diff --git a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt index c80b368817..e022ff1529 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt @@ -262,7 +262,7 @@ class PublishTest : TestBase() { } } try { - pub.consumeEach { + pub.collect { throw TestException() } } catch (e: TestException) { diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherCompletionStressTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherCompletionStressTest.kt index 6476054b84..b16310d0cd 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublisherCompletionStressTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherCompletionStressTest.kt @@ -24,7 +24,7 @@ class PublisherCompletionStressTest : TestBase() { runBlocking { withTimeout(5000) { var received = 0 - range(Dispatchers.Default, 1, count).consumeEach { x -> + range(Dispatchers.Default, 1, count).collect { x -> received++ if (x != received) error("$x != $received") } diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt index d942abd54d..cac2f550f7 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt @@ -24,7 +24,7 @@ class PublisherMultiTest : TestBase() { jobs.forEach { it.join() } } val resultSet = mutableSetOf() - observable.consumeEach { + observable.collect { assertTrue(resultSet.add(it)) } assertThat(resultSet.size, IsEqual(n)) diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxCompletionStressTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxCompletionStressTest.kt index c22f6527ce..6090408f97 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FluxCompletionStressTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FluxCompletionStressTest.kt @@ -25,7 +25,7 @@ class FluxCompletionStressTest : TestBase() { runBlocking { withTimeout(5000) { var received = 0 - range(Dispatchers.Default, 1, count).consumeEach { x -> + range(Dispatchers.Default, 1, count).collect { x -> received++ if (x != received) error("$x != $received") } diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt index 3b70317ad3..c4c0dbcbb6 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt @@ -46,7 +46,7 @@ class FluxMultiTest : TestBase() { fun testIteratorResendUnconfined() { val n = 10_000 * stressTestMultiplier val flux = GlobalScope.flux(Dispatchers.Unconfined) { - Flux.range(0, n).consumeEach { send(it) } + Flux.range(0, n).collect { send(it) } } checkMonoValue(flux.collectList()) { list -> assertEquals((0 until n).toList(), list) @@ -57,7 +57,7 @@ class FluxMultiTest : TestBase() { fun testIteratorResendPool() { val n = 10_000 * stressTestMultiplier val flux = GlobalScope.flux { - Flux.range(0, n).consumeEach { send(it) } + Flux.range(0, n).collect { send(it) } } checkMonoValue(flux.collectList()) { list -> assertEquals((0 until n).toList(), list) diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt index 542355961d..241cc6aaed 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt @@ -180,7 +180,7 @@ class FluxSingleTest { fun testFluxIteration() { val flux = GlobalScope.flux { var result = "" - Flux.just("O", "K").consumeEach { result += it } + Flux.just("O", "K").collect { result += it } send(result) } @@ -193,7 +193,7 @@ class FluxSingleTest { fun testFluxIterationFailure() { val flux = GlobalScope.flux { try { - Flux.error(RuntimeException("OK")).consumeEach { fail("Should not be here") } + Flux.error(RuntimeException("OK")).collect { fail("Should not be here") } send("Fail") } catch (e: RuntimeException) { send(e.message!!) diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt index 63c4426b05..a0368f84c7 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt @@ -107,7 +107,7 @@ class FluxTest : TestBase() { expect(2) val job = launch(start = CoroutineStart.UNDISPATCHED) { expect(3) - observable.consumeEach { + observable.collect { expect(8) assertEquals("OK", it) } @@ -131,7 +131,7 @@ class FluxTest : TestBase() { } } try { - pub.consumeEach { + pub.collect { throw TestException() } } catch (e: TestException) { diff --git a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt index a8ce7ce5e5..b038e539fb 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt @@ -41,16 +41,30 @@ public fun ObservableSource.openSubscription(): ReceiveChannel { return channel } +// Will be promoted to error in 1.3.0, removed in 1.4.0 +@Deprecated(message = "Use collect instead", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.collect(action)")) +public suspend inline fun MaybeSource.consumeEach(action: (T) -> Unit) = + openSubscription().consumeEach(action) + +// Will be promoted to error in 1.3.0, removed in 1.4.0 +@Deprecated(message = "Use collect instead", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.collect(action)")) +public suspend inline fun ObservableSource.consumeEach(action: (T) -> Unit) = + openSubscription().consumeEach(action) + /** * Subscribes to this [MaybeSource] and performs the specified action for each received element. + * Cancels subscription if any exception happens during collect. */ -public suspend inline fun MaybeSource.consumeEach(action: (T) -> Unit) = +@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0 +public suspend inline fun MaybeSource.collect(action: (T) -> Unit) = openSubscription().consumeEach(action) /** * Subscribes to this [ObservableSource] and performs the specified action for each received element. + * Cancels subscription if any exception happens during collect. */ -public suspend inline fun ObservableSource.consumeEach(action: (T) -> Unit) = +@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0 +public suspend inline fun ObservableSource.collect(action: (T) -> Unit) = openSubscription().consumeEach(action) @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowAsObservableTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowAsObservableTest.kt index 65d9d67b44..ab9e402893 100644 --- a/reactive/kotlinx-coroutines-rx2/test/FlowAsObservableTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/FlowAsObservableTest.kt @@ -82,7 +82,7 @@ class FlowAsObservableTest : TestBase() { expect(1) val job = launch(start = CoroutineStart.UNDISPATCHED) { expect(2) - observable.consumeEach { + observable.collect { expect(5) assertEquals("OK", it) } @@ -106,7 +106,7 @@ class FlowAsObservableTest : TestBase() { }.asObservable() try { - observable.consumeEach { + observable.collect { expect(3) throw TestException() } diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowableTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowableTest.kt index 6da2e1d6f4..543de0930c 100644 --- a/reactive/kotlinx-coroutines-rx2/test/FlowableTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/FlowableTest.kt @@ -107,7 +107,7 @@ class FlowableTest : TestBase() { expect(2) val job = launch(start = CoroutineStart.UNDISPATCHED) { expect(3) - observable.consumeEach{ + observable.collect { expect(8) assertEquals("OK", it) } @@ -131,7 +131,7 @@ class FlowableTest : TestBase() { } } try { - pub.consumeEach { + pub.collect { throw TestException() } } catch (e: TestException) { diff --git a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt index 856bce58c2..9b55e58c83 100644 --- a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt @@ -50,7 +50,7 @@ class IntegrationTest( assertNSE { observable.awaitLast() } assertNSE { observable.awaitSingle() } var cnt = 0 - observable.consumeEach { + observable.collect { cnt++ } assertThat(cnt, IsEqual(0)) @@ -69,7 +69,7 @@ class IntegrationTest( assertThat(observable.awaitLast(), IsEqual("OK")) assertThat(observable.awaitSingle(), IsEqual("OK")) var cnt = 0 - observable.consumeEach { + observable.collect { assertThat(it, IsEqual("OK")) cnt++ } @@ -127,7 +127,7 @@ class IntegrationTest( private suspend fun checkNumbers(n: Int, observable: Observable) { var last = 0 - observable.consumeEach { + observable.collect { assertThat(it, IsEqual(++last)) } assertThat(last, IsEqual(n)) diff --git a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt index 5ef6d0664f..e97b1f01bb 100644 --- a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt @@ -229,7 +229,7 @@ class MaybeTest : TestBase() { expect(2) val timeout = withTimeoutOrNull(100) { expect(3) - maybe.consumeEach { + maybe.collect { expectUnreached() } expectUnreached() diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableCompletionStressTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableCompletionStressTest.kt index 10134e6ddf..30266e3e50 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableCompletionStressTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableCompletionStressTest.kt @@ -24,7 +24,7 @@ class ObservableCompletionStressTest : TestBase() { runBlocking { withTimeout(5000) { var received = 0 - range(Dispatchers.Default, 1, count).consumeEach { x -> + range(Dispatchers.Default, 1, count).collect { x -> received++ if (x != received) error("$x != $received") } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt index edef8b3073..9208195fef 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt @@ -52,7 +52,7 @@ class ObservableMultiTest : TestBase() { fun testIteratorResendUnconfined() { val n = 10_000 * stressTestMultiplier val observable = GlobalScope.rxObservable(Dispatchers.Unconfined) { - Observable.range(0, n).consumeEach { send(it) } + Observable.range(0, n).collect { send(it) } } checkSingleValue(observable.toList()) { list -> assertEquals((0 until n).toList(), list) @@ -63,7 +63,7 @@ class ObservableMultiTest : TestBase() { fun testIteratorResendPool() { val n = 10_000 * stressTestMultiplier val observable = GlobalScope.rxObservable { - Observable.range(0, n).consumeEach { send(it) } + Observable.range(0, n).collect { send(it) } } checkSingleValue(observable.toList()) { list -> assertEquals((0 until n).toList(), list) diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt index db95c3f9d2..6b5d7451a4 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt @@ -179,7 +179,7 @@ class ObservableSingleTest { fun testObservableIteration() { val observable = GlobalScope.rxObservable { var result = "" - Observable.just("O", "K").consumeEach { result += it } + Observable.just("O", "K").collect { result += it } send(result) } @@ -192,7 +192,7 @@ class ObservableSingleTest { fun testObservableIterationFailure() { val observable = GlobalScope.rxObservable { try { - Observable.error(RuntimeException("OK")).consumeEach { fail("Should not be here") } + Observable.error(RuntimeException("OK")).collect { fail("Should not be here") } send("Fail") } catch (e: RuntimeException) { send(e.message!!) diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt index 298dd9dea8..8dc7120c0f 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt @@ -106,7 +106,7 @@ class ObservableTest : TestBase() { expect(2) val job = launch(start = CoroutineStart.UNDISPATCHED) { expect(3) - observable.consumeEach{ + observable.collect { expect(8) assertEquals("OK", it) } @@ -134,7 +134,7 @@ class ObservableTest : TestBase() { } } try { - pub.consumeEach { + pub.collect { expect(3) throw TestException() } diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-02.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-02.kt index 0bb8431cb0..6d48cb04f2 100644 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-02.kt +++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-02.kt @@ -21,12 +21,12 @@ fun main() = runBlocking { } // print elements from the source println("Elements:") - source.consumeEach { // consume elements from it + source.collect { // collect elements from it println(it) } // print elements from the source AGAIN println("Again:") - source.consumeEach { // consume elements from it + source.collect { // collect elements from it println(it) } } diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-04.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-04.kt index bbfb169d5d..5c373a7fc3 100644 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-04.kt +++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-04.kt @@ -15,6 +15,6 @@ fun main() = runBlocking { .doOnSubscribe { println("OnSubscribe") } // provide some insight .doOnComplete { println("OnComplete") } // ... .doFinally { println("Finally") } // ... into what's going on - // iterate over the source fully - source.consumeEach { println(it) } + // collect the source fully + source.collect { println(it) } } diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-07.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-07.kt index 74c45cbeca..7d4a788ebe 100644 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-07.kt +++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-07.kt @@ -7,7 +7,7 @@ package kotlinx.coroutines.rx2.guide.basic07 import io.reactivex.subjects.BehaviorSubject import kotlinx.coroutines.* -import kotlinx.coroutines.rx2.consumeEach +import kotlinx.coroutines.rx2.collect fun main() = runBlocking { val subject = BehaviorSubject.create() @@ -15,7 +15,7 @@ fun main() = runBlocking { subject.onNext("two") // now launch a coroutine to print everything GlobalScope.launch(Dispatchers.Unconfined) { // launch coroutine in unconfined context - subject.consumeEach { println(it) } + subject.collect { println(it) } } subject.onNext("three") subject.onNext("four") diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-08.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-08.kt index 1eb82bcc24..d5233c0436 100644 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-08.kt +++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-08.kt @@ -16,7 +16,7 @@ fun main() = runBlocking { subject.onNext("two") // now launch a coroutine to print the most recent update launch { // use the context of the main thread for a coroutine - subject.consumeEach { println(it) } + subject.collect { println(it) } } subject.onNext("three") subject.onNext("four") diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-04.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-04.kt index 0e66738134..d6d5771700 100644 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-04.kt +++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-04.kt @@ -20,5 +20,5 @@ fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int fun main() = runBlocking { rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3) - .consumeEach { println("$it on thread ${Thread.currentThread().name}") } + .collect { println("$it on thread ${Thread.currentThread().name}") } } diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-05.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-05.kt index ca09d4a7a4..614e534ad2 100644 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-05.kt +++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-05.kt @@ -21,7 +21,7 @@ fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int fun main() = runBlocking { val job = launch(Dispatchers.Unconfined) { // launch a new coroutine in Unconfined context (without its own thread pool) rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3) - .consumeEach { println("$it on thread ${Thread.currentThread().name}") } + .collect { println("$it on thread ${Thread.currentThread().name}") } } job.join() // wait for our coroutine to complete } diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-01.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-01.kt index 8d52cbb208..bf79a6565d 100644 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-01.kt +++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-01.kt @@ -15,5 +15,5 @@ fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = pu fun main() = runBlocking { // Range inherits parent job from runBlocking, but overrides dispatcher with Dispatchers.Default - range(Dispatchers.Default, 1, 5).consumeEach { println(it) } + range(Dispatchers.Default, 1, 5).collect { println(it) } } diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-02.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-02.kt index d1ce00c67e..defd8e5144 100644 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-02.kt +++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-02.kt @@ -15,7 +15,7 @@ fun Publisher.fusedFilterMap( predicate: (T) -> Boolean, // the filter predicate mapper: (T) -> R // the mapper function ) = GlobalScope.publish(context) { - consumeEach { // consume the source stream + collect { // collect the source stream if (predicate(it)) // filter part send(mapper(it)) // map part } @@ -28,5 +28,5 @@ fun CoroutineScope.range(start: Int, count: Int) = publish { fun main() = runBlocking { range(1, 5) .fusedFilterMap(coroutineContext, { it % 2 == 0}, { "$it is even" }) - .consumeEach { println(it) } // print all the resulting strings + .collect { println(it) } // print all the resulting strings } diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-03.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-03.kt index 4c49326b3e..f1be007eab 100644 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-03.kt +++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-03.kt @@ -18,8 +18,8 @@ fun Publisher.takeUntil(context: CoroutineContext, other: Publisher other.openSubscription().consume { // explicitly open channel to Publisher val other = this whileSelect { - other.onReceive { false } // bail out on any received element from `other` - current.onReceive { send(it); true } // resend element from this channel and continue + other.onReceive { false } // bail out on any received element from `other` + current.onReceive { send(it); true } // resend element from this channel and continue } } } @@ -35,5 +35,5 @@ fun CoroutineScope.rangeWithInterval(time: Long, start: Int, count: Int) = publi fun main() = runBlocking { val slowNums = rangeWithInterval(200, 1, 10) // numbers with 200ms interval val stop = rangeWithInterval(500, 1, 10) // the first one after 500ms - slowNums.takeUntil(coroutineContext, stop).consumeEach { println(it) } // let's test it + slowNums.takeUntil(coroutineContext, stop).collect { println(it) } // let's test it } diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-04.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-04.kt index 44d1557751..bbc2e7bf4d 100644 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-04.kt +++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-04.kt @@ -11,9 +11,9 @@ import org.reactivestreams.* import kotlin.coroutines.* fun Publisher>.merge(context: CoroutineContext) = GlobalScope.publish(context) { - consumeEach { pub -> // for each publisher received on the source channel + collect { pub -> // for each publisher collected launch { // launch a child coroutine - pub.consumeEach { send(it) } // resend all element from this publisher + pub.collect { send(it) } // resend all element from this publisher } } } @@ -33,5 +33,5 @@ fun CoroutineScope.testPub() = publish> { } fun main() = runBlocking { - testPub().merge(coroutineContext).consumeEach { println(it) } // print the whole stream + testPub().merge(coroutineContext).collect { println(it) } // print the whole stream }