Skip to content

Commit fe820ba

Browse files
authored
Flow operators: (#1132)
Flow operators: * switchMap * debounce * sample * Update RxJava version to 2.2.8 Partially fixes #1107
1 parent 9e9c9a3 commit fe820ba

File tree

16 files changed

+901
-36
lines changed

16 files changed

+901
-36
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+3
Original file line numberDiff line numberDiff line change
@@ -796,6 +796,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
796796
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
797797
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
798798
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
799+
public static final fun debounce (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
799800
public static final fun delayEach (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
800801
public static final fun delayFlow (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
801802
public static final fun distinctUntilChanged (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
@@ -833,8 +834,10 @@ public final class kotlinx/coroutines/flow/FlowKt {
833834
public static final fun reduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
834835
public static final fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
835836
public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
837+
public static final fun sample (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
836838
public static final fun single (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
837839
public static final fun singleOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
840+
public static final fun switchMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
838841
public static final fun take (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
839842
public static final fun takeWhile (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
840843
public static final fun toCollection (Lkotlinx/coroutines/flow/Flow;Ljava/util/Collection;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;

gradle.properties

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ bintray_version=1.8.4-jetbrains-5
1313
byte_buddy_version=1.9.3
1414
reactor_vesion=3.2.5.RELEASE
1515
reactive_streams_version=1.0.2
16+
rxjava2_version=2.2.8
1617
artifactory_plugin_version=4.7.3
1718

1819
# JS

kotlinx-coroutines-core/common/src/flow/operators/Delay.kt

+135-4
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,19 @@
88
package kotlinx.coroutines.flow
99

1010
import kotlinx.coroutines.*
11-
import kotlinx.coroutines.flow.unsafeFlow as flow
11+
import kotlinx.coroutines.channels.*
12+
import kotlinx.coroutines.flow.internal.*
13+
import kotlinx.coroutines.selects.*
1214
import kotlin.jvm.*
15+
import kotlinx.coroutines.flow.unsafeFlow as flow
1316

1417
/**
1518
* Delays the emission of values from this flow for the given [timeMillis].
1619
*/
1720
@FlowPreview
1821
public fun <T> Flow<T>.delayFlow(timeMillis: Long): Flow<T> = flow {
1922
delay(timeMillis)
20-
collect { value ->
21-
emit(value)
22-
}
23+
collect(this@flow)
2324
}
2425

2526
/**
@@ -32,3 +33,133 @@ public fun <T> Flow<T>.delayEach(timeMillis: Long): Flow<T> = flow {
3233
emit(value)
3334
}
3435
}
36+
37+
/**
38+
* Returns a flow that mirrors the original flow, but filters out values
39+
* that are followed by the newer values within the given [timeout][timeoutMillis].
40+
* The latest value is always emitted.
41+
*
42+
* Example:
43+
* ```
44+
* flow {
45+
* emit(1)
46+
* delay(99)
47+
* emit(2)
48+
* delay(99)
49+
* emit(3)
50+
* delay(1001)
51+
* emit(4)
52+
* delay(1001)
53+
* emit(5)
54+
* }.debounce(1000)
55+
* ```
56+
* produces `3, 4, 5`.
57+
*
58+
* Note that the resulting flow does not emit anything as long as the original flow emits
59+
* items faster than every [timeoutMillis] milliseconds.
60+
*/
61+
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
62+
require(timeoutMillis > 0) { "Debounce timeout should be positive" }
63+
return flow {
64+
coroutineScope {
65+
val values = Channel<Any?>(Channel.CONFLATED) // Actually Any, KT-30796
66+
// Channel is not closed deliberately as there is no close with value
67+
val collector = launch {
68+
try {
69+
collect { value -> values.send(value ?: NullSurrogate) }
70+
} catch (e: Throwable) {
71+
values.close(e) // Workaround for #1130
72+
throw e
73+
}
74+
}
75+
76+
var isDone = false
77+
var lastValue: Any? = null
78+
while (!isDone) {
79+
select<Unit> {
80+
values.onReceive {
81+
lastValue = it
82+
}
83+
84+
lastValue?.let { value -> // set timeout when lastValue != null
85+
onTimeout(timeoutMillis) {
86+
lastValue = null // Consume the value
87+
emit(NullSurrogate.unbox(value))
88+
}
89+
}
90+
91+
// Close with value 'idiom'
92+
collector.onJoin {
93+
if (lastValue != null) emit(NullSurrogate.unbox(lastValue))
94+
isDone = true
95+
}
96+
}
97+
}
98+
}
99+
}
100+
}
101+
102+
/**
103+
* Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMillis].
104+
*
105+
* Example:
106+
* ```
107+
* flow {
108+
* repeat(10) {
109+
* emit(it)
110+
* delay(50)
111+
* }
112+
* }.sample(100)
113+
* ```
114+
* produces `1, 3, 5, 7, 9`.
115+
*
116+
* Note that the latest element is not emitted if it does not fit into the sampling window.
117+
*/
118+
public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
119+
require(periodMillis > 0) { "Sample period should be positive" }
120+
return flow {
121+
coroutineScope {
122+
val values = produce<Any?>(capacity = Channel.CONFLATED) { // Actually Any, KT-30796
123+
collect { value -> send(value ?: NullSurrogate) }
124+
}
125+
126+
var isDone = false
127+
var lastValue: Any? = null
128+
val ticker = fixedPeriodTicker(periodMillis)
129+
while (!isDone) {
130+
select<Unit> {
131+
values.onReceiveOrNull {
132+
if (it == null) {
133+
ticker.cancel()
134+
isDone = true
135+
} else {
136+
lastValue = it
137+
}
138+
}
139+
140+
// todo: shall be start sampling only when an element arrives or sample aways as here?
141+
ticker.onReceive {
142+
val value = lastValue ?: return@onReceive
143+
lastValue = null // Consume the value
144+
emit(NullSurrogate.unbox(value))
145+
}
146+
}
147+
}
148+
}
149+
}
150+
}
151+
152+
/*
153+
* TODO this design (and design of the corresponding operator) depends on #540
154+
*/
155+
internal fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMillis: Long = delayMillis): ReceiveChannel<Unit> {
156+
require(delayMillis >= 0) { "Expected non-negative delay, but has $delayMillis ms" }
157+
require(initialDelayMillis >= 0) { "Expected non-negative initial delay, but has $initialDelayMillis ms" }
158+
return produce(capacity = 0) {
159+
delay(initialDelayMillis)
160+
while (true) {
161+
channel.send(Unit)
162+
delay(delayMillis)
163+
}
164+
}
165+
}

kotlinx-coroutines-core/common/src/flow/operators/Merge.kt

+40-5
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ public fun <T, R> Flow<T>.flatMapMerge(concurrency: Int = 16, bufferSize: Int =
4545
require(bufferSize >= 0) { "Expected non-negative buffer size, but had $bufferSize" }
4646
require(concurrency >= 0) { "Expected non-negative concurrency level, but had $concurrency" }
4747
return flow {
48-
val semaphore = Channel<Unit>(concurrency)
49-
val flatMap = SerializingFlatMapCollector(this, bufferSize)
5048
coroutineScope {
49+
val semaphore = Channel<Unit>(concurrency)
50+
val flatMap = SerializingFlatMapCollector(this@flow, bufferSize)
5151
collect { outerValue ->
5252
// TODO real semaphore (#94)
5353
semaphore.send(Unit) // Acquire concurrency permit
@@ -89,15 +89,50 @@ public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
8989
@FlowPreview
9090
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = 16, bufferSize: Int = 16): Flow<T> = flatMapMerge(concurrency, bufferSize) { it }
9191

92+
/**
93+
* Returns a flow that switches to a new flow produced by [transform] function every time the original flow emits a value.
94+
* When switch on the a flow is performed, the previous one is cancelled.
95+
*
96+
* For example, the following flow:
97+
* ```
98+
* flow {
99+
* emit("a")
100+
* delay(100)
101+
* emit("b")
102+
* }.switchMap { value ->
103+
* flow {
104+
* emit(value + value)
105+
* delay(200)
106+
* emit(value + "_last")
107+
* }
108+
* }
109+
* ```
110+
* produces `aa bb b_last`
111+
*/
112+
@FlowPreview
113+
public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = flow {
114+
coroutineScope {
115+
var previousFlow: Job? = null
116+
collect { value ->
117+
// Linearize calls to emit as alternative to the channel. Bonus points for never-overlapping channels.
118+
previousFlow?.cancelAndJoin()
119+
// Undispatched to have better user experience in case of synchronous flows
120+
previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
121+
transform(value).collect { innerValue ->
122+
emit(innerValue)
123+
}
124+
}
125+
}
126+
}
127+
}
92128

93129
// Effectively serializes access to downstream collector from flatMap
94130
private class SerializingFlatMapCollector<T>(
95-
private val downstream: FlowCollector<T>,
96-
private val bufferSize: Int
131+
private val downstream: FlowCollector<T>, bufferSize: Int
97132
) {
98133

99134
// Let's try to leverage the fact that flatMapMerge is never contended
100-
// TODO 1.2.1 do not allocate channel
135+
// TODO do not allocate channel
101136
private val channel = Channel<Any?>(bufferSize) // Should be any, but KT-30796
102137
private val inProgressLock = atomic(false)
103138

kotlinx-coroutines-core/common/test/TestBase.common.kt

+7-14
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public expect open class TestBase constructor() {
1717
public fun expect(index: Int)
1818
public fun expectUnreached()
1919
public fun finish(index: Int)
20+
public fun ensureFinished() // Ensures that 'finish' was invoked
2021
public fun reset() // Resets counter and finish flag. Workaround for parametrized tests absence in common
2122

2223
public fun runTest(
@@ -44,20 +45,12 @@ public inline fun <reified T : Throwable> assertFailsWith(block: () -> Unit) {
4445
}
4546

4647
public suspend inline fun <reified T : Throwable> assertFailsWith(flow: Flow<*>) {
47-
var e: Throwable? = null
48-
var completed = false
49-
flow.launchIn(CoroutineScope(Dispatchers.Unconfined)) {
50-
onEach {}
51-
catch<Throwable> {
52-
e = it
53-
}
54-
finally {
55-
completed = true
56-
assertTrue(it is T)
57-
}
58-
}.join()
59-
assertTrue(e is T)
60-
assertTrue(completed)
48+
try {
49+
flow.collect { /* Do nothing */ }
50+
fail("Should be unreached")
51+
} catch (e: Throwable) {
52+
assertTrue(e is T)
53+
}
6154
}
6255

6356
public suspend fun Flow<Int>.sum() = fold(0) { acc, value -> acc + value }
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlin.coroutines.*
8+
import kotlin.jvm.*
9+
10+
private class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : CoroutineDispatcher(), Delay {
11+
12+
private val originalDispatcher = enclosingScope.coroutineContext[ContinuationInterceptor] as CoroutineDispatcher
13+
private val heap = ArrayList<TimedTask>() // TODO use MPP heap/ordered set implementation (commonize ThreadSafeHeap)
14+
private var currentTime = 0L
15+
16+
init {
17+
/*
18+
* Launch "event-loop-owning" task on start of the virtual time event loop.
19+
* It ensures the progress of the enclosing event-loop and polls the timed queue
20+
* when the enclosing event loop is empty, emulating virtual time.
21+
*/
22+
enclosingScope.launch {
23+
while (true) {
24+
val delayNanos = ThreadLocalEventLoop.currentOrNull()?.processNextEvent()
25+
?: error("Event loop is missing, virtual time source works only as part of event loop")
26+
if (delayNanos <= 0) continue
27+
if (delayNanos > 0 && delayNanos != Long.MAX_VALUE) error("Unexpected external delay: $delayNanos")
28+
val nextTask = heap.minBy { it.deadline } ?: return@launch
29+
heap.remove(nextTask)
30+
currentTime = nextTask.deadline
31+
nextTask.run()
32+
}
33+
}
34+
}
35+
36+
private inner class TimedTask(
37+
private val runnable: Runnable,
38+
@JvmField val deadline: Long
39+
) : DisposableHandle, Runnable by runnable {
40+
41+
override fun dispose() {
42+
heap.remove(this)
43+
}
44+
}
45+
46+
override fun dispatch(context: CoroutineContext, block: Runnable) {
47+
originalDispatcher.dispatch(context, block)
48+
}
49+
50+
@ExperimentalCoroutinesApi
51+
override fun isDispatchNeeded(context: CoroutineContext): Boolean = originalDispatcher.isDispatchNeeded(context)
52+
53+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
54+
val task = TimedTask(block, currentTime + timeMillis)
55+
heap += task
56+
return task
57+
}
58+
59+
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
60+
val task = TimedTask(Runnable { with(continuation) { resumeUndispatched(Unit) } }, currentTime + timeMillis)
61+
heap += task
62+
continuation.invokeOnCancellation { task.dispose() }
63+
}
64+
}
65+
66+
/**
67+
* Runs a test ([TestBase.runTest]) with a virtual time source.
68+
* This runner has the following constraints:
69+
* 1) It works only in the event-loop environment and it is relying on it.
70+
* None of the coroutines should be launched in any dispatcher different from a current
71+
* 2) Regular tasks always dominate delayed ones. It means that
72+
* `launch { while(true) yield() }` will block the progress of the delayed tasks
73+
* 3) [TestBase.finish] should always be invoked.
74+
* Given all the constraints into account, it is easy to mess up with a test and actually
75+
* return from [withVirtualTime] before the test is executed completely.
76+
* To decrease the probability of such error, additional `finish` constraint is added.
77+
*/
78+
public fun TestBase.withVirtualTime(block: suspend CoroutineScope.() -> Unit) = runTest {
79+
withContext(Dispatchers.Unconfined) {
80+
// Create a platform-independent event loop
81+
val dispatcher = VirtualTimeDispatcher(this)
82+
withContext(dispatcher) { block() }
83+
ensureFinished()
84+
}
85+
}

kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt

+9-1
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,22 @@ class ChannelFlowTest : TestBase() {
2323

2424
@Test
2525
fun testExceptionInBroadcast() = runTest {
26+
expect(1)
2627
val channel = broadcast(NonCancellable) { // otherwise failure will cancel scope as well
2728
repeat(10) {
2829
send(it + 1)
2930
}
3031
throw TestException()
3132
}
3233
assertEquals(15, channel.asFlow().take(5).sum())
33-
assertFailsWith<TestException>(channel.asFlow())
34+
35+
// Workaround for JS bug
36+
try {
37+
channel.asFlow().collect { /* Do nothing */ }
38+
expectUnreached()
39+
} catch (e: TestException) {
40+
finish(2)
41+
}
3442
}
3543

3644
@Test

0 commit comments

Comments
 (0)