Skip to content

Commit db52e97

Browse files
committed
Flow.conflate operator
1 parent 3971df3 commit db52e97

File tree

4 files changed

+67
-4
lines changed

4 files changed

+67
-4
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+1
Original file line numberDiff line numberDiff line change
@@ -812,6 +812,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
812812
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;
813813
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function6;)Lkotlinx/coroutines/flow/Flow;
814814
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;[Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
815+
public static final fun conflate (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
815816
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
816817
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
817818
public static final fun debounce (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;

kotlinx-coroutines-core/common/src/flow/operators/Context.kt

+39-3
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,11 @@ import kotlin.jvm.*
9494
* }
9595
* ```
9696
*
97+
* ### Conflation
98+
*
99+
* Usage of this function with [capacity] of [Channel.CONFLATED][Channel.CONFLATED] is provided as a shortcut via
100+
* [conflate] operator. See its documentation for details.
101+
*
97102
* @param capacity type/capacity of the buffer between coroutines. Allowed values are the same as in `Channel(...)`
98103
* factory function: [BUFFERED][Channel.BUFFERED] (by default), [CONFLATED][Channel.CONFLATED],
99104
* [RENDEZVOUS][Channel.RENDEZVOUS], [UNLIMITED][Channel.UNLIMITED] or a non-negative value indicating
@@ -110,9 +115,40 @@ public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T> {
110115
ChannelFlowOperatorImpl(this, capacity = capacity)
111116
}
112117

113-
// todo: conflate would be a useful operator only when Channel.CONFLATE is changed to always deliver the last send value
114-
//@FlowPreview
115-
//public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
118+
/**
119+
* Conflates flow emissions via conflated channel and runs collector in a separate coroutine.
120+
* The effect of this is that emitter is never suspended due to a slow collector, but collector
121+
* always gets the most recent value emitted.
122+
*
123+
* For example, consider the flow that emits integers from 1 to 30 with 100 ms delay between them:
124+
*
125+
* ```
126+
* val flow = flow {
127+
* for (i in 1..30) {
128+
* delay(100)
129+
* emit(i)
130+
* }
131+
* }
132+
* ```
133+
*
134+
* Applying `conflate()` operator to it allows a collector that delays 1 second on each element to get
135+
* integers 1, 10, 20, 30:
136+
*
137+
* ```
138+
* val result = flow.conflate().onEach { delay(1000) }.toList()
139+
* assertEquals(listOf(1, 10, 20, 30), result)
140+
* ```
141+
*
142+
* Note that `conflate` operator is a shortcut for [buffer] with `capacity` of [Channel.CONFLATED][Channel.CONFLATED].
143+
*
144+
* ### Operator fusion
145+
*
146+
* Adjacent applications of `conflate`/[buffer], [channelFlow], [flowOn], [produceIn], and [broadcastIn] are
147+
* always fused so that only one properly configured channel is used for execution.
148+
* **Conflation takes precedence over `buffer()` calls with any other capacity.**
149+
*/
150+
@FlowPreview
151+
public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
116152

117153
/**
118154
* The operator that changes the context where this flow is executed to the given [context].

kotlinx-coroutines-core/common/test/flow/operators/BufferTest.kt

-1
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,6 @@ class BufferTest : TestBase() {
164164
}
165165

166166
@Test
167-
@Ignore // todo: conflated behavior is pretty useless right now, because closing channel overwrites last value
168167
fun testConflate() = runTest {
169168
expect(1)
170169
// emit all and conflate / then collect first & last
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow.operators
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
9+
import kotlin.test.*
10+
11+
class ConflateTest : TestBase() {
12+
@Test // from example
13+
fun testExample() = withVirtualTime {
14+
expect(1)
15+
val flow = flow {
16+
for (i in 1..30) {
17+
delay(100)
18+
emit(i)
19+
}
20+
}
21+
val result = flow.conflate().onEach {
22+
delay(1000)
23+
}.toList()
24+
assertEquals(listOf(1, 10, 20, 30), result)
25+
finish(2)
26+
}
27+
}

0 commit comments

Comments
 (0)