Skip to content

Introduce merge operator #1698

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,9 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun map (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun mapLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun mapNotNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun merge (Ljava/lang/Iterable;)Lkotlinx/coroutines/flow/Flow;
public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun merge ([Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
public static final synthetic fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
Expand Down
20 changes: 20 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/internal/Merge.kt
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,23 @@ internal class ChannelFlowMerge<T>(
override fun additionalToStringProps(): String =
"concurrency=$concurrency, "
}

internal class ChannelLimitedFlowMerge<T>(
private val flows: Iterable<Flow<T>>,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.BUFFERED
) : ChannelFlow<T>(context, capacity) {
override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
ChannelLimitedFlowMerge(flows, context, capacity)

override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
return scope.flowProduce(context, capacity, block = collectToFun)
}

override suspend fun collectTo(scope: ProducerScope<T>) {
val collector = SendingCollector(scope)
flows.forEach { flow ->
scope.launch { flow.collect(collector) }
}
}
}
36 changes: 36 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,42 @@ public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
collect { value -> emitAll(value) }
}

/**
* Merges the given flows into a single flow without preserving an order of elements.
* All flows are merged concurrently, without limit on the number of simultaneously collected flows.
*
* ### Operator fusion
*
* Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
* its concurrent merging so that only one properly configured channel is used for execution of merging logic.
*/
@ExperimentalCoroutinesApi
public fun <T> Iterable<Flow<T>>.merge(): Flow<T> {
/*
* This is a fuseable implementation of the following operator:
* channelFlow {
* forEach { flow ->
* launch {
* flow.collect { send(it) }
* }
* }
* }
*/
return ChannelLimitedFlowMerge(this)
}

/**
* Merges the given flows into a single flow without preserving an order of elements.
* All flows are merged concurrently, without limit on the number of simultaneously collected flows.
*
* ### Operator fusion
*
* Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
* its concurrent merging so that only one properly configured channel is used for execution of merging logic.
*/
@ExperimentalCoroutinesApi
public fun <T> merge(vararg flows: Flow<T>): Flow<T> = flows.asIterable().merge()

/**
* Flattens the given flow of flows into a single flow with a [concurrency] limit on the number of
* concurrently collected flows.
Expand Down
68 changes: 68 additions & 0 deletions kotlinx-coroutines-core/common/test/flow/operators/MergeTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlin.test.*
import kotlinx.coroutines.flow.merge as originalMerge

abstract class MergeTest : TestBase() {

abstract fun <T> Iterable<Flow<T>>.merge(): Flow<T>

@Test
fun testMerge() = runTest {
val n = 100
val sum = (1..n).map { flowOf(it) }
.merge()
.sum()

assertEquals(n * (n + 1) / 2, sum)
}

@Test
fun testSingle() = runTest {
val flow = listOf(flowOf(), flowOf(42), flowOf()).merge()
val value = flow.single()
assertEquals(42, value)
}

@Test
fun testNulls() = runTest {
val list = listOf(flowOf(1), flowOf(null), flowOf(2)).merge().toList()
assertEquals(listOf(1, null, 2), list)
}

@Test
fun testContext() = runTest {
val flow = flow {
emit(NamedDispatchers.name())
}.flowOn(NamedDispatchers("source"))

val result = listOf(flow).merge().flowOn(NamedDispatchers("irrelevant")).toList()
assertEquals(listOf("source"), result)
}

@Test
fun testIsolatedContext() = runTest {
val flow = flow {
emit(NamedDispatchers.name())
}

val result = listOf(flow.flowOn(NamedDispatchers("1")), flow.flowOn(NamedDispatchers("2")))
.merge()
.flowOn(NamedDispatchers("irrelevant"))
.toList()
assertEquals(listOf("1", "2"), result)
}
}

class IterableMergeTest : MergeTest() {
override fun <T> Iterable<Flow<T>>.merge(): Flow<T> = originalMerge()
}

class VarargMergeTest : MergeTest() {
override fun <T> Iterable<Flow<T>>.merge(): Flow<T> = originalMerge(*toList().toTypedArray())
}