Skip to content

Commit 50fd769

Browse files
committed
Make the list of segments more abstract, so that it can be used for stacks and channels
1 parent 3dbe82b commit 50fd769

File tree

6 files changed

+308
-234
lines changed

6 files changed

+308
-234
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import kotlinx.atomicfu.*
8+
import kotlinx.coroutines.*
9+
10+
// returns the first segment `s` with `s.id >= id` or `CLOSED` if all the segments in this linked list have lower `id`
11+
//// and the list is closed for further segment additions.
12+
private inline fun <S: Segment<S>> S.findSegmentInternal(id: Long, createNewSegment: (id: Long, prev: S?) -> S): SegmentOrClosed<S> {
13+
// Go through `next` references and add new segments if needed,
14+
// similarly to the `push` in the Michael-Scott queue algorithm.
15+
// The only difference is that `CAS failure` means that the
16+
// required segment has already been added, so the algorithm just
17+
// uses it. This way, only one segment with each id can be in the queue.
18+
var cur: S = this
19+
while (cur.id < id || cur.removed) {
20+
val nextOrClosed = cur.nextOrClosed
21+
if (nextOrClosed.isClosed) return SegmentOrClosed(CLOSED)
22+
val next: S = if (nextOrClosed.segment === null) {
23+
val newTail = createNewSegment(cur.id + 1, cur)
24+
if (cur.trySetNext(newTail)) {
25+
if (cur.removed) cur.remove()
26+
newTail
27+
} else {
28+
val nextOrClosed = cur.nextOrClosed
29+
if (nextOrClosed.isClosed) return SegmentOrClosed(CLOSED)
30+
nextOrClosed.segment!!
31+
}
32+
} else nextOrClosed.segment!!
33+
cur = next
34+
}
35+
return SegmentOrClosed(cur)
36+
}
37+
38+
// Returns `false` if the segment `to` is logically removed, `true` on successful update.
39+
private inline fun <S : Segment<S>> AtomicRef<S>.moveForward(to: S): Boolean = loop { cur ->
40+
if (cur.id >= to.id) return true
41+
if (!to.tryIncPointers()) return false
42+
if (compareAndSet(cur, to)) {
43+
// the segment is moved
44+
if (cur.decPointers()) cur.remove()
45+
return true
46+
} else {
47+
if (to.decPointers()) to.remove()
48+
}
49+
}
50+
51+
/**
52+
* Tries to find a segment with the specified [id] following by next references from the
53+
* [startFrom] segment and creating new ones if needed. The typical use-case is reading this `AtomicRef` values,
54+
* doing some synchronization, and invoking this function to find the required segment and update the pointer.
55+
* At the same time, [Segment.cleanPrev] should also be invoked if the previous segments are no longer needed
56+
* (e.g., queues should use it in dequeue operations).
57+
*
58+
* Since segments can be removed from the list, or it can be closed for further segment additions, this function
59+
* returns the segment `s` with `s.id >= id` or `CLOSED` if all the segments in this linked list have lower `id`
60+
* and the list is closed.
61+
*/
62+
internal inline fun <S: Segment<S>> AtomicRef<S>.findSegmentAndMoveForward(id: Long, startFrom: S, createNewSegment: (id: Long, prev: S?) -> S): SegmentOrClosed<S> {
63+
while (true) {
64+
val s = startFrom.findSegmentInternal(id, createNewSegment)
65+
if (s.isClosed || moveForward(s.segment)) return s
66+
}
67+
}
68+
69+
/**
70+
* Closes this linked list of segments by forbidding adding new segments,
71+
* returns the last segment in the list.
72+
*/
73+
internal fun <S : Segment<S>> S.close(): S {
74+
var cur: S = this
75+
while (true) {
76+
val next = cur.nextOrClosed.run { if (isClosed) return cur else segment }
77+
if (next === null) {
78+
if (cur.markAsClosed()) return cur
79+
} else {
80+
cur = next
81+
}
82+
}
83+
}
84+
85+
/**
86+
* Each segment in [SegmentList] has a unique id and is created by [SegmentList.newSegment].
87+
* Essentially, this is a node in the Michael-Scott queue algorithm, but with
88+
* maintaining [prev] pointer for efficient [remove] implementation.
89+
*/
90+
internal abstract class Segment<S: Segment<S>>(val id: Long, prev: S?, pointers: Int) {
91+
// Pointer to the next segment, updates similarly to the Michael-Scott queue algorithm.
92+
private val _next = atomic<Any?>(null)
93+
val nextOrClosed: NextSegmentOrClosed<S> get() = NextSegmentOrClosed(_next.value)
94+
fun trySetNext(value: S): Boolean = _next.compareAndSet(null, value)
95+
96+
// Pointer to the previous segment, updates in [remove] function.
97+
val prev = atomic(prev)
98+
/**
99+
* Cleans the pointer to the previous segment.
100+
*/
101+
fun cleanPrev() = prev.lazySet(null)
102+
103+
/**
104+
* This property should return the maximal number of slots in this segment,
105+
* it is used to define whether the segment is logically removed.
106+
*/
107+
abstract val maxSlots: Int
108+
109+
// numbers of cleaned slots (lowest bits) and AtomicRef pointers to this segment (highest bits)
110+
private val cleanedAndPointers = atomic(pointers shl POINTERS_SHIFT)
111+
112+
/**
113+
* Returns `true` if this segment is logically removed from the queue.
114+
* The segment is considered as removed if all the slots are cleaned,
115+
* there is no pointers to this segment from outside, and
116+
* it is not a physical tail in the linked list of segments.
117+
*/
118+
inline val removed get() = cleanedAndPointers.value == maxSlots && _next.value !== null
119+
120+
// increments the number of pointers if this segment is not logically removed\
121+
inline fun tryIncPointers() = cleanedAndPointers.addConditionally(1 shl POINTERS_SHIFT) { it != maxSlots }
122+
// returns `true` if this segment is logically removed after the decrement
123+
inline fun decPointers() = cleanedAndPointers.addAndGet(-(1 shl POINTERS_SHIFT)) == maxSlots
124+
125+
/**
126+
* This functions should be invoked on each slot clean-up;
127+
* should not be invoked twice for the same slot.
128+
*/
129+
fun onSlotCleaned() {
130+
if (cleanedAndPointers.incrementAndGet() == maxSlots) remove()
131+
}
132+
133+
/**
134+
* Tries to mark the linked list as closed by forbidding adding new segments after this one.
135+
*/
136+
fun markAsClosed() = _next.compareAndSet(null, CLOSED)
137+
138+
/**
139+
* Checks whether this segment is a physical tail and is closed for further segment additions.
140+
*/
141+
val isClosed = _next.value === CLOSED
142+
143+
/**
144+
* Removes this segment physically from the segment queue. The segment should be
145+
* logically removed (so [removed] returns `true`) at the point of invocation.
146+
*/
147+
fun remove() {
148+
assert { removed } // The segment should be logically removed at first
149+
// Read `next` and `prev` pointers.
150+
var next = this.nextOrClosed.segment ?: return // tail cannot be removed
151+
var prev = prev.value ?: return // head cannot be removed
152+
// Link `next` and `prev`.
153+
prev.moveNextToRight(next)
154+
while (prev.removed) {
155+
prev = prev.prev.value ?: break
156+
prev.moveNextToRight(next)
157+
}
158+
next.movePrevToLeft(prev)
159+
while (next.removed) {
160+
next = next.nextOrClosed.segment ?: break
161+
next.movePrevToLeft(prev)
162+
}
163+
}
164+
165+
/**
166+
* Updates [next] pointer to the specified segment if
167+
* the [id] of the specified segment is greater.
168+
*/
169+
private fun moveNextToRight(next: S) {
170+
while (true) {
171+
val curNext = this._next.value as S
172+
if (next.id <= curNext.id) return
173+
if (this._next.compareAndSet(curNext, next)) return
174+
}
175+
}
176+
177+
/**
178+
* Updates [prev] pointer to the specified segment if
179+
* the [id] of the specified segment is lower.
180+
*/
181+
private fun movePrevToLeft(prev: S) {
182+
while (true) {
183+
val curPrev = this.prev.value ?: return
184+
if (curPrev.id <= prev.id) return
185+
if (this.prev.compareAndSet(curPrev, prev)) return
186+
}
187+
}
188+
}
189+
190+
private inline fun AtomicInt.addConditionally(delta: Int, condition: (cur: Int) -> Boolean): Boolean {
191+
while(true) {
192+
val cur = this.value
193+
if (!condition(cur)) return false
194+
if (this.compareAndSet(cur, cur + delta)) return true
195+
}
196+
}
197+
198+
internal inline class SegmentOrClosed<S: Segment<S>>(private val value: Any?) {
199+
val isClosed: Boolean get() = value === CLOSED
200+
val segment: S get() = if (value === CLOSED) error("Does not contain segment") else value as S
201+
}
202+
203+
internal inline class NextSegmentOrClosed<S: Segment<S>>(private val value: Any?) {
204+
val isClosed: Boolean get() = value === CLOSED
205+
val segment: S? get() = if (isClosed) null else value as S?
206+
}
207+
208+
private const val POINTERS_SHIFT = 16
209+
210+
@SharedImmutable
211+
private val CLOSED = Symbol("CLOSED")

kotlinx-coroutines-core/common/src/internal/SegmentQueue.kt

-179
This file was deleted.

0 commit comments

Comments
 (0)