Skip to content

Commit e9e6613

Browse files
Francesco Vascofvasco
Francesco Vasco
authored andcommitted
Support for blocking calls inside coroutines (Kotlin#79)
1 parent 69d9c85 commit e9e6613

File tree

4 files changed

+83
-1
lines changed

4 files changed

+83
-1
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

+19
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19+
import java.util.concurrent.Executor
1920
import java.util.concurrent.locks.LockSupport
2021
import kotlin.coroutines.experimental.*
2122
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
@@ -24,6 +25,24 @@ import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
2425

2526
// --------------- basic coroutine builders ---------------
2627

28+
/**
29+
* Suspend current coroutine and execute the [block] blocking code in [executor].
30+
* The current coroutine is resumed after [block] execution.
31+
*
32+
* @param executor the executor for blocking code
33+
* @param block the blocking code
34+
*/
35+
suspend fun <T> blocking(executor: Executor = defaultBlockingExecutor, block: () -> T) =
36+
suspendCoroutine<T> { cont ->
37+
executor.execute {
38+
try {
39+
cont.resume(block())
40+
} catch (t: Throwable) {
41+
cont.resumeWithException(t)
42+
}
43+
}
44+
}
45+
2746
/**
2847
* Launches new coroutine without blocking current thread and returns a reference to the coroutine as a [Job].
2948
* The coroutine is cancelled when the resulting job is [cancelled][Job.cancel].

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/DefaultExecutor.kt

+30
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,41 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19+
import java.util.concurrent.ExecutorService
20+
import java.util.concurrent.LinkedBlockingQueue
1921
import java.util.concurrent.ScheduledExecutorService
2022
import java.util.concurrent.ScheduledThreadPoolExecutor
23+
import java.util.concurrent.ThreadPoolExecutor
2124
import java.util.concurrent.TimeUnit
2225

2326
private const val DEFAULT_KEEP_ALIVE = 1000L
27+
private const val DEFAULT_BLOCKING_KEEP_ALIVE = 60_000L
28+
private const val DEFAULT_BLOCKING_POOL_SIZE = 256
2429

2530
private val KEEP_ALIVE =
2631
try { java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE) }
2732
catch (e: SecurityException) { DEFAULT_KEEP_ALIVE }
2833

34+
private val BLOCKING_KEEP_ALIVE =
35+
try { java.lang.Long.getLong("kotlinx.coroutines.DefaultBlockingExecutor.keepAlive", DEFAULT_BLOCKING_KEEP_ALIVE) }
36+
catch (e: SecurityException) { DEFAULT_BLOCKING_KEEP_ALIVE }
37+
38+
private val BLOCKING_POOL_SIZE =
39+
try { java.lang.Integer.getInteger("kotlinx.coroutines.DefaultBlockingExecutor.maximumPoolSize", DEFAULT_BLOCKING_POOL_SIZE) }
40+
catch (e: SecurityException) { DEFAULT_BLOCKING_POOL_SIZE }
41+
2942
@Volatile
3043
private var _executor: ScheduledExecutorService? = null
3144

45+
@Volatile
46+
private var _blockingExecutor: ExecutorService? = null
47+
3248
internal val defaultExecutor: ScheduledExecutorService
3349
get() = _executor ?: getOrCreateExecutorSync()
3450

51+
internal val defaultBlockingExecutor: ExecutorService
52+
get() = _blockingExecutor ?: getOrCreateBlockingExecutorSync()
53+
3554
@Synchronized
3655
private fun getOrCreateExecutorSync(): ScheduledExecutorService =
3756
_executor ?: ScheduledThreadPoolExecutor(1) { r ->
@@ -48,6 +67,17 @@ private fun getOrCreateExecutorSync(): ScheduledExecutorService =
4867
_executor = this
4968
}
5069

70+
@Synchronized
71+
private fun getOrCreateBlockingExecutorSync(): ExecutorService =
72+
_blockingExecutor ?: ThreadPoolExecutor(1, BLOCKING_POOL_SIZE,
73+
BLOCKING_KEEP_ALIVE, TimeUnit.MILLISECONDS,
74+
LinkedBlockingQueue<Runnable>()) { r ->
75+
Thread(r, "kotlinx.coroutines.DefaultBlockingExecutor").apply { isDaemon = true }
76+
}.apply {
77+
allowCoreThreadTimeOut(true)
78+
_blockingExecutor = this
79+
}
80+
5181
// used for tests
5282
@Synchronized
5383
internal fun shutdownDefaultExecutor(timeout: Long) {

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt

+16
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package kotlinx.coroutines.experimental
1919
import kotlinx.coroutines.experimental.selects.SelectBuilder
2020
import kotlinx.coroutines.experimental.selects.SelectInstance
2121
import kotlinx.coroutines.experimental.selects.select
22+
import java.util.concurrent.Executor
2223
import kotlin.coroutines.experimental.CoroutineContext
2324

2425
/**
@@ -160,6 +161,21 @@ public fun <T> async(
160161
public fun <T> async(context: CoroutineContext, start: Boolean, block: suspend CoroutineScope.() -> T): Deferred<T> =
161162
async(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block)
162163

164+
/**
165+
* Execute the blocking code [block] in the [executor] without blocking current coroutine.
166+
* [start] policy is same as [async] method, but [CoroutineStart.UNDISPATCHED] is an invalid option
167+
*
168+
* @param executor the executor for blocking code
169+
* @param start start option, [CoroutineStart.UNDISPATCHED] is invalid
170+
* @param block the blocking code
171+
*/
172+
public fun <T> blockingAsync(executor: Executor = defaultBlockingExecutor,
173+
start: CoroutineStart = CoroutineStart.DEFAULT,
174+
block: () -> T): Deferred<T> {
175+
require(start != CoroutineStart.UNDISPATCHED) { "Start blocking code undispatched is not supported" }
176+
return async(executor.asCoroutineDispatcher(), start) { block() }
177+
}
178+
163179
/**
164180
* @suppress **Deprecated**: `defer` was renamed to `async`.
165181
*/

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/AsyncTest.kt

+18-1
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,23 @@ class AsyncTest : TestBase() {
144144
override fun toString(): String = error("toString")
145145
}
146146

147+
@Test
148+
fun blockingAsync() = runBlocking {
149+
val d = blockingAsync { 42 }
150+
check(d.await() == 42)
151+
}
152+
153+
@Test
154+
fun blockingAsyncLazy() = runBlocking {
155+
val d = blockingAsync(start = CoroutineStart.LAZY) { 42 }
156+
check(d.await() == 42)
157+
}
158+
159+
@Test(expected = IllegalArgumentException::class)
160+
fun blockingAsyncUndispatched() = runBlocking {
161+
blockingAsync(start = CoroutineStart.UNDISPATCHED) { 42 }
162+
}
163+
147164
@Test
148165
fun testDeferBadClass() = runBlocking {
149166
val bad = BadClass()
@@ -154,4 +171,4 @@ class AsyncTest : TestBase() {
154171
assertTrue(d.await() === bad)
155172
finish(2)
156173
}
157-
}
174+
}

0 commit comments

Comments
 (0)