Skip to content

Commit 46f2356

Browse files
committed
Make ConflatedBroadcastChannel allocation-free
1 parent 82a5dfd commit 46f2356

File tree

1 file changed

+123
-195
lines changed

1 file changed

+123
-195
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,17 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
315
*/
416

517
package kotlinx.coroutines.experimental.channels
@@ -8,6 +20,15 @@ import kotlinx.atomicfu.*
820
import kotlinx.coroutines.experimental.internal.*
921
import kotlinx.coroutines.experimental.intrinsics.*
1022
import kotlinx.coroutines.experimental.selects.*
23+
import kotlinx.coroutines.experimental.internal.ReentrantLock
24+
import kotlinx.coroutines.experimental.internal.Symbol
25+
import kotlinx.coroutines.experimental.internal.subscriberList
26+
import kotlinx.coroutines.experimental.internal.withLock
27+
import kotlinx.coroutines.experimental.internalAnnotations.JvmField
28+
import kotlinx.coroutines.experimental.internalAnnotations.Volatile
29+
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
30+
import kotlinx.coroutines.experimental.selects.SelectClause2
31+
import kotlinx.coroutines.experimental.selects.SelectInstance
1132

1233
/**
1334
* Broadcasts the most recently sent element (aka [value]) to all [openSubscription] subscribers.
@@ -20,46 +41,24 @@ import kotlinx.coroutines.experimental.selects.*
2041
* A secondary constructor can be used to create an instance of this class that already holds a value.
2142
* This channel is also created by `BroadcastChannel(Channel.CONFLATED)` factory function invocation.
2243
*
23-
* This implementation is fully lock-free. In this implementation
44+
* This implementation is synchronized. In this implementation
2445
* [opening][openSubscription] and [closing][ReceiveChannel.cancel] subscription takes O(N) time, where N is the
2546
* number of subscribers.
2647
*/
2748
public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
28-
/**
29-
* Creates an instance of this class that already holds a value.
30-
*
31-
* It is as a shortcut to creating an instance with a default constructor and
32-
* immediately sending an element: `ConflatedBroadcastChannel().apply { offer(value) }`.
33-
*/
34-
constructor(value: E) : this() {
35-
_state.lazySet(State<E>(value, null))
49+
50+
public constructor(value: E) : this() {
51+
_state = value
3652
}
3753

38-
private val _state = atomic<Any>(INITIAL_STATE) // State | Closed
39-
private val _updating = atomic(0)
40-
// State transitions: null -> handler -> HANDLER_INVOKED
41-
private val onCloseHandler = atomic<Any?>(null)
54+
private val _lock = ReentrantLock()
4255

43-
private companion object {
44-
@JvmField
45-
val CLOSED = Closed(null)
56+
@Volatile
57+
private var _state: Any? = UNDEFINED
4658

47-
@JvmField
48-
val UNDEFINED = Symbol("UNDEFINED")
59+
private val _subscribers = subscriberList<Subscriber>()
4960

50-
@JvmField
51-
val INITIAL_STATE = State<Any?>(UNDEFINED, null)
52-
}
53-
54-
private class State<E>(
55-
@JvmField val value: Any?, // UNDEFINED | E
56-
@JvmField val subscribers: Array<Subscriber<E>>?
57-
)
58-
59-
private class Closed(@JvmField val closeCause: Throwable?) {
60-
val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
61-
val valueException: Throwable get() = closeCause ?: IllegalStateException(DEFAULT_CLOSE_MESSAGE)
62-
}
61+
private val _onCloseHandlers = subscriberList<Handler>()
6362

6463
/**
6564
* The most recently sent element to this channel.
@@ -69,211 +68,140 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
6968
* It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
7069
*/
7170
@Suppress("UNCHECKED_CAST")
72-
public val value: E get() {
73-
_state.loop { state ->
74-
when (state) {
75-
is Closed -> throw state.valueException
76-
is State<*> -> {
77-
if (state.value === UNDEFINED) throw IllegalStateException("No value")
78-
return state.value as E
79-
}
80-
else -> error("Invalid state $state")
81-
}
71+
public val value: E
72+
get() {
73+
val state = _state
74+
if (state === UNDEFINED) throw IllegalStateException("No value")
75+
(state as? Closed)?.run { throw valueException }
76+
return state as E
8277
}
83-
}
8478

8579
/**
8680
* The most recently sent element to this channel or `null` when this class is constructed without
8781
* initial value and no value was sent yet or if it was [closed][close].
8882
*/
8983
@Suppress("UNCHECKED_CAST")
90-
public val valueOrNull: E? get() {
91-
val state = _state.value
92-
when (state) {
93-
is Closed -> return null
94-
is State<*> -> {
95-
if (state.value === UNDEFINED) return null
96-
return state.value as E
84+
public val valueOrNull: E?
85+
get() {
86+
val state = _state
87+
return when {
88+
state === UNDEFINED -> null
89+
state is Closed -> state.closeCause?.let { throw it }
90+
else -> state as E
9791
}
98-
else -> error("Invalid state $state")
9992
}
100-
}
10193

102-
public override val isClosedForSend: Boolean get() = _state.value is Closed
10394
public override val isFull: Boolean get() = false
10495

105-
@Suppress("UNCHECKED_CAST")
106-
public override fun openSubscription(): ReceiveChannel<E> {
107-
val subscriber = Subscriber(this)
108-
_state.loop { state ->
109-
when (state) {
110-
is Closed -> {
111-
subscriber.close(state.closeCause)
112-
return subscriber
113-
}
114-
is State<*> -> {
115-
if (state.value !== UNDEFINED)
116-
subscriber.offerInternal(state.value as E)
117-
val update = State(state.value, addSubscriber((state as State<E>).subscribers, subscriber))
118-
if (_state.compareAndSet(state, update))
119-
return subscriber
120-
}
121-
else -> error("Invalid state $state")
122-
}
123-
}
124-
}
96+
public override val isClosedForSend: Boolean get() = _state is Closed
12597

126-
@Suppress("UNCHECKED_CAST")
127-
private fun closeSubscriber(subscriber: Subscriber<E>) {
128-
_state.loop { state ->
129-
when (state) {
130-
is Closed -> return
131-
is State<*> -> {
132-
val update = State(state.value, removeSubscriber((state as State<E>).subscribers!!, subscriber))
133-
if (_state.compareAndSet(state, update))
134-
return
98+
public override val onSend: SelectClause2<E, SendChannel<E>>
99+
get() = object : SelectClause2<E, SendChannel<E>> {
100+
override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
101+
if (!select.trySelect(null)) return
102+
offerInternal(param)?.let {
103+
select.resumeSelectCancellableWithException(it.sendException)
104+
return
135105
}
136-
else -> error("Invalid state $state")
106+
block.startCoroutineUnintercepted(receiver = this@ConflatedBroadcastChannel, completion = select.completion)
137107
}
138108
}
139-
}
140-
141-
private fun addSubscriber(list: Array<Subscriber<E>>?, subscriber: Subscriber<E>): Array<Subscriber<E>> {
142-
if (list == null) return Array(1) { subscriber }
143-
return list + subscriber
144-
}
145109

146-
@Suppress("UNCHECKED_CAST")
147-
private fun removeSubscriber(list: Array<Subscriber<E>>, subscriber: Subscriber<E>): Array<Subscriber<E>>? {
148-
val n = list.size
149-
val i = list.indexOf(subscriber)
150-
check(i >= 0)
151-
if (n == 1) return null
152-
val update = arrayOfNulls<Subscriber<E>>(n - 1)
153-
arraycopy(list, 0, update, 0, i)
154-
arraycopy(list, i + 1, update, i, n - i - 1)
155-
return update as Array<Subscriber<E>>
156-
}
110+
public override fun cancel(cause: Throwable?): Boolean = close(cause)
157111

158-
@Suppress("UNCHECKED_CAST")
159112
public override fun close(cause: Throwable?): Boolean {
160-
_state.loop { state ->
161-
when (state) {
162-
is Closed -> return false
163-
is State<*> -> {
164-
val update = if (cause == null) CLOSED else Closed(cause)
165-
if (_state.compareAndSet(state, update)) {
166-
(state as State<E>).subscribers?.forEach { it.close(cause) }
167-
invokeOnCloseHandler(cause)
168-
return true
169-
}
170-
}
171-
else -> error("Invalid state $state")
172-
}
113+
_lock.withLock {
114+
if (_state is Closed) return false
115+
_state = Closed(cause)
116+
117+
// dispose all subscribers
118+
_subscribers.forEach { it.close(cause) }
119+
_subscribers.clear()
120+
121+
_onCloseHandlers.forEach { it.invoke(cause) }
122+
_onCloseHandlers.clear()
123+
124+
return true
173125
}
174126
}
175127

176-
private fun invokeOnCloseHandler(cause: Throwable?) {
177-
val handler = onCloseHandler.value
178-
if (handler !== null && handler !== HANDLER_INVOKED
179-
&& onCloseHandler.compareAndSet(handler, HANDLER_INVOKED)) {
180-
(handler as Handler)(cause)
181-
}
128+
public override fun offer(element: E): Boolean {
129+
offerInternal(element)?.let { throw it.sendException }
130+
return true
182131
}
183132

184-
override fun invokeOnClose(handler: Handler) {
185-
// Intricate dance for concurrent invokeOnClose and close
186-
if (!onCloseHandler.compareAndSet(null, handler)) {
187-
val value = onCloseHandler.value
188-
if (value === HANDLER_INVOKED) {
189-
throw IllegalStateException("Another handler was already registered and successfully invoked")
190-
} else {
191-
throw IllegalStateException("Another handler was already registered: $value")
133+
private fun offerInternal(element: E): Closed? {
134+
if (_lock.tryLock()) {
135+
try {
136+
(_state as? Closed)?.let { return it }
137+
138+
_state = element
139+
_subscribers.forEach { it.tryOffer(element) }
140+
return null
141+
} finally {
142+
_lock.unlock()
192143
}
193144
} else {
194-
val state = _state.value
195-
if (state is Closed && onCloseHandler.compareAndSet(handler, HANDLER_INVOKED)) {
196-
(handler)(state.closeCause)
197-
}
145+
return _state as? Closed
198146
}
199147
}
200148

201-
/**
202-
* Closes this broadcast channel. Same as [close].
203-
*/
204-
public override fun cancel(cause: Throwable?): Boolean = close(cause)
149+
public override fun openSubscription(): ReceiveChannel<E> {
150+
val subscriber = Subscriber()
151+
_subscribers.add(subscriber)
152+
153+
do {
154+
val state = _state
155+
@Suppress("UNCHECKED_CAST")
156+
when {
157+
state is Closed -> subscriber.close(state.closeCause)
158+
state !== UNDEFINED -> subscriber.tryOffer(state as E)
159+
}
160+
// manage offerInternal/close contention
161+
} while (_state !== state)
205162

206-
/**
207-
* Sends the value to all subscribed receives and stores this value as the most recent state for
208-
* future subscribers. This implementation never suspends.
209-
* It throws exception if the channel [isClosedForSend] (see [close] for details).
210-
*/
211-
public override suspend fun send(element: E) {
212-
offerInternal(element)?.let { throw it.sendException }
163+
if (subscriber.isClosedForSend) _subscribers.remove(subscriber)
164+
return subscriber
213165
}
214166

215-
/**
216-
* Sends the value to all subscribed receives and stores this value as the most recent state for
217-
* future subscribers. This implementation always returns `true`.
218-
* It throws exception if the channel [isClosedForSend] (see [close] for details).
219-
*/
220-
public override fun offer(element: E): Boolean {
221-
offerInternal(element)?.let { throw it.sendException }
222-
return true
167+
public override suspend fun send(element: E) {
168+
offerInternal(element)?.run { throw sendException }
223169
}
224170

225-
@Suppress("UNCHECKED_CAST")
226-
private fun offerInternal(element: E): Closed? {
227-
// If some other thread is updating the state in its offer operation we assume that our offer had linearized
228-
// before that offer (we lost) and that offer overwrote us and conflated our offer.
229-
if (!_updating.compareAndSet(0, 1)) return null
230-
try {
231-
_state.loop { state ->
232-
when (state) {
233-
is Closed -> return state
234-
is State<*> -> {
235-
val update = State(element, (state as State<E>).subscribers)
236-
if (_state.compareAndSet(state, update)) {
237-
// Note: Using offerInternal here to ignore the case when this subscriber was
238-
// already concurrently closed (assume the close had conflated our offer for this
239-
// particular subscriber).
240-
state.subscribers?.forEach { it.offerInternal(element) }
241-
return null
242-
}
243-
}
244-
else -> error("Invalid state $state")
245-
}
171+
override fun invokeOnClose(handler: Handler) {
172+
_lock.withLock {
173+
val state = _state
174+
if (state is Closed) {
175+
handler(state.closeCause)
176+
} else {
177+
_onCloseHandlers += handler
246178
}
247-
} finally {
248-
_updating.value = 0 // reset the updating flag to zero even when something goes wrong
249179
}
250180
}
251181

252-
public override val onSend: SelectClause2<E, SendChannel<E>>
253-
get() = object : SelectClause2<E, SendChannel<E>> {
254-
override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
255-
registerSelectSend(select, param, block)
256-
}
257-
}
182+
private inner class Subscriber : ConflatedChannel<E>() {
258183

259-
private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
260-
if (!select.trySelect(null)) return
261-
offerInternal(element)?.let {
262-
select.resumeSelectCancellableWithException(it.sendException)
263-
return
184+
/**
185+
* Offer an element without throw exception
186+
*/
187+
fun tryOffer(element: E) {
188+
super.offerInternal(element)
264189
}
265-
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
266190
}
267191

268-
@Suppress("DEPRECATION")
269-
private class Subscriber<E>(
270-
private val broadcastChannel: ConflatedBroadcastChannel<E>
271-
) : ConflatedChannel<E>(), ReceiveChannel<E>, SubscriptionReceiveChannel<E> {
272-
override fun cancel(cause: Throwable?): Boolean =
273-
close(cause).also { closed ->
274-
if (closed) broadcastChannel.closeSubscriber(this)
192+
override fun cancel(cause: Throwable?): Boolean =
193+
super.cancel(cause).also { closed ->
194+
if (closed) this@ConflatedBroadcastChannel._subscribers.remove(this)
275195
}
196+
}
276197

277-
public override fun offerInternal(element: E): Any = super.offerInternal(element)
278-
}
198+
private class Closed(@JvmField val closeCause: Throwable?) {
199+
val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
200+
val valueException: Throwable get() = closeCause ?: IllegalStateException(DEFAULT_CLOSE_MESSAGE)
201+
}
202+
203+
private companion object {
204+
@JvmField
205+
val UNDEFINED = Symbol("UNDEFINED")
206+
}
279207
}

0 commit comments

Comments
 (0)