-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathSegmentBasedQueue.kt
107 lines (92 loc) · 4.06 KB
/
SegmentBasedQueue.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.internal
import kotlinx.atomicfu.*
/**
* This queue implementation is based on [SegmentList] for testing purposes and is organized as follows. Essentially,
* the [SegmentBasedQueue] is represented as an infinite array of segments, each stores one element (see [OneElementSegment]).
* Both [enqueue] and [dequeue] operations increment the corresponding global index ([enqIdx] for [enqueue] and
* [deqIdx] for [dequeue]) and work with the cell indexed by this counter. Since both operations increment the indices
* at first, there could be a race: [enqueue] increments [enqIdx], then [dequeue] checks that the queue is not empty
* (that's true) and increments [deqIdx], looking into the corresponding cell after that; however, the cell is empty
* because the [enqueue] operation has not put its element yet. To make the queue non-blocking, [dequeue] can mark
* the cell with the [BROKEN] token and retry the operation, [enqueue] at the same time should restart as well; this way,
* the queue is obstruction-free.
*/
internal class SegmentBasedQueue<T> {
// Reference from which `dequeue` reads its start segment and on which it invokes `findSegmentAndMoveForward`.
private val head: AtomicRef<OneElementSegment<T>>
// Reference from which `enqueue` reads its start segment; `close()` closes the segment it currently points to.
private val tail: AtomicRef<OneElementSegment<T>>
// Global counters, both starting at 0. Every `enqueue`/`dequeue` attempt claims the next index
// with `getAndIncrement()`, including attempts that end up being retried.
private val enqIdx = atomic(0L)
private val deqIdx = atomic(0L)
init {
// `head` and `tail` initially reference the same first segment (id 0, no predecessor).
// NOTE(review): `pointers = 2` presumably accounts for these two references -- confirm against `Segment`.
val s = OneElementSegment<T>(0, null, 2)
head = atomic(s)
tail = atomic(s)
}
// Returns the segment associated with the enqueued element, or `null` if the queue is closed.
fun enqueue(element: T): OneElementSegment<T>? {
while (true) {
// Read the start segment before claiming an index, then look up the segment for that index.
val curTail = this.tail.value
val enqIdx = this.enqIdx.getAndIncrement()
val segmentOrClosed = this.tail.findSegmentAndMoveForward(id = enqIdx, startFrom = curTail, createNewSegment = ::createSegment)
if (segmentOrClosed.isClosed) return null
val s = segmentOrClosed.segment
// The cell was already marked as broken (by `dequeue`, `close` or `removeSegment`); retry with a new index.
if (s.element.value === BROKEN) continue
// Publish the element only if the cell is still empty; a failed CAS means the cell was
// changed concurrently, so the loop retries with a new index.
if (s.element.compareAndSet(null, element)) return s
}
}
// Returns the next element, or `null` if the queue looks empty (`deqIdx >= enqIdx`) or the
// segment lookup reports that the queue is closed.
fun dequeue(): T? {
while (true) {
// Emptiness check is done before claiming an index, so an empty queue does not advance `deqIdx`.
if (this.deqIdx.value >= this.enqIdx.value) return null
val curHead = this.head.value
val deqIdx = this.deqIdx.getAndIncrement()
val segmentOrClosed = this.head.findSegmentAndMoveForward(id = deqIdx, startFrom = curHead, createNewSegment = ::createSegment)
if (segmentOrClosed.isClosed) return null
val s = segmentOrClosed.segment
// The lookup returned a segment with a larger id than the claimed index, so there is no cell
// to read for this index; retry with a new one.
// NOTE(review): presumably the segment for `deqIdx` was removed -- confirm against `findSegmentAndMoveForward`.
if (s.id > deqIdx) continue
var el = s.element.value
if (el === null) {
// The cell is still empty: mark it as broken so that the `compareAndSet(null, element)` in
// `enqueue` fails for this cell, then retry. If the marking CAS fails, the cell was changed
// concurrently, so re-read its current value.
if (s.element.compareAndSet(null, BROKEN)) continue
else el = s.element.value
}
// The link to the previous segment should be cleaned after retrieving the element;
// otherwise, `close()` cannot clean the slot.
s.cleanPrev()
// The re-read value may be the broken marker (set by `close` or `removeSegment`); retry in that case.
if (el === BROKEN) continue
@Suppress("UNCHECKED_CAST")
return el as T
}
}
// `enqueue` should return `null` after the queue is closed
// Calls `close()` on the segment currently referenced by `tail`, then walks backwards over the
// `prev` links starting from the segment that call returned, marking every still-empty cell as
// broken. Returns the segment obtained from that `close()` call.
fun close(): OneElementSegment<T> {
val s = this.tail.value.close()
var cur = s
while (true) {
// Cells that already hold an element (or the broken marker) are left untouched.
cur.element.compareAndSet(null, BROKEN)
cur = cur.prev ?: break
}
return s
}
// Number of segments reachable from `head` by following `next` links, counting `head` itself.
val numberOfSegments: Int get() {
var cur = head.value
var i = 1
while (true) {
cur = cur.next ?: return i
i++
}
}
// Fails with `IllegalStateException` (via `check`) if the current head segment still has a `prev` link.
fun checkHeadPrevIsCleaned() {
check(head.value.prev === null)
}
}
/**
 * Segment factory that [SegmentBasedQueue] passes (as `::createSegment`) to `findSegmentAndMoveForward`.
 *
 * @param id identifier given to the new segment.
 * @param prev segment to link as the predecessor, or `null` if there is none.
 * @return a fresh [OneElementSegment] constructed with a `pointers` value of zero.
 */
private fun <T> createSegment(id: Long, prev: OneElementSegment<T>?): OneElementSegment<T> {
    return OneElementSegment(id = id, prev = prev, pointers = 0)
}
/**
 * A [Segment] that declares a single slot ([maxSlots] is 1) and stores its content in [element].
 *
 * The constructor arguments are forwarded unchanged to the [Segment] base class.
 */
internal class OneElementSegment<T>(
    id: Long,
    prev: OneElementSegment<T>?,
    pointers: Int
) : Segment<OneElementSegment<T>>(id, prev, pointers) {
    // The single cell of this segment: `null` while empty, otherwise either the enqueued
    // element or the `BROKEN` marker.
    val element = atomic<Any?>(null)

    override val maxSlots: Int
        get() = 1

    /**
     * Marks the cell as broken and then reports the slot as cleaned via `onSlotCleaned()`.
     * The marker is written first, before the notification.
     */
    fun removeSegment() {
        element.value = BROKEN
        onSlotCleaned()
    }
}
// Marker stored in a segment's `element` cell to signal that the cell must not be used:
// `dequeue` and `close` install it into still-empty cells, `removeSegment` writes it
// unconditionally, and both `enqueue` and `dequeue` retry when they observe it.
private val BROKEN = Symbol("BROKEN")