Skip to content

Commit b0b5502

Browse files
committed
Introducing collectLatest terminal operator
Fixes #1269
1 parent af81b3c commit b0b5502

File tree

4 files changed

+103
-5
lines changed

4 files changed

+103
-5
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+2-1
Original file line numberDiff line numberDiff line change
@@ -847,6 +847,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
847847
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
848848
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
849849
public static final fun collectIndexed (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
850+
public static final fun collectLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
850851
public static final synthetic fun combine (Ljava/lang/Iterable;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
851852
public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
852853
public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
@@ -913,7 +914,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
913914
public static final fun getDEFAULT_CONCURRENCY ()I
914915
public static final fun launchIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/Job;
915916
public static final fun map (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
916-
public static final fun mapLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
917+
public static final fun mapLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
917918
public static final fun mapNotNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
918919
public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
919920
public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;

kotlinx-coroutines-core/common/src/flow/internal/Merge.kt

+8-3
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,17 @@ internal class ChannelFlowMerge<T>(
4848
// The actual merge implementation with concurrency limit
4949
private suspend fun mergeImpl(scope: CoroutineScope, collector: ConcurrentFlowCollector<T>) {
5050
val semaphore = Semaphore(concurrency)
51-
@Suppress("UNCHECKED_CAST")
51+
val job: Job? = coroutineContext[Job]
5252
flow.collect { inner ->
53-
semaphore.acquire() // Acquire concurrency permit
53+
/*
54+
* We launch a coroutine on each emitted element and the only potential
55+
* suspension point in this collector is `semaphore.acquire` that rarely suspends,
56+
* so we manually check for cancellation to propagate it to the upstream in time.
57+
*/
58+
job?.ensureActive()
59+
semaphore.acquire()
5460
scope.launch {
5561
try {
56-
scope.ensureActive()
5762
inner.collect(collector)
5863
} finally {
5964
semaphore.release() // Release concurrency permit

kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt

+37-1
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,45 @@ public suspend inline fun <T> Flow<T>.collectIndexed(crossinline action: suspend
8686
override suspend fun emit(value: T) = action(checkIndexOverflow(index++), value)
8787
})
8888

89+
/**
90+
* Terminal flow operator that collects the given flow with a provided [action].
91+
* The crucial difference from [collect] is that when the original flow emits a new value, [action] block for previous
92+
* value is cancelled.
93+
* It can be demonstrated by the following example:
94+
* ```
95+
* flow {
96+
* emit(1)
97+
* delay(50)
98+
* emit(2)
99+
* }.collectLatest { value ->
100+
* println("Collecting $value")
101+
* delay(100) // Emulate work
102+
* println("$value collected")
103+
* }
104+
* ```
105+
*
106+
* prints "Collecting 1, Collecting 2, 2 collected"
107+
*/
108+
@ExperimentalCoroutinesApi
109+
public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit) {
110+
/*
111+
* Implementation note:
112+
* buffer(0) is inserted here to fulfil user's expectations in sequential usages, e.g.:
113+
* ```
114+
* flowOf(1, 2, 3).collectLatest {
115+
* delay(1)
116+
* println(it) // Expect only 3 to be printed
117+
* }
118+
* ```
119+
*
120+
* It's not the case for intermediate operators which users mostly use for interactive UI,
121+
* where performance of dispatch is more important.
122+
*/
123+
mapLatest(action).buffer(0).collect()
124+
}
125+
89126
/**
90127
* Collects all the values from the given [flow] and emits them to the collector.
91-
*
92128
* It is a shorthand for `flow.collect { value -> emit(value) }`.
93129
*/
94130
@ExperimentalCoroutinesApi
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.coroutines.*
8+
import kotlin.test.*
9+
10+
class CollectLatestTest : TestBase() {
11+
@Test
12+
fun testNoSuspension() = runTest {
13+
flowOf(1, 2, 3).collectLatest {
14+
expect(it)
15+
}
16+
finish(4)
17+
}
18+
19+
@Test
20+
fun testSuspension() = runTest {
21+
flowOf(1, 2, 3).collectLatest {
22+
yield()
23+
expect(1)
24+
}
25+
finish(2)
26+
}
27+
28+
@Test
29+
fun testUpstreamErrorSuspension() = runTest({it is TestException}) {
30+
try {
31+
flow {
32+
emit(1)
33+
throw TestException()
34+
}.collectLatest { expect(1) }
35+
expectUnreached()
36+
} finally {
37+
finish(2)
38+
}
39+
}
40+
41+
@Test
42+
fun testDownstreamError() = runTest({it is TestException}) {
43+
try {
44+
flow {
45+
emit(1)
46+
hang { expect(1) }
47+
}.collectLatest {
48+
throw TestException()
49+
}
50+
expectUnreached()
51+
} finally {
52+
finish(2)
53+
}
54+
55+
}
56+
}

0 commit comments

Comments
 (0)