Skip to content

database-ktx: add callbackFlow for eventlisteners #4012

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Sep 23, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions firebase-database/ktx/ktx.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ dependencies {
implementation project(':firebase-database')
implementation 'androidx.annotation:annotation:1.1.0'
implementation 'com.google.android.gms:play-services-tasks:18.0.1'
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.2'

androidTestImplementation 'junit:junit:4.12'
androidTestImplementation "com.google.truth:truth:$googleTruthVersion"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.google.firebase.database.ktx

import com.google.firebase.database.DataSnapshot

/**
* Used to emit events about changes in the child locations of a given
* [Query] when using the [childEvents] Flow.
*/
sealed class ChildEvent {
/**
* Emitted when a new child is added to the location.
*
* @param snapshot An immutable snapshot of the data at the new child location
* @param previousChildName The key name of sibling location ordered before the new child. This
* will be null for the first child node of a location.
*/
data class Added(val snapshot: DataSnapshot, val previousChildName: String?) : ChildEvent()

/**
* Emitted when the data at a child location has changed.
*
* @param snapshot An immutable snapshot of the data at the new data at the child location
* @param previousChildName The key name of sibling location ordered before the child. This will
* be null for the first child node of a location.
*/
data class Changed(val snapshot: DataSnapshot, val previousChildName: String?) : ChildEvent()

/**
* Emitted when a child is removed from the location.
*
* @param snapshot An immutable snapshot of the data at the child that was removed.
*/
data class Removed(val snapshot: DataSnapshot) : ChildEvent()

/**
* Emitted when a child location's priority changes.
*
* @param snapshot An immutable snapshot of the data at the location that moved.
* @param previousChildName The key name of the sibling location ordered before the child
* location. This will be null if this location is ordered first.
*/
data class Moved(val snapshot: DataSnapshot, val previousChildName: String?) : ChildEvent()
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@ import androidx.annotation.Keep
import com.google.firebase.FirebaseApp
import com.google.firebase.components.Component
import com.google.firebase.components.ComponentRegistrar
import com.google.firebase.database.ChildEventListener
import com.google.firebase.database.DataSnapshot
import com.google.firebase.database.DatabaseError
import com.google.firebase.database.FirebaseDatabase
import com.google.firebase.database.GenericTypeIndicator
import com.google.firebase.database.MutableData
import com.google.firebase.database.Query
import com.google.firebase.database.ValueEventListener
import com.google.firebase.ktx.Firebase
import com.google.firebase.platforminfo.LibraryVersionComponent
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.callbackFlow

/** Returns the [FirebaseDatabase] instance of the default [FirebaseApp]. */
val Firebase.database: FirebaseDatabase
Expand Down Expand Up @@ -59,6 +66,56 @@ inline fun <reified T> MutableData.getValue(): T? {
return getValue(object : GenericTypeIndicator<T>() {})
}

/**
* Starts listening to this query and emits its values via a [Flow].
*
* - When the returned flow starts being collected, a [ValueEventListener] will be attached.
* - When the flow completes, the listener will be removed.
*/
fun Query.snapshots() = callbackFlow<DataSnapshot> {
val listener = addValueEventListener(object : ValueEventListener {
override fun onDataChange(snapshot: DataSnapshot) {
trySend(snapshot)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For 7235473 we went with trySendBlocking is there any difference between cases?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When using trySendBlocking Android Studio shows me this warning:

Screenshot 2022-08-19 at 16 26 25

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. I think it is a false positive in the linter.
  2. That said, it looks like the events are dispatched on the UI thread, so it's not ok to directly trySendBlocking:

imo instead we can do the same thing we did for firestore, something like

Suggested change
trySend(snapshot)
someExecutor.execute(() -> trySendBlocking(snapshot));

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vkryachko Looks like firestore's background executor uses a ThreadPool with 4 threads:

public static final Executor BACKGROUND_EXECUTOR =
new ThrottledForwardingExecutor(
ASYNC_THREAD_POOL_MAXIMUM_CONCURRENCY, AsyncTask.THREAD_POOL_EXECUTOR);

private static final int ASYNC_THREAD_POOL_MAXIMUM_CONCURRENCY = 4;

Should we also use a ThreadPool for RTDB?

Suggested change
trySend(snapshot)
Executors.newFixedThreadPool(nThreads = 4).execute {
trySendBlocking(ChildEvent.Added(snapshot, previousChildName))
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vkryachko You were right, it was a false-positive in the linter. After the AGP7 update, the lint warning went away.

But my question on what executor we'll want to use remains.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if that is safe, i.e. isn't in happening on the UI thread?

@maneesht what are your thoughts?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like scheduleNow() delegates to DefaultRunLoop which uses a ScheduledThreadPoolExecutor , so it doesn't block the UI Thread.

public DefaultRunLoop() {
int threadsInPool = 1;
ThreadFactory threadFactory = new FirebaseThreadFactory();
executor =
new ScheduledThreadPoolExecutor(threadsInPool, threadFactory) {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My only concern with this is that we would end up blocking other operations within the Repo's actions. Maybe we should create a new executor solely for this? As RTDB only has one ATM

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought this through a bit more, and let's go ahead and just use the default run loop. If we feel like there's a performance gain, we can always change this in a future patch.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for looking into it @maneesht !

}

override fun onCancelled(error: DatabaseError) {
cancel(message = "Error getting Query snapshot", cause = error.toException())
}
})
awaitClose { removeEventListener(listener) }
}

/**
* Starts listening to this query's child events and emits its values via a [Flow].
*
* - When the returned flow starts being collected, a [ChildEventListener] will be attached.
* - When the flow completes, the listener will be removed.
*/
fun Query.childEvents() = callbackFlow<ChildEvent> {
val listener = addChildEventListener(object : ChildEventListener {
override fun onChildAdded(snapshot: DataSnapshot, previousChildName: String?) {
trySend(ChildEvent.Added(snapshot, previousChildName))
}

override fun onChildChanged(snapshot: DataSnapshot, previousChildName: String?) {
trySend(ChildEvent.Changed(snapshot, previousChildName))
}

override fun onChildRemoved(snapshot: DataSnapshot) {
trySend(ChildEvent.Removed(snapshot))
}

override fun onChildMoved(snapshot: DataSnapshot, previousChildName: String?) {
trySend(ChildEvent.Moved(snapshot, previousChildName))
}

override fun onCancelled(error: DatabaseError) {
cancel(message = "Error getting Query childEvent", cause = error.toException())
}
})
awaitClose { removeEventListener(listener) }
}

internal const val LIBRARY_NAME: String = "fire-db-ktx"

/** @suppress */
Expand Down