Skip to content

Commit fcc53f8

Browse files
committed
Incorporate flow scope into flow operators
1 parent 533666f commit fcc53f8

File tree

15 files changed

+333
-85
lines changed

15 files changed

+333
-85
lines changed

kotlinx-coroutines-core/common/src/JobSupport.kt

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -620,16 +620,15 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
620620
}
621621

622622
/**
623-
* Returns `true` if job should cancel itself during on child [CancellationException].
623+
* Returns `true` if job should cancel itself on child [CancellationException].
624624
*/
625625
public open fun cancelOnChildCancellation(cause: CancellationException) = false
626626

627627
/**
628628
* Child was cancelled with a cause.
629-
* At this point parent decides whether it cancels itself (e.g. on a critical failure) and
629+
* In this method parent decides whether it cancels itself (e.g. on a critical failure) and
630630
* whether it handles the exception of the child.
631-
*
632-
* It is overridden in supervisor implementations to completely ignore any child cancellation
631+
* It is overridden in supervisor implementations to completely ignore any child cancellation.
633632
*/
634633
public open fun childCancelled(cause: Throwable): Boolean {
635634
if (cause is CancellationException && !cancelOnChildCancellation(cause)) return true
@@ -949,11 +948,8 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
949948

950949
/**
951950
* Returns `true` for scoped coroutines.
952-
* Scoped coroutine is a coroutine that is executed sequentially within the enclosing scope
953-
* without any concurrency.
954-
* Scoped coroutines always handle any exception happened within -- they just rethrow it
955-
* to the enclosing scope.
956-
*
951+
* Scoped coroutine is a coroutine that is executed sequentially within the enclosing scope without any concurrency.
952+
* Scoped coroutines always handle any exception happened within -- they just rethrow it to the enclosing scope.
957953
* Examples of scoped coroutines are `coroutineScope`, `withTimeout` and `runBlocking`.
958954
*/
959955
protected open val isScopedCoroutine: Boolean get() = false

kotlinx-coroutines-core/common/src/flow/Migration.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ public fun <T> Flow<T>.onErrorResume(fallback: Flow<T>): Flow<T> = error("Should
118118
@Deprecated(message = "withContext in flow body is deprecated, use flowOn instead", level = DeprecationLevel.ERROR)
119119
public fun <T, R> FlowCollector<T>.withContext(context: CoroutineContext, block: suspend () -> R): Unit = error("Should not be called")
120120

121-
122121
/**
123122
* `subscribe` is Rx-specific API that has no direct match in flows.
124123
* One can use `launch` instead, for example the following:

kotlinx-coroutines-core/common/src/flow/internal/FlowScope.kt

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,33 +5,56 @@
55
package kotlinx.coroutines.flow.internal
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
89
import kotlinx.coroutines.internal.*
910
import kotlinx.coroutines.intrinsics.*
1011
import kotlin.coroutines.*
1112
import kotlin.coroutines.intrinsics.*
1213

1314
/**
14-
* Creates a [CoroutineScope] and calls the specified suspend block with this scope.
15-
* This builder is similar to [coroutineScope] with the only exception that it *ties* lifecycle of children
16-
* and itself regarding the cancellation, thus being cancelled when one of the children becomes cancelled.
15+
* Creates a flow that also provides a [CoroutineScope] for each collector.
1716
*
17+
* Provided [CoroutineScope] is similar to [coroutineScope] with the only exception that it *ties* lifecycle of children
18+
* and itself regarding the cancellation, thus being cancelled when one of the children becomes cancelled.
1819
* For example:
1920
* ```
20-
* flowScope {
21+
* scopedFlow {
2122
* launch {
2223
* throw CancellationException()
2324
* }
2425
* } // <- CE will be rethrown here
2526
* ```
27+
*
28+
* Basically, it is shorthand for:
29+
* ```
30+
* flow {
31+
* coroutineScope {
32+
* ...
33+
* }
34+
* }
35+
* ```
36+
* with additional constraint on cancellation.
37+
* To cancel child without cancelling itself, `cancel(ChildCancelledException())` should be used.
2638
*/
27-
internal suspend fun <R> flowScope(block: suspend CoroutineScope.() -> R): R =
28-
suspendCoroutineUninterceptedOrReturn { uCont ->
29-
val coroutine = FlowScope(uCont.context, uCont)
30-
coroutine.startUndispatchedOrReturn(coroutine, block)
39+
internal fun <R> scopedFlow(@BuilderInference block: suspend FlowScope<Unit, R>.() -> Unit): Flow<R> =
40+
object : Flow<R> {
41+
override suspend fun collect(collector: FlowCollector<R>) {
42+
suspendCoroutineUninterceptedOrReturn<Unit> { uCont ->
43+
val coroutine = FlowScope(collector, uCont.context, uCont)
44+
coroutine.startUndispatchedOrReturn(coroutine, block)
45+
}
46+
}
3147
}
3248

33-
private class FlowScope<T>(context: CoroutineContext, uCont: Continuation<T>) : ScopeCoroutine<T>(context, uCont) {
49+
internal class FlowScope<T, R>(
50+
private val collector: FlowCollector<R>,
51+
context: CoroutineContext,
52+
uCont: Continuation<T>
53+
) : ScopeCoroutine<T>(context, uCont), FlowCollector<R> {
54+
3455
public override fun cancelOnChildCancellation(cause: CancellationException) = cause !is ChildCancelledException
56+
57+
override suspend fun emit(value: R) = collector.emit(value)
3558
}
3659

3760
internal class ChildCancelledException : CancellationException(null)

kotlinx-coroutines-core/common/src/flow/operators/Delay.kt

Lines changed: 45 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -60,34 +60,33 @@ public fun <T> Flow<T>.delayEach(timeMillis: Long): Flow<T> = flow {
6060
*/
6161
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
6262
require(timeoutMillis > 0) { "Debounce timeout should be positive" }
63-
return flow {
64-
coroutineScope {
65-
val values = Channel<Any?>(Channel.CONFLATED) // Actually Any, KT-30796
66-
// Channel is not closed deliberately as there is no close with value
67-
val collector = async {
68-
collect { value -> values.send(value ?: NULL) }
69-
}
63+
return scopedFlow {
64+
val values = Channel<Any?>(Channel.CONFLATED) // Actually Any, KT-30796
65+
// Channel is not closed deliberately as there is no close with value
66+
val collector = async {
67+
collect { value -> values.send(value ?: NULL) }
68+
}
7069

71-
var isDone = false
72-
var lastValue: Any? = null
73-
while (!isDone) {
74-
select<Unit> {
75-
values.onReceive {
76-
lastValue = it
77-
}
70+
var isDone = false
71+
var lastValue: Any? = null
72+
while (!isDone) {
73+
select<Unit> {
74+
values.onReceive {
75+
lastValue = it
76+
}
7877

79-
lastValue?.let { value -> // set timeout when lastValue != null
80-
onTimeout(timeoutMillis) {
81-
lastValue = null // Consume the value
82-
emit(NULL.unbox(value))
83-
}
78+
lastValue?.let { value ->
79+
// set timeout when lastValue != null
80+
onTimeout(timeoutMillis) {
81+
lastValue = null // Consume the value
82+
emit(NULL.unbox(value))
8483
}
84+
}
8585

86-
// Close with value 'idiom'
87-
collector.onAwait {
88-
if (lastValue != null) emit(NULL.unbox(lastValue))
89-
isDone = true
90-
}
86+
// Close with value 'idiom'
87+
collector.onAwait {
88+
if (lastValue != null) emit(NULL.unbox(lastValue))
89+
isDone = true
9190
}
9291
}
9392
}
@@ -112,32 +111,31 @@ public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
112111
*/
113112
public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
114113
require(periodMillis > 0) { "Sample period should be positive" }
115-
return flow {
116-
coroutineScope {
117-
val values = produce<Any?>(capacity = Channel.CONFLATED) { // Actually Any, KT-30796
118-
collect { value -> send(value ?: NULL) }
119-
}
114+
return scopedFlow {
115+
val values = produce<Any?>(capacity = Channel.CONFLATED) {
116+
// Actually Any, KT-30796
117+
collect { value -> send(value ?: NULL) }
118+
}
120119

121-
var isDone = false
122-
var lastValue: Any? = null
123-
val ticker = fixedPeriodTicker(periodMillis)
124-
while (!isDone) {
125-
select<Unit> {
126-
values.onReceiveOrNull {
127-
if (it == null) {
128-
ticker.cancel()
129-
isDone = true
130-
} else {
131-
lastValue = it
132-
}
120+
var isDone = false
121+
var lastValue: Any? = null
122+
val ticker = fixedPeriodTicker(periodMillis)
123+
while (!isDone) {
124+
select<Unit> {
125+
values.onReceiveOrNull {
126+
if (it == null) {
127+
ticker.cancel(ChildCancelledException())
128+
isDone = true
129+
} else {
130+
lastValue = it
133131
}
132+
}
134133

135-
// todo: shall be start sampling only when an element arrives or sample aways as here?
136-
ticker.onReceive {
137-
val value = lastValue ?: return@onReceive
138-
lastValue = null // Consume the value
139-
emit(NULL.unbox(value))
140-
}
134+
// todo: shall be start sampling only when an element arrives or sample aways as here?
135+
ticker.onReceive {
136+
val value = lastValue ?: return@onReceive
137+
lastValue = null // Consume the value
138+
emit(NULL.unbox(value))
141139
}
142140
}
143141
}

kotlinx-coroutines-core/common/src/flow/operators/Merge.kt

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -129,17 +129,16 @@ public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY
129129
* produces `aa bb b_last`
130130
*/
131131
@FlowPreview
132-
public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = flow {
133-
coroutineScope {
134-
var previousFlow: Job? = null
135-
collect { value ->
136-
// Linearize calls to emit as alternative to the channel. Bonus points for never-overlapping channels.
137-
previousFlow?.cancelAndJoin()
138-
// Undispatched to have better user experience in case of synchronous flows
139-
previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
140-
transform(value).collect { innerValue ->
141-
emit(innerValue)
142-
}
132+
public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = scopedFlow {
133+
var previousFlow: Job? = null
134+
collect { value ->
135+
// Linearize calls to emit as alternative to the channel. Bonus points for never-overlapping channels.
136+
previousFlow?.cancel(ChildCancelledException())
137+
previousFlow?.join()
138+
// Undispatched to have better user experience in case of synchronous flows
139+
previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
140+
transform(value).collect { innerValue ->
141+
emit(innerValue)
143142
}
144143
}
145144
}

kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,35 @@ class ChannelFlowTest : TestBase() {
8181
assertFailsWith<TestException>(flow)
8282
finish(4)
8383
}
84+
85+
@Test
86+
fun testMergeOneCoroutineWithCancellation() = runTest {
87+
val flow = flowOf(1, 2, 3)
88+
val f = flow.mergeOneCoroutine(flow).take(2)
89+
assertEquals(listOf(1, 1), f.toList())
90+
}
91+
92+
@Test
93+
fun testMergeTwoCoroutinesWithCancellation() = runTest {
94+
val flow = flowOf(1, 2, 3)
95+
val f = flow.mergeTwoCoroutines(flow).take(2)
96+
assertEquals(listOf(1, 1), f.toList())
97+
}
98+
99+
private fun Flow<Int>.mergeTwoCoroutines(other: Flow<Int>): Flow<Int> = channelFlow {
100+
launch {
101+
collect { send(it); yield() }
102+
}
103+
launch {
104+
other.collect { send(it) }
105+
}
106+
}
107+
108+
private fun Flow<Int>.mergeOneCoroutine(other: Flow<Int>): Flow<Int> = channelFlow {
109+
launch {
110+
collect { send(it); yield() }
111+
}
112+
113+
other.collect { send(it); yield() }
114+
}
84115
}

kotlinx-coroutines-core/common/test/flow/internal/FlowScopeTest.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,15 @@
55
package kotlinx.coroutines.flow.internal
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
89
import kotlin.test.*
910

1011
class FlowScopeTest : TestBase() {
1112

13+
private suspend fun flowScope(block: suspend CoroutineScope.() -> Unit): Unit {
14+
scopedFlow<Unit>(block).singleOrNull()
15+
}
16+
1217
@Test
1318
fun testCancellation() = runTest {
1419
assertFailsWith<CancellationException> {

kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,46 @@ abstract class CombineLatestTestBase : TestBase() {
197197
assertFailsWith<TestException>(flow)
198198
finish(2)
199199
}
200+
201+
@Test
202+
fun testCancellationExceptionUpstream() = runTest {
203+
val f1 = flow {
204+
expect(1)
205+
emit(1)
206+
throw CancellationException("")
207+
}
208+
val f2 = flow {
209+
emit(1)
210+
hang { expect(3) }
211+
}
212+
213+
val flow = f1.combineLatest(f2, { _, _ -> 1 }).onEach { expect(2) }
214+
assertFailsWith<CancellationException>(flow)
215+
finish(4)
216+
}
217+
218+
@Test
219+
fun testCancellationExceptionDownstream() = runTest {
220+
val f1 = flow {
221+
emit(1)
222+
expect(2)
223+
hang { expect(5) }
224+
}
225+
val f2 = flow {
226+
emit(1)
227+
expect(3)
228+
hang { expect(6) }
229+
}
230+
231+
val flow = f1.combineLatest(f2, { _, _ -> 1 }).onEach {
232+
expect(1)
233+
yield()
234+
expect(4)
235+
throw CancellationException("")
236+
}
237+
assertFailsWith<CancellationException>(flow)
238+
finish(7)
239+
}
200240
}
201241

202242
class CombineLatestTest : CombineLatestTestBase() {

kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,20 +95,25 @@ class DebounceTest : TestBase() {
9595
}
9696

9797
@Test
98-
fun testUpstreamError() = runTest {
98+
fun testUpstreamError()= testUpstreamError(TimeoutCancellationException(""))
99+
100+
@Test
101+
fun testUpstreamErrorCancellation() = testUpstreamError(TimeoutCancellationException(""))
102+
103+
private inline fun <reified T: Throwable> testUpstreamError(cause: T) = runTest {
99104
val latch = Channel<Unit>()
100105
val flow = flow {
101106
expect(1)
102107
emit(1)
103108
expect(2)
104109
latch.receive()
105-
throw TestException()
110+
throw cause
106111
}.debounce(1).map {
107112
latch.send(Unit)
108113
hang { expect(3) }
109114
}
110115

111-
assertFailsWith<TestException>(flow)
116+
assertFailsWith<T>(flow)
112117
finish(4)
113118
}
114119

0 commit comments

Comments
 (0)