Skip to content

Commit 183d5ab

Browse files
committed
Embrace new channel API
* Introduce trySendBlocking and deprecate sendBlocking * Use it in callbackFlow example * Stabilize callbackFlow and channelFlow as they finally have error-safe API * Irrelevant: migrate from deprecated stdlib API to be able to build with Kotlin 1.5
1 parent 12f4dbc commit 183d5ab

File tree

9 files changed

+97
-28
lines changed

9 files changed

+97
-28
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+1
Original file line numberDiff line numberDiff line change
@@ -727,6 +727,7 @@ public final class kotlinx/coroutines/channels/ChannelsKt {
727727
public static final synthetic fun toMutableList (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
728728
public static final fun toMutableSet (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
729729
public static final synthetic fun toSet (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
730+
public static final fun trySendBlocking (Lkotlinx/coroutines/channels/SendChannel;Ljava/lang/Object;)Ljava/lang/Object;
730731
public static final synthetic fun withIndex (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/channels/ReceiveChannel;
731732
public static synthetic fun withIndex$default (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
732733
public static final synthetic fun zip (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/channels/ReceiveChannel;

kotlinx-coroutines-core/common/src/flow/Builders.kt

+4-7
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,6 @@ public fun <T> flowViaChannel(
261261
* }
262262
* ```
263263
*/
264-
@ExperimentalCoroutinesApi
265264
public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
266265
ChannelFlowBuilder(block)
267266

@@ -302,11 +301,10 @@ public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.()
302301
* override fun onNextValue(value: T) {
303302
* // To avoid blocking you can configure channel capacity using
304303
* // either buffer(Channel.CONFLATED) or buffer(Channel.UNLIMITED) to avoid overfill
305-
* try {
306-
* sendBlocking(value)
307-
* } catch (e: Exception) {
308-
* // Handle exception from the channel: failure in flow or premature closing
309-
* }
304+
* trySendBlocking(value)
305+
* .onFailure { throwable ->
306+
* // Downstream has been cancelled or failed, can log here
307+
* }
310308
* }
311309
* override fun onApiError(cause: Throwable) {
312310
* cancel(CancellationException("API Error", cause))
@@ -327,7 +325,6 @@ public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.()
327325
* > `awaitClose` block can be called at any time due to asynchronous nature of cancellation, even
328326
* > concurrently with the call of the callback.
329327
*/
330-
@ExperimentalCoroutinesApi
331328
public fun <T> callbackFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> = CallbackFlowBuilder(block)
332329

333330
// ChannelFlow implementation that is the first in the chain of flow operations and introduces (builds) a flow

kotlinx-coroutines-core/jvm/src/channels/Channels.kt

+46-4
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,25 @@ package kotlinx.coroutines.channels
1010
import kotlinx.coroutines.*
1111

1212
/**
13-
* Adds [element] into to this channel, **blocking** the caller while this channel is full,
14-
* or throws exception if the channel [Channel.isClosedForSend] (see [Channel.close] for details).
13+
* ### Deprecation note.
1514
*
16-
* This is a way to call [Channel.send] method inside a blocking code using [runBlocking],
17-
* so this function should not be used from coroutine.
15+
* This method is deprecated in the favour of [trySendBlocking].
16+
*
17+
* `sendBlocking` is considered to be dangerous primitive -- it throws
18+
* if the channel was closed or, more commonly, cancelled.
19+
* Cancellation exceptions are not ignored by non-blocking code and frequently
20+
* trigger internal failures.
21+
*
22+
* These bugs were hard-to-spot during code review and also forced users to write
23+
* their own wrappers around `sendBlocking`, so it was decided to deprecate
24+
* this function and provide a more explicit primitive instead.
1825
*/
26+
@Deprecated(
27+
level = DeprecationLevel.WARNING,
28+
message = "Deprecated in the favour of 'trySendBlocking'. " +
29+
"Consider handle the result of 'trySendBlocking' explicitly and rethrow exception if necessary",
30+
replaceWith = ReplaceWith("trySendBlocking(element)")
31+
)
1932
public fun <E> SendChannel<E>.sendBlocking(element: E) {
2033
// fast path
2134
if (offer(element))
@@ -25,3 +38,32 @@ public fun <E> SendChannel<E>.sendBlocking(element: E) {
2538
send(element)
2639
}
2740
}
41+
42+
/**
43+
* Adds [element] into to this channel, **blocking** the caller while this channel is full,
44+
* and returning either [successful][ChannelResult.isSuccess] result when the element was added, or
45+
* failed result representing closed channel with a corresponding exception.
46+
*
47+
* This is a way to call [Channel.send] method in a safe manner inside a blocking code using [runBlocking] and catching,
48+
* so this function should not be used from coroutine.
49+
*
50+
* Example of usage:
51+
* ```
52+
* // From callback API
53+
* channel.trySendBlocking(element)
54+
* .onSuccess { /* request next element or debug log */ }
55+
* .onFailure { t: Throwable? -> /* throw or log */ }
56+
*
57+
* ```
58+
* For this operation it is guaranteed that [failure][ChannelResult.failed] always contains an exception in it.
59+
*/
60+
public fun <E> SendChannel<E>.trySendBlocking(element: E): ChannelResult<Unit> {
61+
// fast path
62+
trySend(element).onSuccess { return ChannelResult.success(Unit) }
63+
// slow path
64+
return runBlocking {
65+
val r = runCatching { send(element) }
66+
if (r.isSuccess) ChannelResult.success(Unit)
67+
else ChannelResult.closed(r.exceptionOrNull())
68+
}
69+
}

kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ internal class VirtualTimeSource(
142142
}
143143

144144
private fun minParkedTill(): Long =
145-
threads.values.map { if (it.permit) NOT_PARKED else it.parkedTill }.min() ?: NOT_PARKED
145+
threads.values.map { if (it.permit) NOT_PARKED else it.parkedTill }.minOrNull() ?: NOT_PARKED
146146

147147
@Synchronized
148148
fun shutdown() {

kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt

+31-2
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,46 @@ import kotlin.test.*
1111
class ChannelsJvmTest : TestBase() {
1212

1313
@Test
14-
fun testBlocking() {
14+
fun testTrySendBlocking() {
1515
val ch = Channel<Int>()
1616
val sum = GlobalScope.async {
1717
var sum = 0
1818
ch.consumeEach { sum += it }
1919
sum
2020
}
2121
repeat(10) {
22-
ch.sendBlocking(it)
22+
assertTrue(ch.trySendBlocking(it).isSuccess)
2323
}
2424
ch.close()
2525
assertEquals(45, runBlocking { sum.await() })
2626
}
27+
28+
// Uncomment lines when migrated to 1.5, these are bugs in inline classes codegen
29+
@Test
30+
fun testTrySendBlockingClosedChannel() {
31+
run {
32+
val channel = Channel<Unit>().also { it.close() }
33+
channel.trySendBlocking(Unit)
34+
.onSuccess { expectUnreached() }
35+
.onFailure { assertTrue(it is ClosedSendChannelException) }
36+
// .also { assertTrue { it.isClosed } }
37+
}
38+
39+
run {
40+
val channel = Channel<Unit>().also { it.close(TestException()) }
41+
channel.trySendBlocking(Unit)
42+
.onSuccess { expectUnreached() }
43+
.onFailure { assertTrue(it is TestException) }
44+
// .also { assertTrue { it.isClosed } }
45+
}
46+
47+
run {
48+
val channel = Channel<Unit>().also { it.cancel(TestCancellationException()) }
49+
channel.trySendBlocking(Unit)
50+
.onSuccess { expectUnreached() }
51+
.onFailure { assertTrue(it is TestCancellationException) }
52+
// .also { assertTrue { it.isClosed } }
53+
}
54+
}
55+
2756
}

kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class StateFlowStressTest : TestBase() {
6262
for (second in 1..nSeconds) {
6363
delay(1000)
6464
val cs = collected.map { it.sum() }
65-
println("$second: emitted=${emitted.sum()}, collected=${cs.min()}..${cs.max()}")
65+
println("$second: emitted=${emitted.sum()}, collected=${cs.minOrNull()}..${cs.maxOrNull()}")
6666
}
6767
emitters.cancelAndJoin()
6868
collectors.cancelAndJoin()
@@ -77,4 +77,4 @@ class StateFlowStressTest : TestBase() {
7777

7878
@Test
7979
fun testTenEmittersAndCollectors() = stress(10, 10)
80-
}
80+
}

kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ abstract class SchedulerTestBase : TestBase() {
4949

5050
private fun maxSequenceNumber(): Int? {
5151
return Thread.getAllStackTraces().keys.asSequence().filter { it is CoroutineScheduler.Worker }
52-
.map { sequenceNumber(it.name) }.max()
52+
.map { sequenceNumber(it.name) }.maxOrNull()
5353
}
5454

5555
private fun sequenceNumber(threadName: String): Int {
@@ -105,4 +105,4 @@ abstract class SchedulerTestBase : TestBase() {
105105
}
106106
}
107107
}
108-
}
108+
}

reactive/kotlinx-coroutines-rx2/src/RxConvert.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,11 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
8282
override fun onComplete() { close() }
8383
override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
8484
override fun onNext(t: T) {
85-
try {
86-
sendBlocking(t)
87-
} catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
88-
// Is handled by the downstream flow
89-
}
85+
/*
86+
* Channel was closed by the downstream, so the exception (if any)
87+
* also was handled by the same downstream
88+
*/
89+
trySendBlocking(t)
9090
}
9191
override fun onError(e: Throwable) { close(e) }
9292
}

reactive/kotlinx-coroutines-rx3/src/RxConvert.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,11 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
8282
override fun onComplete() { close() }
8383
override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
8484
override fun onNext(t: T) {
85-
try {
86-
sendBlocking(t)
87-
} catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
88-
// Is handled by the downstream flow
89-
}
85+
/*
86+
* Channel was closed by the downstream, so the exception (if any)
87+
* also was handled by the same downstream
88+
*/
89+
trySendBlocking(t)
9090
}
9191
override fun onError(e: Throwable) { close(e) }
9292
}

0 commit comments

Comments
 (0)