Skip to content

PR 1934 #1972

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 29, 2020
Merged

PR 1934 #1972

Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 80 additions & 77 deletions kotlinx-coroutines-core/jvm/src/Interruptible.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,77 +4,65 @@

package kotlinx.coroutines

import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.loop
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlinx.atomicfu.*
import kotlin.coroutines.*

/**
* Makes a blocking code block cancellable (become a cancellation point of the coroutine).
*
* Calls the specified [block] with a given coroutine context in a interruptible manner.
* The blocking code block will be interrupted and this function will throw [CancellationException]
* if the coroutine is cancelled.
* The specified [coroutineContext] directly translates into [withContext] argument.
*
* Example:
* ```
* GlobalScope.launch(Dispatchers.IO) {
* async {
* // This function will throw [CancellationException].
* runInterruptible {
* doSomethingUseful()
*
* // This blocking procedure will be interrupted when this coroutine is canceled
* // by Exception thrown by the below async block.
* doSomethingElseUsefulInterruptible()
* }
* val blockingJob = launch {
* // This function will throw [CancellationException].
* runInterruptible(Dispatchers.IO) {
* // This blocking procedure will be interrupted when this coroutine is canceled
* // by Exception thrown by the below async block.
* doSomethingElseUsefulInterruptible()
* }
* }
*
* async {
* delay(500L)
* throw Exception()
* }
* delay(500L)
* blockingJob.cancel() // Interrupt blocking call
* }
* ```
*
* There is also an optional context parameter to this function to enable single-call conversion of
* interruptible Java methods into main-safe suspending functions like this:
* interruptible Java methods into suspending functions like this:
* ```
* // With one call here we are moving the call to Dispatchers.IO and supporting interruption.
* suspend fun <T> BlockingQueue<T>.awaitTake(): T =
* runInterruptible(Dispatchers.IO) { queue.take() }
* ```
*
* @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
* @param block regular blocking block that will be interrupted on coroutine cancellation.
*/
public suspend fun <T> runInterruptible(
context: CoroutineContext = EmptyCoroutineContext,
block: () -> T
): T = withContext(context) { runInterruptibleInExpectedContext(block) }
context: CoroutineContext = EmptyCoroutineContext,
block: () -> T
): T = withContext(context) {
runInterruptibleInExpectedContext(block)
}

private suspend fun <T> runInterruptibleInExpectedContext(block: () -> T): T =
suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
try {
// fast path: no job
val job = uCont.context[Job] ?: return@sc block()
// slow path
val threadState = ThreadState(job)
try {
block()
} finally {
threadState.clear()
}
} catch (e: InterruptedException) {
throw CancellationException("runInterruptible: interrupted").initCause(e)
}
private suspend fun <T> runInterruptibleInExpectedContext(block: () -> T): T {
try {
// No job -> no cancellation
val job = coroutineContext[Job] ?: return block()
val threadState = ThreadState(job)
try {
return block()
} finally {
threadState.clearInterrupt()
}
} catch (e: InterruptedException) {
throw CancellationException("Blocking call was interrupted due to parent cancellation").initCause(e)
}
}

private const val WORKING = 0
private const val FINISH = 1
private const val WORKING = 0
private const val FINISHED = 1
private const val INTERRUPTING = 2
private const val INTERRUPTED = 3
private const val INTERRUPTED = 3

private class ThreadState : CompletionHandler {
/*
Expand All @@ -85,10 +73,9 @@ private class ThreadState : CompletionHandler {
INTERRUPTING: canceled, going to interrupt this thread
INTERRUPTED: this thread is interrupted


=== Possible Transitions ===

+----------------+ remember +-------------------------+
+----------------+ register job +-------------------------+
| WORKING | cancellation listener | WORKING |
| (thread, null) | -------------------------> | (thread, cancel handle) |
+----------------+ +-------------------------+
Expand All @@ -104,58 +91,74 @@ private class ThreadState : CompletionHandler {
| |
V V
+---------------+ +-------------------------+
| INTERRUPTED | | FINISH |
| INTERRUPTED | | FINISHED |
+---------------+ +-------------------------+
*/
private val state: AtomicRef<State>
private val state: AtomicRef<State> = atomic(State(WORKING, null))
private val targetThread = Thread.currentThread()

private data class State(val state: Int, val thread: Thread? = null, val cancelHandle: DisposableHandle? = null)
private data class State(@JvmField val state: Int, @JvmField val cancelHandle: DisposableHandle?)

// We're using a non-primary constructor instead of init block of a primary constructor here, because
// we need to `return`.
constructor (job: Job) {
state = atomic(State(WORKING, Thread.currentThread()))
// watches the job for cancellation
constructor(job: Job) {
// Register cancellation handler
val cancelHandle =
job.invokeOnCompletion(onCancelling = true, invokeImmediately = true, handler = this)
// remembers the cancel handle or drops it
job.invokeOnCompletion(onCancelling = true, invokeImmediately = true, handler = this)
// Either we successfully stored it or it was immediately cancelled
state.loop { s ->
when(s.state) {
WORKING -> if (state.compareAndSet(s, State(WORKING, s.thread, cancelHandle))) return
when (s.state) {
// Happy-path, move forward
WORKING -> if (state.compareAndSet(s, State(WORKING, cancelHandle))) return
// Immediately cancelled, just continue
INTERRUPTING, INTERRUPTED -> return
FINISH -> throw IllegalStateException("impossible state")
else -> throw IllegalStateException("unknown state")
else -> throw IllegalStateException("Illegal state $s")
}
}
}

fun clear() {
fun clearInterrupt() {
/*
* Do not allow to untriggered interrupt to leak
*/
state.loop { s ->
when(s.state) {
WORKING -> if (state.compareAndSet(s, State(FINISH))) { s.cancelHandle!!.dispose(); return }
INTERRUPTING -> { /* spin */ }
INTERRUPTED -> { Thread.interrupted(); return } // no interrupt leak
FINISH -> throw IllegalStateException("impossible state")
else -> throw IllegalStateException("unknown state")
when (s.state) {
WORKING -> if (state.compareAndSet(s, State(FINISHED, null))) {
s.cancelHandle?.dispose()
return
}
INTERRUPTING -> {
/*
* Spin, cancellation mechanism is interrupting our thread right now
* and we have to wait it and then clear interrupt status
*/
}
INTERRUPTED -> {
// Clear it and bail out
Thread.interrupted();
return
}
else -> error("Illegal state: $s")
}
}
}

override fun invoke(cause: Throwable?) = onCancel(cause)

private inline fun onCancel(cause: Throwable?) {
// Cancellation handler
override fun invoke(cause: Throwable?) {
state.loop { s ->
when(s.state) {
when (s.state) {
// Working -> try to transite state and interrupt the thread
WORKING -> {
if (state.compareAndSet(s, State(INTERRUPTING))) {
s.thread!!.interrupt()
state.value = State(INTERRUPTED)
if (state.compareAndSet(s, State(INTERRUPTING, null))) {
targetThread.interrupt()
state.value = State(INTERRUPTED, null)
return
}
}
FINISH -> return
// Finished -- runInterruptible is already complete
FINISHED -> return
INTERRUPTING, INTERRUPTED -> return
else -> throw IllegalStateException("unknown state")
else -> error("Illegal state: $s")
}
}
}
Expand Down
163 changes: 0 additions & 163 deletions kotlinx-coroutines-core/jvm/test/InterruptibleTest.kt

This file was deleted.

Loading