Skip to content

Commit 3b05b9c

Browse files
committed
Make the list of segments more abstract, so that it can be used for other synchronization and communication primitives
1 parent 9ceec6d commit 3b05b9c

File tree

7 files changed

+376
-240
lines changed

7 files changed

+376
-240
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import kotlinx.atomicfu.*
8+
import kotlinx.coroutines.*
9+
10+
// returns the first segment `s` with `s.id >= id` or `CLOSED`
11+
// if all the segments in this linked list have lower `id` and the list is closed for further segment additions.
12+
private inline fun <S : Segment<S>> S.findSegmentInternal(id: Long, createNewSegment: (id: Long, prev: S?) -> S): SegmentOrClosed<S> {
13+
// Go through `next` references and add new segments if needed,
14+
// similarly to the `push` in the Michael-Scott queue algorithm.
15+
// The only difference is that `CAS failure` means that the
16+
// required segment has already been added, so the algorithm just
17+
// uses it. This way, only one segment with each id can be added.
18+
var cur: S = this
19+
while (cur.id < id || cur.removed) {
20+
val nextOrClosed = cur.nextOrClosed
21+
if (nextOrClosed.isClosed) return SegmentOrClosed(CLOSED)
22+
val next: S = if (nextOrClosed.segment === null) {
23+
val newTail = createNewSegment(cur.id + 1, cur)
24+
if (cur.trySetNext(newTail)) {
25+
if (cur.removed) cur.remove()
26+
newTail
27+
} else {
28+
val nextOrClosed = cur.nextOrClosed
29+
if (nextOrClosed.isClosed) return SegmentOrClosed(CLOSED)
30+
nextOrClosed.segment!!
31+
}
32+
} else nextOrClosed.segment!!
33+
cur = next
34+
}
35+
return SegmentOrClosed(cur)
36+
}
37+
38+
// Returns `false` if the segment `to` is logically removed, `true` on successful update.
39+
private inline fun <S : Segment<S>> AtomicRef<S>.moveForward(to: S): Boolean = loop { cur ->
40+
if (cur.id >= to.id) return true
41+
if (!to.tryIncPointers()) return false
42+
if (compareAndSet(cur, to)) {
43+
// the segment is moved
44+
if (cur.decPointers()) cur.remove()
45+
return true
46+
} else {
47+
if (to.decPointers()) to.remove()
48+
}
49+
}
50+
51+
/**
52+
* Tries to find a segment with the specified [id] following by next references from the
53+
* [startFrom] segment and creating new ones if needed. The typical use-case is reading this `AtomicRef` values,
54+
* doing some synchronization, and invoking this function to find the required segment and update the pointer.
55+
* At the same time, [Segment.cleanPrev] should also be invoked if the previous segments are no longer needed
56+
* (e.g., queues should use it in dequeue operations).
57+
*
58+
* Since segments can be removed from the list, or it can be closed for further segment additions, this function
59+
* returns the segment `s` with `s.id >= id` or `CLOSED` if all the segments in this linked list have lower `id`
60+
* and the list is closed.
61+
*/
62+
internal inline fun <S : Segment<S>> AtomicRef<S>.findSegmentAndMoveForward(id: Long, startFrom: S, createNewSegment: (id: Long, prev: S?) -> S): SegmentOrClosed<S> {
63+
while (true) {
64+
val s = startFrom.findSegmentInternal(id, createNewSegment)
65+
if (s.isClosed || moveForward(s.segment)) return s
66+
}
67+
}
68+
69+
/**
70+
* Closes this linked list of segments by forbidding adding new segments,
71+
* returns the last segment in the list.
72+
*/
73+
internal fun <S : Segment<S>> S.close(): S {
74+
var cur: S = this
75+
while (true) {
76+
val next = cur.nextOrClosed.run { if (isClosed) return cur else segment }
77+
if (next === null) {
78+
if (cur.markAsClosed()) return cur
79+
} else {
80+
cur = next
81+
}
82+
}
83+
}
84+
85+
/**
86+
* Each segment in [SegmentList] has a unique id and is created by [SegmentList.newSegment].
87+
* Essentially, this is a node in the Michael-Scott queue algorithm, but with
88+
* maintaining [prev] pointer for efficient [remove] implementation.
89+
*/
90+
internal abstract class Segment<S : Segment<S>>(val id: Long, prev: S?, pointers: Int) {
91+
// Pointer to the next segment, updates similarly to the Michael-Scott queue algorithm.
92+
private val _next = atomic<Any?>(null)
93+
val nextOrClosed: NextSegmentOrClosed<S> get() = NextSegmentOrClosed(_next.value)
94+
fun trySetNext(value: S): Boolean = _next.compareAndSet(null, value)
95+
96+
// Pointer to the previous segment, updates in [remove] function.
97+
private val _prev = atomic(prev)
98+
val prev: S? get() = _prev.value
99+
100+
/**
101+
* Cleans the pointer to the previous segment.
102+
*/
103+
fun cleanPrev() { _prev.lazySet(null) }
104+
105+
/**
106+
* This property should return the maximal number of slots in this segment,
107+
* it is used to define whether the segment is logically removed.
108+
*/
109+
abstract val maxSlots: Int
110+
111+
// numbers of cleaned slots (lowest bits) and AtomicRef pointers to this segment (highest bits)
112+
private val cleanedAndPointers = atomic(pointers shl POINTERS_SHIFT)
113+
114+
/**
115+
* Returns `true` if this segment is logically removed from the queue.
116+
* The segment is considered as removed if all the slots are cleaned,
117+
* there is no pointers to this segment from outside, and
118+
* it is not a physical tail in the linked list of segments.
119+
*/
120+
val removed get() = cleanedAndPointers.value == maxSlots && _next.value !== null
121+
122+
// increments the number of pointers if this segment is not logically removed
123+
fun tryIncPointers() = cleanedAndPointers.addConditionally(1 shl POINTERS_SHIFT) { it != maxSlots || _next.value == null }
124+
125+
// returns `true` if this segment is logically removed after the decrement
126+
fun decPointers() = cleanedAndPointers.addAndGet(-(1 shl POINTERS_SHIFT)) == maxSlots && _next.value !== null
127+
128+
/**
129+
* This functions should be invoked on each slot clean-up;
130+
* should not be invoked twice for the same slot.
131+
*/
132+
fun onSlotCleaned() {
133+
if (cleanedAndPointers.incrementAndGet() == maxSlots && _next.value !== null) remove()
134+
}
135+
136+
/**
137+
* Tries to mark the linked list as closed by forbidding adding new segments after this one.
138+
*/
139+
fun markAsClosed() = _next.compareAndSet(null, CLOSED)
140+
141+
/**
142+
* Checks whether this segment is a physical tail and is closed for further segment additions.
143+
*/
144+
val isClosed get() = _next.value === CLOSED
145+
146+
/**
147+
* Removes this segment physically from the segment queue. The segment should be
148+
* logically removed (so [removed] returns `true`) at the point of invocation.
149+
*/
150+
fun remove() {
151+
assert { removed } // The segment should be logically removed at first
152+
// Read `next` and `prev` pointers.
153+
var next = this.nextOrClosed.segment!! // cannot be invoked on the last segment
154+
var prev = _prev.value ?: return // head cannot be removed
155+
// Link `next` and `prev`.
156+
prev.moveNextToRight(next)
157+
while (prev.removed) {
158+
prev = prev._prev.value ?: break
159+
prev.moveNextToRight(next)
160+
}
161+
next.movePrevToLeft(prev)
162+
while (next.removed) {
163+
next = next.nextOrClosed.segment ?: break
164+
next.movePrevToLeft(prev)
165+
}
166+
}
167+
168+
/**
169+
* Updates [next] pointer to the specified segment if
170+
* the [id] of the specified segment is greater.
171+
*/
172+
private fun moveNextToRight(next: S) {
173+
while (true) {
174+
val curNext = this._next.value as S
175+
if (next.id <= curNext.id) return
176+
if (this._next.compareAndSet(curNext, next)) return
177+
}
178+
}
179+
180+
/**
181+
* Updates [prev] pointer to the specified segment if
182+
* the [id] of the specified segment is lower.
183+
*/
184+
private fun movePrevToLeft(prev: S) {
185+
while (true) {
186+
val curPrev = this._prev.value ?: return
187+
if (curPrev.id <= prev.id) return
188+
if (this._prev.compareAndSet(curPrev, prev)) return
189+
}
190+
}
191+
}
192+
193+
private inline fun AtomicInt.addConditionally(delta: Int, condition: (cur: Int) -> Boolean): Boolean {
194+
while (true) {
195+
val cur = this.value
196+
if (!condition(cur)) return false
197+
if (this.compareAndSet(cur, cur + delta)) return true
198+
}
199+
}
200+
201+
internal inline class SegmentOrClosed<S : Segment<S>>(private val value: Any?) {
202+
val isClosed: Boolean get() = value === CLOSED
203+
val segment: S get() = if (value === CLOSED) error("Does not contain segment") else value as S
204+
}
205+
206+
internal inline class NextSegmentOrClosed<S : Segment<S>>(private val value: Any?) {
207+
val isClosed: Boolean get() = value === CLOSED
208+
val segment: S? get() = if (isClosed) null else value as S?
209+
}
210+
211+
private const val POINTERS_SHIFT = 16
212+
213+
@SharedImmutable
214+
private val CLOSED = Symbol("CLOSED")

kotlinx-coroutines-core/common/src/internal/SegmentQueue.kt

-179
This file was deleted.

0 commit comments

Comments
 (0)