1
+ /*
2
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3
+ */
4
+
5
+ package kotlinx.coroutines.internal
6
+
7
+ import kotlinx.atomicfu.*
8
+ import kotlinx.coroutines.*
9
+ import kotlin.native.concurrent.*
10
+
11
+ // Returns the first segment `s` with `s.id >= id` or `CLOSED`
12
+ // if all the segments in this linked list have lower `id` and the list is closed for further segment additions.
13
+ private inline fun <S : Segment <S >> S.findSegmentInternal (id : Long , createNewSegment : (id: Long , prev: S ? ) -> S ): SegmentOrClosed <S > {
14
+ // Go through `next` references and add new segments if needed,
15
+ // similarly to the `push` in the Michael-Scott queue algorithm.
16
+ // The only difference is that `CAS failure` means that the
17
+ // required segment has already been added, so the algorithm just
18
+ // uses it. This way, only one segment with each id can be added.
19
+ var cur: S = this
20
+ while (cur.id < id || cur.removed) {
21
+ val nextOrClosed = cur.nextOrClosed
22
+ if (nextOrClosed.isClosed) return SegmentOrClosed (CLOSED )
23
+ val next: S = if (nextOrClosed.node == = null ) {
24
+ val newTail = createNewSegment(cur.id + 1 , cur)
25
+ if (cur.trySetNext(newTail)) {
26
+ if (cur.removed) cur.remove()
27
+ newTail
28
+ } else {
29
+ val nextOrClosed = cur.nextOrClosed
30
+ if (nextOrClosed.isClosed) return SegmentOrClosed (CLOSED )
31
+ nextOrClosed.node!!
32
+ }
33
+ } else nextOrClosed.node!!
34
+ cur = next
35
+ }
36
+ return SegmentOrClosed (cur)
37
+ }
38
+
39
+ // Returns `false` if the segment `to` is logically removed, `true` on successful update.
40
+ private inline fun <S : Segment <S >> AtomicRef<S>.moveForward (to : S ): Boolean = loop { cur ->
41
+ if (cur.id >= to.id) return true
42
+ if (! to.tryIncPointers()) return false
43
+ if (compareAndSet(cur, to)) {
44
+ // the segment is moved
45
+ if (cur.decPointers()) cur.remove()
46
+ return true
47
+ } else {
48
+ if (to.decPointers()) to.remove()
49
+ }
50
+ }
51
+
52
+ /* *
53
+ * Tries to find a segment with the specified [id] following by next references from the
54
+ * [startFrom] segment and creating new ones if needed. The typical use-case is reading this `AtomicRef` values,
55
+ * doing some synchronization, and invoking this function to find the required segment and update the pointer.
56
+ * At the same time, [Segment.cleanPrev] should also be invoked if the previous segments are no longer needed
57
+ * (e.g., queues should use it in dequeue operations).
58
+ *
59
+ * Since segments can be removed from the list, or it can be closed for further segment additions, this function
60
+ * returns the segment `s` with `s.id >= id` or `CLOSED` if all the segments in this linked list have lower `id`
61
+ * and the list is closed.
62
+ */
63
+ internal inline fun <S : Segment <S >> AtomicRef<S>.findSegmentAndMoveForward (id : Long , startFrom : S , createNewSegment : (id: Long , prev: S ? ) -> S ): SegmentOrClosed <S > {
64
+ while (true ) {
65
+ val s = startFrom.findSegmentInternal(id, createNewSegment)
66
+ if (s.isClosed || moveForward(s.segment)) return s
67
+ }
68
+ }
69
+
70
+ /* *
71
+ * Closes this linked list of nodes by forbidding adding new ones,
72
+ * returns the last node in the list.
73
+ */
74
+ internal fun <N : ConcurrentLinkedListNode <N >> N.close (): N {
75
+ var cur: N = this
76
+ while (true ) {
77
+ val next = cur.nextOrClosed.run { if (isClosed) return cur else node }
78
+ if (next == = null ) {
79
+ if (cur.markAsClosed()) return cur
80
+ } else {
81
+ cur = next
82
+ }
83
+ }
84
+ }
85
+
86
+ internal abstract class ConcurrentLinkedListNode <N : ConcurrentLinkedListNode <N >>(prev : N ? ) {
87
+ // Pointer to the next node, updates similarly to the Michael-Scott queue algorithm.
88
+ private val _next = atomic<Any ?>(null )
89
+ val nextOrClosed: NextNodeOrClosed <N > get() = NextNodeOrClosed (_next .value)
90
+ fun trySetNext (value : N ): Boolean = _next .compareAndSet(null , value)
91
+
92
+ /* *
93
+ * Checks whether this node is the physical tail of the current linked list.
94
+ */
95
+ val isTail: Boolean get() = _next .value.let { it == = null || it == = CLOSED }
96
+
97
+ // Pointer to the previous node, updates in [remove] function.
98
+ private val _prev = atomic(prev)
99
+ val prev: N ? get() = _prev .value
100
+
101
+ /* *
102
+ * Cleans the pointer to the previous node.
103
+ */
104
+ fun cleanPrev () { _prev .lazySet(null ) }
105
+
106
+ /* *
107
+ * Tries to mark the linked list as closed by forbidding adding new nodes after this one.
108
+ */
109
+ fun markAsClosed () = _next .compareAndSet(null , CLOSED )
110
+
111
+ /* *
112
+ * Checks whether this node is a physical tail and is closed for further node additions.
113
+ */
114
+ val isClosed get() = _next .value == = CLOSED
115
+
116
+ /* *
117
+ * This property indicates whether the current node is logically removed.
118
+ * The expected use-case is removing the node logically (so that [removed] becomes true),
119
+ * and invoking [remove] after that. Note that this implementation relies on the contract
120
+ * that the physical tail cannot be logically removed. Please, do not break this contract;
121
+ * otherwise, memory leaks and unexpected behavior can occur.
122
+ */
123
+ abstract val removed: Boolean
124
+
125
+ /* *
126
+ * Removes this node physically from this linked list. The node should be
127
+ * logically removed (so [removed] returns `true`) at the point of invocation.
128
+ */
129
+ fun remove () {
130
+ assert { removed } // The node should be logically removed at first.
131
+ assert { nextOrClosed.node != = null } // The physical tail cannot be removed.
132
+ while (true ) {
133
+ // Read `next` and `prev` pointers ignoring logically removed nodes.
134
+ val prev = leftmostAliveNode
135
+ val next = rightmostAliveNode
136
+ // Link `next` and `prev`.
137
+ next._prev .value = prev
138
+ if (prev != = null ) prev._next .value = next
139
+ // Check that prev and next are still alive.
140
+ if (next.removed) continue
141
+ if (prev != = null && prev.removed) continue
142
+ // This node is removed.
143
+ return
144
+ }
145
+ }
146
+
147
+ private val leftmostAliveNode: N ? get() {
148
+ var cur = prev
149
+ while (cur != = null && cur.removed)
150
+ cur = cur._prev .value
151
+ return cur
152
+ }
153
+
154
+ private val rightmostAliveNode: N get() {
155
+ assert { ! isTail } // Should not be invoked on the tail node
156
+ var cur = nextOrClosed.node!!
157
+ while (cur.removed)
158
+ cur = cur.nextOrClosed.node!!
159
+ return cur
160
+ }
161
+ }
162
+
163
+ /* *
164
+ * Each segment in the list has a unique id and is created by the provided to [findSegmentAndMoveForward] method.
165
+ * Essentially, this is a node in the Michael-Scott queue algorithm,
166
+ * but with maintaining [prev] pointer for efficient [remove] implementation.
167
+ */
168
+ internal abstract class Segment <S : Segment <S >>(val id : Long , prev : S ? , pointers : Int ): ConcurrentLinkedListNode<S>(prev) {
169
+ /* *
170
+ * This property should return the maximal number of slots in this segment,
171
+ * it is used to define whether the segment is logically removed.
172
+ */
173
+ abstract val maxSlots: Int
174
+
175
+ /* *
176
+ * Numbers of cleaned slots (lowest bits) and AtomicRef pointers to this segment (highest bits)
177
+ */
178
+ private val cleanedAndPointers = atomic(pointers shl POINTERS_SHIFT )
179
+
180
+ /* *
181
+ * The segment is considered as removed if all the slots are cleaned,
182
+ * there is no pointers to this segment from outside, and
183
+ * it is not a physical tail in the linked list of segments.
184
+ */
185
+ override val removed get() = cleanedAndPointers.value == maxSlots && ! isTail
186
+
187
+ // increments the number of pointers if this segment is not logically removed.
188
+ fun tryIncPointers () = cleanedAndPointers.addConditionally(1 shl POINTERS_SHIFT ) { it != maxSlots || isTail }
189
+
190
+ // returns `true` if this segment is logically removed after the decrement.
191
+ fun decPointers () = cleanedAndPointers.addAndGet(- (1 shl POINTERS_SHIFT )) == maxSlots && ! isTail
192
+
193
+ /* *
194
+ * This functions should be invoked on each slot clean-up;
195
+ * should not be invoked twice for the same slot.
196
+ */
197
+ fun onSlotCleaned () {
198
+ if (cleanedAndPointers.incrementAndGet() == maxSlots && ! isTail) remove()
199
+ }
200
+ }
201
+
202
+ private inline fun AtomicInt.addConditionally (delta : Int , condition : (cur: Int ) -> Boolean ): Boolean {
203
+ while (true ) {
204
+ val cur = this .value
205
+ if (! condition(cur)) return false
206
+ if (this .compareAndSet(cur, cur + delta)) return true
207
+ }
208
+ }
209
+
210
+ internal inline class SegmentOrClosed <S : Segment <S >>(private val value : Any? ) {
211
+ val isClosed: Boolean get() = value == = CLOSED
212
+ val segment: S get() = if (value == = CLOSED ) error(" Does not contain segment" ) else value as S
213
+ }
214
+
215
+ internal inline class NextNodeOrClosed <N : ConcurrentLinkedListNode <N >>(private val value : Any? ) {
216
+ val isClosed: Boolean get() = value == = CLOSED
217
+ val node: N ? get() = if (isClosed) null else value as N ?
218
+ }
219
+
220
+ private const val POINTERS_SHIFT = 16
221
+
222
+ @SharedImmutable
223
+ private val CLOSED = Symbol (" CLOSED" )
0 commit comments