Skip to content

Commit 535c184

Browse files
committed
add kotlin flows to storage
1 parent 6bc2882 commit 535c184

File tree

3 files changed

+111
-0
lines changed

3 files changed

+111
-0
lines changed

firebase-storage/ktx/ktx.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ dependencies {
6161
implementation project(':firebase-storage')
6262
implementation 'androidx.annotation:annotation:1.1.0'
6363
implementation 'com.google.android.gms:play-services-tasks:18.0.1'
64+
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.4'
6465

6566
androidTestImplementation 'junit:junit:4.12'
6667
androidTestImplementation "com.google.truth:truth:$googleTruthVersion"

firebase-storage/ktx/src/main/kotlin/com/google/firebase/storage/ktx/Storage.kt

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,17 @@ import com.google.firebase.platforminfo.LibraryVersionComponent
2323
import com.google.firebase.storage.FileDownloadTask
2424
import com.google.firebase.storage.FirebaseStorage
2525
import com.google.firebase.storage.ListResult
26+
import com.google.firebase.storage.OnPausedListener
27+
import com.google.firebase.storage.OnProgressListener
2628
import com.google.firebase.storage.StorageMetadata
2729
import com.google.firebase.storage.StorageReference
30+
import com.google.firebase.storage.StorageTaskScheduler
2831
import com.google.firebase.storage.StreamDownloadTask
2932
import com.google.firebase.storage.UploadTask
33+
import kotlinx.coroutines.channels.awaitClose
34+
import kotlinx.coroutines.channels.trySendBlocking
35+
import kotlinx.coroutines.flow.Flow
36+
import kotlinx.coroutines.flow.callbackFlow
3037

3138
/** Returns the [FirebaseStorage] instance of the default [FirebaseApp]. */
3239
val Firebase.storage: FirebaseStorage
@@ -133,6 +140,93 @@ operator fun ListResult.component2(): List<StorageReference> = prefixes
133140
*/
134141
operator fun ListResult.component3(): String? = pageToken
135142

143+
/**
144+
* Starts listening to this task's stream download progress and emits its values via a [Flow].
145+
*
146+
* - When the returned flow starts being collected, it attaches the following listeners:
147+
* [OnPausedListener], [OnProgressListener].
148+
* - When the flow completes the listeners will be removed.
149+
*/
150+
val StreamDownloadTask.progress: Flow<StorageProgress<StreamDownloadTask.TaskSnapshot>>
151+
get() = callbackFlow {
152+
val progressListener = OnProgressListener<StreamDownloadTask.TaskSnapshot> { snapshot ->
153+
StorageTaskScheduler.getInstance().scheduleCallback {
154+
trySendBlocking(StorageProgress.InProgress(snapshot))
155+
}
156+
}
157+
val pauseListener = OnPausedListener<StreamDownloadTask.TaskSnapshot> { snapshot ->
158+
StorageTaskScheduler.getInstance().scheduleCallback {
159+
trySendBlocking(StorageProgress.Paused(snapshot))
160+
}
161+
}
162+
163+
addOnProgressListener(progressListener)
164+
addOnPausedListener(pauseListener)
165+
166+
awaitClose {
167+
removeOnProgressListener(progressListener)
168+
removeOnPausedListener(pauseListener)
169+
}
170+
}
171+
172+
/**
173+
* Starts listening to this task's file download progress and emits its values via a [Flow].
174+
*
175+
* - When the returned flow starts being collected, it attaches the following listeners:
176+
* [OnPausedListener], [OnProgressListener].
177+
* - When the flow completes the listeners will be removed.
178+
*/
179+
val FileDownloadTask.progress: Flow<StorageProgress<FileDownloadTask.TaskSnapshot>>
180+
get() = callbackFlow {
181+
val progressListener = OnProgressListener<FileDownloadTask.TaskSnapshot> { snapshot ->
182+
StorageTaskScheduler.getInstance().scheduleCallback {
183+
trySendBlocking(StorageProgress.InProgress(snapshot))
184+
}
185+
}
186+
val pauseListener = OnPausedListener<FileDownloadTask.TaskSnapshot> { snapshot ->
187+
StorageTaskScheduler.getInstance().scheduleCallback {
188+
trySendBlocking(StorageProgress.Paused(snapshot))
189+
}
190+
}
191+
192+
addOnProgressListener(progressListener)
193+
addOnPausedListener(pauseListener)
194+
195+
awaitClose {
196+
removeOnProgressListener(progressListener)
197+
removeOnPausedListener(pauseListener)
198+
}
199+
}
200+
201+
/**
202+
* Starts listening to this task's upload progress and emits its values via a [Flow].
203+
*
204+
* - When the returned flow starts being collected, it attaches the following listeners:
205+
* [OnPausedListener], [OnProgressListener].
206+
* - When the flow completes the listeners will be removed.
207+
*/
208+
val UploadTask.progress: Flow<StorageProgress<UploadTask.TaskSnapshot>>
209+
get() = callbackFlow {
210+
val progressListener = OnProgressListener<UploadTask.TaskSnapshot> { snapshot ->
211+
StorageTaskScheduler.getInstance().scheduleCallback {
212+
trySendBlocking(StorageProgress.InProgress(snapshot))
213+
}
214+
}
215+
val pauseListener = OnPausedListener<UploadTask.TaskSnapshot> { snapshot ->
216+
StorageTaskScheduler.getInstance().scheduleCallback {
217+
trySendBlocking(StorageProgress.Paused(snapshot))
218+
}
219+
}
220+
221+
addOnProgressListener(progressListener)
222+
addOnPausedListener(pauseListener)
223+
224+
awaitClose {
225+
removeOnProgressListener(progressListener)
226+
removeOnPausedListener(pauseListener)
227+
}
228+
}
229+
136230
internal const val LIBRARY_NAME: String = "fire-stg-ktx"
137231

138232
/** @suppress */
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.google.firebase.storage.ktx
2+
3+
/**
4+
* Used to emit events about the progress of storage tasks.
5+
*/
6+
abstract class StorageProgress<T> private constructor() {
7+
/**
8+
* Called periodically as data is transferred and can be used to populate an upload/download indicator.
9+
*/
10+
class InProgress<T>(val snapshot: T) : StorageProgress<T>()
11+
12+
/**
13+
* Called any time the upload/download is paused.
14+
*/
15+
class Paused<T>(val snapshot: T) : StorageProgress<T>()
16+
}

0 commit comments

Comments
 (0)