diff --git a/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt b/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt index dbd7120e27..c6ff12fc4e 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt @@ -14,3 +14,11 @@ import kotlin.jvm.* @JvmField @SharedImmutable internal val NULL = Symbol("NULL") + +/* + * Symbol used to indicate that the flow is complete. + * It should never leak to the outside world. + */ +@JvmField +@SharedImmutable +internal val DONE = Symbol("DONE") diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index 8d74be5584..09a8b7f636 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -62,18 +62,21 @@ public fun Flow.delayEach(timeMillis: Long): Flow = flow { public fun Flow.debounce(timeoutMillis: Long): Flow { require(timeoutMillis > 0) { "Debounce timeout should be positive" } return scopedFlow { downstream -> - val values = Channel(Channel.CONFLATED) // Actually Any, KT-30796 - // Channel is not closed deliberately as there is no close with value - val collector = async { - collect { value -> values.send(value ?: NULL) } + // Actually Any, KT-30796 + val values = produce(capacity = Channel.CONFLATED) { + collect { value -> send(value ?: NULL) } } - - var isDone = false var lastValue: Any? = null - while (!isDone) { + while (lastValue !== DONE) { select { - values.onReceive { - lastValue = it + // Should be receiveOrClosed when boxing issues are fixed + values.onReceiveOrNull { + if (it == null) { + if (lastValue != null) downstream.emit(NULL.unbox(lastValue)) + lastValue = DONE + } else { + lastValue = it + } } lastValue?.let { value -> @@ -83,12 +86,6 @@ public fun Flow.debounce(timeoutMillis: Long): Flow { downstream.emit(NULL.unbox(value)) } } - - // Close with value 'idiom' - collector.onAwait { - if (lastValue != null) downstream.emit(NULL.unbox(lastValue)) - isDone = true - } } } } @@ -118,16 +115,14 @@ public fun Flow.sample(periodMillis: Long): Flow { // Actually Any, KT-30796 collect { value -> send(value ?: NULL) } } - - var isDone = false var lastValue: Any? = null val ticker = fixedPeriodTicker(periodMillis) - while (!isDone) { + while (lastValue !== DONE) { select { values.onReceiveOrNull { if (it == null) { ticker.cancel(ChildCancelledException()) - isDone = true + lastValue = DONE } else { lastValue = it }