@@ -18,15 +18,17 @@ import java.util.concurrent.atomic.*
18
18
class BroadcastChannelMultiReceiveStressTest (
19
19
private val kind : TestBroadcastChannelKind
20
20
) : TestBase() {
21
+
22
+ // Stressed by lincheck
21
23
companion object {
22
24
@Parameterized.Parameters (name = " {0}" )
23
25
@JvmStatic
24
26
fun params (): Collection <Array <Any >> =
25
- TestBroadcastChannelKind .values() .map { arrayOf<Any >(it) }
27
+ TestBroadcastChannelKind .entries .map { arrayOf<Any >(it) }
26
28
}
27
29
28
30
private val nReceivers = if (isStressTest) 10 else 5
29
- private val nSeconds = 3 * stressTestMultiplier
31
+ private val nSeconds = 3 * stressTestMultiplierSqrt
30
32
31
33
private val broadcast = kind.create<Long >()
32
34
private val pool = newFixedThreadPoolContext(nReceivers + 1 , " BroadcastChannelMultiReceiveStressTest" )
@@ -65,13 +67,13 @@ class BroadcastChannelMultiReceiveStressTest(
65
67
println (" Launching $name " )
66
68
receivers + = launch(pool + CoroutineName (name)) {
67
69
val channel = broadcast.openSubscription()
68
- when (receiverIndex % 5 ) {
69
- 0 -> doReceive(channel, receiverIndex)
70
- 1 -> doReceiveCatching(channel, receiverIndex)
71
- 2 -> doIterator(channel, receiverIndex)
72
- 3 -> doReceiveSelect(channel, receiverIndex)
73
- 4 -> doReceiveCatchingSelect(channel, receiverIndex)
74
- }
70
+ when (receiverIndex % 5 ) {
71
+ 0 -> doReceive(channel, receiverIndex)
72
+ 1 -> doReceiveCatching(channel, receiverIndex)
73
+ 2 -> doIterator(channel, receiverIndex)
74
+ 3 -> doReceiveSelect(channel, receiverIndex)
75
+ 4 -> doReceiveCatchingSelect(channel, receiverIndex)
76
+ }
75
77
channel.cancel()
76
78
}
77
79
printProgress()
@@ -96,7 +98,7 @@ class BroadcastChannelMultiReceiveStressTest(
96
98
} catch (e: Exception ) {
97
99
println (" Failed: $e " )
98
100
pool.dumpThreads(" Threads in pool" )
99
- receivers.indices.forEach { index ->
101
+ receivers.indices.forEach { index ->
100
102
println (" lastReceived[$index ] = ${lastReceived[index].get()} " )
101
103
}
102
104
throw e
@@ -119,8 +121,9 @@ class BroadcastChannelMultiReceiveStressTest(
119
121
try {
120
122
val stop = doReceived(receiverIndex, channel.receive())
121
123
if (stop) break
124
+ } catch (ex: ClosedReceiveChannelException ) {
125
+ break
122
126
}
123
- catch (ex: ClosedReceiveChannelException ) { break }
124
127
}
125
128
}
126
129
@@ -144,7 +147,9 @@ class BroadcastChannelMultiReceiveStressTest(
144
147
val event = select<Long > { channel.onReceive { it } }
145
148
val stop = doReceived(receiverIndex, event)
146
149
if (stop) break
147
- } catch (ex: ClosedReceiveChannelException ) { break }
150
+ } catch (ex: ClosedReceiveChannelException ) {
151
+ break
152
+ }
148
153
}
149
154
}
150
155
@@ -155,4 +160,10 @@ class BroadcastChannelMultiReceiveStressTest(
155
160
if (stop) break
156
161
}
157
162
}
163
+
164
+ @Suppress(" UNUSED_PARAMETER" )
165
+ private fun println (debugMessage : String ) {
166
+ // Uncomment for local debugging
167
+ // kotlin.io.println(debugMessage)
168
+ }
158
169
}
0 commit comments