-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathBroadcastTest.kt
140 lines (131 loc) · 3.92 KB
/
BroadcastTest.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED")
package kotlinx.coroutines.channels
import kotlinx.coroutines.*
import kotlin.test.*
/**
 * Scenarios around the `broadcast { ... }` builder and `broadcast()` applied to a produced channel.
 *
 * The numbered `expect(n)` / `finish(n)` calls are helpers that are not defined in this file
 * (presumably inherited from [TestBase]); they spell out the order in which the individual steps
 * of every scenario are expected to execute.
 */
class BroadcastTest : TestBase() {
    /**
     * The broadcast block only starts once a consumer begins receiving. The consumer takes two
     * elements and leaves; the third `send` stays suspended until control is yielded back to the
     * block after the consumer is done.
     */
    @Test
    fun testBroadcastBasic() = runTest {
        expect(1)
        val broadcastChannel = broadcast {
            expect(4)
            send(1) // delivered to the receiver
            expect(5)
            send(2) // stored in the buffer
            expect(6)
            send(3) // suspends; will not be consumed, but will not be cancelled either
            expect(10)
        }
        yield() // nothing happens here, because the default start mode is lazy
        expect(2)
        broadcastChannel.consume {
            expect(3)
            val first = receive() // suspends, which lets the broadcast block run
            assertEquals(1, first)
            expect(7)
            val second = receive()
            assertEquals(2, second)
            expect(8)
        }
        expect(9)
        yield() // hand control over to the broadcast block
        finish(11)
    }

    /**
     * Cancelling a not-yet-started broadcast of a produced channel: the producer's pending `send`
     * fails with [CancellationException] and the source channel ends up closed for receive.
     *
     * See https://github.com/Kotlin/kotlinx.coroutines/issues/1713
     */
    @Test
    fun testChannelBroadcastLazyCancel() = runTest {
        expect(1)
        val source = produce {
            expect(3)
            assertFailsWith<CancellationException> { send("MSG") }
            expect(5)
        }
        expect(2)
        yield() // let the producer run up to its send
        val broadcastChannel = source.broadcast()
        broadcastChannel.cancel()
        expect(4)
        yield() // back to the producer, whose send is aborted
        assertTrue(source.isClosedForReceive) // the source channel was consumed
        finish(6)
    }

    /**
     * Closing (instead of cancelling) a not-yet-started broadcast of a produced channel: the
     * producer's `send` returns normally and the source channel ends up closed for receive.
     */
    @Test
    fun testChannelBroadcastLazyClose() = runTest {
        expect(1)
        val source = produce {
            expect(3)
            send("MSG")
            expect(5)
        }
        expect(2)
        yield() // let the producer run up to its send
        val broadcastChannel = source.broadcast()
        broadcastChannel.close()
        expect(4)
        yield() // back to the producer, which proceeds past its send
        assertTrue(source.isClosedForReceive) // the source channel was consumed
        finish(6)
    }

    /**
     * Cancelling a broadcast that was started with [CoroutineStart.DEFAULT]: the producer never
     * resumes after its `yield()` and the source channel ends up closed for receive.
     */
    @Test
    fun testChannelBroadcastEagerCancel() = runTest {
        expect(1)
        val source = produce<Unit> {
            expect(3)
            yield() // return to the main coroutine
            expectUnreached() // the producer is cancelled before it gets here
        }
        expect(2)
        val broadcastChannel = source.broadcast(start = CoroutineStart.DEFAULT)
        yield() // to the producer
        expect(4)
        broadcastChannel.cancel()
        yield() // to the producer (cancelled)
        assertTrue(source.isClosedForReceive) // the source channel was consumed
        finish(5)
    }

    /**
     * Closing a broadcast that was started with [CoroutineStart.DEFAULT]: a producer that keeps
     * sending in a loop eventually fails with [CancellationException] and the source channel ends
     * up closed for receive.
     */
    @Test
    fun testChannelBroadcastEagerClose() = runTest {
        expect(1)
        val source = produce<Unit> {
            expect(3)
            yield() // return to the main coroutine
            // the endless send loop shall eventually get cancelled
            assertFailsWith<CancellationException> {
                while (true) {
                    send(Unit)
                }
            }
        }
        expect(2)
        val broadcastChannel = source.broadcast(start = CoroutineStart.DEFAULT)
        yield() // to the producer
        expect(4)
        broadcastChannel.close()
        yield() // to the producer (closed)
        assertTrue(source.isClosedForReceive) // the source channel was consumed
        finish(5)
    }

    /**
     * Closing a broadcast channel with an exception: a subscription opened beforehand still
     * receives the two elements already sent and then fails with the same exception, and further
     * `send` attempts inside the broadcast block fail with it as well.
     */
    @Test
    fun testBroadcastCloseWithException() = runTest {
        expect(1)
        val broadcastChannel = broadcast(NonCancellable, capacity = 1) {
            expect(2)
            send(1)
            expect(3)
            send(2) // suspends
            expect(5)
            // any further attempt to send fails
            assertFailsWith<TestException> { send(3) }
        }
        val subscription = broadcastChannel.openSubscription()
        yield() // into the broadcast block
        expect(4)
        broadcastChannel.close(TestException()) // close the broadcast channel with an exception
        assertTrue(broadcastChannel.isClosedForSend) // the channel now reports closed-for-send
        val first = subscription.receive() // 1st element received
        assertEquals(1, first)
        val second = subscription.receive() // 2nd element received
        assertEquals(2, second)
        assertFailsWith<TestException> { subscription.receive() } // then closed with exception
        yield() // let the broadcast block finish
        finish(6)
    }
}