Skip to content

Commit c9c735a

Browse files
Introduce basic Flow<T>.chunked operator (#4127)
* Introduce basic Flow<T>.chunked operator Fixes #1290 Co-authored-by: Dmitry Khalanskiy <[email protected]>
1 parent 2803a33 commit c9c735a

File tree

4 files changed

+126
-0
lines changed

4 files changed

+126
-0
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+1
Original file line numberDiff line numberDiff line change
@@ -995,6 +995,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
995995
public static final fun cancellable (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
996996
public static final fun catch (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
997997
public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
998+
public static final fun chunked (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
998999
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
9991000
public static final synthetic fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10001001
public static final fun collectIndexed (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;

kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api

+1
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,7 @@ final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutine
473473
final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/cache(): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/cache|[email protected]<0:0>(){0§<kotlin.Any?>}[0]
474474
final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/cancellable(): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/cancellable|[email protected]<0:0>(){0§<kotlin.Any?>}[0]
475475
final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/catch(kotlin.coroutines/SuspendFunction2<kotlinx.coroutines.flow/FlowCollector<#A>, kotlin/Throwable, kotlin/Unit>): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/catch|[email protected]<0:0>(kotlin.coroutines.SuspendFunction2<kotlinx.coroutines.flow.FlowCollector<0:0>,kotlin.Throwable,kotlin.Unit>){0§<kotlin.Any?>}[0]
476+
final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/chunked(kotlin/Int): kotlinx.coroutines.flow/Flow<kotlin.collections/List<#A>> // kotlinx.coroutines.flow/chunked|[email protected]<0:0>(kotlin.Int){0§<kotlin.Any?>}[0]
476477
final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/concatWith(#A): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/concatWith|[email protected]<0:0>(0:0){0§<kotlin.Any?>}[0]
477478
final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/concatWith(kotlinx.coroutines.flow/Flow<#A>): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/concatWith|[email protected]<0:0>(kotlinx.coroutines.flow.Flow<0:0>){0§<kotlin.Any?>}[0]
478479
final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/conflate(): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/conflate|[email protected]<0:0>(){0§<kotlin.Any?>}[0]

kotlinx-coroutines-core/common/src/flow/operators/Transform.kt

+35
Original file line numberDiff line numberDiff line change
@@ -129,3 +129,38 @@ public fun <T> Flow<T>.runningReduce(operation: suspend (accumulator: T, value:
129129
emit(accumulator as T)
130130
}
131131
}
132+
133+
/**
134+
* Splits the given flow into a flow of non-overlapping lists each not exceeding the given [size] but never empty.
135+
* The last emitted list may have fewer elements than the given size.
136+
*
137+
* Example of usage:
138+
* ```
139+
* flowOf("a", "b", "c", "d", "e")
140+
* .chunked(2) // ["a", "b"], ["c", "d"], ["e"]
141+
* .map { it.joinToString(separator = "") }
142+
* .collect {
143+
* println(it) // Prints "ab", "cd", e"
144+
* }
145+
* ```
146+
*
147+
* @throws IllegalArgumentException if [size] is not positive.
148+
*/
149+
@ExperimentalCoroutinesApi
150+
public fun <T> Flow<T>.chunked(size: Int): Flow<List<T>> {
151+
require(size >= 1) { "Expected positive chunk size, but got $size" }
152+
return flow {
153+
var result: ArrayList<T>? = null // Do not preallocate anything
154+
collect { value ->
155+
// Allocate if needed
156+
val acc = result ?: ArrayList<T>(size).also { result = it }
157+
acc.add(value)
158+
if (acc.size == size) {
159+
emit(acc)
160+
// Cleanup, but don't allocate -- it might've been the case this is the last element
161+
result = null
162+
}
163+
}
164+
result?.let { emit(it) }
165+
}
166+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package kotlinx.coroutines.flow.operators
2+
3+
import kotlinx.coroutines.*
4+
import kotlinx.coroutines.channels.*
5+
import kotlinx.coroutines.flow.*
6+
import kotlinx.coroutines.testing.*
7+
import kotlin.test.*
8+
9+
@OptIn(ExperimentalCoroutinesApi::class)
10+
class ChunkedTest : TestBase() {
11+
12+
@Test
13+
fun testChunked() = runTest {
14+
doTest(flowOf(1, 2, 3, 4, 5), 2, listOf(listOf(1, 2), listOf(3, 4), listOf(5)))
15+
doTest(flowOf(1, 2, 3, 4, 5), 3, listOf(listOf(1, 2, 3), listOf(4, 5)))
16+
doTest(flowOf(1, 2, 3, 4), 2, listOf(listOf(1, 2), listOf(3, 4)))
17+
doTest(flowOf(1), 3, listOf(listOf(1)))
18+
}
19+
20+
private suspend fun <T> doTest(flow: Flow<T>, chunkSize: Int, expected: List<List<T>>) {
21+
assertEquals(expected, flow.chunked(chunkSize).toList())
22+
assertEquals(flow.toList().chunked(chunkSize), flow.chunked(chunkSize).toList())
23+
}
24+
25+
@Test
26+
fun testEmpty() = runTest {
27+
doTest(emptyFlow<Int>(), 1, emptyList())
28+
doTest(emptyFlow<Int>(), 2, emptyList())
29+
}
30+
31+
@Test
32+
fun testChunkedCancelled() = runTest {
33+
val result = flow {
34+
expect(1)
35+
emit(1)
36+
emit(2)
37+
expect(2)
38+
}.chunked(1).buffer().take(1).toList()
39+
assertEquals(listOf(listOf(1)), result)
40+
finish(3)
41+
}
42+
43+
@Test
44+
fun testChunkedCancelledWithSuspension() = runTest {
45+
val result = flow {
46+
expect(1)
47+
emit(1)
48+
yield()
49+
expectUnreached()
50+
emit(2)
51+
}.chunked(1).buffer().take(1).toList()
52+
assertEquals(listOf(listOf(1)), result)
53+
finish(2)
54+
}
55+
56+
@Test
57+
fun testChunkedDoesNotIgnoreCancellation() = runTest {
58+
expect(1)
59+
val result = flow {
60+
coroutineScope {
61+
launch {
62+
hang { expect(2) }
63+
}
64+
yield()
65+
emit(1)
66+
emit(2)
67+
}
68+
}.chunked(1).take(1).toList()
69+
assertEquals(listOf(listOf(1)), result)
70+
finish(3)
71+
}
72+
73+
@Test
74+
fun testIae() {
75+
assertFailsWith<IllegalArgumentException> { emptyFlow<Int>().chunked(-1) }
76+
assertFailsWith<IllegalArgumentException> { emptyFlow<Int>().chunked(0) }
77+
assertFailsWith<IllegalArgumentException> { emptyFlow<Int>().chunked(Int.MIN_VALUE) }
78+
assertFailsWith<IllegalArgumentException> { emptyFlow<Int>().chunked(Int.MIN_VALUE + 1) }
79+
}
80+
81+
@Test
82+
fun testSample() = runTest {
83+
val result = flowOf("a", "b", "c", "d", "e")
84+
.chunked(2)
85+
.map { it.joinToString(separator = "") }
86+
.toList()
87+
assertEquals(listOf("ab", "cd", "e"), result)
88+
}
89+
}

0 commit comments

Comments
 (0)