Skip to content

Commit 08e5b4f

Browse files
committed
Avoid OOM in thread-pool dispatchers, try to reflectively invoke setRemoveFutureOnCancel on executor instance and use default dispatcher if attempt failed
Fixes #571
1 parent 8dceb17 commit 08e5b4f

File tree

3 files changed

+69
-19
lines changed

3 files changed

+69
-19
lines changed

core/kotlinx-coroutines-core/src/Executors.kt

+45-18
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44

55
package kotlinx.coroutines.experimental
66

7+
import kotlinx.coroutines.experimental.internal.*
78
import kotlinx.coroutines.experimental.timeunit.TimeUnit
8-
import java.io.*
9+
import java.io.Closeable
910
import java.util.concurrent.*
1011
import kotlin.coroutines.experimental.*
1112

@@ -63,40 +64,66 @@ public fun ExecutorService.asCoroutineDispatcher_Deprecated(): CloseableCoroutin
6364
public fun Executor.toCoroutineDispatcher(): CoroutineDispatcher =
6465
asCoroutineDispatcher()
6566

66-
private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase()
67+
private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase() {
68+
init {
69+
initFutureCancellation()
70+
}
71+
}
6772

6873
/**
6974
* @suppress **This is unstable API and it is subject to change.**
7075
*/
7176
public abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {
7277

78+
private var removesFutureOnCancellation: Boolean = false
79+
80+
internal fun initFutureCancellation() {
81+
removesFutureOnCancellation = removeFutureOnCancel(executor)
82+
}
83+
7384
override fun dispatch(context: CoroutineContext, block: Runnable) =
7485
try { executor.execute(timeSource.trackTask(block)) }
7586
catch (e: RejectedExecutionException) {
7687
timeSource.unTrackTask()
7788
DefaultExecutor.execute(block)
7889
}
7990

91+
/*
92+
* removesFutureOnCancellation is required to avoid memory leak.
93+
* On Java 7+ we reflectively invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true) and we're fine.
94+
* On Java 6 we're scheduling time-based coroutines to our own thread safe heap which supports cancellation.
95+
*/
8096
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
81-
val timeout =
82-
try { (executor as? ScheduledExecutorService)
83-
?.schedule(ResumeUndispatchedRunnable(this, continuation), time, unit) }
84-
catch (e: RejectedExecutionException) { null }
85-
if (timeout != null)
86-
continuation.cancelFutureOnCancellation(timeout)
87-
else
88-
DefaultExecutor.scheduleResumeAfterDelay(time, unit, continuation)
97+
val future = if (removesFutureOnCancellation) {
98+
scheduleBlock(ResumeUndispatchedRunnable(this, continuation), time, unit)
99+
} else {
100+
null
101+
}
102+
103+
if (future != null) {
104+
continuation.cancelFutureOnCancellation(future)
105+
return
106+
}
107+
108+
DefaultExecutor.scheduleResumeAfterDelay(time, unit, continuation)
89109
}
90110

91111
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle {
92-
val timeout =
93-
try { (executor as? ScheduledExecutorService)
94-
?.schedule(block, time, unit) }
95-
catch (e: RejectedExecutionException) { null }
96-
return if (timeout != null)
97-
DisposableFutureHandle(timeout)
98-
else
99-
DefaultExecutor.invokeOnTimeout(time, unit, block)
112+
val future = if (removesFutureOnCancellation) {
113+
scheduleBlock(block, time, unit)
114+
} else {
115+
null
116+
}
117+
118+
return if (future != null ) DisposableFutureHandle(future) else DefaultExecutor.invokeOnTimeout(time, unit, block)
119+
}
120+
121+
private fun scheduleBlock(block: Runnable, time: Long, unit: TimeUnit): ScheduledFuture<*>? {
122+
return try {
123+
(executor as? ScheduledExecutorService)?.schedule(block, time, unit)
124+
} catch (e: RejectedExecutionException) {
125+
null
126+
}
100127
}
101128

102129
override fun close() {

core/kotlinx-coroutines-core/src/ThreadPoolDispatcher.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.coroutines.experimental
66

7+
import kotlinx.coroutines.experimental.internal.*
78
import java.util.concurrent.*
89
import java.util.concurrent.atomic.AtomicInteger
910
import kotlin.coroutines.experimental.*
@@ -62,12 +63,16 @@ public class ThreadPoolDispatcher internal constructor(
6263
private val nThreads: Int,
6364
private val name: String
6465
) : ExecutorCoroutineDispatcherBase() {
65-
private val threadNo = AtomicInteger()
6666

67+
private val threadNo = AtomicInteger()
6768
override val executor: Executor = Executors.newScheduledThreadPool(nThreads) { target ->
6869
PoolThread(this, target, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())
6970
}
7071

72+
init {
73+
initFutureCancellation()
74+
}
75+
7176
/**
7277
* Closes this dispatcher -- shuts down all threads in this pool and releases resources.
7378
*/

core/kotlinx-coroutines-core/src/internal/Concurrent.kt

+18
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.coroutines.experimental.internal
66

7+
import java.lang.reflect.*
78
import java.util.concurrent.*
89
import kotlin.concurrent.withLock as withLockJvm
910

@@ -13,3 +14,20 @@ internal actual fun <E> subscriberList(): SubscribersList<E> = CopyOnWriteArrayL
1314
internal actual typealias ReentrantLock = java.util.concurrent.locks.ReentrantLock
1415

1516
internal actual inline fun <T> ReentrantLock.withLock(action: () -> T) = this.withLockJvm(action)
17+
18+
private val REMOVE_FUTURE_ON_CANCEL: Method? = try {
19+
ScheduledThreadPoolExecutor::class.java.getMethod("setRemoveOnCancelPolicy", Boolean::class.java)
20+
} catch (e: Throwable) {
21+
null
22+
}
23+
24+
@Suppress("NAME_SHADOWING")
25+
internal fun removeFutureOnCancel(executor: Executor): Boolean {
26+
try {
27+
val executor = executor as? ScheduledExecutorService ?: return false
28+
(REMOVE_FUTURE_ON_CANCEL ?: return false).invoke(executor, true)
29+
return true
30+
} catch (e: Throwable) {
31+
return true
32+
}
33+
}

0 commit comments

Comments
 (0)