Skip to content

Commit 8788488

Browse files
committed
FlatMap-related API rework
* Deprecate flatMap to see the feedback about it * Introduce flatMapMerge and flatMapConcat (concurrent and sequential versions) * Rename concat to flattenMerge and flattenConcat to be more like Sequence
1 parent 170690f commit 8788488

File tree

13 files changed

+422
-440
lines changed

13 files changed

+422
-440
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+9-9
Original file line numberDiff line numberDiff line change
@@ -793,8 +793,6 @@ public final class kotlinx/coroutines/flow/FlowKt {
793793
public static final fun broadcastIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
794794
public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
795795
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
796-
public static final fun concatenate (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
797-
public static final fun concatenate (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
798796
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
799797
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
800798
public static final fun delayEach (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
@@ -807,8 +805,12 @@ public final class kotlinx/coroutines/flow/FlowKt {
807805
public static final fun filter (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
808806
public static final fun filterNot (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
809807
public static final fun filterNotNull (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
810-
public static final fun flatMap (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
811-
public static synthetic fun flatMap$default (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
808+
public static final fun flatMapConcat (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
809+
public static final fun flatMapMerge (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
810+
public static synthetic fun flatMapMerge$default (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
811+
public static final fun flattenConcat (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
812+
public static final fun flattenMerge (Lkotlinx/coroutines/flow/Flow;II)Lkotlinx/coroutines/flow/Flow;
813+
public static synthetic fun flattenMerge$default (Lkotlinx/coroutines/flow/Flow;IIILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
812814
public static final fun flow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
813815
public static final fun flowOf ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
814816
public static final fun flowOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/Flow;
@@ -820,10 +822,6 @@ public final class kotlinx/coroutines/flow/FlowKt {
820822
public static final fun fold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
821823
public static final fun map (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
822824
public static final fun mapNotNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
823-
public static final fun merge (Ljava/lang/Iterable;II)Lkotlinx/coroutines/flow/Flow;
824-
public static final fun merge (Lkotlinx/coroutines/flow/Flow;II)Lkotlinx/coroutines/flow/Flow;
825-
public static synthetic fun merge$default (Ljava/lang/Iterable;IIILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
826-
public static synthetic fun merge$default (Lkotlinx/coroutines/flow/Flow;IIILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
827825
public static final fun onEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
828826
public static final fun onErrorCollect (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
829827
public static synthetic fun onErrorCollect$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
@@ -851,8 +849,10 @@ public final class kotlinx/coroutines/flow/MigrationKt {
851849
public static final fun BehaviourSubject ()Ljava/lang/Object;
852850
public static final fun PublishSubject ()Ljava/lang/Object;
853851
public static final fun ReplaySubject ()Ljava/lang/Object;
854-
public static final fun concat (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
855852
public static final fun concatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
853+
public static final fun flatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
854+
public static final fun flatten (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
855+
public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
856856
public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
857857
public static final fun onErrorResume (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
858858
public static final fun publishOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;

kotlinx-coroutines-core/common/src/flow/Migration.kt

+32-6
Original file line numberDiff line numberDiff line change
@@ -64,18 +64,44 @@ public fun <T> Flow<T>.subscribe(onEach: (T) -> Unit): Unit = error("Should not
6464
@Deprecated(message = "Use launch + collect instead", level = DeprecationLevel.ERROR)
6565
public fun <T> Flow<T>.subscribe(onEach: (T) -> Unit, onError: (Throwable) -> Unit): Unit = error("Should not be called")
6666

67-
/** @suppress **/
67+
68+
/**
69+
* Note that this replacement is sequential (`concat`) by default.
70+
* For concurrent flatMap [flatMapMerge] can be used instead.
71+
* @suppress
72+
*/
6873
@Deprecated(
6974
level = DeprecationLevel.ERROR,
70-
message = "Flow analogue is named concatenate",
71-
replaceWith = ReplaceWith("concatenate()")
75+
message = "Flow analogue is named flatMapConcat",
76+
replaceWith = ReplaceWith("flatMapConcat(mapper)")
7277
)
73-
public fun <T> Flow<T>.concat(): Flow<T> = error("Should not be called")
78+
public fun <T, R> Flow<T>.flatMap(mapper: suspend (T) -> Flow<R>): Flow<R> = error("Should not be called")
7479

7580
/** @suppress **/
7681
@Deprecated(
7782
level = DeprecationLevel.ERROR,
78-
message = "Flow analogue is named concatenate",
79-
replaceWith = ReplaceWith("concatenate(mapper)")
83+
message = "Flow analogue is named flatMapConcat",
84+
replaceWith = ReplaceWith("flatMapConcat(mapper)")
8085
)
8186
public fun <T, R> Flow<T>.concatMap(mapper: (T) -> Flow<R>): Flow<R> = error("Should not be called")
87+
88+
89+
/**
90+
* Note that this replacement is sequential (`concat`) by default.
91+
* For concurrent flatMap [flattenMerge] can be used instead.
92+
* @suppress
93+
*/
94+
@Deprecated(
95+
level = DeprecationLevel.ERROR,
96+
message = "Flow analogue is named flattenConcat",
97+
replaceWith = ReplaceWith("flattenConcat()")
98+
)
99+
public fun <T> Flow<Flow<T>>.merge(): Flow<T> = error("Should not be called")
100+
101+
/** @suppress **/
102+
@Deprecated(
103+
level = DeprecationLevel.ERROR,
104+
message = "Flow analogue is named flattenConcat",
105+
replaceWith = ReplaceWith("flattenConcat()")
106+
)
107+
public fun <T> Flow<Flow<T>>.flatten(): Flow<T> = error("Should not be called")

kotlinx-coroutines-core/common/src/flow/operators/Merge.kt

+34-37
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,29 @@
77
@file:Suppress("unused")
88

99
package kotlinx.coroutines.flow
10+
1011
import kotlinx.atomicfu.*
1112
import kotlinx.coroutines.*
1213
import kotlinx.coroutines.channels.*
1314
import kotlinx.coroutines.flow.internal.*
14-
import kotlinx.coroutines.flow.unsafeFlow as flow
1515
import kotlin.jvm.*
16+
import kotlinx.coroutines.flow.unsafeFlow as flow
17+
18+
/**
19+
* Transforms elements emitted by the original flow by applying [mapper], that returns another flow, and then concatenating and flattening these flows.
20+
* This method is identical to `flatMapMerge(concurrency = 1, bufferSize = 1)`
21+
*
22+
* Note that even though this operator looks very familiar, we discourage its usage in a regular application-specific flows.
23+
* Most likely, suspending operation in [map] operator will be sufficient and linear transformations are much easier to reason about.
24+
*/
25+
@FlowPreview
26+
public fun <T, R> Flow<T>.flatMapConcat(mapper: suspend (value: T) -> Flow<R>): Flow<R> = flow {
27+
collect { value ->
28+
mapper(value).collect { innerValue ->
29+
emit(innerValue)
30+
}
31+
}
32+
}
1633

1734
/**
1835
* Transforms elements emitted by the original flow by applying [mapper], that returns another flow, and then merging and flattening these flows.
@@ -21,10 +38,10 @@ import kotlin.jvm.*
2138
* Most likely, suspending operation in [map] operator will be sufficient and linear transformations are much easier to reason about.
2239
*
2340
* [bufferSize] parameter controls the size of backpressure aka the amount of queued in-flight elements.
24-
* [concurrency] parameter controls the size of in-flight flows.
41+
* [concurrency] parameter controls the size of in-flight flows, at most [concurrency] flows are collected at the same time.
2542
*/
2643
@FlowPreview
27-
public fun <T, R> Flow<T>.flatMap(concurrency: Int = 16, bufferSize: Int = 16, mapper: suspend (value: T) -> Flow<R>): Flow<R> {
44+
public fun <T, R> Flow<T>.flatMapMerge(concurrency: Int = 16, bufferSize: Int = 16, mapper: suspend (value: T) -> Flow<R>): Flow<R> {
2845
return flow {
2946
val semaphore = Channel<Unit>(concurrency)
3047
val flatMap = SerializingFlatMapCollector(this, bufferSize)
@@ -47,56 +64,36 @@ public fun <T, R> Flow<T>.flatMap(concurrency: Int = 16, bufferSize: Int = 16, m
4764
}
4865

4966
/**
50-
* Merges the given sequence of flows into a single flow with no guarantees on the order.
51-
*
52-
* [bufferSize] parameter controls the size of backpressure aka the amount of queued in-flight elements.
53-
* [concurrency] parameter controls the size of in-flight flows.
67+
* Flattens the given flow of flows into a single flow in a sequentially manner, without interleaving nested flows.
68+
* This method is identical to `flattenMerge(concurrency = 1, bufferSize = 1)
5469
*/
5570
@FlowPreview
56-
public fun <T> Iterable<Flow<T>>.merge(concurrency: Int = 16, bufferSize: Int = 16): Flow<T> = asFlow().flatMap(concurrency, bufferSize) { it }
57-
58-
/**
59-
* Merges the given flow of flows into a single flow with no guarantees on the order.
60-
*
61-
* [bufferSize] parameter controls the size of backpressure aka the amount of queued in-flight elements.
62-
* [concurrency] parameter controls the size of in-flight flows.
63-
*/
64-
@FlowPreview
65-
public fun <T> Flow<Flow<T>>.merge(concurrency: Int = 16, bufferSize: Int = 16): Flow<T> = flatMap(concurrency, bufferSize) { it }
66-
67-
/**
68-
* Concatenates values of each flow sequentially, without interleaving them.
69-
*/
70-
@FlowPreview
71-
public fun <T> Flow<Flow<T>>.concatenate(): Flow<T> = flow {
72-
collect {
73-
val inner = it
74-
inner.collect { value ->
75-
emit(value)
71+
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
72+
collect { value ->
73+
value.collect { innerValue ->
74+
emit(innerValue)
7675
}
7776
}
7877
}
7978

8079
/**
81-
* Transforms each value of the given flow into flow of another type and then flattens these flows
82-
* sequentially, without interleaving them.
80+
* Flattens the given flow of flows into a single flow.
81+
* This method is identical to `flatMapMerge(concurrency, bufferSize) { it }`
82+
*
83+
* [bufferSize] parameter controls the size of backpressure aka the amount of queued in-flight elements.
84+
* [concurrency] parameter controls the size of in-flight flows, at most [concurrency] flows are collected at the same time.
8385
*/
8486
@FlowPreview
85-
public fun <T, R> Flow<T>.concatenate(mapper: suspend (T) -> Flow<R>): Flow<R> = flow {
86-
collect { value ->
87-
mapper(value).collect { innerValue ->
88-
emit(innerValue)
89-
}
90-
}
91-
}
87+
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = 16, bufferSize: Int = 16): Flow<T> = flatMapMerge(concurrency, bufferSize) { it }
88+
9289

9390
// Effectively serializes access to downstream collector from flatMap
9491
private class SerializingFlatMapCollector<T>(
9592
private val downstream: FlowCollector<T>,
9693
private val bufferSize: Int
9794
) {
9895

99-
// Let's try to leverage the fact that flatMap is never contended
96+
// Let's try to leverage the fact that flatMapMerge is never contended
10097
private val channel: Channel<Any?> by lazy { Channel<Any?>(bufferSize) } // Should be any, but KT-30796
10198
private val inProgressLock = atomic(false)
10299
private val sentValues = atomic(0)

kotlinx-coroutines-core/common/test/flow/operators/ConcatenateMapTest.kt

-137
This file was deleted.

0 commit comments

Comments
 (0)