From fda1e5681060c09dd5d82d1781ff85577754bc02 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 29 May 2019 12:41:43 +0300 Subject: [PATCH 1/6] Add a migration for compose operator --- .../common/src/flow/Migration.kt | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/kotlinx-coroutines-core/common/src/flow/Migration.kt b/kotlinx-coroutines-core/common/src/flow/Migration.kt index bf20d2f2a2..99d697f41f 100644 --- a/kotlinx-coroutines-core/common/src/flow/Migration.kt +++ b/kotlinx-coroutines-core/common/src/flow/Migration.kt @@ -193,3 +193,24 @@ public fun Flow>.merge(): Flow = error("Should not be called") replaceWith = ReplaceWith("flattenConcat()") ) public fun Flow>.flatten(): Flow = error("Should not be called") + +/** + * Kotlin has a built-in generic mechanism for making chained calls. + * If you wish to write something like + * ``` + * myFlow.compose(MyFlowExtensions.ignoreErrors()).collect { ... } + * ``` + * you can replace it with + * + * ``` + * myFlow.let(MyFlowExtensions.ignoreErrors()).collect { ... } + * ``` + * + * @suppress + */ +@Deprecated( + level = DeprecationLevel.ERROR, + message = "Kotlin analogue of compose is 'let'", + replaceWith = ReplaceWith("let(transformer)") +) +public fun Flow.compose(transformer: Flow.() -> Flow): Flow = error("Should not be called") From 26daec63bf59aaa60c863cabb43f8adb73ecd108 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 29 May 2019 18:17:19 +0300 Subject: [PATCH 2/6] Scan and emitAll operators Fixes #1094 --- .../kotlinx-coroutines-core.txt | 5 ++ .../common/src/flow/Migration.kt | 10 +++ .../common/src/flow/operators/Errors.kt | 4 +- .../common/src/flow/operators/Merge.kt | 10 +-- .../common/src/flow/operators/Transform.kt | 43 ++++++++++++ .../common/src/flow/terminal/Collect.kt | 6 ++ .../common/test/flow/operators/ScanTest.kt | 68 +++++++++++++++++++ 7 files changed, 135 insertions(+), 11 deletions(-) create mode 100644 kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index ab51a2dd72..ade26fe99f 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -822,6 +822,7 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun distinctUntilChangedBy (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public static final fun drop (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow; public static final fun dropWhile (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; + public static final fun emitAll (Lkotlinx/coroutines/flow/FlowCollector;Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun emptyFlow ()Lkotlinx/coroutines/flow/Flow; public static final fun filter (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun filterNot (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; @@ -853,6 +854,8 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; public static final fun sample (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow; + public static final fun scan (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; + public static final fun scan (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun single (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun singleOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun switchMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; @@ -872,6 +875,7 @@ public final class kotlinx/coroutines/flow/MigrationKt { public static final fun BehaviourSubject ()Ljava/lang/Object; public static final fun PublishSubject ()Ljava/lang/Object; public static final fun ReplaySubject ()Ljava/lang/Object; + public static final fun compose (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public static final fun concatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public static final fun flatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun flatten (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; @@ -879,6 +883,7 @@ public final class kotlinx/coroutines/flow/MigrationKt { public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow; public static final fun onErrorResume (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; public static final fun publishOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow; + public static final fun skip (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow; public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;)V public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)V public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)V diff --git a/kotlinx-coroutines-core/common/src/flow/Migration.kt b/kotlinx-coroutines-core/common/src/flow/Migration.kt index 99d697f41f..684976ebeb 100644 --- a/kotlinx-coroutines-core/common/src/flow/Migration.kt +++ b/kotlinx-coroutines-core/common/src/flow/Migration.kt @@ -214,3 +214,13 @@ public fun Flow>.flatten(): Flow = error("Should not be called") replaceWith = ReplaceWith("let(transformer)") ) public fun Flow.compose(transformer: Flow.() -> Flow): Flow = error("Should not be called") + +/** + * @suppress + */ +@Deprecated( + level = DeprecationLevel.ERROR, + message = "Kotlin analogue of 'skip' is 'drop'", + replaceWith = ReplaceWith("drop(count)") +) +public fun Flow.skip(count: Int): Flow = error("Should not be called") diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt index de964da6ef..29777b7a83 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt @@ -27,9 +27,7 @@ public fun Flow.onErrorCollect( predicate: ExceptionPredicate = ALWAYS_TRUE ): Flow = collectSafely { e -> if (!predicate(e)) throw e - fallback.collect { value -> - emit(value) - } + emitAll(fallback) } /** diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt index 0fa6e8abd4..38b116a83f 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt @@ -80,11 +80,7 @@ public fun Flow.flatMapMerge( */ @FlowPreview public fun Flow>.flattenConcat(): Flow = flow { - collect { value -> - value.collect { innerValue -> - emit(innerValue) - } - } + collect { value -> emitAll(value) } } /** @@ -137,9 +133,7 @@ public fun Flow.switchMap(transform: suspend (value: T) -> Flow): F previousFlow?.join() // Undispatched to have better user experience in case of synchronous flows previousFlow = launch(start = CoroutineStart.UNDISPATCHED) { - transform(value).collect { innerValue -> - downstream.emit(innerValue) - } + downstream.emitAll(transform(value)) } } } diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt index aff523dd99..d6ca3086ad 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt @@ -4,10 +4,12 @@ @file:JvmMultifileClass @file:JvmName("FlowKt") +@file:Suppress("UNCHECKED_CAST") package kotlinx.coroutines.flow import kotlinx.coroutines.* +import kotlinx.coroutines.flow.internal.NullSurrogate import kotlin.jvm.* import kotlinx.coroutines.flow.unsafeFlow as flow @@ -97,3 +99,44 @@ public fun Flow.onEach(action: suspend (T) -> Unit): Flow = flow { emit(value) } } + +/** + * Reduces the given flow with [operation], emitting every intermediate result, including initial value. + * The first element is takes as initial value for operation accumulator. + * For example: + * ``` + * flowOf(1, 2, 3, 4).scan { (v1, v2) -> v1 + v2 }.toList() + * ``` + * will produce `[1, 3, 6, 10]` + */ +@FlowPreview +public fun Flow.scan(operation: suspend (accumulator: T, value: T) -> T): Flow = flow { + var accumulator: Any? = NullSurrogate + collect { value -> + accumulator = if (accumulator === NullSurrogate) { + value + } else { + operation(accumulator as T, value) + } + emit(accumulator as T) + } +} + +/** + * Reduces the given flow with [operation], emitting every intermediate result, including initial value. + * An initial value is provided lazily by [initialSupplier] and is always immediately emitted. + * For example: + * ``` + * flowOf(1, 2, 3).scan(::emptyList) { acc: List, value -> acc + value }.toList() + * ``` + * will produce `[], [1], [1, 2], [1, 2, 3]]`. + */ +@FlowPreview +public fun Flow.scan(initialSupplier: () -> R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow = flow { + var accumulator: R = initialSupplier() + emit(accumulator) + collect { value -> + accumulator = operation(accumulator, value) + emit(accumulator) + } +} diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt index 624b51f683..e725f892d6 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt @@ -32,3 +32,9 @@ public suspend inline fun Flow.collect(crossinline action: suspend (value collect(object : FlowCollector { override suspend fun emit(value: T) = action(value) }) + +/** + * Collects all the values from the given [flow] and emits them to the collector. + * Shortcut for `flow.collect { value -> emit(value) }`. + */ +public suspend inline fun FlowCollector.emitAll(flow: Flow) = flow.collect { value -> emit(value) } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt new file mode 100644 index 0000000000..689a6bd83e --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt @@ -0,0 +1,68 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlin.test.* + +class ScanTest : TestBase() { + @Test + fun testScan() = runTest { + val flow = flowOf(1, 2, 3, 4, 5) + val result = flow.scan { acc, v -> acc + v }.toList() + assertEquals(listOf(1, 3, 6, 10, 15), result) + } + + @Test + fun testScanWithInitial() = runTest { + val flow = flowOf(1, 2, 3) + val result = flow.scan(::emptyList) { acc: List, value -> acc + value }.toList() + assertEquals(listOf(emptyList(), listOf(1), listOf(1, 2), listOf(1, 2, 3)), result) + } + + @Test + fun testNulls() = runTest { + val flow = flowOf(null, 2, null, null, null, 5) + val result = flow.scan { acc, v -> if (v == null) acc else (if (acc == null) v else acc + v) }.toList() + assertEquals(listOf(null, 2, 2, 2, 2, 7), result) + } + + @Test + fun testEmptyFlow() = runTest { + val result = emptyFlow().scan { _, _ -> 1 }.toList() + assertTrue(result.isEmpty()) + } + + @Test + fun testErrorCancelsUpstream() = runTest { + expect(1) + val latch = Channel() + val flow = flow { + coroutineScope { + launch { + latch.send(Unit) + hang { expect(3) } + } + emit(1) + emit(2) + } + }.scan { _, value -> + expect(value) // 2 + latch.receive() + throw TestException() + }.onErrorCollect(emptyFlow()) + + assertEquals(1, flow.single()) + finish(4) + } + + public operator fun Collection.plus(element: T): List { + val result = ArrayList(size + 1) + result.addAll(this) + result.add(element) + return result + } +} From 1f2c7f9d928796c2500c4c24762cfddfe6813866 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Fri, 31 May 2019 13:07:20 +0300 Subject: [PATCH 3/6] Flow.first operators family (without firstOrNull and firstOrDefault support) Fixes #1078 --- .../kotlinx-coroutines-core.txt | 2 + .../common/src/flow/terminal/Reduce.kt | 47 +++++++++- .../common/test/flow/terminal/FirstTest.kt | 86 +++++++++++++++++++ 3 files changed, 133 insertions(+), 2 deletions(-) create mode 100644 kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index ade26fe99f..7ca7625aec 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -827,6 +827,8 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun filter (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun filterNot (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun filterNotNull (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; + public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun flatMapConcat (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun flatMapMerge (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static synthetic fun flatMapMerge$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt index 3a519e6514..f17f5eba9d 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt @@ -4,6 +4,7 @@ @file:JvmMultifileClass @file:JvmName("FlowKt") +@file:Suppress("UNCHECKED_CAST") package kotlinx.coroutines.flow @@ -50,7 +51,7 @@ public suspend inline fun Flow.fold( } /** - * Terminal operator, that awaits for one and only one value to be published. + * The terminal operator, that awaits for one and only one value to be published. * Throws [NoSuchElementException] for empty flow and [IllegalStateException] for flow * that contains more than one element. */ @@ -68,7 +69,7 @@ public suspend fun Flow.single(): T { } /** - * Terminal operator, that awaits for one and only one value to be published. + * The terminal operator, that awaits for one and only one value to be published. * Throws [IllegalStateException] for flow that contains more than one element. */ @FlowPreview @@ -81,3 +82,45 @@ public suspend fun Flow.singleOrNull(): T? { return result } + +/** + * The terminal operator that returns the first element emitted by the flow and then cancels flow's collection. + * Throws [NoSuchElementException] if the flow was empty. + */ +@FlowPreview +public suspend fun Flow.first(): T { + var result: Any? = NullSurrogate + try { + collect { value -> + result = value + throw AbortFlowException() + } + } catch (e: AbortFlowException) { + // Do nothing + } + + if (result === NullSurrogate) throw NoSuchElementException("Expected at least one element") + return result as T +} + +/** + * The terminal operator that returns the first element emitted by the flow matching the given [predicate] and then cancels flow's collection. + * Throws [NoSuchElementException] if the flow has not contained elements matching the [predicate]. + */ +@FlowPreview +public suspend fun Flow.first(predicate: suspend (T) -> Boolean): T { + var result: Any? = NullSurrogate + try { + collect { value -> + if (predicate(value)) { + result = value + throw AbortFlowException() + } + } + } catch (e: AbortFlowException) { + // Do nothing + } + + if (result === NullSurrogate) throw NoSuchElementException("Expected at least one element matching the predicate $predicate") + return result as T +} diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt new file mode 100644 index 0000000000..e84d4c7b77 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt @@ -0,0 +1,86 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlin.test.* + +class FirstTest : TestBase() { + @Test + fun testFirst() = runTest { + val flow = flowOf(1, 2, 3) + assertEquals(1, flow.first()) + } + + @Test + fun testNulls() = runTest { + val flow = flowOf(null, 1) + assertNull(flow.first()) + assertNull(flow.first { it == null }) + assertEquals(1, flow.first { it != null }) + } + + @Test + fun testFirstWithPredicate() = runTest { + val flow = flowOf(1, 2, 3) + assertEquals(1, flow.first { it > 0 }) + assertEquals(2, flow.first { it > 1 }) + assertFailsWith { flow.first { it > 3 } } + } + + @Test + fun testFirstCancellation() = runTest { + val latch = Channel() + val flow = flow { + coroutineScope { + launch { + latch.send(Unit) + hang { expect(1) } + } + emit(1) + emit(2) + } + } + + + val result = flow.first { + latch.receive() + true + } + assertEquals(1, result) + finish(2) + } + + @Test + fun testEmptyFlow() = runTest { + assertFailsWith { emptyFlow().first() } + assertFailsWith { emptyFlow().first { true } } + } + + @Test + fun testErrorCancelsUpstream() = runTest { + val latch = Channel() + val flow = flow { + coroutineScope { + launch { + latch.send(Unit) + hang { expect(1) } + } + emit(1) + } + } + + assertFailsWith { + flow.first { + latch.receive() + throw TestException() + } + } + + assertEquals(1, flow.first()) + finish(2) + } +} From e118a9ab7f330524f6199945deccfef760a744ab Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Fri, 31 May 2019 21:25:17 +0300 Subject: [PATCH 4/6] Rename scan to accumulate, get rid of lambda with initial value --- .../kotlinx-coroutines-core.txt | 1 - .../common/src/flow/operators/Transform.kt | 14 ++++++++------ .../common/src/flow/terminal/Collect.kt | 2 +- .../common/test/flow/operators/ScanTest.kt | 2 +- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index 7ca7625aec..5e80162b8d 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -856,7 +856,6 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; public static final fun sample (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow; - public static final fun scan (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun scan (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun single (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun singleOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt index d6ca3086ad..60d521ef28 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt @@ -102,7 +102,9 @@ public fun Flow.onEach(action: suspend (T) -> Unit): Flow = flow { /** * Reduces the given flow with [operation], emitting every intermediate result, including initial value. - * The first element is takes as initial value for operation accumulator. + * The first element is taken as initial value for operation accumulator. + * This operator has a sibling with initial value -- [accumulate]. + * * For example: * ``` * flowOf(1, 2, 3, 4).scan { (v1, v2) -> v1 + v2 }.toList() @@ -123,17 +125,17 @@ public fun Flow.scan(operation: suspend (accumulator: T, value: T) -> T): } /** - * Reduces the given flow with [operation], emitting every intermediate result, including initial value. - * An initial value is provided lazily by [initialSupplier] and is always immediately emitted. + * Reduces the given flow with [operation], emitting every intermediate result, including [initial] value. + * Note that initial value should be immutable (or should not be mutated) as it is shared between different collectors. * For example: * ``` - * flowOf(1, 2, 3).scan(::emptyList) { acc: List, value -> acc + value }.toList() + * flowOf(1, 2, 3).accumulate(emptyList()) { acc, value -> acc + value }.toList() * ``` * will produce `[], [1], [1, 2], [1, 2, 3]]`. */ @FlowPreview -public fun Flow.scan(initialSupplier: () -> R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow = flow { - var accumulator: R = initialSupplier() +public fun Flow.accumulate(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow = flow { + var accumulator: R = initial emit(accumulator) collect { value -> accumulator = operation(accumulator, value) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt index e725f892d6..a6a218cf46 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt @@ -37,4 +37,4 @@ public suspend inline fun Flow.collect(crossinline action: suspend (value * Collects all the values from the given [flow] and emits them to the collector. * Shortcut for `flow.collect { value -> emit(value) }`. */ -public suspend inline fun FlowCollector.emitAll(flow: Flow) = flow.collect { value -> emit(value) } +public suspend inline fun FlowCollector.emitAll(flow: Flow) = flow.collect(this) diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt index 689a6bd83e..c13c09aeb9 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt @@ -19,7 +19,7 @@ class ScanTest : TestBase() { @Test fun testScanWithInitial() = runTest { val flow = flowOf(1, 2, 3) - val result = flow.scan(::emptyList) { acc: List, value -> acc + value }.toList() + val result = flow.accumulate(emptyList()) { acc, value -> acc + value }.toList() assertEquals(listOf(emptyList(), listOf(1), listOf(1, 2), listOf(1, 2, 3)), result) } From 234a4e8335d5cd0c0cda35eec5d6dbb394005a13 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 4 Jun 2019 12:06:32 +0300 Subject: [PATCH 5/6] Add forEach to migrations Fixes #1244 --- .../kotlinx-coroutines-core.txt | 1 + .../common/src/flow/Migration.kt | 15 +++++++++++++++ 2 files changed, 16 insertions(+) diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index 5e80162b8d..d14feee0a4 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -880,6 +880,7 @@ public final class kotlinx/coroutines/flow/MigrationKt { public static final fun concatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public static final fun flatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun flatten (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; + public static final fun forEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)V public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow; public static final fun onErrorResume (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; diff --git a/kotlinx-coroutines-core/common/src/flow/Migration.kt b/kotlinx-coroutines-core/common/src/flow/Migration.kt index 684976ebeb..72b923756e 100644 --- a/kotlinx-coroutines-core/common/src/flow/Migration.kt +++ b/kotlinx-coroutines-core/common/src/flow/Migration.kt @@ -224,3 +224,18 @@ public fun Flow.compose(transformer: Flow.() -> Flow): Flow = replaceWith = ReplaceWith("drop(count)") ) public fun Flow.skip(count: Int): Flow = error("Should not be called") + +/** + * Flow extension to iterate over elements is [collect]. + * Foreach wasn't introduced deliberately to avoid confusion. + * Flow is not a collection, iteration over it may be not idempotent + * and can *launch* computations with side-effects. + * This behaviour is not reflected in [forEach] name. + * @suppress + */ +@Deprecated( + level = DeprecationLevel.ERROR, + message = "Flow analogue of 'forEach' is 'collect'", + replaceWith = ReplaceWith("collect(block)") +) +public fun Flow.forEach(action: suspend (value: T) -> Unit): Unit = error("Should not be called") \ No newline at end of file From d53f111b746fd11539087132494487a3d3056e0d Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 5 Jun 2019 18:57:01 +0300 Subject: [PATCH 6/6] Renaming, rebase --- .../kotlinx-coroutines-core.txt | 4 +- .../common/src/flow/Migration.kt | 9 +++- .../common/src/flow/operators/Transform.kt | 48 +++++++++---------- .../common/src/flow/terminal/Reduce.kt | 8 ++-- .../common/test/flow/operators/ScanTest.kt | 10 ++-- 5 files changed, 44 insertions(+), 35 deletions(-) diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index d14feee0a4..1dcad707b1 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -856,7 +856,8 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; public static final fun sample (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow; - public static final fun scan (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; + public static final fun scan (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; + public static final fun scanReduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun single (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun singleOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun switchMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; @@ -885,6 +886,7 @@ public final class kotlinx/coroutines/flow/MigrationKt { public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow; public static final fun onErrorResume (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; public static final fun publishOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow; + public static final fun scanFold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun skip (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow; public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;)V public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)V diff --git a/kotlinx-coroutines-core/common/src/flow/Migration.kt b/kotlinx-coroutines-core/common/src/flow/Migration.kt index 72b923756e..114a32e10d 100644 --- a/kotlinx-coroutines-core/common/src/flow/Migration.kt +++ b/kotlinx-coroutines-core/common/src/flow/Migration.kt @@ -238,4 +238,11 @@ public fun Flow.skip(count: Int): Flow = error("Should not be called") message = "Flow analogue of 'forEach' is 'collect'", replaceWith = ReplaceWith("collect(block)") ) -public fun Flow.forEach(action: suspend (value: T) -> Unit): Unit = error("Should not be called") \ No newline at end of file +public fun Flow.forEach(action: suspend (value: T) -> Unit): Unit = error("Should not be called") + +@Deprecated( + level = DeprecationLevel.ERROR, + message = "Flow has less verbose 'scan' shortcut", + replaceWith = ReplaceWith("scan(initial, operation)") +) +public fun Flow.scanFold(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow = error("Should not be called") diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt index 60d521ef28..2ef4b97a9c 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt @@ -9,7 +9,7 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.* -import kotlinx.coroutines.flow.internal.NullSurrogate +import kotlinx.coroutines.flow.internal.NULL import kotlin.jvm.* import kotlinx.coroutines.flow.unsafeFlow as flow @@ -101,44 +101,44 @@ public fun Flow.onEach(action: suspend (T) -> Unit): Flow = flow { } /** - * Reduces the given flow with [operation], emitting every intermediate result, including initial value. - * The first element is taken as initial value for operation accumulator. - * This operator has a sibling with initial value -- [accumulate]. - * + * Folds the given flow with [operation], emitting every intermediate result, including [initial] value. + * Note that initial value should be immutable (or should not be mutated) as it is shared between different collectors. * For example: * ``` - * flowOf(1, 2, 3, 4).scan { (v1, v2) -> v1 + v2 }.toList() + * flowOf(1, 2, 3).accumulate(emptyList()) { acc, value -> acc + value }.toList() * ``` - * will produce `[1, 3, 6, 10]` + * will produce `[], [1], [1, 2], [1, 2, 3]]`. */ @FlowPreview -public fun Flow.scan(operation: suspend (accumulator: T, value: T) -> T): Flow = flow { - var accumulator: Any? = NullSurrogate +public fun Flow.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow = flow { + var accumulator: R = initial + emit(accumulator) collect { value -> - accumulator = if (accumulator === NullSurrogate) { - value - } else { - operation(accumulator as T, value) - } - emit(accumulator as T) + accumulator = operation(accumulator, value) + emit(accumulator) } } /** - * Reduces the given flow with [operation], emitting every intermediate result, including [initial] value. - * Note that initial value should be immutable (or should not be mutated) as it is shared between different collectors. + * Reduces the given flow with [operation], emitting every intermediate result, including initial value. + * The first element is taken as initial value for operation accumulator. + * This operator has a sibling with initial value -- [scan]. + * * For example: * ``` - * flowOf(1, 2, 3).accumulate(emptyList()) { acc, value -> acc + value }.toList() + * flowOf(1, 2, 3, 4).scan { (v1, v2) -> v1 + v2 }.toList() * ``` - * will produce `[], [1], [1, 2], [1, 2, 3]]`. + * will produce `[1, 3, 6, 10]` */ @FlowPreview -public fun Flow.accumulate(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow = flow { - var accumulator: R = initial - emit(accumulator) +public fun Flow.scanReduce(operation: suspend (accumulator: T, value: T) -> T): Flow = flow { + var accumulator: Any? = NULL collect { value -> - accumulator = operation(accumulator, value) - emit(accumulator) + accumulator = if (accumulator === NULL) { + value + } else { + operation(accumulator as T, value) + } + emit(accumulator as T) } } diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt index f17f5eba9d..4eca3efaf6 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt @@ -89,7 +89,7 @@ public suspend fun Flow.singleOrNull(): T? { */ @FlowPreview public suspend fun Flow.first(): T { - var result: Any? = NullSurrogate + var result: Any? = NULL try { collect { value -> result = value @@ -99,7 +99,7 @@ public suspend fun Flow.first(): T { // Do nothing } - if (result === NullSurrogate) throw NoSuchElementException("Expected at least one element") + if (result === NULL) throw NoSuchElementException("Expected at least one element") return result as T } @@ -109,7 +109,7 @@ public suspend fun Flow.first(): T { */ @FlowPreview public suspend fun Flow.first(predicate: suspend (T) -> Boolean): T { - var result: Any? = NullSurrogate + var result: Any? = NULL try { collect { value -> if (predicate(value)) { @@ -121,6 +121,6 @@ public suspend fun Flow.first(predicate: suspend (T) -> Boolean): T { // Do nothing } - if (result === NullSurrogate) throw NoSuchElementException("Expected at least one element matching the predicate $predicate") + if (result === NULL) throw NoSuchElementException("Expected at least one element matching the predicate $predicate") return result as T } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt index c13c09aeb9..d739f1a64f 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt @@ -12,27 +12,27 @@ class ScanTest : TestBase() { @Test fun testScan() = runTest { val flow = flowOf(1, 2, 3, 4, 5) - val result = flow.scan { acc, v -> acc + v }.toList() + val result = flow.scanReduce { acc, v -> acc + v }.toList() assertEquals(listOf(1, 3, 6, 10, 15), result) } @Test fun testScanWithInitial() = runTest { val flow = flowOf(1, 2, 3) - val result = flow.accumulate(emptyList()) { acc, value -> acc + value }.toList() + val result = flow.scan(emptyList()) { acc, value -> acc + value }.toList() assertEquals(listOf(emptyList(), listOf(1), listOf(1, 2), listOf(1, 2, 3)), result) } @Test fun testNulls() = runTest { val flow = flowOf(null, 2, null, null, null, 5) - val result = flow.scan { acc, v -> if (v == null) acc else (if (acc == null) v else acc + v) }.toList() + val result = flow.scanReduce { acc, v -> if (v == null) acc else (if (acc == null) v else acc + v) }.toList() assertEquals(listOf(null, 2, 2, 2, 2, 7), result) } @Test fun testEmptyFlow() = runTest { - val result = emptyFlow().scan { _, _ -> 1 }.toList() + val result = emptyFlow().scanReduce { _, _ -> 1 }.toList() assertTrue(result.isEmpty()) } @@ -49,7 +49,7 @@ class ScanTest : TestBase() { emit(1) emit(2) } - }.scan { _, value -> + }.scanReduce { _, value -> expect(value) // 2 latch.receive() throw TestException()