Skip to content

Commit 7c47c79

Browse files
committed
Introduce trySend and tryReceive channel operations as a future replacement for error-prone offer, poll and receiveOrNull
Fixes #974
1 parent dbd7274 commit 7c47c79

File tree

12 files changed

+120
-50
lines changed

12 files changed

+120
-50
lines changed

Diff for: kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+20-1
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,7 @@ public abstract interface class kotlinx/coroutines/channels/ActorScope : kotlinx
555555

556556
public final class kotlinx/coroutines/channels/ActorScope$DefaultImpls {
557557
public static synthetic fun cancel (Lkotlinx/coroutines/channels/ActorScope;)V
558+
public static fun poll (Lkotlinx/coroutines/channels/ActorScope;)Ljava/lang/Object;
558559
}
559560

560561
public abstract interface class kotlinx/coroutines/channels/BroadcastChannel : kotlinx/coroutines/channels/SendChannel {
@@ -566,6 +567,7 @@ public abstract interface class kotlinx/coroutines/channels/BroadcastChannel : k
566567
public final class kotlinx/coroutines/channels/BroadcastChannel$DefaultImpls {
567568
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/BroadcastChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z
568569
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/BroadcastChannel;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V
570+
public static fun offer (Lkotlinx/coroutines/channels/BroadcastChannel;Ljava/lang/Object;)Z
569571
}
570572

571573
public final class kotlinx/coroutines/channels/BroadcastChannelKt {
@@ -598,6 +600,8 @@ public abstract interface class kotlinx/coroutines/channels/Channel : kotlinx/co
598600

599601
public final class kotlinx/coroutines/channels/Channel$DefaultImpls {
600602
public static synthetic fun cancel (Lkotlinx/coroutines/channels/Channel;)V
603+
public static fun offer (Lkotlinx/coroutines/channels/Channel;Ljava/lang/Object;)Z
604+
public static fun poll (Lkotlinx/coroutines/channels/Channel;)Ljava/lang/Object;
601605
}
602606

603607
public final class kotlinx/coroutines/channels/Channel$Factory {
@@ -628,7 +632,7 @@ public final class kotlinx/coroutines/channels/ChannelKt {
628632
public final class kotlinx/coroutines/channels/ChannelResult {
629633
public static final field Companion Lkotlinx/coroutines/channels/ChannelResult$Companion;
630634
public static final synthetic fun box-impl (Ljava/lang/Object;)Lkotlinx/coroutines/channels/ChannelResult;
631-
public static synthetic fun constructor-impl (Ljava/lang/Object;Lkotlin/jvm/internal/DefaultConstructorMarker;)Ljava/lang/Object;
635+
public static fun constructor-impl (Ljava/lang/Object;)Ljava/lang/Object;
632636
public fun equals (Ljava/lang/Object;)Z
633637
public static fun equals-impl (Ljava/lang/Object;Ljava/lang/Object;)Z
634638
public static final fun equals-impl0 (Ljava/lang/Object;Ljava/lang/Object;)Z
@@ -645,6 +649,12 @@ public final class kotlinx/coroutines/channels/ChannelResult {
645649
public final synthetic fun unbox-impl ()Ljava/lang/Object;
646650
}
647651

652+
public final class kotlinx/coroutines/channels/ChannelResult$Companion {
653+
public final fun closed-JP2dKIU (Ljava/lang/Throwable;)Ljava/lang/Object;
654+
public final fun failure-PtdJZtk ()Ljava/lang/Object;
655+
public final fun success-JP2dKIU (Ljava/lang/Object;)Ljava/lang/Object;
656+
}
657+
648658
public final class kotlinx/coroutines/channels/ChannelsKt {
649659
public static final fun all (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
650660
public static final fun any (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
@@ -789,6 +799,7 @@ public final class kotlinx/coroutines/channels/ConflatedBroadcastChannel : kotli
789799
public fun offer (Ljava/lang/Object;)Z
790800
public fun openSubscription ()Lkotlinx/coroutines/channels/ReceiveChannel;
791801
public fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
802+
public fun trySend-JP2dKIU (Ljava/lang/Object;)Ljava/lang/Object;
792803
}
793804

794805
public final class kotlinx/coroutines/channels/ProduceKt {
@@ -804,6 +815,10 @@ public abstract interface class kotlinx/coroutines/channels/ProducerScope : kotl
804815
public abstract fun getChannel ()Lkotlinx/coroutines/channels/SendChannel;
805816
}
806817

818+
public final class kotlinx/coroutines/channels/ProducerScope$DefaultImpls {
819+
public static fun offer (Lkotlinx/coroutines/channels/ProducerScope;Ljava/lang/Object;)Z
820+
}
821+
807822
public abstract interface class kotlinx/coroutines/channels/ReceiveChannel {
808823
public abstract synthetic fun cancel ()V
809824
public abstract synthetic fun cancel (Ljava/lang/Throwable;)Z
@@ -818,12 +833,14 @@ public abstract interface class kotlinx/coroutines/channels/ReceiveChannel {
818833
public abstract fun receive (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
819834
public abstract fun receiveCatching-JP2dKIU (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
820835
public abstract fun receiveOrNull (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
836+
public abstract fun tryReceive-PtdJZtk ()Ljava/lang/Object;
821837
}
822838

823839
public final class kotlinx/coroutines/channels/ReceiveChannel$DefaultImpls {
824840
public static synthetic fun cancel (Lkotlinx/coroutines/channels/ReceiveChannel;)V
825841
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z
826842
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V
843+
public static fun poll (Lkotlinx/coroutines/channels/ReceiveChannel;)Ljava/lang/Object;
827844
}
828845

829846
public abstract interface class kotlinx/coroutines/channels/SendChannel {
@@ -834,10 +851,12 @@ public abstract interface class kotlinx/coroutines/channels/SendChannel {
834851
public abstract fun isFull ()Z
835852
public abstract fun offer (Ljava/lang/Object;)Z
836853
public abstract fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
854+
public abstract fun trySend-JP2dKIU (Ljava/lang/Object;)Ljava/lang/Object;
837855
}
838856

839857
public final class kotlinx/coroutines/channels/SendChannel$DefaultImpls {
840858
public static synthetic fun close$default (Lkotlinx/coroutines/channels/SendChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z
859+
public static fun offer (Lkotlinx/coroutines/channels/SendChannel;Ljava/lang/Object;)Z
841860
}
842861

843862
public final class kotlinx/coroutines/channels/TickerChannelsKt {

Diff for: kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+15-11
Original file line numberDiff line numberDiff line change
@@ -137,20 +137,22 @@ internal abstract class AbstractSendChannel<E>(
137137
return sendSuspend(element)
138138
}
139139

140-
public final override fun offer(element: E): Boolean {
140+
public final override fun trySend(element: E): ChannelResult<Unit> {
141141
val result = offerInternal(element)
142142
return when {
143-
result === OFFER_SUCCESS -> true
143+
result === OFFER_SUCCESS -> ChannelResult.success(Unit)
144144
result === OFFER_FAILED -> {
145-
// We should check for closed token on offer as well, otherwise offer won't be linearizable
145+
// We should check for closed token on trySend as well, otherwise trySend won't be linearizable
146146
// in the face of concurrent close()
147147
// See https://github.com/Kotlin/kotlinx.coroutines/issues/359
148-
throw recoverStackTrace(helpCloseAndGetSendException(element, closedForSend ?: return false))
148+
val closedForSend = closedForSend ?: return ChannelResult.failure()
149+
helpClose(closedForSend)
150+
ChannelResult.closed(closedForSend.sendException)
149151
}
150152
result is Closed<*> -> {
151-
throw recoverStackTrace(helpCloseAndGetSendException(element, result))
153+
ChannelResult.closed(helpCloseAndGetSendException(element, result))
152154
}
153-
else -> error("offerInternal returned $result")
155+
else -> error("trySend returned $result")
154156
}
155157
}
156158

@@ -632,9 +634,11 @@ internal abstract class AbstractChannel<E>(
632634
}
633635

634636
@Suppress("UNCHECKED_CAST")
635-
public final override fun poll(): E? {
637+
public final override fun tryReceive(): ChannelResult<E> {
636638
val result = pollInternal()
637-
return if (result === POLL_FAILED) null else receiveOrNullResult(result)
639+
if (result === POLL_FAILED) return ChannelResult.failure()
640+
if (result is Closed<*>) return ChannelResult.closed(result.closeCause)
641+
return ChannelResult.success(result as E)
638642
}
639643

640644
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
@@ -905,7 +909,7 @@ internal abstract class AbstractChannel<E>(
905909
@JvmField val receiveMode: Int
906910
) : Receive<E>() {
907911
fun resumeValue(value: E): Any? = when (receiveMode) {
908-
RECEIVE_RESULT -> ChannelResult.value(value)
912+
RECEIVE_RESULT -> ChannelResult.success(value)
909913
else -> value
910914
}
911915

@@ -990,7 +994,7 @@ internal abstract class AbstractChannel<E>(
990994
@Suppress("UNCHECKED_CAST")
991995
override fun completeResumeReceive(value: E) {
992996
block.startCoroutineCancellable(
993-
if (receiveMode == RECEIVE_RESULT) ChannelResult.value(value) else value,
997+
if (receiveMode == RECEIVE_RESULT) ChannelResult.success(value) else value,
994998
select.completion,
995999
resumeOnCancellationFun(value)
9961000
)
@@ -1144,7 +1148,7 @@ internal abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClose
11441148

11451149
@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
11461150
private inline fun <E> Any?.toResult(): ChannelResult<E> =
1147-
if (this is Closed<*>) ChannelResult.closed(closeCause) else ChannelResult.value(this as E)
1151+
if (this is Closed<*>) ChannelResult.closed(closeCause) else ChannelResult.success(this as E)
11481152

11491153
@Suppress("NOTHING_TO_INLINE")
11501154
private inline fun <E> Closed<*>.toResult(): ChannelResult<E> = ChannelResult.closed(closeCause)

Diff for: kotlinx-coroutines-core/common/src/channels/Channel.kt

+55-20
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,23 @@ public interface SendChannel<in E> {
8585
* then it calls `onUndeliveredElement` before throwing an exception.
8686
* See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
8787
*/
88-
public fun offer(element: E): Boolean
88+
public fun offer(element: E): Boolean {
89+
val result = trySend(element)
90+
if (result.isSuccess) return true
91+
throw recoverStackTrace(result.exceptionOrNull() ?: return false)
92+
}
93+
94+
/**
95+
* Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions,
96+
* and returns the successful result. Otherwise, returns failed or closed result.
97+
* This is synchronous variant of [send], which backs off in situations when `send` suspends or throws.
98+
*
99+
* When `trySend` call returns a non-successful result, it guarantees that the element was not delivered to the consumer, and
100+
* it does not call `onUndeliveredElement` that was installed for this channel. If the channel was closed,
101+
* then it calls `onUndeliveredElement` before throwing an exception.
102+
* See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
103+
*/
104+
public fun trySend(element: E): ChannelResult<Unit>
89105

90106
/**
91107
* Closes this channel.
@@ -271,11 +287,22 @@ public interface ReceiveChannel<out E> {
271287
public val onReceiveCatching: SelectClause1<ChannelResult<E>>
272288

273289
/**
274-
* Retrieves and removes an element from this channel if its not empty, or returns `null` if the channel is empty
290+
* Retrieves and removes an element from this channel if it's not empty or returns `null` if the channel is empty
275291
* or is [is closed for `receive`][isClosedForReceive] without a cause.
276292
* It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
277293
*/
278-
public fun poll(): E?
294+
public fun poll(): E? {
295+
val result = tryReceive()
296+
if (result.isSuccess) return result.getOrThrow()
297+
throw recoverStackTrace(result.exceptionOrNull() ?: return null)
298+
}
299+
300+
/**
301+
* Retrieves and removes an element from this channel if it's not empty, returning a [successful][ChannelResult.success]
302+
* result, returns [failed][ChannelResult.failed] result if the channel is empty, and [closed][ChannelResult.closed]
303+
* result if the channel is closed.
304+
*/
305+
public fun tryReceive(): ChannelResult<E>
279306

280307
/**
281308
* Returns a new iterator to receive elements from this channel using a `for` loop.
@@ -315,35 +342,35 @@ public interface ReceiveChannel<out E> {
315342

316343
/**
317344
* A discriminated union of channel operation result.
318-
* It encapsulates successful or failed result of a channel operation, or a failed operation to a closed channel with
345+
* It encapsulates the successful or failed result of a channel operation or a failed operation to a closed channel with
319346
* an optional cause.
320347
*
321-
* Successful result represents a successful operation with value of type [T], for example, result of [Channel.receiveCatching]
322-
* operation or a successfully sent element as a result of [Channel.trySend].
348+
* The successful result represents a successful operation with a value of type [T], for example,
349+
* the result of [Channel.receiveCatching] operation or a successfully sent element as a result of [Channel.trySend].
323350
*
324-
* Failed result represents a failed operation attempt to a channel, but it doesn't necessary indicate that the channel is failed.
351+
* The failed result represents a failed operation attempt to a channel, but it doesn't necessary indicate that the channel is failed.
325352
* E.g. when the channel is full, [Channel.trySend] returns failed result, but the channel itself is not in the failed state.
326353
*
327-
* Closed result represents an operation attempt to a closed channel and also implies that the operation was failed.
354+
* The closed result represents an operation attempt to a closed channel and also implies that the operation was failed.
328355
*/
329356
@Suppress("UNCHECKED_CAST")
330357
public inline class ChannelResult<out T>
331-
internal constructor(private val holder: Any?) {
358+
@PublishedApi internal constructor(private val holder: Any?) {
332359
/**
333360
* Returns `true` if this instance represents a successful
334361
* operation outcome.
335362
*
336-
* In this case [isFailure] and [isClosed] return false.
363+
* In this case [isFailure] and [isClosed] return `false`.
337364
*/
338-
public val isSuccess: Boolean get() = holder !is Closed
365+
public val isSuccess: Boolean get() = holder !is Failed
339366

340367
/**
341-
* Returns true if this instance represents unsuccessful operation.
368+
* Returns `true` if this instance represents unsuccessful operation.
342369
*
343370
* In this case [isSuccess] returns false, but it does not imply
344371
* that the channel is failed or closed.
345372
*
346-
* Example of failed operation without an exception and channel being closed
373+
* Example of a failed operation without an exception and channel being closed
347374
* is [Channel.trySend] attempt to a channel that is full.
348375
*/
349376
public val isFailure: Boolean get() = holder is Failed
@@ -352,7 +379,7 @@ internal constructor(private val holder: Any?) {
352379
* Returns `true` if this instance represents unsuccessful operation
353380
* to a closed or cancelled channel.
354381
*
355-
* In this case [isSuccess] returns false, [isFailure] returns `true`, but it does not imply
382+
* In this case [isSuccess] returns `false`, [isFailure] returns `true`, but it does not imply
356383
* that [exceptionOrNull] returns non-null value.
357384
*
358385
* It can happen if the channel was [closed][Channel.close] normally without an exception.
@@ -374,7 +401,7 @@ internal constructor(private val holder: Any?) {
374401
}
375402

376403
/**
377-
* Returns the encapsulated exception if this instance represents failure or null if it is success
404+
* Returns the encapsulated exception if this instance represents failure or `null` if it is success
378405
* or unsuccessful operation to closed channel.
379406
*/
380407
public fun exceptionOrNull(): Throwable? = (holder as? Closed)?.cause
@@ -389,13 +416,21 @@ internal constructor(private val holder: Any?) {
389416
override fun toString(): String = "Closed($cause)"
390417
}
391418

392-
internal companion object {
393-
@Suppress("NOTHING_TO_INLINE")
394-
internal inline fun <E> value(value: E): ChannelResult<E> =
419+
@Suppress("NOTHING_TO_INLINE")
420+
@InternalCoroutinesApi
421+
public companion object {
422+
private val failed = Failed()
423+
424+
@InternalCoroutinesApi
425+
public fun <E> success(value: E): ChannelResult<E> =
395426
ChannelResult(value)
396427

397-
@Suppress("NOTHING_TO_INLINE")
398-
internal inline fun <E> closed(cause: Throwable?): ChannelResult<E> =
428+
@InternalCoroutinesApi
429+
public fun <E> failure(): ChannelResult<E> =
430+
ChannelResult(failed)
431+
432+
@InternalCoroutinesApi
433+
public fun <E> closed(cause: Throwable?): ChannelResult<E> =
399434
ChannelResult(Closed(cause))
400435
}
401436

Diff for: kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -229,12 +229,12 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
229229

230230
/**
231231
* Sends the value to all subscribed receives and stores this value as the most recent state for
232-
* future subscribers. This implementation always returns `true`.
233-
* It throws exception if the channel [isClosedForSend] (see [close] for details).
232+
* future subscribers. This implementation always returns either successful result
233+
* or closed with an exception.
234234
*/
235-
public override fun offer(element: E): Boolean {
236-
offerInternal(element)?.let { throw it.sendException }
237-
return true
235+
public override fun trySend(element: E): ChannelResult<Unit> {
236+
offerInternal(element)?.let { return ChannelResult.closed(it.sendException) }
237+
return ChannelResult.success(Unit)
238238
}
239239

240240
@Suppress("UNCHECKED_CAST")

Diff for: kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ class BasicOperationsTest : TestBase() {
142142
val result = channel.receiveCatching()
143143
assertEquals(1, result.getOrThrow())
144144
assertEquals(1, result.getOrNull())
145-
assertTrue(ChannelResult.value(1) == result)
145+
assertTrue(ChannelResult.success(1) == result)
146146

147147
expect(3)
148148
launch {

Diff for: kotlinx-coroutines-core/common/test/channels/ChannelReceiveCatchingTest.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class ChannelReceiveCatchingTest : TestBase() {
4545
assertEquals(1, element.getOrThrow())
4646
assertEquals(1, element.getOrNull())
4747
assertEquals("Value(1)", element.toString())
48-
assertTrue(ChannelResult.value(1) == element) // Don't box
48+
assertTrue(ChannelResult.success(1) == element) // Don't box
4949
assertFalse(element.isFailure)
5050
assertFalse(element.isClosed)
5151

@@ -54,7 +54,7 @@ class ChannelReceiveCatchingTest : TestBase() {
5454
assertNull(nullElement.getOrThrow())
5555
assertNull(nullElement.getOrNull())
5656
assertEquals("Value(null)", nullElement.toString())
57-
assertTrue(ChannelResult.value(null) == nullElement) // Don't box
57+
assertTrue(ChannelResult.success(null) == nullElement) // Don't box
5858
assertFalse(element.isFailure)
5959
assertFalse(element.isClosed)
6060

@@ -113,7 +113,7 @@ class ChannelReceiveCatchingTest : TestBase() {
113113
fun testReceiveResultChannel() = runTest {
114114
val channel = Channel<ChannelResult<UInt>>()
115115
launch {
116-
channel.send(ChannelResult.value(1u))
116+
channel.send(ChannelResult.success(1u))
117117
channel.send(ChannelResult.closed(TestException1()))
118118
channel.close(TestException2())
119119
}

Diff for: kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ private class ChannelViaBroadcast<E>(
4444
override suspend fun receive(): E = sub.receive()
4545
override suspend fun receiveOrNull(): E? = sub.receiveOrNull()
4646
override suspend fun receiveCatching(): ChannelResult<E> = sub.receiveCatching()
47-
override fun poll(): E? = sub.poll()
4847
override fun iterator(): ChannelIterator<E> = sub.iterator()
49-
48+
override fun tryReceive(): ChannelResult<E> = sub.tryReceive()
49+
5050
override fun cancel(cause: CancellationException?) = sub.cancel(cause)
5151

5252
// implementing hidden method anyway, so can cast to an internal class

Diff for: kotlinx-coroutines-core/jvm/src/channels/Actor.kt

+5
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,11 @@ private class LazyActorCoroutine<E>(
164164
return super.offer(element)
165165
}
166166

167+
override fun trySend(element: E): ChannelResult<Unit> {
168+
start()
169+
return super.trySend(element)
170+
}
171+
167172
override fun close(cause: Throwable?): Boolean {
168173
// close the channel _first_
169174
val closed = super.close(cause)

0 commit comments

Comments
 (0)