Skip to content

Commit ff057ab

Browse files
committed
Check args validity right away. Pass ephemeral list to transformer.
1 parent 0f8bb71 commit ff057ab

File tree

3 files changed

+48
-38
lines changed

3 files changed

+48
-38
lines changed

kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt

+35-23
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,28 @@ import kotlinx.coroutines.flow.Flow
44
import kotlinx.coroutines.flow.collect
55
import kotlinx.coroutines.flow.flow
66

7+
fun <T> Flow<T>.chunked(size: Int): Flow<List<T>> = chunked(size) { it.toList() }
78

8-
fun <T, R> Flow<T>.chunked(size: Int, transform: suspend (List<T>) -> R): Flow<R> = nonOverlappingWindowed(size, size, true, transform)
9+
fun <T, R> Flow<T>.chunked(size: Int, transform: suspend (List<T>) -> R): Flow<R> =
10+
nonOverlappingWindowed(size, size, true, transform)
11+
12+
fun <T> Flow<T>.windowed(size: Int, step: Int, partialWindows: Boolean): Flow<List<T>> =
13+
windowed(size, step, partialWindows) { it.toList() }
914

1015
fun <T, R> Flow<T>.windowed(size: Int, step: Int, partialWindows: Boolean, transform: suspend (List<T>) -> R): Flow<R> =
1116
if (size <= step) nonOverlappingWindowed(size, step, partialWindows, transform)
1217
else overlappingWindowed(size, step, partialWindows, transform)
1318

1419

15-
private fun <T, R> Flow<T>.nonOverlappingWindowed(size: Int, step: Int, partialWindows: Boolean, transform: suspend (List<T>) -> R): Flow<R> {
20+
private fun <T, R> Flow<T>.nonOverlappingWindowed(
21+
size: Int,
22+
step: Int,
23+
partialWindows: Boolean,
24+
transform: suspend (List<T>) -> R
25+
): Flow<R> {
26+
require(size in 1..step) { "Size should be non-negative, and equal to or lesser than step, but was size: $size, step: $step"}
1627

1728
return flow {
18-
require(size in 1..step)
1929
val window = ArrayList<T>(size)
2030
val toSkip = step - size
2131
var skipped = toSkip
@@ -26,39 +36,41 @@ private fun <T, R> Flow<T>.nonOverlappingWindowed(size: Int, step: Int, partialW
2636
} else skipped++
2737

2838
if (window.size == size) {
29-
emit(transform(window.toList()))
39+
emit(transform(window))
3040
window.clear()
3141
skipped = 0
3242
}
3343
}
3444

3545
if (partialWindows && window.isNotEmpty()) {
36-
emit(transform(window.toList()))
46+
emit(transform(window))
3747
}
3848
}
3949
}
4050

41-
private fun <T, R> Flow<T>.overlappingWindowed(size: Int, step: Int, partialWindows: Boolean, transform: suspend (List<T>) -> R) = flow {
42-
require(step in 1 until size)
43-
val buffer = ArrayList<T>(size)
51+
private inline fun <T, R> Flow<T>.overlappingWindowed(
52+
size: Int,
53+
step: Int,
54+
partialWindows: Boolean,
55+
crossinline transform: suspend (List<T>) -> R
56+
): Flow<R> {
57+
require(step in 1 until size) { "Size should be non-negative, and greater than step, but was size: $size, step: $step"}
58+
59+
return flow {
60+
val buffer = ArrayList<T>(size)
4461

45-
collect { value ->
46-
buffer.add(value)
62+
collect { value ->
63+
buffer.add(value)
4764

48-
if (buffer.size == size) {
49-
val window = makeWindow(step, buffer)
50-
emit(transform(window))
65+
if (buffer.size == size) {
66+
emit(transform(buffer))
67+
repeat(step) { buffer.removeAt(0) }
68+
}
5169
}
52-
}
5370

54-
while (partialWindows && buffer.isNotEmpty()) {
55-
val window = makeWindow(step, buffer)
56-
emit(transform(window))
71+
while (partialWindows && buffer.isNotEmpty()) {
72+
emit(transform(buffer))
73+
repeat(step) { buffer.removeAt(0) }
74+
}
5775
}
58-
}
59-
60-
private fun <T> makeWindow(step: Int, buffer: MutableList<T>): List<T> {
61-
val window = ArrayList(buffer)
62-
repeat(step) { buffer.removeAt(0) }
63-
return window
6476
}

kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt

+7-9
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ import kotlinx.coroutines.channels.Channel
55
import kotlinx.coroutines.flow.*
66
import kotlin.test.Test
77
import kotlin.test.assertEquals
8-
import kotlin.test.assertNull
9-
import kotlin.test.assertTrue
108

119
class ChunkedTest : TestBase() {
1210

@@ -19,20 +17,20 @@ class ChunkedTest : TestBase() {
1917

2018
@Test
2119
fun `Chunks correct number of emissions with possible partial window at the end`() = runTest {
22-
assertEquals(2, flow.chunked(2) { }.count())
23-
assertEquals(2, flow.chunked(3) { }.count())
24-
assertEquals(1, flow.chunked(5) { }.count())
20+
assertEquals(2, flow.chunked(2).count())
21+
assertEquals(2, flow.chunked(3).count())
22+
assertEquals(1, flow.chunked(5).count())
2523
}
2624

2725
@Test
28-
fun `Throws IllegalArgumentException for chunk of size less than 1`() = runTest {
29-
assertFailsWith<IllegalArgumentException>(flow.chunked(0) {})
30-
assertFailsWith<IllegalArgumentException>(flow.chunked(-1) {})
26+
fun `Throws IllegalArgumentException for chunk of size less than 1`() {
27+
assertFailsWith<IllegalArgumentException> { flow.chunked(0) }
28+
assertFailsWith<IllegalArgumentException> { flow.chunked(-1) }
3129
}
3230

3331
@Test
3432
fun `No emissions with empty flow`() = runTest {
35-
assertEquals(0, flowOf<Int>().chunked(2) {}.count())
33+
assertEquals(0, flowOf<Int>().chunked(2).count())
3634
}
3735

3836
@Test

kotlinx-coroutines-core/common/test/flow/operators/WindowedTest.kt

+6-6
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,16 @@ class WindowedTest : TestBase() {
1616
}
1717

1818
@Test
19-
fun `Throws IllegalArgumentException for window of size or step less than 1`() = runTest {
20-
assertFailsWith<IllegalArgumentException>(flow.windowed(0, 1, false) {})
21-
assertFailsWith<IllegalArgumentException>(flow.windowed(-1, 2, false) {})
22-
assertFailsWith<IllegalArgumentException>(flow.windowed(2, 0, false) {})
23-
assertFailsWith<IllegalArgumentException>(flow.windowed(5, -2, false) {})
19+
fun `Throws IllegalArgumentException for window of size or step less than 1`() {
20+
assertFailsWith<IllegalArgumentException> { flow.windowed(0, 1, false) }
21+
assertFailsWith<IllegalArgumentException> { flow.windowed(-1, 2, false) }
22+
assertFailsWith<IllegalArgumentException> { flow.windowed(2, 0, false) }
23+
assertFailsWith<IllegalArgumentException> { flow.windowed(5, -2, false) }
2424
}
2525

2626
@Test
2727
fun `No emissions with empty flow`() = runTest {
28-
assertEquals(0, flowOf<Int>().windowed(2, 2, false) {}.count())
28+
assertEquals(0, flowOf<Int>().windowed(2, 2, false).count())
2929
}
3030

3131
@Test

0 commit comments

Comments
 (0)