@@ -11,27 +11,22 @@ import kotlin.coroutines.*
11
11
* Calls the specified [block] with a given coroutine context in a interruptible manner.
12
12
* The blocking code block will be interrupted and this function will throw [CancellationException]
13
13
* if the coroutine is cancelled.
14
- * The specified [coroutineContext] directly translates into [withContext] argument.
15
14
*
16
15
* Example:
16
+ *
17
17
* ```
18
- * val blockingJob = launch {
19
- * // This function will throw CancellationException
20
- * runInterruptible(Dispatchers.IO) {
21
- * // This blocking procedure will be interrupted when this coroutine is canceled
22
- * doSomethingElseUsefulInterruptible()
18
+ * withTimeout(500L) { // Cancels coroutine on timeout
19
+ * runInterruptible { // Throws CancellationException if interrupted
20
+ * doSomethingBlocking() // Interrupted on coroutines cancellation
23
21
* }
24
22
* }
25
- *
26
- * delay(500L)
27
- * blockingJob.cancel() // Interrupt blocking call
28
- * }
29
23
* ```
30
24
*
31
- * There is also an optional context parameter to this function to enable single-call conversion of
32
- * interruptible Java methods into suspending functions like this:
25
+ * There is an optional [context] parameter to this function working just like [withContext].
26
+ * It enables single-call conversion of interruptible Java methods into suspending functions.
27
+ * With one call here we are moving the call to [Dispatchers.IO] and supporting interruption:
28
+ *
33
29
* ```
34
- * // With one call here we are moving the call to Dispatchers.IO and supporting interruption.
35
30
* suspend fun <T> BlockingQueue<T>.awaitTake(): T =
36
31
* runInterruptible(Dispatchers.IO) { queue.take() }
37
32
* ```
@@ -40,14 +35,14 @@ public suspend fun <T> runInterruptible(
40
35
context : CoroutineContext = EmptyCoroutineContext ,
41
36
block : () -> T
42
37
): T = withContext(context) {
43
- runInterruptibleInExpectedContext(block)
38
+ runInterruptibleInExpectedContext(coroutineContext, block)
44
39
}
45
40
46
- private suspend fun <T > runInterruptibleInExpectedContext (block : () -> T ): T {
41
+ private fun <T > runInterruptibleInExpectedContext (coroutineContext : CoroutineContext , block : () -> T ): T {
47
42
try {
48
- // No job -> no cancellation
49
- val job = coroutineContext[Job ] ? : return block()
43
+ val job = coroutineContext[Job ]!! // withContext always creates a job
50
44
val threadState = ThreadState (job)
45
+ threadState.setup()
51
46
try {
52
47
return block()
53
48
} finally {
@@ -63,7 +58,7 @@ private const val FINISHED = 1
63
58
private const val INTERRUPTING = 2
64
59
private const val INTERRUPTED = 3
65
60
66
- private class ThreadState : CompletionHandler {
61
+ private class ThreadState ( private val job : Job ) : CompletionHandler {
67
62
/*
68
63
=== States ===
69
64
@@ -90,28 +85,25 @@ private class ThreadState : CompletionHandler {
90
85
| |
91
86
V V
92
87
+---------------+ +-------------------------+
93
- | INTERRUPTED | | FINISHED |
88
+ | INTERRUPTED | | FINISHED |
94
89
+---------------+ +-------------------------+
95
90
*/
96
- private val state : AtomicRef < State > = atomic(State ( WORKING , null ) )
91
+ private val _state = atomic(WORKING )
97
92
private val targetThread = Thread .currentThread()
98
93
99
- private data class State (@JvmField val state : Int , @JvmField val cancelHandle : DisposableHandle ? )
94
+ // Registered cancellation handler
95
+ private var cancelHandle: DisposableHandle ? = null
100
96
101
- // We're using a non-primary constructor instead of init block of a primary constructor here, because
102
- // we need to `return`.
103
- constructor (job: Job ) {
104
- // Register cancellation handler
105
- val cancelHandle =
106
- job.invokeOnCompletion(onCancelling = true , invokeImmediately = true , handler = this )
97
+ fun setup () {
98
+ cancelHandle = job.invokeOnCompletion(onCancelling = true , invokeImmediately = true , handler = this )
107
99
// Either we successfully stored it or it was immediately cancelled
108
- state .loop { s ->
109
- when (s. state) {
100
+ _state .loop { state ->
101
+ when (state) {
110
102
// Happy-path, move forward
111
- WORKING -> if (state .compareAndSet(s, State ( WORKING , cancelHandle) )) return
103
+ WORKING -> if (_state .compareAndSet(state, WORKING )) return
112
104
// Immediately cancelled, just continue
113
105
INTERRUPTING , INTERRUPTED -> return
114
- else -> throw IllegalStateException ( " Illegal state $s " )
106
+ else -> invalidState( state)
115
107
}
116
108
}
117
109
}
@@ -120,10 +112,10 @@ private class ThreadState : CompletionHandler {
120
112
/*
121
113
* Do not allow to untriggered interrupt to leak
122
114
*/
123
- state .loop { s ->
124
- when (s. state) {
125
- WORKING -> if (state .compareAndSet(s, State ( FINISHED , null ) )) {
126
- s. cancelHandle?.dispose()
115
+ _state .loop { state ->
116
+ when (state) {
117
+ WORKING -> if (_state .compareAndSet(state, FINISHED )) {
118
+ cancelHandle?.dispose()
127
119
return
128
120
}
129
121
INTERRUPTING -> {
@@ -134,31 +126,32 @@ private class ThreadState : CompletionHandler {
134
126
}
135
127
INTERRUPTED -> {
136
128
// Clear it and bail out
137
- Thread .interrupted();
129
+ Thread .interrupted()
138
130
return
139
131
}
140
- else -> error( " Illegal state: $s " )
132
+ else -> invalidState( state)
141
133
}
142
134
}
143
135
}
144
136
145
137
// Cancellation handler
146
138
override fun invoke (cause : Throwable ? ) {
147
- state .loop { s ->
148
- when (s. state) {
139
+ _state .loop { state ->
140
+ when (state) {
149
141
// Working -> try to transite state and interrupt the thread
150
142
WORKING -> {
151
- if (state .compareAndSet(s, State ( INTERRUPTING , null ) )) {
143
+ if (_state .compareAndSet(state, INTERRUPTING )) {
152
144
targetThread.interrupt()
153
- state .value = State ( INTERRUPTED , null )
145
+ _state .value = INTERRUPTED
154
146
return
155
147
}
156
148
}
157
- // Finished -- runInterruptible is already complete
158
- FINISHED -> return
159
- INTERRUPTING , INTERRUPTED -> return
160
- else -> error(" Illegal state: $s " )
149
+ // Finished -- runInterruptible is already complete, INTERRUPTING - ignore
150
+ FINISHED , INTERRUPTING , INTERRUPTED -> return
151
+ else -> invalidState(state)
161
152
}
162
153
}
163
154
}
155
+
156
+ private fun invalidState (state : Int ): Nothing = error(" Illegal state $state " )
164
157
}
0 commit comments