diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index 8379ae1551..b1375b257e 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -796,6 +796,7 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun debounce (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow; public static final fun delayEach (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow; public static final fun delayFlow (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow; public static final fun distinctUntilChanged (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; @@ -833,8 +834,10 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun reduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; + public static final fun sample (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow; public static final fun single (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun singleOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun switchMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun take (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow; public static final fun takeWhile (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun toCollection (Lkotlinx/coroutines/flow/Flow;Ljava/util/Collection;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; diff --git a/gradle.properties b/gradle.properties index 62b308e509..7662a68090 100644 --- a/gradle.properties +++ b/gradle.properties @@ -13,6 +13,7 @@ bintray_version=1.8.4-jetbrains-5 byte_buddy_version=1.9.3 reactor_vesion=3.2.5.RELEASE reactive_streams_version=1.0.2 +rxjava2_version=2.2.8 artifactory_plugin_version=4.7.3 # JS diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index bc407f015f..111ef7cf9a 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -8,8 +8,11 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.* -import kotlinx.coroutines.flow.unsafeFlow as flow +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.internal.* +import kotlinx.coroutines.selects.* import kotlin.jvm.* +import kotlinx.coroutines.flow.unsafeFlow as flow /** * Delays the emission of values from this flow for the given [timeMillis]. @@ -17,9 +20,7 @@ import kotlin.jvm.* @FlowPreview public fun Flow.delayFlow(timeMillis: Long): Flow = flow { delay(timeMillis) - collect { value -> - emit(value) - } + collect(this@flow) } /** @@ -32,3 +33,133 @@ public fun Flow.delayEach(timeMillis: Long): Flow = flow { emit(value) } } + +/** + * Returns a flow that mirrors the original flow, but filters out values + * that are followed by the newer values within the given [timeout][timeoutMillis]. + * The latest value is always emitted. + * + * Example: + * ``` + * flow { + * emit(1) + * delay(99) + * emit(2) + * delay(99) + * emit(3) + * delay(1001) + * emit(4) + * delay(1001) + * emit(5) + * }.debounce(1000) + * ``` + * produces `3, 4, 5`. + * + * Note that the resulting flow does not emit anything as long as the original flow emits + * items faster than every [timeoutMillis] milliseconds. + */ +public fun Flow.debounce(timeoutMillis: Long): Flow { + require(timeoutMillis > 0) { "Debounce timeout should be positive" } + return flow { + coroutineScope { + val values = Channel(Channel.CONFLATED) // Actually Any, KT-30796 + // Channel is not closed deliberately as there is no close with value + val collector = launch { + try { + collect { value -> values.send(value ?: NullSurrogate) } + } catch (e: Throwable) { + values.close(e) // Workaround for #1130 + throw e + } + } + + var isDone = false + var lastValue: Any? = null + while (!isDone) { + select { + values.onReceive { + lastValue = it + } + + lastValue?.let { value -> // set timeout when lastValue != null + onTimeout(timeoutMillis) { + lastValue = null // Consume the value + emit(NullSurrogate.unbox(value)) + } + } + + // Close with value 'idiom' + collector.onJoin { + if (lastValue != null) emit(NullSurrogate.unbox(lastValue)) + isDone = true + } + } + } + } + } +} + +/** + * Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMillis]. + * + * Example: + * ``` + * flow { + * repeat(10) { + * emit(it) + * delay(50) + * } + * }.sample(100) + * ``` + * produces `1, 3, 5, 7, 9`. + * + * Note that the latest element is not emitted if it does not fit into the sampling window. + */ +public fun Flow.sample(periodMillis: Long): Flow { + require(periodMillis > 0) { "Sample period should be positive" } + return flow { + coroutineScope { + val values = produce(capacity = Channel.CONFLATED) { // Actually Any, KT-30796 + collect { value -> send(value ?: NullSurrogate) } + } + + var isDone = false + var lastValue: Any? = null + val ticker = fixedPeriodTicker(periodMillis) + while (!isDone) { + select { + values.onReceiveOrNull { + if (it == null) { + ticker.cancel() + isDone = true + } else { + lastValue = it + } + } + + // todo: shall be start sampling only when an element arrives or sample aways as here? + ticker.onReceive { + val value = lastValue ?: return@onReceive + lastValue = null // Consume the value + emit(NullSurrogate.unbox(value)) + } + } + } + } + } +} + +/* + * TODO this design (and design of the corresponding operator) depends on #540 + */ +internal fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMillis: Long = delayMillis): ReceiveChannel { + require(delayMillis >= 0) { "Expected non-negative delay, but has $delayMillis ms" } + require(initialDelayMillis >= 0) { "Expected non-negative initial delay, but has $initialDelayMillis ms" } + return produce(capacity = 0) { + delay(initialDelayMillis) + while (true) { + channel.send(Unit) + delay(delayMillis) + } + } +} diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt index 5f3d167e58..0b6625b8bb 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt @@ -45,9 +45,9 @@ public fun Flow.flatMapMerge(concurrency: Int = 16, bufferSize: Int = require(bufferSize >= 0) { "Expected non-negative buffer size, but had $bufferSize" } require(concurrency >= 0) { "Expected non-negative concurrency level, but had $concurrency" } return flow { - val semaphore = Channel(concurrency) - val flatMap = SerializingFlatMapCollector(this, bufferSize) coroutineScope { + val semaphore = Channel(concurrency) + val flatMap = SerializingFlatMapCollector(this@flow, bufferSize) collect { outerValue -> // TODO real semaphore (#94) semaphore.send(Unit) // Acquire concurrency permit @@ -89,15 +89,50 @@ public fun Flow>.flattenConcat(): Flow = flow { @FlowPreview public fun Flow>.flattenMerge(concurrency: Int = 16, bufferSize: Int = 16): Flow = flatMapMerge(concurrency, bufferSize) { it } +/** + * Returns a flow that switches to a new flow produced by [transform] function every time the original flow emits a value. + * When switch on the a flow is performed, the previous one is cancelled. + * + * For example, the following flow: + * ``` + * flow { + * emit("a") + * delay(100) + * emit("b") + * }.switchMap { value -> + * flow { + * emit(value + value) + * delay(200) + * emit(value + "_last") + * } + * } + * ``` + * produces `aa bb b_last` + */ +@FlowPreview +public fun Flow.switchMap(transform: suspend (value: T) -> Flow): Flow = flow { + coroutineScope { + var previousFlow: Job? = null + collect { value -> + // Linearize calls to emit as alternative to the channel. Bonus points for never-overlapping channels. + previousFlow?.cancelAndJoin() + // Undispatched to have better user experience in case of synchronous flows + previousFlow = launch(start = CoroutineStart.UNDISPATCHED) { + transform(value).collect { innerValue -> + emit(innerValue) + } + } + } + } +} // Effectively serializes access to downstream collector from flatMap private class SerializingFlatMapCollector( - private val downstream: FlowCollector, - private val bufferSize: Int + private val downstream: FlowCollector, bufferSize: Int ) { // Let's try to leverage the fact that flatMapMerge is never contended - // TODO 1.2.1 do not allocate channel + // TODO do not allocate channel private val channel = Channel(bufferSize) // Should be any, but KT-30796 private val inProgressLock = atomic(false) diff --git a/kotlinx-coroutines-core/common/test/TestBase.common.kt b/kotlinx-coroutines-core/common/test/TestBase.common.kt index 04e75e3264..735a5b12a6 100644 --- a/kotlinx-coroutines-core/common/test/TestBase.common.kt +++ b/kotlinx-coroutines-core/common/test/TestBase.common.kt @@ -17,6 +17,7 @@ public expect open class TestBase constructor() { public fun expect(index: Int) public fun expectUnreached() public fun finish(index: Int) + public fun ensureFinished() // Ensures that 'finish' was invoked public fun reset() // Resets counter and finish flag. Workaround for parametrized tests absence in common public fun runTest( @@ -44,20 +45,12 @@ public inline fun assertFailsWith(block: () -> Unit) { } public suspend inline fun assertFailsWith(flow: Flow<*>) { - var e: Throwable? = null - var completed = false - flow.launchIn(CoroutineScope(Dispatchers.Unconfined)) { - onEach {} - catch { - e = it - } - finally { - completed = true - assertTrue(it is T) - } - }.join() - assertTrue(e is T) - assertTrue(completed) + try { + flow.collect { /* Do nothing */ } + fail("Should be unreached") + } catch (e: Throwable) { + assertTrue(e is T) + } } public suspend fun Flow.sum() = fold(0) { acc, value -> acc + value } diff --git a/kotlinx-coroutines-core/common/test/NamedDispatchers.kt b/kotlinx-coroutines-core/common/test/flow/NamedDispatchers.kt similarity index 100% rename from kotlinx-coroutines-core/common/test/NamedDispatchers.kt rename to kotlinx-coroutines-core/common/test/flow/NamedDispatchers.kt diff --git a/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt b/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt new file mode 100644 index 0000000000..9b257d933e --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt @@ -0,0 +1,85 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import kotlin.coroutines.* +import kotlin.jvm.* + +private class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : CoroutineDispatcher(), Delay { + + private val originalDispatcher = enclosingScope.coroutineContext[ContinuationInterceptor] as CoroutineDispatcher + private val heap = ArrayList() // TODO use MPP heap/ordered set implementation (commonize ThreadSafeHeap) + private var currentTime = 0L + + init { + /* + * Launch "event-loop-owning" task on start of the virtual time event loop. + * It ensures the progress of the enclosing event-loop and polls the timed queue + * when the enclosing event loop is empty, emulating virtual time. + */ + enclosingScope.launch { + while (true) { + val delayNanos = ThreadLocalEventLoop.currentOrNull()?.processNextEvent() + ?: error("Event loop is missing, virtual time source works only as part of event loop") + if (delayNanos <= 0) continue + if (delayNanos > 0 && delayNanos != Long.MAX_VALUE) error("Unexpected external delay: $delayNanos") + val nextTask = heap.minBy { it.deadline } ?: return@launch + heap.remove(nextTask) + currentTime = nextTask.deadline + nextTask.run() + } + } + } + + private inner class TimedTask( + private val runnable: Runnable, + @JvmField val deadline: Long + ) : DisposableHandle, Runnable by runnable { + + override fun dispose() { + heap.remove(this) + } + } + + override fun dispatch(context: CoroutineContext, block: Runnable) { + originalDispatcher.dispatch(context, block) + } + + @ExperimentalCoroutinesApi + override fun isDispatchNeeded(context: CoroutineContext): Boolean = originalDispatcher.isDispatchNeeded(context) + + override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + val task = TimedTask(block, currentTime + timeMillis) + heap += task + return task + } + + override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { + val task = TimedTask(Runnable { with(continuation) { resumeUndispatched(Unit) } }, currentTime + timeMillis) + heap += task + continuation.invokeOnCancellation { task.dispose() } + } +} + +/** + * Runs a test ([TestBase.runTest]) with a virtual time source. + * This runner has the following constraints: + * 1) It works only in the event-loop environment and it is relying on it. + * None of the coroutines should be launched in any dispatcher different from a current + * 2) Regular tasks always dominate delayed ones. It means that + * `launch { while(true) yield() }` will block the progress of the delayed tasks + * 3) [TestBase.finish] should always be invoked. + * Given all the constraints into account, it is easy to mess up with a test and actually + * return from [withVirtualTime] before the test is executed completely. + * To decrease the probability of such error, additional `finish` constraint is added. + */ +public fun TestBase.withVirtualTime(block: suspend CoroutineScope.() -> Unit) = runTest { + withContext(Dispatchers.Unconfined) { + // Create a platform-independent event loop + val dispatcher = VirtualTimeDispatcher(this) + withContext(dispatcher) { block() } + ensureFinished() + } +} diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt index 438b7c8313..c2051e7d33 100644 --- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt @@ -23,6 +23,7 @@ class ChannelFlowTest : TestBase() { @Test fun testExceptionInBroadcast() = runTest { + expect(1) val channel = broadcast(NonCancellable) { // otherwise failure will cancel scope as well repeat(10) { send(it + 1) @@ -30,7 +31,14 @@ class ChannelFlowTest : TestBase() { throw TestException() } assertEquals(15, channel.asFlow().take(5).sum()) - assertFailsWith(channel.asFlow()) + + // Workaround for JS bug + try { + channel.asFlow().collect { /* Do nothing */ } + expectUnreached() + } catch (e: TestException) { + finish(2) + } } @Test diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt index 5954e211ad..25bb75be57 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt @@ -16,7 +16,7 @@ class CombineLatestTest : TestBase() { fun testCombineLatest() = runTest { val flow = flowOf("a", "b", "c") val flow2 = flowOf(1, 2, 3) - val list = flow.combineLatest(flow2, { i, j -> i + j }).toList() + val list = flow.combineLatest(flow2) { i, j -> i + j }.toList() assertEquals(listOf("a1", "b1", "b2", "c2", "c3"), list) } @@ -24,7 +24,7 @@ class CombineLatestTest : TestBase() { fun testNulls() = runTest { val flow = flowOf("a", null, null) val flow2 = flowOf(1, 2, 3) - val list = flow.combineLatest(flow2, { i, j -> i + j }).toList() + val list = flow.combineLatest(flow2, { i, j -> i + j }).toList() assertEquals(listOf("a1", "null1", "null2", "null2", "null3"), list) } @@ -32,13 +32,13 @@ class CombineLatestTest : TestBase() { fun testNullsOther() = runTest { val flow = flowOf("a", "b", "c") val flow2 = flowOf(null, 2, null) - val list = flow.combineLatest(flow2, { i, j -> i + j }).toList() + val list = flow.combineLatest(flow2, { i, j -> i + j }).toList() assertEquals(listOf("anull", "bnull", "b2", "c2", "cnull"), list) } @Test fun testEmptyFlow() = runTest { - val flow = emptyFlow().combineLatest(emptyFlow(), { i, j -> i + j }) + val flow = emptyFlow().combineLatest(emptyFlow(), { i, j -> i + j }) assertNull(flow.singleOrNull()) } @@ -46,14 +46,14 @@ class CombineLatestTest : TestBase() { fun testFirstIsEmpty() = runTest { val f1 = emptyFlow() val f2 = flowOf(1) - assertEquals(emptyList(), f1.combineLatest(f2, { i, j -> i + j }).toList()) + assertEquals(emptyList(), f1.combineLatest(f2) { i, j -> i + j }.toList()) } @Test fun testSecondIsEmpty() = runTest { val f1 = flowOf("a") val f2 = emptyFlow() - assertEquals(emptyList(), f1.combineLatest(f2, { i, j -> i + j }).toList()) + assertEquals(emptyList(), f1.combineLatest(f2) { i, j -> i + j }.toList()) } @Test @@ -80,7 +80,7 @@ class CombineLatestTest : TestBase() { emit(3) } - val result = f1.combineLatest(f2, { i, j -> i + j }).toList() + val result = f1.combineLatest(f2) { i, j -> i + j }.toList() assertEquals(listOf("a1", "b1", "c1", "c2", "c3"), result) finish(8) } @@ -137,7 +137,7 @@ class CombineLatestTest : TestBase() { } } - val value = withContext(NamedDispatchers("main")) { + val value = withContext(NamedDispatchers("main")) { f1.combineLatest(f2) { i, j -> assertEquals("main", NamedDispatchers.name()) expect(5) @@ -170,8 +170,8 @@ class CombineLatestTest : TestBase() { expect(1) i + j }.flowOn(NamedDispatchers("combine")).onEach { - throw TestException() - } + throw TestException() + } assertFailsWith(flow) finish(4) diff --git a/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt new file mode 100644 index 0000000000..a31854bade --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt @@ -0,0 +1,199 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlin.test.* + +class DebounceTest : TestBase() { + @Test + public fun testBasic() = withVirtualTime { + expect(1) + val flow = flow { + expect(3) + emit("A") + delay(1500) + emit("B") + delay(500) + emit("C") + delay(250) + emit("D") + delay(2000) + emit("E") + expect(4) + } + + expect(2) + val result = flow.debounce(1000).toList() + assertEquals(listOf("A", "D", "E"), result) + finish(5) + } + + @Test + fun testSingleNull() = runTest { + val flow = flowOf(null).debounce(Long.MAX_VALUE) + assertNull(flow.single()) + } + + @Test + fun testBasicWithNulls() = withVirtualTime { + expect(1) + val flow = flow { + expect(3) + emit("A") + delay(1500) + emit("B") + delay(500) + emit("C") + delay(250) + emit(null) + delay(2000) + emit(null) + expect(4) + } + + expect(2) + val result = flow.debounce(1000).toList() + assertEquals(listOf("A", null, null), result) + finish(5) + } + + @Test + fun testEmpty() = runTest { + val flow = emptyFlow().debounce(Long.MAX_VALUE) + assertNull(flow.singleOrNull()) + } + + @Test + fun testScalar() = withVirtualTime { + val flow = flowOf(1, 2, 3).debounce(1000) + assertEquals(3, flow.single()) + finish(1) + } + + @Test + fun testPace() = withVirtualTime { + val flow = flow { + expect(1) + repeat(10) { + emit(-it) + delay(99) + } + + repeat(10) { + emit(it) + delay(101) + } + expect(2) + }.debounce(100) + + assertEquals((0..9).toList(), flow.toList()) + finish(3) + } + + @Test + fun testUpstreamError() = runTest { + val latch = Channel() + val flow = flow { + expect(1) + emit(1) + expect(2) + latch.receive() + throw TestException() + }.debounce(1).map { + latch.send(Unit) + hang { expect(3) } + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testUpstreamErrorIsolatedContext() = runTest { + val latch = Channel() + val flow = flow { + assertEquals("upstream", NamedDispatchers.name()) + expect(1) + emit(1) + expect(2) + latch.receive() + throw TestException() + }.flowOn(NamedDispatchers("upstream")).debounce(1).map { + latch.send(Unit) + hang { expect(3) } + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testUpstreamErrorDebounceNotTriggered() = runTest { + val flow = flow { + expect(1) + emit(1) + expect(2) + throw TestException() + }.debounce(Long.MAX_VALUE).map { + expectUnreached() + } + assertFailsWith(flow) + finish(3) + } + + @Test + fun testUpstreamErrorDebounceNotTriggeredInIsolatedContext() = runTest { + val flow = flow { + expect(1) + emit(1) + expect(2) + throw TestException() + }.flowWith(NamedDispatchers("unused")) { + debounce(Long.MAX_VALUE).map { + expectUnreached() + } + } + + assertFailsWith(flow) + finish(3) + } + + @Test + fun testDownstreamError() = runTest { + val flow = flow { + expect(1) + emit(1) + hang { expect(3) } + }.debounce(100).map { + expect(2) + yield() + throw TestException() + it + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testDownstreamErrorIsolatedContext() = runTest { + val flow = flow { + assertEquals("upstream", NamedDispatchers.name()) + expect(1) + emit(1) + hang { expect(3) } + }.flowOn(NamedDispatchers("upstream")).debounce(100).map { + expect(2) + yield() + throw TestException() + it + } + + assertFailsWith(flow) + finish(4) + } +} diff --git a/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt new file mode 100644 index 0000000000..814aec669e --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt @@ -0,0 +1,274 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow.operators + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* +import kotlin.test.* + +class SampleTest : TestBase() { + @Test + public fun testBasic() = withVirtualTime { + expect(1) + val flow = flow { + expect(3) + emit("A") + delay(1500) + emit("B") + delay(500) + emit("C") + delay(250) + emit("D") + delay(2000) + emit("E") + expect(4) + } + + expect(2) + val result = flow.sample(1000).toList() + assertEquals(listOf("A", "B", "D"), result) + finish(5) + } + + @Test + fun testDelayedFirst() = withVirtualTime { + val flow = flow { + delay(60) + emit(1) + delay(60) + expect(1) + }.sample(100) + assertEquals(1, flow.singleOrNull()) + finish(2) + } + + @Test + fun testBasic2() = withVirtualTime { + expect(1) + val flow = flow { + expect(3) + emit(1) + emit(2) + delay(501) + emit(3) + delay(100) + emit(4) + delay(100) + emit(5) + emit(6) + delay(301) + emit(7) + delay(501) + expect(4) + } + + expect(2) + val result = flow.sample(500).toList() + assertEquals(listOf(2, 6, 7), result) + finish(5) + } + + @Test + fun testFixedDelay() = withVirtualTime { + val flow = flow { + emit("A") + delay(150) + emit("B") + expect(1) + }.sample(100) + assertEquals("A", flow.single()) + finish(2) + } + + @Test + fun testSingleNull() = withVirtualTime { + val flow = flow { + emit(null) + delay(2) + expect(1) + }.sample(1) + assertNull(flow.single()) + finish(2) + } + + @Test + fun testBasicWithNulls() = withVirtualTime { + expect(1) + val flow = flow { + expect(3) + emit("A") + delay(1500) + emit(null) + delay(500) + emit("C") + delay(250) + emit(null) + delay(2000) + emit("E") + expect(4) + } + + expect(2) + val result = flow.sample(1000).toList() + assertEquals(listOf("A", null, null), result) + finish(5) + } + + @Test + fun testEmpty() = runTest { + val flow = emptyFlow().sample(Long.MAX_VALUE) + assertNull(flow.singleOrNull()) + } + + @Test + fun testScalar() = runTest { + val flow = flowOf(1, 2, 3).sample(Long.MAX_VALUE) + assertNull(flow.singleOrNull()) + } + + @Test + // note that this test depends on the sampling strategy -- when sampling time starts on a quiescent flow that suddenly emits + fun testLongWait() = withVirtualTime { + expect(1) + val flow = flow { + expect(2) + emit("A") + delay(3500) // long delay -- multiple sampling intervals + emit("B") + delay(900) // crosses time = 4000 barrier + emit("C") + delay(3000) // long wait again + + } + val result = flow.sample(1000).toList() + assertEquals(listOf("A", "B", "C"), result) + finish(3) + } + + @Test + fun testPace() = withVirtualTime { + val flow = flow { + expect(1) + repeat(4) { + emit(-it) + delay(50) + } + + repeat(4) { + emit(it) + delay(100) + } + expect(2) + }.sample(100) + + assertEquals(listOf(-1, -3, 0, 1, 2, 3), flow.toList()) + finish(3) + } + + @Test + fun testUpstreamError() = runTest { + val latch = Channel() + val flow = flow { + expect(1) + emit(1) + expect(2) + latch.receive() + throw TestException() + }.sample(1).map { + latch.send(Unit) + hang { expect(3) } + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testUpstreamErrorIsolatedContext() = runTest { + val latch = Channel() + val flow = flow { + assertEquals("upstream", NamedDispatchers.name()) + expect(1) + emit(1) + expect(2) + latch.receive() + throw TestException() + }.flowOn(NamedDispatchers("upstream")).sample(1).map { + latch.send(Unit) + hang { expect(3) } + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testUpstreamErrorSampleNotTriggered() = runTest { + val flow = flow { + expect(1) + emit(1) + expect(2) + throw TestException() + }.sample(Long.MAX_VALUE).map { + expectUnreached() + } + assertFailsWith(flow) + finish(3) + } + + + @Test + fun testUpstreamErrorSampleNotTriggeredInIsolatedContext() = runTest { + val flow = flow { + expect(1) + emit(1) + expect(2) + throw TestException() + }.flowWith(NamedDispatchers("unused")) { + sample(Long.MAX_VALUE).map { + expectUnreached() + } + } + + assertFailsWith(flow) + finish(3) + } + + @Test + fun testDownstreamError() = runTest { + val flow = flow { + expect(1) + emit(1) + hang { expect(3) } + }.sample(100).map { + expect(2) + yield() + throw TestException() + it + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testDownstreamErrorIsolatedContext() = runTest { + val flow = flow { + assertEquals("upstream", NamedDispatchers.name()) + expect(1) + emit(1) + hang { expect(3) } + }.flowOn(NamedDispatchers("upstream")).sample(100).map { + expect(2) + yield() + throw TestException() + it + } + + assertFailsWith(flow) + finish(4) + } +} diff --git a/kotlinx-coroutines-core/common/test/flow/operators/SwitchMapTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/SwitchMapTest.kt new file mode 100644 index 0000000000..933bb1628e --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/operators/SwitchMapTest.kt @@ -0,0 +1,116 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlin.test.* + +class SwitchMapTest : TestBase() { + + @Test + fun testConstantDynamic() = runTest { + val flow = flowOf(1, 2, 3).switchMap { value -> (value until value + 3).asFlow() } + assertEquals(listOf(1, 2, 3, 2, 3, 4, 3, 4, 5), flow.toList()) + } + + @Test + fun testHangFlows() = runTest { + val flow = listOf(1, 2, 3, 4).asFlow() + val result = flow.switchMap { value -> + flow { + if (value != 4) hang { expect(value) } + else emit(42) + } + }.toList() + + assertEquals(listOf(42), result) + finish(4) + } + + @Test + fun testEmptyFlow() = runTest { + assertNull(emptyFlow().switchMap { flowOf(1) }.singleOrNull()) + } + + @Test + fun testIsolatedContext() = runTest { + val flow = flow { + assertEquals("source", NamedDispatchers.name()) + expect(1) + emit(2) + emit(4) + }.flowOn(NamedDispatchers("source")).switchMap { value -> + flow { + assertEquals("switch$value", NamedDispatchers.name()) + emit(value) + expect(value) + }.flowOn(NamedDispatchers("switch$value")) + }.onEach { + expect(it + 1) + assertEquals("main", NamedDispatchers.nameOr("main")) + } + + assertEquals(2, flow.count()) + finish(6) + } + + @Test + fun testFailureInTransform() = runTest { + val flow = flowOf(1, 2).switchMap { value -> + if (value == 1) { + flow { + emit(1) + hang { expect(1) } + } + } else { + expect(2) + throw TestException() + } + } + + assertFailsWith(flow) + finish(3) + } + + @Test + fun testFailureDownstream() = runTest { + val flow = flowOf(1).switchMap { value -> + flow { + expect(1) + emit(value) + expect(2) + hang { expect(4) } + } + }.flowOn(NamedDispatchers("downstream")).map { + expect(3) + throw TestException() + it + } + + assertFailsWith(flow) + finish(5) + } + + @Test + fun testFailureUpstream() = runTest { + val flow = flow { + expect(1) + emit(1) + yield() + expect(3) + throw TestException() + }.switchMap { + flow { + expect(2) + hang { + expect(4) + } + } + } + + assertFailsWith(flow) + finish(5) + } +} diff --git a/kotlinx-coroutines-core/js/test/TestBase.kt b/kotlinx-coroutines-core/js/test/TestBase.kt index 163688a8e2..3bf49ef8c9 100644 --- a/kotlinx-coroutines-core/js/test/TestBase.kt +++ b/kotlinx-coroutines-core/js/test/TestBase.kt @@ -57,6 +57,13 @@ public actual open class TestBase actual constructor() { finished = true } + /** + * Asserts that [finish] was invoked + */ + public actual fun ensureFinished() { + require(finished) { "finish(...) should be caller prior to this check" } + } + public actual fun reset() { check(actionIndex == 0 || finished) { "Expecting that 'finish(...)' was invoked, but it was not" } actionIndex = 0 diff --git a/kotlinx-coroutines-core/jvm/test/TestBase.kt b/kotlinx-coroutines-core/jvm/test/TestBase.kt index 6fef760af1..5c0117e3a9 100644 --- a/kotlinx-coroutines-core/jvm/test/TestBase.kt +++ b/kotlinx-coroutines-core/jvm/test/TestBase.kt @@ -32,7 +32,6 @@ private val VERBOSE = systemProp("test.verbose", false) * } * ``` */ -@Suppress("DEPRECATION") public actual open class TestBase actual constructor() { /** * Is `true` when running in a nightly stress test mode. @@ -114,6 +113,13 @@ public actual open class TestBase actual constructor() { check(!finished.getAndSet(true)) { "Should call 'finish(...)' at most once" } } + /** + * Asserts that [finish] was invoked + */ + public actual fun ensureFinished() { + require(finished.get()) { "finish(...) should be caller prior to this check" } + } + public actual fun reset() { check(actionIndex.get() == 0 || finished.get()) { "Expecting that 'finish(...)' was invoked, but it was not" } actionIndex.set(0) diff --git a/kotlinx-coroutines-core/native/test/TestBase.kt b/kotlinx-coroutines-core/native/test/TestBase.kt index afb1e03265..4f41faa5a1 100644 --- a/kotlinx-coroutines-core/native/test/TestBase.kt +++ b/kotlinx-coroutines-core/native/test/TestBase.kt @@ -52,6 +52,13 @@ public actual open class TestBase actual constructor() { finished = true } + /** + * Asserts that [finish] was invoked + */ + public actual fun ensureFinished() { + require(finished) { "finish(...) should be caller prior to this check" } + } + public actual fun reset() { check(actionIndex == 0 || finished) { "Expecting that 'finish(...)' was invoked, but it was not" } actionIndex = 0 diff --git a/reactive/kotlinx-coroutines-rx2/build.gradle b/reactive/kotlinx-coroutines-rx2/build.gradle index f87c89dd8c..0bb02b2b45 100644 --- a/reactive/kotlinx-coroutines-rx2/build.gradle +++ b/reactive/kotlinx-coroutines-rx2/build.gradle @@ -6,7 +6,7 @@ dependencies { compile project(':kotlinx-coroutines-reactive') testCompile project(':kotlinx-coroutines-reactive').sourceSets.test.output testCompile "org.reactivestreams:reactive-streams-tck:$reactive_streams_version" - compile 'io.reactivex.rxjava2:rxjava:2.1.9' + compile "io.reactivex.rxjava2:rxjava:$rxjava2_version" } tasks.withType(dokka.getClass()) {