Skip to content

Commit 6057b54

Browse files
committed
Make ConflatedBroadcastChannel allocation-free
1 parent 9d31ffc commit 6057b54

File tree

1 file changed

+122
-200
lines changed

1 file changed

+122
-200
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,30 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
315
*/
416

517
package kotlinx.coroutines.experimental.channels
618

7-
import kotlinx.atomicfu.*
8-
import kotlinx.coroutines.experimental.internal.*
9-
import kotlinx.coroutines.experimental.internalAnnotations.*
10-
import kotlinx.coroutines.experimental.intrinsics.*
11-
import kotlinx.coroutines.experimental.selects.*
19+
import kotlinx.coroutines.experimental.internal.ReentrantLock
20+
import kotlinx.coroutines.experimental.internal.Symbol
21+
import kotlinx.coroutines.experimental.internal.subscriberList
22+
import kotlinx.coroutines.experimental.internal.withLock
23+
import kotlinx.coroutines.experimental.internalAnnotations.JvmField
24+
import kotlinx.coroutines.experimental.internalAnnotations.Volatile
25+
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
26+
import kotlinx.coroutines.experimental.selects.SelectClause2
27+
import kotlinx.coroutines.experimental.selects.SelectInstance
1228

1329
/**
1430
* Broadcasts the most recently sent element (aka [value]) to all [openSubscription] subscribers.
@@ -21,46 +37,24 @@ import kotlinx.coroutines.experimental.selects.*
2137
* A secondary constructor can be used to create an instance of this class that already holds a value.
2238
* This channel is also created by `BroadcastChannel(Channel.CONFLATED)` factory function invocation.
2339
*
24-
* This implementation is fully lock-free. In this implementation
40+
* This implementation is synchronized. In this implementation
2541
* [opening][openSubscription] and [closing][ReceiveChannel.cancel] subscription takes O(N) time, where N is the
2642
* number of subscribers.
2743
*/
2844
public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
29-
/**
30-
* Creates an instance of this class that already holds a value.
31-
*
32-
* It is as a shortcut to creating an instance with a default constructor and
33-
* immediately sending an element: `ConflatedBroadcastChannel().apply { offer(value) }`.
34-
*/
35-
constructor(value: E) : this() {
36-
_state.lazySet(State<E>(value, null))
45+
46+
public constructor(value: E) : this() {
47+
_state = value
3748
}
3849

39-
private val _state = atomic<Any>(INITIAL_STATE) // State | Closed
40-
private val _updating = atomic(0)
41-
// State transitions: null -> handler -> HANDLER_INVOKED
42-
private val onCloseHandler = atomic<Any?>(null)
50+
private val _lock = ReentrantLock()
4351

44-
private companion object {
45-
@JvmField
46-
val CLOSED = Closed(null)
52+
@Volatile
53+
private var _state: Any? = UNDEFINED
4754

48-
@JvmField
49-
val UNDEFINED = Symbol("UNDEFINED")
55+
private val _subscribers = subscriberList<Subscriber>()
5056

51-
@JvmField
52-
val INITIAL_STATE = State<Any?>(UNDEFINED, null)
53-
}
54-
55-
private class State<E>(
56-
@JvmField val value: Any?, // UNDEFINED | E
57-
@JvmField val subscribers: Array<Subscriber<E>>?
58-
)
59-
60-
private class Closed(@JvmField val closeCause: Throwable?) {
61-
val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
62-
val valueException: Throwable get() = closeCause ?: IllegalStateException(DEFAULT_CLOSE_MESSAGE)
63-
}
57+
private val _onCloseHandlers = subscriberList<Handler>()
6458

6559
/**
6660
* The most recently sent element to this channel.
@@ -70,211 +64,139 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
7064
* It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
7165
*/
7266
@Suppress("UNCHECKED_CAST")
73-
public val value: E get() {
74-
_state.loop { state ->
75-
when (state) {
76-
is Closed -> throw state.valueException
77-
is State<*> -> {
78-
if (state.value === UNDEFINED) throw IllegalStateException("No value")
79-
return state.value as E
80-
}
81-
else -> error("Invalid state $state")
82-
}
67+
public val value: E
68+
get() {
69+
val state = _state
70+
if (state === UNDEFINED) throw IllegalStateException("No value")
71+
(state as? Closed)?.run { throw valueException }
72+
return state as E
8373
}
84-
}
8574

8675
/**
8776
* The most recently sent element to this channel or `null` when this class is constructed without
8877
* initial value and no value was sent yet or if it was [closed][close].
8978
*/
9079
@Suppress("UNCHECKED_CAST")
91-
public val valueOrNull: E? get() {
92-
val state = _state.value
93-
when (state) {
94-
is Closed -> return null
95-
is State<*> -> {
96-
if (state.value === UNDEFINED) return null
97-
return state.value as E
80+
public val valueOrNull: E?
81+
get() {
82+
val state = _state
83+
return when {
84+
state === UNDEFINED -> null
85+
state is Closed -> state.closeCause?.let { throw it }
86+
else -> state as E
9887
}
99-
else -> error("Invalid state $state")
10088
}
101-
}
10289

103-
public override val isClosedForSend: Boolean get() = _state.value is Closed
10490
public override val isFull: Boolean get() = false
10591

106-
@Suppress("UNCHECKED_CAST")
107-
public override fun openSubscription(): ReceiveChannel<E> {
108-
val subscriber = Subscriber(this)
109-
_state.loop { state ->
110-
when (state) {
111-
is Closed -> {
112-
subscriber.close(state.closeCause)
113-
return subscriber
114-
}
115-
is State<*> -> {
116-
if (state.value !== UNDEFINED)
117-
subscriber.offerInternal(state.value as E)
118-
val update = State(state.value, addSubscriber((state as State<E>).subscribers, subscriber))
119-
if (_state.compareAndSet(state, update))
120-
return subscriber
121-
}
122-
else -> error("Invalid state $state")
123-
}
124-
}
125-
}
92+
public override val isClosedForSend: Boolean get() = _state is Closed
12693

127-
@Suppress("UNCHECKED_CAST")
128-
private fun closeSubscriber(subscriber: Subscriber<E>) {
129-
_state.loop { state ->
130-
when (state) {
131-
is Closed -> return
132-
is State<*> -> {
133-
val update = State(state.value, removeSubscriber((state as State<E>).subscribers!!, subscriber))
134-
if (_state.compareAndSet(state, update))
135-
return
94+
public override val onSend: SelectClause2<E, SendChannel<E>>
95+
get() = object : SelectClause2<E, SendChannel<E>> {
96+
override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
97+
if (!select.trySelect(null)) return
98+
offerInternal(param)?.let {
99+
select.resumeSelectCancellableWithException(it.sendException)
100+
return
136101
}
137-
else -> error("Invalid state $state")
102+
block.startCoroutineUndispatched(receiver = this@ConflatedBroadcastChannel, completion = select.completion)
138103
}
139104
}
140-
}
141105

142-
private fun addSubscriber(list: Array<Subscriber<E>>?, subscriber: Subscriber<E>): Array<Subscriber<E>> {
143-
if (list == null) return Array(1) { subscriber }
144-
return list + subscriber
145-
}
146-
147-
@Suppress("UNCHECKED_CAST")
148-
private fun removeSubscriber(list: Array<Subscriber<E>>, subscriber: Subscriber<E>): Array<Subscriber<E>>? {
149-
val n = list.size
150-
val i = list.indexOf(subscriber)
151-
check(i >= 0)
152-
if (n == 1) return null
153-
val update = arrayOfNulls<Subscriber<E>>(n - 1)
154-
arraycopy(list, 0, update, 0, i)
155-
arraycopy(list, i + 1, update, i, n - i - 1)
156-
return update as Array<Subscriber<E>>
157-
}
106+
public override fun cancel(cause: Throwable?): Boolean = close(cause)
158107

159-
@Suppress("UNCHECKED_CAST")
160108
public override fun close(cause: Throwable?): Boolean {
161-
_state.loop { state ->
162-
when (state) {
163-
is Closed -> return false
164-
is State<*> -> {
165-
val update = if (cause == null) CLOSED else Closed(cause)
166-
if (_state.compareAndSet(state, update)) {
167-
(state as State<E>).subscribers?.forEach { it.close(cause) }
168-
invokeOnCloseHandler(cause)
169-
return true
170-
}
171-
}
172-
else -> error("Invalid state $state")
173-
}
109+
_lock.withLock {
110+
if (_state is Closed) return false
111+
_state = Closed(cause)
112+
113+
// dispose all subscribers
114+
_subscribers.forEach { it.close(cause) }
115+
_subscribers.clear()
116+
117+
_onCloseHandlers.forEach { it.invoke(cause) }
118+
_onCloseHandlers.clear()
119+
120+
return true
174121
}
175122
}
176123

177-
private fun invokeOnCloseHandler(cause: Throwable?) {
178-
val handler = onCloseHandler.value
179-
if (handler !== null && handler !== HANDLER_INVOKED
180-
&& onCloseHandler.compareAndSet(handler, HANDLER_INVOKED)) {
181-
(handler as Handler)(cause)
182-
}
124+
public override fun offer(element: E): Boolean {
125+
offerInternal(element)?.let { throw it.sendException }
126+
return true
183127
}
184128

185-
override fun invokeOnClose(handler: Handler) {
186-
// Intricate dance for concurrent invokeOnClose and close
187-
if (!onCloseHandler.compareAndSet(null, handler)) {
188-
val value = onCloseHandler.value
189-
if (value === HANDLER_INVOKED) {
190-
throw IllegalStateException("Another handler was already registered and successfully invoked")
191-
} else {
192-
throw IllegalStateException("Another handler was already registered: $value")
129+
private fun offerInternal(element: E): Closed? {
130+
if (_lock.tryLock()) {
131+
try {
132+
(_state as? Closed)?.let { return it }
133+
134+
_state = element
135+
_subscribers.forEach { it.tryOffer(element) }
136+
return null
137+
} finally {
138+
_lock.unlock()
193139
}
194140
} else {
195-
val state = _state.value
196-
if (state is Closed && onCloseHandler.compareAndSet(handler, HANDLER_INVOKED)) {
197-
(handler)(state.closeCause)
198-
}
141+
return _state as? Closed
199142
}
200143
}
201144

202-
/**
203-
* Closes this broadcast channel. Same as [close].
204-
*/
205-
public override fun cancel(cause: Throwable?): Boolean = close(cause)
145+
public override fun openSubscription(): ReceiveChannel<E> {
146+
val subscriber = Subscriber()
147+
_subscribers.add(subscriber)
148+
149+
do {
150+
val state = _state
151+
@Suppress("UNCHECKED_CAST")
152+
when {
153+
state is Closed -> subscriber.close(state.closeCause)
154+
state !== UNDEFINED -> subscriber.tryOffer(state as E)
155+
}
156+
// manage offerInternal/close contention
157+
} while (_state !== state)
206158

207-
/**
208-
* Sends the value to all subscribed receives and stores this value as the most recent state for
209-
* future subscribers. This implementation never suspends.
210-
* It throws exception if the channel [isClosedForSend] (see [close] for details).
211-
*/
212-
public override suspend fun send(element: E) {
213-
offerInternal(element)?.let { throw it.sendException }
159+
if (subscriber.isClosedForSend) _subscribers.remove(subscriber)
160+
return subscriber
214161
}
215162

216-
/**
217-
* Sends the value to all subscribed receives and stores this value as the most recent state for
218-
* future subscribers. This implementation always returns `true`.
219-
* It throws exception if the channel [isClosedForSend] (see [close] for details).
220-
*/
221-
public override fun offer(element: E): Boolean {
222-
offerInternal(element)?.let { throw it.sendException }
223-
return true
163+
public override suspend fun send(element: E) {
164+
offerInternal(element)?.run { throw sendException }
224165
}
225166

226-
@Suppress("UNCHECKED_CAST")
227-
private fun offerInternal(element: E): Closed? {
228-
// If some other thread is updating the state in its offer operation we assume that our offer had linearized
229-
// before that offer (we lost) and that offer overwrote us and conflated our offer.
230-
if (!_updating.compareAndSet(0, 1)) return null
231-
try {
232-
_state.loop { state ->
233-
when (state) {
234-
is Closed -> return state
235-
is State<*> -> {
236-
val update = State(element, (state as State<E>).subscribers)
237-
if (_state.compareAndSet(state, update)) {
238-
// Note: Using offerInternal here to ignore the case when this subscriber was
239-
// already concurrently closed (assume the close had conflated our offer for this
240-
// particular subscriber).
241-
state.subscribers?.forEach { it.offerInternal(element) }
242-
return null
243-
}
244-
}
245-
else -> error("Invalid state $state")
246-
}
167+
override fun invokeOnClose(handler: Handler) {
168+
_lock.withLock {
169+
val state = _state
170+
if (state is Closed) {
171+
handler(state.closeCause)
172+
} else {
173+
_onCloseHandlers += handler
247174
}
248-
} finally {
249-
_updating.value = 0 // reset the updating flag to zero even when something goes wrong
250175
}
251176
}
252177

253-
public override val onSend: SelectClause2<E, SendChannel<E>>
254-
get() = object : SelectClause2<E, SendChannel<E>> {
255-
override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
256-
registerSelectSend(select, param, block)
257-
}
258-
}
178+
private inner class Subscriber : ConflatedChannel<E>() {
259179

260-
private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
261-
if (!select.trySelect(null)) return
262-
offerInternal(element)?.let {
263-
select.resumeSelectCancellableWithException(it.sendException)
264-
return
180+
/**
181+
* Offer an element without throw exception
182+
*/
183+
fun tryOffer(element: E) {
184+
super.offerInternal(element)
265185
}
266-
block.startCoroutineUndispatched(receiver = this, completion = select.completion)
267-
}
268186

269-
@Suppress("DEPRECATION")
270-
private class Subscriber<E>(
271-
private val broadcastChannel: ConflatedBroadcastChannel<E>
272-
) : ConflatedChannel<E>(), ReceiveChannel<E>, SubscriptionReceiveChannel<E> {
273187
override fun cancel(cause: Throwable?): Boolean =
274-
close(cause).also { closed ->
275-
if (closed) broadcastChannel.closeSubscriber(this)
276-
}
188+
super.cancel(cause).also { closed ->
189+
if (closed) this@ConflatedBroadcastChannel._subscribers.remove(this)
190+
}
191+
}
277192

278-
public override fun offerInternal(element: E): Any = super.offerInternal(element)
193+
private class Closed(@JvmField val closeCause: Throwable?) {
194+
val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
195+
val valueException: Throwable get() = closeCause ?: IllegalStateException(DEFAULT_CLOSE_MESSAGE)
196+
}
197+
198+
private companion object {
199+
@JvmField
200+
val UNDEFINED = Symbol("UNDEFINED")
279201
}
280202
}

0 commit comments

Comments
 (0)