@@ -155,10 +155,9 @@ class ChannelUndeliveredElementSelectOldStressTest(private val kind: TestChannel
155
155
var counter = 0
156
156
while (true ) {
157
157
val trySendData = Data (sentCnt++ )
158
- val sendMode = Random .nextInt(2 ) + 1
159
- sentStatus[trySendData.x] = sendMode
158
+ sentStatus[trySendData.x] = 1
160
159
selectOld<Unit > { channel.onSend(trySendData) {} }
161
- sentStatus[trySendData.x] = sendMode + 2
160
+ sentStatus[trySendData.x] = 3
162
161
when {
163
162
// must artificially slow down LINKED_LIST sender to avoid overwhelming receiver and going OOM
164
163
kind == TestChannelKind .LINKED_LIST -> while (sentCnt > lastReceived + 100 ) yield ()
@@ -172,7 +171,7 @@ class ChannelUndeliveredElementSelectOldStressTest(private val kind: TestChannel
172
171
173
172
private suspend fun stopSender () {
174
173
stoppedSender++
175
- sender.cancel ()
174
+ sender.cancelAndJoin ()
176
175
senderDone.receive()
177
176
}
178
177
@@ -207,25 +206,22 @@ class ChannelUndeliveredElementSelectOldStressTest(private val kind: TestChannel
207
206
}
208
207
209
208
private inner class Data (val x : Long ) {
210
- private val failedToDeliverOrReceived = atomic(false )
211
- private var firstFailedToDeliverOrReceivedCallTrace: Exception ? = null
209
+ private val firstFailedToDeliverOrReceivedCallTrace = atomic<Exception ?>(null )
212
210
213
211
fun failedToDeliver () {
214
- if (failedToDeliverOrReceived.compareAndSet(false , true )) {
215
- firstFailedToDeliverOrReceivedCallTrace = Exception (" First onUndeliveredElement() call" )
216
- } else {
217
- throw IllegalStateException (" onUndeliveredElement()/onReceived() notified twice" , firstFailedToDeliverOrReceivedCallTrace!! )
212
+ val trace = Exception (" First onUndeliveredElement() call" )
213
+ if (firstFailedToDeliverOrReceivedCallTrace.compareAndSet(null , trace)) {
214
+ failedToDeliverCnt.incrementAndGet()
215
+ failedStatus[x] = 1
216
+ return
218
217
}
219
- failedToDeliverCnt.incrementAndGet()
220
- failedStatus[x] = 1
218
+ throw IllegalStateException (" onUndeliveredElement()/onReceived() notified twice" , firstFailedToDeliverOrReceivedCallTrace.value!! )
221
219
}
222
220
223
221
fun onReceived () {
224
- if (failedToDeliverOrReceived.compareAndSet(false , true )) {
225
- firstFailedToDeliverOrReceivedCallTrace = Exception (" First onReceived() call" )
226
- } else {
227
- throw IllegalStateException (" onUndeliveredElement()/onReceived() notified twice" , firstFailedToDeliverOrReceivedCallTrace!! )
228
- }
222
+ val trace = Exception (" First onReceived() call" )
223
+ if (firstFailedToDeliverOrReceivedCallTrace.compareAndSet(null , trace)) return
224
+ throw IllegalStateException (" onUndeliveredElement()/onReceived() notified twice" , firstFailedToDeliverOrReceivedCallTrace.value!! )
229
225
}
230
226
}
231
227
0 commit comments