Skip to content

Commit d377ac8

Browse files
committed
Make the list of segments more abstract, so that it can be used for other synchronization and communication primitives
1 parent d5f6dbb commit d377ac8

File tree

7 files changed

+384
-240
lines changed

7 files changed

+384
-240
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import kotlinx.atomicfu.*
8+
import kotlinx.coroutines.*
9+
10+
// Returns the first segment `s` with `s.id >= id` or `CLOSED`
11+
// if all the segments in this linked list have lower `id` and the list is closed for further segment additions.
12+
private inline fun <S : Segment<S>> S.findSegmentInternal(id: Long, createNewSegment: (id: Long, prev: S?) -> S): SegmentOrClosed<S> {
13+
// Go through `next` references and add new segments if needed,
14+
// similarly to the `push` in the Michael-Scott queue algorithm.
15+
// The only difference is that `CAS failure` means that the
16+
// required segment has already been added, so the algorithm just
17+
// uses it. This way, only one segment with each id can be added.
18+
var cur: S = this
19+
while (cur.id < id || cur.removed) {
20+
val nextOrClosed = cur.nextOrClosed
21+
if (nextOrClosed.isClosed) return SegmentOrClosed(CLOSED)
22+
val next: S = if (nextOrClosed.node === null) {
23+
val newTail = createNewSegment(cur.id + 1, cur)
24+
if (cur.trySetNext(newTail)) {
25+
if (cur.removed) cur.remove()
26+
newTail
27+
} else {
28+
val nextOrClosed = cur.nextOrClosed
29+
if (nextOrClosed.isClosed) return SegmentOrClosed(CLOSED)
30+
nextOrClosed.node!!
31+
}
32+
} else nextOrClosed.node!!
33+
cur = next
34+
}
35+
return SegmentOrClosed(cur)
36+
}
37+
38+
// Returns `false` if the segment `to` is logically removed, `true` on successful update.
39+
private inline fun <S : Segment<S>> AtomicRef<S>.moveForward(to: S): Boolean = loop { cur ->
40+
if (cur.id >= to.id) return true
41+
if (!to.tryIncPointers()) return false
42+
if (compareAndSet(cur, to)) {
43+
// the segment is moved
44+
if (cur.decPointers()) cur.remove()
45+
return true
46+
} else {
47+
if (to.decPointers()) to.remove()
48+
}
49+
}
50+
51+
/**
52+
* Tries to find a segment with the specified [id] following by next references from the
53+
* [startFrom] segment and creating new ones if needed. The typical use-case is reading this `AtomicRef` values,
54+
* doing some synchronization, and invoking this function to find the required segment and update the pointer.
55+
* At the same time, [Segment.cleanPrev] should also be invoked if the previous segments are no longer needed
56+
* (e.g., queues should use it in dequeue operations).
57+
*
58+
* Since segments can be removed from the list, or it can be closed for further segment additions, this function
59+
* returns the segment `s` with `s.id >= id` or `CLOSED` if all the segments in this linked list have lower `id`
60+
* and the list is closed.
61+
*/
62+
internal inline fun <S : Segment<S>> AtomicRef<S>.findSegmentAndMoveForward(id: Long, startFrom: S, createNewSegment: (id: Long, prev: S?) -> S): SegmentOrClosed<S> {
63+
while (true) {
64+
val s = startFrom.findSegmentInternal(id, createNewSegment)
65+
if (s.isClosed || moveForward(s.segment)) return s
66+
}
67+
}
68+
69+
/**
70+
* Closes this linked list of nodes by forbidding adding new ones,
71+
* returns the last node in the list.
72+
*/
73+
internal fun <N : ConcurrentLinkedListNode<N>> N.close(): N {
74+
var cur: N = this
75+
while (true) {
76+
val next = cur.nextOrClosed.run { if (isClosed) return cur else node }
77+
if (next === null) {
78+
if (cur.markAsClosed()) return cur
79+
} else {
80+
cur = next
81+
}
82+
}
83+
}
84+
85+
internal abstract class ConcurrentLinkedListNode<N : ConcurrentLinkedListNode<N>>(prev: N?) {
86+
// Pointer to the next node, updates similarly to the Michael-Scott queue algorithm.
87+
private val _next = atomic<Any?>(null)
88+
val nextOrClosed: NextNodeOrClosed<N> get() = NextNodeOrClosed(_next.value)
89+
fun trySetNext(value: N): Boolean = _next.compareAndSet(null, value)
90+
91+
/**
92+
* Checks whether this node is the physical tail of the current linked list.
93+
*/
94+
val isTail: Boolean get() = _next.value.let { it === null || it === CLOSED }
95+
96+
// Pointer to the previous node, updates in [remove] function.
97+
private val _prev = atomic(prev)
98+
val prev: N? get() = _prev.value
99+
100+
/**
101+
* Cleans the pointer to the previous node.
102+
*/
103+
fun cleanPrev() { _prev.lazySet(null) }
104+
105+
/**
106+
* Tries to mark the linked list as closed by forbidding adding new nodes after this one.
107+
*/
108+
fun markAsClosed() = _next.compareAndSet(null, CLOSED)
109+
110+
/**
111+
* Checks whether this node is a physical tail and is closed for further node additions.
112+
*/
113+
val isClosed get() = _next.value === CLOSED
114+
115+
/**
116+
* This property indicates whether the current node is logically removed.
117+
* The expected use-case is removing the node logically (so that [removed] becomes true),
118+
* and invoking [remove] after that. Note that this implementation relies on the contract
119+
* that the physical tail cannot be logically removed. Please, do not break this contract;
120+
* otherwise, memory leaks and unexpected behavior can occur.
121+
*/
122+
abstract val removed: Boolean
123+
124+
/**
125+
* Removes this node physically from this linked list. The node should be
126+
* logically removed (so [removed] returns `true`) at the point of invocation.
127+
*/
128+
fun remove() {
129+
assert { removed } // The node should be logically removed at first.
130+
assert { nextOrClosed.node !== null } // The physical tail cannot be removed.
131+
while (true) {
132+
// Read `next` and `prev` pointers ignoring logically removed nodes.
133+
val prev = leftmostAliveNode
134+
val next = rightmostAliveNode
135+
// Link `next` and `prev`.
136+
next._prev.value = prev
137+
if (prev !== null) prev._next.value = next
138+
// Check that prev and next are still alive.
139+
if (next.removed) continue
140+
if (prev !== null && prev.removed) continue
141+
// This node is removed.
142+
return
143+
}
144+
}
145+
146+
private val leftmostAliveNode: N? get() {
147+
var cur = prev
148+
while (cur !== null && cur.removed)
149+
cur = cur._prev.value
150+
return cur
151+
}
152+
153+
private val rightmostAliveNode: N get() {
154+
assert { !isTail } // Should not be invoked on the tail node
155+
var cur = nextOrClosed.node!!
156+
while (cur.removed)
157+
cur = cur.nextOrClosed.node!!
158+
return cur
159+
}
160+
}
161+
162+
/**
163+
* Each segment in the list has a unique id and is created by the provided to [findSegmentAndMoveForward] method.
164+
* Essentially, this is a node in the Michael-Scott queue algorithm,
165+
* but with maintaining [prev] pointer for efficient [remove] implementation.
166+
*/
167+
internal abstract class Segment<S : Segment<S>>(val id: Long, prev: S?, pointers: Int): ConcurrentLinkedListNode<S>(prev) {
168+
/**
169+
* This property should return the maximal number of slots in this segment,
170+
* it is used to define whether the segment is logically removed.
171+
*/
172+
abstract val maxSlots: Int
173+
174+
/**
175+
* Numbers of cleaned slots (lowest bits) and AtomicRef pointers to this segment (highest bits)
176+
*/
177+
private val cleanedAndPointers = atomic(pointers shl POINTERS_SHIFT)
178+
179+
/**
180+
* The segment is considered as removed if all the slots are cleaned,
181+
* there is no pointers to this segment from outside, and
182+
* it is not a physical tail in the linked list of segments.
183+
*/
184+
override val removed get() = cleanedAndPointers.value == maxSlots && !isTail
185+
186+
// increments the number of pointers if this segment is not logically removed.
187+
fun tryIncPointers() = cleanedAndPointers.addConditionally(1 shl POINTERS_SHIFT) { it != maxSlots || isTail }
188+
189+
// returns `true` if this segment is logically removed after the decrement.
190+
fun decPointers() = cleanedAndPointers.addAndGet(-(1 shl POINTERS_SHIFT)) == maxSlots && !isTail
191+
192+
/**
193+
* This functions should be invoked on each slot clean-up;
194+
* should not be invoked twice for the same slot.
195+
*/
196+
fun onSlotCleaned() {
197+
if (cleanedAndPointers.incrementAndGet() == maxSlots && !isTail) remove()
198+
}
199+
}
200+
201+
private inline fun AtomicInt.addConditionally(delta: Int, condition: (cur: Int) -> Boolean): Boolean {
202+
while (true) {
203+
val cur = this.value
204+
if (!condition(cur)) return false
205+
if (this.compareAndSet(cur, cur + delta)) return true
206+
}
207+
}
208+
209+
internal inline class SegmentOrClosed<S : Segment<S>>(private val value: Any?) {
210+
val isClosed: Boolean get() = value === CLOSED
211+
val segment: S get() = if (value === CLOSED) error("Does not contain segment") else value as S
212+
}
213+
214+
internal inline class NextNodeOrClosed<N : ConcurrentLinkedListNode<N>>(private val value: Any?) {
215+
val isClosed: Boolean get() = value === CLOSED
216+
val node: N? get() = if (isClosed) null else value as N?
217+
}
218+
219+
private const val POINTERS_SHIFT = 16
220+
221+
@SharedImmutable
222+
private val CLOSED = Symbol("CLOSED")

0 commit comments

Comments
 (0)