Skip to content

Deprecate consumeEach on Publisher, ObservableSource and MaybeSource,… #1139

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public final class kotlinx/coroutines/reactive/AwaitKt {
}

public final class kotlinx/coroutines/reactive/ChannelKt {
public static final fun collect (Lorg/reactivestreams/Publisher;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun consumeEach (Lorg/reactivestreams/Publisher;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun openSubscription (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/channels/ReceiveChannel;
public static synthetic fun openSubscription$default (Lorg/reactivestreams/Publisher;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public final class kotlinx/coroutines/rx2/RxAwaitKt {
}

public final class kotlinx/coroutines/rx2/RxChannelKt {
public static final fun collect (Lio/reactivex/MaybeSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collect (Lio/reactivex/ObservableSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun consumeEach (Lio/reactivex/MaybeSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun consumeEach (Lio/reactivex/ObservableSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun openSubscription (Lio/reactivex/MaybeSource;)Lkotlinx/coroutines/channels/ReceiveChannel;
Expand Down
71 changes: 36 additions & 35 deletions reactive/coroutines-guide-reactive.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ Let us rewrite this code using [publish] coroutine builder from `kotlinx-corouti
instead of [produce] from `kotlinx-coroutines-core` module. The code stays the same,
but where `source` used to have [ReceiveChannel] type, it now has reactive streams
[Publisher](https://www.reactive-streams.org/reactive-streams-1.0.0-javadoc/org/reactivestreams/Publisher.html)
type.
type, where [consumeEach] was used to _consume_ elements from the channel,
now [collect][org.reactivestreams.Publisher.collect] is used to _collect_ elements from the publisher.

<!--- INCLUDE
import kotlinx.coroutines.*
Expand All @@ -162,12 +163,12 @@ fun main() = runBlocking<Unit> {
}
// print elements from the source
println("Elements:")
source.consumeEach { // consume elements from it
source.collect { // collect elements from it
println(it)
}
// print elements from the source AGAIN
println("Again:")
source.consumeEach { // consume elements from it
source.collect { // collect elements from it
println(it)
}
}
Expand All @@ -194,15 +195,15 @@ Begin

This example highlights the key difference between a reactive stream and a channel. A reactive stream is a higher-order
functional concept. While the channel _is_ a stream of elements, the reactive stream defines a recipe on how the stream of
elements is produced. It becomes the actual stream of elements on _subscription_. Each subscriber may receive the same or
elements is produced. It becomes the actual stream of elements when _collected_. Each collector may receive the same or
a different stream of elements, depending on how the corresponding implementation of `Publisher` works.

The [publish] coroutine builder, that is used in the above example, launches a fresh coroutine on each subscription.
Every [Publisher.consumeEach][org.reactivestreams.Publisher.consumeEach] invocation creates a fresh subscription.
The [publish] coroutine builder, that is used in the above example, does not launch a coroutine,
but every [collect][org.reactivestreams.Publisher.collect] invocation launches a coroutine.
We have two of them in this code and that is why we see "Begin" printed twice.

In Rx lingo this is called a _cold_ publisher. Many standard Rx operators produce cold streams, too. We can iterate
over them from a coroutine, and every subscription produces the same stream of elements.
In Rx lingo this is called a _cold_ publisher. Many standard Rx operators produce cold streams, too. We can collect
them from a coroutine, and every collector gets the same stream of elements.

**WARNING**: It is planned that in the future a second invocation of `consumeEach` method
on an channel that is already being consumed is going to fail fast, that is
Expand All @@ -217,10 +218,10 @@ method with it.

### Subscription and cancellation

An example in the previous section uses `source.consumeEach { ... }` snippet to open a subscription
and receive all the elements from it. If we need more control on how what to do with
the elements that are being received from the channel, we can use [Publisher.openSubscription][org.reactivestreams.Publisher.openSubscription]
as shown in the following example:
An example in the previous section uses `source.collect { ... }` to collect all elements.
Instead of collecting elements, we can open a channel using [openSubscription][org.reactivestreams.Publisher.openSubscription]
and iterate over it, so that we have more finer-grained control on our iteration,
for example using `break`, as shown below:

<!--- INCLUDE
import io.reactivex.*
Expand Down Expand Up @@ -259,17 +260,16 @@ Finally
```

<!--- TEST -->

With an explicit `openSubscription` we should [cancel][ReceiveChannel.cancel] the corresponding
subscription to unsubscribe from the source. There is no need to invoke `cancel` explicitly -- under the hood
`consume` does that for us.
subscription to unsubscribe from the source, but there is no need to call `cancel` explicitly -- under the hood
[consume] does that for us.
The installed
[doFinally](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#doFinally(io.reactivex.functions.Action))
listener prints "Finally" to confirm that the subscription is actually being closed. Note that "OnComplete"
is never printed because we did not consume all of the elements.

We do not need to use an explicit `cancel` either if iteration is performed over all the items that are emitted
by the publisher, because it is being cancelled automatically by `consumeEach`:
We do not need to use an explicit `cancel` either if we `collect` all the elements:

<!--- INCLUDE
import io.reactivex.*
Expand All @@ -284,8 +284,8 @@ fun main() = runBlocking<Unit> {
.doOnSubscribe { println("OnSubscribe") } // provide some insight
.doOnComplete { println("OnComplete") } // ...
.doFinally { println("Finally") } // ... into what's going on
// iterate over the source fully
source.consumeEach { println(it) }
// collect the source fully
source.collect { println(it) }
}
```

Expand All @@ -308,7 +308,7 @@ Finally

Notice, how "OnComplete" and "Finally" are printed before the last element "5". It happens because our `main` function in this
example is a coroutine that we start with [runBlocking] coroutine builder.
Our main coroutine receives on the channel using `source.consumeEach { ... }` expression.
Our main coroutine receives on the flowable using `source.collect { ... }` expression.
The main coroutine is _suspended_ while it waits for the source to emit an item.
When the last item is emitted by `Flowable.range(1, 5)` it
_resumes_ the main coroutine, which gets dispatched onto the main thread to print this
Expand Down Expand Up @@ -422,7 +422,7 @@ You can subscribe to subjects from a coroutine just as with any other reactive s
<!--- INCLUDE
import io.reactivex.subjects.BehaviorSubject
import kotlinx.coroutines.*
import kotlinx.coroutines.rx2.consumeEach
import kotlinx.coroutines.rx2.collect
-->

```kotlin
Expand All @@ -432,7 +432,7 @@ fun main() = runBlocking<Unit> {
subject.onNext("two")
// now launch a coroutine to print everything
GlobalScope.launch(Dispatchers.Unconfined) { // launch coroutine in unconfined context
subject.consumeEach { println(it) }
subject.collect { println(it) }
}
subject.onNext("three")
subject.onNext("four")
Expand Down Expand Up @@ -476,7 +476,7 @@ fun main() = runBlocking<Unit> {
subject.onNext("two")
// now launch a coroutine to print the most recent update
launch { // use the context of the main thread for a coroutine
subject.consumeEach { println(it) }
subject.collect { println(it) }
}
subject.onNext("three")
subject.onNext("four")
Expand Down Expand Up @@ -584,7 +584,7 @@ It is straightforward to use from a coroutine:
```kotlin
fun main() = runBlocking<Unit> {
// Range inherits parent job from runBlocking, but overrides dispatcher with Dispatchers.Default
range(Dispatchers.Default, 1, 5).consumeEach { println(it) }
range(Dispatchers.Default, 1, 5).collect { println(it) }
}
```

Expand Down Expand Up @@ -623,7 +623,7 @@ fun <T, R> Publisher<T>.fusedFilterMap(
predicate: (T) -> Boolean, // the filter predicate
mapper: (T) -> R // the mapper function
) = GlobalScope.publish<R>(context) {
consumeEach { // consume the source stream
collect { // collect the source stream
if (predicate(it)) // filter part
send(mapper(it)) // map part
}
Expand All @@ -644,7 +644,7 @@ fun CoroutineScope.range(start: Int, count: Int) = publish<Int> {
fun main() = runBlocking<Unit> {
range(1, 5)
.fusedFilterMap(coroutineContext, { it % 2 == 0}, { "$it is even" })
.consumeEach { println(it) } // print all the resulting strings
.collect { println(it) } // print all the resulting strings
}
```

Expand Down Expand Up @@ -684,8 +684,8 @@ fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>
other.openSubscription().consume { // explicitly open channel to Publisher<U>
val other = this
whileSelect {
other.onReceive { false } // bail out on any received element from `other`
current.onReceive { send(it); true } // resend element from this channel and continue
other.onReceive { false } // bail out on any received element from `other`
current.onReceive { send(it); true } // resend element from this channel and continue
}
}
}
Expand Down Expand Up @@ -717,7 +717,7 @@ The following code shows how `takeUntil` works:
fun main() = runBlocking<Unit> {
val slowNums = rangeWithInterval(200, 1, 10) // numbers with 200ms interval
val stop = rangeWithInterval(500, 1, 10) // the first one after 500ms
slowNums.takeUntil(coroutineContext, stop).consumeEach { println(it) } // let's test it
slowNums.takeUntil(coroutineContext, stop).collect { println(it) } // let's test it
}
```

Expand Down Expand Up @@ -749,9 +749,9 @@ import kotlin.coroutines.*

```kotlin
fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = GlobalScope.publish<T>(context) {
consumeEach { pub -> // for each publisher received on the source channel
collect { pub -> // for each publisher collected
launch { // launch a child coroutine
pub.consumeEach { send(it) } // resend all element from this publisher
pub.collect { send(it) } // resend all element from this publisher
}
}
}
Expand Down Expand Up @@ -792,7 +792,7 @@ The test code is to use `merge` on `testPub` and to display the results:

```kotlin
fun main() = runBlocking<Unit> {
testPub().merge(coroutineContext).consumeEach { println(it) } // print the whole stream
testPub().merge(coroutineContext).collect { println(it) } // print the whole stream
}
```

Expand Down Expand Up @@ -975,7 +975,7 @@ fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int

fun main() = runBlocking<Unit> {
rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
.consumeEach { println("$it on thread ${Thread.currentThread().name}") }
.collect { println("$it on thread ${Thread.currentThread().name}") }
}
```

Expand Down Expand Up @@ -1021,7 +1021,7 @@ fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int
fun main() = runBlocking<Unit> {
val job = launch(Dispatchers.Unconfined) { // launch a new coroutine in Unconfined context (without its own thread pool)
rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
.consumeEach { println("$it on thread ${Thread.currentThread().name}") }
.collect { println("$it on thread ${Thread.currentThread().name}") }
}
job.join() // wait for our coroutine to complete
}
Expand Down Expand Up @@ -1068,6 +1068,7 @@ coroutines for complex pipelines with fan-in and fan-out between multiple worker
[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html
[ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/index.html
[ReceiveChannel.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html
[consume]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html
[BroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-broadcast-channel/index.html
[ConflatedBroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-conflated-broadcast-channel/index.html
Expand All @@ -1077,7 +1078,7 @@ coroutines for complex pipelines with fan-in and fan-out between multiple worker
<!--- MODULE kotlinx-coroutines-reactive -->
<!--- INDEX kotlinx.coroutines.reactive -->
[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/kotlinx.coroutines.-coroutine-scope/publish.html
[org.reactivestreams.Publisher.consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/consume-each.html
[org.reactivestreams.Publisher.collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/collect.html
[org.reactivestreams.Publisher.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/open-subscription.html
<!--- MODULE kotlinx-coroutines-rx2 -->
<!--- INDEX kotlinx.coroutines.rx2 -->
Expand Down
9 changes: 8 additions & 1 deletion reactive/kotlinx-coroutines-reactive/src/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,17 @@ public fun <T> Publisher<T>.openSubscription(request: Int = 0): ReceiveChannel<T
return channel
}

// Will be promoted to error in 1.3.0, removed in 1.4.0
@Deprecated(message = "Use collect instead", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.collect(action)"))
public suspend inline fun <T> Publisher<T>.consumeEach(action: (T) -> Unit) =
openSubscription().consumeEach(action)

/**
* Subscribes to this [Publisher] and performs the specified action for each received element.
* Cancels subscription if any exception happens during collect.
*/
public suspend inline fun <T> Publisher<T>.consumeEach(action: (T) -> Unit) =
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit) =
openSubscription().consumeEach(action)

@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
Expand Down
6 changes: 3 additions & 3 deletions reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class IntegrationTest(
assertNSE { pub.awaitLast() }
assertNSE { pub.awaitSingle() }
var cnt = 0
pub.consumeEach { cnt++ }
pub.collect { cnt++ }
assertThat(cnt, IsEqual(0))
}

Expand All @@ -67,7 +67,7 @@ class IntegrationTest(
assertThat(pub.awaitLast(), IsEqual("OK"))
assertThat(pub.awaitSingle(), IsEqual("OK"))
var cnt = 0
pub.consumeEach {
pub.collect {
assertThat(it, IsEqual("OK"))
cnt++
}
Expand Down Expand Up @@ -125,7 +125,7 @@ class IntegrationTest(

private suspend fun checkNumbers(n: Int, pub: Publisher<Int>) {
var last = 0
pub.consumeEach {
pub.collect {
assertThat(it, IsEqual(++last))
}
assertThat(last, IsEqual(n))
Expand Down
2 changes: 1 addition & 1 deletion reactive/kotlinx-coroutines-reactive/test/PublishTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ class PublishTest : TestBase() {
}
}
try {
pub.consumeEach {
pub.collect {
throw TestException()
}
} catch (e: TestException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class PublisherCompletionStressTest : TestBase() {
runBlocking {
withTimeout(5000) {
var received = 0
range(Dispatchers.Default, 1, count).consumeEach { x ->
range(Dispatchers.Default, 1, count).collect { x ->
received++
if (x != received) error("$x != $received")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class PublisherMultiTest : TestBase() {
jobs.forEach { it.join() }
}
val resultSet = mutableSetOf<Int>()
observable.consumeEach {
observable.collect {
assertTrue(resultSet.add(it))
}
assertThat(resultSet.size, IsEqual(n))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class FluxCompletionStressTest : TestBase() {
runBlocking {
withTimeout(5000) {
var received = 0
range(Dispatchers.Default, 1, count).consumeEach { x ->
range(Dispatchers.Default, 1, count).collect { x ->
received++
if (x != received) error("$x != $received")
}
Expand Down
4 changes: 2 additions & 2 deletions reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class FluxMultiTest : TestBase() {
fun testIteratorResendUnconfined() {
val n = 10_000 * stressTestMultiplier
val flux = GlobalScope.flux(Dispatchers.Unconfined) {
Flux.range(0, n).consumeEach { send(it) }
Flux.range(0, n).collect { send(it) }
}
checkMonoValue(flux.collectList()) { list ->
assertEquals((0 until n).toList(), list)
Expand All @@ -57,7 +57,7 @@ class FluxMultiTest : TestBase() {
fun testIteratorResendPool() {
val n = 10_000 * stressTestMultiplier
val flux = GlobalScope.flux {
Flux.range(0, n).consumeEach { send(it) }
Flux.range(0, n).collect { send(it) }
}
checkMonoValue(flux.collectList()) { list ->
assertEquals((0 until n).toList(), list)
Expand Down
4 changes: 2 additions & 2 deletions reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class FluxSingleTest {
fun testFluxIteration() {
val flux = GlobalScope.flux {
var result = ""
Flux.just("O", "K").consumeEach { result += it }
Flux.just("O", "K").collect { result += it }
send(result)
}

Expand All @@ -193,7 +193,7 @@ class FluxSingleTest {
fun testFluxIterationFailure() {
val flux = GlobalScope.flux {
try {
Flux.error<String>(RuntimeException("OK")).consumeEach { fail("Should not be here") }
Flux.error<String>(RuntimeException("OK")).collect { fail("Should not be here") }
send("Fail")
} catch (e: RuntimeException) {
send(e.message!!)
Expand Down
4 changes: 2 additions & 2 deletions reactive/kotlinx-coroutines-reactor/test/FluxTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class FluxTest : TestBase() {
expect(2)
val job = launch(start = CoroutineStart.UNDISPATCHED) {
expect(3)
observable.consumeEach {
observable.collect {
expect(8)
assertEquals("OK", it)
}
Expand All @@ -131,7 +131,7 @@ class FluxTest : TestBase() {
}
}
try {
pub.consumeEach {
pub.collect {
throw TestException()
}
} catch (e: TestException) {
Expand Down
Loading