Skip to content

Commit 94b6397

Browse files
committed
Review fixes
Signed-off-by: Nikita Koval <[email protected]>
1 parent e42569c commit 94b6397

File tree

1 file changed

+18
-22
lines changed

1 file changed

+18
-22
lines changed

kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt

+18-22
Original file line numberDiff line numberDiff line change
@@ -1392,9 +1392,9 @@ internal open class BufferedChannel<E>(
13921392
/**
13931393
* Waits in a spin-loop until the [expandBuffer] call that
13941394
* should process the [globalIndex]-th cell is completed.
1395-
* Essentially, it waits until the counters of started ([bufferEnd])
1396-
* and completed ([completedExpandBuffersAndPauseFlag]) [expandBuffer] attempts
1397-
* coincide and become equal or greater than [globalIndex].
1395+
* Essentially, it waits until the numbers of started ([bufferEnd])
1396+
* and completed ([completedExpandBuffersAndPauseFlag]) [expandBuffer]
1397+
* attempts coincide and become equal or greater than [globalIndex].
13981398
* To avoid starvation, this function may set a flag
13991399
* that pauses further progress.
14001400
*/
@@ -1770,7 +1770,7 @@ internal open class BufferedChannel<E>(
17701770
*/
17711771
private val _closeCause = atomic<Any?>(NO_CLOSE_CAUSE)
17721772
// Should be called only if this channel is closed or cancelled.
1773-
internal val closeCause get() = _closeCause.value as Throwable?
1773+
protected val closeCause get() = _closeCause.value as Throwable?
17741774

17751775
/** Returns the closing cause if it is non-null, or [ClosedSendChannelException] otherwise. */
17761776
protected val sendException get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
@@ -1957,7 +1957,7 @@ internal open class BufferedChannel<E>(
19571957
/**
19581958
* Completes the channel closing procedure.
19591959
*/
1960-
protected open fun completeClose(sendersCur: Long): ChannelSegment<E> {
1960+
private fun completeClose(sendersCur: Long): ChannelSegment<E> {
19611961
// Close the linked list for further segment addition,
19621962
// obtaining the last segment in the data structure.
19631963
val lastSegment = closeLinkedList()
@@ -2040,7 +2040,7 @@ internal open class BufferedChannel<E>(
20402040
// order after that.
20412041
var suspendedSenders = InlineList<Waiter>()
20422042
var segment = lastSegment
2043-
traverse@ while (true) {
2043+
process_segments@ while (true) {
20442044
for (index in SEGMENT_SIZE - 1 downTo 0) {
20452045
// Process the cell `segment[index]`.
20462046
val globalIndex = segment.id * SEGMENT_SIZE + index
@@ -2050,11 +2050,11 @@ internal open class BufferedChannel<E>(
20502050
val state = segment.getState(index)
20512051
when {
20522052
// The cell is already processed by a receiver.
2053-
state === DONE_RCV -> break@traverse
2053+
state === DONE_RCV -> break@process_segments
20542054
// The cell stores a buffered element.
20552055
state === BUFFERED -> {
20562056
// Is the cell already covered by a receiver?
2057-
if (globalIndex < receiversCounter) break@traverse
2057+
if (globalIndex < receiversCounter) break@process_segments
20582058
// Update the cell state to `CHANNEL_CLOSED`.
20592059
if (segment.casState(index, state, CHANNEL_CLOSED)) {
20602060
// If `onUndeliveredElement` lambda is non-null, call it.
@@ -2081,7 +2081,7 @@ internal open class BufferedChannel<E>(
20812081
// The cell stores a suspended waiter.
20822082
state is Waiter || state is WaiterEB -> {
20832083
// Is the cell already covered by a receiver?
2084-
if (globalIndex < receiversCounter) break@traverse
2084+
if (globalIndex < receiversCounter) break@process_segments
20852085
// Obtain the sender.
20862086
val sender: Waiter = if (state is WaiterEB) state.waiter
20872087
else state as Waiter
@@ -2103,7 +2103,7 @@ internal open class BufferedChannel<E>(
21032103
}
21042104
// A concurrent receiver is resuming a suspended sender.
21052105
// As the cell is covered by a receiver, finish immediately.
2106-
state === RESUMING_BY_EB || state === RESUMING_BY_RCV -> break@traverse
2106+
state === RESUMING_BY_EB || state === RESUMING_BY_RCV -> break@process_segments
21072107
// A concurrent `expandBuffer()` is resuming a suspended sender.
21082108
// Wait in a spin-loop until the cell state changes.
21092109
state === RESUMING_BY_EB -> continue@update_cell
@@ -2284,18 +2284,14 @@ internal open class BufferedChannel<E>(
22842284
// obtained segment (in the beginning of this function) has lower id.
22852285
val id = r / SEGMENT_SIZE
22862286
if (segment.id != id) {
2287-
// Check that the channel is not closed yet
2288-
// and the required segment may be found.
2289-
// Otherwise, the `r`-th cell will not be physically
2290-
// created, and this function returns `false` immediately.
2291-
// We could perform this check only if `findSegmentReceive`
2292-
// below returns `null`, but it would make the code less clean.
2293-
segment.nextOrIfClosed {
2294-
segment.cleanPrev()
2295-
return false
2296-
}
2297-
// Find the required segment, restarting the operation if it has not been found.
2298-
segment = findSegmentReceive(id, segment) ?: continue
2287+
// Try to find the required segment.
2288+
segment = findSegmentReceive(id, segment) ?:
2289+
// The required segment has not been found. Either it has already
2290+
// been removed, or the underlying linked list is already closed
2291+
// for segment additions. In the latter case, the channel is closed
2292+
// and does not contain elements, so this operation returns `false`.
2293+
// Otherwise, if the required segment is removed, the operation restarts.
2294+
if (receiveSegment.value.id < id) return false else continue
22992295
}
23002296
segment.cleanPrev() // all the previous segments are no longer needed.
23012297
// Does the `r`-th cell contain waiting sender or buffered element?

0 commit comments

Comments
 (0)