Skip to content

Commit 61c64cc

Browse files
committed
Flow improvements:
* Declaration-site variance for FlowCollector * Better Flow documentation * Use SendChannel.isEmpty in FlatMap, add nulls test, use unbox, fail fast for conflated channels * Simplify Flow.asFlowable in rx2 module * Consistent naming
1 parent c9f25fc commit 61c64cc

File tree

18 files changed

+72
-69
lines changed

18 files changed

+72
-69
lines changed

kotlinx-coroutines-core/common/src/flow/Builders.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ import kotlin.jvm.*
4444
* If you want to switch the context where this flow is executed use [flowOn] operator.
4545
*/
4646
@FlowPreview
47-
public fun <T> flow(@BuilderInference block: suspend FlowCollector<in T>.() -> Unit): Flow<T> {
47+
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
4848
return object : Flow<T> {
49-
override suspend fun collect(collector: FlowCollector<in T>) {
49+
override suspend fun collect(collector: FlowCollector<T>) {
5050
SafeCollector(collector, coroutineContext).block()
5151
}
5252
}
@@ -58,9 +58,9 @@ public fun <T> flow(@BuilderInference block: suspend FlowCollector<in T>.() -> U
5858
*/
5959
@FlowPreview
6060
@PublishedApi
61-
internal fun <T> unsafeFlow(@BuilderInference block: suspend FlowCollector<in T>.() -> Unit): Flow<T> {
61+
internal fun <T> unsafeFlow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
6262
return object : Flow<T> {
63-
override suspend fun collect(collector: FlowCollector<in T>) {
63+
override suspend fun collect(collector: FlowCollector<T>) {
6464
collector.block()
6565
}
6666
}
@@ -129,7 +129,7 @@ public fun <T> flowOf(vararg elements: T): Flow<T> = unsafeFlow {
129129
public fun <T> emptyFlow(): Flow<T> = EmptyFlow
130130

131131
private object EmptyFlow : Flow<Nothing> {
132-
override suspend fun collect(collector: FlowCollector<in Nothing>) = Unit
132+
override suspend fun collect(collector: FlowCollector<Nothing>) = Unit
133133
}
134134

135135
/**

kotlinx-coroutines-core/common/src/flow/Flow.kt

+11-5
Original file line numberDiff line numberDiff line change
@@ -88,16 +88,22 @@ public interface Flow<out T> {
8888
* 1) It should not change the coroutine context (e.g. with `withContext(Dispatchers.IO)`) when emitting values.
8989
* The emission should happen in the context of the [collect] call.
9090
*
91-
* Only coroutine builders that inherit the context are allowed, for example the following code is fine:
91+
* Only coroutine builders that inherit the context are allowed, for example:
9292
* ```
93-
* coroutineScope { // Context is inherited
94-
* launch { // Dispatcher is not overridden, fine as well
95-
* collector.emit(someValue)
93+
* class MyFlow : Flow<Int> {
94+
* override suspend fun collect(collector: FlowCollector<Int>) {
95+
* coroutineScope {
96+
* // Context is inherited
97+
* launch { // Dispatcher is not overridden, fine as well
98+
* collector.emit(42) // Emit from the launched coroutine
99+
* }
100+
* }
96101
* }
97102
* }
98103
* ```
104+
* is a proper [Flow] implementation, but using `launch(Dispatchers.IO)` is not.
99105
*
100106
* 2) It should serialize calls to [emit][FlowCollector.emit] as [FlowCollector] implementations are not thread safe by default.
101107
*/
102-
public suspend fun collect(collector: FlowCollector<in T>)
108+
public suspend fun collect(collector: FlowCollector<T>)
103109
}

kotlinx-coroutines-core/common/src/flow/FlowCollector.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import kotlinx.coroutines.*
1414
* Implementations of this interface are not thread-safe.
1515
*/
1616
@FlowPreview
17-
public interface FlowCollector<T> {
17+
public interface FlowCollector<in T> {
1818

1919
/**
2020
* Collects the value emitted by the upstream.

kotlinx-coroutines-core/common/src/flow/operators/Context.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import kotlinx.coroutines.flow.unsafeFlow as flow
2929
* .single() // Will be executed in the Main
3030
* }
3131
* ```
32-
* For more explanation of purity concept please refer to [Flow] documentation.
32+
* For more explanation of context preservation please refer to [Flow] documentation.
3333
*
3434
* This operator uses a channel of the specific [bufferSize] in order to switch between contexts,
3535
* but it is not guaranteed that the channel will be created, implementation is free to optimize it away in case of fusing.
@@ -83,7 +83,7 @@ public fun <T> Flow<T>.flowOn(flowContext: CoroutineContext, bufferSize: Int = 1
8383
* }
8484
* .map { ... } // Not affected
8585
* ```
86-
* For more explanation of purity concept please refer to [Flow] documentation.
86+
* For more explanation of context preservation please refer to [Flow] documentation.
8787
*
8888
* This operator uses channel of the specific [bufferSize] in order to switch between contexts,
8989
* but it is not guaranteed that channel will be created, implementation is free to optimize it away in case of fusing.

kotlinx-coroutines-core/common/src/flow/operators/Errors.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public fun <T> Flow<T>.retry(
7474
}
7575
}
7676

77-
private fun <T> Flow<T>.collectSafely(onException: suspend FlowCollector<in T>.(Throwable) -> Unit): Flow<T> =
77+
private fun <T> Flow<T>.collectSafely(onException: suspend FlowCollector<T>.(Throwable) -> Unit): Flow<T> =
7878
flow {
7979
// Note that exception may come from the downstream operators, we should not switch on that
8080
var fromDownstream = false

kotlinx-coroutines-core/common/src/flow/operators/Merge.kt

+15-16
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,23 @@ import kotlin.jvm.*
1616
import kotlinx.coroutines.flow.unsafeFlow as flow
1717

1818
/**
19-
* Transforms elements emitted by the original flow by applying [mapper], that returns another flow, and then concatenating and flattening these flows.
19+
* Transforms elements emitted by the original flow by applying [transform], that returns another flow, and then concatenating and flattening these flows.
2020
* This method is identical to `flatMapMerge(concurrency = 1, bufferSize = 1)`
2121
*
2222
* Note that even though this operator looks very familiar, we discourage its usage in a regular application-specific flows.
2323
* Most likely, suspending operation in [map] operator will be sufficient and linear transformations are much easier to reason about.
2424
*/
2525
@FlowPreview
26-
public fun <T, R> Flow<T>.flatMapConcat(mapper: suspend (value: T) -> Flow<R>): Flow<R> = flow {
26+
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> = flow {
2727
collect { value ->
28-
mapper(value).collect { innerValue ->
28+
transform(value).collect { innerValue ->
2929
emit(innerValue)
3030
}
3131
}
3232
}
3333

3434
/**
35-
* Transforms elements emitted by the original flow by applying [mapper], that returns another flow, and then merging and flattening these flows.
35+
* Transforms elements emitted by the original flow by applying [transform], that returns another flow, and then merging and flattening these flows.
3636
*
3737
* Note that even though this operator looks very familiar, we discourage its usage in a regular application-specific flows.
3838
* Most likely, suspending operation in [map] operator will be sufficient and linear transformations are much easier to reason about.
@@ -41,14 +41,17 @@ public fun <T, R> Flow<T>.flatMapConcat(mapper: suspend (value: T) -> Flow<R>):
4141
* [concurrency] parameter controls the size of in-flight flows, at most [concurrency] flows are collected at the same time.
4242
*/
4343
@FlowPreview
44-
public fun <T, R> Flow<T>.flatMapMerge(concurrency: Int = 16, bufferSize: Int = 16, mapper: suspend (value: T) -> Flow<R>): Flow<R> {
44+
public fun <T, R> Flow<T>.flatMapMerge(concurrency: Int = 16, bufferSize: Int = 16, transform: suspend (value: T) -> Flow<R>): Flow<R> {
45+
require(bufferSize >= 0) { "Expected non-negative buffer size, but had $bufferSize" }
46+
require(concurrency >= 0) { "Expected non-negative concurrency level, but had $concurrency" }
4547
return flow {
4648
val semaphore = Channel<Unit>(concurrency)
4749
val flatMap = SerializingFlatMapCollector(this, bufferSize)
4850
coroutineScope {
4951
collect { outerValue ->
52+
// TODO real semaphore (#94)
5053
semaphore.send(Unit) // Acquire concurrency permit
51-
val inner = mapper(outerValue)
54+
val inner = transform(outerValue)
5255
launch {
5356
try {
5457
inner.collect { value ->
@@ -94,13 +97,12 @@ private class SerializingFlatMapCollector<T>(
9497
) {
9598

9699
// Let's try to leverage the fact that flatMapMerge is never contended
97-
private val channel: Channel<Any?> by lazy { Channel<Any?>(bufferSize) } // Should be any, but KT-30796
100+
// TODO 1.2.1 do not allocate channel
101+
private val channel = Channel<Any?>(bufferSize) // Should be any, but KT-30796
98102
private val inProgressLock = atomic(false)
99-
private val sentValues = atomic(0)
100103

101104
public suspend fun emit(value: T) {
102105
if (!inProgressLock.tryAcquire()) {
103-
sentValues.incrementAndGet()
104106
channel.send(value ?: NullSurrogate)
105107
if (inProgressLock.tryAcquire()) {
106108
helpEmit()
@@ -116,17 +118,14 @@ private class SerializingFlatMapCollector<T>(
116118
private suspend fun helpEmit() {
117119
while (true) {
118120
var element = channel.poll()
119-
while (element != null) { // TODO receive or closed
120-
if (element === NullSurrogate) downstream.emit(null as T)
121-
else downstream.emit(element as T)
122-
sentValues.decrementAndGet()
121+
while (element != null) { // TODO receive or closed (#330)
122+
downstream.emit(NullSurrogate.unbox(element))
123123
element = channel.poll()
124124
}
125125

126126
inProgressLock.release()
127-
// Enforce liveness of the algorithm
128-
// TODO looks like isEmpty use-case
129-
if (sentValues.value == 0 || !inProgressLock.tryAcquire()) break
127+
// Enforce liveness
128+
if (channel.isEmpty || !inProgressLock.tryAcquire()) break
130129
}
131130
}
132131
}

kotlinx-coroutines-core/common/src/flow/operators/Transform.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import kotlinx.coroutines.flow.unsafeFlow as flow
2626
* ```
2727
*/
2828
@FlowPreview
29-
public fun <T, R> Flow<T>.transform(@BuilderInference transformer: suspend FlowCollector<in R>.(value: T) -> Unit): Flow<R> {
29+
public fun <T, R> Flow<T>.transform(@BuilderInference transformer: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R> {
3030
return flow {
3131
collect { value ->
3232
transformer(value)

kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt

+10-7
Original file line numberDiff line numberDiff line change
@@ -7,50 +7,53 @@ package kotlinx.coroutines.flow
77
import kotlinx.coroutines.*
88
import kotlin.test.*
99

10+
/*
11+
* Replace: { i, j -> i + j } -> { i, j -> i + j } as soon as KT-30991 is fixed
12+
*/
1013
class CombineLatestTest : TestBase() {
1114

1215
@Test
1316
fun testCombineLatest() = runTest {
1417
val flow = flowOf("a", "b", "c")
1518
val flow2 = flowOf(1, 2, 3)
16-
val list = flow.combineLatest(flow2, ::sum).toList()
19+
val list = flow.combineLatest(flow2, { i, j -> i + j }).toList()
1720
assertEquals(listOf("a1", "b1", "b2", "c2", "c3"), list)
1821
}
1922

2023
@Test
2124
fun testNulls() = runTest {
2225
val flow = flowOf("a", null, null)
2326
val flow2 = flowOf(1, 2, 3)
24-
val list = flow.combineLatest(flow2, ::sum).toList()
27+
val list = flow.combineLatest(flow2, { i, j -> i + j }).toList()
2528
assertEquals(listOf("a1", "null1", "null2", "null2", "null3"), list)
2629
}
2730

2831
@Test
2932
fun testNullsOther() = runTest {
3033
val flow = flowOf("a", "b", "c")
3134
val flow2 = flowOf(null, 2, null)
32-
val list = flow.combineLatest(flow2, ::sum).toList()
35+
val list = flow.combineLatest(flow2, { i, j -> i + j }).toList()
3336
assertEquals(listOf("anull", "bnull", "b2", "c2", "cnull"), list)
3437
}
3538

3639
@Test
3740
fun testEmptyFlow() = runTest {
38-
val flow = emptyFlow<String>().combineLatest(emptyFlow(), ::sum)
41+
val flow = emptyFlow<String>().combineLatest(emptyFlow<Int>(), { i, j -> i + j })
3942
assertNull(flow.singleOrNull())
4043
}
4144

4245
@Test
4346
fun testFirstIsEmpty() = runTest {
4447
val f1 = emptyFlow<String>()
4548
val f2 = flowOf(1)
46-
assertEquals(emptyList(), f1.combineLatest(f2, ::sum).toList())
49+
assertEquals(emptyList(), f1.combineLatest(f2, { i, j -> i + j }).toList())
4750
}
4851

4952
@Test
5053
fun testSecondIsEmpty() = runTest {
5154
val f1 = flowOf("a")
5255
val f2 = emptyFlow<Int>()
53-
assertEquals(emptyList(), f1.combineLatest(f2, ::sum).toList())
56+
assertEquals(emptyList(), f1.combineLatest(f2, { i, j -> i + j }).toList())
5457
}
5558

5659
@Test
@@ -77,7 +80,7 @@ class CombineLatestTest : TestBase() {
7780
emit(3)
7881
}
7982

80-
val result = f1.combineLatest(f2, ::sum).toList()
83+
val result = f1.combineLatest(f2, { i, j -> i + j }).toList()
8184
assertEquals(listOf("a1", "b1", "c1", "c2", "c3"), result)
8285
finish(8)
8386
}

kotlinx-coroutines-core/common/test/flow/operators/FlatMapBaseTest.kt

+10-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

5-
package kotlinx.coroutines.flow.operators
5+
package kotlinx.coroutines.flow
66

77
import kotlinx.coroutines.*
88
import kotlinx.coroutines.channels.*
@@ -44,6 +44,15 @@ abstract class FlatMapBaseTest : TestBase() {
4444
assertEquals(42, value)
4545
}
4646

47+
@Test
48+
fun testNulls() = runTest {
49+
val list = flowOf(1, null, 2).flatMap {
50+
flowOf(1, null, null, 2)
51+
}.toList()
52+
53+
assertEquals(List(3) { listOf(1, null, null, 2)}.flatten(), list)
54+
}
55+
4756
@Test
4857
fun testContext() = runTest {
4958
val captured = ArrayList<String>()

kotlinx-coroutines-core/common/test/flow/operators/FlatMapConcatTest.kt

+1-3
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,11 @@
55
package kotlinx.coroutines.flow
66

77
import kotlinx.coroutines.*
8-
import kotlinx.coroutines.channels.*
9-
import kotlinx.coroutines.flow.operators.*
108
import kotlin.test.*
119

1210
class FlatMapConcatTest : FlatMapBaseTest() {
1311

14-
override fun <T> Flow<T>.flatMap(mapper: suspend (T) -> Flow<T>): Flow<T> = flatMapConcat(mapper = mapper)
12+
override fun <T> Flow<T>.flatMap(mapper: suspend (T) -> Flow<T>): Flow<T> = flatMapConcat(transform = mapper)
1513

1614
@Test
1715
fun testFlatMapConcurrency() = runTest {

kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeBaseTest.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,10 @@
33
*/
44

55

6-
package kotlinx.coroutines.flow.operators
6+
package kotlinx.coroutines.flow
77

88
import kotlinx.coroutines.*
99
import kotlinx.coroutines.channels.*
10-
import kotlinx.coroutines.flow.*
1110
import kotlin.test.*
1211

1312
abstract class FlatMapMergeBaseTest : FlatMapBaseTest() {

kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt

+1-3
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,11 @@
55
package kotlinx.coroutines.flow
66

77
import kotlinx.coroutines.*
8-
import kotlinx.coroutines.channels.*
9-
import kotlinx.coroutines.flow.operators.*
108
import kotlin.test.*
119

1210
class FlatMapMergeTest : FlatMapMergeBaseTest() {
1311

14-
override fun <T> Flow<T>.flatMap(mapper: suspend (T) -> Flow<T>): Flow<T> = flatMapMerge(mapper = mapper)
12+
override fun <T> Flow<T>.flatMap(mapper: suspend (T) -> Flow<T>): Flow<T> = flatMapMerge(transform = mapper)
1513

1614
@Test
1715
override fun testFlatMapConcurrency() = runTest {

kotlinx-coroutines-core/common/test/flow/operators/FlattenConcatTest.kt

-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package kotlinx.coroutines.flow
66

77
import kotlinx.coroutines.*
8-
import kotlinx.coroutines.flow.operators.*
98
import kotlin.test.*
109

1110
class FlattenConcatTest : FlatMapBaseTest() {

kotlinx-coroutines-core/common/test/flow/operators/FlattenMergeTest.kt

-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package kotlinx.coroutines.flow
66

77
import kotlinx.coroutines.*
8-
import kotlinx.coroutines.flow.operators.*
98
import kotlin.test.*
109

1110
class FlattenMergeTest : FlatMapMergeBaseTest() {

0 commit comments

Comments
 (0)