Skip to content

Commit ba22d74

Browse files
committed
Make the list of segments more abstract, so that it can be used for other synchronization and communication primitives
1 parent 345458b commit ba22d74

File tree

7 files changed

+386
-241
lines changed

7 files changed

+386
-241
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import kotlinx.atomicfu.*
8+
import kotlinx.coroutines.*
9+
import kotlin.native.concurrent.SharedImmutable
10+
11+
// Returns the first segment `s` with `s.id >= id` or `CLOSED`
12+
// if all the segments in this linked list have lower `id` and the list is closed for further segment additions.
13+
private inline fun <S : Segment<S>> S.findSegmentInternal(id: Long, createNewSegment: (id: Long, prev: S?) -> S): SegmentOrClosed<S> {
14+
// Go through `next` references and add new segments if needed,
15+
// similarly to the `push` in the Michael-Scott queue algorithm.
16+
// The only difference is that `CAS failure` means that the
17+
// required segment has already been added, so the algorithm just
18+
// uses it. This way, only one segment with each id can be added.
19+
var cur: S = this
20+
while (cur.id < id || cur.removed) {
21+
val nextOrClosed = cur.nextOrClosed
22+
if (nextOrClosed.isClosed) return SegmentOrClosed(CLOSED)
23+
val next: S = if (nextOrClosed.node === null) {
24+
val newTail = createNewSegment(cur.id + 1, cur)
25+
if (cur.trySetNext(newTail)) {
26+
if (cur.removed) cur.remove()
27+
newTail
28+
} else {
29+
val nextOrClosed = cur.nextOrClosed
30+
if (nextOrClosed.isClosed) return SegmentOrClosed(CLOSED)
31+
nextOrClosed.node!!
32+
}
33+
} else nextOrClosed.node!!
34+
cur = next
35+
}
36+
return SegmentOrClosed(cur)
37+
}
38+
39+
// Returns `false` if the segment `to` is logically removed, `true` on successful update.
40+
private inline fun <S : Segment<S>> AtomicRef<S>.moveForward(to: S): Boolean = loop { cur ->
41+
if (cur.id >= to.id) return true
42+
if (!to.tryIncPointers()) return false
43+
if (compareAndSet(cur, to)) {
44+
// the segment is moved
45+
if (cur.decPointers()) cur.remove()
46+
return true
47+
} else {
48+
if (to.decPointers()) to.remove()
49+
}
50+
}
51+
52+
/**
53+
* Tries to find a segment with the specified [id] following by next references from the
54+
* [startFrom] segment and creating new ones if needed. The typical use-case is reading this `AtomicRef` values,
55+
* doing some synchronization, and invoking this function to find the required segment and update the pointer.
56+
* At the same time, [Segment.cleanPrev] should also be invoked if the previous segments are no longer needed
57+
* (e.g., queues should use it in dequeue operations).
58+
*
59+
* Since segments can be removed from the list, or it can be closed for further segment additions, this function
60+
* returns the segment `s` with `s.id >= id` or `CLOSED` if all the segments in this linked list have lower `id`
61+
* and the list is closed.
62+
*/
63+
internal inline fun <S : Segment<S>> AtomicRef<S>.findSegmentAndMoveForward(id: Long, startFrom: S, createNewSegment: (id: Long, prev: S?) -> S): SegmentOrClosed<S> {
64+
while (true) {
65+
val s = startFrom.findSegmentInternal(id, createNewSegment)
66+
if (s.isClosed || moveForward(s.segment)) return s
67+
}
68+
}
69+
70+
/**
71+
* Closes this linked list of nodes by forbidding adding new ones,
72+
* returns the last node in the list.
73+
*/
74+
internal fun <N : ConcurrentLinkedListNode<N>> N.close(): N {
75+
var cur: N = this
76+
while (true) {
77+
val next = cur.nextOrClosed.run { if (isClosed) return cur else node }
78+
if (next === null) {
79+
if (cur.markAsClosed()) return cur
80+
} else {
81+
cur = next
82+
}
83+
}
84+
}
85+
86+
internal abstract class ConcurrentLinkedListNode<N : ConcurrentLinkedListNode<N>>(prev: N?) {
87+
// Pointer to the next node, updates similarly to the Michael-Scott queue algorithm.
88+
private val _next = atomic<Any?>(null)
89+
val nextOrClosed: NextNodeOrClosed<N> get() = NextNodeOrClosed(_next.value)
90+
fun trySetNext(value: N): Boolean = _next.compareAndSet(null, value)
91+
92+
/**
93+
* Checks whether this node is the physical tail of the current linked list.
94+
*/
95+
val isTail: Boolean get() = _next.value.let { it === null || it === CLOSED }
96+
97+
// Pointer to the previous node, updates in [remove] function.
98+
private val _prev = atomic(prev)
99+
val prev: N? get() = _prev.value
100+
101+
/**
102+
* Cleans the pointer to the previous node.
103+
*/
104+
fun cleanPrev() { _prev.lazySet(null) }
105+
106+
/**
107+
* Tries to mark the linked list as closed by forbidding adding new nodes after this one.
108+
*/
109+
fun markAsClosed() = _next.compareAndSet(null, CLOSED)
110+
111+
/**
112+
* Checks whether this node is a physical tail and is closed for further node additions.
113+
*/
114+
val isClosed get() = _next.value === CLOSED
115+
116+
/**
117+
* This property indicates whether the current node is logically removed.
118+
* The expected use-case is removing the node logically (so that [removed] becomes true),
119+
* and invoking [remove] after that. Note that this implementation relies on the contract
120+
* that the physical tail cannot be logically removed. Please, do not break this contract;
121+
* otherwise, memory leaks and unexpected behavior can occur.
122+
*/
123+
abstract val removed: Boolean
124+
125+
/**
126+
* Removes this node physically from this linked list. The node should be
127+
* logically removed (so [removed] returns `true`) at the point of invocation.
128+
*/
129+
fun remove() {
130+
assert { removed } // The node should be logically removed at first.
131+
assert { nextOrClosed.node !== null } // The physical tail cannot be removed.
132+
while (true) {
133+
// Read `next` and `prev` pointers ignoring logically removed nodes.
134+
val prev = leftmostAliveNode
135+
val next = rightmostAliveNode
136+
// Link `next` and `prev`.
137+
next._prev.value = prev
138+
if (prev !== null) prev._next.value = next
139+
// Check that prev and next are still alive.
140+
if (next.removed) continue
141+
if (prev !== null && prev.removed) continue
142+
// This node is removed.
143+
return
144+
}
145+
}
146+
147+
private val leftmostAliveNode: N? get() {
148+
var cur = prev
149+
while (cur !== null && cur.removed)
150+
cur = cur._prev.value
151+
return cur
152+
}
153+
154+
private val rightmostAliveNode: N get() {
155+
assert { !isTail } // Should not be invoked on the tail node
156+
var cur = nextOrClosed.node!!
157+
while (cur.removed)
158+
cur = cur.nextOrClosed.node!!
159+
return cur
160+
}
161+
}
162+
163+
/**
164+
* Each segment in the list has a unique id and is created by the provided to [findSegmentAndMoveForward] method.
165+
* Essentially, this is a node in the Michael-Scott queue algorithm,
166+
* but with maintaining [prev] pointer for efficient [remove] implementation.
167+
*/
168+
internal abstract class Segment<S : Segment<S>>(val id: Long, prev: S?, pointers: Int): ConcurrentLinkedListNode<S>(prev) {
169+
/**
170+
* This property should return the maximal number of slots in this segment,
171+
* it is used to define whether the segment is logically removed.
172+
*/
173+
abstract val maxSlots: Int
174+
175+
/**
176+
* Numbers of cleaned slots (lowest bits) and AtomicRef pointers to this segment (highest bits)
177+
*/
178+
private val cleanedAndPointers = atomic(pointers shl POINTERS_SHIFT)
179+
180+
/**
181+
* The segment is considered as removed if all the slots are cleaned,
182+
* there is no pointers to this segment from outside, and
183+
* it is not a physical tail in the linked list of segments.
184+
*/
185+
override val removed get() = cleanedAndPointers.value == maxSlots && !isTail
186+
187+
// increments the number of pointers if this segment is not logically removed.
188+
internal fun tryIncPointers() = cleanedAndPointers.addConditionally(1 shl POINTERS_SHIFT) { it != maxSlots || isTail }
189+
190+
// returns `true` if this segment is logically removed after the decrement.
191+
internal fun decPointers() = cleanedAndPointers.addAndGet(-(1 shl POINTERS_SHIFT)) == maxSlots && !isTail
192+
193+
/**
194+
* This functions should be invoked on each slot clean-up;
195+
* should not be invoked twice for the same slot.
196+
*/
197+
fun onSlotCleaned() {
198+
if (cleanedAndPointers.incrementAndGet() == maxSlots && !isTail) remove()
199+
}
200+
}
201+
202+
private inline fun AtomicInt.addConditionally(delta: Int, condition: (cur: Int) -> Boolean): Boolean {
203+
while (true) {
204+
val cur = this.value
205+
if (!condition(cur)) return false
206+
if (this.compareAndSet(cur, cur + delta)) return true
207+
}
208+
}
209+
210+
internal inline class SegmentOrClosed<S : Segment<S>>(private val value: Any?) {
211+
val isClosed: Boolean get() = value === CLOSED
212+
val segment: S get() = if (value === CLOSED) error("Does not contain segment") else value as S
213+
}
214+
215+
internal inline class NextNodeOrClosed<N : ConcurrentLinkedListNode<N>>(private val value: Any?) {
216+
val isClosed: Boolean get() = value === CLOSED
217+
val node: N? get() = if (isClosed) null else value as N?
218+
}
219+
220+
private const val POINTERS_SHIFT = 16
221+
222+
@SharedImmutable
223+
private val CLOSED = Symbol("CLOSED")

0 commit comments

Comments
 (0)