Skip to content

Commit 8f39109

Browse files
committed
Replace all volatiles with atomics in common code
In preparation to native multithreading.
1 parent 3dbe82b commit 8f39109

File tree

7 files changed

+74
-45
lines changed

7 files changed

+74
-45
lines changed

kotlinx-coroutines-core/common/src/Await.kt

+8-8
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines
66

77
import kotlinx.atomicfu.*
88
import kotlin.coroutines.*
9-
import kotlin.jvm.*
109

1110
/**
1211
* Awaits for completion of given deferred values without blocking a thread and resumes normally with the list of values
@@ -62,7 +61,7 @@ private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
6261
suspend fun await(): List<T> = suspendCancellableCoroutine { cont ->
6362
// Intricate dance here
6463
// Step 1: Create nodes and install them as completion handlers, they may fire!
65-
val nodes = Array<AwaitAllNode>(deferreds.size) { i ->
64+
val nodes = Array(deferreds.size) { i ->
6665
val deferred = deferreds[i]
6766
deferred.start() // To properly await lazily started deferreds
6867
AwaitAllNode(cont, deferred).apply {
@@ -72,7 +71,7 @@ private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
7271
val disposer = DisposeHandlersOnCancel(nodes)
7372
// Step 2: Set disposer to each node
7473
nodes.forEach { it.disposer = disposer }
75-
// Here we know that if any code the nodes complete, it will dipsose the rest
74+
// Here we know that if any code the nodes complete, it will dispose the rest
7675
// Step 3: Now we can check if continuation is complete
7776
if (cont.isCompleted) {
7877
// it is already complete while handlers were being installed -- dispose them all
@@ -94,18 +93,19 @@ private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
9493
private inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>, job: Job) : JobNode<Job>(job) {
9594
lateinit var handle: DisposableHandle
9695

97-
@Volatile
98-
var disposer: DisposeHandlersOnCancel? = null
96+
private val _disposer = atomic<DisposeHandlersOnCancel?>(null)
97+
var disposer: DisposeHandlersOnCancel?
98+
get() = _disposer.value
99+
set(value) { _disposer.value = value }
99100

100101
override fun invoke(cause: Throwable?) {
101102
if (cause != null) {
102103
val token = continuation.tryResumeWithException(cause)
103104
if (token != null) {
104105
continuation.completeResume(token)
105106
// volatile read of disposer AFTER continuation is complete
106-
val disposer = this.disposer
107107
// and if disposer was already set (all handlers where already installed, then dispose them all)
108-
if (disposer != null) disposer.disposeAll()
108+
disposer?.disposeAll()
109109
}
110110
} else if (notCompletedCount.decrementAndGet() == 0) {
111111
continuation.resume(deferreds.map { it.getCompleted() })

kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt

+4-2
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,10 @@ internal open class CancellableContinuationImpl<in T>(
6363
*/
6464
private val _state = atomic<Any?>(Active)
6565

66-
@Volatile
67-
private var parentHandle: DisposableHandle? = null
66+
private val _parentHandle = atomic<DisposableHandle?>(null)
67+
private var parentHandle: DisposableHandle?
68+
get() = _parentHandle.value
69+
set(value) { _parentHandle.value = value }
6870

6971
internal val state: Any? get() = _state.value
7072

kotlinx-coroutines-core/common/src/EventLoop.common.kt

+5-4
Original file line numberDiff line numberDiff line change
@@ -182,15 +182,16 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
182182
// Allocated only only once
183183
private val _delayed = atomic<DelayedTaskQueue?>(null)
184184

185-
@Volatile
186-
private var isCompleted = false
185+
private val _isCompleted = atomic(false)
186+
private var isCompleted
187+
get() = _isCompleted.value
188+
set(value) { _isCompleted.value = value }
187189

188190
override val isEmpty: Boolean get() {
189191
if (!isUnconfinedQueueEmpty) return false
190192
val delayed = _delayed.value
191193
if (delayed != null && !delayed.isEmpty) return false
192-
val queue = _queue.value
193-
return when (queue) {
194+
return when (val queue = _queue.value) {
194195
null -> true
195196
is Queue<*> -> queue.isEmpty
196197
else -> queue === CLOSED_EMPTY

kotlinx-coroutines-core/common/src/JobSupport.kt

+27-16
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
127127
// Note: use shared objects while we have no listeners
128128
private val _state = atomic<Any?>(if (active) EMPTY_ACTIVE else EMPTY_NEW)
129129

130-
@Volatile
131-
@JvmField
132-
internal var parentHandle: ChildHandle? = null
130+
private val _parentHandle = atomic<ChildHandle?>(null)
131+
internal var parentHandle: ChildHandle?
132+
get() = _parentHandle.value
133+
set(value) { _parentHandle.value = value }
133134

134135
// ------------ initialization ------------
135136

@@ -1019,23 +1020,33 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
10191020
@Suppress("UNCHECKED_CAST")
10201021
private class Finishing(
10211022
override val list: NodeList,
1022-
@Volatile
1023-
@JvmField var isCompleting: Boolean,
1024-
@Volatile
1025-
@JvmField var rootCause: Throwable? // NOTE: rootCause is kept even when SEALED
1023+
isCompleting: Boolean,
1024+
rootCause: Throwable?
10261025
) : SynchronizedObject(), Incomplete {
1027-
@Volatile
1028-
private var _exceptionsHolder: Any? = null // Contains null | Throwable | ArrayList | SEALED
1026+
private val _isCompleting = atomic(isCompleting)
1027+
var isCompleting: Boolean
1028+
get() = _isCompleting.value
1029+
set(value) { _isCompleting.value = value }
1030+
1031+
private val _rootCause = atomic(rootCause)
1032+
var rootCause: Throwable? // NOTE: rootCause is kept even when SEALED
1033+
get() = _rootCause.value
1034+
set(value) { _rootCause.value = value }
1035+
1036+
private val _exceptionsHolder = atomic<Any?>(null)
1037+
private var exceptionsHolder: Any? // Contains null | Throwable | ArrayList | SEALED
1038+
get() = _exceptionsHolder.value
1039+
set(value) { _exceptionsHolder.value = value }
10291040

10301041
// NotE: cannot be modified when sealed
1031-
val isSealed: Boolean get() = _exceptionsHolder === SEALED
1042+
val isSealed: Boolean get() = exceptionsHolder === SEALED
10321043
val isCancelling: Boolean get() = rootCause != null
10331044
override val isActive: Boolean get() = rootCause == null // !isCancelling
10341045

10351046
// Seals current state and returns list of exceptions
10361047
// guarded by `synchronized(this)`
10371048
fun sealLocked(proposedException: Throwable?): List<Throwable> {
1038-
val list = when(val eh = _exceptionsHolder) { // volatile read
1049+
val list = when(val eh = exceptionsHolder) { // volatile read
10391050
null -> allocateList()
10401051
is Throwable -> allocateList().also { it.add(eh) }
10411052
is ArrayList<*> -> eh as ArrayList<Throwable>
@@ -1044,7 +1055,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
10441055
val rootCause = this.rootCause // volatile read
10451056
rootCause?.let { list.add(0, it) } // note -- rootCause goes to the beginning
10461057
if (proposedException != null && proposedException != rootCause) list.add(proposedException)
1047-
_exceptionsHolder = SEALED
1058+
exceptionsHolder = SEALED
10481059
return list
10491060
}
10501061

@@ -1056,11 +1067,11 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
10561067
return
10571068
}
10581069
if (exception === rootCause) return // nothing to do
1059-
when (val eh = _exceptionsHolder) { // volatile read
1060-
null -> _exceptionsHolder = exception
1070+
when (val eh = exceptionsHolder) { // volatile read
1071+
null -> exceptionsHolder = exception
10611072
is Throwable -> {
10621073
if (exception === eh) return // nothing to do
1063-
_exceptionsHolder = allocateList().apply {
1074+
exceptionsHolder = allocateList().apply {
10641075
add(eh)
10651076
add(exception)
10661077

@@ -1074,7 +1085,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
10741085
private fun allocateList() = ArrayList<Throwable>(4)
10751086

10761087
override fun toString(): String =
1077-
"Finishing[cancelling=$isCancelling, completing=$isCompleting, rootCause=$rootCause, exceptions=$_exceptionsHolder, list=$list]"
1088+
"Finishing[cancelling=$isCancelling, completing=$isCompleting, rootCause=$rootCause, exceptions=$exceptionsHolder, list=$list]"
10781089
}
10791090

10801091
private val Incomplete.isCancelling: Boolean

kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt

+20-10
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44

55
package kotlinx.coroutines.channels
66

7+
import kotlinx.atomicfu.*
78
import kotlinx.coroutines.*
89
import kotlinx.coroutines.internal.*
910
import kotlinx.coroutines.selects.*
10-
import kotlin.jvm.*
1111

1212
/**
1313
* Broadcast channel with array buffer of a fixed [capacity].
@@ -44,12 +44,21 @@ internal class ArrayBroadcastChannel<E>(
4444

4545
// head & tail are Long (64 bits) and we assume that they never wrap around
4646
// head, tail, and size are guarded by bufferLock
47-
@Volatile
48-
private var head: Long = 0 // do modulo on use of head
49-
@Volatile
50-
private var tail: Long = 0 // do modulo on use of tail
51-
@Volatile
52-
private var size: Int = 0
47+
48+
private val _head = atomic(0L)
49+
private var head: Long // do modulo on use of head
50+
get() = _head.value
51+
set(value) { _head.value = value }
52+
53+
private val _tail = atomic(0L)
54+
private var tail: Long // do modulo on use of tail
55+
get() = _tail.value
56+
set(value) { _tail.value = value }
57+
58+
private val _size = atomic(0)
59+
private var size: Int
60+
get() = _size.value
61+
set(value) { _size.value = value }
5362

5463
private val subscribers = subscriberList<Subscriber<E>>()
5564

@@ -199,9 +208,10 @@ internal class ArrayBroadcastChannel<E>(
199208
) : AbstractChannel<E>(), ReceiveChannel<E> {
200209
private val subLock = ReentrantLock()
201210

202-
@Volatile
203-
@JvmField
204-
var subHead: Long = 0 // guarded by subLock
211+
private val _subHead = atomic(0L)
212+
var subHead: Long // guarded by subLock
213+
get() = _subHead.value
214+
set(value) { _subHead.value = value }
205215

206216
override val isBufferAlwaysEmpty: Boolean get() = false
207217
override val isBufferEmpty: Boolean get() = subHead >= broadcastChannel.tail

kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt

+6-3
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44

55
package kotlinx.coroutines.channels
66

7+
import kotlinx.atomicfu.*
78
import kotlinx.coroutines.*
89
import kotlinx.coroutines.internal.*
910
import kotlinx.coroutines.selects.*
10-
import kotlin.jvm.*
1111
import kotlin.math.*
1212

1313
/**
@@ -36,8 +36,11 @@ internal open class ArrayChannel<E>(
3636
*/
3737
private var buffer: Array<Any?> = arrayOfNulls<Any?>(min(capacity, 8))
3838
private var head: Int = 0
39-
@Volatile
40-
private var size: Int = 0 // Invariant: size <= capacity
39+
40+
private val _size = atomic(0)
41+
private var size: Int // Invariant: size <= capacity
42+
get() = _size.value
43+
set(value) { _size.value = value }
4144

4245
protected final override val isBufferAlwaysEmpty: Boolean get() = false
4346
protected final override val isBufferEmpty: Boolean get() = size == 0

kotlinx-coroutines-core/common/src/selects/Select.kt

+4-2
Original file line numberDiff line numberDiff line change
@@ -232,8 +232,10 @@ internal class SelectBuilderImpl<in R>(
232232
private val _result = atomic<Any?>(UNDECIDED)
233233

234234
// cancellability support
235-
@Volatile
236-
private var parentHandle: DisposableHandle? = null
235+
private val _parentHandle = atomic<DisposableHandle?>(null)
236+
private var parentHandle: DisposableHandle?
237+
get() = _parentHandle.value
238+
set(value) { _parentHandle.value = value }
237239

238240
/* Result state machine
239241

0 commit comments

Comments
 (0)