BroadcastChannel.asFlow().onStart(...) is invoked before the subscription is opened #1758
…art behavior Fixes Kotlin#1758
Your second example feels wrong. In your third example, the code block sends |
You're right, if you have multiple consumers then the second example would be a bit different, but for the sake of the example let's assume it's only being consumed by that one

Regarding the

    class Connection {
        val gatt: BluetoothGatt = ...
        val callback: CallbackWrapper = ...

        suspend fun writeData(data: Data) = writeMutex.withLock {
            withTimeoutOrNull(TIMEOUT) {
                callback.output.onStart {
                    // We want to wait until the flow is actually being collected
                    // before we send the data, so that we're sure to be observing
                    // when we get the response, except that this lambda is invoked
                    // before the subscription is actually created.
                    gatt.writeCharacteristic(data)
                }
                    .take(1)
                    .collect {
                        // ...
                    }
            }
        }
    }

    class CallbackWrapper : GiantAbstractCallbackForAndroidBluetooth() {
        private val outputChannel = BroadcastChannel<Data>(20)

        val output: Flow<Data>
            // in this case, we can "fix" the issue
            // by using openSubscription().consumeAsFlow()
            get() = outputChannel.asFlow()

        override fun onCharacteristicWrite(data: Data) {
            outputChannel.sendBlocking(data)
        }
        // lots more functions here
    }

I'm not actually interested in using In this example, The issue I see here is that |
Hi @RBusarow,
Please read the official KEEP https://github.com/Kotlin/KEEP/blob/master/proposals/coroutines.md#wrapping-callbacks and reconsider your code, can you |
No, that wouldn't work here since the callback can only be set when the connection is established. This is some pretty old Android code - not anything I have control over. The callback is actually an abstract class with 13 different callback functions (link), so the only real solutions are Channel-based. We had a few options for this specific use-case. For now, we're just using My concern is really just that With Moving the callback invocation to after the subscription creation just gives us more options. |
@RBusarow Why would you want the change in behaviour that you are writing about? The example you give... did it come up while you were solving some practical problem? What was the problem you were trying to solve? |
I believe I'm faced with the same issue as @RBusarow, so I'll try to give more clarity into the problem we're trying to solve. In Bluetooth Low-Energy (BLE) we read/write to characteristics on the peripheral. Unfortunately, in Android BLE the object representations of these characteristics are shared/reused, so writing to a single characteristic could trample incoming data for the same characteristic, or vice versa. The common workaround is to use 2 characteristics for communication (a characteristic dedicated for writing and a characteristic dedicated for "reading" via change notifications). All characteristic changes are delivered to Android's

    interface GattIo {
        // Suspends until Android's `BluetoothGattCallback.onCharacteristicWrite` is called, indicating successful write.
        suspend fun writeCharacteristic(characteristic: BluetoothGattCharacteristic, value: ByteArray): OnCharacteristicWrite

        val onCharacteristicChanged: Flow<OnCharacteristicChanged>
    }

If you want request/response style communication, then you'll need to handle sending the request (via In the diagram above, I've tried writing the
As @RBusarow pointed out, the current behavior will invoke
In the following pseudo code we face the same potential issue as number 1 above, that

    suspend fun request(request: Request): Response = coroutineScope {
        val response = async {
            onCharacteristicChanged
                .map { /* ... */ }
                .first { response -> request.id == response.id }
        }
        val bytes: ByteArray = request.toBytes()
        writeCharacteristic(transmitCharacteristic, bytes)
        response.await()
    }

Changing the behavior (#1759) would make Is there perhaps another approach that we've overlooked (aside from the attempts listed above) that would be better suited? |
Thanks a lot for a detailed explanation of your use-case. Please note, that we are currently working on a replacement for all kinds of So, what would be a flow-based replacement for your use-case? There will be two recommended approaches to disseminate events:
Anyway, consumers get a reference to a
What we want to achieve here is to have some kind of a reliable signal that the collector had, indeed, started collecting and will receive any further events from the upstream flow. Essentially the design of the flow contains just three kinds of signals that travel between the downstream collector of the events and an upstream emitter of the events:
The solution proposed in #1759 is local. It does not propagate through the operator chains and cannot be fixed. It means that the solution is fragile. We need a robust one. How can we have it? The solution that preserves this simple design of
Now a collector can be written like this:
But sharing operators pose a special challenge. When you share a single upstream collection with multiple downstream ones the upstream does not get any signal on incoming downstream collectors. They are all handled by the sharing operator itself.
So, the design of the sharing operators has to include this feature of materializing "after start" signal for all incoming collectors. The draft design that I envision is to add some kind of a
What do you think? |
It sounds promising, but I'm not quite sure how to expose this paradigm on my public API. For the Android Bluetooth Low-Energy (BLE) API, the I could work around that limitation by baking in my own callback registration and holding a reference to that callback (

    class GattCallback : BluetoothGattCallback() {

        @Volatile
        private var callback: Callback? = null

        fun register(callback: Callback) {
            this.callback = callback
        }

        fun unregister() {
            callback = null
        }

        override fun onCharacteristicChanged(
            gatt: BluetoothGatt,
            characteristic: BluetoothGattCharacteristic
        ) {
            callback?.invoke(/* ... */)
        }
        // ...
    }

Since Based on your example, to get the desired multicast behavior I could have:

    sealed class Event {
        object Started : Event()
        data class Value(val value: OnCharacteristicChanged) : Event()
    }

    class Gatt(
        private val gattCallback: GattCallback
    ) {
        val onCharacteristicChanged: Flow<Event> = callbackFlow<Event> {
            val callback = { sendBlocking(Value(it)) }
            gattCallback.register(callback)
            send(Started)
            awaitClose { gattCallback.unregister() }
        }.shareIn(GlobalScope, keepFirstIf = { it is Started })
    }

Although it's a bit awkward to document the usage of If I instead expose |
@twyatt You can expose
|
The extension function makes sense, but the type ( Whereas before I would've exposed it as:

    data class OnCharacteristicChanged(...)

    class Gatt {
        // Multicast hot-Flow backed by BroadcastChannel.asFlow
        val onCharacteristicChanged: Flow<OnCharacteristicChanged> = ...
    }

To support

    sealed class Event {
        object Started : Event()
        data class Value(val value: OnCharacteristicChanged) : Event()
    }

    class Gatt {
        // Multicast hot-Flow backed by callbackFlow.shareIn
        val onCharacteristicChanged: Flow<Event> = ...
    }

As you pointed out, an

    fun Flow<Event>.onCommunicationStarted(block: () -> Unit): Flow<OnCharacteristicChanged> = ...

But in situations where the consumer isn't concerned with the

    fun Flow<Event>.characteristicChanges(): Flow<OnCharacteristicChanged> = ...

Is there a possibility the |
Here are potential solutions:
What can we have "out-of-the-box" in the library? We can provide some kind of ready-to-use |
@elizarov thanks so much for explaining the design hurdles and why certain options are off the table.
I appreciate you hearing and working through our use-case and offering possible solutions. For now, it seems reasonable for me to continue to expose I'll monitor the |
@twyatt @RBusarow With upcoming Unfortunately, in the current shared flow prototype if you expose
Any other ideas? We also have an option to rename |
`Flow`'s `onStart` is called **before** subscribing to the underlying data stream. When performing a send-then-receive, the response could be lost if the `Flow` is slow to subscribe. The problem was described/discussed in [Kotlin/kotlinx.coroutines#1758] and will be fixed in [Kotlin/kotlinx.coroutines#2034]. The workaround is to use `openSubscription().consumeAsFlow()`, which opens the subscription prior to spinning up the `Flow` (and in turn will be subscribed before `onStart`).

[Kotlin/kotlinx.coroutines#1758]: Kotlin/kotlinx.coroutines#1758 (comment)
[Kotlin/kotlinx.coroutines#2034]: Kotlin/kotlinx.coroutines#2034
I believe I solved/worked around this problem in my use of ABLE and previously RxAndroidBLE (bridged into coroutines) by starting collection through CoroutineStart.UNDISPATCHED:

    private suspend fun writeBytesAndAwaitResponse(command: ByteArray, configId: HAConfigID): HaConfigDataSourcePacket {
        val notification = connection.async(start = CoroutineStart.UNDISPATCHED) {
            controlPointNotifications.filter { it.opCode.configId == configId }.first()
        }
        connection.writeCharacteristicOrThrow(controlPoint, command, WRITE_TYPE_DEFAULT)
        return notification.await()
    } |
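For readers without a BLE stack at hand, the effect of `CoroutineStart.UNDISPATCHED` can be reproduced with a plain `MutableSharedFlow`. This is only a minimal sketch: the `notifications` flow and the `request` function are hypothetical stand-ins for the connection code above, the "write" is simulated by an immediate `tryEmit`, and the deterministic outcome relies on running inside `runBlocking`'s single-threaded event loop.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical request/response helper: returns the response, or null if it was missed.
suspend fun request(start: CoroutineStart): String? = coroutineScope {
    // Hot stream standing in for characteristic change notifications (assumption).
    val notifications = MutableSharedFlow<String>(extraBufferCapacity = 1)

    val response = async(start = start) {
        // With UNDISPATCHED this body runs immediately, up to the first suspension
        // point, which is reached only after first() has subscribed to the flow.
        withTimeoutOrNull(200) { notifications.first() }
    }

    // Simulated write whose response arrives immediately.
    // If nobody is subscribed yet, the value is dropped.
    notifications.tryEmit("response")

    response.await()
}

fun main() = runBlocking {
    println(request(CoroutineStart.DEFAULT))      // null: the response arrived before the subscription
    println(request(CoroutineStart.UNDISPATCHED)) // response
}
```

With the default start mode the `async` block is only scheduled, so the simulated response is emitted before `first()` has subscribed and is lost.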
@elizarov is that the correct usage of UNDISPATCHED? |
Btw, we'd decided to name the |
Yes. That is exactly the case |
this comment chain is getting rather long, is there anything mentioned that isn't solved by async undispatched? Does the onSubscription method add anything that does not handle? |
@Computr0n I had missed the |
@twyatt sweet, glad I could help. |
The
@elizarov does that mean that using When we hit the "first suspension point" (being |
filter is not a suspension point, the stream has not been assembled at that point. the only suspension point is at collection, which is only happening after first() is called. spreading the lines out in your IDE and looking for the suspend icon in the gutter will show this in a more visual way. |
Whoopsie, oversight on my part. 🤦 |
Prints: One advantage that Here's the code with undispatched start:
Here's how you'd write the same code with
Unlike a solution with
And then use various terminal operators on it like |
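The `onSubscription` operator that shipped with shared flows can be sketched as follows. `FakeConnection`, its `write` function and the `request` helper are hypothetical names invented for this illustration; only `MutableSharedFlow`, `SharedFlow.onSubscription`, `first` and `withTimeoutOrNull` are library APIs. The buffer of one element is an assumption that lets the simulated peripheral respond from inside the `onSubscription` block without suspending.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical stand-in for a BLE connection that answers every write immediately.
class FakeConnection {
    private val _notifications = MutableSharedFlow<String>(extraBufferCapacity = 1)
    val notifications: SharedFlow<String> get() = _notifications

    fun write(payload: String) {
        // Dropped if there is no subscriber at the moment of the write.
        _notifications.tryEmit("response to $payload")
    }
}

// Sends the payload only after this collector is registered with the shared flow.
suspend fun request(connection: FakeConnection, payload: String): String? =
    withTimeoutOrNull(200) {
        connection.notifications
            .onSubscription { connection.write(payload) }
            .first()
    }

fun main() = runBlocking {
    println(request(FakeConnection(), "ping")) // response to ping
}
```

Because `onSubscription` returns a `SharedFlow`, further operators and terminal operators such as `first()` can be chained after it.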
@elizarov brilliant. thank you! |
I'm closing this issue as the design of shared flows in #2034 now takes this use-case into account. |
In the docs, we have this example of `onStart(...)`:

Not just with `emit(...)`, this works with sending to a `Channel` as well.

However, with a `BroadcastChannel` we won't get anything:

If `onStart(...)` is primarily a callback to say that "we're collecting data", instead of a fancy way to prepend data to the `Flow`, then this feels wrong.

The difference is because of how the subscription is created. The subscription channel isn't created until the builder is reached, which is after `onStart(...)`:

`onStart(...)` is being invoked at the same time as always, and the value is being sent, but the subscription's `ReceiveChannel` is created afterwards and never gets it.

Proposed Solution

Make `asFlow()` return its own internal type (`BroadcastFlow`?), and give it an `update(...)` function similar to `ChannelFlow`. This function will just accumulate the `action`(s) and store it/them until the final `collect(...)` is called.

Then make `onStart(...)` do a type check and just call `update(...)` if it's a `BroadcastFlow`.

When it's time to start collecting, create the subscription, then invoke the action, then call `emitAll(...)`.
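The three cases described in this report can be reproduced with a short sketch. The function names are made up for illustration, and the third case uses `MutableSharedFlow` as a stand-in for `BroadcastChannel` (an assumption, chosen because `BroadcastChannel` was later deprecated in favor of shared flows); the timing of the lost value is the same in spirit, since the subscription is created only after `onStart` has run.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Case 1: emit(...) inside onStart prepends a value to a cold flow.
suspend fun coldFlowCase(): List<String> =
    flowOf("a", "b").onStart { emit("Begin") }.toList()

// Case 2: sending to a buffered Channel inside onStart also works, because the
// channel holds the value until the flow starts receiving from it.
suspend fun channelCase(): List<String> {
    val channel = Channel<String>(capacity = 1)
    return channel.receiveAsFlow()
        .onStart { channel.send("Begin") }
        .take(1)
        .toList()
}

// Case 3: a hot multicast source drops a value that is sent before the
// collector has subscribed, so collection times out and null is returned.
suspend fun sharedFlowCase(): List<String>? {
    val shared = MutableSharedFlow<String>(extraBufferCapacity = 1)
    return withTimeoutOrNull(200) {
        shared.onStart { shared.emit("Begin") }
            .take(1)
            .toList()
    }
}

fun main() = runBlocking {
    println(coldFlowCase())   // [Begin, a, b]
    println(channelCase())    // [Begin]
    println(sharedFlowCase()) // null
}
```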