Skip to content

Commit d381216

Browse files
committed
Reduce starvation of ConflatedChannel receiver with many senders
1 parent 4b62d3b commit d381216

File tree

1 file changed

+37
-11
lines changed

1 file changed

+37
-11
lines changed

Diff for: kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt

+37-11
Original file line numberDiff line numberDiff line change
@@ -236,16 +236,21 @@ public actual open class LockFreeLinkedListNode {
236236
* In particular, invoking [nextNode].[prevNode] might still return this node even though it is "already removed".
237237
* Invoke [helpRemove] to make sure that remove was completed.
238238
*/
239-
public actual open fun remove(): Boolean {
239+
public actual open fun remove(): Boolean =
240+
removeOrNext() == null
241+
242+
// returns null if removed successfully or next node if this node is already removed
243+
@PublishedApi
244+
internal fun removeOrNext(): Node? {
240245
while (true) { // lock-free loop on next
241246
val next = this.next
242-
if (next is Removed) return false // was already removed -- don't try to help (original thread will take care)
243-
if (next === this) return false // was not even added
247+
if (next is Removed) return next.ref // was already removed -- don't try to help (original thread will take care)
248+
if (next === this) return next // was not even added
244249
val removed = (next as Node).removed()
245250
if (_next.compareAndSet(next, removed)) {
246251
// was removed successfully (linearized remove) -- fixup the list
247252
next.correctPrev(null, false)
248-
return true
253+
return null
249254
}
250255
}
251256
}
@@ -283,13 +288,34 @@ public actual open class LockFreeLinkedListNode {
283288

284289
// just peek at item when predicate is true
285290
public actual inline fun <reified T> removeFirstIfIsInstanceOfOrPeekIf(predicate: (T) -> Boolean): T? {
286-
while (true) { // try to linearize
287-
val first = next as Node
288-
if (first === this) return null
289-
if (first !is T) return null
290-
if (predicate(first)) return first // just peek when predicate is true
291-
if (first.remove()) return first
292-
first.helpRemove() // must help remove to ensure lock-freedom
291+
while(true) { // start with the first node of the list
292+
var first = this.next as Node
293+
var moveNextAttempts = 0
294+
while (true) { // scan a sequence of next nodes to find the first non-removed node
295+
if (first === this) return null // got list head -- nothing to remove
296+
if (first !is T) return null
297+
if (predicate(first)) {
298+
// check for removal of the current node to make sure "peek" operation is linearizable
299+
if (first.isRemoved) break // help remove it and start from the beginning
300+
return first // just peek when predicate is true
301+
}
302+
val next = first.removeOrNext()
303+
if (next === null) return first // removed successfully -- return it
304+
/*
305+
* This code is needed to reduce starvation in ConflatedChannel receiver that ties to remove a node
306+
* from the head of its queue while sender threads are constantly removing and adding new nodes.
307+
* So, upon discovery of a removed node we don't immediately rush to help with its removal.
308+
* We try to quickly move to the next node to see if we can remove it instead.
309+
* However, we limit the number of attempts to move to the next node and help with removal at the end
310+
* to avoid repeatedly scanning very long lists in LinkedListChannel.
311+
*/
312+
if (moveNextAttempts++ < 32) {
313+
first = next
314+
} else {
315+
break // help and start from the beginning
316+
}
317+
}
318+
first.helpRemove() // help remove this one and retry from the beginning of the list
293319
}
294320
}
295321

0 commit comments

Comments
 (0)