Skip to content

Commit 7ed706a

Browse files
committed
Simplify FlowScope implementation, stylistic improvements
1 parent 2064693 commit 7ed706a

File tree

8 files changed

+78
-84
lines changed

8 files changed

+78
-84
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

-1
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,6 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
377377
public fun cancel (Ljava/util/concurrent/CancellationException;)V
378378
public final fun cancelCoroutine (Ljava/lang/Throwable;)Z
379379
public fun cancelInternal (Ljava/lang/Throwable;)Z
380-
public fun cancelOnChildCancellation (Ljava/util/concurrent/CancellationException;)Z
381380
public fun childCancelled (Ljava/lang/Throwable;)Z
382381
public fun fold (Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)Ljava/lang/Object;
383382
public fun get (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext$Element;

kotlinx-coroutines-core/common/src/JobSupport.kt

+9-11
Original file line numberDiff line numberDiff line change
@@ -320,10 +320,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
320320
}
321321

322322
/**
323-
* The method that is invoked when the job is cancelled to possible propagate cancellation to the parent.
323+
* The method that is invoked when the job is cancelled to possibly propagate cancellation to the parent.
324324
* Returns `true` if the parent is responsible for handling the exception, `false` otherwise.
325325
*
326-
* Invariant: never returns `true` for instances of [CancellationException], otherwise such exception
326+
* Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
327327
* may leak to the [CoroutineExceptionHandler].
328328
*/
329329
private fun cancelParent(cause: Throwable): Boolean {
@@ -619,19 +619,17 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
619619
cancelImpl(parentJob)
620620
}
621621

622-
/**
623-
* Returns `true` if job should cancel itself on child [CancellationException].
624-
*/
625-
public open fun cancelOnChildCancellation(cause: CancellationException) = false
626-
627622
/**
628623
* Child was cancelled with a cause.
629-
* In this method parent decides whether it cancels itself (e.g. on a critical failure) and
630-
* whether it handles the exception of the child.
624+
* In this method parent decides whether it cancels itself (e.g. on a critical failure) and whether it handles the exception of the child.
631625
* It is overridden in supervisor implementations to completely ignore any child cancellation.
626+
* Returns `true` if exception is handled, `false` otherwise (then caller is responsible for handling an exception)
627+
*
628+
* Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
629+
* may leak to the [CoroutineExceptionHandler].
632630
*/
633631
public open fun childCancelled(cause: Throwable): Boolean {
634-
if (cause is CancellationException && !cancelOnChildCancellation(cause)) return true
632+
if (cause is CancellationException) return true
635633
return cancelImpl(cause) && handlesException
636634
}
637635

@@ -643,7 +641,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
643641

644642
// cause is Throwable or ParentJob when cancelChild was invoked
645643
// returns true is exception was handled, false otherwise
646-
private fun cancelImpl(cause: Any?): Boolean {
644+
internal fun cancelImpl(cause: Any?): Boolean {
647645
if (onCancelComplete) {
648646
// make sure it is completing, if cancelMakeCompleting returns true it means it had make it
649647
// completing and had recorded exception
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow.internal
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
9+
import kotlinx.coroutines.internal.*
10+
import kotlinx.coroutines.intrinsics.*
11+
import kotlin.coroutines.*
12+
import kotlin.coroutines.intrinsics.*
13+
import kotlinx.coroutines.flow.unsafeFlow as flow
14+
15+
/**
16+
* Creates a [CoroutineScope] and calls the specified suspend block with this scope.
17+
* This builder is similar to [coroutineScope] with the only exception that it *ties* lifecycle of children
18+
* and itself regarding the cancellation, thus being cancelled when one of the children becomes cancelled.
19+
*
20+
* For example:
21+
* ```
22+
* flowScope {
23+
* launch {
24+
* throw CancellationException()
25+
* }
26+
* } // <- CE will be rethrown here
27+
* ```
28+
*/
29+
internal suspend fun <R> flowScope(@BuilderInference block: suspend CoroutineScope.() -> R): R =
30+
suspendCoroutineUninterceptedOrReturn { uCont ->
31+
val coroutine = FlowCoroutine(uCont.context, uCont)
32+
coroutine.startUndispatchedOrReturn(coroutine, block)
33+
}
34+
35+
/**
36+
* Creates a flow that also provides a [CoroutineScope] for each collector
37+
* Shorthand for:
38+
* ```
39+
* flow {
40+
* flowScope {
41+
* ...
42+
* }
43+
* }
44+
* ```
45+
* with additional constraint on cancellation.
46+
* To cancel child without cancelling itself, `cancel(ChildCancelledException())` should be used.
47+
*/
48+
internal fun <R> scopedFlow(@BuilderInference block: suspend CoroutineScope.(FlowCollector<R>) -> Unit): Flow<R> =
49+
flow {
50+
val collector = this
51+
flowScope { block(collector) }
52+
}
53+
54+
internal class FlowCoroutine<T>(context: CoroutineContext, uCont: Continuation<T>) :
55+
ScopeCoroutine<T>(context, uCont) {
56+
57+
public override fun childCancelled(cause: Throwable): Boolean {
58+
if (cause is ChildCancelledException) return true
59+
return cancelImpl(cause)
60+
}
61+
}

kotlinx-coroutines-core/common/src/flow/internal/FlowScope.kt

-58
This file was deleted.

kotlinx-coroutines-core/common/src/flow/operators/Delay.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public fun <T> Flow<T>.delayEach(timeMillis: Long): Flow<T> = flow {
6060
*/
6161
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
6262
require(timeoutMillis > 0) { "Debounce timeout should be positive" }
63-
return scopedFlow {
63+
return scopedFlow { downstream ->
6464
val values = Channel<Any?>(Channel.CONFLATED) // Actually Any, KT-30796
6565
// Channel is not closed deliberately as there is no close with value
6666
val collector = async {
@@ -79,13 +79,13 @@ public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
7979
// set timeout when lastValue != null
8080
onTimeout(timeoutMillis) {
8181
lastValue = null // Consume the value
82-
emit(NULL.unbox(value))
82+
downstream.emit(NULL.unbox(value))
8383
}
8484
}
8585

8686
// Close with value 'idiom'
8787
collector.onAwait {
88-
if (lastValue != null) emit(NULL.unbox(lastValue))
88+
if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
8989
isDone = true
9090
}
9191
}
@@ -111,7 +111,7 @@ public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
111111
*/
112112
public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
113113
require(periodMillis > 0) { "Sample period should be positive" }
114-
return scopedFlow {
114+
return scopedFlow { downstream ->
115115
val values = produce<Any?>(capacity = Channel.CONFLATED) {
116116
// Actually Any, KT-30796
117117
collect { value -> send(value ?: NULL) }
@@ -135,7 +135,7 @@ public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
135135
ticker.onReceive {
136136
val value = lastValue ?: return@onReceive
137137
lastValue = null // Consume the value
138-
emit(NULL.unbox(value))
138+
downstream.emit(NULL.unbox(value))
139139
}
140140
}
141141
}

kotlinx-coroutines-core/common/src/flow/operators/Merge.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public fun <T, R> Flow<T>.flatMapMerge(concurrency: Int = 16, bufferSize: Int =
4646
require(concurrency >= 0) { "Expected non-negative concurrency level, but had $concurrency" }
4747
return scopedFlow {
4848
val semaphore = Channel<Unit>(concurrency)
49-
val flatMap = SerializingFlatMapCollector(this@scopedFlow, bufferSize)
49+
val flatMap = SerializingFlatMapCollector(it, bufferSize)
5050
collect { outerValue ->
5151
// TODO real semaphore (#94)
5252
semaphore.send(Unit) // Acquire concurrency permit
@@ -108,7 +108,7 @@ public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = 16, bufferSize: Int
108108
* produces `aa bb b_last`
109109
*/
110110
@FlowPreview
111-
public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = scopedFlow {
111+
public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = scopedFlow { downstream ->
112112
var previousFlow: Job? = null
113113
collect { value ->
114114
// Linearize calls to emit as alternative to the channel. Bonus points for never-overlapping channels.
@@ -117,7 +117,7 @@ public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): F
117117
// Undispatched to have better user experience in case of synchronous flows
118118
previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
119119
transform(value).collect { innerValue ->
120-
emit(innerValue)
120+
downstream.emit(innerValue)
121121
}
122122
}
123123
}

kotlinx-coroutines-core/common/test/flow/internal/FlowScopeTest.kt

-5
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,10 @@
55
package kotlinx.coroutines.flow.internal
66

77
import kotlinx.coroutines.*
8-
import kotlinx.coroutines.flow.*
98
import kotlin.test.*
109

1110
class FlowScopeTest : TestBase() {
1211

13-
private suspend fun flowScope(block: suspend CoroutineScope.() -> Unit): Unit {
14-
scopedFlow<Unit>(block).singleOrNull()
15-
}
16-
1712
@Test
1813
fun testCancellation() = runTest {
1914
assertFailsWith<CancellationException> {

kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt

-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package kotlinx.coroutines.flow
66

77
import kotlinx.coroutines.*
88
import kotlinx.coroutines.channels.*
9-
import kotlin.math.*
109
import kotlin.test.*
1110

1211
class FlowOnTest : TestBase() {

0 commit comments

Comments
 (0)