Skip to content

Flow cancellation #2028

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ public final class kotlinx/coroutines/CoroutineScopeKt {
public static synthetic fun cancel$default (Lkotlinx/coroutines/CoroutineScope;Ljava/lang/String;Ljava/lang/Throwable;ILjava/lang/Object;)V
public static synthetic fun cancel$default (Lkotlinx/coroutines/CoroutineScope;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V
public static final fun coroutineScope (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun currentContext (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun ensureActive (Lkotlinx/coroutines/CoroutineScope;)V
public static final fun isActive (Lkotlinx/coroutines/CoroutineScope;)Z
public static final fun plus (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/CoroutineScope;
Expand Down Expand Up @@ -873,6 +874,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun buffer (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun buffer$default (Lkotlinx/coroutines/flow/Flow;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun callbackFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun cancellable (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun catch (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down Expand Up @@ -999,9 +1001,13 @@ public final class kotlinx/coroutines/flow/FlowKt {
}

public final class kotlinx/coroutines/flow/LintKt {
public static final fun cancel (Lkotlinx/coroutines/flow/FlowCollector;Ljava/util/concurrent/CancellationException;)V
public static synthetic fun cancel$default (Lkotlinx/coroutines/flow/FlowCollector;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V
public static final fun conflate (Lkotlinx/coroutines/flow/StateFlow;)Lkotlinx/coroutines/flow/Flow;
public static final fun distinctUntilChanged (Lkotlinx/coroutines/flow/StateFlow;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowOn (Lkotlinx/coroutines/flow/StateFlow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
public static final fun getCoroutineContext (Lkotlinx/coroutines/flow/FlowCollector;)Lkotlin/coroutines/CoroutineContext;
public static final fun isActive (Lkotlinx/coroutines/flow/FlowCollector;)Z
}

public abstract interface class kotlinx/coroutines/flow/MutableStateFlow : kotlinx/coroutines/flow/StateFlow {
Expand Down
16 changes: 16 additions & 0 deletions kotlinx-coroutines-core/common/src/CoroutineScope.kt
Original file line number Diff line number Diff line change
Expand Up @@ -233,3 +233,19 @@ public fun CoroutineScope.cancel(message: String, cause: Throwable? = null): Uni
* ```
*/
public fun CoroutineScope.ensureActive(): Unit = coroutineContext.ensureActive()


/**
* Returns the current [CoroutineContext] retrieved by using [kotlin.coroutines.coroutineContext].
* This function is an alias to avoid name clash with [CoroutineScope.coroutineContext] in a receiver position:
*
* ```
* launch { // this: CoroutineScope
* val flow = flow<Unit> {
* coroutineContext // Resolves into the context of outer launch, which is incorrect, see KT
* currentContext() // Retrieves actual context whe the flow is collected
* }
* }
* ```
*/
public suspend inline fun currentContext(): CoroutineContext = coroutineContext
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/common/src/flow/Builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
*
* fibonacci().take(100).collect { println(it) }
* ```
* Emissions from [flow] builder are [cancellable] by default.
*
* `emit` should happen strictly in the dispatchers of the [block] in order to preserve the flow context.
* For example, the following code will result in an [IllegalStateException]:
Expand Down
14 changes: 13 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/operators/Context.kt
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,18 @@ public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
}
}

/**
* Returns a flow which checks cancellation status on each emission and throws
* the corresponding cancellation cause if flow collector was cancelled.
* Note that [flow] builder is [cancellable] by default.
*/
public fun <T> Flow<T>.cancellable(): Flow<T> = unsafeFlow {
collect {
currentContext().ensureActive()
emit(it)
}
}

/**
* The operator that changes the context where all transformations applied to the given flow within a [builder] are executed.
* This operator is context preserving and does not affect the context of the preceding and subsequent operations.
Expand Down Expand Up @@ -256,7 +268,7 @@ public fun <T, R> Flow<T>.flowWith(
* All builders are written using scoping and no global coroutines are launched, so it is safe not to provide explicit Job.
* It is also necessary not to mess with cancellation if multiple flowWith are used.
*/
val originalContext = coroutineContext.minusKey(Job)
val originalContext = currentContext().minusKey(Job)
val prepared = source.flowOn(originalContext).buffer(bufferSize)
builder(prepared).flowOn(flowContext).buffer(bufferSize).collect { value ->
return@collect emit(value)
Expand Down
6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ internal inline fun <T, R> Flow<T>.unsafeTransform(
public fun <T> Flow<T>.onStart(
action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke start action
val safeCollector = SafeCollector<T>(this, coroutineContext)
val safeCollector = SafeCollector<T>(this, currentContext())
try {
safeCollector.action()
} finally {
Expand Down Expand Up @@ -153,7 +153,7 @@ public fun <T> Flow<T>.onCompletion(
throw e
}
// Normal completion
SafeCollector(this, coroutineContext).invokeSafely(action, null)
SafeCollector(this, currentContext()).invokeSafely(action, null)
}

/**
Expand All @@ -178,7 +178,7 @@ public fun <T> Flow<T>.onEmpty(
emit(it)
}
if (isEmpty) {
val collector = SafeCollector(this, coroutineContext)
val collector = SafeCollector(this, currentContext())
try {
collector.action()
} finally {
Expand Down
29 changes: 29 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/operators/Lint.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlin.coroutines.*

/**
Expand Down Expand Up @@ -41,3 +42,31 @@ public fun <T> StateFlow<T>.conflate(): Flow<T> = noImpl()
replaceWith = ReplaceWith("this")
)
public fun <T> StateFlow<T>.distinctUntilChanged(): Flow<T> = noImpl()

@Deprecated(
message = "isActive is resolved into the extension of outer CoroutineScope which is likely to be an error." +
"Use currentContext().isActive or cancellable() operator instead " +
"or specify the receiver of isActive explicitly. " +
"Additionally, flow {} builder emissions are cancellable by default.",
level = DeprecationLevel.WARNING, // ERROR in 1.4
replaceWith = ReplaceWith("currentContext().isActive")
)
public val FlowCollector<*>.isActive: Boolean
get() = noImpl()

@Deprecated(
message = "cancel() is resolved into the extension of outer CoroutineScope which is likely to be an error." +
"Use currentContext().cancel() instead or specify the receiver of cancel() explicitly",
level = DeprecationLevel.WARNING, // ERROR in 1.4
replaceWith = ReplaceWith("currentContext().cancel(cause)")
)
public fun FlowCollector<*>.cancel(cause: CancellationException? = null): Unit = noImpl()

@Deprecated(
message = "coroutineContext is resolved into the property of outer CoroutineScope which is likely to be an error." +
"Use currentContext() instead or specify the receiver of coroutineContext explicitly",
level = DeprecationLevel.WARNING, // ERROR in 1.4
replaceWith = ReplaceWith("currentContext()")
)
public val FlowCollector<*>.coroutineContext: CoroutineContext
get() = noImpl()
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow.operators

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.test.*

class CancellableTest : TestBase() {

@Test
fun testCancellable() = runTest {
var sum = 0
val flow = (0..1000).asFlow()
.onEach {
if (it != 0) currentContext().cancel()
sum += it
}

flow.launchIn(this).join()
assertEquals(500500, sum)

sum = 0
flow.cancellable().launchIn(this).join()
assertEquals(1, sum)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class FlatMapMergeTest : FlatMapMergeBaseTest() {
}
launch {
expect(2)
yield()
job.cancel()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines.flow.internal

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.*

Expand All @@ -17,7 +18,8 @@ internal actual class SafeCollector<T> actual constructor(
private var lastEmissionContext: CoroutineContext? = null

override suspend fun emit(value: T) {
val currentContext = coroutineContext
val currentContext = currentContext()
currentContext.ensureActive()
if (lastEmissionContext !== currentContext) {
checkContext(currentContext)
lastEmissionContext = currentContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines.flow.internal

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
Expand Down Expand Up @@ -62,6 +63,7 @@ internal actual class SafeCollector<T> actual constructor(

private fun emit(uCont: Continuation<Unit>, value: T): Any? {
val currentContext = uCont.context
currentContext.ensureActive()
// This check is triggered once per flow on happy path.
val previousContext = lastEmissionContext
if (previousContext !== currentContext) {
Expand Down
46 changes: 46 additions & 0 deletions kotlinx-coroutines-core/jvm/test/flow/FlowCancellationTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.test.*

class FlowCancellationTest : TestBase() {

@Test
fun testEmitIsCooperative() = runTest {
val latch = Channel<Unit>(1)
val job = flow {
expect(1)
latch.send(Unit)
while (true) {
emit(42)
}
}.launchIn(this + Dispatchers.Default)

latch.receive()
expect(2)
job.cancelAndJoin()
finish(3)
}

@Test
fun testIsActiveOnCurrentContext() = runTest {
val latch = Channel<Unit>(1)
val job = flow<Unit> {
expect(1)
latch.send(Unit)
while (currentContext().isActive) {
// Do nothing
}
}.launchIn(this + Dispatchers.Default)

latch.receive()
expect(2)
job.cancelAndJoin()
finish(3)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines.flow.internal

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.*

Expand All @@ -17,7 +18,8 @@ internal actual class SafeCollector<T> actual constructor(
private var lastEmissionContext: CoroutineContext? = null

override suspend fun emit(value: T) {
val currentContext = coroutineContext
val currentContext = currentContext()
currentContext.ensureActive()
if (lastEmissionContext !== currentContext) {
checkContext(currentContext)
lastEmissionContext = currentContext
Expand Down
2 changes: 1 addition & 1 deletion reactive/kotlinx-coroutines-reactive/src/Convert.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import kotlin.coroutines.*
* @param context -- the coroutine context from which the resulting observable is going to be signalled
*/
@Deprecated(message = "Deprecated in the favour of consumeAsFlow()",
level = DeprecationLevel.WARNING,
level = DeprecationLevel.WARNING, // Error in 1.4
replaceWith = ReplaceWith("this.consumeAsFlow().asPublisher()"))
public fun <T> ReceiveChannel<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T> = publish(context) {
for (t in this@asPublisher)
Expand Down
1 change: 1 addition & 0 deletions reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ private class PublisherAsFlow<T : Any>(
var consumed = 0L
while (true) {
val value = subscriber.takeNextOrNull() ?: break
coroutineContext.ensureActive()
collector.emit(value)
if (++consumed == requestSize) {
consumed = 0L
Expand Down
14 changes: 14 additions & 0 deletions reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package kotlinx.coroutines.reactor

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.junit.Test
Expand Down Expand Up @@ -36,4 +37,17 @@ class BackpressureTest : TestBase() {
}
finish(3)
}

@Test
fun testCooperativeCancellation() = runTest {
val flow = Flux.fromIterable((0L..Long.MAX_VALUE)).asFlow()
flow.onEach { if (it > 10) currentContext().cancel() }.launchIn(this + Dispatchers.Default).join()
}

@Test
fun testCooperativeCancellationForBuffered() = runTest(expected = { it is CancellationException }) {
val flow = Flux.fromIterable((0L..Long.MAX_VALUE)).asFlow()
val channel = flow.onEach { if (it > 10) currentContext().cancel() }.produceIn(this + Dispatchers.Default)
channel.consumeEach { /* Do nothing, just consume elements */ }
}
}