Skip to content

Commit e42569c

Browse files
committed
Almost non-blocking ConflatedBufferedChannel
Signed-off-by: Nikita Koval <[email protected]>
1 parent 5918efa commit e42569c

File tree

2 files changed

+42
-50
lines changed

2 files changed

+42
-50
lines changed

kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt

+37-13
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ internal open class BufferedChannel<E>(
6868

6969
internal val sendersCounter: Long get() = sendersAndCloseStatus.value.sendersCounter
7070
internal val receiversCounter: Long get() = receivers.value
71-
internal val bufferEndCounter: Long get() = bufferEnd.value
71+
private val bufferEndCounter: Long get() = bufferEnd.value
7272

7373
/*
7474
Additionally to the counters above, we need an extra one that
@@ -813,17 +813,11 @@ internal open class BufferedChannel<E>(
813813
}
814814

815815
// TODO: method name, documentation, implementation.
816-
protected fun dropFirstElementsIfNeeded(s: Long) {
816+
protected fun dropFirstElementIfNeeded(s: Long) {
817817
// Read the segment reference before the counter increment;
818818
// it is crucial to be able to find the required segment later.
819819
var segment = receiveSegment.value
820820
while (true) {
821-
// Similar to the `send(e)` operation, `receive()` first checks
822-
// whether the channel is already closed for receiving.
823-
if (isClosedForReceiveImpl) {
824-
if (receiversCounter < sendersCounter) segment.cleanPrev()
825-
return
826-
}
827821
// Atomically increments the `receivers` counter
828822
// and obtain the value right before the increment.
829823
val r = this.receivers.value
@@ -1534,7 +1528,8 @@ internal open class BufferedChannel<E>(
15341528
onCancellationConstructor = onUndeliveredElementReceiveCancellationConstructor
15351529
)
15361530

1537-
protected open fun registerSelectForReceive(select: SelectInstance<*>, ignoredParam: Any?) =
1531+
@Suppress("UNUSED_PARAMETER")
1532+
private fun registerSelectForReceive(select: SelectInstance<*>, ignoredParam: Any?) =
15381533
receiveImpl( // <-- this is an inline function
15391534
waiter = select,
15401535
onElementRetrieved = { elem -> select.selectInRegistrationPhase(elem) },
@@ -1606,7 +1601,7 @@ internal open class BufferedChannel<E>(
16061601
* and [SelectInstance.trySelect]. When the channel becomes closed,
16071602
* [tryResumeHasNextOnClosedChannel] should be used instead.
16081603
*/
1609-
protected open inner class BufferedChannelIterator : ChannelIterator<E>, BeforeResumeCancelHandler(), Waiter {
1604+
private inner class BufferedChannelIterator : ChannelIterator<E>, BeforeResumeCancelHandler(), Waiter {
16101605
/**
16111606
* Stores the element retrieved by [hasNext] or
16121607
* a special [CHANNEL_CLOSED] token if this channel is closed.
@@ -1775,7 +1770,7 @@ internal open class BufferedChannel<E>(
17751770
*/
17761771
private val _closeCause = atomic<Any?>(NO_CLOSE_CAUSE)
17771772
// Should be called only if this channel is closed or cancelled.
1778-
protected val closeCause get() = _closeCause.value as Throwable?
1773+
internal val closeCause get() = _closeCause.value as Throwable?
17791774

17801775
/** Returns the closing cause if it is non-null, or [ClosedSendChannelException] otherwise. */
17811776
protected val sendException get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
@@ -1953,17 +1948,21 @@ internal open class BufferedChannel<E>(
19531948
/**
19541949
* Completes the started [close] or [cancel] procedure.
19551950
*/
1956-
protected open fun completeCloseOrCancel() {
1951+
private fun completeCloseOrCancel() {
19571952
isClosedForSendImpl // must finish the started close/cancel if one is detected.
19581953
}
19591954

1955+
protected open val isConflatedDropOldest get() = false
1956+
19601957
/**
19611958
* Completes the channel closing procedure.
19621959
*/
1963-
private fun completeClose(sendersCur: Long): ChannelSegment<E> {
1960+
protected open fun completeClose(sendersCur: Long): ChannelSegment<E> {
19641961
// Close the linked list for further segment addition,
19651962
// obtaining the last segment in the data structure.
19661963
val lastSegment = closeLinkedList()
1964+
// TODO
1965+
if (isConflatedDropOldest) xxx(lastSegment)
19671966
// Resume waiting `receive()` requests,
19681967
// informing them that the channel is closed.
19691968
cancelSuspendedReceiveRequests(lastSegment, sendersCur)
@@ -1972,6 +1971,31 @@ internal open class BufferedChannel<E>(
19721971
return lastSegment
19731972
}
19741973

1974+
private fun xxx(lastSegment: ChannelSegment<E>) {
1975+
var segment = lastSegment
1976+
traverse@while (true) {
1977+
for (index in SEGMENT_SIZE - 1 downTo 0) {
1978+
cell_update@while (true) {
1979+
val state = segment.getState(index)
1980+
when {
1981+
state === null || state === IN_BUFFER -> {
1982+
if (segment.casState(index, state, CHANNEL_CLOSED)) {
1983+
segment.onSlotCleaned()
1984+
break@cell_update
1985+
}
1986+
}
1987+
state === BUFFERED -> {
1988+
dropFirstElementIfNeeded(segment.id * SEGMENT_SIZE + index)
1989+
break@traverse
1990+
}
1991+
else -> break@cell_update
1992+
}
1993+
}
1994+
}
1995+
segment = segment.prev ?: break
1996+
}
1997+
}
1998+
19751999
/**
19762000
* Completes the channel cancellation procedure.
19772001
*/

kotlinx-coroutines-core/common/src/channels/ConflatedBufferedChannel.kt

+5-37
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ internal open class ConflatedBufferedChannel<E>(
3737
}
3838
}
3939

40+
override val isConflatedDropOldest: Boolean
41+
get() = onBufferOverflow == DROP_OLDEST
42+
4043
override suspend fun send(element: E) {
4144
// Should never suspend, implement via `trySend(..)`.
4245
trySend(element).onClosed { // fails only when this channel is closed.
@@ -69,17 +72,7 @@ internal open class ConflatedBufferedChannel<E>(
6972
return success(Unit)
7073
}
7174

72-
private val closeStarted = atomic(false)
73-
private val trySendStarted = atomic(1L)
74-
private val trySendCompleted = atomic(1L)
75-
76-
private fun trySendDropOldest(element: E): ChannelResult<Unit> = try {
77-
trySendStarted.incrementAndGet()
78-
if (closeStarted.value) {
79-
trySendCompleted.incrementAndGet()
80-
while (!isClosedForSend) {}
81-
trySendStarted.incrementAndGet()
82-
}
75+
private fun trySendDropOldest(element: E): ChannelResult<Unit> =
8376
sendImpl( // <-- this is an inline function
8477
element = element,
8578
// Put the element into the logical buffer in any case,
@@ -94,15 +87,12 @@ internal open class ConflatedBufferedChannel<E>(
9487
// overflowed, the first (oldest) element has to be extracted.
9588
// After that, the operation finishes.
9689
onSuspend = { segm, i ->
97-
dropFirstElementsIfNeeded(segm.id * SEGMENT_SIZE + i)
90+
dropFirstElementIfNeeded(segm.id * SEGMENT_SIZE + i)
9891
success(Unit)
9992
},
10093
// If the channel is closed, return the corresponding result.
10194
onClosed = { closed(sendException) }
10295
)
103-
} finally {
104-
trySendCompleted.incrementAndGet()
105-
}
10696

10797
@Suppress("UNCHECKED_CAST")
10898
override fun registerSelectForSend(select: SelectInstance<*>, element: Any?) {
@@ -122,26 +112,4 @@ internal open class ConflatedBufferedChannel<E>(
122112
}
123113

124114
override fun shouldSendSuspend() = false // never suspends
125-
126-
override fun closeOrCancelImpl(cause: Throwable?, cancel: Boolean): Boolean {
127-
// Inform `trySend(..)` the the channel closing procedure has been started.
128-
// All `trySend(..)`s that start after setting this flag, should wait
129-
// until the channel is properly closed. Otherwise, a non-linearizable
130-
// behaviour can be observed.
131-
closeStarted.value = true
132-
// It is critical to wait until all the started `trySend(..)`s
133-
// complete their operation.
134-
waitUntilStartedTrySendsComplete()
135-
// Finally, close/cancel this channel.
136-
return super.closeOrCancelImpl(cause, cancel)
137-
}
138-
139-
private fun waitUntilStartedTrySendsComplete() {
140-
while (true) {
141-
val started = trySendStarted.value
142-
val completed = trySendCompleted.value
143-
if (started != trySendStarted.value) continue
144-
if (started == completed) break
145-
}
146-
}
147115
}

0 commit comments

Comments
 (0)