Skip to content

Commit d2cb27d

Browse files
committed
Introduce basic Flow<T>.chunked operator
Fixes #1290
1 parent 84a6ef9 commit d2cb27d

File tree

4 files changed

+131
-0
lines changed

4 files changed

+131
-0
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+1
Original file line numberDiff line numberDiff line change
@@ -995,6 +995,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
995995
public static final fun cancellable (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
996996
public static final fun catch (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
997997
public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
998+
public static final fun chunked (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
998999
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
9991000
public static final synthetic fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10001001
public static final fun collectIndexed (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;

kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api

+1
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,7 @@ final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutine
473473
final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/cache(): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/cache|[email protected]<0:0>(){0§<kotlin.Any?>}[0]
474474
final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/cancellable(): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/cancellable|[email protected]<0:0>(){0§<kotlin.Any?>}[0]
475475
final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/catch(kotlin.coroutines/SuspendFunction2<kotlinx.coroutines.flow/FlowCollector<#A>, kotlin/Throwable, kotlin/Unit>): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/catch|[email protected]<0:0>(kotlin.coroutines.SuspendFunction2<kotlinx.coroutines.flow.FlowCollector<0:0>,kotlin.Throwable,kotlin.Unit>){0§<kotlin.Any?>}[0]
476+
final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/chunked(kotlin/Int): kotlinx.coroutines.flow/Flow<kotlin.collections/List<#A>> // kotlinx.coroutines.flow/chunked|[email protected]<0:0>(kotlin.Int){0§<kotlin.Any?>}[0]
476477
final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/concatWith(#A): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/concatWith|[email protected]<0:0>(0:0){0§<kotlin.Any?>}[0]
477478
final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/concatWith(kotlinx.coroutines.flow/Flow<#A>): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/concatWith|[email protected]<0:0>(kotlinx.coroutines.flow.Flow<0:0>){0§<kotlin.Any?>}[0]
478479
final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/conflate(): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/conflate|[email protected]<0:0>(){0§<kotlin.Any?>}[0]

kotlinx-coroutines-core/common/src/flow/operators/Transform.kt

+38
Original file line numberDiff line numberDiff line change
@@ -129,3 +129,41 @@ public fun <T> Flow<T>.runningReduce(operation: suspend (accumulator: T, value:
129129
emit(accumulator as T)
130130
}
131131
}
132+
133+
/**
134+
* Splits the given flow into a flow of non-overlapping lists each not exceeding the given [size] but never empty.
135+
* The last emitted list may have fewer elements than the given size.
136+
*
137+
* Example of usage:
138+
* ```
139+
* flowOf("a", "b", "c", "d", "e")
140+
* .chunked(2) // ["a", "b"], ["c", "d"], ["e"]
141+
* .map { it.joinToString(separator = "") }
142+
* .collect {
143+
* println(it) // Prints "ab", "cd", e"
144+
* }
145+
* ```
146+
*
147+
* @throws IllegalArgumentException if [size] is not positive.
148+
*/
149+
@ExperimentalCoroutinesApi
150+
public fun <T> Flow<T>.chunked(size: Int): Flow<List<T>> {
151+
require(size >= 1) { "Expected positive chunk size, but got $size" }
152+
return flow {
153+
var result: ArrayList<T>? = null // Do not preallocate anything
154+
collect { value ->
155+
// Allocate if needed
156+
val acc = result ?: ArrayList<T>(size).also { result = it }
157+
acc.add(value)
158+
if (acc.size == size) {
159+
emit(acc)
160+
// Cleanup, but don't allocate -- it might've been the case this is the last element
161+
result = null
162+
}
163+
}
164+
val acc = result ?: return@flow
165+
if (acc.isNotEmpty()) {
166+
emit(acc)
167+
}
168+
}
169+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package kotlinx.coroutines.flow.operators
2+
3+
import kotlinx.coroutines.*
4+
import kotlinx.coroutines.channels.*
5+
import kotlinx.coroutines.flow.*
6+
import kotlinx.coroutines.testing.*
7+
import kotlin.test.*
8+
9+
@OptIn(ExperimentalCoroutinesApi::class)
10+
class ChunkedTest : TestBase() {
11+
12+
@Test
13+
fun testChunked() {
14+
doTest(flowOf(1, 2, 3, 4, 5), 2, listOf(listOf(1, 2), listOf(3, 4), listOf(5)))
15+
doTest(flowOf(1, 2, 3, 4, 5), 3, listOf(listOf(1, 2, 3), listOf(4, 5)))
16+
doTest(flowOf(1, 2, 3, 4), 2, listOf(listOf(1, 2), listOf(3, 4)))
17+
doTest(flowOf(1), 3, listOf(listOf(1)))
18+
}
19+
20+
private fun <T> doTest(flow: Flow<T>, chunkSize: Int, expected: List<List<T>>) {
21+
runTest {
22+
assertEquals(expected, flow.chunked(chunkSize).toList())
23+
assertEquals(flow.toList().chunked(chunkSize), flow.chunked(chunkSize).toList())
24+
}
25+
}
26+
27+
@Test
28+
fun testEmpty() = runTest {
29+
doTest(emptyFlow<Int>(), 1, emptyList())
30+
doTest(emptyFlow<Int>(), 2, emptyList())
31+
}
32+
33+
@Test
34+
fun testChunkedCancelled() = runTest {
35+
val result = flow {
36+
expect(1)
37+
emit(1)
38+
emit(2)
39+
expect(2)
40+
}.chunked(1).buffer().take(1).toList()
41+
assertEquals(listOf(listOf(1)), result)
42+
finish(3)
43+
}
44+
45+
@Test
46+
fun testChunkedCancelledWithSuspension() = runTest {
47+
val result = flow {
48+
expect(1)
49+
emit(1)
50+
yield()
51+
expectUnreached()
52+
emit(2)
53+
}.chunked(1).buffer().take(1).toList()
54+
assertEquals(listOf(listOf(1)), result)
55+
finish(2)
56+
}
57+
58+
@Test
59+
fun testChunkedDoesNotIgnoreCancellation() = runTest {
60+
expect(1)
61+
val result = flow {
62+
coroutineScope {
63+
launch {
64+
hang { expect(2) }
65+
}
66+
yield()
67+
emit(1)
68+
emit(2)
69+
}
70+
}.chunked(1).take(1).toList()
71+
assertEquals(listOf(listOf(1)), result)
72+
finish(3)
73+
}
74+
75+
@Test
76+
fun testIae() {
77+
assertFailsWith<IllegalArgumentException> { emptyFlow<Int>().chunked(-1) }
78+
assertFailsWith<IllegalArgumentException> { emptyFlow<Int>().chunked(0) }
79+
assertFailsWith<IllegalArgumentException> { emptyFlow<Int>().chunked(Int.MIN_VALUE) }
80+
assertFailsWith<IllegalArgumentException> { emptyFlow<Int>().chunked(Int.MIN_VALUE + 1) }
81+
}
82+
83+
@Test
84+
fun testSample() = runTest {
85+
val result = flowOf("a", "b", "c", "d", "e")
86+
.chunked(2)
87+
.map { it.joinToString(separator = "") }
88+
.toList()
89+
assertEquals(listOf("ab", "cd", "e"), result)
90+
}
91+
}

0 commit comments

Comments
 (0)