Skip to content

Commit c0d8603

Browse files
committed
Flow operators review: docs tweaks and code style fixes in tests
1 parent af2f68e commit c0d8603

File tree

6 files changed

+76
-51
lines changed

6 files changed

+76
-51
lines changed

kotlinx-coroutines-core/common/src/flow/operators/Delay.kt

+35-29
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@ import kotlinx.coroutines.flow.unsafeFlow as flow
2020
@FlowPreview
2121
public fun <T> Flow<T>.delayFlow(timeMillis: Long): Flow<T> = flow {
2222
delay(timeMillis)
23-
collect { value ->
24-
emit(value)
25-
}
23+
collect(this@flow)
2624
}
2725

2826
/**
@@ -38,26 +36,30 @@ public fun <T> Flow<T>.delayEach(timeMillis: Long): Flow<T> = flow {
3836

3937
/**
4038
* Returns a flow that mirrors the original flow, but filters out values
41-
* that are followed by the newer values within the given [timeout][timeoutMs].
39+
* that are followed by the newer values within the given [timeout][timeoutMillis].
4240
* The latest value is always emitted.
41+
*
4342
* Example:
4443
* ```
4544
* flow {
46-
* emit(1)
47-
* delay(99)
48-
* emit(2)
49-
* delay(99)
50-
* emit(3)
51-
* delay(1001)
52-
* emit(4)
53-
* delay(1001)
54-
* emit(5)
45+
* emit(1)
46+
* delay(99)
47+
* emit(2)
48+
* delay(99)
49+
* emit(3)
50+
* delay(1001)
51+
* emit(4)
52+
* delay(1001)
53+
* emit(5)
5554
* }.debounce(1000)
5655
* ```
57-
* will produce `3, 4, 5`.
56+
* produces `3, 4, 5`.
57+
*
58+
* Note that the resulting flow does not emit anything as long as the original flow emits
59+
* items faster than every [timeoutMillis] milliseconds.
5860
*/
59-
public fun <T> Flow<T>.debounce(timeoutMs: Long): Flow<T> {
60-
require(timeoutMs > 0) { "Debounce timeout should be positive" }
61+
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
62+
require(timeoutMillis > 0) { "Debounce timeout should be positive" }
6163
return flow {
6264
coroutineScope {
6365
val values = Channel<Any?>(Channel.CONFLATED) // Actually Any, KT-30796
@@ -79,10 +81,11 @@ public fun <T> Flow<T>.debounce(timeoutMs: Long): Flow<T> {
7981
lastValue = it
8082
}
8183

82-
onTimeout(timeoutMs) {
83-
val value = lastValue ?: return@onTimeout
84-
lastValue = null // Consume the value
85-
emit(NullSurrogate.unbox(value))
84+
lastValue?.let { value -> // set timeout when lastValue != null
85+
onTimeout(timeoutMillis) {
86+
lastValue = null // Consume the value
87+
emit(NullSurrogate.unbox(value))
88+
}
8689
}
8790

8891
// Close with value 'idiom'
@@ -97,21 +100,23 @@ public fun <T> Flow<T>.debounce(timeoutMs: Long): Flow<T> {
97100
}
98101

99102
/**
100-
* Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMs].
103+
* Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMillis].
104+
*
101105
* Example:
102106
* ```
103107
* flow {
104-
* repeat(10) {
105-
* emit(it)
106-
* delay(50)
107-
* }
108+
* repeat(10) {
109+
* emit(it)
110+
* delay(50)
111+
* }
108112
* }.sample(100)
109113
* ```
110-
* will produce `1, 3, 5, 7, 9`.
114+
* produces `1, 3, 5, 7, 9`.
115+
*
111116
* Note that the latest element is not emitted if it does not fit into the sampling window.
112117
*/
113-
public fun <T> Flow<T>.sample(periodMs: Long): Flow<T> {
114-
require(periodMs > 0) { "Sample period should be positive" }
118+
public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
119+
require(periodMillis > 0) { "Sample period should be positive" }
115120
return flow {
116121
coroutineScope {
117122
val values = produce<Any?>(capacity = Channel.CONFLATED) { // Actually Any, KT-30796
@@ -120,7 +125,7 @@ public fun <T> Flow<T>.sample(periodMs: Long): Flow<T> {
120125

121126
var isDone = false
122127
var lastValue: Any? = null
123-
val ticker = fixedPeriodTicker(periodMs, periodMs)
128+
val ticker = fixedPeriodTicker(periodMillis)
124129
while (!isDone) {
125130
select<Unit> {
126131
values.onReceiveOrNull {
@@ -132,6 +137,7 @@ public fun <T> Flow<T>.sample(periodMs: Long): Flow<T> {
132137
}
133138
}
134139

140+
// todo: shall be start sampling only when an element arrives or sample aways as here?
135141
ticker.onReceive {
136142
val value = lastValue ?: return@onReceive
137143
lastValue = null // Consume the value

kotlinx-coroutines-core/common/src/flow/operators/Merge.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,8 @@ public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
9090
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = 16, bufferSize: Int = 16): Flow<T> = flatMapMerge(concurrency, bufferSize) { it }
9191

9292
/**
93-
* Returns a flow that switches to the new flow produced by [transform] function every time the original flow emits a value.
94-
* When switch on the newer flow is performed, the previous one is cancelled.
93+
* Returns a flow that switches to a new flow produced by [transform] function every time the original flow emits a value.
94+
* When switch on the a flow is performed, the previous one is cancelled.
9595
*
9696
* For example, the following flow:
9797
* ```
@@ -107,7 +107,7 @@ public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = 16, bufferSize: Int
107107
* }
108108
* }
109109
* ```
110-
* will produce `aa bb b_last`
110+
* produces `aa bb b_last`
111111
*/
112112
@FlowPreview
113113
public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = flow {

kotlinx-coroutines-core/common/test/flow/VirtualTime.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import kotlin.jvm.*
1010
private class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : CoroutineDispatcher(), Delay {
1111

1212
private val originalDispatcher = enclosingScope.coroutineContext[ContinuationInterceptor] as CoroutineDispatcher
13-
private val heap = ArrayList<TimedTask>() // TODO use MPP heap/ordered set implementation
13+
private val heap = ArrayList<TimedTask>() // TODO use MPP heap/ordered set implementation (commonize ThreadSafeHeap)
1414
private var currentTime = 0L
1515

1616
init {
@@ -21,10 +21,10 @@ private class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : CoroutineD
2121
*/
2222
enclosingScope.launch {
2323
while (true) {
24-
val secret = ThreadLocalEventLoop.currentOrNull()?.processNextEvent()
24+
val delayNanos = ThreadLocalEventLoop.currentOrNull()?.processNextEvent()
2525
?: error("Event loop is missing, virtual time source works only as part of event loop")
26-
if (secret <= 0) continue
27-
if (secret > 0 && secret != Long.MAX_VALUE) error("Unexpected external delay: $secret")
26+
if (delayNanos <= 0) continue
27+
if (delayNanos > 0 && delayNanos != Long.MAX_VALUE) error("Unexpected external delay: $delayNanos")
2828
val nextTask = heap.minBy { it.deadline } ?: return@launch
2929
heap.remove(nextTask)
3030
currentTime = nextTask.deadline

kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt

+10-10
Original file line numberDiff line numberDiff line change
@@ -16,44 +16,44 @@ class CombineLatestTest : TestBase() {
1616
fun testCombineLatest() = runTest {
1717
val flow = flowOf("a", "b", "c")
1818
val flow2 = flowOf(1, 2, 3)
19-
val list = flow.combineLatest(flow2, { i, j -> i + j }).toList()
19+
val list = flow.combineLatest(flow2) { i, j -> i + j }.toList()
2020
assertEquals(listOf("a1", "b1", "b2", "c2", "c3"), list)
2121
}
2222

2323
@Test
2424
fun testNulls() = runTest {
2525
val flow = flowOf("a", null, null)
2626
val flow2 = flowOf(1, 2, 3)
27-
val list = flow.combineLatest(flow2, { i, j -> i + j }).toList()
27+
val list = flow.combineLatest(flow2, { i, j -> i + j }).toList()
2828
assertEquals(listOf("a1", "null1", "null2", "null2", "null3"), list)
2929
}
3030

3131
@Test
3232
fun testNullsOther() = runTest {
3333
val flow = flowOf("a", "b", "c")
3434
val flow2 = flowOf(null, 2, null)
35-
val list = flow.combineLatest(flow2, { i, j -> i + j }).toList()
35+
val list = flow.combineLatest(flow2, { i, j -> i + j }).toList()
3636
assertEquals(listOf("anull", "bnull", "b2", "c2", "cnull"), list)
3737
}
3838

3939
@Test
4040
fun testEmptyFlow() = runTest {
41-
val flow = emptyFlow<String>().combineLatest(emptyFlow<Int>(), { i, j -> i + j })
41+
val flow = emptyFlow<String>().combineLatest(emptyFlow<Int>(), { i, j -> i + j })
4242
assertNull(flow.singleOrNull())
4343
}
4444

4545
@Test
4646
fun testFirstIsEmpty() = runTest {
4747
val f1 = emptyFlow<String>()
4848
val f2 = flowOf(1)
49-
assertEquals(emptyList(), f1.combineLatest(f2, { i, j -> i + j }).toList())
49+
assertEquals(emptyList(), f1.combineLatest(f2) { i, j -> i + j }.toList())
5050
}
5151

5252
@Test
5353
fun testSecondIsEmpty() = runTest {
5454
val f1 = flowOf("a")
5555
val f2 = emptyFlow<Int>()
56-
assertEquals(emptyList(), f1.combineLatest(f2, { i, j -> i + j }).toList())
56+
assertEquals(emptyList(), f1.combineLatest(f2) { i, j -> i + j }.toList())
5757
}
5858

5959
@Test
@@ -80,7 +80,7 @@ class CombineLatestTest : TestBase() {
8080
emit(3)
8181
}
8282

83-
val result = f1.combineLatest(f2, { i, j -> i + j }).toList()
83+
val result = f1.combineLatest(f2) { i, j -> i + j }.toList()
8484
assertEquals(listOf("a1", "b1", "c1", "c2", "c3"), result)
8585
finish(8)
8686
}
@@ -137,7 +137,7 @@ class CombineLatestTest : TestBase() {
137137
}
138138
}
139139

140-
val value = withContext(NamedDispatchers("main")) {
140+
val value = withContext(NamedDispatchers("main")) {
141141
f1.combineLatest(f2) { i, j ->
142142
assertEquals("main", NamedDispatchers.name())
143143
expect(5)
@@ -170,8 +170,8 @@ class CombineLatestTest : TestBase() {
170170
expect(1)
171171
i + j
172172
}.flowOn(NamedDispatchers("combine")).onEach {
173-
throw TestException()
174-
}
173+
throw TestException()
174+
}
175175

176176
assertFailsWith<TestException>(flow)
177177
finish(4)

kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt

+19
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,25 @@ class SampleTest : TestBase() {
129129
assertNull(flow.singleOrNull())
130130
}
131131

132+
@Test
133+
// note that this test depends on the sampling strategy -- when sampling time starts on a quiescent flow that suddenly emits
134+
fun testLongWait() = withVirtualTime {
135+
expect(1)
136+
val flow = flow {
137+
expect(2)
138+
emit("A")
139+
delay(3500) // long delay -- multiple sampling intervals
140+
emit("B")
141+
delay(900) // crosses time = 4000 barrier
142+
emit("C")
143+
delay(3000) // long wait again
144+
145+
}
146+
val result = flow.sample(1000).toList()
147+
assertEquals(listOf("A", "B", "C"), result)
148+
finish(3)
149+
}
150+
132151
@Test
133152
fun testPace() = withVirtualTime {
134153
val flow = flow {

kotlinx-coroutines-core/common/test/flow/operators/SwitchMapTest.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,11 @@ class SwitchMapTest : TestBase() {
4242
emit(2)
4343
emit(4)
4444
}.flowOn(NamedDispatchers("source")).switchMap { value ->
45-
flow {
46-
assertEquals("switch$value", NamedDispatchers.name())
47-
emit(value)
48-
expect(value)
49-
}.flowOn(NamedDispatchers("switch$value"))
45+
flow {
46+
assertEquals("switch$value", NamedDispatchers.name())
47+
emit(value)
48+
expect(value)
49+
}.flowOn(NamedDispatchers("switch$value"))
5050
}.onEach {
5151
expect(it + 1)
5252
assertEquals("main", NamedDispatchers.nameOr("main"))

0 commit comments

Comments
 (0)