-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathCancellableContinuationResumeCloseStressTest.kt
68 lines (59 loc) · 1.98 KB
/
CancellableContinuationResumeCloseStressTest.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
import kotlinx.atomicfu.*
import org.junit.*
import java.util.concurrent.*
import kotlin.test.*
import kotlin.test.Test
/**
 * Stress test that races three actions against each other on every iteration:
 * (1) cancellation of a launched job, (2) resumption of that job's cancellable
 * continuation with the value "OK" and a lambda that calls [close], and
 * (3) the return from the `suspendCancellableCoroutine` block.
 *
 * After each race it checks that exactly one of two outcomes was observed:
 * either the coroutine received "OK" and [close] was not called, or the
 * coroutine did not get past [resumeClose] and [close] was called.
 */
class CancellableContinuationResumeCloseStressTest : TestBase() {
// Pool created with 2 threads; used both to run the launched coroutine (testJob)
// and to execute the task that resumes its continuation (resumeClose).
private val dispatcher =
newFixedThreadPoolContext(2, "CancellableContinuationResumeCloseStressTest")
// Three parties: the test body (1), the resuming task (2) and the
// suspendCancellableCoroutine block (3) all await it before proceeding.
private val startBarrier = CyclicBarrier(3)
// Two parties: the resuming task (after its resume call returned) and the test body
// (after job.join()), so results are inspected only once the resume call has finished.
private val doneBarrier = CyclicBarrier(2)
// Number of race iterations, scaled by stressTestMultiplier (declared outside this file).
private val nRepeats = 1_000 * stressTestMultiplier
// Set to true by close(); reset at the start of every iteration.
private val closed = atomic(false)
// Set to true by the launched coroutine once resumeClose() has returned "OK";
// reset at the start of every iteration and read only after job.join().
private var returnedOk = false
/** Closes the thread-pool dispatcher after the test. */
@After
fun tearDown() {
dispatcher.close()
}
/**
 * Runs the cancel/resume race [nRepeats] times and verifies after each round that the
 * "returned OK" and "resource closed" outcomes are mutually exclusive and that one of
 * them happened.
 */
@Test
@Suppress("BlockingMethodInNonBlockingContext")
fun testStress() = runTest {
repeat(nRepeats) {
// reset per-iteration state before starting a new race
closed.value = false
returnedOk = false
val job = testJob()
// released once the resuming task and the suspendCancellableCoroutine block have also arrived
startBarrier.await()
job.cancel() // (1) cancel job
job.join()
// check consistency
// wait for the resuming task to get past its cont.resume call before reading the flags
doneBarrier.await()
if (returnedOk) {
assertFalse(closed.value, "should not have closed resource -- returned Ok")
} else {
assertTrue(closed.value, "should have closed resource -- was cancelled")
}
}
}
/**
 * Launches the coroutine under test on [dispatcher] with [CoroutineStart.ATOMIC].
 * It suspends in [resumeClose] and records in [returnedOk] whether the value "OK"
 * was actually delivered.
 */
private fun CoroutineScope.testJob(): Job = launch(dispatcher, start = CoroutineStart.ATOMIC) {
val ok = resumeClose() // might be cancelled
assertEquals("OK", ok)
// reached only when resumeClose() returned normally
returnedOk = true
}
/**
 * Suspends the caller and schedules a task on the dispatcher's executor that resumes the
 * continuation with "OK", passing a lambda that calls [close].
 *
 * NOTE(review): per the assertion messages in [testStress], that lambda is expected to run
 * when the value is not delivered because the job was cancelled -- confirm against the
 * `CancellableContinuation.resume(value, onCancellation)` documentation.
 */
private suspend fun resumeClose() = suspendCancellableCoroutine<String> { cont ->
dispatcher.executor.execute {
startBarrier.await() // (2) resume at the same time
cont.resume("OK") {
close()
}
// signal the test body that the resume call has returned
doneBarrier.await()
}
startBarrier.await() // (3) return at the same time
}
/**
 * Marks the resource as closed and asserts that it was not already marked closed,
 * i.e. fails if it is called more than once between resets of [closed].
 */
fun close() {
assertFalse(closed.getAndSet(true))
}
}