Skip to content

Commit e29f497

Browse files
qwwdfsadelizarov
authored andcommitted
Avoid OOM in thread-pool dispatchers, try to reflectively invoke setRemoveFutureOnCancel on executor instance and use default dispatcher if attempt failed
Fixes #571
1 parent 7ff678f commit e29f497

File tree

3 files changed

+69
-18
lines changed

3 files changed

+69
-18
lines changed

core/kotlinx-coroutines-core/src/Executors.kt

+46-18
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
package kotlinx.coroutines.experimental
66

7+
import kotlinx.coroutines.experimental.internal.*
78
import java.io.*
9+
import java.io.Closeable
810
import java.util.concurrent.*
911
import kotlin.coroutines.experimental.*
1012

@@ -18,7 +20,7 @@ import kotlin.coroutines.experimental.*
1820
public abstract class ExecutorCoroutineDispatcher: CloseableCoroutineDispatcher(), Closeable {
1921
/**
2022
* Closes this coroutine dispatcher and shuts down its executor.
21-
*
23+
*
2224
* It may throw an exception if this dispatcher is global and cannot be closed.
2325
*/
2426
public abstract override fun close()
@@ -74,41 +76,67 @@ public fun ExecutorService.asCoroutineDispatcher_Deprecated(): CloseableCoroutin
7476
public fun Executor.toCoroutineDispatcher(): CoroutineDispatcher =
7577
asCoroutineDispatcher()
7678

77-
private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase()
79+
private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase() {
80+
init {
81+
initFutureCancellation()
82+
}
83+
}
7884

7985
/**
8086
* @suppress **This is unstable API and it is subject to change.**
8187
*/
8288
@InternalCoroutinesApi
8389
public abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {
8490

91+
private var removesFutureOnCancellation: Boolean = false
92+
93+
internal fun initFutureCancellation() {
94+
removesFutureOnCancellation = removeFutureOnCancel(executor)
95+
}
96+
8597
override fun dispatch(context: CoroutineContext, block: Runnable) =
8698
try { executor.execute(timeSource.trackTask(block)) }
8799
catch (e: RejectedExecutionException) {
88100
timeSource.unTrackTask()
89101
DefaultExecutor.execute(block)
90102
}
91103

104+
/*
105+
* removesFutureOnCancellation is required to avoid memory leak.
106+
* On Java 7+ we reflectively invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true) and we're fine.
107+
* On Java 6 we're scheduling time-based coroutines to our own thread safe heap which supports cancellation.
108+
*/
92109
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
93-
val timeout =
94-
try { (executor as? ScheduledExecutorService)
95-
?.schedule(ResumeUndispatchedRunnable(this, continuation), timeMillis, TimeUnit.MILLISECONDS) }
96-
catch (e: RejectedExecutionException) { null }
97-
if (timeout != null)
98-
continuation.cancelFutureOnCancellation(timeout)
99-
else
100-
DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
110+
val future = if (removesFutureOnCancellation) {
111+
scheduleBlock(ResumeUndispatchedRunnable(this, continuation), timeMillis, TimeUnit.MILLISECONDS)
112+
} else {
113+
null
114+
}
115+
// If everything went fine and the scheduling attempt was not rejected -- use it
116+
if (future != null) {
117+
continuation.cancelFutureOnCancellation(future)
118+
return
119+
}
120+
// Otherwise fallback to default executor
121+
DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
101122
}
102123

103124
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
104-
val timeout =
105-
try { (executor as? ScheduledExecutorService)
106-
?.schedule(block, timeMillis, TimeUnit.MILLISECONDS) }
107-
catch (e: RejectedExecutionException) { null }
108-
return if (timeout != null)
109-
DisposableFutureHandle(timeout)
110-
else
111-
DefaultExecutor.invokeOnTimeout(timeMillis, block)
125+
val future = if (removesFutureOnCancellation) {
126+
scheduleBlock(block, timeMillis, TimeUnit.MILLISECONDS)
127+
} else {
128+
null
129+
}
130+
131+
return if (future != null ) DisposableFutureHandle(future) else DefaultExecutor.invokeOnTimeout(timeMillis, block)
132+
}
133+
134+
private fun scheduleBlock(block: Runnable, time: Long, unit: TimeUnit): ScheduledFuture<*>? {
135+
return try {
136+
(executor as? ScheduledExecutorService)?.schedule(block, time, unit)
137+
} catch (e: RejectedExecutionException) {
138+
null
139+
}
112140
}
113141

114142
override fun close() {

core/kotlinx-coroutines-core/src/ThreadPoolDispatcher.kt

+5
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.coroutines.experimental
66

7+
import kotlinx.coroutines.experimental.internal.*
78
import java.util.concurrent.*
89
import java.util.concurrent.atomic.AtomicInteger
910
import kotlin.coroutines.experimental.*
@@ -88,6 +89,10 @@ public class ThreadPoolDispatcher internal constructor(
8889
PoolThread(this, target, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())
8990
}
9091

92+
init {
93+
initFutureCancellation()
94+
}
95+
9196
/**
9297
* Closes this dispatcher -- shuts down all threads in this pool and releases resources.
9398
*/

core/kotlinx-coroutines-core/src/internal/Concurrent.kt

+18
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.coroutines.experimental.internal
66

7+
import java.lang.reflect.*
78
import java.util.*
89
import java.util.concurrent.*
910
import kotlin.concurrent.withLock as withLockJvm
@@ -16,3 +17,20 @@ internal actual typealias ReentrantLock = java.util.concurrent.locks.ReentrantLo
1617
internal actual inline fun <T> ReentrantLock.withLock(action: () -> T) = this.withLockJvm(action)
1718

1819
internal actual fun <E> identitySet(expectedSize: Int): MutableSet<E> = Collections.newSetFromMap(IdentityHashMap(expectedSize))
20+
21+
private val REMOVE_FUTURE_ON_CANCEL: Method? = try {
22+
ScheduledThreadPoolExecutor::class.java.getMethod("setRemoveOnCancelPolicy", Boolean::class.java)
23+
} catch (e: Throwable) {
24+
null
25+
}
26+
27+
@Suppress("NAME_SHADOWING")
28+
internal fun removeFutureOnCancel(executor: Executor): Boolean {
29+
try {
30+
val executor = executor as? ScheduledExecutorService ?: return false
31+
(REMOVE_FUTURE_ON_CANCEL ?: return false).invoke(executor, true)
32+
return true
33+
} catch (e: Throwable) {
34+
return true
35+
}
36+
}

0 commit comments

Comments
 (0)