Inconsistency between Flow combine and RxJava combineLatest #2082
Yes. It is the expected behavior. You can mimic Rx behavior with coroutines by using Does it help?
Thank you for your reply @elizarov. I updated the test to use

    @Test
    fun `combine Coroutines`() = runBlockingTest {
        val subject1 = MutableStateFlow(1)
        val subject2 = MutableStateFlow("a")
        val values = mutableListOf<Pair<Int, String>>()
        val job = launch(Dispatchers.Unconfined) {
            combine(subject1, subject2) { intVal, strVal -> intVal to strVal }
                .collect { values += it }
        }
        subject1.value = 2
        subject2.value = "b"
        subject1.value = 3
        subject2.value = "c"
        job.cancel()
        println(values)
    }

Result:
Just as you said, now the emissions are consistent with those of the Rx-based test. Thank you very much!
I guess what I still don't understand in the original example is why anything beyond
Ugh... That's not easy to explain. There are a lot of moving pieces involved in this machinery. Internally
Found another issue when testing

    @Test
    fun test() = runBlockingTest {
        val testDispatcher = TestCoroutineDispatcher()
        val channel1 = Channel<Int>()
        val channel2 = Channel<String>()
        val flow1 = channel1.consumeAsFlow()
        val flow2 = channel2.consumeAsFlow()
        val values = mutableListOf<Pair<Int, String>>()
        launch {
            combine(flow1, flow2) { a: Int, b: String -> a to b }
                .flowOn(testDispatcher)
                .collect { values += it }
        }
        channel1.send(1)
        channel2.send("a")
        channel1.send(2)
        channel2.send("b")
        channel1.close()
        channel2.close()
        assertThat(values).containsExactly(
            1 to "a",
            2 to "a",
            2 to "b"
        )
    }

The test ends up failing with the following exception:

    java.lang.IllegalStateException: This job has not completed yet
        at kotlinx.coroutines.JobSupport.getCompletionExceptionOrNull(JobSupport.kt:1189)
        at kotlinx.coroutines.test.TestBuildersKt.runBlockingTest(TestBuilders.kt:53)
        at kotlinx.coroutines.test.TestBuildersKt.runBlockingTest$default(TestBuilders.kt:45)

This only occurs when
@elizarov It'd be really great to have this. Our codebase uses both
With the latest release, the test from the initial submission outputs just
I didn't bother to check what exactly changed since then. In any case, diving into the code, the reason for not having to

So, I see three possible solutions here:

In my opinion, a properly unconfined dispatcher does have its merits, as it allows one to test only the functionality, forgetting about the parallelism, but it should probably be marked as such to avoid the false sense of security when parallelism does matter.
Defines two test dispatchers:
* StandardTestDispatcher, which, combined with runTest, gives an illusion of an event loop;
* UnconfinedTestDispatcher, which is like Dispatchers.Unconfined, but skips delays.

By default, StandardTestDispatcher is used due to the somewhat chaotic execution order of Dispatchers.Unconfined. TestCoroutineDispatcher is deprecated.

Fixes #1626 Fixes #1742 Fixes #2082 Fixes #2102 Fixes #2405 Fixes #2462
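To make the dispatcher choice concrete, here is a minimal sketch of the repro from this issue rewritten against the new test module. It assumes kotlinx-coroutines-test 1.6 or later and kotlin.test on the classpath; the class and test names are hypothetical. The collector is launched on an UnconfinedTestDispatcher that shares the test's scheduler, so it starts eagerly instead of waiting for the test body to suspend. The exact pairs recorded are not asserted here; run it to compare against the Rx-based test.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.runTest
import kotlin.test.Test

// Hypothetical test class, for illustration only.
@OptIn(ExperimentalCoroutinesApi::class)
class CombineDispatcherSketch {

    @Test
    fun combineCollectedOnUnconfinedTestDispatcher() = runTest {
        val subject1 = MutableStateFlow(1)
        val subject2 = MutableStateFlow("a")
        val values = mutableListOf<Pair<Int, String>>()

        // runTest itself uses StandardTestDispatcher by default; only the
        // collecting coroutine is moved to the unconfined test dispatcher.
        // Passing testScheduler keeps both on the same virtual clock.
        val job = launch(UnconfinedTestDispatcher(testScheduler)) {
            combine(subject1, subject2) { intVal, strVal -> intVal to strVal }
                .collect { values += it }
        }

        subject1.value = 2
        subject2.value = "b"
        subject1.value = 3
        subject2.value = "c"

        // combine over StateFlows never completes on its own, so cancel
        // the collector before the test body returns.
        job.cancel()
        println(values)
    }
}
```

Dropping the dispatcher argument from `launch` leaves the collector on the default StandardTestDispatcher, which is a quick way to see how the two dispatchers differ for this test.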
This commit introduces the new version of the test module. Please see README.md and MIGRATION.md for a thorough discussion of the changes.

Fixes Kotlin#1203 Fixes Kotlin#1609 Fixes Kotlin#2379 Fixes Kotlin#1749 Fixes Kotlin#1204 Fixes Kotlin#1390 Fixes Kotlin#1222 Fixes Kotlin#1395 Fixes Kotlin#1881 Fixes Kotlin#1910 Fixes Kotlin#1772 Fixes Kotlin#1626 Fixes Kotlin#1742 Fixes Kotlin#2082 Fixes Kotlin#2102 Fixes Kotlin#2405 Fixes Kotlin#2462

Co-authored-by: Vsevolod Tolstopyatov
I am looking to migrate some existing RxJava-based code to Coroutines, but I ran into some test failures in code that uses `Observable.combineLatest`. My tests originally use `PublishSubjects` from RxJava to mock the underlying streams within the `combineLatest`. However, when I switch to `Flow` streams in the `combine` and then update the tests to use `Channels` instead of `PublishSubjects`, my tests fail because the expected number of emissions is not received.

I have come up with the following reduced repro (uses `MutableStateFlows` instead of `Channels`):

Result:

And then:

Result:

Is this expected behavior? The only way I can seem to get the second test to produce the same emissions as the first test is to call `yield()` after every single `value` assignment.