5
5
package kotlinx.coroutines.internal
6
6
7
7
import kotlinx.atomicfu.*
8
- import kotlinx.cinterop.*
9
8
import kotlinx.coroutines.*
10
9
import kotlin.coroutines.*
11
10
import kotlin.coroutines.intrinsics.*
@@ -57,6 +56,7 @@ internal actual fun <T> Continuation<T>.useLocal() : Continuation<T> = when (thi
57
56
58
57
internal actual fun <T > Continuation<T>.shareableInterceptedResumeCancellableWith (result : Result <T >) {
59
58
this as ShareableContinuation <T > // must have been shared
59
+ val thread = ownerThreadOrNull ? : wasUsed()
60
60
if (currentThread() == thread) {
61
61
useRef().intercepted().resumeCancellableWith(result)
62
62
} else {
@@ -68,6 +68,7 @@ internal actual fun <T> Continuation<T>.shareableInterceptedResumeCancellableWit
68
68
69
69
internal actual fun <T > Continuation<T>.shareableInterceptedResumeWith (result : Result <T >) {
70
70
this as ShareableContinuation <T > // must have been shared
71
+ val thread = ownerThreadOrNull ? : wasUsed()
71
72
if (currentThread() == thread) {
72
73
useRef().intercepted().resumeWith(result)
73
74
} else {
@@ -102,10 +103,11 @@ internal actual inline fun disposeContinuation(cont: () -> Continuation<*>) {
102
103
103
104
internal actual fun <T > CancellableContinuationImpl<T>.shareableResume (delegate : Continuation <T >, useMode : Int ) {
104
105
if (delegate is ShareableContinuation ) {
105
- if (currentThread() == delegate.thread) {
106
+ val thread = delegate.ownerThreadOrNull ? : delegate.wasUsed()
107
+ if (currentThread() == thread) {
106
108
resumeImpl(delegate.useRef(), useMode)
107
109
} else {
108
- delegate. thread.execute {
110
+ thread.execute {
109
111
resumeImpl(delegate.useRef(), useMode)
110
112
}
111
113
}
@@ -144,54 +146,46 @@ internal actual inline fun Any.weakRef(): Any = WeakReference(this)
144
146
internal actual fun Any?.unweakRef (): Any? = (this as WeakReference <* >? )?.get()
145
147
146
148
internal open class ShareableObject <T : Any >(obj : T ) {
147
- val thread: Thread = currentThread()
148
-
149
- // todo: this is the best effort (fail-fast) double-dispose protection, does not provide memory safety guarantee
150
- private val _ref = atomic<StableRef <T >? > (StableRef .create(obj))
149
+ private val _ref = atomic<WorkerBoundReference <T >? > (WorkerBoundReference (obj))
151
150
151
+ val ownerThreadOrNull: Thread ?
152
+ get() = _ref .value?.worker?.toThread()
153
+
152
154
fun localRef (): T {
153
- checkThread()
154
155
val ref = _ref .value ? : wasUsed()
155
- return ref.get()
156
+ return ref.value
156
157
}
157
158
158
159
fun localRefOrNull (): T ? {
159
- val current = currentThread()
160
- if (current != thread) return null
161
160
val ref = _ref .value ? : wasUsed()
162
- return ref.get()
161
+ if (Worker .current != ref.worker) return null
162
+ return ref.value
163
163
}
164
164
165
165
fun localRefOrNullIfNotUsed (): T ? {
166
- val current = currentThread()
167
- if (current != thread) return null
168
166
val ref = _ref .value ? : return null
169
- return ref.get()
167
+ if (Worker .current != ref.worker) return null
168
+ return ref.value
170
169
}
171
170
172
171
fun useRef (): T {
173
- checkThread()
174
172
val ref = _ref .getAndSet(null ) ? : wasUsed()
175
- return ref.get(). also { ref.dispose() }
173
+ return ref.value
176
174
}
177
175
178
176
fun disposeRef (): T ? {
179
- checkThread()
180
177
val ref = _ref .getAndSet(null ) ? : return null
181
- return ref.get(). also { ref.dispose() }
178
+ return ref.value
182
179
}
183
180
184
- private fun checkThread () {
185
- val current = currentThread()
186
- if (current != thread) error(" Ref $classSimpleName @$hexAddress can be used only from thread $thread but now in $current " )
187
- }
188
-
189
- private fun wasUsed (): Nothing {
181
+ fun wasUsed (): Nothing {
190
182
error(" Ref $classSimpleName @$hexAddress was already used" )
191
183
}
192
184
193
- override fun toString (): String =
194
- " Shareable[${if (currentThread() == thread) _ref .value?.get()?.toString() ? : " used" else " wrong thread" } ]"
185
+ override fun toString (): String {
186
+ val ref = _ref .value ? : return " Shareable[used]"
187
+ return " Shareable[${if (Worker .current == ref.worker) _ref .value.toString() else " wrong worker" } ]"
188
+ }
195
189
}
196
190
197
191
@PublishedApi
@@ -201,6 +195,7 @@ internal class ShareableContinuation<T>(
201
195
override val context: CoroutineContext = cont.context
202
196
203
197
override fun resumeWith (result : Result <T >) {
198
+ val thread = ownerThreadOrNull ? : wasUsed()
204
199
if (currentThread() == thread) {
205
200
useRef().resumeWith(result)
206
201
} else {
@@ -215,6 +210,7 @@ private class ShareableDisposableHandle(
215
210
handle : DisposableHandle
216
211
) : ShareableObject<DisposableHandle>(handle), DisposableHandle {
217
212
override fun dispose () {
213
+ val thread = ownerThreadOrNull ? : return
218
214
if (currentThread() == thread) {
219
215
disposeRef()?.dispose()
220
216
} else {
@@ -247,6 +243,7 @@ private class ShareableBlock<T, R>(
247
243
248
244
fun dispose (useIt : Boolean ) {
249
245
if (willBeUsed.value && ! useIt) return
246
+ val thread = ownerThreadOrNull ? : return
250
247
if (currentThread() == thread) {
251
248
disposeRef()
252
249
} else {
0 commit comments