Skip to content

Commit f0f19f8

Browse files
committed
Make ConflatedBroadcastChannel allocation-free
1 parent 0cf99bc commit f0f19f8

File tree

1 file changed

+90
-178
lines changed

1 file changed

+90
-178
lines changed

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt

+90-178
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19-
import kotlinx.atomicfu.*
20-
import kotlinx.coroutines.experimental.internal.*
21-
import kotlinx.coroutines.experimental.internalAnnotations.*
22-
import kotlinx.coroutines.experimental.intrinsics.*
23-
import kotlinx.coroutines.experimental.selects.*
19+
import kotlinx.coroutines.experimental.internal.Symbol
20+
import kotlinx.coroutines.experimental.internalAnnotations.JvmField
21+
import kotlinx.coroutines.experimental.internalAnnotations.Volatile
22+
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
23+
import kotlinx.coroutines.experimental.selects.SelectClause2
24+
import kotlinx.coroutines.experimental.selects.SelectInstance
2425

2526
/**
2627
* Broadcasts the most recently sent element (aka [value]) to all [openSubscription] subscribers.
@@ -33,44 +34,20 @@ import kotlinx.coroutines.experimental.selects.*
3334
* A secondary constructor can be used to create an instance of this class that already holds a value.
3435
* This channel is also created by `BroadcastChannel(Channel.CONFLATED)` factory function invocation.
3536
*
36-
* This implementation is fully lock-free. In this implementation
37+
* This implementation is synchronized. In this implementation
3738
* [opening][openSubscription] and [closing][ReceiveChannel.cancel] subscription takes O(N) time, where N is the
3839
* number of subscribers.
3940
*/
4041
public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
41-
/**
42-
* Creates an instance of this class that already holds a value.
43-
*
44-
* It is as a shortcut to creating an instance with a default constructor and
45-
* immediately sending an element: `ConflatedBroadcastChannel().apply { offer(value) }`.
46-
*/
47-
constructor(value: E) : this() {
48-
_state.lazySet(State<E>(value, null))
49-
}
50-
51-
private val _state = atomic<Any>(INITIAL_STATE) // State | Closed
52-
private val _updating = atomic(0)
53-
54-
private companion object {
55-
@JvmField
56-
val CLOSED = Closed(null)
57-
58-
@JvmField
59-
val UNDEFINED = Symbol("UNDEFINED")
6042

61-
@JvmField
62-
val INITIAL_STATE = State<Any?>(UNDEFINED, null)
43+
public constructor(value: E) : this() {
44+
_state = value
6345
}
6446

65-
private class State<E>(
66-
@JvmField val value: Any?, // UNDEFINED | E
67-
@JvmField val subscribers: Array<Subscriber<E>>?
68-
)
47+
@Volatile
48+
private var _state: Any? = UNDEFINED
6949

70-
private class Closed(@JvmField val closeCause: Throwable?) {
71-
val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
72-
val valueException: Throwable get() = closeCause ?: IllegalStateException(DEFAULT_CLOSE_MESSAGE)
73-
}
50+
private var _subscriptions: List<Subscription> = emptyList()
7451

7552
/**
7653
* The most recently sent element to this channel.
@@ -80,184 +57,119 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
8057
* It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
8158
*/
8259
@Suppress("UNCHECKED_CAST")
83-
public val value: E get() {
84-
_state.loop { state ->
85-
when (state) {
86-
is Closed -> throw state.valueException
87-
is State<*> -> {
88-
if (state.value === UNDEFINED) throw IllegalStateException("No value")
89-
return state.value as E
90-
}
91-
else -> error("Invalid state $state")
92-
}
60+
public val value: E
61+
get() {
62+
val state = _state
63+
if (state === UNDEFINED) throw IllegalStateException("No value")
64+
(state as? Closed)?.run { throw valueException }
65+
return state as E
9366
}
94-
}
9567

9668
/**
9769
* The most recently sent element to this channel or `null` when this class is constructed without
9870
* initial value and no value was sent yet or if it was [closed][close].
9971
*/
10072
@Suppress("UNCHECKED_CAST")
101-
public val valueOrNull: E? get() {
102-
val state = _state.value
103-
when (state) {
104-
is Closed -> return null
105-
is State<*> -> {
106-
if (state.value === UNDEFINED) return null
107-
return state.value as E
73+
public val valueOrNull: E?
74+
get() {
75+
val state = _state
76+
return when {
77+
state === UNDEFINED -> null
78+
state is Closed -> state.closeCause?.let { throw it }
79+
else -> state as E
10880
}
109-
else -> error("Invalid state $state")
11081
}
111-
}
11282

113-
public override val isClosedForSend: Boolean get() = _state.value is Closed
11483
public override val isFull: Boolean get() = false
11584

116-
@Suppress("UNCHECKED_CAST")
117-
public override fun openSubscription(): ReceiveChannel<E> {
118-
val subscriber = Subscriber<E>(this)
119-
_state.loop { state ->
120-
when (state) {
121-
is Closed -> {
122-
subscriber.close(state.closeCause)
123-
return subscriber
124-
}
125-
is State<*> -> {
126-
if (state.value !== UNDEFINED)
127-
subscriber.offerInternal(state.value as E)
128-
val update = State(state.value, addSubscriber((state as State<E>).subscribers, subscriber))
129-
if (_state.compareAndSet(state, update))
130-
return subscriber
85+
public override val isClosedForSend: Boolean get() = _state is Closed
86+
87+
public override val onSend: SelectClause2<E, SendChannel<E>>
88+
get() = object : SelectClause2<E, SendChannel<E>> {
89+
override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
90+
if (!select.trySelect(null)) return
91+
offerInternal(param)?.let {
92+
select.resumeSelectCancellableWithException(it.sendException)
93+
return
13194
}
132-
else -> error("Invalid state $state")
95+
block.startCoroutineUndispatched(receiver = this@ConflatedBroadcastChannel, completion = select.completion)
13396
}
13497
}
135-
}
13698

137-
@Suppress("UNCHECKED_CAST")
138-
private fun closeSubscriber(subscriber: Subscriber<E>) {
139-
_state.loop { state ->
140-
when (state) {
141-
is Closed -> return
142-
is State<*> -> {
143-
val update = State(state.value, removeSubscriber((state as State<E>).subscribers!!, subscriber))
144-
if (_state.compareAndSet(state, update))
145-
return
146-
}
147-
else -> error("Invalid state $state")
148-
}
99+
public override fun cancel(cause: Throwable?): Boolean = close(cause)
100+
101+
public override fun close(cause: Throwable?): Boolean {
102+
synchronized(this) {
103+
if (_state !is Closed) {
104+
val closed = Closed(cause)
105+
_state = closed
106+
107+
// dispose all subscriptions
108+
_subscriptions.forEach { it.close(cause) }
109+
_subscriptions = emptyList()
110+
return true
111+
} else
112+
return false
149113
}
150114
}
151115

152-
private fun addSubscriber(list: Array<Subscriber<E>>?, subscriber: Subscriber<E>): Array<Subscriber<E>> {
153-
if (list == null) return Array(1) { subscriber }
154-
return list + subscriber
155-
}
116+
public override fun offer(element: E): Boolean = offerInternal(element) == null
156117

157-
@Suppress("UNCHECKED_CAST")
158-
private fun removeSubscriber(list: Array<Subscriber<E>>, subscriber: Subscriber<E>): Array<Subscriber<E>>? {
159-
val n = list.size
160-
val i = list.indexOf(subscriber)
161-
check(i >= 0)
162-
if (n == 1) return null
163-
val update = arrayOfNulls<Subscriber<E>>(n - 1)
164-
arraycopy(list, 0, update, 0, i)
165-
arraycopy(list, i + 1, update, i, n - i - 1)
166-
return update as Array<Subscriber<E>>
118+
private fun offerInternal(element: E): Closed? {
119+
synchronized(this) {
120+
(_state as? Closed)?.let { return it }
121+
_state = element
122+
_subscriptions.forEach { it.offer(element) }
123+
}
124+
return null
167125
}
168126

169-
@Suppress("UNCHECKED_CAST")
170-
public override fun close(cause: Throwable?): Boolean {
171-
_state.loop { state ->
172-
when (state) {
173-
is Closed -> return false
174-
is State<*> -> {
175-
val update = if (cause == null) CLOSED else Closed(cause)
176-
if (_state.compareAndSet(state, update)) {
177-
(state as State<E>).subscribers?.forEach { it.close(cause) }
178-
return true
179-
}
127+
public override fun openSubscription(): ReceiveChannel<E> {
128+
val subscription = Subscription()
129+
synchronized(this) {
130+
val state = _state
131+
if (state is Closed) {
132+
subscription.close(state.closeCause)
133+
} else {
134+
if (state !== UNDEFINED) {
135+
// offer initial value
136+
@Suppress("UNCHECKED_CAST")
137+
subscription.offer(_state as E)
180138
}
181-
else -> error("Invalid state $state")
139+
140+
_subscriptions += subscription
182141
}
183142
}
143+
return subscription
184144
}
185145

186-
/**
187-
* Closes this broadcast channel. Same as [close].
188-
*/
189-
public override fun cancel(cause: Throwable?): Boolean = close(cause)
190-
191-
/**
192-
* Sends the value to all subscribed receives and stores this value as the most recent state for
193-
* future subscribers. This implementation never suspends.
194-
* It throws exception if the channel [isClosedForSend] (see [close] for details).
195-
*/
196146
public override suspend fun send(element: E) {
197-
offerInternal(element)?.let { throw it.sendException }
147+
offerInternal(element)?.run { throw sendException }
198148
}
199149

200-
/**
201-
* Sends the value to all subscribed receives and stores this value as the most recent state for
202-
* future subscribers. This implementation always returns `true`.
203-
* It throws exception if the channel [isClosedForSend] (see [close] for details).
204-
*/
205-
public override fun offer(element: E): Boolean {
206-
offerInternal(element)?.let { throw it.sendException }
207-
return true
208-
}
209-
210-
@Suppress("UNCHECKED_CAST")
211-
private fun offerInternal(element: E): Closed? {
212-
// If some other thread is updating the state in its offer operation we assume that our offer had linearized
213-
// before that offer (we lost) and that offer overwrote us and conflated our offer.
214-
if (!_updating.compareAndSet(0, 1)) return null
215-
try {
216-
_state.loop { state ->
217-
when (state) {
218-
is Closed -> return state
219-
is State<*> -> {
220-
val update = State(element, (state as State<E>).subscribers)
221-
if (_state.compareAndSet(state, update)) {
222-
// Note: Using offerInternal here to ignore the case when this subscriber was
223-
// already concurrently closed (assume the close had conflated our offer for this
224-
// particular subscriber).
225-
state.subscribers?.forEach { it.offerInternal(element) }
226-
return null
227-
}
228-
}
229-
else -> error("Invalid state $state")
230-
}
231-
}
232-
} finally {
233-
_updating.value = 0 // reset the updating flag to zero even when something goes wrong
150+
private fun removeSubscription(subscription: Subscription) {
151+
synchronized(this) {
152+
_subscriptions -= subscription
234153
}
235154
}
236155

237-
public override val onSend: SelectClause2<E, SendChannel<E>>
238-
get() = object : SelectClause2<E, SendChannel<E>> {
239-
override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
240-
registerSelectSend(select, param, block)
241-
}
242-
}
243-
244-
private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
245-
if (!select.trySelect(null)) return
246-
offerInternal(element)?.let {
247-
select.resumeSelectCancellableWithException(it.sendException)
248-
return
156+
/**
157+
* This is the [openSubscription] return type.
158+
*/
159+
private inner class Subscription : ConflatedChannel<E>() {
160+
override fun onClosed(closed: kotlinx.coroutines.experimental.channels.Closed<E>) {
161+
removeSubscription(this)
162+
super.onClosed(closed)
249163
}
250-
block.startCoroutineUndispatched(receiver = this, completion = select.completion)
251164
}
252165

253-
private class Subscriber<E>(
254-
private val broadcastChannel: ConflatedBroadcastChannel<E>
255-
) : ConflatedChannel<E>(), ReceiveChannel<E>, SubscriptionReceiveChannel<E> {
256-
override fun cancel(cause: Throwable?): Boolean =
257-
close(cause).also { closed ->
258-
if (closed) broadcastChannel.closeSubscriber(this)
259-
}
166+
private class Closed(@JvmField val closeCause: Throwable?) {
167+
val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
168+
val valueException: Throwable get() = closeCause ?: IllegalStateException(DEFAULT_CLOSE_MESSAGE)
169+
}
260170

261-
public override fun offerInternal(element: E): Any = super.offerInternal(element)
171+
private companion object {
172+
@JvmField
173+
val UNDEFINED = Symbol("UNDEFINED")
262174
}
263-
}
175+
}

0 commit comments

Comments
 (0)