Skip to content

Commit e2a5671

Browse files
authored
Flow scope (#1227)
* Introducing flowScope, builder necessary for creating cancellation-transparent flow operators * Incorporate flow scope into flow operators Fixes #1218 Fixes #1128
1 parent db52e97 commit e2a5671

35 files changed

+589
-129
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,6 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
381381
public fun fold (Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)Ljava/lang/Object;
382382
public fun get (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext$Element;
383383
public final fun getCancellationException ()Ljava/util/concurrent/CancellationException;
384-
protected fun getCancelsParent ()Z
385384
public fun getChildJobCancellationCause ()Ljava/util/concurrent/CancellationException;
386385
public final fun getChildren ()Lkotlin/sequences/Sequence;
387386
protected final fun getCompletionCause ()Ljava/lang/Throwable;
@@ -396,6 +395,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
396395
public final fun isCancelled ()Z
397396
public final fun isCompleted ()Z
398397
public final fun isCompletedExceptionally ()Z
398+
protected fun isScopedCoroutine ()Z
399399
public final fun join (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
400400
public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext;
401401
protected fun onCancelling (Ljava/lang/Throwable;)V

kotlinx-coroutines-core/common/src/Exceptions.common.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ internal expect class JobCancellationException(
2323
internal val job: Job
2424
}
2525

26-
internal expect class CoroutinesInternalError(message: String, cause: Throwable) : Error
26+
internal class CoroutinesInternalError(message: String, cause: Throwable) : Error(message, cause)
2727

2828
internal expect fun Throwable.addSuppressedThrowable(other: Throwable)
2929
// For use in tests

kotlinx-coroutines-core/common/src/JobSupport.kt

+45-25
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,31 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
319319
cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
320320
}
321321

322+
/**
323+
* The method that is invoked when the job is cancelled to possibly propagate cancellation to the parent.
324+
* Returns `true` if the parent is responsible for handling the exception, `false` otherwise.
325+
*
326+
* Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
327+
* may leak to the [CoroutineExceptionHandler].
328+
*/
329+
private fun cancelParent(cause: Throwable): Boolean {
330+
/* CancellationException is considered "normal" and parent usually is not cancelled when child produces it.
331+
* This allow parent to cancel its children (normally) without being cancelled itself, unless
332+
* child crashes and produce some other exception during its completion.
333+
*/
334+
val isCancellation = cause is CancellationException
335+
val parent = parentHandle
336+
// No parent -- ignore CE, report other exceptions.
337+
if (parent === null || parent === NonDisposableHandle) {
338+
return isCancellation
339+
}
340+
341+
// Is scoped coroutine -- don't propagate, will be rethrown
342+
if (isScopedCoroutine) return isCancellation
343+
// Notify parent but don't forget to check cancellation
344+
return parent.childCancelled(cause) || isCancellation
345+
}
346+
322347
private fun NodeList.notifyCompletion(cause: Throwable?) =
323348
notifyHandlers<JobNode<*>>(this, cause)
324349

@@ -594,21 +619,29 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
594619
cancelImpl(parentJob)
595620
}
596621

597-
// Child was cancelled with cause
598-
// It is overridden in supervisor implementations to ignore child cancellation
599-
public open fun childCancelled(cause: Throwable): Boolean =
600-
cancelImpl(cause) && handlesException
622+
/**
623+
* Child was cancelled with a cause.
624+
* In this method parent decides whether it cancels itself (e.g. on a critical failure) and whether it handles the exception of the child.
625+
* It is overridden in supervisor implementations to completely ignore any child cancellation.
626+
* Returns `true` if exception is handled, `false` otherwise (then caller is responsible for handling an exception)
627+
*
628+
* Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
629+
* may leak to the [CoroutineExceptionHandler].
630+
*/
631+
public open fun childCancelled(cause: Throwable): Boolean {
632+
if (cause is CancellationException) return true
633+
return cancelImpl(cause) && handlesException
634+
}
601635

602636
/**
603637
* Makes this [Job] cancelled with a specified [cause].
604638
* It is used in [AbstractCoroutine]-derived classes when there is an internal failure.
605639
*/
606-
public fun cancelCoroutine(cause: Throwable?) =
607-
cancelImpl(cause)
640+
public fun cancelCoroutine(cause: Throwable?) = cancelImpl(cause)
608641

609642
// cause is Throwable or ParentJob when cancelChild was invoked
610643
// returns true is exception was handled, false otherwise
611-
private fun cancelImpl(cause: Any?): Boolean {
644+
internal fun cancelImpl(cause: Any?): Boolean {
612645
if (onCancelComplete) {
613646
// make sure it is completing, if cancelMakeCompleting returns true it means it had make it
614647
// completing and had recorded exception
@@ -912,14 +945,12 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
912945
protected open fun onCancelling(cause: Throwable?) {}
913946

914947
/**
915-
* When this function returns `true` the parent is cancelled on cancellation of this job.
916-
* Note that [CancellationException] is considered "normal" and parent is not cancelled when child produces it.
917-
* This allows parent to cancel its children (normally) without being cancelled itself, unless
918-
* child crashes and produce some other exception during its completion.
919-
*
920-
* @suppress **This is unstable API and it is subject to change.*
948+
* Returns `true` for scoped coroutines.
949+
* Scoped coroutine is a coroutine that is executed sequentially within the enclosing scope without any concurrency.
950+
* Scoped coroutines always handle any exception happened within -- they just rethrow it to the enclosing scope.
951+
* Examples of scoped coroutines are `coroutineScope`, `withTimeout` and `runBlocking`.
921952
*/
922-
protected open val cancelsParent: Boolean get() = true
953+
protected open val isScopedCoroutine: Boolean get() = false
923954

924955
/**
925956
* Returns `true` for jobs that handle their exceptions or integrate them into the job's result via [onCompletionInternal].
@@ -939,20 +970,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
939970
*
940971
* This method is invoked **exactly once** when the final exception of the job is determined
941972
* and before it becomes complete. At the moment of invocation the job and all its children are complete.
942-
*
943-
* @suppress **This is unstable API and it is subject to change.*
944973
*/
945974
protected open fun handleJobException(exception: Throwable): Boolean = false
946975

947-
private fun cancelParent(cause: Throwable): Boolean {
948-
// CancellationException is considered "normal" and parent is not cancelled when child produces it.
949-
// This allow parent to cancel its children (normally) without being cancelled itself, unless
950-
// child crashes and produce some other exception during its completion.
951-
if (cause is CancellationException) return true
952-
if (!cancelsParent) return false
953-
return parentHandle?.childCancelled(cause) == true
954-
}
955-
956976
/**
957977
* Override for completion actions that need to update some external object depending on job's state,
958978
* right before all the waiters for coroutine's completion are notified.

kotlinx-coroutines-core/common/src/Timeout.kt

+1-3
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,7 @@ private open class TimeoutCoroutine<U, in T: U>(
8585
override val defaultResumeMode: Int get() = MODE_DIRECT
8686
override val callerFrame: CoroutineStackFrame? get() = (uCont as? CoroutineStackFrame)
8787
override fun getStackTraceElement(): StackTraceElement? = null
88-
89-
override val cancelsParent: Boolean
90-
get() = false // it throws exception to parent instead of cancelling it
88+
override val isScopedCoroutine: Boolean get() = true
9189

9290
@Suppress("LeakingThis", "Deprecation")
9391
override fun run() {

kotlinx-coroutines-core/common/src/channels/Produce.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public fun <E> CoroutineScope.produce(
126126
return coroutine
127127
}
128128

129-
private class ProducerCoroutine<E>(
129+
internal open class ProducerCoroutine<E>(
130130
parentContext: CoroutineContext, channel: Channel<E>
131131
) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E> {
132132
override val isActive: Boolean

kotlinx-coroutines-core/common/src/flow/Builders.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.()
313313
public inline fun <T> callbackFlow(@BuilderInference noinline block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
314314
channelFlow(block)
315315

316-
// ChannelFlow implementation that is the first in the chain of flow operations and introduces (builds) a flow
316+
// ChannelFlow implementation that is the first in the chain of flow operations and introduces (builds) a flow
317317
private class ChannelFlowBuilder<T>(
318318
private val block: suspend ProducerScope<T>.() -> Unit,
319319
context: CoroutineContext = EmptyCoroutineContext,

kotlinx-coroutines-core/common/src/flow/Migration.kt

-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ public fun <T> Flow<T>.onErrorResume(fallback: Flow<T>): Flow<T> = error("Should
118118
@Deprecated(message = "withContext in flow body is deprecated, use flowOn instead", level = DeprecationLevel.ERROR)
119119
public fun <T, R> FlowCollector<T>.withContext(context: CoroutineContext, block: suspend () -> R): Unit = error("Should not be called")
120120

121-
122121
/**
123122
* `subscribe` is Rx-specific API that has no direct match in flows.
124123
* One can use `launch` instead, for example the following:

kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,10 @@ internal abstract class ChannelFlow<T>(
6363
scope.broadcast(context, produceCapacity, start, block = collectToFun)
6464

6565
fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
66-
scope.produce(context, produceCapacity, block = collectToFun)
66+
scope.flowProduce(context, produceCapacity, block = collectToFun)
6767

6868
override suspend fun collect(collector: FlowCollector<T>) =
69-
coroutineScope { // todo: flowScope
69+
coroutineScope {
7070
val channel = produceImpl(this)
7171
channel.consumeEach { collector.emit(it) }
7272
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow.internal
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.channels.*
9+
import kotlinx.coroutines.flow.*
10+
import kotlinx.coroutines.internal.*
11+
import kotlinx.coroutines.intrinsics.*
12+
import kotlin.coroutines.*
13+
import kotlin.coroutines.intrinsics.*
14+
import kotlinx.coroutines.flow.unsafeFlow as flow
15+
16+
/**
17+
* Creates a [CoroutineScope] and calls the specified suspend block with this scope.
18+
* This builder is similar to [coroutineScope] with the only exception that it *ties* lifecycle of children
19+
* and itself regarding the cancellation, thus being cancelled when one of the children becomes cancelled.
20+
*
21+
* For example:
22+
* ```
23+
* flowScope {
24+
* launch {
25+
* throw CancellationException()
26+
* }
27+
* } // <- CE will be rethrown here
28+
* ```
29+
*/
30+
internal suspend fun <R> flowScope(@BuilderInference block: suspend CoroutineScope.() -> R): R =
31+
suspendCoroutineUninterceptedOrReturn { uCont ->
32+
val coroutine = FlowCoroutine(uCont.context, uCont)
33+
coroutine.startUndispatchedOrReturn(coroutine, block)
34+
}
35+
36+
/**
37+
* Creates a flow that also provides a [CoroutineScope] for each collector
38+
* Shorthand for:
39+
* ```
40+
* flow {
41+
* flowScope {
42+
* ...
43+
* }
44+
* }
45+
* ```
46+
* with additional constraint on cancellation.
47+
* To cancel child without cancelling itself, `cancel(ChildCancelledException())` should be used.
48+
*/
49+
internal fun <R> scopedFlow(@BuilderInference block: suspend CoroutineScope.(FlowCollector<R>) -> Unit): Flow<R> =
50+
flow {
51+
val collector = this
52+
flowScope { block(collector) }
53+
}
54+
55+
/*
56+
* Shortcut for produce { flowScope {block() } }
57+
*/
58+
internal fun <T> CoroutineScope.flowProduce(
59+
context: CoroutineContext,
60+
capacity: Int = 0, @BuilderInference block: suspend ProducerScope<T>.() -> Unit
61+
): ReceiveChannel<T> {
62+
val channel = Channel<T>(capacity)
63+
val newContext = newCoroutineContext(context)
64+
val coroutine = FlowProduceCoroutine(newContext, channel)
65+
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
66+
return coroutine
67+
}
68+
69+
private class FlowCoroutine<T>(
70+
context: CoroutineContext,
71+
uCont: Continuation<T>
72+
) : ScopeCoroutine<T>(context, uCont) {
73+
74+
public override fun childCancelled(cause: Throwable): Boolean {
75+
if (cause is ChildCancelledException) return true
76+
return cancelImpl(cause)
77+
}
78+
}
79+
80+
private class FlowProduceCoroutine<T>(
81+
parentContext: CoroutineContext,
82+
channel: Channel<T>
83+
) : ProducerCoroutine<T>(parentContext, channel) {
84+
85+
public override fun childCancelled(cause: Throwable): Boolean {
86+
if (cause is ChildCancelledException) return true
87+
return cancelImpl(cause)
88+
}
89+
}

kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.common.kt renamed to kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt

+5
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,8 @@ import kotlinx.coroutines.*
1111
* This exception should never escape outside of operator's implementation.
1212
*/
1313
internal expect class AbortFlowException() : CancellationException
14+
15+
/**
16+
* Exception used to cancel child of [scopedFlow] without cancelling the whole scope.
17+
*/
18+
internal expect class ChildCancelledException() : CancellationException

kotlinx-coroutines-core/common/src/flow/operators/Delay.kt

+45-47
Original file line numberDiff line numberDiff line change
@@ -60,34 +60,33 @@ public fun <T> Flow<T>.delayEach(timeMillis: Long): Flow<T> = flow {
6060
*/
6161
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
6262
require(timeoutMillis > 0) { "Debounce timeout should be positive" }
63-
return flow {
64-
coroutineScope {
65-
val values = Channel<Any?>(Channel.CONFLATED) // Actually Any, KT-30796
66-
// Channel is not closed deliberately as there is no close with value
67-
val collector = async {
68-
collect { value -> values.send(value ?: NULL) }
69-
}
63+
return scopedFlow { downstream ->
64+
val values = Channel<Any?>(Channel.CONFLATED) // Actually Any, KT-30796
65+
// Channel is not closed deliberately as there is no close with value
66+
val collector = async {
67+
collect { value -> values.send(value ?: NULL) }
68+
}
7069

71-
var isDone = false
72-
var lastValue: Any? = null
73-
while (!isDone) {
74-
select<Unit> {
75-
values.onReceive {
76-
lastValue = it
77-
}
70+
var isDone = false
71+
var lastValue: Any? = null
72+
while (!isDone) {
73+
select<Unit> {
74+
values.onReceive {
75+
lastValue = it
76+
}
7877

79-
lastValue?.let { value -> // set timeout when lastValue != null
80-
onTimeout(timeoutMillis) {
81-
lastValue = null // Consume the value
82-
emit(NULL.unbox(value))
83-
}
78+
lastValue?.let { value ->
79+
// set timeout when lastValue != null
80+
onTimeout(timeoutMillis) {
81+
lastValue = null // Consume the value
82+
downstream.emit(NULL.unbox(value))
8483
}
84+
}
8585

86-
// Close with value 'idiom'
87-
collector.onAwait {
88-
if (lastValue != null) emit(NULL.unbox(lastValue))
89-
isDone = true
90-
}
86+
// Close with value 'idiom'
87+
collector.onAwait {
88+
if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
89+
isDone = true
9190
}
9291
}
9392
}
@@ -112,32 +111,31 @@ public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
112111
*/
113112
public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
114113
require(periodMillis > 0) { "Sample period should be positive" }
115-
return flow {
116-
coroutineScope {
117-
val values = produce<Any?>(capacity = Channel.CONFLATED) { // Actually Any, KT-30796
118-
collect { value -> send(value ?: NULL) }
119-
}
114+
return scopedFlow { downstream ->
115+
val values = produce<Any?>(capacity = Channel.CONFLATED) {
116+
// Actually Any, KT-30796
117+
collect { value -> send(value ?: NULL) }
118+
}
120119

121-
var isDone = false
122-
var lastValue: Any? = null
123-
val ticker = fixedPeriodTicker(periodMillis)
124-
while (!isDone) {
125-
select<Unit> {
126-
values.onReceiveOrNull {
127-
if (it == null) {
128-
ticker.cancel()
129-
isDone = true
130-
} else {
131-
lastValue = it
132-
}
120+
var isDone = false
121+
var lastValue: Any? = null
122+
val ticker = fixedPeriodTicker(periodMillis)
123+
while (!isDone) {
124+
select<Unit> {
125+
values.onReceiveOrNull {
126+
if (it == null) {
127+
ticker.cancel(ChildCancelledException())
128+
isDone = true
129+
} else {
130+
lastValue = it
133131
}
132+
}
134133

135-
// todo: shall be start sampling only when an element arrives or sample aways as here?
136-
ticker.onReceive {
137-
val value = lastValue ?: return@onReceive
138-
lastValue = null // Consume the value
139-
emit(NULL.unbox(value))
140-
}
134+
// todo: shall be start sampling only when an element arrives or sample aways as here?
135+
ticker.onReceive {
136+
val value = lastValue ?: return@onReceive
137+
lastValue = null // Consume the value
138+
downstream.emit(NULL.unbox(value))
141139
}
142140
}
143141
}

0 commit comments

Comments
 (0)