Skip to content

Commit b386963

Browse files
committed
BlockingPool implementation.
1 parent a9fb479 commit b386963

File tree

3 files changed

+124
-0
lines changed

3 files changed

+124
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import java.util.concurrent.Executor
20+
import java.util.concurrent.Executors
21+
import kotlin.coroutines.experimental.CoroutineContext
22+
23+
/**
24+
* This is a common pool of shared threads for blocking tasks.
25+
*
26+
* It uses a *unbounded* cached thread pool.
27+
*/
28+
object BlockingPool : CoroutineDispatcher(), Executor {
29+
30+
private val threadGroup = ThreadGroup("kotlinx.coroutines.BlockingPool")
31+
32+
private val executor = Executors.newCachedThreadPool({
33+
Thread(threadGroup, it, "kotlinx.coroutines.BlockingPool-${System.identityHashCode(it).toString(16)}").apply {
34+
isDaemon = true
35+
}
36+
})
37+
38+
override fun execute(command: Runnable) = executor.execute(command)
39+
40+
override fun dispatch(context: CoroutineContext, block: Runnable) {
41+
execute(block)
42+
}
43+
44+
internal fun isDispatchNeeded(): Boolean = threadGroup !== Thread.currentThread().threadGroup
45+
46+
override fun isDispatchNeeded(context: CoroutineContext): Boolean = isDispatchNeeded()
47+
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

+33
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,39 @@ import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
2424

2525
// --------------- basic coroutine builders ---------------
2626

27+
/**
28+
* Execute the given [block] in an unbounded executor.
29+
* If the current thread is a blocking one then the [block] is executed immediately,
30+
* otherwise the coroutine is suspended and the [block] is scheduled for execution on a different thread.
31+
*/
32+
public suspend fun <T> blocking(block: () -> T): T =
33+
if (!BlockingPool.isDispatchNeeded())
34+
// execute block undispatched
35+
block()
36+
else
37+
suspendCancellableCoroutine { continuation ->
38+
val runnable = object : Runnable {
39+
@JvmField
40+
@Volatile
41+
var uncompletedContinuation: CancellableContinuation<T>? = continuation
42+
43+
override fun run() {
44+
val res =
45+
try {
46+
block()
47+
} catch (exception: Throwable) {
48+
uncompletedContinuation?.resumeWithException(exception)
49+
return
50+
}
51+
uncompletedContinuation?.resume(res)
52+
}
53+
}
54+
55+
// execute block in blocking pool
56+
BlockingPool.execute(runnable)
57+
continuation.invokeOnCompletion { runnable.uncompletedContinuation = null }
58+
}
59+
2760
/**
2861
* Launches new coroutine without blocking current thread and returns a reference to the coroutine as a [Job].
2962
* The coroutine is cancelled when the resulting job is [cancelled][Job.cancel].
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import guide.test.ignoreLostThreads
20+
import org.junit.Test
21+
import java.util.concurrent.CountDownLatch
22+
import java.util.concurrent.TimeUnit
23+
24+
class BlockingTest : TestBase() {
25+
@Test
26+
fun testBlockingDelay() {
27+
ignoreLostThreads("kotlinx.coroutines.BlockingPool")
28+
runBlocking {
29+
val taskCount = Runtime.getRuntime().availableProcessors() * 3
30+
val countDown = CountDownLatch(taskCount)
31+
repeat(taskCount) {
32+
async {
33+
assert(BlockingPool.isDispatchNeeded())
34+
blocking {
35+
assert(!BlockingPool.isDispatchNeeded())
36+
Thread.sleep(50L)
37+
countDown.countDown()
38+
}
39+
}
40+
}
41+
countDown.await(80L, TimeUnit.MILLISECONDS)
42+
}
43+
}
44+
}

0 commit comments

Comments
 (0)