-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathChannelUndeliveredElementTest.kt
141 lines (127 loc) · 4.6 KB
/
ChannelUndeliveredElementTest.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlin.test.*
/**
 * Checks when the handler supplied at channel creation is (and is not) invoked
 * for elements that were sent but never handed over to a receiver.
 *
 * Every test uses a [Resource] whose `cancel()` is called from that handler, so
 * `isCancelled` tells whether the handler ran for a given element.
 */
class ChannelUndeliveredElementTest : TestBase() {
    /** A received element is not passed to the handler, neither on receive nor on a later close. */
    @Test
    fun testSendSuccessfully() = runTest {
        runAllKindsTest { kind ->
            val resource = Resource("OK")
            val channel = kind.create<Resource> { undelivered -> undelivered.cancel() }
            launch {
                channel.send(resource)
            }
            val received = channel.receive()
            assertEquals("OK", received.value)
            assertFalse(resource.isCancelled) // delivered, so the handler did not run
            channel.close()
            assertFalse(resource.isCancelled) // closing afterwards does not run it either
        }
    }

    /** Cancelling a sender that is suspended in `send` hands its element to the handler. */
    @Test
    fun testRendezvousSendCancelled() = runTest {
        val resource = Resource("OK")
        val channel = Channel<Resource> { undelivered -> undelivered.cancel() }
        val sendJob = launch(start = CoroutineStart.UNDISPATCHED) {
            assertFailsWith<CancellationException> {
                channel.send(resource) // suspends here and is then cancelled
            }
        }
        sendJob.cancelAndJoin()
        assertTrue(resource.isCancelled)
    }

    /**
     * With a buffer of one: the buffered element survives cancellation of the sender,
     * the element of the cancelled `send` does not, and cancelling the channel
     * finally releases the buffered one.
     */
    @Test
    fun testBufferedSendCancelled() = runTest {
        val buffered = Resource("A")
        val pending = Resource("B")
        val channel = Channel<Resource>(1) { undelivered -> undelivered.cancel() }
        val sendJob = launch(start = CoroutineStart.UNDISPATCHED) {
            channel.send(buffered) // fits into the buffer
            assertFailsWith<CancellationException> {
                channel.send(pending) // suspends here and is then cancelled
            }
        }
        sendJob.cancelAndJoin()
        assertFalse(buffered.isCancelled) // still sitting in the buffer
        assertTrue(pending.isCancelled) // its send was cancelled
        channel.cancel() // cancel the channel itself
        assertTrue(buffered.isCancelled) // the buffered element is released as well
    }

    /** Elements buffered in an unlimited channel are handed to the handler once the channel is cancelled. */
    @Test
    fun testUnlimitedChannelCancelled() = runTest {
        val first = Resource("A")
        val second = Resource("B")
        val channel = Channel<Resource>(Channel.UNLIMITED) { undelivered -> undelivered.cancel() }
        channel.send(first) // buffered
        channel.send(second) // buffered
        assertFalse(first.isCancelled) // buffered, handler not run
        assertFalse(second.isCancelled) // buffered, handler not run
        channel.cancel() // cancel the channel itself
        assertTrue(first.isCancelled) // released from the buffer
        assertTrue(second.isCancelled) // released from the buffer
    }

    /**
     * In a conflated channel the element replaced by a newer one is handed to the handler
     * immediately; the surviving element is released only on `cancel`, not on `close`.
     */
    @Test
    fun testConflatedResourceCancelled() = runTest {
        val older = Resource("A")
        val newer = Resource("B")
        val channel = Channel<Resource>(Channel.CONFLATED) { undelivered -> undelivered.cancel() }
        channel.send(older)
        assertFalse(older.isCancelled)
        assertFalse(newer.isCancelled)
        channel.send(newer)
        assertTrue(older.isCancelled) // conflated away (lost), hence cancelled
        assertFalse(newer.isCancelled)
        channel.close()
        assertFalse(newer.isCancelled) // a receiver could still read it, so not cancelled yet
        channel.cancel()
        assertTrue(newer.isCancelled) // cancelled now
    }

    /** Sending to an already closed channel fails and hands the element to the handler. */
    @Test
    fun testSendToClosedChannel() = runTest {
        runAllKindsTest { kind ->
            val channel = kind.create<Resource> { undelivered -> undelivered.cancel() }
            channel.close() // close right away, before anything is sent
            val resource = Resource("OK")
            assertFailsWith<ClosedSendChannelException> {
                channel.send(resource) // fails on the closed channel; the resource is not delivered
            }
            assertTrue(resource.isCancelled)
        }
    }

    /**
     * The handler must not run for an element that a receiver takes (#2826).
     * The `expect`/`finish` calls number the steps the test is meant to pass through.
     */
    @Test
    fun testHandlerIsNotInvoked() = runTest { // #2826
        val channel = Channel<Unit> { _ ->
            expectUnreached()
        }
        expect(1)
        launch {
            expect(2)
            channel.receive()
        }
        channel.send(Unit)
        finish(3)
    }

    /**
     * Runs [test] once for every [TestChannelKind] that is not broadcast-based, each time
     * inside `withContext(Job())`.
     *
     * Anything thrown by [test] is reported through `error`, with the channel kind
     * prepended to the message and the original throwable passed along.
     */
    private suspend fun runAllKindsTest(test: suspend CoroutineScope.(TestChannelKind) -> Unit) {
        // Broadcast-based kinds do not support onUndeliveredElement, so they are left out.
        val supportedKinds = TestChannelKind.values().filterNot { candidate -> candidate.viaBroadcast }
        for (kind in supportedKinds) {
            try {
                withContext(Job()) { test(kind) }
            } catch (e: Throwable) {
                error("$kind: $e", e)
            }
        }
    }

    /**
     * Test payload that records whether it has been cancelled.
     *
     * [cancel] may be called at most once; a second call fails the `check`.
     */
    private class Resource(val value: String) {
        private val _cancelled = atomic(false)

        val isCancelled: Boolean
            get() = _cancelled.value

        fun cancel() {
            // getAndSet returns the previous flag, so a repeated cancel is detected here.
            val alreadyCancelled = _cancelled.getAndSet(true)
            check(!alreadyCancelled) { "Already cancelled" }
        }
    }
}