Skip to content

Commit 79975c5

Browse files
committed
Deprecate receiveOrNull and onReceiveOrNull and introduce corresponding extensions with the same semantic and generic ': Any' bound. Rename ReceiveResult to ValueOrClosed for consistency.
1 parent f8a6880 commit 79975c5

File tree

8 files changed

+97
-58
lines changed

8 files changed

+97
-58
lines changed

common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt

+14-14
Original file line numberDiff line numberDiff line change
@@ -651,7 +651,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
651651
}
652652

653653
@Suppress("UNCHECKED_CAST")
654-
public final override suspend fun receiveOrClosed(): ReceiveResult<E> {
654+
public final override suspend fun receiveOrClosed(): ValueOrClosed<E> {
655655
// fast path -- try poll non-blocking
656656
val result = pollInternal()
657657
if (result !== POLL_FAILED) {
@@ -662,7 +662,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
662662
}
663663

664664
@Suppress("UNCHECKED_CAST")
665-
private suspend fun receiveOrClosedSuspend(): ReceiveResult<E> = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@{ cont ->
665+
private suspend fun receiveOrClosedSuspend(): ValueOrClosed<E> = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@{ cont ->
666666
val receive = ReceiveElement<E>(cont as CancellableContinuation<Any?>, ResumeMode.RECEIVE_RESULT)
667667
while (true) {
668668
if (enqueueReceive(receive)) {
@@ -827,15 +827,15 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
827827
}
828828
}
829829

830-
override val onReceiveOrClosed: SelectClause1<ReceiveResult<E>>
831-
get() = object : SelectClause1<ReceiveResult<E>> {
832-
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ReceiveResult<E>) -> R) {
830+
override val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
831+
get() = object : SelectClause1<ValueOrClosed<E>> {
832+
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ValueOrClosed<E>) -> R) {
833833
registerSelectReceiveOrClosed(select, block)
834834
}
835835
}
836836

837837
@Suppress("UNCHECKED_CAST")
838-
private fun <R> registerSelectReceiveOrClosed(select: SelectInstance<R>, block: suspend (ReceiveResult<E>) -> R) {
838+
private fun <R> registerSelectReceiveOrClosed(select: SelectInstance<R>, block: suspend (ValueOrClosed<E>) -> R) {
839839
while (true) {
840840
if (select.isSelected) return
841841
if (isEmpty) {
@@ -846,11 +846,11 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
846846
pollResult === ALREADY_SELECTED -> return
847847
pollResult === POLL_FAILED -> {} // retry
848848
pollResult is Closed<*> -> {
849-
block.startCoroutineUnintercepted(ReceiveResult.closed(pollResult.receiveException), select.completion)
849+
block.startCoroutineUnintercepted(ValueOrClosed.closed(pollResult.receiveException), select.completion)
850850
}
851851
else -> {
852852
// selected successfully
853-
block.startCoroutineUnintercepted(ReceiveResult.value(pollResult as E), select.completion)
853+
block.startCoroutineUnintercepted(ValueOrClosed.value(pollResult as E), select.completion)
854854
return
855855
}
856856
}
@@ -970,7 +970,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
970970
@Suppress("IMPLICIT_CAST_TO_ANY")
971971
override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
972972
val resumeValue = when (resumeMode) {
973-
ResumeMode.RECEIVE_RESULT -> ReceiveResult.value(value)
973+
ResumeMode.RECEIVE_RESULT -> ValueOrClosed.value(value)
974974
else -> value
975975
}
976976
return cont.tryResume(resumeValue, idempotent)
@@ -1041,15 +1041,15 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
10411041
@Suppress("UNCHECKED_CAST")
10421042
override fun completeResumeReceive(token: Any) {
10431043
val value: E = (if (token === NULL_VALUE) null else token) as E
1044-
block.startCoroutine(if (mode == ResumeMode.RECEIVE_RESULT) ReceiveResult.value(value) else value, select.completion)
1044+
block.startCoroutine(if (mode == ResumeMode.RECEIVE_RESULT) ValueOrClosed.value(value) else value, select.completion)
10451045
}
10461046

10471047
override fun resumeReceiveClosed(closed: Closed<*>) {
10481048
if (!select.trySelect(null)) return
10491049

10501050
when (mode) {
10511051
ResumeMode.THROW_ON_CLOSE -> select.resumeSelectCancellableWithException(closed.receiveException)
1052-
ResumeMode.RECEIVE_RESULT -> block.startCoroutine(ReceiveResult.closed<R>(closed.receiveException), select.completion)
1052+
ResumeMode.RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed.closed<R>(closed.receiveException), select.completion)
10531053
ResumeMode.NULL_ON_CLOSE -> if (closed.closeCause == null) {
10541054
block.startCoroutine(null, select.completion)
10551055
} else {
@@ -1161,11 +1161,11 @@ private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed
11611161
}
11621162

11631163
@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
1164-
private inline fun <E> Any?.toResult(): ReceiveResult<E> =
1165-
if (this is Closed<*>) ReceiveResult.closed(receiveException) else ReceiveResult.value(this as E)
1164+
private inline fun <E> Any?.toResult(): ValueOrClosed<E> =
1165+
if (this is Closed<*>) ValueOrClosed.closed(receiveException) else ValueOrClosed.value(this as E)
11661166

11671167
@Suppress("NOTHING_TO_INLINE")
1168-
private inline fun <E> Closed<*>.toResult(): ReceiveResult<E> = ReceiveResult.closed(receiveException)
1168+
private inline fun <E> Closed<*>.toResult(): ValueOrClosed<E> = ValueOrClosed.closed(receiveException)
11691169

11701170
// Marker for receive, receiveOrNull and receiveOrClosed
11711171
private enum class ResumeMode {

common/kotlinx-coroutines-core-common/src/channels/Channel.kt

+21-19
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
1212
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
1313
import kotlinx.coroutines.selects.*
1414
import kotlin.jvm.*
15+
import kotlin.internal.*
1516

1617
/**
1718
* Sender's interface to [Channel].
@@ -188,10 +189,6 @@ public interface ReceiveChannel<out E> {
188189
public val onReceive: SelectClause1<E>
189190

190191
/**
191-
* Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
192-
* or returns `null` if the channel is [closed][isClosedForReceive] without cause
193-
* or throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
194-
*
195192
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
196193
* function is suspended, this function immediately resumes with [CancellationException].
197194
*
@@ -207,12 +204,14 @@ public interface ReceiveChannel<out E> {
207204
* This function can be used in [select] invocation with [onReceiveOrNull] clause.
208205
* Use [poll] to try receiving from this channel without waiting.
209206
*
210-
* **Note: This is an obsolete api.**
211-
* This function will be replaced with `receiveOrClosed: ReceiveResult<E>` and
212-
* extension `suspend fun <E: Any> ReceiveChannel<E>.receiveOrNull(): E?`
213-
* It is obsolete because it does not distinguish closed channel and null elements.
207+
* This function is deprecated because it does not distinguish closed channel and null elements.
208+
* Use [receiveOrClosed] or [receiveOrNull] extension method.
214209
*/
215210
@ObsoleteCoroutinesApi
211+
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
212+
@LowPriorityInOverloadResolution
213+
@Deprecated(message = "Deprecated in favor of receiveOrClosed and receiveOrNull extension",
214+
level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("receiveOrClosed"))
216215
public suspend fun receiveOrNull(): E?
217216

218217
/**
@@ -224,12 +223,15 @@ public interface ReceiveChannel<out E> {
224223
* **Note: This is an experimental api.** This function may be replaced with a better on in the future.
225224
*/
226225
@ExperimentalCoroutinesApi
226+
@ObsoleteCoroutinesApi
227+
@Deprecated(message = "Deprecated in favor of onReceiveOrClosed and onReceiveOrNull extension",
228+
level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("onReceiveOrClosed"))
227229
public val onReceiveOrNull: SelectClause1<E?>
228230

229231
/**
230232
* Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty].
231-
* This method returns [ReceiveResult] with a value if element was successfully retrieved from the channel
232-
* or [ReceiveResult] with close cause if channel was closed.
233+
* This method returns [ValueOrClosed] with a value if element was successfully retrieved from the channel
234+
* or [ValueOrClosed] with close cause if channel was closed.
233235
*
234236
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
235237
* function is suspended, this function immediately resumes with [CancellationException].
@@ -247,15 +249,15 @@ public interface ReceiveChannel<out E> {
247249
* Use [poll] to try receiving from this channel without waiting.
248250
*/
249251
@ExperimentalCoroutinesApi
250-
public suspend fun receiveOrClosed(): ReceiveResult<E>
252+
public suspend fun receiveOrClosed(): ValueOrClosed<E>
251253

252254
/**
253-
* Clause for [select] expression of [receiveOrClosed] suspending function that selects with the [ReceiveResult] with a value
254-
* that is received from the channel or selects with [ReceiveResult] with a close cause if the channel
255+
* Clause for [select] expression of [receiveOrClosed] suspending function that selects with the [ValueOrClosed] with a value
256+
* that is received from the channel or selects with [ValueOrClosed] with a close cause if the channel
255257
* [isClosedForReceive].
256258
*/
257259
@ExperimentalCoroutinesApi
258-
public val onReceiveOrClosed: SelectClause1<ReceiveResult<E>>
260+
public val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
259261

260262
/**
261263
* Retrieves and removes the element from this channel, or returns `null` if this channel [isEmpty]
@@ -304,7 +306,7 @@ public interface ReceiveChannel<out E> {
304306
* that encapsulates either successfully received element of type [T] from the channel or a close cause.
305307
*/
306308
@Suppress("NON_PUBLIC_PRIMARY_CONSTRUCTOR_OF_INLINE_CLASS")
307-
public inline class ReceiveResult<out T>
309+
public inline class ValueOrClosed<out T>
308310
internal constructor(private val holder: Any?) {
309311
/**
310312
* Returns `true` if this instance represents received element.
@@ -359,12 +361,12 @@ internal constructor(private val holder: Any?) {
359361

360362
internal companion object {
361363
@Suppress("NOTHING_TO_INLINE")
362-
internal inline fun <E> value(value: E): ReceiveResult<E> =
363-
ReceiveResult(value)
364+
internal inline fun <E> value(value: E): ValueOrClosed<E> =
365+
ValueOrClosed(value)
364366

365367
@Suppress("NOTHING_TO_INLINE")
366-
internal inline fun <E> closed(cause: Throwable): ReceiveResult<E> =
367-
ReceiveResult(Closed(cause))
368+
internal inline fun <E> closed(cause: Throwable): ValueOrClosed<E> =
369+
ValueOrClosed(Closed(cause))
368370
}
369371
}
370372

common/kotlinx-coroutines-core-common/src/channels/Channels.common.kt

+36
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package kotlinx.coroutines.channels
88

99
import kotlinx.coroutines.*
10+
import kotlinx.coroutines.selects.*
1011
import kotlin.coroutines.*
1112
import kotlin.jvm.*
1213

@@ -32,6 +33,41 @@ public inline fun <E, R> BroadcastChannel<E>.consume(block: ReceiveChannel<E>.()
3233
}
3334
}
3435

36+
/**
37+
* Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
38+
* or returns `null` if the channel is [closed][Channel.isClosedForReceive].
39+
*
40+
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
41+
* function is suspended, this function immediately resumes with [CancellationException].
42+
*
43+
* *Cancellation of suspended receive is atomic* -- when this function
44+
* throws [CancellationException] it means that the element was not retrieved from this channel.
45+
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
46+
* continue to execute even after it was cancelled from the same thread in the case when this receive operation
47+
* was already resumed and the continuation was posted for execution to the thread's queue.
48+
*
49+
* Note, that this function does not check for cancellation when it is not suspended.
50+
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
51+
*/
52+
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
53+
@ExperimentalCoroutinesApi
54+
public suspend fun <E: Any> Channel<E>.receiveOrNull(): E? {
55+
@Suppress("DEPRECATION", "UNCHECKED_CAST")
56+
return (this as Channel<E?>).receiveOrNull()
57+
}
58+
59+
/**
60+
* Clause for [select] expression of [receiveOrNull] suspending function that selects with the element that
61+
* is received from the channel or selects with `null` if the channel
62+
* [isClosedForReceive][ReceiveChannel.isClosedForReceive] without cause. The [select] invocation fails with
63+
* the original [close][SendChannel.close] cause exception if the channel has _failed_.
64+
**/
65+
@ExperimentalCoroutinesApi
66+
public fun <E: Any> Channel<E>.onReceiveOrNull(): SelectClause1<E?> {
67+
@Suppress("DEPRECATION", "UNCHECKED_CAST")
68+
return (this as Channel<E?>).onReceiveOrNull
69+
}
70+
3571
/**
3672
* Subscribes to this [BroadcastChannel] and performs the specified action for each received element.
3773
*

common/kotlinx-coroutines-core-common/test/channels/BasicOperationsTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ class BasicOperationsTest : TestBase() {
126126
val result = channel.receiveOrClosed()
127127
assertEquals(1, result.value)
128128
assertEquals(1, result.valueOrNull)
129-
assertTrue(ReceiveResult.value(1) == result)
129+
assertTrue(ValueOrClosed.value(1) == result)
130130

131131
expect(3)
132132
launch {
@@ -139,7 +139,7 @@ class BasicOperationsTest : TestBase() {
139139
assertTrue(closed.isClosed)
140140
assertTrue(closed.closeCause is ClosedReceiveChannelException)
141141
assertFailsWith<ClosedReceiveChannelException> { closed.valueOrThrow }
142-
assertTrue(ReceiveResult.closed<Int>(closed.closeCause) == closed)
142+
assertTrue(ValueOrClosed.closed<Int>(closed.closeCause) == closed)
143143
finish(6)
144144
}
145145

common/kotlinx-coroutines-core-common/test/channels/ChannelReceiveOrClosedTest.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,15 @@ class ChannelReceiveOrClosedTest : TestBase() {
4646
assertEquals(1, element.value)
4747
assertEquals(1, element.valueOrNull)
4848
assertEquals("Value(1)", element.toString())
49-
assertTrue(ReceiveResult.value(1) == element) // Don't box
49+
assertTrue(ValueOrClosed.value(1) == element) // Don't box
5050

5151
expect(4)
5252
val nullElement = channel.receiveOrClosed()
5353
assertTrue(nullElement.isValue)
5454
assertNull(nullElement.value)
5555
assertNull(nullElement.valueOrNull)
5656
assertEquals("Value(null)", nullElement.toString())
57-
assertTrue(ReceiveResult.value(null) == nullElement) // Don't box
57+
assertTrue(ValueOrClosed.value(null) == nullElement) // Don't box
5858

5959
expect(5)
6060
val closed = channel.receiveOrClosed()
@@ -107,10 +107,10 @@ class ChannelReceiveOrClosedTest : TestBase() {
107107
@Test
108108
@ExperimentalUnsignedTypes
109109
fun testReceiveResultChannel() = runTest {
110-
val channel = Channel<ReceiveResult<UInt>>()
110+
val channel = Channel<ValueOrClosed<UInt>>()
111111
launch {
112-
channel.send(ReceiveResult.value(1u))
113-
channel.send(ReceiveResult.closed(TestException1()))
112+
channel.send(ValueOrClosed.value(1u))
113+
channel.send(ValueOrClosed.closed(TestException1()))
114114
channel.close(TestException2())
115115
}
116116

common/kotlinx-coroutines-core-common/test/channels/TestChannelKind.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ private class ChannelViaBroadcast<E>(
5757

5858
override suspend fun receive(): E = sub.receive()
5959
override suspend fun receiveOrNull(): E? = sub.receiveOrNull()
60-
override suspend fun receiveOrClosed(): ReceiveResult<E> = sub.receiveOrClosed()
60+
override suspend fun receiveOrClosed(): ValueOrClosed<E> = sub.receiveOrClosed()
6161
override fun poll(): E? = sub.poll()
6262
override fun iterator(): ChannelIterator<E> = sub.iterator()
6363
override fun cancel(): Unit = sub.cancel()
@@ -66,6 +66,6 @@ private class ChannelViaBroadcast<E>(
6666
get() = sub.onReceive
6767
override val onReceiveOrNull: SelectClause1<E?>
6868
get() = sub.onReceiveOrNull
69-
override val onReceiveOrClosed: SelectClause1<ReceiveResult<E>>
69+
override val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
7070
get() = sub.onReceiveOrClosed
7171
}

core/kotlinx-coroutines-core/test/channels/TickerChannelCommonTest.kt

+9-12
Original file line numberDiff line numberDiff line change
@@ -113,18 +113,15 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase()
113113
var sum = 0
114114
var n = 0
115115
whileSelect {
116-
this@averageInTimeWindow.onReceiveOrNull {
117-
when (it) {
118-
null -> {
119-
// Send leftovers and bail out
120-
if (n != 0) send(sum / n.toDouble())
121-
false
122-
}
123-
else -> {
124-
sum += it
125-
++n
126-
true
127-
}
116+
this@averageInTimeWindow.onReceiveOrClosed {
117+
if (it.isClosed) {
118+
// Send leftovers and bail out
119+
if (n != 0) send(sum / n.toDouble())
120+
false
121+
} else {
122+
sum += it.value
123+
++n
124+
true
128125
}
129126
}
130127

docs/select-expression.md

+8-4
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ buzz -> 'Buzz!'
163163
### Selecting on close
164164

165165
The [onReceive][ReceiveChannel.onReceive] clause in `select` fails when the channel is closed causing the corresponding
166-
`select` to throw an exception. We can use [onReceiveOrNull][ReceiveChannel.onReceiveOrNull] clause to perform a
166+
`select` to throw an exception. We can use [onReceiveOrNull][ReceiveChannel.onReceiveOrNull]
167+
or [onReceiveOrClosed][ReceiveChannel.onReceiveOrClosed] clause to perform a
167168
specific action when the channel is closed. The following example also shows that `select` is an expression that returns
168169
the result of its selected clause:
169170

@@ -178,14 +179,16 @@ suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): St
178179
else
179180
"a -> '$value'"
180181
}
181-
b.onReceiveOrNull { value ->
182-
if (value == null)
182+
b.onReceiveOrClosed { valueOrClosed ->
183+
if (valueOrClosed.isClosed)
183184
"Channel 'b' is closed"
184185
else
185-
"b -> '$value'"
186+
"b -> '${valueOrClosed.value}'"
186187
}
187188
}
188189
```
190+
Note that [onReceiveOrNull][ReceiveChannel.onReceiveOrNull] is an extension function defined only for channels with non-nullable elements.
191+
189192

190193
</div>
191194

@@ -557,6 +560,7 @@ Channel was closed
557560
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
558561
[ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive.html
559562
[ReceiveChannel.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive-or-null.html
563+
[ReceiveChannel.onReceiveOrClosed]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive-or-closed.html
560564
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html
561565
[SendChannel.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/on-send.html
562566
<!--- INDEX kotlinx.coroutines.selects -->

0 commit comments

Comments
 (0)