Skip to content

Add ChannelResult.onClosed #2665

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ public final class kotlinx/coroutines/channels/ChannelKt {
public static synthetic fun Channel$default (IILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
public static synthetic fun Channel$default (ILkotlinx/coroutines/channels/BufferOverflow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
public static final fun getOrElse-WpGqRn0 (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
public static final fun onClosed-WpGqRn0 (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
public static final fun onFailure-WpGqRn0 (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
public static final fun onSuccess-WpGqRn0 (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
}
Expand Down
36 changes: 34 additions & 2 deletions kotlinx-coroutines-core/common/src/channels/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,22 @@ public interface SendChannel<in E> {
* oversee such error during code review.
* * Its name was not aligned with the rest of the API and tried to mimic Java's queue instead.
*
* **NB** Automatic migration provides best-effort for the user experience, but requires removal
* or adjusting of the code that relied on the exception handling.
* The complete replacement has a more verbose form:
* ```
* channel.trySend(element)
* .onClosed { throw it ?: ClosedSendChannelException("Channel was closed normally") }
* .isSuccess
* ```
*
* See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context.
*/
@Deprecated(
level = DeprecationLevel.WARNING,
message = "Deprecated in the favour of 'trySend' method",
replaceWith = ReplaceWith("trySend(element).isSuccess")
) // Since 1.5.0
) // Warning since 1.5.0
public fun offer(element: E): Boolean {
val result = trySend(element)
if (result.isSuccess) return true
Expand Down Expand Up @@ -297,7 +306,7 @@ public interface ReceiveChannel<out E> {
@Deprecated(level = DeprecationLevel.WARNING,
message = "Deprecated in the favour of 'tryReceive'",
replaceWith = ReplaceWith("tryReceive().getOrNull()")
) // Since 1.5.0
) // Warning since 1.5.0
public fun poll(): E? {
val result = tryReceive()
if (result.isSuccess) return result.getOrThrow()
Expand Down Expand Up @@ -362,6 +371,8 @@ public interface ReceiveChannel<out E> {
* E.g. when the channel is full, [Channel.trySend] returns failed result, but the channel itself is not in the failed state.
*
* The closed result represents an operation attempt to a closed channel and also implies that the operation has failed.
* It is guaranteed that if the result is _closed_, then the target channel is either [closed for send][Channel.isClosedForSend]
* or is [closed for receive][Channel.isClosedForReceive] depending on whether the failed operation was sending or receiving.
*/
@JvmInline
public value class ChannelResult<out T>
Expand Down Expand Up @@ -399,12 +410,14 @@ public value class ChannelResult<out T>
/**
* Returns the encapsulated value if this instance represents success or `null` if it represents failed result.
*/
@Suppress("UNCHECKED_CAST")
public fun getOrNull(): T? = if (holder !is Failed) holder as T else null

/**
* Returns the encapsulated value if this instance represents success or throws an exception if it is closed or failed.
*/
public fun getOrThrow(): T {
@Suppress("UNCHECKED_CAST")
if (holder !is Failed) return holder as T
if (holder is Closed && holder.cause != null) throw holder.cause
error("Trying to call 'getOrThrow' on a failed channel result: $holder")
Expand Down Expand Up @@ -495,6 +508,25 @@ public inline fun <T> ChannelResult<T>.onFailure(action: (exception: Throwable?)
return this
}

/**
* Performs the given [action] on the encapsulated [Throwable] exception if this instance represents [failure][ChannelResult.isFailure]
* due to channel being [closed][Channel.close].
* The result of [ChannelResult.exceptionOrNull] is passed to the [action] parameter.
* It is guaranteed that if action is invoked, then the channel is either [closed for send][Channel.isClosedForSend]
* or is [closed for receive][Channel.isClosedForReceive] depending on the failed operation.
*
* Returns the original `ChannelResult` unchanged.
*/
@OptIn(ExperimentalContracts::class)
public inline fun <T> ChannelResult<T>.onClosed(action: (exception: Throwable?) -> Unit): ChannelResult<T> {
contract {
callsInPlace(action, InvocationKind.AT_MOST_ONCE)
}
@Suppress("UNCHECKED_CAST")
if (holder is ChannelResult.Closed) action(exceptionOrNull())
return this
}

/**
* Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used
* from concurrent coroutines.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ class BasicOperationsTest : TestBase() {
TestChannelKind.values().forEach { kind -> testSendReceive(kind, 20) }
}

@Test
fun testTrySendToFullChannel() = runTest {
TestChannelKind.values().forEach { kind -> testTrySendToFullChannel(kind) }
}

@Test
fun testTrySendAfterClose() = runTest {
TestChannelKind.values().forEach { kind -> testTrySend(kind) }
Expand Down Expand Up @@ -118,7 +123,7 @@ class BasicOperationsTest : TestBase() {
assertTrue(channel.isClosedForSend)
channel.trySend(2)
.onSuccess { expectUnreached() }
.onFailure {
.onClosed {
assertTrue { it is ClosedSendChannelException}
if (!kind.isConflated) {
assertEquals(42, channel.receive())
Expand All @@ -127,6 +132,21 @@ class BasicOperationsTest : TestBase() {
d.await()
}

private suspend fun testTrySendToFullChannel(kind: TestChannelKind) = coroutineScope {
if (kind.isConflated || kind.capacity == Int.MAX_VALUE) return@coroutineScope
val channel = kind.create<Int>()
// Make it full
repeat(11) {
channel.trySend(42)
}
channel.trySend(1)
.onSuccess { expectUnreached() }
.onFailure { assertNull(it) }
.onClosed {
expectUnreached()
}
}

/**
* [ClosedSendChannelException] should not be eaten.
* See [https://github.com/Kotlin/kotlinx.coroutines/issues/957]
Expand Down