Skip to content

Commit 8f8eee8

Browse files
ndkovalqwwdfsad
authored andcommitted
Fix select
1 parent e36544b commit 8f8eee8

File tree

2 files changed

+11
-25
lines changed

2 files changed

+11
-25
lines changed

kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt

+10-24
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import kotlin.coroutines.*
1212
import kotlin.js.*
1313
import kotlin.jvm.*
1414
import kotlin.math.*
15-
import kotlin.native.concurrent.*
1615
import kotlin.random.*
1716
import kotlin.reflect.*
1817

@@ -1236,7 +1235,6 @@ internal open class BufferedChannel<E>(
12361235
segm = segm.next ?: break
12371236
}
12381237
while (true) {
1239-
val segmPrev = segm.prev
12401238
for (i in SEGMENT_SIZE - 1 downTo 0) {
12411239
if (segm.id * SEGMENT_SIZE + i < receivers.value) return
12421240
while (true) {
@@ -1256,12 +1254,18 @@ internal open class BufferedChannel<E>(
12561254
}
12571255
state is Waiter -> {
12581256
if (segm.casState(i, state, CHANNEL_CLOSED)) {
1257+
// if (onUndeliveredElement != null) {
1258+
// undeliveredElementException = onUndeliveredElement.callUndeliveredElementCatchingException(segm.retrieveElement(i), undeliveredElementException)
1259+
// }
12591260
state.closeSender()
12601261
break
12611262
}
12621263
}
12631264
state is WaiterEB -> {
12641265
if (segm.casState(i, state, CHANNEL_CLOSED)) {
1266+
// if (onUndeliveredElement != null) {
1267+
// undeliveredElementException = onUndeliveredElement.callUndeliveredElementCatchingException(segm.retrieveElement(i), undeliveredElementException)
1268+
// }
12651269
state.waiter.closeSender()
12661270
break
12671271
}
@@ -1270,7 +1274,7 @@ internal open class BufferedChannel<E>(
12701274
}
12711275
}
12721276
}
1273-
segm = segmPrev ?: break
1277+
segm = segm.prev ?: break
12741278
}
12751279
undeliveredElementException?.let { throw it } // throw UndeliveredElementException at the end if there was one
12761280
}
@@ -1306,7 +1310,7 @@ internal open class BufferedChannel<E>(
13061310
}
13071311
}
13081312

1309-
private fun Any.closeReceiver() = closeWaiter(receiver = true)
1313+
private fun Waiter.closeReceiver() = closeWaiter(receiver = true)
13101314
private fun Any.closeSender() = closeWaiter(receiver = false)
13111315

13121316
private fun Any.closeWaiter(receiver: Boolean): Boolean {
@@ -1757,7 +1761,8 @@ internal class ChannelSegment<E>(id: Long, prev: ChannelSegment<E>?, pointers: I
17571761
val update = when {
17581762
cur is Waiter -> INTERRUPTED
17591763
cur is WaiterEB -> INTERRUPTED_EB
1760-
cur === S_RESUMING_EB || cur === S_RESUMING_RCV -> continue
1764+
cur === S_RESUMING_EB -> continue
1765+
cur === S_RESUMING_RCV -> continue
17611766
cur === INTERRUPTED_SEND -> INTERRUPTED_SEND
17621767
cur === DONE || cur === BUFFERED || cur === CHANNEL_CLOSED -> return
17631768
else -> error("unexpected: $cur")
@@ -1773,7 +1778,6 @@ internal class ChannelSegment<E>(id: Long, prev: ChannelSegment<E>?, pointers: I
17731778
}
17741779
}
17751780
private fun <E> createSegment(id: Long, prev: ChannelSegment<E>?) = ChannelSegment(id, prev, 0)
1776-
@SharedImmutable
17771781
private val NULL_SEGMENT = createSegment<Any?>(-1, null)
17781782
/**
17791783
* Number of cells in each segment.
@@ -1816,41 +1820,33 @@ private fun initialBufferEnd(capacity: Int): Long = when (capacity) {
18161820
*/
18171821

18181822
// The cell stores a buffered element.
1819-
@SharedImmutable
18201823
private val BUFFERED = Symbol("BUFFERED")
18211824
// Concurrent `expandBuffer(..)` can inform the
18221825
// upcoming sender that it should buffer the element.
1823-
@SharedImmutable
18241826
private val IN_BUFFER = Symbol("SHOULD_BUFFER")
18251827
// Indicates that a receiver (R suffix) is resuming
18261828
// the suspended sender; after that, it should update
18271829
// the state to either `DONE` (on success) or
18281830
// `INTERRUPTED_R` (on failure).
1829-
@SharedImmutable
18301831
private val S_RESUMING_RCV = Symbol("RESUMING_R")
18311832
// Indicates that `expandBuffer(..)` is resuming the
18321833
// suspended sender; after that, it should update the
18331834
// state to either `BUFFERED` (on success) or
18341835
// `INTERRUPTED_EB` (on failure).
1835-
@SharedImmutable
18361836
private val S_RESUMING_EB = Symbol("RESUMING_EB")
18371837
// When a receiver comes to the cell already covered by
18381838
// a sender (according to the counters), but the cell
18391839
// is still in `EMPTY` or `IN_BUFFER` state, it poisons
18401840
// the cell by changing its state to `POISONED`.
1841-
@SharedImmutable
18421841
private val POISONED = Symbol("POISONED")
18431842
// When the element is successfully transferred (possibly,
18441843
// through buffering), the cell moves to `DONE` state.
1845-
@SharedImmutable
18461844
private val DONE = Symbol("DONE")
18471845
// When the waiter is cancelled, it moves the cell to
18481846
// `INTERRUPTED` state; thus, informing other parties
18491847
// that may come to the cell and avoiding memory leaks.
1850-
@SharedImmutable
18511848
private val INTERRUPTED = Symbol("INTERRUPTED")
18521849
// TODO
1853-
@SharedImmutable
18541850
private val INTERRUPTED_SEND = Symbol("INTERRUPTED_SEND")
18551851
// When the cell is already covered by both sender and
18561852
// receiver (`sender` and `receivers` counters are greater
@@ -1872,11 +1868,9 @@ private class WaiterEB(@JvmField val waiter: Waiter) {
18721868
// receiver that it should complete the `expandBuffer(..)`
18731869
// procedure if the cancelled waiter stored in the cell
18741870
// was sender.
1875-
@SharedImmutable
18761871
private val INTERRUPTED_EB = Symbol("INTERRUPTED_EB")
18771872
// Indicates that the channel is already closed,
18781873
// and no more operation should not touch this cell.
1879-
@SharedImmutable
18801874
internal val CHANNEL_CLOSED = Symbol("CHANNEL_CLOSED")
18811875

18821876

@@ -1895,11 +1889,8 @@ private class ReceiveCatching<E>(
18951889
buffered element retrieval, the corresponding element
18961890
is returned as result of [BufferedChannel.updateCellReceive].
18971891
*/
1898-
@SharedImmutable
18991892
private val SUSPEND = Symbol("SUSPEND")
1900-
@SharedImmutable
19011893
private val SUSPEND_NO_WAITER = Symbol("SUSPEND_NO_WAITER")
1902-
@SharedImmutable
19031894
private val FAILED = Symbol("FAILED")
19041895

19051896
/*
@@ -1915,7 +1906,6 @@ private const val RESULT_FAILED = 4
19151906
* Special value for [BufferedChannel.BufferedChannelIterator.receiveResult]
19161907
* that indicates the absence of pre-received result.
19171908
*/
1918-
@SharedImmutable
19191909
private val NO_RECEIVE_RESULT = Symbol("NO_RECEIVE_RESULT")
19201910

19211911

@@ -1962,23 +1952,19 @@ private inline fun constructSendersAndCloseStatus(counter: Long, closeStatus: In
19621952
+------------>| CLOSED |
19631953
close +--------+
19641954
*/
1965-
@SharedImmutable
19661955
private val CLOSE_HANDLER_CLOSED = Symbol("CLOSE_HANDLER_CLOSED")
1967-
@SharedImmutable
19681956
private val CLOSE_HANDLER_INVOKED = Symbol("CLOSE_HANDLER_INVOKED")
19691957

19701958
/**
19711959
* Specifies the absence of closing cause, stored in [BufferedChannel.closeCause].
19721960
* When the channel is closed or cancelled without exception, this [NO_CLOSE_CAUSE]
19731961
* marker should be replaced with `null`.
19741962
*/
1975-
@SharedImmutable
19761963
private val NO_CLOSE_CAUSE = Symbol("NO_CLOSE_CAUSE")
19771964

19781965
/**
19791966
* All waiters, such as [CancellableContinuationImpl], [SelectInstance], and
19801967
* [BufferedChannel.BufferedChannelIterator], should be marked with this interface
19811968
* to make the code faster and easier to read.
19821969
*/
1983-
@InternalCoroutinesApi
19841970
internal interface Waiter

kotlinx-coroutines-core/common/src/selects/Select.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -705,7 +705,7 @@ internal open class SelectImplementation<R> constructor(
705705
// Update the state.
706706
state.update { cur ->
707707
// Finish immediately when this `select` is already completed.
708-
if (cur is ClauseData<*> || cur == STATE_COMPLETED) return
708+
if (cur == STATE_COMPLETED) return
709709
STATE_CANCELLED
710710
}
711711
// Read the list of clauses. If the `clauses` field is already `null`,

0 commit comments

Comments
 (0)