Skip to content

Commit 0c6de5a

Browse files
committed
Make ConflatedBroadcastChannel allocation-free
1 parent 0cf99bc commit 0c6de5a

File tree

1 file changed

+99
-177
lines changed

1 file changed

+99
-177
lines changed

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt

+99-177
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,15 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19-
import kotlinx.atomicfu.*
20-
import kotlinx.coroutines.experimental.internal.*
21-
import kotlinx.coroutines.experimental.internalAnnotations.*
22-
import kotlinx.coroutines.experimental.intrinsics.*
23-
import kotlinx.coroutines.experimental.selects.*
19+
import kotlinx.coroutines.experimental.internal.ReentrantLock
20+
import kotlinx.coroutines.experimental.internal.Symbol
21+
import kotlinx.coroutines.experimental.internal.subscriberList
22+
import kotlinx.coroutines.experimental.internal.withLock
23+
import kotlinx.coroutines.experimental.internalAnnotations.JvmField
24+
import kotlinx.coroutines.experimental.internalAnnotations.Volatile
25+
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
26+
import kotlinx.coroutines.experimental.selects.SelectClause2
27+
import kotlinx.coroutines.experimental.selects.SelectInstance
2428

2529
/**
2630
* Broadcasts the most recently sent element (aka [value]) to all [openSubscription] subscribers.
@@ -33,44 +37,22 @@ import kotlinx.coroutines.experimental.selects.*
3337
* A secondary constructor can be used to create an instance of this class that already holds a value.
3438
* This channel is also created by `BroadcastChannel(Channel.CONFLATED)` factory function invocation.
3539
*
36-
* This implementation is fully lock-free. In this implementation
40+
* This implementation is synchronized. In this implementation
3741
* [opening][openSubscription] and [closing][ReceiveChannel.cancel] subscription takes O(N) time, where N is the
3842
* number of subscribers.
3943
*/
4044
public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
41-
/**
42-
* Creates an instance of this class that already holds a value.
43-
*
44-
* It is as a shortcut to creating an instance with a default constructor and
45-
* immediately sending an element: `ConflatedBroadcastChannel().apply { offer(value) }`.
46-
*/
47-
constructor(value: E) : this() {
48-
_state.lazySet(State<E>(value, null))
49-
}
50-
51-
private val _state = atomic<Any>(INITIAL_STATE) // State | Closed
52-
private val _updating = atomic(0)
53-
54-
private companion object {
55-
@JvmField
56-
val CLOSED = Closed(null)
57-
58-
@JvmField
59-
val UNDEFINED = Symbol("UNDEFINED")
6045

61-
@JvmField
62-
val INITIAL_STATE = State<Any?>(UNDEFINED, null)
46+
public constructor(value: E) : this() {
47+
_state = value
6348
}
6449

65-
private class State<E>(
66-
@JvmField val value: Any?, // UNDEFINED | E
67-
@JvmField val subscribers: Array<Subscriber<E>>?
68-
)
50+
private val _lock = ReentrantLock()
6951

70-
private class Closed(@JvmField val closeCause: Throwable?) {
71-
val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
72-
val valueException: Throwable get() = closeCause ?: IllegalStateException(DEFAULT_CLOSE_MESSAGE)
73-
}
52+
@Volatile
53+
private var _state: Any? = UNDEFINED
54+
55+
private var _subscribers = subscriberList<Subscriber>()
7456

7557
/**
7658
* The most recently sent element to this channel.
@@ -80,184 +62,124 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
8062
* It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
8163
*/
8264
@Suppress("UNCHECKED_CAST")
83-
public val value: E get() {
84-
_state.loop { state ->
85-
when (state) {
86-
is Closed -> throw state.valueException
87-
is State<*> -> {
88-
if (state.value === UNDEFINED) throw IllegalStateException("No value")
89-
return state.value as E
90-
}
91-
else -> error("Invalid state $state")
92-
}
65+
public val value: E
66+
get() {
67+
val state = _state
68+
if (state === UNDEFINED) throw IllegalStateException("No value")
69+
(state as? Closed)?.run { throw valueException }
70+
return state as E
9371
}
94-
}
9572

9673
/**
9774
* The most recently sent element to this channel or `null` when this class is constructed without
9875
* initial value and no value was sent yet or if it was [closed][close].
9976
*/
10077
@Suppress("UNCHECKED_CAST")
101-
public val valueOrNull: E? get() {
102-
val state = _state.value
103-
when (state) {
104-
is Closed -> return null
105-
is State<*> -> {
106-
if (state.value === UNDEFINED) return null
107-
return state.value as E
78+
public val valueOrNull: E?
79+
get() {
80+
val state = _state
81+
return when {
82+
state === UNDEFINED -> null
83+
state is Closed -> state.closeCause?.let { throw it }
84+
else -> state as E
10885
}
109-
else -> error("Invalid state $state")
11086
}
111-
}
11287

113-
public override val isClosedForSend: Boolean get() = _state.value is Closed
11488
public override val isFull: Boolean get() = false
11589

116-
@Suppress("UNCHECKED_CAST")
117-
public override fun openSubscription(): ReceiveChannel<E> {
118-
val subscriber = Subscriber<E>(this)
119-
_state.loop { state ->
120-
when (state) {
121-
is Closed -> {
122-
subscriber.close(state.closeCause)
123-
return subscriber
124-
}
125-
is State<*> -> {
126-
if (state.value !== UNDEFINED)
127-
subscriber.offerInternal(state.value as E)
128-
val update = State(state.value, addSubscriber((state as State<E>).subscribers, subscriber))
129-
if (_state.compareAndSet(state, update))
130-
return subscriber
131-
}
132-
else -> error("Invalid state $state")
133-
}
134-
}
135-
}
90+
public override val isClosedForSend: Boolean get() = _state is Closed
13691

137-
@Suppress("UNCHECKED_CAST")
138-
private fun closeSubscriber(subscriber: Subscriber<E>) {
139-
_state.loop { state ->
140-
when (state) {
141-
is Closed -> return
142-
is State<*> -> {
143-
val update = State(state.value, removeSubscriber((state as State<E>).subscribers!!, subscriber))
144-
if (_state.compareAndSet(state, update))
145-
return
92+
public override val onSend: SelectClause2<E, SendChannel<E>>
93+
get() = object : SelectClause2<E, SendChannel<E>> {
94+
override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
95+
if (!select.trySelect(null)) return
96+
offerInternal(param)?.let {
97+
select.resumeSelectCancellableWithException(it.sendException)
98+
return
14699
}
147-
else -> error("Invalid state $state")
100+
block.startCoroutineUndispatched(receiver = this@ConflatedBroadcastChannel, completion = select.completion)
148101
}
149102
}
150-
}
151-
152-
private fun addSubscriber(list: Array<Subscriber<E>>?, subscriber: Subscriber<E>): Array<Subscriber<E>> {
153-
if (list == null) return Array(1) { subscriber }
154-
return list + subscriber
155-
}
156103

157-
@Suppress("UNCHECKED_CAST")
158-
private fun removeSubscriber(list: Array<Subscriber<E>>, subscriber: Subscriber<E>): Array<Subscriber<E>>? {
159-
val n = list.size
160-
val i = list.indexOf(subscriber)
161-
check(i >= 0)
162-
if (n == 1) return null
163-
val update = arrayOfNulls<Subscriber<E>>(n - 1)
164-
arraycopy(list, 0, update, 0, i)
165-
arraycopy(list, i + 1, update, i, n - i - 1)
166-
return update as Array<Subscriber<E>>
167-
}
104+
public override fun cancel(cause: Throwable?): Boolean = close(cause)
168105

169-
@Suppress("UNCHECKED_CAST")
170106
public override fun close(cause: Throwable?): Boolean {
171-
_state.loop { state ->
172-
when (state) {
173-
is Closed -> return false
174-
is State<*> -> {
175-
val update = if (cause == null) CLOSED else Closed(cause)
176-
if (_state.compareAndSet(state, update)) {
177-
(state as State<E>).subscribers?.forEach { it.close(cause) }
178-
return true
179-
}
180-
}
181-
else -> error("Invalid state $state")
182-
}
107+
_lock.withLock {
108+
if (_state is Closed) return false
109+
_state = Closed(cause)
110+
111+
// dispose all subscribers
112+
_subscribers.forEach { it.close(cause) }
113+
_subscribers.clear()
114+
return true
183115
}
184116
}
185117

186-
/**
187-
* Closes this broadcast channel. Same as [close].
188-
*/
189-
public override fun cancel(cause: Throwable?): Boolean = close(cause)
190-
191-
/**
192-
* Sends the value to all subscribed receives and stores this value as the most recent state for
193-
* future subscribers. This implementation never suspends.
194-
* It throws exception if the channel [isClosedForSend] (see [close] for details).
195-
*/
196-
public override suspend fun send(element: E) {
197-
offerInternal(element)?.let { throw it.sendException }
198-
}
199-
200-
/**
201-
* Sends the value to all subscribed receives and stores this value as the most recent state for
202-
* future subscribers. This implementation always returns `true`.
203-
* It throws exception if the channel [isClosedForSend] (see [close] for details).
204-
*/
205118
public override fun offer(element: E): Boolean {
206119
offerInternal(element)?.let { throw it.sendException }
207120
return true
208121
}
209122

210-
@Suppress("UNCHECKED_CAST")
211123
private fun offerInternal(element: E): Closed? {
212-
// If some other thread is updating the state in its offer operation we assume that our offer had linearized
213-
// before that offer (we lost) and that offer overwrote us and conflated our offer.
214-
if (!_updating.compareAndSet(0, 1)) return null
215-
try {
216-
_state.loop { state ->
217-
when (state) {
218-
is Closed -> return state
219-
is State<*> -> {
220-
val update = State(element, (state as State<E>).subscribers)
221-
if (_state.compareAndSet(state, update)) {
222-
// Note: Using offerInternal here to ignore the case when this subscriber was
223-
// already concurrently closed (assume the close had conflated our offer for this
224-
// particular subscriber).
225-
state.subscribers?.forEach { it.offerInternal(element) }
226-
return null
227-
}
228-
}
229-
else -> error("Invalid state $state")
230-
}
124+
if (_lock.tryLock()) {
125+
try {
126+
(_state as? Closed)?.let { return it }
127+
128+
_state = element
129+
_subscribers.forEach { it.tryOffer(element) }
130+
return null
131+
} finally {
132+
_lock.unlock()
231133
}
232-
} finally {
233-
_updating.value = 0 // reset the updating flag to zero even when something goes wrong
134+
} else {
135+
return _state as? Closed
234136
}
235137
}
236138

237-
public override val onSend: SelectClause2<E, SendChannel<E>>
238-
get() = object : SelectClause2<E, SendChannel<E>> {
239-
override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
240-
registerSelectSend(select, param, block)
139+
public override fun openSubscription(): ReceiveChannel<E> {
140+
val subscriber = Subscriber()
141+
_subscribers.add(subscriber)
142+
143+
do {
144+
val state = _state
145+
@Suppress("UNCHECKED_CAST")
146+
when {
147+
state is Closed -> subscriber.close(state.closeCause)
148+
state !== UNDEFINED -> subscriber.tryOffer(state as E)
241149
}
242-
}
150+
// manage offerInternal/close contention
151+
} while (_state !== state)
243152

244-
private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
245-
if (!select.trySelect(null)) return
246-
offerInternal(element)?.let {
247-
select.resumeSelectCancellableWithException(it.sendException)
248-
return
249-
}
250-
block.startCoroutineUndispatched(receiver = this, completion = select.completion)
153+
if (subscriber.isClosedForSend) _subscribers.remove(subscriber)
154+
return subscriber
155+
}
156+
157+
public override suspend fun send(element: E) {
158+
offerInternal(element)?.run { throw sendException }
251159
}
252160

253-
private class Subscriber<E>(
254-
private val broadcastChannel: ConflatedBroadcastChannel<E>
255-
) : ConflatedChannel<E>(), ReceiveChannel<E>, SubscriptionReceiveChannel<E> {
161+
private inner class Subscriber : ConflatedChannel<E>() {
162+
163+
/**
164+
* Offer an element without throw exception
165+
*/
166+
fun tryOffer(element: E) {
167+
super.offerInternal(element)
168+
}
169+
256170
override fun cancel(cause: Throwable?): Boolean =
257-
close(cause).also { closed ->
258-
if (closed) broadcastChannel.closeSubscriber(this)
259-
}
171+
super.cancel(cause).also { closed ->
172+
if (closed) this@ConflatedBroadcastChannel._subscribers.remove(this)
173+
}
174+
}
260175

261-
public override fun offerInternal(element: E): Any = super.offerInternal(element)
176+
private class Closed(@JvmField val closeCause: Throwable?) {
177+
val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
178+
val valueException: Throwable get() = closeCause ?: IllegalStateException(DEFAULT_CLOSE_MESSAGE)
179+
}
180+
181+
private companion object {
182+
@JvmField
183+
val UNDEFINED = Symbol("UNDEFINED")
262184
}
263-
}
185+
}

0 commit comments

Comments
 (0)