Make ConflatedBroadcastChannel allocation-free #395
"Observable value source" was discussed here #274 I made a similar proposal (with some issues), here the synchronized version. import kotlinx.coroutines.experimental.channels.Closed
import kotlinx.coroutines.experimental.channels.ConflatedChannel
import kotlinx.coroutines.experimental.channels.ReceiveChannel
private class SubscribableVariableImpl<T>(initialValue: T) : SubscribableVariable<T> {
/**
* Subscription list
*/
private var _subscriptions: Array<Subscription>? = null
@Volatile
public override var value: T = initialValue
set(newValue) {
// lock value and notify value update
synchronized(this) {
field = newValue
_subscriptions?.forEach { it.offer(newValue) }
}
}
public override fun openSubscription(): ReceiveChannel<T> {
val subscription = Subscription()
synchronized(this) {
_subscriptions = _subscriptions?.let { it + subscription } ?: arrayOf(subscription)
// offer initial value
subscription.offer(value)
}
return subscription
}
/**
* Remove [subscription] from [_subscriptions] list
*/
private fun removeSubscription(subscription: Subscription) {
synchronized(this) {
val oldSubscriptions = _subscriptions ?: error("Subscription not found")
if (oldSubscriptions.size == 1) {
if (oldSubscriptions[0] == subscription) {
_subscriptions = null
return
} else {
error("Subscription not found")
}
}
val newSubscriptions: Array<Subscription?> = arrayOfNulls(oldSubscriptions.size - 1)
var newI = 0
for (oldSubscription in oldSubscriptions) {
if (oldSubscription != subscription) {
if (newI == newSubscriptions.size)
error("Subscription not found")
newSubscriptions[newI] = oldSubscription
newI++
}
}
_subscriptions = newSubscriptions as Array<Subscription>
}
}
/**
* Private [ReceiveChannel] implementation
*/
private inner class Subscription : ConflatedChannel<T>(), ReceiveChannel<T> {
override fun onClosed(closed: Closed<T>) {
removeSubscription(this)
super.onClosed(closed)
}
}
} |
In the above draft replacing Moreover the inner class
Any updates on this? Maybe something changed after the release of the cold streams preview. Even now, with Flow, implementing something like an "observable value source" requires a channel, with all of its allocation problems. Maybe we should expect some optimization, at least for channels created for Flow (
No updates yet. We are now focused on two areas:
The idea that I have in mind, though, is that for #1082, instead of turning "ConflatedBroadcastChannel" into the "DataFlow", we can reimplement "DataFlow" from scratch, maybe dropping some of its channel features, thus making it simpler and opening the road to a lock-based, allocation-free implementation.
@elizarov Thanks for the update!
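To make the "lock-based, allocation-free" idea concrete, here is a minimal sketch using only the Kotlin standard library. It is not the design proposed for DataFlow; the names `ValueCell`, `Reader`, `hasUpdate`, and `take` are hypothetical, and the reader polls instead of suspending. The point it illustrates is that a write under a lock only touches two fields, and conflation falls out of comparing version numbers rather than queuing elements.

```kotlin
/**
 * Hypothetical lock-based value holder (not part of kotlinx.coroutines).
 * A write updates two fields under a lock and creates no objects itself.
 */
class ValueCell<T>(initial: T) {
    private val lock = Any()
    private var current: T = initial
    private var version = 0L // bumped on every write, guarded by lock

    var value: T
        get() = synchronized(lock) { current }
        set(newValue) {
            synchronized(lock) {
                current = newValue
                version += 1
            }
        }

    /** The only allocation per subscriber happens here, at subscription time. */
    fun openReader(): Reader = Reader()

    inner class Reader internal constructor() {
        private var seenVersion = -1L // -1 means "nothing observed yet"

        /** True if the cell was written (or never read) since the last take(). */
        fun hasUpdate(): Boolean = synchronized(lock) { seenVersion != version }

        /** Returns the latest value; intermediate writes are skipped (conflated). */
        fun take(): T = synchronized(lock) {
            seenVersion = version
            current
        }
    }
}

/** Usage: two writes between reads are observed as a single update. */
fun conflationDemo(): List<String> {
    val cell = ValueCell("a")
    val reader = cell.openReader()
    val observed = mutableListOf(reader.take()) // initial value
    cell.value = "b"
    cell.value = "c"
    if (reader.hasUpdate()) observed += reader.take() // only the latest value
    return observed
}
```

The trade-off matches the one discussed in this issue: writers and readers can block each other briefly on the lock, which gives up lock-freedom in exchange for a simpler update path.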
StateFlow is a Flow analogue to ConflatedBroadcastChannel. Since Flow API is simpler than channels APIs, the implementation of StateFlow is simpler. It consumes and allocates less memory, while still providing full deadlock-freedom (even though it is not lock-free internally). Fixes #1973 Fixes #395
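For readers coming from `ConflatedBroadcastChannel`, a minimal usage sketch of the resulting API may help. It assumes a kotlinx.coroutines version that ships `MutableStateFlow` on the classpath; the function name `latestAfterUpdates` is only for illustration.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

/**
 * Writes two values and then reads the state twice:
 * once through the `value` property and once through the Flow API.
 */
fun latestAfterUpdates(): Pair<Int, Int> = runBlocking {
    val state = MutableStateFlow(0) // an initial value is always required
    state.value = 1
    state.value = 2                 // overwrites 1; only the latest value is kept

    val direct = state.value        // plain read, no suspension
    val collected = state.first()   // a new collector sees the current value first
    Pair(direct, collected)
}
```

Both reads return the latest value, which is the "observable value source" behavior this issue asks for.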
This issue will be fixed in #1974 by providing allocation-free capabilities similar to
StateFlow is a Flow analogue to ConflatedBroadcastChannel. Since Flow API is simpler than channels APIs, the implementation of StateFlow is simpler. It consumes and allocates less memory, while still providing full deadlock-freedom (even though it is not lock-free internally). Fixes Kotlin#1973 Fixes Kotlin#395 Fixes Kotlin#1816 Co-authored-by: Vsevolod Tolstopyatov
ConflatedBroadcastChannel is used in UI applications as an "observable value source", and it would be useful to make it operate in a fully allocation-free way on its send-receive path, even at the cost of losing some lock-freedom (which is not of much importance there).