Skip to content

Commit 299d0c9

Browse files
ndkovalelizarov
authored andcommitted
Make the list of segments more abstract (Kotlin#1563)
* Make the list of segments more abstract, so that it can be used for other synchronization and communication primitives Co-authored-by: Roman Elizarov <[email protected]>
1 parent 6c297a6 commit 299d0c9

File tree

7 files changed

+405
-241
lines changed

7 files changed

+405
-241
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import kotlinx.atomicfu.*
8+
import kotlinx.coroutines.*
9+
import kotlin.native.concurrent.SharedImmutable
10+
11+
/**
12+
* Returns the first segment `s` with `s.id >= id` or `CLOSED`
13+
* if all the segments in this linked list have lower `id`, and the list is closed for further segment additions.
14+
*/
15+
private inline fun <S : Segment<S>> S.findSegmentInternal(
16+
id: Long,
17+
createNewSegment: (id: Long, prev: S?) -> S
18+
): SegmentOrClosed<S> {
19+
/*
20+
Go through `next` references and add new segments if needed, similarly to the `push` in the Michael-Scott
21+
queue algorithm. The only difference is that "CAS failure" means that the required segment has already been
22+
added, so the algorithm just uses it. This way, only one segment with each id can be added.
23+
*/
24+
var cur: S = this
25+
while (cur.id < id || cur.removed) {
26+
val next = cur.nextOrIfClosed { return SegmentOrClosed(CLOSED) }
27+
if (next != null) { // there is a next node -- move there
28+
cur = next
29+
continue
30+
}
31+
val newTail = createNewSegment(cur.id + 1, cur)
32+
if (cur.trySetNext(newTail)) { // successfully added new node -- move there
33+
if (cur.removed) cur.remove()
34+
cur = newTail
35+
}
36+
}
37+
return SegmentOrClosed(cur)
38+
}
39+
40+
/**
41+
* Returns `false` if the segment `to` is logically removed, `true` on a successful update.
42+
*/
43+
@Suppress("NOTHING_TO_INLINE") // Must be inline because it is an AtomicRef extension
44+
private inline fun <S : Segment<S>> AtomicRef<S>.moveForward(to: S): Boolean = loop { cur ->
45+
if (cur.id >= to.id) return true
46+
if (!to.tryIncPointers()) return false
47+
if (compareAndSet(cur, to)) { // the segment is moved
48+
if (cur.decPointers()) cur.remove()
49+
return true
50+
}
51+
if (to.decPointers()) to.remove() // undo tryIncPointers
52+
}
53+
54+
/**
55+
* Tries to find a segment with the specified [id] following by next references from the
56+
* [startFrom] segment and creating new ones if needed. The typical use-case is reading this `AtomicRef` values,
57+
* doing some synchronization, and invoking this function to find the required segment and update the pointer.
58+
* At the same time, [Segment.cleanPrev] should also be invoked if the previous segments are no longer needed
59+
* (e.g., queues should use it in dequeue operations).
60+
*
61+
* Since segments can be removed from the list, or it can be closed for further segment additions.
62+
* Returns the segment `s` with `s.id >= id` or `CLOSED` if all the segments in this linked list have lower `id`,
63+
* and the list is closed.
64+
*/
65+
internal inline fun <S : Segment<S>> AtomicRef<S>.findSegmentAndMoveForward(
66+
id: Long,
67+
startFrom: S,
68+
createNewSegment: (id: Long, prev: S?) -> S
69+
): SegmentOrClosed<S> {
70+
while (true) {
71+
val s = startFrom.findSegmentInternal(id, createNewSegment)
72+
if (s.isClosed || moveForward(s.segment)) return s
73+
}
74+
}
75+
76+
/**
77+
* Closes this linked list of nodes by forbidding adding new ones,
78+
* returns the last node in the list.
79+
*/
80+
internal fun <N : ConcurrentLinkedListNode<N>> N.close(): N {
81+
var cur: N = this
82+
while (true) {
83+
val next = cur.nextOrIfClosed { return cur }
84+
if (next === null) {
85+
if (cur.markAsClosed()) return cur
86+
} else {
87+
cur = next
88+
}
89+
}
90+
}
91+
92+
internal abstract class ConcurrentLinkedListNode<N : ConcurrentLinkedListNode<N>>(prev: N?) {
93+
// Pointer to the next node, updates similarly to the Michael-Scott queue algorithm.
94+
private val _next = atomic<Any?>(null)
95+
// Pointer to the previous node, updates in [remove] function.
96+
private val _prev = atomic(prev)
97+
98+
private val nextOrClosed get() = _next.value
99+
100+
/**
101+
* Returns the next segment or `null` of the one does not exist,
102+
* and invokes [onClosedAction] if this segment is marked as closed.
103+
*/
104+
@Suppress("UNCHECKED_CAST")
105+
inline fun nextOrIfClosed(onClosedAction: () -> Nothing): N? = nextOrClosed.let {
106+
if (it === CLOSED) {
107+
onClosedAction()
108+
} else {
109+
it as N?
110+
}
111+
}
112+
113+
val next: N? get() = nextOrIfClosed { return null }
114+
115+
/**
116+
* Tries to set the next segment if it is not specified and this segment is not marked as closed.
117+
*/
118+
fun trySetNext(value: N): Boolean = _next.compareAndSet(null, value)
119+
120+
/**
121+
* Checks whether this node is the physical tail of the current linked list.
122+
*/
123+
val isTail: Boolean get() = next == null
124+
125+
val prev: N? get() = _prev.value
126+
127+
/**
128+
* Cleans the pointer to the previous node.
129+
*/
130+
fun cleanPrev() { _prev.lazySet(null) }
131+
132+
/**
133+
* Tries to mark the linked list as closed by forbidding adding new nodes after this one.
134+
*/
135+
fun markAsClosed() = _next.compareAndSet(null, CLOSED)
136+
137+
/**
138+
* This property indicates whether the current node is logically removed.
139+
* The expected use-case is removing the node logically (so that [removed] becomes true),
140+
* and invoking [remove] after that. Note that this implementation relies on the contract
141+
* that the physical tail cannot be logically removed. Please, do not break this contract;
142+
* otherwise, memory leaks and unexpected behavior can occur.
143+
*/
144+
abstract val removed: Boolean
145+
146+
/**
147+
* Removes this node physically from this linked list. The node should be
148+
* logically removed (so [removed] returns `true`) at the point of invocation.
149+
*/
150+
fun remove() {
151+
assert { removed } // The node should be logically removed at first.
152+
assert { !isTail } // The physical tail cannot be removed.
153+
while (true) {
154+
// Read `next` and `prev` pointers ignoring logically removed nodes.
155+
val prev = leftmostAliveNode
156+
val next = rightmostAliveNode
157+
// Link `next` and `prev`.
158+
next._prev.value = prev
159+
if (prev !== null) prev._next.value = next
160+
// Checks that prev and next are still alive.
161+
if (next.removed) continue
162+
if (prev !== null && prev.removed) continue
163+
// This node is removed.
164+
return
165+
}
166+
}
167+
168+
private val leftmostAliveNode: N? get() {
169+
var cur = prev
170+
while (cur !== null && cur.removed)
171+
cur = cur._prev.value
172+
return cur
173+
}
174+
175+
private val rightmostAliveNode: N get() {
176+
assert { !isTail } // Should not be invoked on the tail node
177+
var cur = next!!
178+
while (cur.removed)
179+
cur = cur.next!!
180+
return cur
181+
}
182+
}
183+
184+
/**
185+
* Each segment in the list has a unique id and is created by the provided to [findSegmentAndMoveForward] method.
186+
* Essentially, this is a node in the Michael-Scott queue algorithm,
187+
* but with maintaining [prev] pointer for efficient [remove] implementation.
188+
*/
189+
internal abstract class Segment<S : Segment<S>>(val id: Long, prev: S?, pointers: Int): ConcurrentLinkedListNode<S>(prev) {
190+
/**
191+
* This property should return the maximal number of slots in this segment,
192+
* it is used to define whether the segment is logically removed.
193+
*/
194+
abstract val maxSlots: Int
195+
196+
/**
197+
* Numbers of cleaned slots (the lowest bits) and AtomicRef pointers to this segment (the highest bits)
198+
*/
199+
private val cleanedAndPointers = atomic(pointers shl POINTERS_SHIFT)
200+
201+
/**
202+
* The segment is considered as removed if all the slots are cleaned.
203+
* There are no pointers to this segment from outside, and
204+
* it is not a physical tail in the linked list of segments.
205+
*/
206+
override val removed get() = cleanedAndPointers.value == maxSlots && !isTail
207+
208+
// increments the number of pointers if this segment is not logically removed.
209+
internal fun tryIncPointers() = cleanedAndPointers.addConditionally(1 shl POINTERS_SHIFT) { it != maxSlots || isTail }
210+
211+
// returns `true` if this segment is logically removed after the decrement.
212+
internal fun decPointers() = cleanedAndPointers.addAndGet(-(1 shl POINTERS_SHIFT)) == maxSlots && !isTail
213+
214+
/**
215+
* Invoked on each slot clean-up; should not be invoked twice for the same slot.
216+
*/
217+
fun onSlotCleaned() {
218+
if (cleanedAndPointers.incrementAndGet() == maxSlots && !isTail) remove()
219+
}
220+
}
221+
222+
private inline fun AtomicInt.addConditionally(delta: Int, condition: (cur: Int) -> Boolean): Boolean {
223+
while (true) {
224+
val cur = this.value
225+
if (!condition(cur)) return false
226+
if (this.compareAndSet(cur, cur + delta)) return true
227+
}
228+
}
229+
230+
@Suppress("EXPERIMENTAL_FEATURE_WARNING") // We are using inline class only internally, so it is Ok
231+
internal inline class SegmentOrClosed<S : Segment<S>>(private val value: Any?) {
232+
val isClosed: Boolean get() = value === CLOSED
233+
@Suppress("UNCHECKED_CAST")
234+
val segment: S get() = if (value === CLOSED) error("Does not contain segment") else value as S
235+
}
236+
237+
private const val POINTERS_SHIFT = 16
238+
239+
@SharedImmutable
240+
private val CLOSED = Symbol("CLOSED")

0 commit comments

Comments
 (0)