@@ -15,15 +15,17 @@ import java.util.concurrent.atomic.*
15
15
class BroadcastChannelMultiReceiveStressTest (
16
16
private val kind : TestBroadcastChannelKind
17
17
) : TestBase() {
18
+
19
+ // Stressed by lincheck
18
20
companion object {
19
21
@Parameterized.Parameters (name = " {0}" )
20
22
@JvmStatic
21
23
fun params (): Collection <Array <Any >> =
22
- TestBroadcastChannelKind .values() .map { arrayOf<Any >(it) }
24
+ TestBroadcastChannelKind .entries .map { arrayOf<Any >(it) }
23
25
}
24
26
25
27
private val nReceivers = if (isStressTest) 10 else 5
26
- private val nSeconds = 3 * stressTestMultiplier
28
+ private val nSeconds = 3 * stressTestMultiplierSqrt
27
29
28
30
private val broadcast = kind.create<Long >()
29
31
private val pool = newFixedThreadPoolContext(nReceivers + 1 , " BroadcastChannelMultiReceiveStressTest" )
@@ -62,13 +64,13 @@ class BroadcastChannelMultiReceiveStressTest(
62
64
println (" Launching $name " )
63
65
receivers + = launch(pool + CoroutineName (name)) {
64
66
val channel = broadcast.openSubscription()
65
- when (receiverIndex % 5 ) {
66
- 0 -> doReceive(channel, receiverIndex)
67
- 1 -> doReceiveCatching(channel, receiverIndex)
68
- 2 -> doIterator(channel, receiverIndex)
69
- 3 -> doReceiveSelect(channel, receiverIndex)
70
- 4 -> doReceiveCatchingSelect(channel, receiverIndex)
71
- }
67
+ when (receiverIndex % 5 ) {
68
+ 0 -> doReceive(channel, receiverIndex)
69
+ 1 -> doReceiveCatching(channel, receiverIndex)
70
+ 2 -> doIterator(channel, receiverIndex)
71
+ 3 -> doReceiveSelect(channel, receiverIndex)
72
+ 4 -> doReceiveCatchingSelect(channel, receiverIndex)
73
+ }
72
74
channel.cancel()
73
75
}
74
76
printProgress()
@@ -93,7 +95,7 @@ class BroadcastChannelMultiReceiveStressTest(
93
95
} catch (e: Exception ) {
94
96
println (" Failed: $e " )
95
97
pool.dumpThreads(" Threads in pool" )
96
- receivers.indices.forEach { index ->
98
+ receivers.indices.forEach { index ->
97
99
println (" lastReceived[$index ] = ${lastReceived[index].get()} " )
98
100
}
99
101
throw e
@@ -116,8 +118,9 @@ class BroadcastChannelMultiReceiveStressTest(
116
118
try {
117
119
val stop = doReceived(receiverIndex, channel.receive())
118
120
if (stop) break
121
+ } catch (ex: ClosedReceiveChannelException ) {
122
+ break
119
123
}
120
- catch (ex: ClosedReceiveChannelException ) { break }
121
124
}
122
125
}
123
126
@@ -141,7 +144,9 @@ class BroadcastChannelMultiReceiveStressTest(
141
144
val event = select<Long > { channel.onReceive { it } }
142
145
val stop = doReceived(receiverIndex, event)
143
146
if (stop) break
144
- } catch (ex: ClosedReceiveChannelException ) { break }
147
+ } catch (ex: ClosedReceiveChannelException ) {
148
+ break
149
+ }
145
150
}
146
151
}
147
152
@@ -152,4 +157,10 @@ class BroadcastChannelMultiReceiveStressTest(
152
157
if (stop) break
153
158
}
154
159
}
160
+
161
+ @Suppress(" UNUSED_PARAMETER" )
162
+ private fun println (debugMessage : String ) {
163
+ // Uncomment for local debugging
164
+ // println(debugMessage as Any?)
165
+ }
155
166
}
0 commit comments