@@ -108,7 +108,6 @@ internal open class BufferedChannel<E>(
108
108
// ## The send operations ##
109
109
// #########################
110
110
111
- // TODO onClosed misses stacktrace recovery mechanism even though it shouldn't
112
111
override suspend fun send (element : E ): Unit =
113
112
sendImpl( // <-- this is an inline function
114
113
element = element,
@@ -123,20 +122,26 @@ internal open class BufferedChannel<E>(
123
122
// According to the `send(e)` contract, we need to call
124
123
// `onUndeliveredElement(..)` handler and throw an exception
125
124
// if the channel is already closed.
126
- onClosed = { onClosedSend(element) },
125
+ onClosed = {
126
+ // Use continuation in order to recover stacktrace
127
+ suspendCancellableCoroutine<Unit > {
128
+ onClosedSend(element, it)
129
+ }
130
+ },
127
131
// When `send(e)` decides to suspend, the corresponding
128
132
// `onNoWaiterSuspend` function that creates a continuation
129
133
// is called. The tail-call optimization is applied here.
130
134
onNoWaiterSuspend = { segm, i, elem, s -> sendOnNoWaiterSuspend(segm, i, elem, s) }
131
135
)
132
136
133
- private fun onClosedSend (element : E ) {
137
+ private fun onClosedSend (element : E , continuation : Continuation < Nothing > ) {
134
138
onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
135
139
// If it crashes, add send exception as suppressed for better diagnostics
136
140
it.addSuppressed(sendException)
137
- throw recoverStackTrace(it)
141
+ continuation.resumeWithStackTrace(it)
142
+ return
138
143
}
139
- throw recoverStackTrace (sendException)
144
+ continuation.resumeWithStackTrace (sendException)
140
145
}
141
146
142
147
private suspend fun sendOnNoWaiterSuspend (
@@ -188,6 +193,7 @@ internal open class BufferedChannel<E>(
188
193
override fun dispose () {
189
194
segment.onCancellation(index)
190
195
}
196
+
191
197
override fun invoke (cause : Throwable ? ) = dispose()
192
198
}
193
199
@@ -199,6 +205,7 @@ internal open class BufferedChannel<E>(
199
205
override fun dispose () {
200
206
segment.onSenderCancellationWithOnUndeliveredElement(index, context)
201
207
}
208
+
202
209
override fun invoke (cause : Throwable ? ) = dispose()
203
210
}
204
211
@@ -741,7 +748,7 @@ internal open class BufferedChannel<E>(
741
748
invokeOnCancellation(SenderOrReceiverCancellationHandler (segment, index).asHandler)
742
749
}
743
750
744
- private fun onClosedReceiveOnNoWaiterSuspend (cont : CancellableContinuation <E >) {
751
+ private fun onClosedReceiveOnNoWaiterSuspend (cont : CancellableContinuation <E >) {
745
752
cont.resumeWithException(receiveException)
746
753
}
747
754
@@ -2040,7 +2047,7 @@ internal open class BufferedChannel<E>(
2040
2047
val globalIndex = segment.id * SEGMENT_SIZE + index
2041
2048
if (globalIndex < receiversCounter) return - 1
2042
2049
// Process the cell `segment[index]`.
2043
- cell_update@while (true ) {
2050
+ cell_update@ while (true ) {
2044
2051
val state = segment.getState(index)
2045
2052
when {
2046
2053
// The cell is empty.
@@ -2644,7 +2651,7 @@ internal open class BufferedChannel<E>(
2644
2651
append_elements@ while (true ) {
2645
2652
process_cell@ for (i in 0 until SEGMENT_SIZE ) {
2646
2653
val globalCellIndex = segment.id * SEGMENT_SIZE + i
2647
- if (globalCellIndex >= s && globalCellIndex >= r) break @append_elements
2654
+ if (globalCellIndex >= s && globalCellIndex >= r) break @append_elements
2648
2655
val cellState = segment.getState(i)
2649
2656
val element = segment.getElement(i)
2650
2657
val cellStateString = when (cellState) {
0 commit comments