-
Notifications
You must be signed in to change notification settings - Fork 617
storage-ktx: add callbackFlow for upload/download progress #4139
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
535c184
2319f36
27ed605
cf62e3f
471edc5
8f0d7c7
bf8507a
7a27083
3d7dc0f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -23,10 +23,17 @@ import com.google.firebase.platforminfo.LibraryVersionComponent | |||||||||||||
import com.google.firebase.storage.FileDownloadTask | ||||||||||||||
import com.google.firebase.storage.FirebaseStorage | ||||||||||||||
import com.google.firebase.storage.ListResult | ||||||||||||||
import com.google.firebase.storage.OnPausedListener | ||||||||||||||
import com.google.firebase.storage.OnProgressListener | ||||||||||||||
import com.google.firebase.storage.StorageMetadata | ||||||||||||||
import com.google.firebase.storage.StorageReference | ||||||||||||||
import com.google.firebase.storage.StorageTaskScheduler | ||||||||||||||
import com.google.firebase.storage.StreamDownloadTask | ||||||||||||||
import com.google.firebase.storage.UploadTask | ||||||||||||||
import kotlinx.coroutines.channels.awaitClose | ||||||||||||||
import kotlinx.coroutines.channels.trySendBlocking | ||||||||||||||
import kotlinx.coroutines.flow.Flow | ||||||||||||||
import kotlinx.coroutines.flow.callbackFlow | ||||||||||||||
|
||||||||||||||
/** Returns the [FirebaseStorage] instance of the default [FirebaseApp]. */ | ||||||||||||||
val Firebase.storage: FirebaseStorage | ||||||||||||||
|
@@ -133,6 +140,93 @@ operator fun ListResult.component2(): List<StorageReference> = prefixes | |||||||||||||
*/ | ||||||||||||||
operator fun ListResult.component3(): String? = pageToken | ||||||||||||||
|
||||||||||||||
/** | ||||||||||||||
* Starts listening to this task's stream download progress and emits its values via a [Flow]. | ||||||||||||||
* | ||||||||||||||
* - When the returned flow starts being collected, it attaches the following listeners: | ||||||||||||||
* [OnPausedListener], [OnProgressListener]. | ||||||||||||||
* - When the flow completes the listeners will be removed. | ||||||||||||||
*/ | ||||||||||||||
val StreamDownloadTask.progress: Flow<StorageProgress<StreamDownloadTask.TaskSnapshot>> | ||||||||||||||
get() = callbackFlow { | ||||||||||||||
val progressListener = OnProgressListener<StreamDownloadTask.TaskSnapshot> { snapshot -> | ||||||||||||||
StorageTaskScheduler.getInstance().scheduleCallback { | ||||||||||||||
trySendBlocking(StorageProgress.InProgress(snapshot)) | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
val pauseListener = OnPausedListener<StreamDownloadTask.TaskSnapshot> { snapshot -> | ||||||||||||||
StorageTaskScheduler.getInstance().scheduleCallback { | ||||||||||||||
trySendBlocking(StorageProgress.Paused(snapshot)) | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
addOnProgressListener(progressListener) | ||||||||||||||
addOnPausedListener(pauseListener) | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My initial idea was to pass the same executor to these listeners, but Storage's callback executor is marked as private: Lines 54 to 56 in 4f24be3
So instead I'm using the firebase-android-sdk/firebase-storage/ktx/src/main/kotlin/com/google/firebase/storage/ktx/Storage.kt Lines 153 to 155 in 27ed605
|
||||||||||||||
|
||||||||||||||
awaitClose { | ||||||||||||||
removeOnProgressListener(progressListener) | ||||||||||||||
removeOnPausedListener(pauseListener) | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
/** | ||||||||||||||
* Starts listening to this task's file download progress and emits its values via a [Flow]. | ||||||||||||||
* | ||||||||||||||
* - When the returned flow starts being collected, it attaches the following listeners: | ||||||||||||||
* [OnPausedListener], [OnProgressListener]. | ||||||||||||||
* - When the flow completes the listeners will be removed. | ||||||||||||||
*/ | ||||||||||||||
val FileDownloadTask.progress: Flow<StorageProgress<FileDownloadTask.TaskSnapshot>> | ||||||||||||||
get() = callbackFlow { | ||||||||||||||
val progressListener = OnProgressListener<FileDownloadTask.TaskSnapshot> { snapshot -> | ||||||||||||||
StorageTaskScheduler.getInstance().scheduleCallback { | ||||||||||||||
trySendBlocking(StorageProgress.InProgress(snapshot)) | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
val pauseListener = OnPausedListener<FileDownloadTask.TaskSnapshot> { snapshot -> | ||||||||||||||
StorageTaskScheduler.getInstance().scheduleCallback { | ||||||||||||||
trySendBlocking(StorageProgress.Paused(snapshot)) | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
addOnProgressListener(progressListener) | ||||||||||||||
addOnPausedListener(pauseListener) | ||||||||||||||
|
||||||||||||||
awaitClose { | ||||||||||||||
removeOnProgressListener(progressListener) | ||||||||||||||
removeOnPausedListener(pauseListener) | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
/** | ||||||||||||||
* Starts listening to this task's upload progress and emits its values via a [Flow]. | ||||||||||||||
* | ||||||||||||||
* - When the returned flow starts being collected, it attaches the following listeners: | ||||||||||||||
* [OnPausedListener], [OnProgressListener]. | ||||||||||||||
* - When the flow completes the listeners will be removed. | ||||||||||||||
*/ | ||||||||||||||
val UploadTask.progress: Flow<StorageProgress<UploadTask.TaskSnapshot>> | ||||||||||||||
get() = callbackFlow { | ||||||||||||||
val progressListener = OnProgressListener<UploadTask.TaskSnapshot> { snapshot -> | ||||||||||||||
StorageTaskScheduler.getInstance().scheduleCallback { | ||||||||||||||
trySendBlocking(StorageProgress.InProgress(snapshot)) | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
val pauseListener = OnPausedListener<UploadTask.TaskSnapshot> { snapshot -> | ||||||||||||||
StorageTaskScheduler.getInstance().scheduleCallback { | ||||||||||||||
trySendBlocking(StorageProgress.Paused(snapshot)) | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
addOnProgressListener(progressListener) | ||||||||||||||
addOnPausedListener(pauseListener) | ||||||||||||||
|
||||||||||||||
maneesht marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||
awaitClose { | ||||||||||||||
removeOnProgressListener(progressListener) | ||||||||||||||
removeOnPausedListener(pauseListener) | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
internal const val LIBRARY_NAME: String = "fire-stg-ktx" | ||||||||||||||
|
||||||||||||||
/** @suppress */ | ||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
package com.google.firebase.storage.ktx | ||
|
||
/** | ||
* Used to emit events about the progress of storage tasks. | ||
*/ | ||
abstract class StorageProgress<T> private constructor() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not using a Binary incompatibility is well explained here: https://jakewharton.com/public-api-challenges-in-kotlin/#binary-compatibility |
||
/** | ||
* Called periodically as data is transferred and can be used to populate an upload/download indicator. | ||
*/ | ||
class InProgress<T>(val snapshot: T) : StorageProgress<T>() | ||
|
||
/** | ||
* Called any time the upload/download is paused. | ||
*/ | ||
class Paused<T>(val snapshot: T) : StorageProgress<T>() | ||
} |
Uh oh!
There was an error while loading. Please reload this page.