diff --git a/kotlinx-coroutines-core/jvm/src/flow/TickerFlow.kt b/kotlinx-coroutines-core/jvm/src/flow/TickerFlow.kt new file mode 100644 index 0000000000..d2a554d12c --- /dev/null +++ b/kotlinx-coroutines-core/jvm/src/flow/TickerFlow.kt @@ -0,0 +1,30 @@ +package kotlinx.coroutines.flow + +import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.TickerMode +import kotlinx.coroutines.channels.ticker +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + +/** + * Creates a flow that produces the first item after the given initial delay and subsequent items with the + * given delay between them. + * + * The resulting flow is basically using [ticker] + * + * This Flow stops producing elements immediately after [Job.cancel] invocation. + * + * @param delayMillis delay between each element in milliseconds. + * @param initialDelayMillis delay after which the first element will be produced (it is equal to [delayMillis] by default) in milliseconds. + * @param context context of the producing coroutine. + * @param mode specifies behavior when elements are not received ([FIXED_PERIOD][TickerMode.FIXED_PERIOD] by default). + */ +public fun tickerFlow( + delayMillis: Long, + initialDelayMillis: Long = delayMillis, + context: CoroutineContext = EmptyCoroutineContext, + mode: TickerMode = TickerMode.FIXED_PERIOD +): Flow { + require(delayMillis > 0) + return ticker(delayMillis, initialDelayMillis, context, mode).receiveAsFlow() +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/flow/TickerFlowTest.kt b/kotlinx-coroutines-core/jvm/test/flow/TickerFlowTest.kt new file mode 100644 index 0000000000..02db27d6f6 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/flow/TickerFlowTest.kt @@ -0,0 +1,99 @@ +package flow + +import kotlinx.coroutines.TestBase +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.tickerFlow +import java.util.concurrent.CancellationException +import kotlin.test.Test +import kotlin.test.assertEquals + + +class TickerFlowTest : TestBase() { + + @Test(expected = IllegalArgumentException::class) + fun testNegativePeriod() = runTest { + // WHEN + tickerFlow(-1).launchIn(this) + } + + @Test(expected = IllegalArgumentException::class) + fun testZeroPeriod() = runTest { + // WHEN + tickerFlow(0).launchIn(this) + } + + @Test(expected = IllegalArgumentException::class) + fun testNegativeInitialDelay() = runTest { + // WHEN + tickerFlow(100, -1).launchIn(this) + } + + @Test + fun testInitialDelay() = runTest { + // GIVEN + val inbox = mutableListOf() + + // WHEN + tickerFlow(100, 200).onEach { + inbox.add(Unit) + }.launchIn(this) + + delay(500) + + // THEN + assertEquals(4, inbox.size) + } + + @Test + fun testZeroInitialDelay() = runTest { + // GIVEN + val inbox = mutableListOf() + + // WHEN + tickerFlow(100, 0).onEach { + inbox.add(Unit) + }.launchIn(this) + + delay(500) + + // THEN + assertEquals(6, inbox.size) + } + + + @Test + fun testReceive() = runTest { + // GIVEN + val inbox = mutableListOf() + + // WHEN + tickerFlow(100).onEach { + inbox.add(Unit) + }.launchIn(this) + + delay(500) + + // THEN + assertEquals(5, inbox.size) + } + + @Test + fun testDoNotReceiveAfterCancel() = runTest { + // GIVEN + val inbox = mutableListOf() + + // WHEN + val periodicTicker = + tickerFlow(100).onEach { + inbox.add(Unit) + }.launchIn(this) + + delay(50) + periodicTicker.cancel(CancellationException()) + + // THEN + assertEquals(0, inbox.size) + } +} \ No newline at end of file