-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathThreadSafeHeap.kt
157 lines (136 loc) · 3.9 KB
/
ThreadSafeHeap.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.internal
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
/**
 * Contract for elements stored in a [ThreadSafeHeap].
 *
 * The heap records its own bookkeeping on the node itself through [heap] and [index],
 * which lets it remove a specific node by position instead of searching for it.
 *
 * @suppress **This is an internal API and should not be used from general code.**
 */
@InternalCoroutinesApi
public interface ThreadSafeHeapNode {
    /** Heap that currently holds this node; the heap assigns itself on insertion and writes `null` on removal. */
    public var heap: ThreadSafeHeap<*>?
    /** Slot of this node in the heap's backing array; the heap rewrites it on every move and sets `-1` on removal. */
    public var index: Int
}
/**
 * Synchronized binary min-heap: the smallest element according to [Comparable] is always first.
 *
 * Every operation that reads or modifies the backing array runs under `synchronized(this)`.
 * [size] and [isEmpty] read an atomic counter and do not take the lock.
 * Each stored node carries a back-reference to its heap and its current array index
 * (see [ThreadSafeHeapNode]), which is what allows [remove] to work without a search.
 *
 * @suppress **This is an internal API and should not be used from general code.**
 */
@InternalCoroutinesApi
public open class ThreadSafeHeap<T> : SynchronizedObject() where T: ThreadSafeHeapNode, T: Comparable<T> {
    // Backing array, allocated lazily by realloc() on first insertion. Slots at indices >= size are null.
    private var a: Array<T?>? = null

    private val _size = atomic(0)

    /** Number of nodes currently stored. Backed by an atomic, so reading it does not take the lock. */
    public var size: Int
        get() = _size.value
        private set(value) { _size.value = value }

    /** `true` when the heap holds no nodes. */
    public val isEmpty: Boolean get() = size == 0

    /**
     * Removes all nodes from the heap.
     *
     * Every stored node is detached (`heap = null`, `index = -1`) exactly as a regular removal would do,
     * so a later [remove] of such a node returns `false` and the node can be added to a heap again.
     */
    public fun clear(): Unit = synchronized(this) {
        a?.let { array ->
            // Detach live nodes first; otherwise they would keep claiming membership of an empty heap.
            for (i in 0 until size) {
                array[i]?.let { node ->
                    node.heap = null
                    node.index = -1
                }
            }
            array.fill(null)
        }
        _size.value = 0
    }

    /** Returns the first (smallest) node without removing it, or `null` if the heap is empty. */
    public fun peek(): T? = synchronized(this) { firstImpl() }

    /** Removes and returns the first (smallest) node, or returns `null` if the heap is empty. */
    public fun removeFirstOrNull(): T? = synchronized(this) {
        if (size > 0) {
            removeAtImpl(0)
        } else {
            null
        }
    }

    /**
     * Removes and returns the first node if it satisfies [predicate].
     * Returns `null` when the heap is empty or the predicate rejects the first node.
     * The [predicate] is invoked while the heap lock is held.
     */
    public inline fun removeFirstIf(predicate: (T) -> Boolean): T? = synchronized(this) {
        val first = firstImpl() ?: return null
        if (predicate(first)) {
            removeAtImpl(0)
        } else {
            null
        }
    }

    /** Inserts [node] into the heap. The node must not currently belong to any heap. */
    public fun addLast(node: T): Unit = synchronized(this) { addImpl(node) }

    /**
     * Inserts [node] only if [cond] returns `true`; returns whether the node was inserted.
     * The condition receives the current first node of the heap (`null` when empty)
     * and is invoked while the heap lock is held.
     */
    public inline fun addLastIf(node: T, cond: (T?) -> Boolean): Boolean = synchronized(this) {
        if (cond(firstImpl())) {
            addImpl(node)
            true
        } else {
            false
        }
    }

    /**
     * Removes [node] from this heap using the index recorded on the node.
     *
     * @return `true` if the node was removed; `false` if the node is not currently stored in this heap
     * (it was never added, was already removed, or belongs to a different heap).
     */
    public fun remove(node: T): Boolean = synchronized(this) {
        // Checking identity (not just non-null) prevents a node owned by another heap
        // from evicting whatever happens to sit at the same index here.
        if (node.heap !== this) {
            false
        } else {
            val index = node.index
            assert { index >= 0 }
            removeAtImpl(index)
            true
        }
    }

    // The *Impl functions are @PublishedApi because the public inline functions above call them.
    // They do not take the lock themselves; callers must already hold it.

    @PublishedApi
    internal fun firstImpl(): T? = a?.get(0)

    @PublishedApi
    internal fun removeAtImpl(index: Int): T {
        assert { size > 0 }
        val a = this.a!!
        size--
        if (index < size) {
            // Exchange the removed node with the last one, then restore heap order around `index`.
            swap(index, size)
            val j = (index - 1) / 2
            if (index > 0 && a[index]!! < a[j]!!) {
                swap(index, j)
                siftUpFrom(j)
            } else {
                siftDownFrom(index)
            }
        }
        // The node being removed now sits right past the live range.
        val result = a[size]!!
        assert { result.heap === this }
        result.heap = null
        result.index = -1
        a[size] = null
        return result
    }

    @PublishedApi
    internal fun addImpl(node: T) {
        assert { node.heap == null }
        node.heap = this
        val a = realloc()
        val i = size++
        a[i] = node
        node.index = i
        siftUpFrom(i)
    }

    // Moves the node at `i` towards the root while it is smaller than its parent.
    private tailrec fun siftUpFrom(i: Int) {
        if (i <= 0) return
        val a = a!!
        val j = (i - 1) / 2
        if (a[j]!! <= a[i]!!) return
        swap(i, j)
        siftUpFrom(j)
    }

    // Moves the node at `i` towards the leaves while it is larger than its smaller child.
    private tailrec fun siftDownFrom(i: Int) {
        var j = 2 * i + 1
        if (j >= size) return
        val a = a!!
        if (j + 1 < size && a[j + 1]!! < a[j]!!) j++
        if (a[i]!! <= a[j]!!) return
        swap(i, j)
        siftDownFrom(j)
    }

    // Returns an array with room for one more node: allocates 4 slots initially, doubles when full.
    @Suppress("UNCHECKED_CAST")
    private fun realloc(): Array<T?> {
        val a = this.a
        return when {
            a == null -> (arrayOfNulls<ThreadSafeHeapNode>(4) as Array<T?>).also { this.a = it }
            size >= a.size -> a.copyOf(size * 2).also { this.a = it }
            else -> a
        }
    }

    // Exchanges the nodes at `i` and `j` and updates the index stored on each of them.
    private fun swap(i: Int, j: Int) {
        val a = a!!
        val ni = a[j]!!
        val nj = a[i]!!
        a[i] = ni
        a[j] = nj
        ni.index = i
        nj.index = j
    }
}