Skip to content

Commit 3bde161

Browse files
author
Francesco Vasco
committed
Support for blocking calls inside coroutines (Kotlin#79)
1 parent d28cc87 commit 3bde161

File tree

4 files changed

+72
-9
lines changed

4 files changed

+72
-9
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

+19
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,32 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19+
import java.util.concurrent.Executor
1920
import java.util.concurrent.locks.LockSupport
2021
import kotlin.coroutines.experimental.*
2122
import kotlin.coroutines.experimental.intrinsics.startCoroutineUninterceptedOrReturn
2223
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
2324

2425
// --------------- basic coroutine builders ---------------
2526

27+
/**
28+
* Suspend current coroutine and execute the [block] blocking code in [executor].
29+
* The current coroutine is resumed after [block] execution.
30+
*
31+
* @param executor the executor for blocking code
32+
* @param block the blocking code
33+
*/
34+
suspend fun <T> blocking(executor: Executor = IOExecutor, block: () -> T) =
35+
suspendCoroutine<T> { continuation ->
36+
executor.execute {
37+
try {
38+
continuation.resume(block())
39+
} catch (t: Throwable) {
40+
continuation.resumeWithException(t)
41+
}
42+
}
43+
}
44+
2645
/**
2746
* Launches new coroutine without blocking current thread and returns a reference to the coroutine as a [Job].
2847
* The coroutine is cancelled when the resulting job is [cancelled][Job.cancel].

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt

+17-1
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
2020
import kotlinx.coroutines.experimental.selects.SelectBuilder
2121
import kotlinx.coroutines.experimental.selects.SelectInstance
2222
import kotlinx.coroutines.experimental.selects.select
23+
import java.util.concurrent.Executor
2324
import kotlin.coroutines.experimental.CoroutineContext
24-
import kotlin.coroutines.experimental.startCoroutine
2525

2626
/**
2727
* Deferred value is a non-blocking cancellable future.
@@ -137,6 +137,22 @@ public fun <T> async(
137137
public fun <T> async(context: CoroutineContext, start: Boolean, block: suspend CoroutineScope.() -> T): Deferred<T> =
138138
async(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block)
139139

140+
/**
141+
* Execute the blocking code [block] in the [executor] without blocking current coroutine.
142+
* [block] code is immediately scheduled for execution unless you set [start] as [CoroutineStart.LAZY], see [async] for more details.
143+
*
144+
* @param executor the executor for blocking code
145+
* @param start start option
146+
* @param block the blocking code
147+
* @see async
148+
*/
149+
public fun <T> blockingAsync(executor: Executor = IOExecutor,
150+
start: CoroutineStart = CoroutineStart.DEFAULT,
151+
block: () -> T): Deferred<T> {
152+
require(start != CoroutineStart.UNDISPATCHED) { "Start blocking code undispatched is not supported" }
153+
return async(executor.asCoroutineDispatcher(), start) { block() }
154+
}
155+
140156
/**
141157
* @suppress **Deprecated**: `defer` was renamed to `async`.
142158
*/

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt

+14-3
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,23 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19-
import java.util.concurrent.Executor
20-
import java.util.concurrent.ScheduledExecutorService
21-
import java.util.concurrent.TimeUnit
19+
import java.util.concurrent.*
2220
import kotlin.coroutines.experimental.Continuation
2321
import kotlin.coroutines.experimental.CoroutineContext
2422

23+
/**
24+
* Create a shared Executor for blocking method invocation (ie: I/O, synchronized block, lock, etc...).
25+
* You can tune the pool size and the thread lifetime setting the properties
26+
* `kotlinx.coroutines.experimental.IOExecutor.maximumPoolSize` and
27+
* `kotlinx.coroutines.experimental.IOExecutor.keepAliveMillis` respectively.
28+
*/
29+
public val IOExecutor: Executor = ThreadPoolExecutor(
30+
0,
31+
System.getProperty("kotlinx.coroutines.experimental.IOExecutor.maximumPoolSize").toIntOrNull() ?: Runtime.getRuntime().availableProcessors() * 64,
32+
System.getProperty("kotlinx.coroutines.experimental.IOExecutor.keepAliveMillis").toLongOrNull() ?: 60_000L, TimeUnit.MILLISECONDS,
33+
SynchronousQueue<Runnable>(),
34+
ThreadFactory { r -> Thread(r, "IOExecutor-${Math.abs(r.hashCode()).toString(16)}") })
35+
2536
/**
2637
* Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
2738
* @suppress **Deprecated**: Renamed to [asCoroutineDispatcher].

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/AsyncTest.kt

+22-5
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,21 @@ class AsyncTest : TestBase() {
104104
finish(13)
105105
}
106106

107-
class BadClass {
108-
override fun equals(other: Any?): Boolean = error("equals")
109-
override fun hashCode(): Int = error("hashCode")
110-
override fun toString(): String = error("toString")
107+
@Test
108+
fun blockingAsync() = runBlocking {
109+
val d = blockingAsync { 42 }
110+
check(d.await() == 42)
111+
}
112+
113+
@Test
114+
fun blockingAsyncLazy() = runBlocking {
115+
val d = blockingAsync(start = CoroutineStart.LAZY) { 42 }
116+
check(d.await() == 42)
117+
}
118+
119+
@Test(expected = IllegalArgumentException::class)
120+
fun blockingAsyncUndispatched() = runBlocking {
121+
blockingAsync(start = CoroutineStart.UNDISPATCHED) { 42 }
111122
}
112123

113124
@Test
@@ -120,4 +131,10 @@ class AsyncTest : TestBase() {
120131
assertTrue(d.await() === bad)
121132
finish(2)
122133
}
123-
}
134+
135+
class BadClass {
136+
override fun equals(other: Any?): Boolean = error("equals")
137+
override fun hashCode(): Int = error("hashCode")
138+
override fun toString(): String = error("toString")
139+
}
140+
}

0 commit comments

Comments
 (0)