@@ -11,9 +11,9 @@ import kotlinx.coroutines.*
11
11
import kotlinx.coroutines.channels.*
12
12
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
13
13
import kotlinx.coroutines.flow.internal.*
14
- import kotlinx.coroutines.flow.internal.unsafeFlow as flow
15
14
import kotlin.coroutines.*
16
15
import kotlin.jvm.*
16
+ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
17
17
18
18
/* *
19
19
* Creates a flow from the given suspendable [block].
@@ -259,10 +259,16 @@ public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.()
259
259
*
260
260
* This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used
261
261
* from any context, e.g. from a callback-based API.
262
- * The resulting flow completes as soon as the code in the [block] and all its children completes.
263
- * Use [awaitClose] as the last statement to keep it running.
264
- * The [awaitClose] argument is called either when a flow consumer cancels the flow collection
265
- * or when a callback-based API invokes [SendChannel.close] manually.
262
+ * The resulting flow completes as soon as the code in the [block] completes.
263
+ * [awaitClose] should be used to keep the flow running, otherwise the channel will be closed immediately
264
+ * when block completes.
265
+ * [awaitClose] argument is called either when a flow consumer cancels the flow collection
266
+ * or when a callback-based API invokes [SendChannel.close] manually and is typically used
267
+ * to cleanup the resources after the completion, e.g. unregister a callback.
268
+ * Using [awaitClose] is mandatory in order to prevent memory leaks when the flow collection is cancelled,
269
+ * otherwise the callback may keep running even when the flow collector is already completed.
270
+ * To avoid such leaks, this method throws [IllegalStateException] if block returns, but the channel
271
+ * is not closed yet.
266
272
*
267
273
* A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the
268
274
* resulting flow to specify a user-defined value and to control what happens when data is produced faster
@@ -277,31 +283,34 @@ public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.()
277
283
* fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
278
284
* val callback = object : Callback { // implementation of some callback interface
279
285
* override fun onNextValue(value: T) {
280
- * // Note: offer drops value when buffer is full
281
- * // Use either buffer(Channel.CONFLATED) or buffer(Channel.UNLIMITED) to avoid overfill
282
- * offer(value)
286
+ * // To avoid blocking you can configure channel capacity using
287
+ * // either buffer(Channel.CONFLATED) or buffer(Channel.UNLIMITED) to avoid overfill
288
+ * try {
289
+ * sendBlocking(value)
290
+ * } catch (e: Exception) {
291
+ * // Handle exception from the channel: failure in flow or premature closing
292
+ * }
283
293
* }
284
294
* override fun onApiError(cause: Throwable) {
285
295
* cancel(CancellationException("API Error", cause))
286
296
* }
287
297
* override fun onCompleted() = channel.close()
288
298
* }
289
299
* api.register(callback)
290
- * // Suspend until either onCompleted or external cancellation are invoked
300
+ * /*
301
+ * * Suspends until either 'onCompleted' from the callback is invoked
302
+ * * or flow collector is cancelled (e.g. by 'take(1)' or because a collector's activity was destroyed).
303
+ * * In both cases, callback will be properly unregistered.
304
+ * */
291
305
* awaitClose { api.unregister(callback) }
292
306
* }
293
307
* ```
294
- *
295
- * This function is an alias for [channelFlow], it has a separate name to reflect
296
- * the intent of the usage (integration with a callback-based API) better.
297
308
*/
298
- @Suppress(" NOTHING_TO_INLINE" )
299
309
@ExperimentalCoroutinesApi
300
- public inline fun <T > callbackFlow (@BuilderInference noinline block : suspend ProducerScope <T >.() -> Unit ): Flow <T > =
301
- channelFlow(block)
310
+ public fun <T > callbackFlow (@BuilderInference block : suspend ProducerScope <T >.() -> Unit ): Flow <T > = CallbackFlowBuilder (block)
302
311
303
312
// ChannelFlow implementation that is the first in the chain of flow operations and introduces (builds) a flow
304
- private class ChannelFlowBuilder <T >(
313
+ private open class ChannelFlowBuilder <T >(
305
314
private val block : suspend ProducerScope <T >.() -> Unit ,
306
315
context : CoroutineContext = EmptyCoroutineContext ,
307
316
capacity : Int = BUFFERED
@@ -315,3 +324,31 @@ private class ChannelFlowBuilder<T>(
315
324
override fun toString (): String =
316
325
" block[$block ] -> ${super .toString()} "
317
326
}
327
+
328
+ private class CallbackFlowBuilder <T >(
329
+ private val block : suspend ProducerScope <T >.() -> Unit ,
330
+ context : CoroutineContext = EmptyCoroutineContext ,
331
+ capacity : Int = BUFFERED
332
+ ) : ChannelFlowBuilder<T>(block, context, capacity) {
333
+
334
+ override suspend fun collectTo (scope : ProducerScope <T >) {
335
+ super .collectTo(scope)
336
+ /*
337
+ * We expect user either call `awaitClose` from within a block (then the channel is closed at this moment)
338
+ * or being closed/cancelled externally/manually. Otherwise "user forgot to call
339
+ * awaitClose and receives unhelpful ClosedSendChannelException exceptions" situation is detected.
340
+ */
341
+ if (! scope.isClosedForSend) {
342
+ throw IllegalStateException (
343
+ """
344
+ 'awaitClose { yourCallbackOrListener.cancel() }' should be used in the end of callbackFlow block.
345
+ Otherwise, a callback/listener may leak in case of cancellation external cancellation (e.g. by 'take(1)' or destroyed activity).
346
+ See callbackFlow API documentation for the details.
347
+ """ .trimIndent()
348
+ )
349
+ }
350
+ }
351
+
352
+ override fun create (context : CoroutineContext , capacity : Int ): ChannelFlow <T > =
353
+ CallbackFlowBuilder (block, context, capacity)
354
+ }
0 commit comments