Skip to content

Commit dfcf2ae

Browse files
committed
Renaming switchMap to flatMapLatest to better reflect its semantics and to have a consistent and meaningful naming scheme for the rest of the 'latest' operators
* Make flatMapLatest pure, do not leak cancellation behaviour to downstream * Make *latest buffered by default to amortize constant re-dispatch cost * Introducing transformLatest * Introducing mapLatest Fixes #1335
1 parent bba3931 commit dfcf2ae

File tree

8 files changed

+456
-191
lines changed

8 files changed

+456
-191
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+3
Original file line numberDiff line numberDiff line change
@@ -890,6 +890,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
890890
public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
891891
public static final fun flatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
892892
public static final fun flatMapConcat (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
893+
public static final fun flatMapLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
893894
public static final fun flatMapMerge (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
894895
public static synthetic fun flatMapMerge$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
895896
public static final fun flatten (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
@@ -912,6 +913,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
912913
public static final fun getDEFAULT_CONCURRENCY ()I
913914
public static final fun launchIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/Job;
914915
public static final fun map (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
916+
public static final fun mapLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
915917
public static final fun mapNotNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
916918
public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
917919
public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
@@ -956,6 +958,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
956958
public static final fun toSet (Lkotlinx/coroutines/flow/Flow;Ljava/util/Set;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
957959
public static synthetic fun toSet$default (Lkotlinx/coroutines/flow/Flow;Ljava/util/Set;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
958960
public static final fun transform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
961+
public static final fun transformLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
959962
public static final fun unsafeTransform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
960963
public static final fun withContext (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)V
961964
public static final fun withIndex (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;

kotlinx-coroutines-core/common/src/flow/Migration.kt

+7
Original file line numberDiff line numberDiff line change
@@ -404,3 +404,10 @@ public inline fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
404404
other4: Flow<T5>,
405405
crossinline transform: suspend (T1, T2, T3, T4, T5) -> R
406406
): Flow<R> = combine(this, other, other2, other3, other4, transform)
407+
408+
@Deprecated(
409+
level = DeprecationLevel.ERROR,
410+
message = "Flow analogues of 'switchMap' are 'transformLatest', 'flatMapLatest' and 'mapLatest'",
411+
replaceWith = ReplaceWith("this.flatMapLatest(transform)")
412+
)
413+
public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = noImpl()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow.internal
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.channels.*
9+
import kotlinx.coroutines.flow.*
10+
import kotlinx.coroutines.sync.*
11+
import kotlin.coroutines.*
12+
13+
internal class ChannelFlowTransformLatest<T, R>(
14+
private val transform: suspend FlowCollector<R>.(value: T) -> Unit,
15+
flow: Flow<T>,
16+
context: CoroutineContext = EmptyCoroutineContext,
17+
capacity: Int = Channel.BUFFERED
18+
) : ChannelFlowOperator<T, R>(flow, context, capacity) {
19+
override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<R> =
20+
ChannelFlowTransformLatest(transform, flow, context, capacity)
21+
22+
override suspend fun flowCollect(collector: FlowCollector<R>) {
23+
flowScope {
24+
var previousFlow: Job? = null
25+
flow.collect { value ->
26+
previousFlow?.apply {
27+
cancel(ChildCancelledException())
28+
join()
29+
}
30+
// Do not pay for dispatch here, it's never necessary
31+
previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
32+
collector.transform(value)
33+
}
34+
}
35+
}
36+
}
37+
}
38+
39+
internal class ChannelFlowMerge<T>(
40+
flow: Flow<Flow<T>>,
41+
private val concurrency: Int,
42+
context: CoroutineContext = EmptyCoroutineContext,
43+
capacity: Int = Channel.OPTIONAL_CHANNEL
44+
) : ChannelFlowOperator<Flow<T>, T>(flow, context, capacity) {
45+
override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
46+
ChannelFlowMerge(flow, concurrency, context, capacity)
47+
48+
// The actual merge implementation with concurrency limit
49+
private suspend fun mergeImpl(scope: CoroutineScope, collector: ConcurrentFlowCollector<T>) {
50+
val semaphore = Semaphore(concurrency)
51+
@Suppress("UNCHECKED_CAST")
52+
flow.collect { inner ->
53+
semaphore.acquire() // Acquire concurrency permit
54+
scope.launch {
55+
try {
56+
scope.ensureActive()
57+
inner.collect(collector)
58+
} finally {
59+
semaphore.release() // Release concurrency permit
60+
}
61+
}
62+
}
63+
}
64+
65+
// Fast path in ChannelFlowOperator calls this function (channel was not created yet)
66+
override suspend fun flowCollect(collector: FlowCollector<T>) {
67+
// this function should not have been invoked when channel was explicitly requested
68+
assert { capacity == Channel.OPTIONAL_CHANNEL }
69+
flowScope {
70+
mergeImpl(this, collector.asConcurrentFlowCollector())
71+
}
72+
}
73+
74+
// Slow path when output channel is required (and was created)
75+
override suspend fun collectTo(scope: ProducerScope<T>) =
76+
mergeImpl(scope, SendingCollector(scope))
77+
78+
override fun additionalToStringProps(): String =
79+
"concurrency=$concurrency, "
80+
}

kotlinx-coroutines-core/common/src/flow/operators/Merge.kt

+56-68
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,8 @@ package kotlinx.coroutines.flow
1010

1111
import kotlinx.coroutines.*
1212
import kotlinx.coroutines.channels.*
13-
import kotlinx.coroutines.channels.Channel.Factory.OPTIONAL_CHANNEL
1413
import kotlinx.coroutines.flow.internal.*
1514
import kotlinx.coroutines.internal.*
16-
import kotlinx.coroutines.sync.*
17-
import kotlin.coroutines.*
1815
import kotlin.jvm.*
1916
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
2017

@@ -105,85 +102,76 @@ public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY
105102
return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency)
106103
}
107104

105+
/**
106+
* Returns a flow that produces element by [transform] function every time the original flow emits a value.
107+
* When the original flow emits a new value, the previous `transform` block is cancelled, thus the name `transformLatest`.
108+
*
109+
* For example, the following flow:
110+
* ```
111+
* flow {
112+
* emit("a")
113+
* delay(100)
114+
* emit("b")
115+
* }.transformLatest { value ->
116+
* emit(value)
117+
* delay(200)
118+
* emit(value + "_last")
119+
* }
120+
* ```
121+
* produces `a b b_last`.
122+
*
123+
* This operator is [buffered][buffer] by default and size of its output buffer can be changed by applying subsequent [buffer] operator.
124+
*/
125+
@ExperimentalCoroutinesApi
126+
public fun <T, R> Flow<T>.transformLatest(@BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R> =
127+
ChannelFlowTransformLatest(transform, this)
128+
108129
/**
109130
* Returns a flow that switches to a new flow produced by [transform] function every time the original flow emits a value.
110-
* When switch on the a flow is performed, the previous one is cancelled.
131+
* When the original flow emits a new value, the previous flow produced by `transform` block is cancelled.
111132
*
112133
* For example, the following flow:
113134
* ```
114135
* flow {
115136
* emit("a")
116137
* delay(100)
117138
* emit("b")
118-
* }.switchMap { value ->
139+
* }.flatMapLatest { value ->
119140
* flow {
120-
* emit(value + value)
141+
* emit(value)
121142
* delay(200)
122143
* emit(value + "_last")
123144
* }
124145
* }
125146
* ```
126-
* produces `aa bb b_last`
147+
* produces `a b b_last`
148+
*
149+
* This operator is [buffered][buffer] by default and size of its output buffer can be changed by applying subsequent [buffer] operator.
127150
*/
128-
@FlowPreview
129-
public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = scopedFlow { downstream ->
130-
var previousFlow: Job? = null
131-
collect { value ->
132-
// Linearize calls to emit as alternative to the channel. Bonus points for never-overlapping channels.
133-
previousFlow?.cancel(ChildCancelledException())
134-
previousFlow?.join()
135-
// Undispatched to have better user experience in case of synchronous flows
136-
previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
137-
downstream.emitAll(transform(value))
138-
}
139-
}
140-
}
141-
142-
private class ChannelFlowMerge<T>(
143-
flow: Flow<Flow<T>>,
144-
private val concurrency: Int,
145-
context: CoroutineContext = EmptyCoroutineContext,
146-
capacity: Int = OPTIONAL_CHANNEL
147-
) : ChannelFlowOperator<Flow<T>, T>(flow, context, capacity) {
148-
override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
149-
ChannelFlowMerge(flow, concurrency, context, capacity)
150-
151-
// The actual merge implementation with concurrency limit
152-
private suspend fun mergeImpl(scope: CoroutineScope, collector: ConcurrentFlowCollector<T>) {
153-
val semaphore = Semaphore(concurrency)
154-
val job: Job? = coroutineContext[Job]
155-
flow.collect { inner ->
156-
/*
157-
* We launch a coroutine on each emitted element and the only potential
158-
* suspension point in this collector is `semaphore.acquire` that rarely suspends,
159-
* so we manually check for cancellation to propagate it to the upstream in time.
160-
*/
161-
job?.ensureActive()
162-
semaphore.acquire() // Acquire concurrency permit
163-
scope.launch {
164-
try {
165-
inner.collect(collector)
166-
} finally {
167-
semaphore.release() // Release concurrency permit
168-
}
169-
}
170-
}
171-
}
172-
173-
// Fast path in ChannelFlowOperator calls this function (channel was not created yet)
174-
override suspend fun flowCollect(collector: FlowCollector<T>) {
175-
// this function should not have been invoked when channel was explicitly requested
176-
assert { capacity == OPTIONAL_CHANNEL }
177-
flowScope {
178-
mergeImpl(this, collector.asConcurrentFlowCollector())
179-
}
180-
}
181-
182-
// Slow path when output channel is required (and was created)
183-
override suspend fun collectTo(scope: ProducerScope<T>) =
184-
mergeImpl(scope, SendingCollector(scope))
185-
186-
override fun additionalToStringProps(): String =
187-
"concurrency=$concurrency, "
188-
}
151+
@ExperimentalCoroutinesApi
152+
public fun <T, R> Flow<T>.flatMapLatest(@BuilderInference transform: (value: T) -> Flow<R>): Flow<R> =
153+
transformLatest { emitAll(transform(it)) }
189154

155+
/**
156+
* Returns a flow that emits elements from the original flow transformed by [transform] function.
157+
* When the original flow emits a new value, computation of the [transform] block for previous value is cancelled.
158+
*
159+
* For example, the following flow:
160+
* ```
161+
* flow {
162+
* emit("a")
163+
* delay(100)
164+
* emit("b")
165+
* }.mapLatest { value ->
166+
* println("Started computing $value")
167+
* delay(200)
168+
* "Computed $value"
169+
* }
170+
* ```
171+
* will print "Started computing 1" and "Started computing 2", but the resulting flow will contain only "Computed 2" value.
172+
*
173+
* This operator is [buffered][buffer] by default and size of its output buffer can be changed by applying subsequent [buffer] operator.
174+
*/
175+
@ExperimentalCoroutinesApi
176+
public fun <T, R> Flow<T>.mapLatest(@BuilderInference transform: suspend (value: T) -> R): Flow<R> =
177+
transformLatest { emit(transform(it)) }

0 commit comments

Comments
 (0)