@@ -236,16 +236,21 @@ public actual open class LockFreeLinkedListNode {
236
236
* In particular, invoking [nextNode].[prevNode] might still return this node even though it is "already removed".
237
237
* Invoke [helpRemove] to make sure that remove was completed.
238
238
*/
239
- public actual open fun remove (): Boolean {
239
+ public actual open fun remove (): Boolean =
240
+ removeOrNext() == null
241
+
242
+ // returns null if removed successfully or next node if this node is already removed
243
+ @PublishedApi
244
+ internal fun removeOrNext (): Node ? {
240
245
while (true ) { // lock-free loop on next
241
246
val next = this .next
242
- if (next is Removed ) return false // was already removed -- don't try to help (original thread will take care)
243
- if (next == = this ) return false // was not even added
247
+ if (next is Removed ) return next.ref // was already removed -- don't try to help (original thread will take care)
248
+ if (next == = this ) return next // was not even added
244
249
val removed = (next as Node ).removed()
245
250
if (_next .compareAndSet(next, removed)) {
246
251
// was removed successfully (linearized remove) -- fixup the list
247
252
next.correctPrev(null , false )
248
- return true
253
+ return null
249
254
}
250
255
}
251
256
}
@@ -283,13 +288,34 @@ public actual open class LockFreeLinkedListNode {
283
288
284
289
// just peek at item when predicate is true
285
290
public actual inline fun <reified T > removeFirstIfIsInstanceOfOrPeekIf (predicate : (T ) -> Boolean ): T ? {
286
- while (true ) { // try to linearize
287
- val first = next as Node
288
- if (first == = this ) return null
289
- if (first !is T ) return null
290
- if (predicate(first)) return first // just peek when predicate is true
291
- if (first.remove()) return first
292
- first.helpRemove() // must help remove to ensure lock-freedom
291
+ while (true ) { // start with the first node of the list
292
+ var first = this .next as Node
293
+ var moveNextAttempts = 0
294
+ while (true ) { // scan a sequence of next nodes to find the first non-removed node
295
+ if (first == = this ) return null // got list head -- nothing to remove
296
+ if (first !is T ) return null
297
+ if (predicate(first)) {
298
+ // check for removal of the current node to make sure "peek" operation is linearizable
299
+ if (first.isRemoved) break // help remove it and start from the beginning
300
+ return first // just peek when predicate is true
301
+ }
302
+ val next = first.removeOrNext()
303
+ if (next == = null ) return first // removed successfully -- return it
304
+ /*
305
+ * This code is needed to reduce starvation in ConflatedChannel receiver that ties to remove a node
306
+ * from the head of its queue while sender threads are constantly removing and adding new nodes.
307
+ * So, upon discovery of a removed node we don't immediately rush to help with its removal.
308
+ * We try to quickly move to the next node to see if we can remove it instead.
309
+ * However, we limit the number of attempts to move to the next node and help with removal at the end
310
+ * to avoid repeatedly scanning very long lists in LinkedListChannel.
311
+ */
312
+ if (moveNextAttempts++ < 32 ) {
313
+ first = next
314
+ } else {
315
+ break // help and start from the beginning
316
+ }
317
+ }
318
+ first.helpRemove() // help remove this one and retry from the beginning of the list
293
319
}
294
320
}
295
321
0 commit comments