diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index cf9906fc5a..48df9648aa 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -727,6 +727,7 @@ public final class kotlinx/coroutines/channels/ChannelsKt { public static final synthetic fun toMutableList (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun toMutableSet (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final synthetic fun toSet (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun trySendBlocking (Lkotlinx/coroutines/channels/SendChannel;Ljava/lang/Object;)Ljava/lang/Object; public static final synthetic fun withIndex (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/channels/ReceiveChannel; public static synthetic fun withIndex$default (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel; public static final synthetic fun zip (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/channels/ReceiveChannel; diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt index 3a43ccb7e0..83ae9136eb 100644 --- a/kotlinx-coroutines-core/common/src/flow/Builders.kt +++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt @@ -261,7 +261,6 @@ public fun flowViaChannel( * } * ``` */ -@ExperimentalCoroutinesApi public fun channelFlow(@BuilderInference block: suspend ProducerScope.() -> Unit): Flow = ChannelFlowBuilder(block) @@ -302,11 +301,10 @@ public fun channelFlow(@BuilderInference block: suspend ProducerScope.() * override fun onNextValue(value: T) { * // To avoid blocking you can configure channel capacity using * // either buffer(Channel.CONFLATED) or buffer(Channel.UNLIMITED) to avoid overfill - * try { - * sendBlocking(value) - * } catch (e: Exception) { - * // Handle exception from the channel: failure in flow or premature closing - * } + * trySendBlocking(value) + * .onFailure { throwable -> + * // Downstream has been cancelled or failed, can log here + * } * } * override fun onApiError(cause: Throwable) { * cancel(CancellationException("API Error", cause)) @@ -327,7 +325,6 @@ public fun channelFlow(@BuilderInference block: suspend ProducerScope.() * > `awaitClose` block can be called at any time due to asynchronous nature of cancellation, even * > concurrently with the call of the callback. */ -@ExperimentalCoroutinesApi public fun callbackFlow(@BuilderInference block: suspend ProducerScope.() -> Unit): Flow = CallbackFlowBuilder(block) // ChannelFlow implementation that is the first in the chain of flow operations and introduces (builds) a flow diff --git a/kotlinx-coroutines-core/jvm/src/channels/Channels.kt b/kotlinx-coroutines-core/jvm/src/channels/Channels.kt index 52adfea276..ea0b639e90 100644 --- a/kotlinx-coroutines-core/jvm/src/channels/Channels.kt +++ b/kotlinx-coroutines-core/jvm/src/channels/Channels.kt @@ -10,12 +10,44 @@ package kotlinx.coroutines.channels import kotlinx.coroutines.* /** - * Adds [element] into to this channel, **blocking** the caller while this channel is full, - * or throws exception if the channel [Channel.isClosedForSend] (see [Channel.close] for details). + * **Deprecated** blocking variant of send. + * This method is deprecated in the favour of [trySendBlocking]. * - * This is a way to call [Channel.send] method inside a blocking code using [runBlocking], - * so this function should not be used from coroutine. + * `sendBlocking` is a dangerous primitive — it throws an exception + * if the channel was closed or, more commonly, cancelled. + * Cancellation exceptions in non-blocking code are unexpected and frequently + * trigger internal failures. + * + * These bugs are hard-to-spot during code review and they forced users to write + * their own wrappers around `sendBlocking`. + * So this function is deprecated and replaced with a more explicit primitive. + * + * The real-world example of broken usage with Firebase: + * + * ```kotlin + * callbackFlow { + * val listener = object : ValueEventListener { + * override fun onDataChange(snapshot: DataSnapshot) { + * // This line may fail and crash the app when the downstream flow is cancelled + * sendBlocking(DataSnapshot(snapshot)) + * } + * + * override fun onCancelled(error: DatabaseError) { + * close(error.toException()) + * } + * } + * + * firebaseQuery.addValueEventListener(listener) + * awaitClose { firebaseQuery.removeEventListener(listener) } + * } + * ``` */ +@Deprecated( + level = DeprecationLevel.WARNING, + message = "Deprecated in the favour of 'trySendBlocking'. " + + "Consider handling the result of 'trySendBlocking' explicitly and rethrow exception if necessary", + replaceWith = ReplaceWith("trySendBlocking(element)") +) public fun SendChannel.sendBlocking(element: E) { // fast path if (offer(element)) @@ -25,3 +57,40 @@ public fun SendChannel.sendBlocking(element: E) { send(element) } } + +/** + * Adds [element] into to this channel, **blocking** the caller while this channel is full, + * and returning either [successful][ChannelResult.isSuccess] result when the element was added, or + * failed result representing closed channel with a corresponding exception. + * + * This is a way to call [Channel.send] method in a safe manner inside a blocking code using [runBlocking] and catching, + * so this function should not be used from coroutine. + * + * Example of usage: + * + * ``` + * // From callback API + * channel.trySendBlocking(element) + * .onSuccess { /* request next element or debug log */ } + * .onFailure { t: Throwable? -> /* throw or log */ } + * ``` + * + * For this operation it is guaranteed that [failure][ChannelResult.failed] always contains an exception in it. + * + * @throws [InterruptedException] if the current thread is interrupted during the blocking send operation. + */ +@Throws(InterruptedException::class) +public fun SendChannel.trySendBlocking(element: E): ChannelResult { + /* + * Sent successfully -- bail out. + * But failure may indicate either that the channel it full or that + * it is close. Go to slow path on failure to simplify the successful path and + * to materialize default exception. + */ + trySend(element).onSuccess { return ChannelResult.success(Unit) } + return runBlocking { + val r = runCatching { send(element) } + if (r.isSuccess) ChannelResult.success(Unit) + else ChannelResult.closed(r.exceptionOrNull()) + } +} diff --git a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt index ca399f5325..d2dcfd2f89 100644 --- a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt +++ b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt @@ -142,7 +142,7 @@ internal class VirtualTimeSource( } private fun minParkedTill(): Long = - threads.values.map { if (it.permit) NOT_PARKED else it.parkedTill }.min() ?: NOT_PARKED + threads.values.map { if (it.permit) NOT_PARKED else it.parkedTill }.minOrNull() ?: NOT_PARKED @Synchronized fun shutdown() { diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt index da20f0c5ee..2e4ba9ac0e 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt @@ -11,7 +11,7 @@ import kotlin.test.* class ChannelsJvmTest : TestBase() { @Test - fun testBlocking() { + fun testTrySendBlocking() { val ch = Channel() val sum = GlobalScope.async { var sum = 0 @@ -19,9 +19,38 @@ class ChannelsJvmTest : TestBase() { sum } repeat(10) { - ch.sendBlocking(it) + assertTrue(ch.trySendBlocking(it).isSuccess) } ch.close() assertEquals(45, runBlocking { sum.await() }) } + + // Uncomment lines when migrated to 1.5, these are bugs in inline classes codegen + @Test + fun testTrySendBlockingClosedChannel() { + run { + val channel = Channel().also { it.close() } + channel.trySendBlocking(Unit) + .onSuccess { expectUnreached() } + .onFailure { assertTrue(it is ClosedSendChannelException) } +// .also { assertTrue { it.isClosed } } + } + + run { + val channel = Channel().also { it.close(TestException()) } + channel.trySendBlocking(Unit) + .onSuccess { expectUnreached() } + .onFailure { assertTrue(it is TestException) } +// .also { assertTrue { it.isClosed } } + } + + run { + val channel = Channel().also { it.cancel(TestCancellationException()) } + channel.trySendBlocking(Unit) + .onSuccess { expectUnreached() } + .onFailure { assertTrue(it is TestCancellationException) } +// .also { assertTrue { it.isClosed } } + } + } + } diff --git a/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt index dc3cd43c93..e55eaad127 100644 --- a/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt @@ -62,7 +62,7 @@ class StateFlowStressTest : TestBase() { for (second in 1..nSeconds) { delay(1000) val cs = collected.map { it.sum() } - println("$second: emitted=${emitted.sum()}, collected=${cs.min()}..${cs.max()}") + println("$second: emitted=${emitted.sum()}, collected=${cs.minOrNull()}..${cs.maxOrNull()}") } emitters.cancelAndJoin() collectors.cancelAndJoin() @@ -77,4 +77,4 @@ class StateFlowStressTest : TestBase() { @Test fun testTenEmittersAndCollectors() = stress(10, 10) -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt b/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt index bfabf5b2f3..72d7c90882 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt @@ -49,7 +49,7 @@ abstract class SchedulerTestBase : TestBase() { private fun maxSequenceNumber(): Int? { return Thread.getAllStackTraces().keys.asSequence().filter { it is CoroutineScheduler.Worker } - .map { sequenceNumber(it.name) }.max() + .map { sequenceNumber(it.name) }.maxOrNull() } private fun sequenceNumber(threadName: String): Int { @@ -105,4 +105,4 @@ abstract class SchedulerTestBase : TestBase() { } } } -} \ No newline at end of file +} diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index 14c24942ac..a64e6d02ed 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -82,10 +82,14 @@ public fun ObservableSource.asFlow(): Flow = callbackFlow { override fun onComplete() { close() } override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() } override fun onNext(t: T) { + /* + * Channel was closed by the downstream, so the exception (if any) + * also was handled by the same downstream + */ try { - sendBlocking(t) - } catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974 - // Is handled by the downstream flow + trySendBlocking(t) + } catch (e: InterruptedException) { + // RxJava interrupts the source } } override fun onError(e: Throwable) { close(e) } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt index 63e30f2617..b0ba4ab2e0 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt @@ -82,10 +82,14 @@ public fun ObservableSource.asFlow(): Flow = callbackFlow { override fun onComplete() { close() } override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() } override fun onNext(t: T) { + /* + * Channel was closed by the downstream, so the exception (if any) + * also was handled by the same downstream + */ try { - sendBlocking(t) - } catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974 - // Is handled by the downstream flow + trySendBlocking(t) + } catch (e: InterruptedException) { + // RxJava interrupts the source } } override fun onError(e: Throwable) { close(e) }