From 73068ae279a52a9490ce45d18c50397f1e2ed331 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 11 Dec 2019 14:54:31 +0300 Subject: [PATCH] Introduce merge operator Fixes #1491 --- .../kotlinx-coroutines-core.txt | 2 + .../common/src/flow/internal/Merge.kt | 20 ++++++ .../common/src/flow/operators/Merge.kt | 36 ++++++++++ .../common/test/flow/operators/MergeTest.kt | 68 +++++++++++++++++++ 4 files changed, 126 insertions(+) create mode 100644 kotlinx-coroutines-core/common/test/flow/operators/MergeTest.kt diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index 5b7955c499..d8d4528eb4 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -926,7 +926,9 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun map (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun mapLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun mapNotNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; + public static final fun merge (Ljava/lang/Iterable;)Lkotlinx/coroutines/flow/Flow; public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; + public static final fun merge ([Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow; public static final synthetic fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt b/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt index 289a4ebcab..6fbbea31dd 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt @@ -75,3 +75,23 @@ internal class ChannelFlowMerge( override fun additionalToStringProps(): String = "concurrency=$concurrency, " } + +internal class ChannelLimitedFlowMerge( + private val flows: Iterable>, + context: CoroutineContext = EmptyCoroutineContext, + capacity: Int = Channel.BUFFERED +) : ChannelFlow(context, capacity) { + override fun create(context: CoroutineContext, capacity: Int): ChannelFlow = + ChannelLimitedFlowMerge(flows, context, capacity) + + override fun produceImpl(scope: CoroutineScope): ReceiveChannel { + return scope.flowProduce(context, capacity, block = collectToFun) + } + + override suspend fun collectTo(scope: ProducerScope) { + val collector = SendingCollector(scope) + flows.forEach { flow -> + scope.launch { flow.collect(collector) } + } + } +} diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt index a7b7f709a5..d69afad2f3 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt @@ -81,6 +81,42 @@ public fun Flow>.flattenConcat(): Flow = flow { collect { value -> emitAll(value) } } +/** + * Merges the given flows into a single flow without preserving an order of elements. + * All flows are merged concurrently, without limit on the number of simultaneously collected flows. + * + * ### Operator fusion + * + * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with + * its concurrent merging so that only one properly configured channel is used for execution of merging logic. + */ +@ExperimentalCoroutinesApi +public fun Iterable>.merge(): Flow { + /* + * This is a fuseable implementation of the following operator: + * channelFlow { + * forEach { flow -> + * launch { + * flow.collect { send(it) } + * } + * } + * } + */ + return ChannelLimitedFlowMerge(this) +} + +/** + * Merges the given flows into a single flow without preserving an order of elements. + * All flows are merged concurrently, without limit on the number of simultaneously collected flows. + * + * ### Operator fusion + * + * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with + * its concurrent merging so that only one properly configured channel is used for execution of merging logic. + */ +@ExperimentalCoroutinesApi +public fun merge(vararg flows: Flow): Flow = flows.asIterable().merge() + /** * Flattens the given flow of flows into a single flow with a [concurrency] limit on the number of * concurrently collected flows. diff --git a/kotlinx-coroutines-core/common/test/flow/operators/MergeTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/MergeTest.kt new file mode 100644 index 0000000000..1248188554 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/operators/MergeTest.kt @@ -0,0 +1,68 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlin.test.* +import kotlinx.coroutines.flow.merge as originalMerge + +abstract class MergeTest : TestBase() { + + abstract fun Iterable>.merge(): Flow + + @Test + fun testMerge() = runTest { + val n = 100 + val sum = (1..n).map { flowOf(it) } + .merge() + .sum() + + assertEquals(n * (n + 1) / 2, sum) + } + + @Test + fun testSingle() = runTest { + val flow = listOf(flowOf(), flowOf(42), flowOf()).merge() + val value = flow.single() + assertEquals(42, value) + } + + @Test + fun testNulls() = runTest { + val list = listOf(flowOf(1), flowOf(null), flowOf(2)).merge().toList() + assertEquals(listOf(1, null, 2), list) + } + + @Test + fun testContext() = runTest { + val flow = flow { + emit(NamedDispatchers.name()) + }.flowOn(NamedDispatchers("source")) + + val result = listOf(flow).merge().flowOn(NamedDispatchers("irrelevant")).toList() + assertEquals(listOf("source"), result) + } + + @Test + fun testIsolatedContext() = runTest { + val flow = flow { + emit(NamedDispatchers.name()) + } + + val result = listOf(flow.flowOn(NamedDispatchers("1")), flow.flowOn(NamedDispatchers("2"))) + .merge() + .flowOn(NamedDispatchers("irrelevant")) + .toList() + assertEquals(listOf("1", "2"), result) + } +} + +class IterableMergeTest : MergeTest() { + override fun Iterable>.merge(): Flow = originalMerge() +} + +class VarargMergeTest : MergeTest() { + override fun Iterable>.merge(): Flow = originalMerge(*toList().toTypedArray()) +}