@@ -160,14 +160,24 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
160
160
val result = offerInternal(element)
161
161
return when {
162
162
result == = OFFER_SUCCESS -> true
163
- // We should check for closed token on offer as well, otherwise offer won't be linearizable
164
- // in the face of concurrent close()
165
- result == = OFFER_FAILED -> throw closedForSend?.sendException?.let { recoverStackTrace(it) } ? : return false
166
- result is Closed <* > -> throw recoverStackTrace(result.sendException)
163
+ result == = OFFER_FAILED -> {
164
+ // We should check for closed token on offer as well, otherwise offer won't be linearizable
165
+ // in the face of concurrent close()
166
+ // See https://github.com/Kotlin/kotlinx.coroutines/issues/359
167
+ helpCloseAndThrowSendException(closedForSend ? : return false )
168
+ }
169
+ result is Closed <* > -> helpCloseAndThrowSendException(result)
167
170
else -> error(" offerInternal returned $result " )
168
171
}
169
172
}
170
173
174
+ private fun helpCloseAndThrowSendException (closed : Closed <* >): Nothing {
175
+ // To ensure linearizablity we must ALWAYS help close the channel when we observe that it was closed
176
+ // See https://github.com/Kotlin/kotlinx.coroutines/issues/1419
177
+ helpClose(closed)
178
+ throw recoverStackTrace(closed.sendException)
179
+ }
180
+
171
181
private suspend fun sendSuspend (element : E ): Unit = suspendAtomicCancellableCoroutine sc@ { cont ->
172
182
loop@ while (true ) {
173
183
if (full) {
@@ -179,8 +189,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
179
189
return @sc
180
190
}
181
191
enqueueResult is Closed <* > -> {
182
- helpClose(enqueueResult)
183
- cont.resumeWithException(enqueueResult.sendException)
192
+ cont.helpCloseAndResumeWithSendException(enqueueResult)
184
193
return @sc
185
194
}
186
195
enqueueResult == = ENQUEUE_FAILED -> {} // try to offer instead
@@ -197,15 +206,19 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
197
206
}
198
207
offerResult == = OFFER_FAILED -> continue @loop
199
208
offerResult is Closed <* > -> {
200
- helpClose(offerResult)
201
- cont.resumeWithException(offerResult.sendException)
209
+ cont.helpCloseAndResumeWithSendException(offerResult)
202
210
return @sc
203
211
}
204
212
else -> error(" offerInternal returned $offerResult " )
205
213
}
206
214
}
207
215
}
208
216
217
+ private fun Continuation <* >.helpCloseAndResumeWithSendException (closed : Closed <* >) {
218
+ helpClose(closed)
219
+ resumeWithException(closed.sendException)
220
+ }
221
+
209
222
/* *
210
223
* Result is:
211
224
* * null -- successfully enqueued
@@ -230,23 +243,17 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
230
243
231
244
public override fun close (cause : Throwable ? ): Boolean {
232
245
val closed = Closed <E >(cause)
233
-
234
246
/*
235
247
* Try to commit close by adding a close token to the end of the queue.
236
248
* Successful -> we're now responsible for closing receivers
237
249
* Not successful -> help closing pending receivers to maintain invariant
238
250
* "if (!close()) next send will throw"
239
251
*/
240
- val closeAdded = queue.addLastIfPrev(closed, { it !is Closed <* > })
241
- if (! closeAdded) {
242
- val actualClosed = queue.prevNode as Closed <* >
243
- helpClose(actualClosed)
244
- return false
245
- }
246
-
247
- helpClose(closed)
248
- invokeOnCloseHandler(cause)
249
- return true
252
+ val closeAdded = queue.addLastIfPrev(closed) { it !is Closed <* > }
253
+ val actuallyClosed = if (closeAdded) closed else queue.prevNode as Closed <* >
254
+ helpClose(actuallyClosed)
255
+ if (closeAdded) invokeOnCloseHandler(cause)
256
+ return closeAdded // true if we have closed
250
257
}
251
258
252
259
private fun invokeOnCloseHandler (cause : Throwable ? ) {
@@ -370,10 +377,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
370
377
select.disposeOnSelect(node)
371
378
return
372
379
}
373
- enqueueResult is Closed <* > -> {
374
- helpClose(enqueueResult)
375
- throw recoverStackTrace(enqueueResult.sendException)
376
- }
380
+ enqueueResult is Closed <* > -> helpCloseAndThrowSendException(enqueueResult)
377
381
enqueueResult == = ENQUEUE_FAILED -> {} // try to offer
378
382
enqueueResult is Receive <* > -> {} // try to offer
379
383
else -> error(" enqueueSend returned $enqueueResult " )
@@ -388,10 +392,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
388
392
block.startCoroutineUnintercepted(receiver = this , completion = select.completion)
389
393
return
390
394
}
391
- offerResult is Closed <* > -> {
392
- helpClose(offerResult)
393
- throw recoverStackTrace(offerResult.sendException)
394
- }
395
+ offerResult is Closed <* > -> helpCloseAndThrowSendException(offerResult)
395
396
else -> error(" offerSelectInternal returned $offerResult " )
396
397
}
397
398
}
@@ -432,7 +433,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
432
433
433
434
private class SendSelect <E , R >(
434
435
override val pollResult : Any? ,
435
- @JvmField val channel : SendChannel <E >,
436
+ @JvmField val channel : AbstractSendChannel <E >,
436
437
@JvmField val select : SelectInstance <R >,
437
438
@JvmField val block : suspend (SendChannel <E >) -> R
438
439
) : Send(), DisposableHandle {
0 commit comments