Skip to content

Commit 73068ae

Browse files
committed
Introduce merge operator
Fixes #1491
1 parent 6cb317b commit 73068ae

File tree

4 files changed

+126
-0
lines changed

4 files changed

+126
-0
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+2
Original file line numberDiff line numberDiff line change
@@ -926,7 +926,9 @@ public final class kotlinx/coroutines/flow/FlowKt {
926926
public static final fun map (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
927927
public static final fun mapLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
928928
public static final fun mapNotNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
929+
public static final fun merge (Ljava/lang/Iterable;)Lkotlinx/coroutines/flow/Flow;
929930
public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
931+
public static final fun merge ([Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
930932
public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
931933
public static final synthetic fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
932934
public static final fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;

kotlinx-coroutines-core/common/src/flow/internal/Merge.kt

+20
Original file line numberDiff line numberDiff line change
@@ -75,3 +75,23 @@ internal class ChannelFlowMerge<T>(
7575
override fun additionalToStringProps(): String =
7676
"concurrency=$concurrency, "
7777
}
78+
79+
internal class ChannelLimitedFlowMerge<T>(
80+
private val flows: Iterable<Flow<T>>,
81+
context: CoroutineContext = EmptyCoroutineContext,
82+
capacity: Int = Channel.BUFFERED
83+
) : ChannelFlow<T>(context, capacity) {
84+
override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
85+
ChannelLimitedFlowMerge(flows, context, capacity)
86+
87+
override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
88+
return scope.flowProduce(context, capacity, block = collectToFun)
89+
}
90+
91+
override suspend fun collectTo(scope: ProducerScope<T>) {
92+
val collector = SendingCollector(scope)
93+
flows.forEach { flow ->
94+
scope.launch { flow.collect(collector) }
95+
}
96+
}
97+
}

kotlinx-coroutines-core/common/src/flow/operators/Merge.kt

+36
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,42 @@ public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
8181
collect { value -> emitAll(value) }
8282
}
8383

84+
/**
85+
* Merges the given flows into a single flow without preserving an order of elements.
86+
* All flows are merged concurrently, without limit on the number of simultaneously collected flows.
87+
*
88+
* ### Operator fusion
89+
*
90+
* Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
91+
* its concurrent merging so that only one properly configured channel is used for execution of merging logic.
92+
*/
93+
@ExperimentalCoroutinesApi
94+
public fun <T> Iterable<Flow<T>>.merge(): Flow<T> {
95+
/*
96+
* This is a fuseable implementation of the following operator:
97+
* channelFlow {
98+
* forEach { flow ->
99+
* launch {
100+
* flow.collect { send(it) }
101+
* }
102+
* }
103+
* }
104+
*/
105+
return ChannelLimitedFlowMerge(this)
106+
}
107+
108+
/**
109+
* Merges the given flows into a single flow without preserving an order of elements.
110+
* All flows are merged concurrently, without limit on the number of simultaneously collected flows.
111+
*
112+
* ### Operator fusion
113+
*
114+
* Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
115+
* its concurrent merging so that only one properly configured channel is used for execution of merging logic.
116+
*/
117+
@ExperimentalCoroutinesApi
118+
public fun <T> merge(vararg flows: Flow<T>): Flow<T> = flows.asIterable().merge()
119+
84120
/**
85121
* Flattens the given flow of flows into a single flow with a [concurrency] limit on the number of
86122
* concurrently collected flows.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.coroutines.*
8+
import kotlin.test.*
9+
import kotlinx.coroutines.flow.merge as originalMerge
10+
11+
abstract class MergeTest : TestBase() {
12+
13+
abstract fun <T> Iterable<Flow<T>>.merge(): Flow<T>
14+
15+
@Test
16+
fun testMerge() = runTest {
17+
val n = 100
18+
val sum = (1..n).map { flowOf(it) }
19+
.merge()
20+
.sum()
21+
22+
assertEquals(n * (n + 1) / 2, sum)
23+
}
24+
25+
@Test
26+
fun testSingle() = runTest {
27+
val flow = listOf(flowOf(), flowOf(42), flowOf()).merge()
28+
val value = flow.single()
29+
assertEquals(42, value)
30+
}
31+
32+
@Test
33+
fun testNulls() = runTest {
34+
val list = listOf(flowOf(1), flowOf(null), flowOf(2)).merge().toList()
35+
assertEquals(listOf(1, null, 2), list)
36+
}
37+
38+
@Test
39+
fun testContext() = runTest {
40+
val flow = flow {
41+
emit(NamedDispatchers.name())
42+
}.flowOn(NamedDispatchers("source"))
43+
44+
val result = listOf(flow).merge().flowOn(NamedDispatchers("irrelevant")).toList()
45+
assertEquals(listOf("source"), result)
46+
}
47+
48+
@Test
49+
fun testIsolatedContext() = runTest {
50+
val flow = flow {
51+
emit(NamedDispatchers.name())
52+
}
53+
54+
val result = listOf(flow.flowOn(NamedDispatchers("1")), flow.flowOn(NamedDispatchers("2")))
55+
.merge()
56+
.flowOn(NamedDispatchers("irrelevant"))
57+
.toList()
58+
assertEquals(listOf("1", "2"), result)
59+
}
60+
}
61+
62+
class IterableMergeTest : MergeTest() {
63+
override fun <T> Iterable<Flow<T>>.merge(): Flow<T> = originalMerge()
64+
}
65+
66+
class VarargMergeTest : MergeTest() {
67+
override fun <T> Iterable<Flow<T>>.merge(): Flow<T> = originalMerge(*toList().toTypedArray())
68+
}

0 commit comments

Comments
 (0)