Skip to content

Commit 184ed7b

Browse files
committed
ConflatedBufferedChannel
Signed-off-by: Nikita Koval <[email protected]>
1 parent b0f4863 commit 184ed7b

File tree

1 file changed

+11
-2
lines changed

1 file changed

+11
-2
lines changed

kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt

+11-2
Original file line numberDiff line numberDiff line change
@@ -2002,24 +2002,33 @@ internal open class BufferedChannel<E>(
20022002
* or `-1` if this channel does not contain buffered elements.
20032003
*/
20042004
private fun markAllEmptyCellsAsClosed(lastSegment: ChannelSegment<E>): Long {
2005+
// Process the cells in reverse order, from right to left.
20052006
var segment = lastSegment
20062007
while (true) {
20072008
for (index in SEGMENT_SIZE - 1 downTo 0) {
2008-
if (segment.id * SEGMENT_SIZE + index < receiversCounter) return -1
2009+
// Is this cell already covered by `receive()`?
2010+
val globalIndex = segment.id * SEGMENT_SIZE + index
2011+
if (globalIndex < receiversCounter) return -1
2012+
// Process the cell `segment[index]`.
20092013
cell_update@while (true) {
20102014
val state = segment.getState(index)
20112015
when {
2016+
// The cell is empty.
20122017
state === null || state === IN_BUFFER -> {
2018+
// Inform a possibly upcoming sender that this channel is already closed.
20132019
if (segment.casState(index, state, CHANNEL_CLOSED)) {
20142020
segment.onSlotCleaned()
20152021
break@cell_update
20162022
}
20172023
}
2018-
state === BUFFERED -> return segment.id * SEGMENT_SIZE + index
2024+
// The cell stores a buffered element.
2025+
state === BUFFERED -> return globalIndex
2026+
// Skip this cell if it is not empty and does not store a buffered element.
20192027
else -> break@cell_update
20202028
}
20212029
}
20222030
}
2031+
// Process the next segment, finishing if the linked list ends.
20232032
segment = segment.prev ?: return -1
20242033
}
20252034
}

0 commit comments

Comments
 (0)