Skip to content

Commit 794c223

Browse files
committed
withIndex and collectIndexed operators
Fixes #1247
1 parent 693142c commit 794c223

File tree

6 files changed

+84
-1
lines changed

6 files changed

+84
-1
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+6
Original file line numberDiff line numberDiff line change
@@ -824,6 +824,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
824824
public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
825825
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
826826
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
827+
public static final fun collectIndexed (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
827828
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
828829
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
829830
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;
@@ -921,6 +922,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
921922
public static final fun unsafeFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
922923
public static final fun unsafeTransform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
923924
public static final fun withContext (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)V
925+
public static final fun withIndex (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
924926
public static final fun zip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
925927
}
926928

@@ -939,6 +941,10 @@ public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/cor
939941
public static synthetic fun update$default (Lkotlinx/coroutines/flow/internal/ChannelFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/ChannelFlow;
940942
}
941943

944+
public final class kotlinx/coroutines/flow/internal/FlowExceptions_commonKt {
945+
public static final fun checkIndexOverflow (I)I
946+
}
947+
942948
public final class kotlinx/coroutines/flow/internal/SafeCollector : kotlinx/coroutines/flow/FlowCollector {
943949
public fun <init> (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;)V
944950
public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;

kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt

+9
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,12 @@ internal expect class AbortFlowException() : CancellationException
1616
* Exception used to cancel child of [scopedFlow] without cancelling the whole scope.
1717
*/
1818
internal expect class ChildCancelledException() : CancellationException
19+
20+
@Suppress("NOTHING_TO_INLINE")
21+
@PublishedApi
22+
internal inline fun checkIndexOverflow(index: Int): Int {
23+
if (index < 0) {
24+
throw ArithmeticException("Index overflow has happened")
25+
}
26+
return index
27+
}

kotlinx-coroutines-core/common/src/flow/operators/Limit.kt

-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

5-
65
@file:JvmMultifileClass
76
@file:JvmName("FlowKt")
87

kotlinx-coroutines-core/common/src/flow/operators/Transform.kt

+11
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,17 @@ public inline fun <T, R: Any> Flow<T>.mapNotNull(crossinline transform: suspend
6262
return@transform emit(transformed)
6363
}
6464

65+
/**
66+
* Returns a flow that wraps each element into [IndexedValue], containing value and its index (starting from zero).
67+
*/
68+
@ExperimentalCoroutinesApi
69+
public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>> = flow {
70+
var index = 0
71+
collect { value ->
72+
emit(IndexedValue(checkIndexOverflow(index++), value))
73+
}
74+
}
75+
6576
/**
6677
* Returns a flow which performs the given [action] on each value of the original flow.
6778
*/

kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt

+13
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,19 @@ public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value
7575
override suspend fun emit(value: T) = action(value)
7676
})
7777

78+
/**
79+
* Terminal flow operator that collects the given flow with a provided [action] that takes the index of an element (zero-based) and the element.
80+
* If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.
81+
*
82+
* See also [collect] and [withIndex].
83+
*/
84+
@ExperimentalCoroutinesApi
85+
public suspend inline fun <T> Flow<T>.collectIndexed(crossinline action: suspend (index: Int, value: T) -> Unit): Unit =
86+
collect(object : FlowCollector<T> {
87+
private var index = 0
88+
override suspend fun emit(value: T) = action(checkIndexOverflow(index++), value)
89+
})
90+
7891
/**
7992
* Collects all the values from the given [flow] and emits them to the collector.
8093
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.coroutines.*
8+
import kotlin.test.*
9+
10+
class IndexedTest : TestBase() {
11+
12+
@Test
13+
fun testWithIndex() = runTest {
14+
val flow = flowOf(3, 2, 1).withIndex()
15+
assertEquals(listOf(IndexedValue(0, 3), IndexedValue(1, 2), IndexedValue(2, 1)), flow.toList())
16+
}
17+
18+
@Test
19+
fun testWithIndexEmpty() = runTest {
20+
val flow = emptyFlow<Int>().withIndex()
21+
assertEquals(emptyList(), flow.toList())
22+
}
23+
24+
@Test
25+
fun testCollectIndexed() = runTest {
26+
val result = ArrayList<IndexedValue<Long>>()
27+
flowOf(3L, 2L, 1L).collectIndexed { index, value ->
28+
result.add(IndexedValue(index, value))
29+
}
30+
assertEquals(listOf(IndexedValue(0, 3L), IndexedValue(1, 2L), IndexedValue(2, 1L)), result)
31+
}
32+
33+
@Test
34+
fun testCollectIndexedEmptyFlow() = runTest {
35+
val flow = flow<Int> {
36+
expect(1)
37+
}
38+
39+
flow.collectIndexed { _, _ ->
40+
expectUnreached()
41+
}
42+
43+
finish(2)
44+
}
45+
}

0 commit comments

Comments
 (0)