4
4
5
5
package kotlinx.coroutines.experimental.internal
6
6
7
+ import kotlinx.coroutines.experimental.*
7
8
import java.util.*
8
9
9
10
/* *
@@ -26,42 +27,50 @@ public class ThreadSafeHeap<T> : SynchronizedObject() where T: ThreadSafeHeapNod
26
27
27
28
public val isEmpty: Boolean get() = size == 0
28
29
29
- public fun clear () = synchronized(this ) {
30
+ @Synchronized
31
+ public fun clear () {
30
32
Arrays .fill(a, 0 , size, null )
31
33
size = 0
32
34
}
33
35
34
- public fun peek (): T ? = synchronized(this ) { firstImpl() }
36
+ @Synchronized
37
+ public fun peek (): T ? = firstImpl()
35
38
36
- public fun removeFirstOrNull (): T ? = synchronized(this ) {
37
- if (size > 0 ) {
39
+ @Synchronized
40
+ public fun removeFirstOrNull (): T ? {
41
+ return if (size > 0 ) {
38
42
removeAtImpl(0 )
39
- } else
43
+ } else {
40
44
null
45
+ }
41
46
}
42
47
43
- public inline fun removeFirstIf (predicate : (T ) -> Boolean ): T ? = synchronized(this ) {
44
- val first = firstImpl() ? : return @synchronized null
45
- if (predicate(first)) {
48
+ @Synchronized
49
+ public inline fun removeFirstIf (predicate : (T ) -> Boolean ): T ? {
50
+ val first = firstImpl() ? : return null
51
+ return if (predicate(first)) {
46
52
removeAtImpl(0 )
47
- } else
53
+ } else {
48
54
null
55
+ }
49
56
}
50
57
51
- public fun addLast (node : T ) = synchronized(this ) {
52
- addImpl(node)
53
- }
58
+ @Synchronized
59
+ public fun addLast (node : T ) = addImpl(node)
54
60
55
- public fun addLastIf (node : T , cond : () -> Boolean ): Boolean = synchronized(this ) {
56
- if (cond()) {
61
+ @Synchronized
62
+ public fun addLastIf (node : T , cond : () -> Boolean ): Boolean {
63
+ return if (cond()) {
57
64
addImpl(node)
58
65
true
59
- } else
66
+ } else {
60
67
false
68
+ }
61
69
}
62
70
63
- public fun remove (node : T ): Boolean = synchronized(this ) {
64
- if (node.index < 0 ) {
71
+ @Synchronized
72
+ public fun remove (node : T ): Boolean {
73
+ return if (node.index < 0 ) {
65
74
false
66
75
} else {
67
76
removeAtImpl(node.index)
@@ -95,6 +104,11 @@ public class ThreadSafeHeap<T> : SynchronizedObject() where T: ThreadSafeHeapNod
95
104
96
105
@PublishedApi
97
106
internal fun addImpl (node : T ) {
107
+ // TODO remove this after #541 when ThreadSafeHeapNode is gone
108
+ if (node is EventLoopBase .DelayedTask && node.state == REMOVED ) {
109
+ return
110
+ }
111
+
98
112
val a = realloc()
99
113
val i = size++
100
114
a[i] = node
@@ -140,4 +154,4 @@ public class ThreadSafeHeap<T> : SynchronizedObject() where T: ThreadSafeHeapNod
140
154
ni.index = i
141
155
nj.index = j
142
156
}
143
- }
157
+ }
0 commit comments