Skip to content

Commit c34c1d5

Browse files
committed
Improve documentation, rename await to awaitClose, improve awaitClose implementation
1 parent b3e70a1 commit c34c1d5

File tree

5 files changed

+25
-19
lines changed

5 files changed

+25
-19
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+2-2
Original file line numberDiff line numberDiff line change
@@ -718,8 +718,8 @@ public final class kotlinx/coroutines/channels/ConflatedBroadcastChannel : kotli
718718
}
719719

720720
public final class kotlinx/coroutines/channels/ProduceKt {
721-
public static final fun await (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
722-
public static synthetic fun await$default (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
721+
public static final fun awaitClose (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
722+
public static synthetic fun awaitClose$default (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
723723
public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
724724
public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
725725
public static synthetic fun produce$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;

kotlinx-coroutines-core/common/src/channels/Produce.kt

+12-8
Original file line numberDiff line numberDiff line change
@@ -29,25 +29,29 @@ public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
2929
* Suspends the current coroutine until the channel is either [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel]
3030
* and invokes the given [block] before resuming the coroutine.
3131
*
32+
* Note that when producer channel is cancelled this function resumes with cancellation exception,
33+
* so putting the code after calling this function would not lead to its execution in case of cancellation.
34+
* That is why this code takes a lambda parameter.
35+
*
3236
* Example of usage:
3337
* ```
3438
* val callbackEventsStream = produce {
3539
* val disposable = registerChannelInCallback(channel)
36-
* await { disposable.dispose() }
40+
* awaitClose { disposable.dispose() }
3741
* }
3842
* ```
3943
*/
4044
@ExperimentalCoroutinesApi
41-
public suspend fun <T> ProducerScope<T>.await(block: () -> Unit = {}) {
42-
check(kotlin.coroutines.coroutineContext[Job] === this) { "await() can be invoke only from the producer context" }
43-
suspendCancellableCoroutine<Unit> { cont ->
44-
invokeOnClose {
45-
try {
46-
block()
47-
} finally {
45+
public suspend fun <T> ProducerScope<T>.awaitClose(block: () -> Unit = {}) {
46+
check(kotlin.coroutines.coroutineContext[Job] === this) { "awaitClose() can be invoke only from the producer context" }
47+
try {
48+
suspendCancellableCoroutine<Unit> { cont ->
49+
invokeOnClose {
4850
cont.resume(Unit)
4951
}
5052
}
53+
} finally {
54+
block()
5155
}
5256
}
5357

kotlinx-coroutines-core/common/src/flow/Builders.kt

+6-4
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ public fun <T> flowViaChannel(
234234
* on the resulting flow.
235235
*
236236
* This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used concurrently from different contexts.
237-
* The resulting flow will complete as soon as [ProducerScope], to artificially prolong it [await] can be used.
237+
* The resulting flow will complete as soon as [ProducerScope], to artificially prolong it [awaitClose] can be used.
238238
* For more detailed example please refer to [callbackFlow] documentation.
239239
*
240240
* To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
@@ -283,7 +283,7 @@ public fun <T> channelFlow(bufferSize: Int = 16, @BuilderInference block: suspen
283283
* This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used from any context,
284284
* e.g. from the callback-based API. The flow completes as soon as its scope completes, thus if you are using channel from the
285285
* callback-based API, to artificially prolong scope lifetime and avoid memory-leaks related to unregistered resources,
286-
* [await] extension should be used. [await] argument will be invoked when either flow consumer cancels flow collection
286+
* [awaitClose] extension should be used. [awaitClose] argument will be invoked when either flow consumer cancels flow collection
287287
* or when callback-based API invokes [SendChannel.close] manually.
288288
*
289289
* To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
@@ -295,10 +295,12 @@ public fun <T> channelFlow(bufferSize: Int = 16, @BuilderInference block: suspen
295295
* fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
296296
* val callback = object : Callback { // implementation of some callback interface
297297
* override fun onNextValue(value: T) {
298-
* offer(value) // Note: offer drops value when buffer is full
298+
* // Note: offer drops value when buffer is full
299+
* // Channel.UNLIMITED can be used to avoid overfill
300+
* offer(value)
299301
* }
300302
* override fun onApiError(cause: Throwable) {
301-
* cancel("API Error", CancellationException(cause))
303+
* cancel(CancellationException("API Error", cause))
302304
* }
303305
* override fun onCompleted() = channel.close()
304306
* }

kotlinx-coroutines-core/common/test/channels/ProduceTest.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ class ProduceTest : TestBase() {
9999
val parent = Job()
100100
val channel = produce<Int>(parent) {
101101
expect(2)
102-
await { expect(4) }
102+
awaitClose { expect(4) }
103103
}
104104
expect(1)
105105
yield()
@@ -119,7 +119,7 @@ class ProduceTest : TestBase() {
119119
expect(3)
120120
this@produce.cancel()
121121
}
122-
await { expect(4) }
122+
awaitClose { expect(4) }
123123
}
124124
expect(1)
125125
parent.complete()
@@ -132,7 +132,7 @@ class ProduceTest : TestBase() {
132132
val parent = Job()
133133
produce<Int>(parent) {
134134
expect(2)
135-
await { expect(4) }
135+
awaitClose { expect(4) }
136136
}
137137
expect(1)
138138
yield()
@@ -145,7 +145,7 @@ class ProduceTest : TestBase() {
145145
fun testAwaitIllegalState() = runTest {
146146
val channel = produce<Int> { }
147147
@Suppress("RemoveExplicitTypeArguments") // KT-31525
148-
assertFailsWith<IllegalStateException> { (channel as ProducerScope<*>).await<Nothing>() }
148+
assertFailsWith<IllegalStateException> { (channel as ProducerScope<*>).awaitClose<Nothing>() }
149149
}
150150

151151
private suspend fun cancelOnCompletion(coroutineContext: CoroutineContext) = CoroutineScope(coroutineContext).apply {

kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class FlowCallbackTest : TestBase() {
3838
close()
3939
}
4040
expect(1)
41-
await()
41+
awaitClose()
4242
}
4343

4444
assertEquals(listOf(1), flow.toList())

0 commit comments

Comments
 (0)