Skip to content

More operators #1236

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -822,10 +822,13 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun distinctUntilChangedBy (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static final fun drop (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static final fun dropWhile (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun emitAll (Lkotlinx/coroutines/flow/FlowCollector;Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun emptyFlow ()Lkotlinx/coroutines/flow/Flow;
public static final fun filter (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun filterNot (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun filterNotNull (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun flatMapConcat (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun flatMapMerge (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flatMapMerge$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
Expand Down Expand Up @@ -853,6 +856,8 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun sample (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
public static final fun scan (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun scanReduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun single (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun singleOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun switchMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
Expand All @@ -872,13 +877,17 @@ public final class kotlinx/coroutines/flow/MigrationKt {
public static final fun BehaviourSubject ()Ljava/lang/Object;
public static final fun PublishSubject ()Ljava/lang/Object;
public static final fun ReplaySubject ()Ljava/lang/Object;
public static final fun compose (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static final fun concatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static final fun flatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun flatten (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun forEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)V
public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
public static final fun onErrorResume (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun publishOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
public static final fun scanFold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun skip (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;)V
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)V
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)V
Expand Down
53 changes: 53 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/Migration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,56 @@ public fun <T> Flow<Flow<T>>.merge(): Flow<T> = error("Should not be called")
replaceWith = ReplaceWith("flattenConcat()")
)
public fun <T> Flow<Flow<T>>.flatten(): Flow<T> = error("Should not be called")

/**
* Kotlin has a built-in generic mechanism for making chained calls.
* If you wish to write something like
* ```
* myFlow.compose(MyFlowExtensions.ignoreErrors()).collect { ... }
* ```
* you can replace it with
*
* ```
* myFlow.let(MyFlowExtensions.ignoreErrors()).collect { ... }
* ```
*
* @suppress
*/
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Kotlin analogue of compose is 'let'",
replaceWith = ReplaceWith("let(transformer)")
)
public fun <T, R> Flow<T>.compose(transformer: Flow<T>.() -> Flow<R>): Flow<R> = error("Should not be called")

/**
* @suppress
*/
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Kotlin analogue of 'skip' is 'drop'",
replaceWith = ReplaceWith("drop(count)")
)
public fun <T> Flow<T>.skip(count: Int): Flow<T> = error("Should not be called")

/**
* Flow extension to iterate over elements is [collect].
* Foreach wasn't introduced deliberately to avoid confusion.
* Flow is not a collection, iteration over it may be not idempotent
* and can *launch* computations with side-effects.
* This behaviour is not reflected in [forEach] name.
* @suppress
*/
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'forEach' is 'collect'",
replaceWith = ReplaceWith("collect(block)")
)
public fun <T> Flow<T>.forEach(action: suspend (value: T) -> Unit): Unit = error("Should not be called")

@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow has less verbose 'scan' shortcut",
replaceWith = ReplaceWith("scan(initial, operation)")
)
public fun <T, R> Flow<T>.scanFold(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = error("Should not be called")
4 changes: 1 addition & 3 deletions kotlinx-coroutines-core/common/src/flow/operators/Errors.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ public fun <T> Flow<T>.onErrorCollect(
predicate: ExceptionPredicate = ALWAYS_TRUE
): Flow<T> = collectSafely { e ->
if (!predicate(e)) throw e
fallback.collect { value ->
emit(value)
}
emitAll(fallback)
}

/**
Expand Down
10 changes: 2 additions & 8 deletions kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,7 @@ public fun <T, R> Flow<T>.flatMapMerge(
*/
@FlowPreview
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
collect { value ->
value.collect { innerValue ->
emit(innerValue)
}
}
collect { value -> emitAll(value) }
}

/**
Expand Down Expand Up @@ -137,9 +133,7 @@ public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): F
previousFlow?.join()
// Undispatched to have better user experience in case of synchronous flows
previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
transform(value).collect { innerValue ->
downstream.emit(innerValue)
}
downstream.emitAll(transform(value))
}
}
}
Expand Down
45 changes: 45 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/operators/Transform.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

@file:JvmMultifileClass
@file:JvmName("FlowKt")
@file:Suppress("UNCHECKED_CAST")

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.internal.NULL
import kotlin.jvm.*
import kotlinx.coroutines.flow.unsafeFlow as flow

Expand Down Expand Up @@ -97,3 +99,46 @@ public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = flow {
emit(value)
}
}

/**
* Folds the given flow with [operation], emitting every intermediate result, including [initial] value.
* Note that initial value should be immutable (or should not be mutated) as it is shared between different collectors.
* For example:
* ```
* flowOf(1, 2, 3).accumulate(emptyList<Int>()) { acc, value -> acc + value }.toList()
* ```
* will produce `[], [1], [1, 2], [1, 2, 3]]`.
*/
@FlowPreview
public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = flow {
var accumulator: R = initial
emit(accumulator)
collect { value ->
accumulator = operation(accumulator, value)
emit(accumulator)
}
}

/**
* Reduces the given flow with [operation], emitting every intermediate result, including initial value.
* The first element is taken as initial value for operation accumulator.
* This operator has a sibling with initial value -- [scan].
*
* For example:
* ```
* flowOf(1, 2, 3, 4).scan { (v1, v2) -> v1 + v2 }.toList()
* ```
* will produce `[1, 3, 6, 10]`
*/
@FlowPreview
public fun <T> Flow<T>.scanReduce(operation: suspend (accumulator: T, value: T) -> T): Flow<T> = flow {
var accumulator: Any? = NULL
collect { value ->
accumulator = if (accumulator === NULL) {
value
} else {
operation(accumulator as T, value)
}
emit(accumulator as T)
}
}
6 changes: 6 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,9 @@ public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})

/**
* Collects all the values from the given [flow] and emits them to the collector.
* Shortcut for `flow.collect { value -> emit(value) }`.
*/
public suspend inline fun <T> FlowCollector<T>.emitAll(flow: Flow<T>) = flow.collect(this)
47 changes: 45 additions & 2 deletions kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

@file:JvmMultifileClass
@file:JvmName("FlowKt")
@file:Suppress("UNCHECKED_CAST")

package kotlinx.coroutines.flow

Expand Down Expand Up @@ -50,7 +51,7 @@ public suspend inline fun <T, R> Flow<T>.fold(
}

/**
* Terminal operator, that awaits for one and only one value to be published.
* The terminal operator, that awaits for one and only one value to be published.
* Throws [NoSuchElementException] for empty flow and [IllegalStateException] for flow
* that contains more than one element.
*/
Expand All @@ -68,7 +69,7 @@ public suspend fun <T> Flow<T>.single(): T {
}

/**
* Terminal operator, that awaits for one and only one value to be published.
* The terminal operator, that awaits for one and only one value to be published.
* Throws [IllegalStateException] for flow that contains more than one element.
*/
@FlowPreview
Expand All @@ -81,3 +82,45 @@ public suspend fun <T: Any> Flow<T>.singleOrNull(): T? {

return result
}

/**
* The terminal operator that returns the first element emitted by the flow and then cancels flow's collection.
* Throws [NoSuchElementException] if the flow was empty.
*/
@FlowPreview
public suspend fun <T> Flow<T>.first(): T {
var result: Any? = NULL
try {
collect { value ->
result = value
throw AbortFlowException()
}
} catch (e: AbortFlowException) {
// Do nothing
}

if (result === NULL) throw NoSuchElementException("Expected at least one element")
return result as T
}

/**
* The terminal operator that returns the first element emitted by the flow matching the given [predicate] and then cancels flow's collection.
* Throws [NoSuchElementException] if the flow has not contained elements matching the [predicate].
*/
@FlowPreview
public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T {
var result: Any? = NULL
try {
collect { value ->
if (predicate(value)) {
result = value
throw AbortFlowException()
}
}
} catch (e: AbortFlowException) {
// Do nothing
}

if (result === NULL) throw NoSuchElementException("Expected at least one element matching the predicate $predicate")
return result as T
}
68 changes: 68 additions & 0 deletions kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.test.*

class ScanTest : TestBase() {
@Test
fun testScan() = runTest {
val flow = flowOf(1, 2, 3, 4, 5)
val result = flow.scanReduce { acc, v -> acc + v }.toList()
assertEquals(listOf(1, 3, 6, 10, 15), result)
}

@Test
fun testScanWithInitial() = runTest {
val flow = flowOf(1, 2, 3)
val result = flow.scan(emptyList<Int>()) { acc, value -> acc + value }.toList()
assertEquals(listOf(emptyList(), listOf(1), listOf(1, 2), listOf(1, 2, 3)), result)
}

@Test
fun testNulls() = runTest {
val flow = flowOf(null, 2, null, null, null, 5)
val result = flow.scanReduce { acc, v -> if (v == null) acc else (if (acc == null) v else acc + v) }.toList()
assertEquals(listOf(null, 2, 2, 2, 2, 7), result)
}

@Test
fun testEmptyFlow() = runTest {
val result = emptyFlow<Int>().scanReduce { _, _ -> 1 }.toList()
assertTrue(result.isEmpty())
}

@Test
fun testErrorCancelsUpstream() = runTest {
expect(1)
val latch = Channel<Unit>()
val flow = flow {
coroutineScope {
launch {
latch.send(Unit)
hang { expect(3) }
}
emit(1)
emit(2)
}
}.scanReduce { _, value ->
expect(value) // 2
latch.receive()
throw TestException()
}.onErrorCollect(emptyFlow())

assertEquals(1, flow.single())
finish(4)
}

public operator fun <T> Collection<T>.plus(element: T): List<T> {
val result = ArrayList<T>(size + 1)
result.addAll(this)
result.add(element)
return result
}
}
Loading