diff --git a/benchmarks/src/jmh/kotlin/benchmarks/YieldRelativeCostBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/YieldRelativeCostBenchmark.kt deleted file mode 100644 index c1dd0d9281..0000000000 --- a/benchmarks/src/jmh/kotlin/benchmarks/YieldRelativeCostBenchmark.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package benchmarks - -import org.openjdk.jmh.annotations.* -import org.openjdk.jmh.infra.* -import java.util.concurrent.* - -@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) -@Fork(value = 2) -@BenchmarkMode(Mode.AverageTime) -@OutputTimeUnit(TimeUnit.NANOSECONDS) -@State(Scope.Benchmark) -open class YieldRelativeCostBenchmark { - - @Param("1", "10", "100", "1000") - var iterations: Int = 10 - - @Benchmark - fun yields() { - repeat(iterations) { - Thread.yield() - } - } - - @Benchmark - fun spins(bh: Blackhole) { - repeat(iterations) { - bh.consume(it) - } - } -} diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/FlatMapMergeBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/FlatMapMergeBenchmark.kt new file mode 100644 index 0000000000..f6690977a2 --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/FlatMapMergeBenchmark.kt @@ -0,0 +1,47 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package benchmarks.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.openjdk.jmh.annotations.* +import java.util.concurrent.* + +@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@State(Scope.Benchmark) +open class FlatMapMergeBenchmark { + + // Note: tests only absence of contention on downstream + + @Param("10", "100", "1000") + private var iterations = 100 + + @Benchmark + fun flatMapUnsafe() = runBlocking { + benchmarks.flow.scrabble.flow { + repeat(iterations) { emit(it) } + }.flatMapMerge { value -> + flowOf(value) + }.collect { + if (it == -1) error("") + } + } + + @Benchmark + fun flatMapSafe() = runBlocking { + kotlinx.coroutines.flow.flow { + repeat(iterations) { emit(it) } + }.flatMapMerge { value -> + flowOf(value) + }.collect { + if (it == -1) error("") + } + } + +} \ No newline at end of file diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/misc/Numbers.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/NumbersBenchmark.kt similarity index 98% rename from benchmarks/src/jmh/kotlin/benchmarks/flow/misc/Numbers.kt rename to benchmarks/src/jmh/kotlin/benchmarks/flow/NumbersBenchmark.kt index 1eb6fa4c6f..e037069d22 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/flow/misc/Numbers.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/NumbersBenchmark.kt @@ -3,7 +3,7 @@ */ -package benchmarks.flow.misc +package benchmarks.flow import benchmarks.flow.scrabble.flow import io.reactivex.* @@ -35,7 +35,7 @@ import java.util.concurrent.* @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MICROSECONDS) @State(Scope.Benchmark) -open class Numbers { +open class NumbersBenchmark { companion object { private const val primes = 100 diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/misc/SafeFlowBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/SafeFlowBenchmark.kt similarity index 97% rename from benchmarks/src/jmh/kotlin/benchmarks/flow/misc/SafeFlowBenchmark.kt rename to benchmarks/src/jmh/kotlin/benchmarks/flow/SafeFlowBenchmark.kt index f62af91eb8..f8c459fd0d 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/flow/misc/SafeFlowBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/SafeFlowBenchmark.kt @@ -2,7 +2,7 @@ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -package benchmarks.flow.misc +package benchmarks.flow import kotlinx.coroutines.* import kotlinx.coroutines.flow.* diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index a277169065..3f27b78abd 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -992,7 +992,7 @@ public final class kotlinx/coroutines/flow/internal/SafeCollectorKt { public static final fun unsafeFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; } -public final class kotlinx/coroutines/flow/internal/SendingCollector : kotlinx/coroutines/flow/internal/ConcurrentFlowCollector { +public final class kotlinx/coroutines/flow/internal/SendingCollector : kotlinx/coroutines/flow/FlowCollector { public fun (Lkotlinx/coroutines/channels/SendChannel;)V public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt index bf88b6a062..a579d7a247 100644 --- a/kotlinx-coroutines-core/common/src/channels/Produce.kt +++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt @@ -126,7 +126,7 @@ public fun CoroutineScope.produce( return coroutine } -private class ProducerCoroutine( +internal open class ProducerCoroutine( parentContext: CoroutineContext, channel: Channel ) : ChannelCoroutine(parentContext, channel, active = true), ProducerScope { override val isActive: Boolean diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt index 3bae2ebd38..4711b88418 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt @@ -58,7 +58,7 @@ public abstract class ChannelFlow( protected abstract suspend fun collectTo(scope: ProducerScope) // shared code to create a suspend lambda from collectTo function in one place - private val collectToFun: suspend (ProducerScope) -> Unit + internal val collectToFun: suspend (ProducerScope) -> Unit get() = { collectTo(it) } private val produceCapacity: Int @@ -140,13 +140,11 @@ internal class ChannelFlowOperatorImpl( private fun FlowCollector.withUndispatchedContextCollector(emitContext: CoroutineContext): FlowCollector = when (this) { // SendingCollector & NopCollector do not care about the context at all and can be used as is is SendingCollector, is NopCollector -> this - // Original collector is concurrent, so wrap into ConcurrentUndispatchedContextCollector (also concurrent) - is ConcurrentFlowCollector -> ConcurrentUndispatchedContextCollector(this, emitContext) // Otherwise just wrap into UndispatchedContextCollector interface implementation else -> UndispatchedContextCollector(this, emitContext) } -private open class UndispatchedContextCollector( +private class UndispatchedContextCollector( downstream: FlowCollector, private val emitContext: CoroutineContext ) : FlowCollector { @@ -157,12 +155,6 @@ private open class UndispatchedContextCollector( withContextUndispatched(emitContext, countOrElement, emitRef, value) } -// named class for a combination of UndispatchedContextCollector & ConcurrentFlowCollector interface -private class ConcurrentUndispatchedContextCollector( - downstream: ConcurrentFlowCollector, - emitContext: CoroutineContext -) : UndispatchedContextCollector(downstream, emitContext), ConcurrentFlowCollector - // Efficiently computes block(value) in the newContext private suspend fun withContextUndispatched( newContext: CoroutineContext, diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Concurrent.kt b/kotlinx-coroutines-core/common/src/flow/internal/Concurrent.kt deleted file mode 100644 index f37cc1caec..0000000000 --- a/kotlinx-coroutines-core/common/src/flow/internal/Concurrent.kt +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.flow.internal - -import kotlinx.atomicfu.* -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.* -import kotlinx.coroutines.channels.ArrayChannel -import kotlinx.coroutines.flow.* - -internal fun FlowCollector.asConcurrentFlowCollector(): ConcurrentFlowCollector = - this as? ConcurrentFlowCollector ?: SerializingCollector(this) - -// Flow collector that supports concurrent emit calls. -// It is internal for now but may be public in the future. -// Two basic implementations are here: SendingCollector and ConcurrentFlowCollector -internal interface ConcurrentFlowCollector : FlowCollector - -/** - * Collection that sends to channel. It is marked as [ConcurrentFlowCollector] because it can be used concurrently. - * - * @suppress **This an internal API and should not be used from general code.** - */ -@InternalCoroutinesApi -public class SendingCollector( - private val channel: SendChannel -) : ConcurrentFlowCollector { - override suspend fun emit(value: T) = channel.send(value) -} - -// Effectively serializes access to downstream collector for merging -// This is basically a converted from FlowCollector interface to ConcurrentFlowCollector -private class SerializingCollector( - private val downstream: FlowCollector -) : ConcurrentFlowCollector { - // Let's try to leverage the fact that merge is never contended - // Should be Any, but KT-30796 - private val _channel = atomic?>(null) - private val inProgressLock = atomic(false) - - private val channel: ArrayChannel - get() = _channel.updateAndGet { value -> - if (value != null) return value - ArrayChannel(Channel.CHANNEL_DEFAULT_CAPACITY) - }!! - - public override suspend fun emit(value: T) { - if (!inProgressLock.tryAcquire()) { - channel.send(value ?: NULL) - if (inProgressLock.tryAcquire()) { - helpEmit() - } - return - } - downstream.emit(value) - helpEmit() - } - - @Suppress("UNCHECKED_CAST") - private suspend fun helpEmit() { - while (true) { - while (true) { - val element = _channel.value?.poll() ?: break // todo: pollOrClosed - downstream.emit(NULL.unbox(element)) - } - inProgressLock.release() - // Enforce liveness - if (_channel.value?.isEmpty != false || !inProgressLock.tryAcquire()) break - } - } -} - -@Suppress("NOTHING_TO_INLINE") -private inline fun AtomicBoolean.tryAcquire(): Boolean = compareAndSet(false, true) - -@Suppress("NOTHING_TO_INLINE") -private inline fun AtomicBoolean.release() { - value = false -} diff --git a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt index adc3a17d16..1917afb8d7 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt @@ -52,6 +52,18 @@ internal fun scopedFlow(@BuilderInference block: suspend CoroutineScope.(Flo flowScope { block(collector) } } +internal fun CoroutineScope.flowProduce( + context: CoroutineContext, + capacity: Int = 0, + @BuilderInference block: suspend ProducerScope.() -> Unit +): ReceiveChannel { + val channel = Channel(capacity) + val newContext = newCoroutineContext(context) + val coroutine = FlowProduceCoroutine(newContext, channel) + coroutine.start(CoroutineStart.DEFAULT, coroutine, block) + return coroutine +} + private class FlowCoroutine( context: CoroutineContext, uCont: Continuation @@ -61,3 +73,13 @@ private class FlowCoroutine( return cancelImpl(cause) } } + +private class FlowProduceCoroutine( + parentContext: CoroutineContext, + channel: Channel +) : ProducerCoroutine(parentContext, channel) { + public override fun childCancelled(cause: Throwable): Boolean { + if (cause is ChildCancelledException) return true + return cancelImpl(cause) + } +} diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt b/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt index f621be034e..289a4ebcab 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt @@ -38,17 +38,21 @@ internal class ChannelFlowTransformLatest( } internal class ChannelFlowMerge( - flow: Flow>, + private val flow: Flow>, private val concurrency: Int, context: CoroutineContext = EmptyCoroutineContext, - capacity: Int = Channel.OPTIONAL_CHANNEL -) : ChannelFlowOperator, T>(flow, context, capacity) { + capacity: Int = Channel.BUFFERED +) : ChannelFlow(context, capacity) { override fun create(context: CoroutineContext, capacity: Int): ChannelFlow = ChannelFlowMerge(flow, concurrency, context, capacity) - // The actual merge implementation with concurrency limit - private suspend fun mergeImpl(scope: CoroutineScope, collector: ConcurrentFlowCollector) { + override fun produceImpl(scope: CoroutineScope): ReceiveChannel { + return scope.flowProduce(context, capacity, block = collectToFun) + } + + override suspend fun collectTo(scope: ProducerScope) { val semaphore = Semaphore(concurrency) + val collector = SendingCollector(scope) val job: Job? = coroutineContext[Job] flow.collect { inner -> /* @@ -68,19 +72,6 @@ internal class ChannelFlowMerge( } } - // Fast path in ChannelFlowOperator calls this function (channel was not created yet) - override suspend fun flowCollect(collector: FlowCollector) { - // this function should not have been invoked when channel was explicitly requested - assert { capacity == Channel.OPTIONAL_CHANNEL } - flowScope { - mergeImpl(this, collector.asConcurrentFlowCollector()) - } - } - - // Slow path when output channel is required (and was created) - override suspend fun collectTo(scope: ProducerScope) = - mergeImpl(scope, SendingCollector(scope)) - override fun additionalToStringProps(): String = "concurrency=$concurrency, " } diff --git a/kotlinx-coroutines-core/common/src/flow/internal/NopCollector.kt b/kotlinx-coroutines-core/common/src/flow/internal/NopCollector.kt index f83f31348f..d1b6ad2bf5 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/NopCollector.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/NopCollector.kt @@ -4,7 +4,9 @@ package kotlinx.coroutines.flow.internal -internal object NopCollector : ConcurrentFlowCollector { +import kotlinx.coroutines.flow.* + +internal object NopCollector : FlowCollector { override suspend fun emit(value: Any?) { // does nothing } diff --git a/kotlinx-coroutines-core/common/src/flow/internal/SendingCollector.kt b/kotlinx-coroutines-core/common/src/flow/internal/SendingCollector.kt new file mode 100644 index 0000000000..b6d578fedc --- /dev/null +++ b/kotlinx-coroutines-core/common/src/flow/internal/SendingCollector.kt @@ -0,0 +1,20 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow.internal + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* + +/** + * Collection that sends to channel + * @suppress **This an internal API and should not be used from general code.** + */ +@InternalCoroutinesApi +public class SendingCollector( + private val channel: SendChannel +) : FlowCollector { + override suspend fun emit(value: T) = channel.send(value) +} diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeFastPathTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeFastPathTest.kt new file mode 100644 index 0000000000..a92189c45c --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeFastPathTest.kt @@ -0,0 +1,92 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlin.test.* + +class FlatMapMergeFastPathTest : FlatMapMergeBaseTest() { + + override fun Flow.flatMap(mapper: suspend (T) -> Flow): Flow = flatMapMerge(transform = mapper).buffer(64) + + @Test + override fun testFlatMapConcurrency() = runTest { + var concurrentRequests = 0 + val flow = (1..100).asFlow().flatMapMerge(concurrency = 2) { value -> + flow { + ++concurrentRequests + emit(value) + delay(Long.MAX_VALUE) + } + }.buffer(64) + + val consumer = launch { + flow.collect { value -> + expect(value) + } + } + + repeat(4) { + yield() + } + + assertEquals(2, concurrentRequests) + consumer.cancelAndJoin() + finish(3) + } + + @Test + fun testCancellationExceptionDownstream() = runTest { + val flow = flow { + emit(1) + hang { expect(2) } + }.flatMapMerge { + flow { + emit(it) + expect(1) + throw CancellationException("") + } + }.buffer(64) + + assertFailsWith(flow) + finish(3) + } + + @Test + fun testCancellationExceptionUpstream() = runTest { + val flow = flow { + expect(1) + emit(1) + expect(2) + yield() + throw CancellationException("") + }.flatMapMerge { + flow { + expect(3) + emit(it) + hang { expect(4) } + } + }.buffer(64) + + assertFailsWith(flow) + finish(5) + } + + @Test + fun testCancellation() = runTest { + val result = flow { + emit(1) + emit(2) + emit(3) + emit(4) + expectUnreached() // Cancelled by take + emit(5) + }.flatMapMerge(2) { v -> flow { emit(v) } } + .buffer(64) + .take(2) + .toList() + assertEquals(listOf(1, 2), result) + } +} diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlattenMergeTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlattenMergeTest.kt index e1f1d4a624..c15f503c45 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/FlattenMergeTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/FlattenMergeTest.kt @@ -14,7 +14,7 @@ class FlattenMergeTest : FlatMapMergeBaseTest() { @Test override fun testFlatMapConcurrency() = runTest { var concurrentRequests = 0 - val flow = (1..100).asFlow().map() { value -> + val flow = (1..100).asFlow().map { value -> flow { ++concurrentRequests emit(value) @@ -36,4 +36,19 @@ class FlattenMergeTest : FlatMapMergeBaseTest() { consumer.cancelAndJoin() finish(3) } + + @Test + fun testContextPreservationAcrossFlows() = runTest { + val result = flow { + flowOf(1, 2).flatMapMerge { + flow { + yield() + emit(it) + } + }.collect { + emit(it) + } + }.toList() + assertEquals(listOf(1, 2), result) + } }