Skip to content

Commit 6f64a1b

Browse files
committed
Optimize debounce operator allocation pressure by using conflated produce. Previously it was not possible due to not implemented #1235
1 parent a3763e8 commit 6f64a1b

File tree

2 files changed

+22
-19
lines changed

2 files changed

+22
-19
lines changed

kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt

+8
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,11 @@ import kotlin.jvm.*
1414
@JvmField
1515
@SharedImmutable
1616
internal val NULL = Symbol("NULL")
17+
18+
/*
19+
* Symbol used to indicate that the flow is complete.
20+
* It should never leak to the outside world.
21+
*/
22+
@JvmField
23+
@SharedImmutable
24+
internal val DONE = Symbol("DONE")

kotlinx-coroutines-core/common/src/flow/operators/Delay.kt

+14-19
Original file line numberDiff line numberDiff line change
@@ -62,18 +62,21 @@ public fun <T> Flow<T>.delayEach(timeMillis: Long): Flow<T> = flow {
6262
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
6363
require(timeoutMillis > 0) { "Debounce timeout should be positive" }
6464
return scopedFlow { downstream ->
65-
val values = Channel<Any?>(Channel.CONFLATED) // Actually Any, KT-30796
66-
// Channel is not closed deliberately as there is no close with value
67-
val collector = async {
68-
collect { value -> values.send(value ?: NULL) }
65+
// Actually Any, KT-30796
66+
val values = produce<Any?>(capacity = Channel.CONFLATED) {
67+
collect { value -> send(value ?: NULL) }
6968
}
70-
71-
var isDone = false
7269
var lastValue: Any? = null
73-
while (!isDone) {
70+
while (lastValue !== DONE) {
7471
select<Unit> {
75-
values.onReceive {
76-
lastValue = it
72+
// Should be receiveOrClosed when boxing issues are fixed
73+
values.onReceiveOrNull {
74+
if (it == null) {
75+
if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
76+
lastValue = DONE
77+
} else {
78+
lastValue = it
79+
}
7780
}
7881

7982
lastValue?.let { value ->
@@ -83,12 +86,6 @@ public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
8386
downstream.emit(NULL.unbox(value))
8487
}
8588
}
86-
87-
// Close with value 'idiom'
88-
collector.onAwait {
89-
if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
90-
isDone = true
91-
}
9289
}
9390
}
9491
}
@@ -118,16 +115,14 @@ public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
118115
// Actually Any, KT-30796
119116
collect { value -> send(value ?: NULL) }
120117
}
121-
122-
var isDone = false
123118
var lastValue: Any? = null
124119
val ticker = fixedPeriodTicker(periodMillis)
125-
while (!isDone) {
120+
while (lastValue !== DONE) {
126121
select<Unit> {
127122
values.onReceiveOrNull {
128123
if (it == null) {
129124
ticker.cancel(ChildCancelledException())
130-
isDone = true
125+
lastValue = DONE
131126
} else {
132127
lastValue = it
133128
}

0 commit comments

Comments
 (0)