Skip to content

Commit b0f4863

Browse files
committed
ConflatedBufferedChannel
Signed-off-by: Nikita Koval <[email protected]>
1 parent 94b6397 commit b0f4863

File tree

2 files changed

+61
-38
lines changed

2 files changed

+61
-38
lines changed

kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt

+50-22
Original file line numberDiff line numberDiff line change
@@ -812,16 +812,24 @@ internal open class BufferedChannel<E>(
812812
)
813813
}
814814

815-
// TODO: method name, documentation, implementation.
816-
protected fun dropFirstElementIfNeeded(s: Long) {
815+
/**
816+
* Extracts the first element from this channel until the cell with the specified
817+
* index is moved to the logical buffer. This is a key procedure for the _conflated_
818+
* channel implementation, see [ConflatedBufferedChannel] with the [BufferOverflow.DROP_OLDEST]
819+
* strategy on buffer overflowing.
820+
*/
821+
protected fun dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(globalCellIndex: Long) {
822+
assert { isConflatedDropOldest }
817823
// Read the segment reference before the counter increment;
818824
// it is crucial to be able to find the required segment later.
819825
var segment = receiveSegment.value
820826
while (true) {
821-
// Atomically increments the `receivers` counter
822-
// and obtain the value right before the increment.
827+
// Read the receivers counter to check whether the specified cell is already in the buffer
828+
// or should be moved to the buffer in a short time, due to the already started `receive()`.
823829
val r = this.receivers.value
824-
if (s < max(r + capacity, bufferEndCounter)) return
830+
if (globalCellIndex < max(r + capacity, bufferEndCounter)) return
831+
// The cell is outside the buffer. Try to extract the first element
832+
// if the `receivers` counter has not been changed.
825833
if (!this.receivers.compareAndSet(r, r + 1)) continue
826834
// Count the required segment id and the cell index in it.
827835
val id = r / SEGMENT_SIZE
@@ -831,25 +839,26 @@ internal open class BufferedChannel<E>(
831839
if (segment.id != id) {
832840
// Find the required segment, restarting the operation if it has not been found.
833841
segment = findSegmentReceive(id, segment) ?:
834-
// The required segment is not found. It is possible that the channel is already
842+
// The required segment has not been found. It is possible that the channel is already
835843
// closed for receiving, so the linked list of segments is closed as well.
836-
// In the latter case, the operation fails with the corresponding check at the beginning.
844+
// In the latter case, the operation will finish eventually after incrementing
845+
// the `receivers` counter sufficient times. Note that it is impossible to check
846+
// whether this channel is closed for receiving (we do this in `receive`),
847+
// as it may call this function when helping to complete closing the channel.
837848
continue
838849
}
839850
// Update the cell according to the cell life-cycle.
840851
val updCellResult = updateCellReceive(segment, i, r, null)
841852
when {
842853
updCellResult === FAILED -> {
843-
// The cell is poisoned.
844-
// Restart from the beginning in this case.
854+
// The cell is poisoned; restart from the beginning.
845855
// To avoid memory leaks, we also need to reset
846-
// the `prev` pointer of the working segment.
856+
// the `prev` pointer of t he working segment.
847857
if (r < sendersCounter) segment.cleanPrev()
848858
}
849859
else -> { // element
850-
// Either a buffered element was retrieved from the cell
851-
// or a rendezvous with a waiting sender has happened.
852-
// Clean the reference to the previous segment before finishing.
860+
// A buffered element was retrieved from the cell.
861+
// Clean the reference to the previous segment.
853862
segment.cleanPrev()
854863
@Suppress("UNCHECKED_CAST")
855864
onUndeliveredElement?.invoke(updCellResult as E)
@@ -1961,8 +1970,20 @@ internal open class BufferedChannel<E>(
19611970
// Close the linked list for further segment addition,
19621971
// obtaining the last segment in the data structure.
19631972
val lastSegment = closeLinkedList()
1964-
// TODO
1965-
if (isConflatedDropOldest) xxx(lastSegment)
1973+
// In the conflated channel implementation (with the DROP_OLDEST
1974+
// elements conflation strategy), it is critical to mark all empty
1975+
// cells as closed to prevent in-progress `send(e)`-s, which have not
1976+
// put their elements yet, completions after this channel is closed.
1977+
// Otherwise, it is possible for a `send(e)` to put an element when
1978+
// the buffer is already full, while a concurrent receiver may extract
1979+
// the oldest element. When the channel is not closed, we can linearize
1980+
// this `receive()` before the `send(e)`, but after the channel is closed,
1981+
// `send(e)` must fails. Marking all unprocessed cells as `CLOSED` solves the issue.
1982+
if (isConflatedDropOldest) {
1983+
val lastBufferedCellGlobalIndex = markAllEmptyCellsAsClosed(lastSegment)
1984+
if (lastBufferedCellGlobalIndex != -1L)
1985+
dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(lastBufferedCellGlobalIndex)
1986+
}
19661987
// Resume waiting `receive()` requests,
19671988
// informing them that the channel is closed.
19681989
cancelSuspendedReceiveRequests(lastSegment, sendersCur)
@@ -1971,10 +1992,20 @@ internal open class BufferedChannel<E>(
19711992
return lastSegment
19721993
}
19731994

1974-
private fun xxx(lastSegment: ChannelSegment<E>) {
1995+
/**
1996+
* This function marks all empty cells, in the `null` and [IN_BUFFER] state,
1997+
* as closed. Notably, it processes the cells from right to left, and finishes
1998+
* immediately when the processing cell is already covered by `receive()` or
1999+
* contains a buffered elements ([BUFFERED] state).
2000+
*
2001+
* This function returns the global index of the last buffered element,
2002+
* or `-1` if this channel does not contain buffered elements.
2003+
*/
2004+
private fun markAllEmptyCellsAsClosed(lastSegment: ChannelSegment<E>): Long {
19752005
var segment = lastSegment
1976-
traverse@while (true) {
2006+
while (true) {
19772007
for (index in SEGMENT_SIZE - 1 downTo 0) {
2008+
if (segment.id * SEGMENT_SIZE + index < receiversCounter) return -1
19782009
cell_update@while (true) {
19792010
val state = segment.getState(index)
19802011
when {
@@ -1984,15 +2015,12 @@ internal open class BufferedChannel<E>(
19842015
break@cell_update
19852016
}
19862017
}
1987-
state === BUFFERED -> {
1988-
dropFirstElementIfNeeded(segment.id * SEGMENT_SIZE + index)
1989-
break@traverse
1990-
}
2018+
state === BUFFERED -> return segment.id * SEGMENT_SIZE + index
19912019
else -> break@cell_update
19922020
}
19932021
}
19942022
}
1995-
segment = segment.prev ?: break
2023+
segment = segment.prev ?: return -1
19962024
}
19972025
}
19982026

kotlinx-coroutines-core/common/src/channels/ConflatedBufferedChannel.kt

+11-16
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,11 @@ import kotlinx.coroutines.selects.*
1515
import kotlin.coroutines.*
1616

1717
/**
18-
* Channel with array buffer of a fixed capacity.
19-
* Sender suspends only when buffer is full and receiver suspends only when buffer is empty.
20-
*
21-
* This channel is created by `Channel(capacity)` factory function invocation.
22-
*
23-
* This implementation is blocking and uses coarse-grained locking to protect all channel operations.
24-
* However, removing a cancelled sender or receiver from a list of waiters is lock-free.
25-
**/
18+
* This is a special [BufferedChannel] extension that supports [DROP_OLDEST] and [DROP_LATEST]
19+
* strategies for buffer overflowing. This implementation ensures that `send(e)` never suspends,
20+
* either extracting the first element ([DROP_OLDEST]) or dropping the sending one ([DROP_LATEST])
21+
* when the channel capacity exceeds.
22+
*/
2623
internal open class ConflatedBufferedChannel<E>(
2724
private val capacity: Int,
2825
private val onBufferOverflow: BufferOverflow,
@@ -66,28 +63,26 @@ internal open class ConflatedBufferedChannel<E>(
6663
// Complete on success or if this channel is closed.
6764
if (result.isSuccess || result.isClosed) return result
6865
// This channel is full. Drop the sending element.
69-
// Call the `onUndeliveredElement` lambda if required
70-
// and successfully finish.
66+
// Call the `onUndeliveredElement` lambda (if required) and finish.
7167
onUndeliveredElement?.invoke(element)
7268
return success(Unit)
7369
}
7470

7571
private fun trySendDropOldest(element: E): ChannelResult<Unit> =
7672
sendImpl( // <-- this is an inline function
7773
element = element,
78-
// Put the element into the logical buffer in any case,
79-
// but if this channel is already full, the `onSuspend`
74+
// Put the element into the logical buffer even
75+
// if this channel is already full, the `onSuspend`
8076
// callback below extract the first (oldest) element.
8177
waiter = BUFFERED,
82-
// Finish successfully when a rendezvous happens
78+
// Finish successfully when a rendezvous has happened
8379
// or the element has been buffered.
8480
onRendezvousOrBuffered = { success(Unit) },
8581
// In case the algorithm decided to suspend, the element
8682
// was added to the buffer. However, as the buffer is now
8783
// overflowed, the first (oldest) element has to be extracted.
88-
// After that, the operation finishes.
8984
onSuspend = { segm, i ->
90-
dropFirstElementIfNeeded(segm.id * SEGMENT_SIZE + i)
85+
dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(segm.id * SEGMENT_SIZE + i)
9186
success(Unit)
9287
},
9388
// If the channel is closed, return the corresponding result.
@@ -111,5 +106,5 @@ internal open class ConflatedBufferedChannel<E>(
111106
error("unreachable")
112107
}
113108

114-
override fun shouldSendSuspend() = false // never suspends
109+
override fun shouldSendSuspend() = false // never suspends.
115110
}

0 commit comments

Comments
 (0)