Skip to content

Commit c74bfcf

Browse files
committed
Use ArrayDeque from stdlib instead of custom RingBuffer
Minor fix in test
1 parent 6155c22 commit c74bfcf

File tree

2 files changed

+7
-6
lines changed

2 files changed

+7
-6
lines changed

kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt

+6-5
Original file line numberDiff line numberDiff line change
@@ -71,30 +71,31 @@ public fun <T> Flow<T>.windowed(size: Int, step: Int, partialWindows: Boolean):
7171
* @param partialWindows controls whether or not to keep partial windows in the end if any.
7272
*/
7373

74+
@OptIn(ExperimentalStdlibApi::class)
7475
@FlowPreview
7576
public fun <T, R> Flow<T>.windowed(size: Int, step: Int, partialWindows: Boolean, transform: suspend (List<T>) -> R): Flow<R> {
7677
require(size > 0 && step > 0) { "Size and step should be greater than 0, but was size: $size, step: $step" }
7778

7879
return flow {
79-
val buffer = RingBuffer<T>(size)
80+
val buffer = ArrayDeque<T>(size)
8081
val toDrop = min(step, size)
8182
val toSkip = max(step - size, 0)
8283
var skipped = toSkip
8384

8485
collect { value ->
85-
if(toSkip == skipped) buffer.add(value)
86+
if (toSkip == skipped) buffer.addLast(value)
8687
else skipped++
8788

88-
if (buffer.isFull()) {
89+
if (buffer.size == size) {
8990
emit(transform(buffer))
90-
buffer.removeFirst(toDrop)
91+
repeat(toDrop) { buffer.removeFirst() }
9192
skipped = 0
9293
}
9394
}
9495

9596
while (partialWindows && buffer.isNotEmpty()) {
9697
emit(transform(buffer))
97-
buffer.removeFirst(min(toDrop, buffer.size))
98+
repeat(min(toDrop, buffer.size)) { buffer.removeFirst() }
9899
}
99100
}
100101
}

kotlinx-coroutines-core/common/test/flow/operators/WindowedTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class WindowedTest : TestBase() {
4545
@Test
4646
fun `Emits correct number of overlapping windows for long sequence of overlapping partial windows`() = runTest {
4747
val elements = generateSequence(1) { it + 1 }.take(100)
48-
val flow = elements.asFlow().windowed(100, 1, true) { }
48+
val flow = elements.asFlow().windowed(100, 1, true)
4949
assertEquals(100, flow.count())
5050
}
5151

0 commit comments

Comments
 (0)