From 2dd3bf06c284cada0256ba7579ed97cd35a614b3 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 21 Apr 2021 13:58:07 +0300 Subject: [PATCH] Add ChannelResult.onClosed * Establish clear contract on ChannelResult.isClosed * This method provides a **clear** migration from correct 'offer' usages to 'trySend' --- .../api/kotlinx-coroutines-core.api | 1 + .../common/src/channels/Channel.kt | 36 +++++++++++++++++-- .../test/channels/BasicOperationsTest.kt | 22 +++++++++++- 3 files changed, 56 insertions(+), 3 deletions(-) diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index fdb70200f4..fa5924fda6 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -635,6 +635,7 @@ public final class kotlinx/coroutines/channels/ChannelKt { public static synthetic fun Channel$default (IILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel; public static synthetic fun Channel$default (ILkotlinx/coroutines/channels/BufferOverflow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel; public static final fun getOrElse-WpGqRn0 (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object; + public static final fun onClosed-WpGqRn0 (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object; public static final fun onFailure-WpGqRn0 (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object; public static final fun onSuccess-WpGqRn0 (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object; } diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt index 1812bddfa7..a0dbc6b9b4 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channel.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt @@ -144,13 +144,22 @@ public interface SendChannel { * oversee such error during code review. * * Its name was not aligned with the rest of the API and tried to mimic Java's queue instead. * + * **NB** Automatic migration provides best-effort for the user experience, but requires removal + * or adjusting of the code that relied on the exception handling. + * The complete replacement has a more verbose form: + * ``` + * channel.trySend(element) + * .onClosed { throw it ?: ClosedSendChannelException("Channel was closed normally") } + * .isSuccess + * ``` + * * See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context. */ @Deprecated( level = DeprecationLevel.WARNING, message = "Deprecated in the favour of 'trySend' method", replaceWith = ReplaceWith("trySend(element).isSuccess") - ) // Since 1.5.0 + ) // Warning since 1.5.0 public fun offer(element: E): Boolean { val result = trySend(element) if (result.isSuccess) return true @@ -297,7 +306,7 @@ public interface ReceiveChannel { @Deprecated(level = DeprecationLevel.WARNING, message = "Deprecated in the favour of 'tryReceive'", replaceWith = ReplaceWith("tryReceive().getOrNull()") - ) // Since 1.5.0 + ) // Warning since 1.5.0 public fun poll(): E? { val result = tryReceive() if (result.isSuccess) return result.getOrThrow() @@ -362,6 +371,8 @@ public interface ReceiveChannel { * E.g. when the channel is full, [Channel.trySend] returns failed result, but the channel itself is not in the failed state. * * The closed result represents an operation attempt to a closed channel and also implies that the operation has failed. + * It is guaranteed that if the result is _closed_, then the target channel is either [closed for send][Channel.isClosedForSend] + * or is [closed for receive][Channel.isClosedForReceive] depending on whether the failed operation was sending or receiving. */ @JvmInline public value class ChannelResult @@ -399,12 +410,14 @@ public value class ChannelResult /** * Returns the encapsulated value if this instance represents success or `null` if it represents failed result. */ + @Suppress("UNCHECKED_CAST") public fun getOrNull(): T? = if (holder !is Failed) holder as T else null /** * Returns the encapsulated value if this instance represents success or throws an exception if it is closed or failed. */ public fun getOrThrow(): T { + @Suppress("UNCHECKED_CAST") if (holder !is Failed) return holder as T if (holder is Closed && holder.cause != null) throw holder.cause error("Trying to call 'getOrThrow' on a failed channel result: $holder") @@ -495,6 +508,25 @@ public inline fun ChannelResult.onFailure(action: (exception: Throwable?) return this } +/** + * Performs the given [action] on the encapsulated [Throwable] exception if this instance represents [failure][ChannelResult.isFailure] + * due to channel being [closed][Channel.close]. + * The result of [ChannelResult.exceptionOrNull] is passed to the [action] parameter. + * It is guaranteed that if action is invoked, then the channel is either [closed for send][Channel.isClosedForSend] + * or is [closed for receive][Channel.isClosedForReceive] depending on the failed operation. + * + * Returns the original `ChannelResult` unchanged. + */ +@OptIn(ExperimentalContracts::class) +public inline fun ChannelResult.onClosed(action: (exception: Throwable?) -> Unit): ChannelResult { + contract { + callsInPlace(action, InvocationKind.AT_MOST_ONCE) + } + @Suppress("UNCHECKED_CAST") + if (holder is ChannelResult.Closed) action(exceptionOrNull()) + return this +} + /** * Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used * from concurrent coroutines. diff --git a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt index 8962acc3dc..4538f6c680 100644 --- a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt @@ -14,6 +14,11 @@ class BasicOperationsTest : TestBase() { TestChannelKind.values().forEach { kind -> testSendReceive(kind, 20) } } + @Test + fun testTrySendToFullChannel() = runTest { + TestChannelKind.values().forEach { kind -> testTrySendToFullChannel(kind) } + } + @Test fun testTrySendAfterClose() = runTest { TestChannelKind.values().forEach { kind -> testTrySend(kind) } @@ -118,7 +123,7 @@ class BasicOperationsTest : TestBase() { assertTrue(channel.isClosedForSend) channel.trySend(2) .onSuccess { expectUnreached() } - .onFailure { + .onClosed { assertTrue { it is ClosedSendChannelException} if (!kind.isConflated) { assertEquals(42, channel.receive()) @@ -127,6 +132,21 @@ class BasicOperationsTest : TestBase() { d.await() } + private suspend fun testTrySendToFullChannel(kind: TestChannelKind) = coroutineScope { + if (kind.isConflated || kind.capacity == Int.MAX_VALUE) return@coroutineScope + val channel = kind.create() + // Make it full + repeat(11) { + channel.trySend(42) + } + channel.trySend(1) + .onSuccess { expectUnreached() } + .onFailure { assertNull(it) } + .onClosed { + expectUnreached() + } + } + /** * [ClosedSendChannelException] should not be eaten. * See [https://github.com/Kotlin/kotlinx.coroutines/issues/957]