From 86a503da72d9a47717d59049fc0dd2493f147440 Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Fri, 20 Sep 2019 10:04:05 +0200 Subject: [PATCH 1/7] Add chunked and windowed operators --- .../common/src/flow/operators/Chunk.kt | 91 +++++++++++++++ .../common/src/internal/RingBuffer.kt | 105 ++++++++++++++++++ .../common/test/flow/operators/ChunkedTest.kt | 59 ++++++++++ .../test/flow/operators/WindowedTest.kt | 82 ++++++++++++++ 4 files changed, 337 insertions(+) create mode 100644 kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt create mode 100644 kotlinx-coroutines-core/common/src/internal/RingBuffer.kt create mode 100644 kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt create mode 100644 kotlinx-coroutines-core/common/test/flow/operators/WindowedTest.kt diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt b/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt new file mode 100644 index 0000000000..ea20035cd1 --- /dev/null +++ b/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt @@ -0,0 +1,91 @@ +package kotlinx.coroutines.flow.operators + +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.internal.RingBuffer +import kotlin.math.max +import kotlin.math.min + +/** + * Returns a flow of lists each not exceeding the given [size]. + *The last list in the resulting flow may have less elements than the given [size]. + * + * @param size the number of elements to take in each list, must be positive and can be greater than the number of elements in this flow. + */ +fun Flow.chunked(size: Int): Flow> = chunked(size) { it.toList() } + +/** + * Chunks a flow of elements into flow of lists, each not exceeding the given [size] + * and applies the given [transform] function to an each. + * + * Note that the list passed to the [transform] function is ephemeral and is valid only inside that function. + * You should not store it or allow it to escape in some way, unless you made a snapshot of it. + * The last list may have less elements than the given [size]. + * + * This is slightly faster, than using flow.chunked(n).map { ... } + * + * @param size the number of elements to take in each list, must be positive and can be greater than the number of elements in this flow. + */ +fun Flow.chunked(size: Int, transform: suspend (List) -> R): Flow { + require(size > 0) { "Size should be greater than 0, but was $size" } + return windowed(size, size, true, transform) +} + +/** + * Returns a flow of snapshots of the window of the given [size] + * sliding along this flow with the given [step], where each + * snapshot is a list. + * + * Several last lists may have less elements than the given [size]. + * + * Both [size] and [step] must be positive and can be greater than the number of elements in this flow. + * @param size the number of elements to take in each window + * @param step the number of elements to move the window forward by on an each step + * @param partialWindows controls whether or not to keep partial windows in the end if any. + */ +fun Flow.windowed(size: Int, step: Int, partialWindows: Boolean): Flow> = + windowed(size, step, partialWindows) { it.toList() } + +/** + * Returns a flow of results of applying the given [transform] function to + * an each list representing a view over the window of the given [size] + * sliding along this collection with the given [step]. + * + * Note that the list passed to the [transform] function is ephemeral and is valid only inside that function. + * You should not store it or allow it to escape in some way, unless you made a snapshot of it. + * Several last lists may have less elements than the given [size]. + * + * This is slightly faster, than using flow.windowed(...).map { ... } + * + * Both [size] and [step] must be positive and can be greater than the number of elements in this collection. + * @param size the number of elements to take in each window + * @param step the number of elements to move the window forward by on an each step. + * @param partialWindows controls whether or not to keep partial windows in the end if any. + */ +fun Flow.windowed(size: Int, step: Int, partialWindows: Boolean, transform: suspend (List) -> R): Flow { + require(size > 0 && step > 0) { "Size and step should be greater than 0, but was size: $size, step: $step" } + + return flow { + val buffer = RingBuffer(size) + val toDrop = min(step, size) + val toSkip = max(step - size, 0) + var skipped = toSkip + + collect { value -> + if(toSkip == skipped) buffer.add(value) + else skipped++ + + if (buffer.isFull()) { + emit(transform(buffer)) + buffer.removeFirst(toDrop) + skipped = 0 + } + } + + while (partialWindows && buffer.isNotEmpty()) { + emit(transform(buffer)) + buffer.removeFirst(min(toDrop, buffer.size)) + } + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/src/internal/RingBuffer.kt b/kotlinx-coroutines-core/common/src/internal/RingBuffer.kt new file mode 100644 index 0000000000..377b520541 --- /dev/null +++ b/kotlinx-coroutines-core/common/src/internal/RingBuffer.kt @@ -0,0 +1,105 @@ +package kotlinx.coroutines.internal + +internal class RingBuffer(val capacity: Int) : AbstractList(), RandomAccess { + init { + require(capacity >= 0) { "ring buffer capacity should not be negative but it is $capacity" } + } + + private val buffer = arrayOfNulls(capacity) + private var startIndex: Int = 0 + + override var size: Int = 0 + private set + + override fun get(index: Int): T { + require(index in 0 until size) { "Index out of bounds: $index" } + @Suppress("UNCHECKED_CAST") + return buffer[startIndex.forward(index)] as T + } + + fun isFull() = size == capacity + + override fun iterator(): Iterator = object : AbstractIterator() { + private var count = size + private var index = startIndex + + override fun computeNext() { + if (count == 0) { + done() + } else { + @Suppress("UNCHECKED_CAST") + setNext(buffer[index] as T) + index = index.forward(1) + count-- + } + } + } + + @Suppress("UNCHECKED_CAST") + override fun toArray(array: Array): Array { + val result: Array = + if (array.size < this.size) array.copyOf(this.size) else array as Array + + val size = this.size + + var widx = 0 + var idx = startIndex + + while (widx < size && idx < capacity) { + result[widx] = buffer[idx] as T + widx++ + idx++ + } + + idx = 0 + while (widx < size) { + result[widx] = buffer[idx] as T + widx++ + idx++ + } + if (result.size > this.size) result[this.size] = null + + return result as Array + } + + override fun toArray(): Array { + return toArray(arrayOfNulls(size)) + } + + /** + * Add [element] to the buffer or fail with [IllegalStateException] if no free space available in the buffer + */ + fun add(element: T) { + check(!isFull()) { "Ring buffer is full" } + + buffer[startIndex.forward(size)] = element + size++ + } + + /** + * Removes [n] first elements from the buffer or fails with [IllegalArgumentException] if not enough elements in the buffer to remove + */ + fun removeFirst(n: Int) { + require(n >= 0) { "n shouldn't be negative but it is $n" } + require(n <= size) { "n shouldn't be greater than the buffer size: n = $n, size = $size" } + + if (n > 0) { + val start = startIndex + val end = start.forward(n) + + if (start > end) { + buffer.fill(null, start, capacity) + buffer.fill(null, 0, end) + } else { + buffer.fill(null, start, end) + } + + startIndex = end + size -= n + } + } + + + @Suppress("NOTHING_TO_INLINE") + private inline fun Int.forward(n: Int): Int = (this + n) % capacity +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt new file mode 100644 index 0000000000..f3169d0d3f --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt @@ -0,0 +1,59 @@ +package kotlinx.coroutines.flow.operators + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.* +import kotlin.test.Test +import kotlin.test.assertEquals + +class ChunkedTest : TestBase() { + + private val flow = flow { + emit(1) + emit(2) + emit(3) + emit(4) + } + + @Test + fun `Chunks correct number of emissions with possible partial window at the end`() = runTest { + assertEquals(2, flow.chunked(2).count()) + assertEquals(2, flow.chunked(3).count()) + assertEquals(1, flow.chunked(5).count()) + } + + @Test + fun `Throws IllegalArgumentException for chunk of size less than 1`() { + assertFailsWith { flow.chunked(0) } + assertFailsWith { flow.chunked(-1) } + } + + @Test + fun `No emissions with empty flow`() = runTest { + assertEquals(0, flowOf().chunked(2).count()) + } + + @Test + fun testErrorCancelsUpstream() = runTest { + val latch = Channel() + val flow = flow { + coroutineScope { + launch(start = CoroutineStart.ATOMIC) { + latch.send(Unit) + hang { expect(3) } + } + emit(1) + expect(1) + emit(2) + expectUnreached() + } + }.chunked(2) { chunk -> + expect(2) // 2 + latch.receive() + throw TestException() + }.catch { emit(42) } + + assertEquals(42, flow.single()) + finish(4) + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/test/flow/operators/WindowedTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/WindowedTest.kt new file mode 100644 index 0000000000..ca2c02297f --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/operators/WindowedTest.kt @@ -0,0 +1,82 @@ +package kotlinx.coroutines.flow.operators + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.* +import kotlin.test.Test +import kotlin.test.assertEquals + +class WindowedTest : TestBase() { + + private val flow = flow { + emit(1) + emit(2) + emit(3) + emit(4) + } + + @Test + fun `Throws IllegalArgumentException for window of size or step less than 1`() { + assertFailsWith { flow.windowed(0, 1, false) } + assertFailsWith { flow.windowed(-1, 2, false) } + assertFailsWith { flow.windowed(2, 0, false) } + assertFailsWith { flow.windowed(5, -2, false) } + } + + @Test + fun `No emissions with empty flow`() = runTest { + assertEquals(0, flowOf().windowed(2, 2, false).count()) + } + + @Test + fun `Emits correct sum with overlapping non partial windows`() = runTest { + assertEquals(15, flow.windowed(3, 1, false) { window -> + window.sum() + }.sum()) + } + + @Test + fun `Emits correct sum with overlapping partial windows`() = runTest { + assertEquals(13, flow.windowed(3, 2, true) { window -> + window.sum() + }.sum()) + } + + @Test + fun `Emits correct number of overlapping windows for long sequence of overlapping partial windows`() = runTest { + val elements = generateSequence(1) { it + 1 }.take(100) + val flow = elements.asFlow().windowed(100, 1, true) { } + assertEquals(100, flow.count()) + } + + @Test + fun `Emits correct sum with partial windows set apart`() = runTest { + assertEquals(7, flow.windowed(2, 3, true) { window -> + window.sum() + }.sum()) + } + + @Test + fun testErrorCancelsUpstream() = runTest { + val latch = Channel() + val flow = flow { + coroutineScope { + launch(start = CoroutineStart.ATOMIC) { + latch.send(Unit) + hang { expect(3) } + } + emit(1) + expect(1) + emit(2) + expectUnreached() + } + }.windowed(2, 3, false) { window -> + expect(2) // 2 + latch.receive() + throw TestException() + }.catch { emit(42) } + + assertEquals(42, flow.single()) + finish(4) + } +} \ No newline at end of file From dbe84eaa441f47d7156ca959b0cabf7ac46b9da0 Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Thu, 19 Mar 2020 00:07:09 +0100 Subject: [PATCH 2/7] Add changes requested by R Elizarov Add @FlowPreview Add public modifier Minor improvement in docs --- .../common/src/flow/operators/Chunk.kt | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt b/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt index ea20035cd1..87d4ed844d 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt @@ -1,5 +1,6 @@ package kotlinx.coroutines.flow.operators +import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.flow @@ -9,11 +10,13 @@ import kotlin.math.min /** * Returns a flow of lists each not exceeding the given [size]. - *The last list in the resulting flow may have less elements than the given [size]. + * The last list in the resulting flow may have less elements than the given [size]. * * @param size the number of elements to take in each list, must be positive and can be greater than the number of elements in this flow. */ -fun Flow.chunked(size: Int): Flow> = chunked(size) { it.toList() } + +@FlowPreview +public fun Flow.chunked(size: Int): Flow> = chunked(size) { it.toList() } /** * Chunks a flow of elements into flow of lists, each not exceeding the given [size] @@ -23,11 +26,13 @@ fun Flow.chunked(size: Int): Flow> = chunked(size) { it.toList() * You should not store it or allow it to escape in some way, unless you made a snapshot of it. * The last list may have less elements than the given [size]. * - * This is slightly faster, than using flow.chunked(n).map { ... } + * This is more efficient, than using flow.chunked(n).map { ... } * * @param size the number of elements to take in each list, must be positive and can be greater than the number of elements in this flow. */ -fun Flow.chunked(size: Int, transform: suspend (List) -> R): Flow { + +@FlowPreview +public fun Flow.chunked(size: Int, transform: suspend (List) -> R): Flow { require(size > 0) { "Size should be greater than 0, but was $size" } return windowed(size, size, true, transform) } @@ -44,7 +49,9 @@ fun Flow.chunked(size: Int, transform: suspend (List) -> R): Flow Flow.windowed(size: Int, step: Int, partialWindows: Boolean): Flow> = + +@FlowPreview +public fun Flow.windowed(size: Int, step: Int, partialWindows: Boolean): Flow> = windowed(size, step, partialWindows) { it.toList() } /** @@ -56,14 +63,16 @@ fun Flow.windowed(size: Int, step: Int, partialWindows: Boolean): Flow
  • Flow.windowed(size: Int, step: Int, partialWindows: Boolean, transform: suspend (List) -> R): Flow { + +@FlowPreview +public fun Flow.windowed(size: Int, step: Int, partialWindows: Boolean, transform: suspend (List) -> R): Flow { require(size > 0 && step > 0) { "Size and step should be greater than 0, but was size: $size, step: $step" } return flow { From 6155c22d99e0cbcb9bd1117f87673047f9a60f28 Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Thu, 19 Mar 2020 00:13:51 +0100 Subject: [PATCH 3/7] Generate api dump --- kotlinx-coroutines-core/api/kotlinx-coroutines-core.api | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index a6e5fd513e..359a795e6e 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -1020,6 +1020,13 @@ public final class kotlinx/coroutines/flow/internal/SendingCollector : kotlinx/c public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } +public final class kotlinx/coroutines/flow/operators/ChunkKt { + public static final fun chunked (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow; + public static final fun chunked (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; + public static final fun windowed (Lkotlinx/coroutines/flow/Flow;IIZ)Lkotlinx/coroutines/flow/Flow; + public static final fun windowed (Lkotlinx/coroutines/flow/Flow;IIZLkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; +} + public final class kotlinx/coroutines/intrinsics/CancellableKt { public static final fun startCoroutineCancellable (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)V } From c74bfcf2757f7fca2be633f325aa444d68e8623c Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Thu, 19 Mar 2020 00:32:38 +0100 Subject: [PATCH 4/7] Use ArrayDeque from stdlib instead of custom RingBuffer Minor fix in test --- .../common/src/flow/operators/Chunk.kt | 11 ++++++----- .../common/test/flow/operators/WindowedTest.kt | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt b/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt index 87d4ed844d..535536efd4 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt @@ -71,30 +71,31 @@ public fun Flow.windowed(size: Int, step: Int, partialWindows: Boolean): * @param partialWindows controls whether or not to keep partial windows in the end if any. */ +@OptIn(ExperimentalStdlibApi::class) @FlowPreview public fun Flow.windowed(size: Int, step: Int, partialWindows: Boolean, transform: suspend (List) -> R): Flow { require(size > 0 && step > 0) { "Size and step should be greater than 0, but was size: $size, step: $step" } return flow { - val buffer = RingBuffer(size) + val buffer = ArrayDeque(size) val toDrop = min(step, size) val toSkip = max(step - size, 0) var skipped = toSkip collect { value -> - if(toSkip == skipped) buffer.add(value) + if (toSkip == skipped) buffer.addLast(value) else skipped++ - if (buffer.isFull()) { + if (buffer.size == size) { emit(transform(buffer)) - buffer.removeFirst(toDrop) + repeat(toDrop) { buffer.removeFirst() } skipped = 0 } } while (partialWindows && buffer.isNotEmpty()) { emit(transform(buffer)) - buffer.removeFirst(min(toDrop, buffer.size)) + repeat(min(toDrop, buffer.size)) { buffer.removeFirst() } } } } \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/test/flow/operators/WindowedTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/WindowedTest.kt index ca2c02297f..173c50caf2 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/WindowedTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/WindowedTest.kt @@ -45,7 +45,7 @@ class WindowedTest : TestBase() { @Test fun `Emits correct number of overlapping windows for long sequence of overlapping partial windows`() = runTest { val elements = generateSequence(1) { it + 1 }.take(100) - val flow = elements.asFlow().windowed(100, 1, true) { } + val flow = elements.asFlow().windowed(100, 1, true) assertEquals(100, flow.count()) } From 07f72e981d04579c7fbf11f60835c1d0ba04f7a0 Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Thu, 19 Mar 2020 00:46:56 +0100 Subject: [PATCH 5/7] Change package name to match convention --- .../common/src/flow/operators/Chunk.kt | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt b/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt index 535536efd4..cfc2121d58 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt @@ -1,12 +1,11 @@ -package kotlinx.coroutines.flow.operators +@file:JvmMultifileClass +@file:JvmName("FlowKt") -import kotlinx.coroutines.FlowPreview -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.internal.RingBuffer -import kotlin.math.max -import kotlin.math.min +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlin.jvm.* +import kotlin.math.* /** * Returns a flow of lists each not exceeding the given [size]. From 9392bbda6333b70c8280ed9cf16fa7cdf3f6243d Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Fri, 27 Mar 2020 00:25:46 +0100 Subject: [PATCH 6/7] Remove RingBuffer from coroutines lib --- .../common/src/internal/RingBuffer.kt | 105 ------------------ 1 file changed, 105 deletions(-) delete mode 100644 kotlinx-coroutines-core/common/src/internal/RingBuffer.kt diff --git a/kotlinx-coroutines-core/common/src/internal/RingBuffer.kt b/kotlinx-coroutines-core/common/src/internal/RingBuffer.kt deleted file mode 100644 index 377b520541..0000000000 --- a/kotlinx-coroutines-core/common/src/internal/RingBuffer.kt +++ /dev/null @@ -1,105 +0,0 @@ -package kotlinx.coroutines.internal - -internal class RingBuffer(val capacity: Int) : AbstractList(), RandomAccess { - init { - require(capacity >= 0) { "ring buffer capacity should not be negative but it is $capacity" } - } - - private val buffer = arrayOfNulls(capacity) - private var startIndex: Int = 0 - - override var size: Int = 0 - private set - - override fun get(index: Int): T { - require(index in 0 until size) { "Index out of bounds: $index" } - @Suppress("UNCHECKED_CAST") - return buffer[startIndex.forward(index)] as T - } - - fun isFull() = size == capacity - - override fun iterator(): Iterator = object : AbstractIterator() { - private var count = size - private var index = startIndex - - override fun computeNext() { - if (count == 0) { - done() - } else { - @Suppress("UNCHECKED_CAST") - setNext(buffer[index] as T) - index = index.forward(1) - count-- - } - } - } - - @Suppress("UNCHECKED_CAST") - override fun toArray(array: Array): Array { - val result: Array = - if (array.size < this.size) array.copyOf(this.size) else array as Array - - val size = this.size - - var widx = 0 - var idx = startIndex - - while (widx < size && idx < capacity) { - result[widx] = buffer[idx] as T - widx++ - idx++ - } - - idx = 0 - while (widx < size) { - result[widx] = buffer[idx] as T - widx++ - idx++ - } - if (result.size > this.size) result[this.size] = null - - return result as Array - } - - override fun toArray(): Array { - return toArray(arrayOfNulls(size)) - } - - /** - * Add [element] to the buffer or fail with [IllegalStateException] if no free space available in the buffer - */ - fun add(element: T) { - check(!isFull()) { "Ring buffer is full" } - - buffer[startIndex.forward(size)] = element - size++ - } - - /** - * Removes [n] first elements from the buffer or fails with [IllegalArgumentException] if not enough elements in the buffer to remove - */ - fun removeFirst(n: Int) { - require(n >= 0) { "n shouldn't be negative but it is $n" } - require(n <= size) { "n shouldn't be greater than the buffer size: n = $n, size = $size" } - - if (n > 0) { - val start = startIndex - val end = start.forward(n) - - if (start > end) { - buffer.fill(null, start, capacity) - buffer.fill(null, 0, end) - } else { - buffer.fill(null, start, end) - } - - startIndex = end - size -= n - } - } - - - @Suppress("NOTHING_TO_INLINE") - private inline fun Int.forward(n: Int): Int = (this + n) % capacity -} \ No newline at end of file From 8e7ee37477179afa0e025a1bd2f27f17e62be40d Mon Sep 17 00:00:00 2001 From: Lukasz Wojtach Date: Fri, 27 Mar 2020 00:26:34 +0100 Subject: [PATCH 7/7] Update coroutines api with chunked and windowed operators ...and remove them from ChunkKt file api --- .../api/kotlinx-coroutines-core.api | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 359a795e6e..0d4c906051 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -866,6 +866,8 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun callbackFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun catch (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; + public static final fun chunked (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow; + public static final fun chunked (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun collectIndexed (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; @@ -984,6 +986,8 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun transform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun transformLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun unsafeTransform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; + public static final fun windowed (Lkotlinx/coroutines/flow/Flow;IIZ)Lkotlinx/coroutines/flow/Flow; + public static final fun windowed (Lkotlinx/coroutines/flow/Flow;IIZLkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun withIndex (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; public static final fun zip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; } @@ -1020,13 +1024,6 @@ public final class kotlinx/coroutines/flow/internal/SendingCollector : kotlinx/c public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } -public final class kotlinx/coroutines/flow/operators/ChunkKt { - public static final fun chunked (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow; - public static final fun chunked (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; - public static final fun windowed (Lkotlinx/coroutines/flow/Flow;IIZ)Lkotlinx/coroutines/flow/Flow; - public static final fun windowed (Lkotlinx/coroutines/flow/Flow;IIZLkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; -} - public final class kotlinx/coroutines/intrinsics/CancellableKt { public static final fun startCoroutineCancellable (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)V }