Skip to content

Commit 41297e7

Browse files
qwwdfsadelizarov
authored andcommitted
Default delay improvements
* Fixed memory leak when coroutine was cancelled before delay() invocation * synchronized(this) is replaced with @synchronized to reduce TSH footprint
1 parent ffda1da commit 41297e7

File tree

2 files changed

+35
-23
lines changed

2 files changed

+35
-23
lines changed

core/kotlinx-coroutines-core/src/EventLoop.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,9 @@ public fun EventLoop(thread: Thread = Thread.currentThread(), parentJob: Job? =
7272
public fun EventLoop_Deprecated(thread: Thread = Thread.currentThread(), parentJob: Job? = null): CoroutineDispatcher =
7373
EventLoop(thread, parentJob) as CoroutineDispatcher
7474

75-
private const val DELAYED = 0
76-
private const val REMOVED = 1
77-
private const val RESCHEDULED = 2
75+
internal const val DELAYED = 0
76+
internal const val REMOVED = 1
77+
internal const val RESCHEDULED = 2
7878

7979
private const val MS_TO_NS = 1_000_000L
8080
private const val MAX_MS = Long.MAX_VALUE / MS_TO_NS
@@ -243,10 +243,10 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
243243

244244
internal fun schedule(delayedTask: DelayedTask) {
245245
if (scheduleImpl(delayedTask)) {
246-
// todo: we should unpark only when this delayed task became first in the queue
247246
unpark()
248-
} else
247+
} else {
249248
DefaultExecutor.schedule(delayedTask)
249+
}
250250
}
251251

252252
private fun scheduleImpl(delayedTask: DelayedTask): Boolean {

core/kotlinx-coroutines-core/src/internal/ThreadSafeHeap.kt

+30-18
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.coroutines.experimental.internal
66

7+
import kotlinx.coroutines.experimental.*
78
import java.util.*
89

910
/**
@@ -26,42 +27,48 @@ public class ThreadSafeHeap<T> : SynchronizedObject() where T: ThreadSafeHeapNod
2627

2728
public val isEmpty: Boolean get() = size == 0
2829

29-
public fun clear() = synchronized(this) {
30+
@Synchronized
31+
public fun clear() {
3032
Arrays.fill(a, 0, size, null)
3133
size = 0
3234
}
3335

34-
public fun peek(): T? = synchronized(this) { firstImpl() }
36+
@Synchronized
37+
public fun peek(): T? = firstImpl()
3538

36-
public fun removeFirstOrNull(): T? = synchronized(this) {
39+
@Synchronized
40+
public fun removeFirstOrNull(): T? =
3741
if (size > 0) {
3842
removeAtImpl(0)
39-
} else
43+
} else {
4044
null
41-
}
45+
}
4246

43-
public inline fun removeFirstIf(predicate: (T) -> Boolean): T? = synchronized(this) {
44-
val first = firstImpl() ?: return@synchronized null
45-
if (predicate(first)) {
47+
@Synchronized
48+
public inline fun removeFirstIf(predicate: (T) -> Boolean): T? {
49+
val first = firstImpl() ?: return null
50+
return if (predicate(first)) {
4651
removeAtImpl(0)
47-
} else
52+
} else {
4853
null
54+
}
4955
}
5056

51-
public fun addLast(node: T) = synchronized(this) {
52-
addImpl(node)
53-
}
57+
@Synchronized
58+
public fun addLast(node: T) = addImpl(node)
5459

55-
public fun addLastIf(node: T, cond: () -> Boolean): Boolean = synchronized(this) {
60+
@Synchronized
61+
public fun addLastIf(node: T, cond: () -> Boolean): Boolean =
5662
if (cond()) {
5763
addImpl(node)
5864
true
59-
} else
65+
} else {
6066
false
61-
}
67+
}
6268

63-
public fun remove(node: T): Boolean = synchronized(this) {
64-
if (node.index < 0) {
69+
@Synchronized
70+
public fun remove(node: T): Boolean {
71+
return if (node.index < 0) {
6572
false
6673
} else {
6774
removeAtImpl(node.index)
@@ -95,6 +102,11 @@ public class ThreadSafeHeap<T> : SynchronizedObject() where T: ThreadSafeHeapNod
95102

96103
@PublishedApi
97104
internal fun addImpl(node: T) {
105+
// TODO remove this after #541 when ThreadSafeHeapNode is gone
106+
if (node is EventLoopBase.DelayedTask && node.state == REMOVED) {
107+
return
108+
}
109+
98110
val a = realloc()
99111
val i = size++
100112
a[i] = node
@@ -140,4 +152,4 @@ public class ThreadSafeHeap<T> : SynchronizedObject() where T: ThreadSafeHeapNod
140152
ni.index = i
141153
nj.index = j
142154
}
143-
}
155+
}

0 commit comments

Comments
 (0)