Skip to content

Commit 73cd4d7

Browse files
committed
StateFlow stress tests (#2857)
1 parent a00ad4d commit 73cd4d7

File tree

2 files changed

+85
-0
lines changed

2 files changed

+85
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.coroutines.*
8+
import kotlin.random.*
9+
import kotlin.test.*
10+
11+
// Simplified version of StateFlowStressTest
12+
class StateFlowCommonStressTest : TestBase() {
13+
private val state = MutableStateFlow<Long>(0)
14+
15+
@Test
16+
fun testSingleEmitterAndCollector() = runTest {
17+
var collected = 0L
18+
val collector = launch(Dispatchers.Default) {
19+
// collect, but abort and collect again after every 1000 values to stress allocation/deallocation
20+
do {
21+
val batchSize = Random.nextInt(1..1000)
22+
var index = 0
23+
val cnt = state.onEach { value ->
24+
// the first value in batch is allowed to repeat, but cannot go back
25+
val ok = if (index++ == 0) value >= collected else value > collected
26+
check(ok) {
27+
"Values must be monotonic, but $value is not, was $collected"
28+
}
29+
collected = value
30+
}.take(batchSize).map { 1 }.sum()
31+
} while (cnt == batchSize)
32+
}
33+
34+
var current = 1L
35+
val emitter = launch {
36+
while (true) {
37+
state.value = current++
38+
if (current % 1000 == 0L) yield() // make it cancellable
39+
}
40+
}
41+
42+
delay(3000)
43+
emitter.cancelAndJoin()
44+
collector.cancelAndJoin()
45+
assertTrue { current >= collected / 2 }
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.coroutines.*
8+
import kotlin.test.*
9+
import kotlin.test.Test
10+
11+
class StateFlowUpdateCommonTest : TestBase() {
12+
private val iterations = 100_000 * stressTestMultiplier
13+
14+
@Test
15+
fun testUpdate() = doTest { update { it + 1 } }
16+
17+
@Test
18+
fun testUpdateAndGet() = doTest { updateAndGet { it + 1 } }
19+
20+
@Test
21+
fun testGetAndUpdate() = doTest { getAndUpdate { it + 1 } }
22+
23+
private fun doTest(increment: MutableStateFlow<Int>.() -> Unit) = runTest {
24+
val flow = MutableStateFlow(0)
25+
val j1 = launch(Dispatchers.Default) {
26+
repeat(iterations / 2) {
27+
flow.increment()
28+
}
29+
}
30+
31+
repeat(iterations / 2) {
32+
flow.increment()
33+
}
34+
35+
joinAll(j1)
36+
assertEquals(iterations, flow.value)
37+
}
38+
}

0 commit comments

Comments
 (0)