Skip to content

Commit 33e4ef0

Browse files
elizarovqwwdfsad
authored andcommitted
~ Rollback manual dispose of SharedBlock
1 parent 43a00dc commit 33e4ef0

File tree

7 files changed

+16
-82
lines changed

7 files changed

+16
-82
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+4-8
Original file line numberDiff line numberDiff line change
@@ -1139,8 +1139,8 @@ public final class kotlinx/coroutines/selects/SelectBuilderImpl : kotlinx/corout
11391139
public fun resumeSelectWithException (Ljava/lang/Throwable;)V
11401140
public fun resumeWith (Ljava/lang/Object;)V
11411141
public fun toString ()Ljava/lang/String;
1142-
public fun trySelect (Lkotlin/jvm/functions/Function0;)Z
1143-
public fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;Lkotlin/jvm/functions/Function0;)Ljava/lang/Object;
1142+
public fun trySelect ()Z
1143+
public fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;)Ljava/lang/Object;
11441144
}
11451145

11461146
public abstract interface class kotlinx/coroutines/selects/SelectClause0 {
@@ -1161,12 +1161,8 @@ public abstract interface class kotlinx/coroutines/selects/SelectInstance {
11611161
public abstract fun isSelected ()Z
11621162
public abstract fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
11631163
public abstract fun resumeSelectWithException (Ljava/lang/Throwable;)V
1164-
public abstract fun trySelect (Lkotlin/jvm/functions/Function0;)Z
1165-
public abstract fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;Lkotlin/jvm/functions/Function0;)Ljava/lang/Object;
1166-
}
1167-
1168-
public final class kotlinx/coroutines/selects/SelectInstance$DefaultImpls {
1169-
public static synthetic fun trySelect$default (Lkotlinx/coroutines/selects/SelectInstance;Lkotlin/jvm/functions/Function0;ILjava/lang/Object;)Z
1164+
public abstract fun trySelect ()Z
1165+
public abstract fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;)Ljava/lang/Object;
11701166
}
11711167

11721168
public final class kotlinx/coroutines/selects/SelectKt {

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+6-20
Original file line numberDiff line numberDiff line change
@@ -382,15 +382,11 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
382382
select.disposeOnSelect(node)
383383
return
384384
}
385-
enqueueResult is Closed<*> -> {
386-
node.block.shareableDispose(useIt = true)
387-
throw recoverStackTrace(helpCloseAndGetSendException(enqueueResult))
388-
}
385+
enqueueResult is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(enqueueResult))
389386
enqueueResult === ENQUEUE_FAILED -> {} // try to offer
390387
enqueueResult is Receive<*> -> {} // try to offer
391388
else -> error("enqueueSend returned $enqueueResult ")
392389
}
393-
node.block.shareableDispose(useIt = true)
394390
}
395391
// hm... receiver is waiting or buffer is not full. try to offer
396392
val offerResult = offerSelectInternal(element, select)
@@ -457,14 +453,13 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
457453
@JvmField val block: suspend (SendChannel<E>) -> R = block.asShareable()
458454

459455
override fun tryResumeSend(otherOp: PrepareOp?): Symbol? =
460-
select.trySelectOther(otherOp, onSelect = block::shareableWillBeUsed) as Symbol? // must return symbol
456+
select.trySelectOther(otherOp) as Symbol? // must return symbol
461457

462458
override fun completeResumeSend() {
463459
startCoroutine(CoroutineStart.ATOMIC, channel, select.completion, block)
464460
}
465461

466462
override fun dispose() { // invoked on select completion
467-
block.shareableDispose(useIt = false)
468463
remove()
469464
}
470465

@@ -780,11 +775,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
780775
): Boolean {
781776
val node = ReceiveSelect(this, select, block, receiveMode)
782777
val result = enqueueReceive(node)
783-
if (result) {
784-
select.disposeOnSelect(node)
785-
} else {
786-
node.block.shareableDispose(useIt = true)
787-
}
778+
if (result) select.disposeOnSelect(node)
788779
return result
789780
}
790781

@@ -963,32 +954,27 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
963954
@JvmField val block: suspend (Any?) -> R = block.asShareable() // captured variables in this block need screening
964955

965956
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? =
966-
select.trySelectOther(otherOp, onSelect = block::shareableWillBeUsed) as Symbol?
957+
select.trySelectOther(otherOp) as Symbol?
967958

968959
@Suppress("UNCHECKED_CAST")
969960
override fun completeResumeReceive(value: E) {
970961
startCoroutine(CoroutineStart.ATOMIC, if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value, select.completion, block)
971962
}
972963

973964
override fun resumeReceiveClosed(closed: Closed<*>) {
974-
if (!select.trySelect(onSelect = block::shareableWillBeUsed)) return
965+
if (!select.trySelect()) return
975966
when (receiveMode) {
976-
RECEIVE_THROWS_ON_CLOSE -> {
977-
block.shareableDispose(useIt = true)
978-
select.resumeSelectWithException(closed.receiveException)
979-
}
967+
RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectWithException(closed.receiveException)
980968
RECEIVE_RESULT -> startCoroutine(CoroutineStart.ATOMIC, ValueOrClosed.closed<R>(closed.closeCause), select.completion, block)
981969
RECEIVE_NULL_ON_CLOSE -> if (closed.closeCause == null) {
982970
startCoroutine(CoroutineStart.ATOMIC, null, select.completion, block)
983971
} else {
984-
block.shareableDispose(useIt = true)
985972
select.resumeSelectWithException(closed.receiveException)
986973
}
987974
}
988975
}
989976

990977
override fun dispose() { // invoked on select completion
991-
block.shareableDispose(useIt = false)
992978
if (remove())
993979
channel.onReceiveDequeued() // notify cancellation of receive
994980
}

kotlinx-coroutines-core/common/src/internal/Sharing.common.kt

-2
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ internal expect fun disposeContinuation(cont: () -> Continuation<*>)
2323
internal expect fun <T> CancellableContinuationImpl<T>.shareableResume(delegate: Continuation<T>, useMode: Int)
2424

2525
internal expect fun <T, R> (suspend (T) -> R).asShareable(): suspend (T) -> R
26-
internal expect fun <T, R> (suspend (T) -> R).shareableDispose(useIt: Boolean)
27-
internal expect fun <T, R> (suspend (T) -> R).shareableWillBeUsed()
2826

2927
internal expect fun isReuseSupportedInPlatform(): Boolean
3028
internal expect fun <T> ArrayList<T>.addOrUpdate(element: T, update: (ArrayList<T>) -> Unit)

kotlinx-coroutines-core/common/src/selects/Select.kt

+6-10
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public interface SelectInstance<in R> {
116116
/**
117117
* Tries to select this instance. Returns `true` on success.
118118
*/
119-
public fun trySelect(onSelect: () -> Unit = {}): Boolean
119+
public fun trySelect(): Boolean
120120

121121
/**
122122
* Tries to select this instance. Returns:
@@ -130,7 +130,7 @@ public interface SelectInstance<in R> {
130130
* member is public, but [Symbol] is internal. When [SelectInstance] becomes a `sealed interface`
131131
* (see KT-222860) we can declare this method as internal.
132132
*/
133-
public fun trySelectOther(otherOp: PrepareOp?, onSelect: () -> Unit): Any?
133+
public fun trySelectOther(otherOp: PrepareOp?): Any?
134134

135135
/**
136136
* Performs action atomically with [trySelect].
@@ -282,10 +282,7 @@ internal class SelectBuilderImpl<in R>(
282282
when {
283283
result === UNDECIDED -> {
284284
val update = value()
285-
if (_result.compareAndSet(UNDECIDED, update)) {
286-
uCont.shareableDispose() // will return result without calling continuation
287-
return
288-
}
285+
if (_result.compareAndSet(UNDECIDED, update)) return
289286
}
290287
result === COROUTINE_SUSPENDED -> if (_result.compareAndSet(COROUTINE_SUSPENDED, RESUMED)) {
291288
block()
@@ -397,8 +394,8 @@ internal class SelectBuilderImpl<in R>(
397394
disposeLockFreeLinkedList { this }
398395
}
399396

400-
override fun trySelect(onSelect: () -> Unit): Boolean {
401-
val result = trySelectOther(null, onSelect)
397+
override fun trySelect(): Boolean {
398+
val result = trySelectOther(null)
402399
return when {
403400
result === RESUME_TOKEN -> true
404401
result == null -> false
@@ -491,7 +488,7 @@ internal class SelectBuilderImpl<in R>(
491488

492489
// it is just like plain trySelect, but support idempotent start
493490
// Returns RESUME_TOKEN | RETRY_ATOMIC | null (when already selected)
494-
override fun trySelectOther(otherOp: PrepareOp?, onSelect: () -> Unit): Any? {
491+
override fun trySelectOther(otherOp: PrepareOp?): Any? {
495492
_state.loop { state -> // lock-free loop on state
496493
when {
497494
// Found initial state (not selected yet) -- try to make it selected
@@ -506,7 +503,6 @@ internal class SelectBuilderImpl<in R>(
506503
val decision = pairSelectOp.perform(this)
507504
if (decision !== null) return decision
508505
}
509-
onSelect()
510506
doAfterSelect()
511507
return RESUME_TOKEN
512508
}

kotlinx-coroutines-core/js/src/internal/Sharing.kt

-6
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,6 @@ internal actual inline fun <T> CancellableContinuationImpl<T>.shareableResume(de
5858
@Suppress("NOTHING_TO_INLINE") // Should be NOP
5959
internal actual inline fun <T, R> (suspend (T) -> R).asShareable(): suspend (T) -> R = this
6060

61-
@Suppress("NOTHING_TO_INLINE") // Should be NOP
62-
internal actual inline fun <T, R> (suspend (T) -> R).shareableDispose(useIt: Boolean) {}
63-
64-
@Suppress("NOTHING_TO_INLINE") // Should be NOP
65-
internal actual inline fun <T, R> (suspend (T) -> R).shareableWillBeUsed() {}
66-
6761
@Suppress("NOTHING_TO_INLINE") // Save an entry on call stack
6862
internal actual inline fun isReuseSupportedInPlatform() = true
6963

kotlinx-coroutines-core/jvm/src/internal/Sharing.kt

-8
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,6 @@ internal actual inline fun <T> CancellableContinuationImpl<T>.shareableResume(de
7575
@Suppress("NOTHING_TO_INLINE") // Should be NOP
7676
internal actual inline fun <T, R> (suspend (T) -> R).asShareable(): suspend (T) -> R = this
7777

78-
@InlineOnly
79-
@Suppress("NOTHING_TO_INLINE") // Should be NOP
80-
internal actual inline fun <T, R> (suspend (T) -> R).shareableDispose(useIt: Boolean) {}
81-
82-
@InlineOnly
83-
@Suppress("NOTHING_TO_INLINE") // Should be NOP
84-
internal actual inline fun <T, R> (suspend (T) -> R).shareableWillBeUsed() {}
85-
8678
@InlineOnly
8779
@Suppress("NOTHING_TO_INLINE") // Save an entry on call stack
8880
internal actual inline fun isReuseSupportedInPlatform() = true

kotlinx-coroutines-core/native/src/internal/Sharing.kt

-28
Original file line numberDiff line numberDiff line change
@@ -86,16 +86,6 @@ internal actual inline fun <T> Continuation<T>.shareableDispose() {
8686
internal actual fun <T, R> (suspend (T) -> R).asShareable(): suspend (T) -> R =
8787
ShareableBlock(this)
8888

89-
internal actual fun <T, R> (suspend (T) -> R).shareableDispose(useIt: Boolean) {
90-
this as ShareableBlock<*, *> // must have been shared
91-
dispose(useIt)
92-
}
93-
94-
internal actual fun <T, R> (suspend (T) -> R).shareableWillBeUsed() {
95-
this as ShareableBlock<*, *> // must have been shared
96-
willBeUsed()
97-
}
98-
9989
@PublishedApi
10090
internal actual inline fun disposeContinuation(cont: () -> Continuation<*>) {
10191
(cont() as ShareableContinuation<*>).disposeRef()
@@ -229,27 +219,9 @@ private typealias Fun2<T, R> = Function2<T, Continuation<R>, Any?>
229219
private class ShareableBlock<T, R>(
230220
block: Block1<T, R>
231221
) : ShareableObject<Block1<T, R>>(block), Block1<T, R>, SuspendFunction<R>, Fun2<T, R> {
232-
private val willBeUsed = atomic(false)
233-
234222
override suspend fun invoke(param: T): R = useRef().invoke(param)
235223

236224
@Suppress("UNCHECKED_CAST")
237225
override fun invoke(param: T, cont: Continuation<R>): Any? =
238226
(useRef() as Fun2<T, R>).invoke(param, cont)
239-
240-
fun willBeUsed() {
241-
willBeUsed.value = true
242-
}
243-
244-
fun dispose(useIt: Boolean) {
245-
if (willBeUsed.value && !useIt) return
246-
val thread = ownerThreadOrNull ?: return
247-
if (currentThread() == thread) {
248-
disposeRef()
249-
} else {
250-
thread.execute {
251-
disposeRef()
252-
}
253-
}
254-
}
255227
}

0 commit comments

Comments
 (0)