Skip to content

Embrace new channel API #2643

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,7 @@ public final class kotlinx/coroutines/channels/ChannelsKt {
public static final synthetic fun toMutableList (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun toMutableSet (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final synthetic fun toSet (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun trySendBlocking (Lkotlinx/coroutines/channels/SendChannel;Ljava/lang/Object;)Ljava/lang/Object;
public static final synthetic fun withIndex (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static synthetic fun withIndex$default (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final synthetic fun zip (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/channels/ReceiveChannel;
Expand Down
11 changes: 4 additions & 7 deletions kotlinx-coroutines-core/common/src/flow/Builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@ public fun <T> flowViaChannel(
* }
* ```
*/
@ExperimentalCoroutinesApi
public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
ChannelFlowBuilder(block)

Expand Down Expand Up @@ -302,11 +301,10 @@ public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.()
* override fun onNextValue(value: T) {
* // To avoid blocking you can configure channel capacity using
* // either buffer(Channel.CONFLATED) or buffer(Channel.UNLIMITED) to avoid overfill
* try {
* sendBlocking(value)
* } catch (e: Exception) {
* // Handle exception from the channel: failure in flow or premature closing
* }
* trySendBlocking(value)
* .onFailure { throwable ->
* // Downstream has been cancelled or failed, can log here
* }
* }
* override fun onApiError(cause: Throwable) {
* cancel(CancellationException("API Error", cause))
Expand All @@ -327,7 +325,6 @@ public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.()
* > `awaitClose` block can be called at any time due to asynchronous nature of cancellation, even
* > concurrently with the call of the callback.
*/
@ExperimentalCoroutinesApi
public fun <T> callbackFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> = CallbackFlowBuilder(block)

// ChannelFlow implementation that is the first in the chain of flow operations and introduces (builds) a flow
Expand Down
77 changes: 73 additions & 4 deletions kotlinx-coroutines-core/jvm/src/channels/Channels.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,44 @@ package kotlinx.coroutines.channels
import kotlinx.coroutines.*

/**
* Adds [element] into to this channel, **blocking** the caller while this channel is full,
* or throws exception if the channel [Channel.isClosedForSend] (see [Channel.close] for details).
* **Deprecated** blocking variant of send.
* This method is deprecated in the favour of [trySendBlocking].
*
* This is a way to call [Channel.send] method inside a blocking code using [runBlocking],
* so this function should not be used from coroutine.
* `sendBlocking` is a dangerous primitive &mdash; it throws an exception
* if the channel was closed or, more commonly, cancelled.
* Cancellation exceptions in non-blocking code are unexpected and frequently
* trigger internal failures.
*
* These bugs are hard-to-spot during code review and they forced users to write
* their own wrappers around `sendBlocking`.
* So this function is deprecated and replaced with a more explicit primitive.
*
* The real-world example of broken usage with Firebase:
*
* ```kotlin
* callbackFlow {
* val listener = object : ValueEventListener {
* override fun onDataChange(snapshot: DataSnapshot) {
* // This line may fail and crash the app when the downstream flow is cancelled
* sendBlocking(DataSnapshot(snapshot))
* }
*
* override fun onCancelled(error: DatabaseError) {
* close(error.toException())
* }
* }
*
* firebaseQuery.addValueEventListener(listener)
* awaitClose { firebaseQuery.removeEventListener(listener) }
* }
* ```
*/
@Deprecated(
level = DeprecationLevel.WARNING,
message = "Deprecated in the favour of 'trySendBlocking'. " +
"Consider handling the result of 'trySendBlocking' explicitly and rethrow exception if necessary",
replaceWith = ReplaceWith("trySendBlocking(element)")
)
public fun <E> SendChannel<E>.sendBlocking(element: E) {
// fast path
if (offer(element))
Expand All @@ -25,3 +57,40 @@ public fun <E> SendChannel<E>.sendBlocking(element: E) {
send(element)
}
}

/**
* Adds [element] into to this channel, **blocking** the caller while this channel is full,
* and returning either [successful][ChannelResult.isSuccess] result when the element was added, or
* failed result representing closed channel with a corresponding exception.
*
* This is a way to call [Channel.send] method in a safe manner inside a blocking code using [runBlocking] and catching,
* so this function should not be used from coroutine.
*
* Example of usage:
*
* ```
* // From callback API
* channel.trySendBlocking(element)
* .onSuccess { /* request next element or debug log */ }
* .onFailure { t: Throwable? -> /* throw or log */ }
* ```
*
* For this operation it is guaranteed that [failure][ChannelResult.failed] always contains an exception in it.
*
* @throws [InterruptedException] if the current thread is interrupted during the blocking send operation.
*/
@Throws(InterruptedException::class)
public fun <E> SendChannel<E>.trySendBlocking(element: E): ChannelResult<Unit> {
/*
* Sent successfully -- bail out.
* But failure may indicate either that the channel it full or that
* it is close. Go to slow path on failure to simplify the successful path and
* to materialize default exception.
*/
trySend(element).onSuccess { return ChannelResult.success(Unit) }
return runBlocking {
val r = runCatching { send(element) }
if (r.isSuccess) ChannelResult.success(Unit)
else ChannelResult.closed(r.exceptionOrNull())
}
}
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ internal class VirtualTimeSource(
}

private fun minParkedTill(): Long =
threads.values.map { if (it.permit) NOT_PARKED else it.parkedTill }.min() ?: NOT_PARKED
threads.values.map { if (it.permit) NOT_PARKED else it.parkedTill }.minOrNull() ?: NOT_PARKED

@Synchronized
fun shutdown() {
Expand Down
33 changes: 31 additions & 2 deletions kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,46 @@ import kotlin.test.*
class ChannelsJvmTest : TestBase() {

@Test
fun testBlocking() {
fun testTrySendBlocking() {
val ch = Channel<Int>()
val sum = GlobalScope.async {
var sum = 0
ch.consumeEach { sum += it }
sum
}
repeat(10) {
ch.sendBlocking(it)
assertTrue(ch.trySendBlocking(it).isSuccess)
}
ch.close()
assertEquals(45, runBlocking { sum.await() })
}

// Uncomment lines when migrated to 1.5, these are bugs in inline classes codegen
@Test
fun testTrySendBlockingClosedChannel() {
run {
val channel = Channel<Unit>().also { it.close() }
channel.trySendBlocking(Unit)
.onSuccess { expectUnreached() }
.onFailure { assertTrue(it is ClosedSendChannelException) }
// .also { assertTrue { it.isClosed } }
}

run {
val channel = Channel<Unit>().also { it.close(TestException()) }
channel.trySendBlocking(Unit)
.onSuccess { expectUnreached() }
.onFailure { assertTrue(it is TestException) }
// .also { assertTrue { it.isClosed } }
}

run {
val channel = Channel<Unit>().also { it.cancel(TestCancellationException()) }
channel.trySendBlocking(Unit)
.onSuccess { expectUnreached() }
.onFailure { assertTrue(it is TestCancellationException) }
// .also { assertTrue { it.isClosed } }
}
}

}
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class StateFlowStressTest : TestBase() {
for (second in 1..nSeconds) {
delay(1000)
val cs = collected.map { it.sum() }
println("$second: emitted=${emitted.sum()}, collected=${cs.min()}..${cs.max()}")
println("$second: emitted=${emitted.sum()}, collected=${cs.minOrNull()}..${cs.maxOrNull()}")
}
emitters.cancelAndJoin()
collectors.cancelAndJoin()
Expand All @@ -77,4 +77,4 @@ class StateFlowStressTest : TestBase() {

@Test
fun testTenEmittersAndCollectors() = stress(10, 10)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ abstract class SchedulerTestBase : TestBase() {

private fun maxSequenceNumber(): Int? {
return Thread.getAllStackTraces().keys.asSequence().filter { it is CoroutineScheduler.Worker }
.map { sequenceNumber(it.name) }.max()
.map { sequenceNumber(it.name) }.maxOrNull()
}

private fun sequenceNumber(threadName: String): Int {
Expand Down Expand Up @@ -105,4 +105,4 @@ abstract class SchedulerTestBase : TestBase() {
}
}
}
}
}
10 changes: 7 additions & 3 deletions reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,14 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
override fun onComplete() { close() }
override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
override fun onNext(t: T) {
/*
* Channel was closed by the downstream, so the exception (if any)
* also was handled by the same downstream
*/
try {
sendBlocking(t)
} catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
// Is handled by the downstream flow
trySendBlocking(t)
} catch (e: InterruptedException) {
// RxJava interrupts the source
}
}
override fun onError(e: Throwable) { close(e) }
Expand Down
10 changes: 7 additions & 3 deletions reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,14 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
override fun onComplete() { close() }
override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
override fun onNext(t: T) {
/*
* Channel was closed by the downstream, so the exception (if any)
* also was handled by the same downstream
*/
try {
sendBlocking(t)
} catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
// Is handled by the downstream flow
trySendBlocking(t)
} catch (e: InterruptedException) {
// RxJava interrupts the source
}
}
override fun onError(e: Throwable) { close(e) }
Expand Down