@@ -382,11 +382,15 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
382
382
select.disposeOnSelect(node)
383
383
return
384
384
}
385
- enqueueResult is Closed <* > -> throw recoverStackTrace(helpCloseAndGetSendException(enqueueResult))
385
+ enqueueResult is Closed <* > -> {
386
+ node.block.shareableDispose(useIt = true )
387
+ throw recoverStackTrace(helpCloseAndGetSendException(enqueueResult))
388
+ }
386
389
enqueueResult == = ENQUEUE_FAILED -> {} // try to offer
387
390
enqueueResult is Receive <* > -> {} // try to offer
388
391
else -> error(" enqueueSend returned $enqueueResult " )
389
392
}
393
+ node.block.shareableDispose(useIt = true )
390
394
}
391
395
// hm... receiver is waiting or buffer is not full. try to offer
392
396
val offerResult = offerSelectInternal(element, select)
@@ -448,16 +452,19 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
448
452
override val pollResult : Any? ,
449
453
@JvmField val channel : AbstractSendChannel <E >,
450
454
@JvmField val select : SelectInstance <R >,
451
- @JvmField val block : suspend (SendChannel <E >) -> R
455
+ block : suspend (SendChannel <E >) -> R
452
456
) : Send(), DisposableHandle {
457
+ @JvmField val block: suspend (SendChannel <E >) -> R = block.asShareable()
458
+
453
459
override fun tryResumeSend (otherOp : PrepareOp ? ): Symbol ? =
454
- select.trySelectOther(otherOp) as Symbol ? // must return symbol
460
+ select.trySelectOther(otherOp, onSelect = block::shareableWillBeUsed ) as Symbol ? // must return symbol
455
461
456
462
override fun completeResumeSend () {
457
- block. startCoroutine(receiver = channel, completion = select.completion)
463
+ startCoroutine(CoroutineStart . ATOMIC , channel, select.completion, block )
458
464
}
459
465
460
466
override fun dispose () { // invoked on select completion
467
+ block.shareableDispose(useIt = false )
461
468
remove()
462
469
}
463
470
@@ -773,7 +780,11 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
773
780
): Boolean {
774
781
val node = ReceiveSelect (this , select, block, receiveMode)
775
782
val result = enqueueReceive(node)
776
- if (result) select.disposeOnSelect(node)
783
+ if (result) {
784
+ select.disposeOnSelect(node)
785
+ } else {
786
+ node.block.shareableDispose(useIt = true )
787
+ }
777
788
return result
778
789
}
779
790
@@ -871,41 +882,47 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
871
882
}
872
883
873
884
private class ReceiveElement <in E >(
874
- @JvmField val cont : CancellableContinuation <Any ?>,
885
+ cont : CancellableContinuation <Any ?>,
875
886
@JvmField val receiveMode : Int
876
887
) : Receive<E>() {
888
+ private val _cont = atomic<CancellableContinuation <Any ?>? > (cont)
889
+
877
890
fun resumeValue (value : E ): Any? = when (receiveMode) {
878
891
RECEIVE_RESULT -> ValueOrClosed .value(value)
879
892
else -> value
880
893
}
881
894
882
895
@Suppress(" IMPLICIT_CAST_TO_ANY" )
883
896
override fun tryResumeReceive (value : E , otherOp : PrepareOp ? ): Symbol ? {
884
- val token = cont .tryResume(resumeValue(value), otherOp?.desc) ? : return null
897
+ val token = _cont .value? .tryResume(resumeValue(value), otherOp?.desc) ? : return null
885
898
assert { token == = RESUME_TOKEN } // the only other possible result
886
899
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
887
900
otherOp?.finishPrepare()
888
901
return RESUME_TOKEN
889
902
}
890
903
891
- override fun completeResumeReceive (value : E ) = cont. completeResume(RESUME_TOKEN )
904
+ override fun completeResumeReceive (value : E ) { _cont .getAndSet( null ) !! . completeResume(RESUME_TOKEN ) }
892
905
893
906
override fun resumeReceiveClosed (closed : Closed <* >) {
907
+ val cont = _cont .getAndSet(null )!!
894
908
when {
895
909
receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> cont.resume(null )
896
910
receiveMode == RECEIVE_RESULT -> cont.resume(closed.toResult<Any >())
897
911
else -> cont.resumeWithException(closed.receiveException)
898
912
}
899
913
}
914
+
900
915
override fun toString (): String = " ReceiveElement@$hexAddress [receiveMode=$receiveMode ]"
901
916
}
902
917
903
918
private class ReceiveHasNext <E >(
904
919
@JvmField val iterator : Itr <E >,
905
- @JvmField val cont : CancellableContinuation <Boolean >
920
+ cont : CancellableContinuation <Boolean >
906
921
) : Receive<E>() {
922
+ private val _cont = atomic<CancellableContinuation <Boolean >? > (cont)
923
+
907
924
override fun tryResumeReceive (value : E , otherOp : PrepareOp ? ): Symbol ? {
908
- val token = cont .tryResume(true , otherOp?.desc) ? : return null
925
+ val token = _cont .value? .tryResume(true , otherOp?.desc) ? : return null
909
926
assert { token == = RESUME_TOKEN } // the only other possible result
910
927
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
911
928
otherOp?.finishPrepare()
@@ -918,10 +935,11 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
918
935
but completeResumeReceive is called once so we set iterator result here.
919
936
*/
920
937
iterator.result = value
921
- cont .completeResume(RESUME_TOKEN )
938
+ _cont .getAndSet( null ) !! .completeResume(RESUME_TOKEN )
922
939
}
923
940
924
941
override fun resumeReceiveClosed (closed : Closed <* >) {
942
+ val cont = _cont .getAndSet(null )!!
925
943
val token = if (closed.closeCause == null ) {
926
944
cont.tryResume(false )
927
945
} else {
@@ -932,37 +950,45 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
932
950
cont.completeResume(token)
933
951
}
934
952
}
953
+
935
954
override fun toString (): String = " ReceiveHasNext@$hexAddress "
936
955
}
937
956
938
957
private class ReceiveSelect <R , E >(
939
958
@JvmField val channel : AbstractChannel <E >,
940
959
@JvmField val select : SelectInstance <R >,
941
- @JvmField val block : suspend (Any? ) -> R ,
960
+ block : suspend (Any? ) -> R ,
942
961
@JvmField val receiveMode : Int
943
962
) : Receive<E>(), DisposableHandle {
963
+ @JvmField val block: suspend (Any? ) -> R = block.asShareable() // captured variables in this block need screening
964
+
944
965
override fun tryResumeReceive (value : E , otherOp : PrepareOp ? ): Symbol ? =
945
- select.trySelectOther(otherOp) as Symbol ?
966
+ select.trySelectOther(otherOp, onSelect = block::shareableWillBeUsed ) as Symbol ?
946
967
947
968
@Suppress(" UNCHECKED_CAST" )
948
969
override fun completeResumeReceive (value : E ) {
949
- block. startCoroutine(if (receiveMode == RECEIVE_RESULT ) ValueOrClosed .value(value) else value, select.completion)
970
+ startCoroutine(CoroutineStart . ATOMIC , if (receiveMode == RECEIVE_RESULT ) ValueOrClosed .value(value) else value, select.completion, block )
950
971
}
951
972
952
973
override fun resumeReceiveClosed (closed : Closed <* >) {
953
- if (! select.trySelect()) return
974
+ if (! select.trySelect(onSelect = block::shareableWillBeUsed )) return
954
975
when (receiveMode) {
955
- RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectWithException(closed.receiveException)
956
- RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed .closed<R >(closed.closeCause), select.completion)
976
+ RECEIVE_THROWS_ON_CLOSE -> {
977
+ block.shareableDispose(useIt = true )
978
+ select.resumeSelectWithException(closed.receiveException)
979
+ }
980
+ RECEIVE_RESULT -> startCoroutine(CoroutineStart .ATOMIC , ValueOrClosed .closed<R >(closed.closeCause), select.completion, block)
957
981
RECEIVE_NULL_ON_CLOSE -> if (closed.closeCause == null ) {
958
- block. startCoroutine(null , select.completion)
982
+ startCoroutine(CoroutineStart . ATOMIC , null , select.completion, block )
959
983
} else {
984
+ block.shareableDispose(useIt = true )
960
985
select.resumeSelectWithException(closed.receiveException)
961
986
}
962
987
}
963
988
}
964
989
965
990
override fun dispose () { // invoked on select completion
991
+ block.shareableDispose(useIt = false )
966
992
if (remove())
967
993
channel.onReceiveDequeued() // notify cancellation of receive
968
994
}
@@ -1031,17 +1057,19 @@ internal interface ReceiveOrClosed<in E> {
1031
1057
@Suppress(" UNCHECKED_CAST" )
1032
1058
internal class SendElement (
1033
1059
override val pollResult : Any? ,
1034
- @JvmField val cont : CancellableContinuation <Unit >
1060
+ cont : CancellableContinuation <Unit >
1035
1061
) : Send() {
1062
+ private val _cont = atomic<CancellableContinuation <Unit >? > (cont)
1063
+
1036
1064
override fun tryResumeSend (otherOp : PrepareOp ? ): Symbol ? {
1037
- val token = cont .tryResume(Unit , otherOp?.desc) ? : return null
1065
+ val token = _cont .value? .tryResume(Unit , otherOp?.desc) ? : return null
1038
1066
assert { token == = RESUME_TOKEN } // the only other possible result
1039
1067
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
1040
1068
otherOp?.finishPrepare() // finish preparations
1041
1069
return RESUME_TOKEN
1042
1070
}
1043
- override fun completeResumeSend () = cont. completeResume(RESUME_TOKEN )
1044
- override fun resumeSendClosed (closed : Closed <* >) = cont. resumeWithException(closed.sendException)
1071
+ override fun completeResumeSend () { _cont .getAndSet( null ) !! . completeResume(RESUME_TOKEN ) }
1072
+ override fun resumeSendClosed (closed : Closed <* >) { _cont .getAndSet( null ) !! . resumeWithException(closed.sendException) }
1045
1073
override fun toString (): String = " SendElement@$hexAddress ($pollResult )"
1046
1074
}
1047
1075
0 commit comments