Skip to content

Commit f40a4b7

Browse files
committed
Flow operators:
* switchMap * debounce * sample * Update RxJava version to 2.2.8 Partially fixes #1107
1 parent 1748ce1 commit f40a4b7

File tree

14 files changed

+856
-7
lines changed

14 files changed

+856
-7
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+3
Original file line numberDiff line numberDiff line change
@@ -796,6 +796,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
796796
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
797797
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
798798
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
799+
public static final fun debounce (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
799800
public static final fun delayEach (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
800801
public static final fun delayFlow (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
801802
public static final fun distinctUntilChanged (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
@@ -833,8 +834,10 @@ public final class kotlinx/coroutines/flow/FlowKt {
833834
public static final fun reduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
834835
public static final fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
835836
public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
837+
public static final fun sample (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
836838
public static final fun single (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
837839
public static final fun singleOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
840+
public static final fun switchMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
838841
public static final fun take (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
839842
public static final fun takeWhile (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
840843
public static final fun toCollection (Lkotlinx/coroutines/flow/Flow;Ljava/util/Collection;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;

gradle.properties

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ bintray_version=1.8.4-jetbrains-5
1313
byte_buddy_version=1.9.3
1414
reactor_vesion=3.2.5.RELEASE
1515
reactive_streams_version=1.0.2
16+
rxjava2_version=2.2.8
1617
artifactory_plugin_version=4.7.3
1718

1819
# JS

kotlinx-coroutines-core/common/src/flow/operators/Delay.kt

+135-1
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@
88
package kotlinx.coroutines.flow
99

1010
import kotlinx.coroutines.*
11-
import kotlinx.coroutines.flow.unsafeFlow as flow
11+
import kotlinx.coroutines.channels.*
12+
import kotlinx.coroutines.flow.internal.*
13+
import kotlinx.coroutines.selects.*
1214
import kotlin.jvm.*
15+
import kotlinx.coroutines.flow.unsafeFlow as flow
1316

1417
/**
1518
* Delays the emission of values from this flow for the given [timeMillis].
@@ -32,3 +35,134 @@ public fun <T> Flow<T>.delayEach(timeMillis: Long): Flow<T> = flow {
3235
emit(value)
3336
}
3437
}
38+
39+
/**
40+
* Returns a flow that mirrors the original flow, but filters out values
41+
* that are followed by the newer values within the given [timeout][timeoutMs].
42+
* The latest value is always emitted.
43+
* Example:
44+
* ```
45+
* flow {
46+
* emit(1)
47+
* delay(99)
48+
* emit(2)
49+
* delay(99)
50+
* emit(3)
51+
* delay(1001)
52+
* emit(4)
53+
* delay(1001)
54+
* emit(5)
55+
* }.debounce(1000)
56+
* ```
57+
* will produce `3, 4, 5`.
58+
*/
59+
public fun <T> Flow<T>.debounce(timeoutMs: Long): Flow<T> {
60+
require(timeoutMs > 0) { "Debounce timeout should be positive" }
61+
return flow {
62+
coroutineScope {
63+
val values = Channel<Any>(Channel.CONFLATED)
64+
// Channel is not closed deliberately as there is no close with value
65+
val collector = launch {
66+
try {
67+
collect { value -> values.send(value ?: NullSurrogate) }
68+
} catch (e: Throwable) {
69+
values.close(e) // Workaround for #1130
70+
throw e
71+
}
72+
}
73+
74+
var isDone = false
75+
var lastValue: Any? = null
76+
while (!isDone) {
77+
select<Unit> {
78+
values.onReceive {
79+
lastValue = it
80+
}
81+
82+
onTimeout(timeoutMs) {
83+
val value = lastValue ?: return@onTimeout
84+
lastValue = null // Consume the value
85+
emit(NullSurrogate.unbox(value))
86+
}
87+
88+
// Close with value 'idiom'
89+
collector.onJoin {
90+
if (lastValue != null) emit(NullSurrogate.unbox(lastValue))
91+
isDone = true
92+
}
93+
}
94+
}
95+
}
96+
}
97+
}
98+
99+
/**
100+
* Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMs].
101+
* Example:
102+
* ```
103+
* flow {
104+
* repeat(10) {
105+
* emit(it)
106+
* delay(50)
107+
* }
108+
* }.sample(100)
109+
* ```
110+
* will produce `1, 3, 5, 7, 9`.
111+
* Note that the latest element is not emitted if it does not fit into the sampling window.
112+
*/
113+
public fun <T> Flow<T>.sample(periodMs: Long): Flow<T> {
114+
require(periodMs > 0) { "Sample period should be positive" }
115+
return flow {
116+
coroutineScope {
117+
val values = Channel<Any>(Channel.CONFLATED)
118+
// Channel is not closed deliberately as there is no close with value
119+
launch {
120+
try {
121+
collect { value -> values.send(value ?: NullSurrogate) }
122+
} catch (e: Throwable) {
123+
values.close(e) // Workaround for #1130
124+
throw e
125+
} finally {
126+
values.close()
127+
}
128+
}
129+
130+
var isDone = false
131+
var lastValue: Any? = null
132+
val ticker = fixedPeriodTicker(periodMs, periodMs)
133+
while (!isDone) {
134+
select<Unit> {
135+
values.onReceiveOrNull {
136+
if (it == null) {
137+
ticker.cancel()
138+
isDone = true
139+
} else {
140+
lastValue = it
141+
}
142+
}
143+
144+
ticker.onReceive {
145+
val value = lastValue ?: return@onReceive
146+
lastValue = null // Consume the value
147+
emit(NullSurrogate.unbox(value))
148+
}
149+
}
150+
}
151+
}
152+
}
153+
}
154+
155+
/*
156+
* TODO this design (and design of the corresponding operator) depends on #540
157+
*/
158+
internal fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMillis: Long = delayMillis): ReceiveChannel<Unit> {
159+
require(delayMillis >= 0) { "Expected non-negative delay, but has $delayMillis ms" }
160+
require(initialDelayMillis >= 0) { "Expected non-negative initial delay, but has $initialDelayMillis ms" }
161+
return produce(capacity = 0) {
162+
delay(initialDelayMillis)
163+
while (true) {
164+
channel.send(Unit)
165+
delay(delayMillis)
166+
}
167+
}
168+
}

kotlinx-coroutines-core/common/src/flow/operators/Merge.kt

+39-4
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ public fun <T, R> Flow<T>.flatMapMerge(concurrency: Int = 16, bufferSize: Int =
4545
require(bufferSize >= 0) { "Expected non-negative buffer size, but had $bufferSize" }
4646
require(concurrency >= 0) { "Expected non-negative concurrency level, but had $concurrency" }
4747
return flow {
48-
val semaphore = Channel<Unit>(concurrency)
49-
val flatMap = SerializingFlatMapCollector(this, bufferSize)
5048
coroutineScope {
49+
val semaphore = Channel<Unit>(concurrency)
50+
val flatMap = SerializingFlatMapCollector(this@flow, bufferSize)
5151
collect { outerValue ->
5252
// TODO real semaphore (#94)
5353
semaphore.send(Unit) // Acquire concurrency permit
@@ -89,11 +89,46 @@ public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
8989
@FlowPreview
9090
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = 16, bufferSize: Int = 16): Flow<T> = flatMapMerge(concurrency, bufferSize) { it }
9191

92+
/**
93+
* Returns a flow that switches to the new flow produced by [transform] function every time the original flow emits a value.
94+
* When switch on the newer flow is performed, the previous one is cancelled.
95+
*
96+
* For example, the following flow:
97+
* ```
98+
* flow {
99+
* emit("a")
100+
* delay(100)
101+
* emit("b")
102+
* }.switchMap { value ->
103+
* flow {
104+
* emit(value + value)
105+
* delay(200)
106+
* emit(value + "_last")
107+
* }
108+
* }
109+
* ```
110+
* will produce `aa bb b_last`
111+
*/
112+
@FlowPreview
113+
public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = flow {
114+
coroutineScope {
115+
var previousFlow: Job? = null
116+
collect { value ->
117+
// Linearize calls to emit as alternative to the channel. Bonus points for never-overlapping channels.
118+
previousFlow?.cancelAndJoin()
119+
// Undispatched to have better user experience in case of synchronous flows
120+
previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
121+
transform(value).collect { innerValue ->
122+
emit(innerValue)
123+
}
124+
}
125+
}
126+
}
127+
}
92128

93129
// Effectively serializes access to downstream collector from flatMap
94130
private class SerializingFlatMapCollector<T>(
95-
private val downstream: FlowCollector<T>,
96-
private val bufferSize: Int
131+
private val downstream: FlowCollector<T>, bufferSize: Int
97132
) {
98133

99134
// Let's try to leverage the fact that flatMapMerge is never contended

kotlinx-coroutines-core/common/test/TestBase.common.kt

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public expect open class TestBase constructor() {
1717
public fun expect(index: Int)
1818
public fun expectUnreached()
1919
public fun finish(index: Int)
20+
public fun ensureFinished() // Ensures that 'finish' was invoked
2021
public fun reset() // Resets counter and finish flag. Workaround for parametrized tests absence in common
2122

2223
public fun runTest(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlin.coroutines.*
8+
import kotlin.jvm.*
9+
10+
private class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : CoroutineDispatcher(), Delay {
11+
12+
private val originalDispatcher = enclosingScope.coroutineContext[ContinuationInterceptor] as CoroutineDispatcher
13+
private val heap = ArrayList<TimedTask>() // TODO use MPP heap/ordered set implementation
14+
private var currentTime = 0L
15+
16+
init {
17+
/*
18+
* Launch "event-loop-owning" task on start of the virtual time event loop.
19+
* It ensures the progress of the enclosing event-loop and polls the timed queue
20+
* when the enclosing event loop is empty, emulating virtual time.
21+
*/
22+
enclosingScope.launch {
23+
while (true) {
24+
val secret = ThreadLocalEventLoop.currentOrNull()?.processNextEvent()
25+
?: error("Event loop is missing, virtual time source works only as part of event loop")
26+
if (secret <= 0) continue
27+
if (secret > 0 && secret != Long.MAX_VALUE) error("Unexpected external delay: $secret")
28+
val nextTask = heap.minBy { it.deadline } ?: return@launch
29+
heap.remove(nextTask)
30+
currentTime = nextTask.deadline
31+
nextTask.run()
32+
}
33+
}
34+
}
35+
36+
private inner class TimedTask(
37+
private val runnable: Runnable,
38+
@JvmField val deadline: Long
39+
) : DisposableHandle, Runnable by runnable {
40+
41+
override fun dispose() {
42+
heap.remove(this)
43+
}
44+
}
45+
46+
override fun dispatch(context: CoroutineContext, block: Runnable) {
47+
originalDispatcher.dispatch(context, block)
48+
}
49+
50+
@ExperimentalCoroutinesApi
51+
override fun isDispatchNeeded(context: CoroutineContext): Boolean = originalDispatcher.isDispatchNeeded(context)
52+
53+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
54+
val task = TimedTask(block, currentTime + timeMillis)
55+
heap += task
56+
return task
57+
}
58+
59+
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
60+
val task = TimedTask(Runnable { with(continuation) { resumeUndispatched(Unit) } }, currentTime + timeMillis)
61+
heap += task
62+
continuation.invokeOnCancellation { task.dispose() }
63+
}
64+
}
65+
66+
/**
67+
* Runs a test ([TestBase.runTest]) with a virtual time source.
68+
* This runner has the following constraints:
69+
* 1) It works only in the event-loop environment and it is relying on it.
70+
* None of the coroutines should be launched in any dispatcher different from a current
71+
* 2) Regular tasks always dominate delayed ones. It means that
72+
* `launch { while(true) yield() }` will block the progress of the delayed tasks
73+
* 3) [TestBase.finish] should always be invoked.
74+
* Given all the constraints into account, it is easy to mess up with a test and actually
75+
* return from [withVirtualTime] before the test is executed completely.
76+
* To decrease the probability of such error, additional `finish` constraint is added.
77+
*/
78+
public fun TestBase.withVirtualTime(block: suspend CoroutineScope.() -> Unit) = runTest {
79+
withContext(Dispatchers.Unconfined) {
80+
// Create a platform-independent event loop
81+
val dispatcher = VirtualTimeDispatcher(this)
82+
withContext(dispatcher) { block() }
83+
ensureFinished()
84+
}
85+
}

0 commit comments

Comments
 (0)