@@ -435,7 +435,7 @@ public fun <T> Flow<T>.timeout(
435
435
private fun <T > Flow<T>.timeoutInternal (
436
436
timeoutMillis : Long ,
437
437
action : suspend FlowCollector <T >.() -> Unit
438
- ): Flow <T > = scopedFlow< T > { downStream ->
438
+ ): Flow <T > = scopedFlow { downStream ->
439
439
require(timeoutMillis >= 0L ) { " Timeout should not be negative" }
440
440
441
441
// Produce the values using the default (rendezvous) channel
@@ -451,7 +451,7 @@ private fun <T> Flow<T>.timeoutInternal(
451
451
452
452
send(it ? : NULL )
453
453
454
- // We reset the job here! . The reason being is that the `flow.emit()` suspends, which in turn suspends `send()`.
454
+ // We reset the job here. The reason being is that the `flow.emit()` suspends, which in turn suspends `send()`.
455
455
// We only want to measure a timeout if the producer took longer than `timeoutMillis`, not producer + consumer
456
456
timeoutJob = launch {
457
457
delay(timeoutMillis)
@@ -464,24 +464,23 @@ private fun <T> Flow<T>.timeoutInternal(
464
464
}
465
465
}
466
466
467
- // Await for values from our producer now
468
- whileSelect {
469
- values.onReceiveOrNull { value ->
470
- if (value != = DONE ) {
471
- if (value == = TIMEOUT ) {
472
- throw InternalFlowTimeoutException ()
467
+ try {
468
+ // Await for values from our producer now
469
+ whileSelect {
470
+ values.onReceiveOrNull { value ->
471
+ if (value != = DONE ) {
472
+ if (value == = TIMEOUT ) {
473
+ throw InternalFlowTimeoutException ()
474
+ }
475
+ downStream.emit(NULL .unbox(value))
476
+ return @onReceiveOrNull true
473
477
}
474
- downStream.emit(NULL .unbox(value))
475
- return @onReceiveOrNull true
478
+ return @onReceiveOrNull false // We got the DONE signal, so exit the while loop
476
479
}
477
- return @onReceiveOrNull false // We got the DONE signal, so exit the while loop
478
480
}
479
- }
480
- }.catch { e ->
481
- if (e is InternalFlowTimeoutException ) {
482
- action()
483
- } else {
484
- throw e
481
+ } catch (e: InternalFlowTimeoutException ) {
482
+ action(downStream)
483
+ values.cancel(ChildCancelledException ())
485
484
}
486
485
}
487
486
0 commit comments