Skip to content

Add debounce selector #2148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
6 changes: 3 additions & 3 deletions docs/shared-mutable-state-and-concurrency.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ but others are unique.

### The problem

Let us launch a hundred coroutines all doing the same action thousand times.
Let us launch a hundred coroutines all doing the same action a thousand times.
We'll also measure their completion time for further comparisons:

<div class="sample" markdown="1" theme="idea" data-highlight-only>
Expand Down Expand Up @@ -102,7 +102,7 @@ increment the `counter` concurrently from multiple threads without any synchroni

### Volatiles are of no help

There is common misconception that making a variable `volatile` solves concurrency problem. Let us try it:
There is a common misconception that making a variable `volatile` solves concurrency problem. Let us try it:

<!--- CLEAR -->

Expand Down Expand Up @@ -158,7 +158,7 @@ do not provide atomicity of larger actions (increment in our case).
### Thread-safe data structures

The general solution that works both for threads and for coroutines is to use a thread-safe (aka synchronized,
linearizable, or atomic) data structure that provides all the necessarily synchronization for the corresponding
linearizable, or atomic) data structure that provides all the necessary synchronization for the corresponding
operations that needs to be performed on a shared state.
In the case of a simple counter we can use `AtomicInteger` class which has atomic `incrementAndGet` operations:

Expand Down
175 changes: 144 additions & 31 deletions kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -64,36 +64,58 @@ fun main() = runBlocking {
*/
@FlowPreview
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
require(timeoutMillis > 0) { "Debounce timeout should be positive" }
return scopedFlow { downstream ->
// Actually Any, KT-30796
val values = produce<Any?>(capacity = Channel.CONFLATED) {
collect { value -> send(value ?: NULL) }
}
var lastValue: Any? = null
while (lastValue !== DONE) {
select<Unit> {
// Should be receiveOrClosed when boxing issues are fixed
values.onReceiveOrNull {
if (it == null) {
if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
lastValue = DONE
} else {
lastValue = it
}
}
require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" }
if (timeoutMillis == 0L) return this
return debounceInternal { timeoutMillis }
}

lastValue?.let { value ->
// set timeout when lastValue != null
onTimeout(timeoutMillis) {
lastValue = null // Consume the value
downstream.emit(NULL.unbox(value))
}
}
}
}
/**
* Returns a flow that mirrors the original flow, but filters out values
* that are followed by the newer values within the given [timeout][timeoutMillis].
* The latest value is always emitted.
*
* A variation of [debounce] that allows specifying the timeout value dynamically.
*
* Example:
*
* ```kotlin
* flow {
* emit(1)
* delay(90)
* emit(2)
* delay(90)
* emit(3)
* delay(1010)
* emit(4)
* delay(1010)
* emit(5)
* }.debounce {
* if (it == 1) {
* 0L
* } else {
* 1000L
* }
* }
* ```
* <!--- KNIT example-delay-02.kt -->
*
* produces the following emissions
*
* ```text
* 1, 3, 4, 5
* ```
* <!--- TEST -->
*
* Note that the resulting flow does not emit anything as long as the original flow emits
* items faster than every [timeoutMillis] milliseconds.
*
* @param timeoutMillis [T] is the emitted value and the return value is timeout in milliseconds.
*/
@FlowPreview
public fun <T> Flow<T>.debounce(timeoutMillis: (T) -> Long): Flow<T> =
debounceInternal { emittedItem ->
timeoutMillis(emittedItem)
}
}

/**
* Returns a flow that mirrors the original flow, but filters out values
Expand Down Expand Up @@ -129,7 +151,98 @@ public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
*/
@ExperimentalTime
@FlowPreview
public fun <T> Flow<T>.debounce(timeout: Duration): Flow<T> = debounce(timeout.toDelayMillis())
public fun <T> Flow<T>.debounceWithDuration(timeout: Duration): Flow<T> = debounce(timeout.toDelayMillis())

/**
* Returns a flow that mirrors the original flow, but filters out values
* that are followed by the newer values within the given [timeout].
* The latest value is always emitted.
*
* A variation of [debounceWithDuration] that allows specifying the timeout value dynamically.
*
* Example:
*
* ```kotlin
* flow {
* emit(1)
* delay(90.milliseconds)
* emit(2)
* delay(90.milliseconds)
* emit(3)
* delay(1010.milliseconds)
* emit(4)
* delay(1010.milliseconds)
* emit(5)
* }.debounce {
* if (it == 1) {
* 0.milliseconds
* } else {
* 1000.milliseconds
* }
* }
* ```
* <!--- KNIT example-delay-duration-02.kt -->
*
* produces the following emissions
*
* ```text
* 1, 3, 4, 5
* ```
* <!--- TEST -->
*
* Note that the resulting flow does not emit anything as long as the original flow emits
* items faster than every [timeout] milliseconds.
*
* @param timeout [T] is the emitted value and the return value is timeout in [Duration].
*/
@ExperimentalTime
@FlowPreview
public fun <T> Flow<T>.debounceWithDuration(timeout: (T) -> Duration): Flow<T> =
debounceInternal { emittedItem ->
timeout(emittedItem).toDelayMillis()
}

private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long) : Flow<T> =
scopedFlow { downstream ->
// Actually Any, KT-30796
val values = produce<Any?>(capacity = 0) {
collect { value -> send(value ?: NULL) }
}
var lastValue: Any? = null
while (lastValue !== DONE) {
select<Unit> {
// Give a chance to consume lastValue first before onReceiveOrNull receives a new value
lastValue?.let { value ->
val unboxedValue: T = NULL.unbox(value)
val timeoutMillis = timeoutMillisSelector(unboxedValue)
require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" }

if (timeoutMillis == 0L) {
lastValue = null
runBlocking {
launch { downstream.emit(unboxedValue) }
}
} else {
// Set timeout when lastValue != null
onTimeout(timeoutMillis) {
lastValue = null // Consume the value
downstream.emit(unboxedValue)
}
}
}

// Should be receiveOrClosed when boxing issues are fixed
values.onReceiveOrNull {
if (it == null) {
if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
lastValue = DONE
} else {
lastValue = it
}
}
}
}
}

/**
* Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMillis].
Expand All @@ -144,7 +257,7 @@ public fun <T> Flow<T>.debounce(timeout: Duration): Flow<T> = debounce(timeout.t
* }
* }.sample(200)
* ```
* <!--- KNIT example-delay-02.kt -->
* <!--- KNIT example-delay-03.kt -->
*
* produces the following emissions
*
Expand Down Expand Up @@ -215,7 +328,7 @@ internal fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMil
* }
* }.sample(200.milliseconds)
* ```
* <!--- KNIT example-delay-duration-02.kt -->
* <!--- KNIT example-delay-duration-03.kt -->
*
* produces the following emissions
*
Expand Down
108 changes: 102 additions & 6 deletions kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import kotlin.time.*

class DebounceTest : TestBase() {
@Test
public fun testBasic() = withVirtualTime {
fun testBasic() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
Expand Down Expand Up @@ -159,7 +159,7 @@ class DebounceTest : TestBase() {
expect(2)
throw TestException()
}.flowOn(NamedDispatchers("source")).debounce(Long.MAX_VALUE).map {
expectUnreached()
expectUnreached()
}
assertFailsWith<TestException>(flow)
finish(3)
Expand All @@ -175,7 +175,6 @@ class DebounceTest : TestBase() {
expect(2)
yield()
throw TestException()
it
}

assertFailsWith<TestException>(flow)
Expand All @@ -193,7 +192,6 @@ class DebounceTest : TestBase() {
expect(2)
yield()
throw TestException()
it
}

assertFailsWith<TestException>(flow)
Expand All @@ -202,7 +200,7 @@ class DebounceTest : TestBase() {

@ExperimentalTime
@Test
public fun testDurationBasic() = withVirtualTime {
fun testDurationBasic() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
Expand All @@ -219,8 +217,106 @@ class DebounceTest : TestBase() {
}

expect(2)
val result = flow.debounce(1000.milliseconds).toList()
val result = flow.debounceWithDuration(1000.milliseconds).toList()
assertEquals(listOf("A", "D", "E"), result)
finish(5)
}

@ExperimentalTime
@Test
fun testDebounceSelectorBasic() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
emit(1)
delay(90)
emit(2)
delay(90)
emit(3)
delay(1010)
emit(4)
delay(1010)
emit(5)
expect(4)
}

expect(2)
val result = flow.debounce {
if (it == 1) {
0
} else {
1000
}
}.toList()

assertEquals(listOf(1, 3, 4, 5), result)
finish(5)
}

@Test
fun testZeroDebounceTime() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
emit("A")
emit("B")
emit("C")
expect(4)
}

expect(2)
val result = flow.debounce(0).toList()

assertEquals(listOf("A", "B", "C"), result)
finish(5)
}

@ExperimentalTime
@Test
fun testZeroDebounceTimeSelector() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
emit("A")
emit("B")
expect(4)
}

expect(2)
val result = flow.debounce { 0 }.toList()

assertEquals(listOf("A", "B"), result)
finish(5)
}

@ExperimentalTime
@Test
fun testDebounceDurationSelectorBasic() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
emit("A")
delay(1500.milliseconds)
emit("B")
delay(500.milliseconds)
emit("C")
delay(250.milliseconds)
emit("D")
delay(2000.milliseconds)
emit("E")
expect(4)
}

expect(2)
val result = flow.debounceWithDuration {
if (it == "C") {
0.milliseconds
} else {
1000.milliseconds
}
}.toList()

assertEquals(listOf("A", "C", "D", "E"), result)
finish(5)
}
}