Skip to content

Commit ace5899

Browse files
zach-klippensteinqwwdfsad
authored andcommitted
Add distinctUntilChanged operator that uses a comparator function instead of a key extractor.
1 parent ae225bd commit ace5899

File tree

3 files changed

+64
-3
lines changed

3 files changed

+64
-3
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+1
Original file line numberDiff line numberDiff line change
@@ -833,6 +833,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
833833
public static final fun delayEach (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
834834
public static final fun delayFlow (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
835835
public static final fun distinctUntilChanged (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
836+
public static final fun distinctUntilChanged (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
836837
public static final fun distinctUntilChangedBy (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
837838
public static final fun drop (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
838839
public static final fun dropWhile (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;

kotlinx-coroutines-core/common/src/flow/operators/Distinct.kt

+21-1
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,37 @@ import kotlinx.coroutines.flow.unsafeFlow as flow
1818
@ExperimentalCoroutinesApi
1919
public fun <T> Flow<T>.distinctUntilChanged(): Flow<T> = distinctUntilChangedBy { it }
2020

21+
/**
22+
* Returns flow where all subsequent repetitions of the same value are filtered out, when compared
23+
* with each other via the provided [areEquivalent] function.
24+
*/
25+
@FlowPreview
26+
public fun <T> Flow<T>.distinctUntilChanged(areEquivalent: (old: T, new: T) -> Boolean): Flow<T> =
27+
distinctUntilChangedBy(keySelector = { it }, areEquivalent = areEquivalent)
28+
2129
/**
2230
* Returns flow where all subsequent repetitions of the same key are filtered out, where
2331
* key is extracted with [keySelector] function.
2432
*/
2533
@FlowPreview
2634
public fun <T, K> Flow<T>.distinctUntilChangedBy(keySelector: (T) -> K): Flow<T> =
35+
distinctUntilChangedBy(keySelector = keySelector, areEquivalent = { old, new -> old == new })
36+
37+
/**
38+
* Returns flow where all subsequent repetitions of the same key are filtered out, where
39+
* keys are extracted with [keySelector] function and compared with each other via the
40+
* provided [areEquivalent] function.
41+
*/
42+
private inline fun <T, K> Flow<T>.distinctUntilChangedBy(
43+
crossinline keySelector: (T) -> K,
44+
crossinline areEquivalent: (old: K, new: K) -> Boolean
45+
): Flow<T> =
2746
flow {
2847
var previousKey: Any? = NULL
2948
collect { value ->
3049
val key = keySelector(value)
31-
if (previousKey != key) {
50+
@Suppress("UNCHECKED_CAST")
51+
if (previousKey === NULL || !areEquivalent(previousKey as K, key)) {
3252
previousKey = key
3353
emit(value)
3454
}

kotlinx-coroutines-core/common/test/flow/operators/DistinctUntilChangedTest.kt

+42-2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,28 @@ class DistinctUntilChangedTest : TestBase() {
3232
assertEquals(4, sum2)
3333
}
3434

35+
@Test
36+
fun testDistinctUntilChangedAreEquivalent() = runTest {
37+
val flow = flow {
38+
emit(Box(1))
39+
emit(Box(1))
40+
emit(Box(2))
41+
emit(Box(1))
42+
}
43+
44+
val sum1 = flow.distinctUntilChanged().map { it.i }.sum()
45+
val sum2 = flow.distinctUntilChanged { old, new -> old.i == new.i }.map { it.i }.sum()
46+
assertEquals(5, sum1)
47+
assertEquals(4, sum2)
48+
}
49+
50+
@Test
51+
fun testDistinctUntilChangedAreEquivalentSingleValue() = runTest {
52+
val flow = flowOf(1)
53+
val values = flow.distinctUntilChanged { _, _ -> fail("Expected not to compare single value.") }.toList()
54+
assertEquals(listOf(1), values)
55+
}
56+
3557
@Test
3658
fun testThrowingKeySelector() = runTest {
3759
val flow = flow {
@@ -50,8 +72,26 @@ class DistinctUntilChangedTest : TestBase() {
5072
}
5173

5274
@Test
53-
fun testDistinctUntilChangedNull() = runTest{
54-
val flow = flowOf(null, 1, null).distinctUntilChanged()
75+
fun testThrowingAreEquivalent() = runTest {
76+
val flow = flow {
77+
coroutineScope {
78+
launch(start = CoroutineStart.ATOMIC) {
79+
hang { expect(3) }
80+
}
81+
expect(2)
82+
emit(1)
83+
emit(2)
84+
}
85+
}.distinctUntilChanged { _, _ -> throw TestException() }
86+
87+
expect(1)
88+
assertFailsWith<TestException>(flow)
89+
finish(4)
90+
}
91+
92+
@Test
93+
fun testDistinctUntilChangedNull() = runTest {
94+
val flow = flowOf(null, 1, null, null).distinctUntilChanged()
5595
assertEquals(listOf(null, 1, null), flow.toList())
5696
}
5797
}

0 commit comments

Comments
 (0)