From 8a701eaf80a5323e267fed6b5c1927e0037f8c27 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 20 May 2019 14:59:33 +0300 Subject: [PATCH 1/3] Promote ReceiveChannel.consumeEach and ReceiveChannel.consume to experimental API Fixes #1080 --- .../common/src/channels/Channels.common.kt | 26 ++++--------------- 1 file changed, 5 insertions(+), 21 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt index b13dce2704..f7e3d208e4 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt @@ -100,18 +100,9 @@ public fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler = * Makes sure that the given [block] consumes all elements from the given channel * by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block. * - * **WARNING**: It is planned that in the future a second invocation of this method - * on an channel that is already being consumed is going to fail fast, that is - * immediately throw an [IllegalStateException]. - * See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/167) - * for details. - * * The operation is _terminal_. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ -@ObsoleteCoroutinesApi +@ExperimentalCoroutinesApi public inline fun ReceiveChannel.consume(block: ReceiveChannel.() -> R): R { var cause: Throwable? = null try { @@ -125,21 +116,14 @@ public inline fun ReceiveChannel.consume(block: ReceiveChannel.() - } /** - * Performs the given [action] for each received element. - * - * **WARNING**: It is planned that in the future a second invocation of this method - * on an channel that is already being consumed is going to fail fast, that is - * immediately throw an [IllegalStateException]. - * See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/167) - * for details. + * Performs the given [action] for each received element and [cancels][ReceiveChannel.cancel] + * the channel after the execution of the block. + * If you need to iterate over the channel without consuming it, a regular `for` loop should be used instead. * * The operation is _terminal_. * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ -@ObsoleteCoroutinesApi +@ExperimentalCoroutinesApi public suspend inline fun ReceiveChannel.consumeEach(action: (E) -> Unit) = consume { for (e in this) action(e) From 7f95b3d9229cc38c24bcf7849741eae23be41289 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 20 May 2019 19:59:56 +0300 Subject: [PATCH 2/3] Remove mention of #167 from the doc --- reactive/coroutines-guide-reactive.md | 6 ------ 1 file changed, 6 deletions(-) diff --git a/reactive/coroutines-guide-reactive.md b/reactive/coroutines-guide-reactive.md index de819df069..63288c3b2b 100644 --- a/reactive/coroutines-guide-reactive.md +++ b/reactive/coroutines-guide-reactive.md @@ -205,12 +205,6 @@ We have two of them in this code and that is why we see "Begin" printed twice. In Rx lingo this is called a _cold_ publisher. Many standard Rx operators produce cold streams, too. We can collect them from a coroutine, and every collector gets the same stream of elements. -**WARNING**: It is planned that in the future a second invocation of `consumeEach` method -on an channel that is already being consumed is going to fail fast, that is -immediately throw an `IllegalStateException`. -See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/167) -for details. - > Note that we can replicate the same behaviour that we saw with channels by using Rx [publish](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#publish()) operator and [connect](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/flowables/ConnectableFlowable.html#connect()) From 22704f42de2480c313f6ce4b4f153faa6609c468 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Fri, 24 May 2019 00:19:39 +0300 Subject: [PATCH 3/3] Added version comment to experimental annotation of channels consume --- .../common/src/channels/Channels.common.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt index f7e3d208e4..c14929cae1 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt @@ -102,7 +102,7 @@ public fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler = * * The operation is _terminal_. */ -@ExperimentalCoroutinesApi +@ExperimentalCoroutinesApi // since 1.3.0, tentatively graduates in 1.4.0 public inline fun ReceiveChannel.consume(block: ReceiveChannel.() -> R): R { var cause: Throwable? = null try { @@ -123,7 +123,7 @@ public inline fun ReceiveChannel.consume(block: ReceiveChannel.() - * The operation is _terminal_. * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. */ -@ExperimentalCoroutinesApi +@ExperimentalCoroutinesApi // since 1.3.0, tentatively graduates in 1.4.0 public suspend inline fun ReceiveChannel.consumeEach(action: (E) -> Unit) = consume { for (e in this) action(e)