Skip to content

Commit 5918efa

Browse files
committed
Almost non-blocking ConflatedBufferedChannel
Signed-off-by: Nikita Koval <[email protected]>
1 parent 7bbc362 commit 5918efa

File tree

1 file changed

+36
-1
lines changed

1 file changed

+36
-1
lines changed

kotlinx-coroutines-core/common/src/channels/ConflatedBufferedChannel.kt

+36-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,17 @@ internal open class ConflatedBufferedChannel<E>(
6969
return success(Unit)
7070
}
7171

72-
private fun trySendDropOldest(element: E): ChannelResult<Unit> =
72+
private val closeStarted = atomic(false)
73+
private val trySendStarted = atomic(1L)
74+
private val trySendCompleted = atomic(1L)
75+
76+
private fun trySendDropOldest(element: E): ChannelResult<Unit> = try {
77+
trySendStarted.incrementAndGet()
78+
if (closeStarted.value) {
79+
trySendCompleted.incrementAndGet()
80+
while (!isClosedForSend) {}
81+
trySendStarted.incrementAndGet()
82+
}
7383
sendImpl( // <-- this is an inline function
7484
element = element,
7585
// Put the element into the logical buffer in any case,
@@ -90,6 +100,9 @@ internal open class ConflatedBufferedChannel<E>(
90100
// If the channel is closed, return the corresponding result.
91101
onClosed = { closed(sendException) }
92102
)
103+
} finally {
104+
trySendCompleted.incrementAndGet()
105+
}
93106

94107
@Suppress("UNCHECKED_CAST")
95108
override fun registerSelectForSend(select: SelectInstance<*>, element: Any?) {
@@ -109,4 +122,26 @@ internal open class ConflatedBufferedChannel<E>(
109122
}
110123

111124
override fun shouldSendSuspend() = false // never suspends
125+
126+
override fun closeOrCancelImpl(cause: Throwable?, cancel: Boolean): Boolean {
127+
// Inform `trySend(..)` the the channel closing procedure has been started.
128+
// All `trySend(..)`s that start after setting this flag, should wait
129+
// until the channel is properly closed. Otherwise, a non-linearizable
130+
// behaviour can be observed.
131+
closeStarted.value = true
132+
// It is critical to wait until all the started `trySend(..)`s
133+
// complete their operation.
134+
waitUntilStartedTrySendsComplete()
135+
// Finally, close/cancel this channel.
136+
return super.closeOrCancelImpl(cause, cancel)
137+
}
138+
139+
private fun waitUntilStartedTrySendsComplete() {
140+
while (true) {
141+
val started = trySendStarted.value
142+
val completed = trySendCompleted.value
143+
if (started != trySendStarted.value) continue
144+
if (started == completed) break
145+
}
146+
}
112147
}

0 commit comments

Comments
 (0)