-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathBroadcastChannelSubStressTest.kt
57 lines (53 loc) · 2.13 KB
/
BroadcastChannelSubStressTest.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package kotlinx.coroutines.channels
import kotlinx.coroutines.testing.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlin.test.*
/**
 * Creates a broadcast channel and repeatedly opens a new subscription, receives one event from it,
 * and cancels it, to stress-test the logic of opening a subscription to a broadcast channel
 * while events are being concurrently sent to it.
 *
 * The scenario is executed once for every entry of [TestBroadcastChannelKind].
 */
class BroadcastChannelSubStressTest : TestBase() {
    // Number of one-second progress checks performed per channel kind (never fewer than five).
    private val nSeconds = maxOf(5, stressTestMultiplier)

    // Counters are shared by all channel kinds and are not reset between them.
    private val sentTotal = atomic(0L)
    private val receivedTotal = atomic(0L)

    /**
     * For each channel kind, runs a sender and a receiver concurrently on [Dispatchers.Default],
     * checks once per second that the send counter keeps growing, and finally cancels both coroutines.
     */
    @Test
    fun testStress() = runTest {
        TestBroadcastChannelKind.entries.forEach { kind ->
            println("--- BroadcastChannelSubStressTest $kind")
            val broadcast = kind.create<Long>()
            // Sends an ever-increasing sequence number for as long as it is active.
            val sender =
                launch(context = Dispatchers.Default + CoroutineName("Sender")) {
                    while (isActive) {
                        broadcast.send(sentTotal.incrementAndGet())
                    }
                }
            // Opens a fresh subscription per iteration and takes exactly one element from it.
            val receiver =
                launch(context = Dispatchers.Default + CoroutineName("Receiver")) {
                    var last = -1L
                    while (isActive) {
                        val channel = broadcast.openSubscription()
                        try {
                            val i = channel.receive()
                            // Sent values only grow, so a later subscription must never observe an older one.
                            check(i >= last) { "Last was $last, got $i" }
                            // Seeing the same value twice in a row is tolerated only for conflated kinds.
                            if (!kind.isConflated) check(i != last) { "Last was $last, got it again" }
                            receivedTotal.incrementAndGet()
                            last = i
                        } finally {
                            // Always release the subscription, including when receive() is cancelled
                            // during shutdown or when one of the checks above fails.
                            channel.cancel()
                        }
                    }
                }
            // Starts below the counter's initial value, so the first comparison of every kind always passes.
            var prevSent = -1L
            repeat(nSeconds) { sec ->
                delay(1000)
                val curSent = sentTotal.value
                println("${sec + 1}: Sent $curSent, received ${receivedTotal.value}")
                check(curSent > prevSent) { "Send stalled at $curSent events" }
                prevSent = curSent
            }
            // Bound the shutdown so that a coroutine which fails to stop is reported instead of hanging the test.
            withTimeout(5000) {
                sender.cancelAndJoin()
                receiver.cancelAndJoin()
            }
        }
    }
}