Skip to content

Commit 769996d

Browse files
qwwdfsadrecheej
authored andcommitted
Flow cancellation (Kotlin#2028)
Make Flow more cancellation friendly: * flow builder is cancellable by default * New unambiguous currentCoroutineContext top-level function * New Flow.cancellable() operator Fixes Kotlin#2026
1 parent 54a9b77 commit 769996d

File tree

15 files changed

+179
-16
lines changed

15 files changed

+179
-16
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+2
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ public final class kotlinx/coroutines/CoroutineScopeKt {
214214
public static synthetic fun cancel$default (Lkotlinx/coroutines/CoroutineScope;Ljava/lang/String;Ljava/lang/Throwable;ILjava/lang/Object;)V
215215
public static synthetic fun cancel$default (Lkotlinx/coroutines/CoroutineScope;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V
216216
public static final fun coroutineScope (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
217+
public static final fun currentCoroutineContext (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
217218
public static final fun ensureActive (Lkotlinx/coroutines/CoroutineScope;)V
218219
public static final fun isActive (Lkotlinx/coroutines/CoroutineScope;)Z
219220
public static final fun plus (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/CoroutineScope;
@@ -873,6 +874,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
873874
public static final fun buffer (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
874875
public static synthetic fun buffer$default (Lkotlinx/coroutines/flow/Flow;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
875876
public static final fun callbackFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
877+
public static final fun cancellable (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
876878
public static final fun catch (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
877879
public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
878880
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;

kotlinx-coroutines-core/common/src/CoroutineScope.kt

+16
Original file line numberDiff line numberDiff line change
@@ -239,3 +239,19 @@ public fun CoroutineScope.cancel(message: String, cause: Throwable? = null): Uni
239239
* ```
240240
*/
241241
public fun CoroutineScope.ensureActive(): Unit = coroutineContext.ensureActive()
242+
243+
244+
/**
245+
* Returns the current [CoroutineContext] retrieved by using [kotlin.coroutines.coroutineContext].
246+
* This function is an alias to avoid name clash with [CoroutineScope.coroutineContext] in a receiver position:
247+
*
248+
* ```
249+
* launch { // this: CoroutineScope
250+
* val flow = flow<Unit> {
251+
* coroutineContext // Resolves into the context of outer launch, which is incorrect, see KT-38033
252+
* currentCoroutineContext() // Retrieves actual context where the flow is collected
253+
* }
254+
* }
255+
* ```
256+
*/
257+
public suspend inline fun currentCoroutineContext(): CoroutineContext = coroutineContext

kotlinx-coroutines-core/common/src/flow/Builders.kt

+4-8
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
3333
*
3434
* fibonacci().take(100).collect { println(it) }
3535
* ```
36+
* Emissions from [flow] builder are [cancellable] by default.
3637
*
3738
* `emit` should happen strictly in the dispatchers of the [block] in order to preserve the flow context.
3839
* For example, the following code will result in an [IllegalStateException]:
@@ -49,14 +50,9 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
4950
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
5051

5152
// Named anonymous object
52-
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : Flow<T> {
53-
override suspend fun collect(collector: FlowCollector<T>) {
54-
val safeCollector = SafeCollector(collector, coroutineContext)
55-
try {
56-
safeCollector.block()
57-
} finally {
58-
safeCollector.releaseIntercepted()
59-
}
53+
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
54+
override suspend fun collectSafely(collector: FlowCollector<T>) {
55+
collector.block()
6056
}
6157
}
6258

kotlinx-coroutines-core/common/src/flow/operators/Context.kt

+16-1
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,21 @@ public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
208208
}
209209
}
210210

211+
/**
212+
* Returns a flow which checks cancellation status on each emission and throws
213+
* the corresponding cancellation cause if flow collector was cancelled.
214+
* Note that [flow] builder is [cancellable] by default.
215+
*/
216+
public fun <T> Flow<T>.cancellable(): Flow<T> {
217+
if (this is AbstractFlow<*>) return this // Fast-path, already cancellable
218+
return unsafeFlow {
219+
collect {
220+
currentCoroutineContext().ensureActive()
221+
emit(it)
222+
}
223+
}
224+
}
225+
211226
/**
212227
* The operator that changes the context where all transformations applied to the given flow within a [builder] are executed.
213228
* This operator is context preserving and does not affect the context of the preceding and subsequent operations.
@@ -256,7 +271,7 @@ public fun <T, R> Flow<T>.flowWith(
256271
* All builders are written using scoping and no global coroutines are launched, so it is safe not to provide explicit Job.
257272
* It is also necessary not to mess with cancellation if multiple flowWith are used.
258273
*/
259-
val originalContext = coroutineContext.minusKey(Job)
274+
val originalContext = currentCoroutineContext().minusKey(Job)
260275
val prepared = source.flowOn(originalContext).buffer(bufferSize)
261276
builder(prepared).flowOn(flowContext).buffer(bufferSize).collect { value ->
262277
return@collect emit(value)

kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt

+3-4
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ package kotlinx.coroutines.flow
1010

1111
import kotlinx.coroutines.*
1212
import kotlinx.coroutines.flow.internal.*
13-
import kotlin.coroutines.*
1413
import kotlin.jvm.*
1514

1615
// ------------------ WARNING ------------------
@@ -70,7 +69,7 @@ internal inline fun <T, R> Flow<T>.unsafeTransform(
7069
public fun <T> Flow<T>.onStart(
7170
action: suspend FlowCollector<T>.() -> Unit
7271
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke start action
73-
val safeCollector = SafeCollector<T>(this, coroutineContext)
72+
val safeCollector = SafeCollector<T>(this, currentCoroutineContext())
7473
try {
7574
safeCollector.action()
7675
} finally {
@@ -153,7 +152,7 @@ public fun <T> Flow<T>.onCompletion(
153152
throw e
154153
}
155154
// Normal completion
156-
SafeCollector(this, coroutineContext).invokeSafely(action, null)
155+
SafeCollector(this, currentCoroutineContext()).invokeSafely(action, null)
157156
}
158157

159158
/**
@@ -178,7 +177,7 @@ public fun <T> Flow<T>.onEmpty(
178177
emit(it)
179178
}
180179
if (isEmpty) {
181-
val collector = SafeCollector(this, coroutineContext)
180+
val collector = SafeCollector(this, currentCoroutineContext())
182181
try {
183182
collector.action()
184183
} finally {

kotlinx-coroutines-core/common/src/flow/operators/Lint.kt

+29
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.coroutines.flow
66

7+
import kotlinx.coroutines.*
78
import kotlin.coroutines.*
89

910
/**
@@ -41,3 +42,31 @@ public fun <T> StateFlow<T>.conflate(): Flow<T> = noImpl()
4142
replaceWith = ReplaceWith("this")
4243
)
4344
public fun <T> StateFlow<T>.distinctUntilChanged(): Flow<T> = noImpl()
45+
46+
//@Deprecated(
47+
// message = "isActive is resolved into the extension of outer CoroutineScope which is likely to be an error." +
48+
// "Use currentCoroutineContext().isActive or cancellable() operator instead " +
49+
// "or specify the receiver of isActive explicitly. " +
50+
// "Additionally, flow {} builder emissions are cancellable by default.",
51+
// level = DeprecationLevel.WARNING, // ERROR in 1.4
52+
// replaceWith = ReplaceWith("currentCoroutineContext().isActive")
53+
//)
54+
//public val FlowCollector<*>.isActive: Boolean
55+
// get() = noImpl()
56+
//
57+
//@Deprecated(
58+
// message = "cancel() is resolved into the extension of outer CoroutineScope which is likely to be an error." +
59+
// "Use currentCoroutineContext().cancel() instead or specify the receiver of cancel() explicitly",
60+
// level = DeprecationLevel.WARNING, // ERROR in 1.4
61+
// replaceWith = ReplaceWith("currentCoroutineContext().cancel(cause)")
62+
//)
63+
//public fun FlowCollector<*>.cancel(cause: CancellationException? = null): Unit = noImpl()
64+
//
65+
//@Deprecated(
66+
// message = "coroutineContext is resolved into the property of outer CoroutineScope which is likely to be an error." +
67+
// "Use currentCoroutineContext() instead or specify the receiver of coroutineContext explicitly",
68+
// level = DeprecationLevel.WARNING, // ERROR in 1.4
69+
// replaceWith = ReplaceWith("currentCoroutineContext()")
70+
//)
71+
//public val FlowCollector<*>.coroutineContext: CoroutineContext
72+
// get() = noImpl()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow.operators
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
9+
import kotlin.test.*
10+
11+
class CancellableTest : TestBase() {
12+
13+
@Test
14+
fun testCancellable() = runTest {
15+
var sum = 0
16+
val flow = (0..1000).asFlow()
17+
.onEach {
18+
if (it != 0) currentCoroutineContext().cancel()
19+
sum += it
20+
}
21+
22+
flow.launchIn(this).join()
23+
assertEquals(500500, sum)
24+
25+
sum = 0
26+
flow.cancellable().launchIn(this).join()
27+
assertEquals(1, sum)
28+
}
29+
30+
@Test
31+
fun testFastPath() {
32+
val flow = listOf(1).asFlow()
33+
assertNotSame(flow, flow.cancellable())
34+
35+
val cancellableFlow = flow { emit(42) }
36+
assertSame(cancellableFlow, cancellableFlow.cancellable())
37+
}
38+
}

kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt

+1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class FlatMapMergeTest : FlatMapMergeBaseTest() {
5858
}
5959
launch {
6060
expect(2)
61+
yield()
6162
job.cancel()
6263
}
6364
}

kotlinx-coroutines-core/js/src/flow/internal/SafeCollector.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.coroutines.flow.internal
66

7+
import kotlinx.coroutines.*
78
import kotlinx.coroutines.flow.*
89
import kotlin.coroutines.*
910

@@ -17,7 +18,8 @@ internal actual class SafeCollector<T> actual constructor(
1718
private var lastEmissionContext: CoroutineContext? = null
1819

1920
override suspend fun emit(value: T) {
20-
val currentContext = coroutineContext
21+
val currentContext = currentCoroutineContext()
22+
currentContext.ensureActive()
2123
if (lastEmissionContext !== currentContext) {
2224
checkContext(currentContext)
2325
lastEmissionContext = currentContext

kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.coroutines.flow.internal
66

7+
import kotlinx.coroutines.*
78
import kotlinx.coroutines.flow.*
89
import kotlin.coroutines.*
910
import kotlin.coroutines.intrinsics.*
@@ -62,6 +63,7 @@ internal actual class SafeCollector<T> actual constructor(
6263

6364
private fun emit(uCont: Continuation<Unit>, value: T): Any? {
6465
val currentContext = uCont.context
66+
currentContext.ensureActive()
6567
// This check is triggered once per flow on happy path.
6668
val previousContext = lastEmissionContext
6769
if (previousContext !== currentContext) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.channels.*
9+
import kotlin.test.*
10+
11+
class FlowCancellationTest : TestBase() {
12+
13+
@Test
14+
fun testEmitIsCooperative() = runTest {
15+
val latch = Channel<Unit>(1)
16+
val job = flow {
17+
expect(1)
18+
latch.send(Unit)
19+
while (true) {
20+
emit(42)
21+
}
22+
}.launchIn(this + Dispatchers.Default)
23+
24+
latch.receive()
25+
expect(2)
26+
job.cancelAndJoin()
27+
finish(3)
28+
}
29+
30+
@Test
31+
fun testIsActiveOnCurrentContext() = runTest {
32+
val latch = Channel<Unit>(1)
33+
val job = flow<Unit> {
34+
expect(1)
35+
latch.send(Unit)
36+
while (currentCoroutineContext().isActive) {
37+
// Do nothing
38+
}
39+
}.launchIn(this + Dispatchers.Default)
40+
41+
latch.receive()
42+
expect(2)
43+
job.cancelAndJoin()
44+
finish(3)
45+
}
46+
}

kotlinx-coroutines-core/native/src/flow/internal/SafeCollector.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.coroutines.flow.internal
66

7+
import kotlinx.coroutines.*
78
import kotlinx.coroutines.flow.*
89
import kotlin.coroutines.*
910

@@ -17,7 +18,8 @@ internal actual class SafeCollector<T> actual constructor(
1718
private var lastEmissionContext: CoroutineContext? = null
1819

1920
override suspend fun emit(value: T) {
20-
val currentContext = coroutineContext
21+
val currentContext = currentCoroutineContext()
22+
currentContext.ensureActive()
2123
if (lastEmissionContext !== currentContext) {
2224
checkContext(currentContext)
2325
lastEmissionContext = currentContext

reactive/kotlinx-coroutines-reactive/src/Convert.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import kotlin.coroutines.*
1616
* @param context -- the coroutine context from which the resulting observable is going to be signalled
1717
*/
1818
@Deprecated(message = "Deprecated in the favour of consumeAsFlow()",
19-
level = DeprecationLevel.WARNING,
19+
level = DeprecationLevel.WARNING, // Error in 1.4
2020
replaceWith = ReplaceWith("this.consumeAsFlow().asPublisher()"))
2121
public fun <T> ReceiveChannel<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T> = publish(context) {
2222
for (t in this@asPublisher)

reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt

+1
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ private class PublisherAsFlow<T : Any>(
8585
var consumed = 0L
8686
while (true) {
8787
val value = subscriber.takeNextOrNull() ?: break
88+
coroutineContext.ensureActive()
8889
collector.emit(value)
8990
if (++consumed == requestSize) {
9091
consumed = 0L

reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt

+14
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package kotlinx.coroutines.reactor
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.channels.*
89
import kotlinx.coroutines.flow.*
910
import kotlinx.coroutines.reactive.*
1011
import org.junit.Test
@@ -36,4 +37,17 @@ class BackpressureTest : TestBase() {
3637
}
3738
finish(3)
3839
}
40+
41+
@Test
42+
fun testCooperativeCancellation() = runTest {
43+
val flow = Flux.fromIterable((0L..Long.MAX_VALUE)).asFlow()
44+
flow.onEach { if (it > 10) currentCoroutineContext().cancel() }.launchIn(this + Dispatchers.Default).join()
45+
}
46+
47+
@Test
48+
fun testCooperativeCancellationForBuffered() = runTest(expected = { it is CancellationException }) {
49+
val flow = Flux.fromIterable((0L..Long.MAX_VALUE)).asFlow()
50+
val channel = flow.onEach { if (it > 10) currentCoroutineContext().cancel() }.produceIn(this + Dispatchers.Default)
51+
channel.consumeEach { /* Do nothing, just consume elements */ }
52+
}
3953
}

0 commit comments

Comments
 (0)