-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathSharedFlowStressTest.kt
87 lines (77 loc) · 2.69 KB
/
SharedFlowStressTest.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.flow
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import org.junit.*
import org.junit.Test
import kotlin.collections.ArrayList
import kotlin.test.*
import kotlin.time.*
@ExperimentalTime
class SharedFlowStressTest : TestBase() {
private val nProducers = 5
private val nConsumers = 3
private val nSeconds = 3 * stressTestMultiplier
private lateinit var sf: MutableSharedFlow<Long>
private lateinit var view: SharedFlow<Long>
@get:Rule
val producerDispatcher = ExecutorRule(nProducers)
@get:Rule
val consumerDispatcher = ExecutorRule(nConsumers)
private val totalProduced = atomic(0L)
private val totalConsumed = atomic(0L)
@Test
fun testStressReplay1() =
testStress(1, 0)
@Test
fun testStressReplay1ExtraBuffer1() =
testStress(1, 1)
@Test
fun testStressReplay2ExtraBuffer1() =
testStress(2, 1)
private fun testStress(replay: Int, extraBufferCapacity: Int) = runTest {
sf = MutableSharedFlow(replay, extraBufferCapacity)
view = sf.asSharedFlow()
val jobs = ArrayList<Job>()
jobs += List(nProducers) { producerIndex ->
launch(producerDispatcher) {
var cur = producerIndex.toLong()
while (isActive) {
sf.emit(cur)
totalProduced.incrementAndGet()
cur += nProducers
}
}
}
jobs += List(nConsumers) { consumerIndex ->
launch(consumerDispatcher) {
while (isActive) {
view
.dropWhile { it % nConsumers != consumerIndex.toLong() }
.take(1)
.collect {
check(it % nConsumers == consumerIndex.toLong())
totalConsumed.incrementAndGet()
}
}
}
}
var lastProduced = 0L
var lastConsumed = 0L
for (sec in 1..nSeconds) {
delay(1.seconds)
val produced = totalProduced.value
val consumed = totalConsumed.value
println("$sec sec: produced = $produced; consumed = $consumed")
assertNotEquals(lastProduced, produced)
assertNotEquals(lastConsumed, consumed)
lastProduced = produced
lastConsumed = consumed
}
jobs.forEach { it.cancel() }
jobs.forEach { it.join() }
println("total: produced = ${totalProduced.value}; consumed = ${totalConsumed.value}")
}
}