Skip to content

Commit d5da2e0

Browse files
committed
Try to reproduce second resume (WIP)
1 parent 1b0eca9 commit d5da2e0

File tree

1 file changed

+39
-11
lines changed

1 file changed

+39
-11
lines changed

kotlinx-coroutines-core/jvm/test/channels/ChannelAtomicCancelStressTest.kt

+39-11
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,49 @@ import org.junit.*
1010
import org.junit.Assert.*
1111
import org.junit.runner.*
1212
import org.junit.runners.*
13-
import kotlin.random.Random
1413
import java.util.concurrent.atomic.*
14+
import kotlin.random.*
15+
16+
private const val DIRECT = 1
17+
private const val SELECT = 2
1518

1619
/**
1720
* Tests cancel atomicity for channel send & receive operations, including their select versions.
1821
*/
1922
@RunWith(Parameterized::class)
20-
class ChannelAtomicCancelStressTest(private val kind: TestChannelKind) : TestBase() {
23+
class ChannelAtomicCancelStressTest(
24+
private val kind: TestChannelKind,
25+
private val mode: Mode
26+
) : TestBase() {
27+
28+
enum class Mode(val send: Int, val receive: Int) {
29+
SEND_RECEIVE(DIRECT, DIRECT),
30+
SELECT_SEND_RECEIVE(SELECT, DIRECT),
31+
SEND_SELECT_RECEIVE(DIRECT, SELECT),
32+
SELECT_SEND_SELECT_RECEIVE(SELECT, SELECT),
33+
RANDOM(DIRECT or SELECT, DIRECT or SELECT)
34+
}
35+
36+
private fun Random.nextOp(mask: Int): Int =
37+
when (mask) {
38+
DIRECT or SELECT -> nextInt(DIRECT..SELECT)
39+
else -> mask
40+
}
41+
2142
companion object {
22-
@Parameterized.Parameters(name = "{0}")
43+
@Parameterized.Parameters(name = "{0}, {1}")
2344
@JvmStatic
24-
fun params(): Collection<Array<Any>> = TestChannelKind.values().map { arrayOf<Any>(it) }
45+
fun params(): Collection<Array<Any>> =
46+
// todo: all channels, all modes only in stress test
47+
listOf(TestChannelKind.RENDEZVOUS).flatMap { kind ->
48+
Mode.values().map { mode ->
49+
arrayOf<Any>(kind, mode)
50+
}
51+
}
2552
}
2653

27-
private val TEST_DURATION = 1000L * stressTestMultiplier
54+
// todo: restore regular duration
55+
private val TEST_DURATION = 10_000L * stressTestMultiplier
2856

2957
private val dispatcher = newFixedThreadPoolContext(2, "ChannelAtomicCancelStressTest")
3058
private val scope = CoroutineScope(dispatcher)
@@ -104,9 +132,9 @@ class ChannelAtomicCancelStressTest(private val kind: TestChannelKind) : TestBas
104132
var counter = 0
105133
while (true) {
106134
val trySend = lastSent + 1
107-
when (Random.nextInt(2)) {
108-
0 -> channel.send(trySend)
109-
1 -> select { channel.onSend(trySend) {} }
135+
when (Random.nextOp(mode.send)) {
136+
DIRECT -> channel.send(trySend)
137+
SELECT -> select { channel.onSend(trySend) {} }
110138
else -> error("cannot happen")
111139
}
112140
lastSent = trySend // update on success
@@ -131,9 +159,9 @@ class ChannelAtomicCancelStressTest(private val kind: TestChannelKind) : TestBas
131159
receiver = scope.launch(start = CoroutineStart.ATOMIC) {
132160
cancellable(receiverDone) {
133161
while (true) {
134-
val received = when (Random.nextInt(2)) {
135-
0 -> channel.receive()
136-
1 -> select { channel.onReceive { it } }
162+
val received = when (Random.nextOp(mode.receive)) {
163+
DIRECT -> channel.receive()
164+
SELECT -> select { channel.onReceive { it } }
137165
else -> error("cannot happen")
138166
}
139167
val expected = lastReceived + 1

0 commit comments

Comments
 (0)