Skip to content

Commit c8be056

Browse files
committed
Introduce merge operator
Fixes #1491
1 parent 6cb317b commit c8be056

File tree

4 files changed

+59
-0
lines changed

4 files changed

+59
-0
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+2
Original file line numberDiff line numberDiff line change
@@ -926,7 +926,9 @@ public final class kotlinx/coroutines/flow/FlowKt {
926926
public static final fun map (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
927927
public static final fun mapLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
928928
public static final fun mapNotNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
929+
public static final fun merge (Ljava/lang/Iterable;)Lkotlinx/coroutines/flow/Flow;
929930
public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
931+
public static final fun merge ([Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
930932
public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
931933
public static final synthetic fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
932934
public static final fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;

kotlinx-coroutines-core/common/src/flow/internal/Merge.kt

+20
Original file line numberDiff line numberDiff line change
@@ -75,3 +75,23 @@ internal class ChannelFlowMerge<T>(
7575
override fun additionalToStringProps(): String =
7676
"concurrency=$concurrency, "
7777
}
78+
79+
internal class ChannelLimitedFlowMerge<T>(
80+
private val flows: Iterable<Flow<T>>,
81+
context: CoroutineContext = EmptyCoroutineContext,
82+
capacity: Int = Channel.BUFFERED
83+
) : ChannelFlow<T>(context, capacity) {
84+
override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
85+
ChannelLimitedFlowMerge(flows, context, capacity)
86+
87+
override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
88+
return scope.flowProduce(context, capacity, block = collectToFun)
89+
}
90+
91+
override suspend fun collectTo(scope: ProducerScope<T>) {
92+
val collector = SendingCollector(scope)
93+
flows.forEach { flow ->
94+
scope.launch { flow.collect(collector) }
95+
}
96+
}
97+
}

kotlinx-coroutines-core/common/src/flow/operators/Merge.kt

+36
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,42 @@ public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
8181
collect { value -> emitAll(value) }
8282
}
8383

84+
/**
85+
* Merges the given flows into a single flow without preserving an order of elements.
86+
* All flows are merged concurrently, without limit on the number of simultaneously collected flows.
87+
*
88+
* ### Operator fusion
89+
*
90+
* Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
91+
* its concurrent merging so that only one properly configured channel is used for execution of merging logic.
92+
*/
93+
@ExperimentalCoroutinesApi
94+
public fun <T> Iterable<Flow<T>>.merge(): Flow<T> {
95+
/*
96+
* This is a fuseable implementation of the following operator:
97+
* channelFlow {
98+
* forEach { flow ->
99+
* launch {
100+
* flow.collect { send(it) }
101+
* }
102+
* }
103+
* }
104+
*/
105+
return ChannelLimitedFlowMerge(this)
106+
}
107+
108+
/**
109+
* Merges the given flows into a single flow without preserving an order of elements.
110+
* All flows are merged concurrently, without limit on the number of simultaneously collected flows.
111+
*
112+
* ### Operator fusion
113+
*
114+
* Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
115+
* its concurrent merging so that only one properly configured channel is used for execution of merging logic.
116+
*/
117+
@ExperimentalCoroutinesApi
118+
public fun <T> merge(vararg flows: Flow<T>): Flow<T> = flows.asIterable().merge()
119+
84120
/**
85121
* Flattens the given flow of flows into a single flow with a [concurrency] limit on the number of
86122
* concurrently collected flows.

kotlinx-coroutines-core/common/test/flow/operators/MergeTest.kt

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/* * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */package kotlinx.coroutines.flowimport kotlinx.coroutines.*import kotlin.test.*import kotlinx.coroutines.flow.merge as originalMergeabstract class MergeTest : TestBase() { abstract fun <T> Iterable<Flow<T>>.merge(): Flow<T> @Test fun testMerge() = runTest { val n = 100 val sum = (1..n).map { flowOf(it) } .merge() .sum() assertEquals(n * (n + 1) / 2, sum) } @Test fun testSingle() = runTest { val flow = listOf(flowOf(), flowOf(42), flowOf()).merge() val value = flow.single() assertEquals(42, value) } @Test fun testNulls() = runTest { val list = listOf(flowOf(1), flowOf(null), flowOf(2)).merge().toList() assertEquals(listOf(1, null, 2), list) } @Test fun testContext() = runTest { val flow = flow { emit(NamedDispatchers.name()) }.flowOn(NamedDispatchers("source")) val result = listOf(flow).merge().flowOn(NamedDispatchers("irrelevant")).toList() assertEquals(listOf("source"), result) } @Test fun testIsolatedContext() = runTest { val flow = flow { emit(NamedDispatchers.name()) } val result = listOf(flow.flowOn(NamedDispatchers("1")), flow.flowOn(NamedDispatchers("2"))) .merge() .flowOn(NamedDispatchers("irrelevant")) .toList() assertEquals(listOf("1", "2"), result) }}class IterableMergeTest : MergeTest() { override fun <T> Iterable<Flow<T>>.merge(): Flow<T> = originalMerge()}class VarargMergeTest : MergeTest() { override fun <T> Iterable<Flow<T>>.merge(): Flow<T> = originalMerge(*toList().toTypedArray())}

0 commit comments

Comments
 (0)