Testing all emissions in StateFlow without losing any #3939

Open
dkhalanskyjb opened this issue Nov 13, 2023 · 3 comments
Labels
docs KDoc and API reference test

Comments

@dkhalanskyjb
Collaborator

Problem statement

A very common pain point when using our testing framework is that StateFlow conflates values emitted in quick succession.

Example (all examples are written without Turbine, DI frameworks, or other niceties to show the most universal form that doesn't rely on any technologies but the ones available in our library):

fun testStateFlow() = runTest {
    val flow = MutableStateFlow(0)
    // try to observe values [0; 10] in `flow`
    var i = 0
    val collector = launch {
        flow.collect {
            check(i == it) { "Expected $i but got $it" }
            ++i
            if (i > 10) this@launch.cancel() // every value in [0; 10] has been checked
        }
    }
    // send values [1; 10] to `flow`
    repeat(10) {
        flow.value = it + 1
    }
    // wait for the collector to check the values that were sent
    collector.join()
}

This test will fail with Expected 0 but got 10.

The reason is that there is no buffer inside StateFlow that stores the values that were emitted. Instead, only the latest value gets stored. runTest uses only a single thread, so while repeat(10) is running, the collector is waiting for its turn before it can start collecting values. The first moment when it gets its turn is collector.join(). Only then does the collector finally have a chance to check which values were sent—but at that point, StateFlow contains 10.

This issue was reported to us many times: #3120, #3367, #3339, #3519, #3143, https://slack-chats.kotlinlang.org/t/16039814/hey-all-wave-i-have-been-trying-to-test-the-intermediate-sta#9c1503b6-6da5-4e21-9aef-8b03c0dc1dbc, https://slack-chats.kotlinlang.org/t/16056041/is-there-any-interest-in-calling-out-that-stateflow-instance#2878e145-9e65-4095-9112-1da730105076, https://www.billjings.com/posts/title/testing-stateflow-is-annoying/?up= ...

Affected code

The typical use case we get approached with is similar to the following.

System under test is:

interface NetworkCaller {
  suspend fun getPage(uri: String): String
}

class AboutPageContents(
  private val scope: CoroutineScope,
  private val networkCaller: NetworkCaller,
) {
  private val _stateFlow = MutableStateFlow<State>(State.EMPTY)
  val state: StateFlow<State> = _stateFlow

  fun startLoading() = scope.launch {
    _stateFlow.value = State.LOADING
    val contents = networkCaller.getPage("https://neocities.org/about")
    _stateFlow.value = State.HAS_CONTENTS(contents)
  }

  sealed interface State {
    object EMPTY: State
    object LOADING: State
    data class HAS_CONTENTS(val contents: String): State
  }
}

The test:

@Test
fun testAboutPageContentsLoading() = runTest {
    val fakeNetworkCaller = object: NetworkCaller {
        override suspend fun getPage(uri: String): String = "<html></html>"
    }
    val pageContents = AboutPageContents(this, fakeNetworkCaller)
    val collector = launch {
        var expectedState: AboutPageContents.State = AboutPageContents.State.EMPTY
        pageContents.state.collect {
            when (val currentState = expectedState) {
                AboutPageContents.State.EMPTY -> {
                    check(it === AboutPageContents.State.EMPTY) { "Expected EMPTY, got $it" }
                    expectedState = AboutPageContents.State.LOADING
                }
                AboutPageContents.State.LOADING -> {
                    check(it === AboutPageContents.State.LOADING) { "Expected LOADING, got $it" }
                    expectedState = AboutPageContents.State.HAS_CONTENTS("<html></html>")
                }
                is AboutPageContents.State.HAS_CONTENTS -> {
                    check(it is AboutPageContents.State.HAS_CONTENTS) { "Expected HAS_CONTENTS, got $it" }
                    check(it.contents == currentState.contents)
                    this@launch.cancel()
                }
            }
        }
    }
    pageContents.startLoading()
    collector.join()
}

The test tries to observe every step of the process of loading a web page: initially, it's empty; after calling startLoading, it's supposed to enter the LOADING state, and finally, after the page has loaded, we get HAS_CONTENTS. However, this test fails with Expected LOADING, got HAS_CONTENTS(contents=<html></html>): due to conflation, startLoading went from LOADING to HAS_CONTENTS before collect had a chance to process the LOADING state.

Fix

The problem with the above code is here:

    val fakeNetworkCaller = object: NetworkCaller {
        override suspend fun getPage(uri: String): String = "<html></html>"
    }

This should be replaced with

    val fakeNetworkCaller = object: NetworkCaller {
        override suspend fun getPage(uri: String): String {
            delay(25.milliseconds)
            return "<html></html>"
        }
    }

The reasoning is as follows:

  • The test will test exactly what you tell it to test. If the fake network call has no delay in its implementation, it completes instantly. If it really completed instantly in real life, the user wouldn't be able to see the spinner or any other visual indication of the LOADING state: the state would get conflated, and we'd observe going from EMPTY to HAS_CONTENTS immediately, which is exactly what the test correctly shows. By inserting delay, we clarify that we want to emulate the case when the network call takes a noticeably long time.
  • In fact, it may be worth testing the behavior of the system in both scenarios: how will it react if the network call is slow (and the spinner is visible), and how will it react if the network call is surprisingly fast (and the UI only sees the EMPTY and HAS_CONTENTS emissions in production).
  • When delay(25.milliseconds) executes on a test dispatcher rather than on an actual dispatcher such as Dispatchers.IO, the test scheduler skips it in virtual time, so it completes instantly. This delay call therefore has no impact on the performance of the tests.
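The two scenarios from the list above can be sketched as a single helper that is run once with a slow fake and once with an instant one. This is only an illustration under stated assumptions: `observedStates` and its `networkDelay` parameter are hypothetical names that do not appear in the original example, the system under test is repeated so the block is self-contained, and the helper relies on `runTest` blocking until the test body finishes, which holds on the JVM.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// The system under test from above, repeated so that the sketch is self-contained.
interface NetworkCaller {
    suspend fun getPage(uri: String): String
}

class AboutPageContents(
    private val scope: CoroutineScope,
    private val networkCaller: NetworkCaller,
) {
    private val _stateFlow = MutableStateFlow<State>(State.EMPTY)
    val state: StateFlow<State> = _stateFlow

    fun startLoading() = scope.launch {
        _stateFlow.value = State.LOADING
        val contents = networkCaller.getPage("https://neocities.org/about")
        _stateFlow.value = State.HAS_CONTENTS(contents)
    }

    sealed interface State {
        object EMPTY : State
        object LOADING : State
        data class HAS_CONTENTS(val contents: String) : State
    }
}

// Hypothetical helper: records every state a collector sees for a given fake latency.
// JVM-only as written, because it assumes runTest blocks until the body completes.
fun observedStates(networkDelay: Duration): List<AboutPageContents.State> {
    val seen = mutableListOf<AboutPageContents.State>()
    runTest {
        val fakeNetworkCaller = object : NetworkCaller {
            override suspend fun getPage(uri: String): String {
                // Suspend only for a positive latency; otherwise return at once.
                if (networkDelay > Duration.ZERO) delay(networkDelay)
                return "<html></html>"
            }
        }
        val pageContents = AboutPageContents(this, fakeNetworkCaller)
        val collector = launch {
            pageContents.state.collect {
                seen.add(it)
                // HAS_CONTENTS is the terminal state, so stop collecting there.
                if (it is AboutPageContents.State.HAS_CONTENTS) this@launch.cancel()
            }
        }
        pageContents.startLoading()
        collector.join()
    }
    return seen
}

fun main() {
    println(observedStates(25.milliseconds)) // slow network call
    println(observedStates(Duration.ZERO))   // instant network call
}
```

With the slow fake, the collector should record EMPTY, LOADING, and HAS_CONTENTS in that order; with the instant fake, LOADING is conflated away and only EMPTY and HAS_CONTENTS remain. Asserting on both lists keeps the conflated path under test instead of treating it as a failure.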

Alternative fixes (not recommended)

These are the alternative ways to work around this issue, with explanations of why we don't recommend any of them. Some of these fixes we explicitly recommended before, but now that our understanding of the use case has deepened, we can confidently advise against them.

UnconfinedTestDispatcher

We can work around the issue if we replace the line

    val collector = launch {

with

    val collector = launch(UnconfinedTestDispatcher(testScheduler)) {

Being a test-framework-specific equivalent to Dispatchers.Unconfined, UnconfinedTestDispatcher ensures that each time an action is enqueued on that dispatcher (such as a new value becoming available on the flow being collected), that action runs immediately.

This approach works. However, it is not only unintuitive, requiring a deep dive into coroutines to understand, but also a bit brittle: if assignments to the StateFlow also happen from an unconfined dispatcher, this behavior is not guaranteed. This is a realistic concern:

  • Many tests are written like runTest(UnconfinedTestDispatcher()) to ensure all launch calls are entered immediately.
  • The Android-specific viewModelScope uses Dispatchers.Main.immediate, which behaves like Dispatchers.Unconfined when used from the main thread. So, any test for code that uses viewModelScope is susceptible to this.

See also #3760

A custom CoroutineDispatcher

One can tweak the UnconfinedTestDispatcher approach by defining their own coroutine dispatcher to deal with this case robustly:

object DirectCoroutineDispatcher: CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) { block.run() }
}

Then,

    val collector = launch(DirectCoroutineDispatcher) {

will ensure the collection is happening immediately, even if runTest as a whole runs in UnconfinedTestDispatcher.

This is clearly unreadable to anyone not well-versed in coroutines, and it gives up the delay-skipping behavior: if collector used a delay inside, it would be performed with the real-time clock, making the test take longer. In addition, and most importantly, no dispatcher in real life behaves like this, so any test written with DirectCoroutineDispatcher risks testing behavior that can't actually happen in production. What is the point of testing behavior you can never observe?

yield()

A common approach is to add a yield() after each assignment to make sure the test thread gets the chance to process it. In general, yield should not be used to ensure a specific execution order, as it will only allow the code to run until the next suspension, and the number of suspensions a function will go through during its execution is an implementation detail that may change. If one goes down the yield() path, they may end up with strange constructions like yield(); yield(); yield() // ensure everyone processed the change, which demonstrates the intent much worse than delay(25.milliseconds) // emulate a long network call does.

Semantically, yield is supposed to mean, "let this thread do some other work," without defining exactly which work and how much of it is being done. The behavior of yield is predictable, but tracing through the interleavings of coroutines manually is too much work when reading a simple test.

runCurrent() (/advanceUntilIdle())

Instead of yield, one can use runCurrent(). It's slightly better, as it doesn't rely on an exact number of context switches to perform: depending on your needs, it can work as yield(), or yield(); yield(); yield(): all the work that's scheduled to execute at the current moment will finish by the end of the call.

There are also downsides to runCurrent(). Notably, for this use case, it's a blocking call that can't be cancelled. See #3919 for more details and a discussion.
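For comparison, here is a minimal sketch of the runCurrent() workaround applied to the first example in this issue. It is shown only to make the mechanics concrete, not as a recommendation. The helper name `valuesSeenWithRunCurrent` is hypothetical, and the function assumes the JVM, where `runTest` blocks until the body finishes.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest

// Hypothetical helper: returns the values a collector observes when the test
// drains the scheduler after every assignment.
@OptIn(ExperimentalCoroutinesApi::class)
fun valuesSeenWithRunCurrent(): List<Int> {
    val seen = mutableListOf<Int>()
    runTest {
        val flow = MutableStateFlow(0)
        val collector = launch {
            flow.collect { seen.add(it) }
        }
        runCurrent() // let the collector start and observe the initial value
        repeat(10) {
            flow.value = it + 1
            runCurrent() // run everything scheduled now, including the collector
        }
        collector.cancel() // a StateFlow never completes, so stop collecting explicitly
    }
    return seen
}

fun main() {
    println(valuesSeenWithRunCurrent())
}
```

Because the scheduler is drained after each assignment, no two assignments happen back to back, and the collector should record every value from 0 through 10.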

What should we do?

We should update the documentation with a clear explanation of what to do in this typical scenario. For now, we'll keep this issue open for some time to collect feedback on the proposed approach.

@kevincianfarini
Contributor

kevincianfarini commented Nov 13, 2023

Thanks for filing this @dkhalanskyjb. This is a common pain point, and it is technically already called out in the docs by specifying that StateFlow is conflated. I think there are two pain points here:

  1. The documentation snippet is pretty small and is very easy to gloss over. One can read the following snippet without realizing that conflated flows and channels require special testing care compared to "normal" flows which will emit every intermediate element.

Updates to the value are always conflated. So a slow collector skips fast updates, but always collects the most recently emitted value.

Perhaps it's not right to articulate the special testing care that's needed in the StateFlow documentation; I think it's better suited to the conflated documentation. This isn't exclusive to just StateFlow. However, since StateFlow is commonly exposed in API surfaces, perhaps it warrants calling out that unit testing requires special care.

Updates to the value are always conflated. So a slow collector skips fast updates, but always collects the most recently emitted value. Conflated channels and flows require special care under test, which is called out in the conflated documentation.

  2. The second issue is not exclusive to just conflated behavior. StateFlows also effectively have a distinctUntilChanged applied to them. This exposes the scenario where these two examples would behave differently.

val flow1 = flow {
  emit(1)
  emit(2)
  emit(1)
}.conflate()

val flow2 = MutableStateFlow(1).apply {
  value = 2
  value = 1
}

Under the right circumstances, flow1 could potentially emit the values 1, 1 while flow2 would only ever emit a single 1 value. The second flow's behavior more closely resembles

val flow2Alike = flow {
  emit(1)
  emit(2)
  emit(1)
}.conflate().distinctUntilChanged()
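The equality-based half of this behavior can be observed in isolation with a small sketch. The helper `observeEagerly` is a hypothetical name, and Dispatchers.Unconfined is used here only so that the collector runs as soon as each distinct value is assigned; that choice is an assumption made for the demonstration, not a suggested way to write tests.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: uses the first element as the initial value, assigns the
// remaining ones in order, and returns what an eager collector observed.
fun observeEagerly(updates: List<Int>): List<Int> = runBlocking {
    val state = MutableStateFlow(updates.first())
    val seen = mutableListOf<Int>()
    // The unconfined collector runs immediately, up to its first suspension.
    val collector = launch(Dispatchers.Unconfined) {
        state.collect { seen.add(it) }
    }
    // Assigning a value equal to the current one produces no emission.
    updates.drop(1).forEach { state.value = it }
    collector.cancel()
    seen
}

fun main() {
    println(observeEagerly(listOf(1, 1, 2, 2, 1))) // [1, 2, 1]
}
```

Repeated equal assignments are dropped even though the collector keeps up with every update, which is the distinctUntilChanged-like part of StateFlow. Skipping intermediate values for a slow collector is a separate effect on top of it.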

This behavior can be confusing and frustrating to test when combined and I think warrants documentation. What do you think?

@jingibus

Thanks so much for the issue.

Our preferred fix at Cash App is none of the above: we unit test on APIs that expose Flow, and run in production with APIs that expose StateFlow. The delay fix described above is not sufficient in our view, as it requires prior knowledge of the error before it happens. By using Flow instead of StateFlow, we are able to guard ourselves against spurious emissions that may be inadvertently introduced in the future.

I agree with @kevincianfarini that readability in documentation would help. JetBrains documents are rarely lacking in technical accuracy; however, they frequently leave much to be desired in clarity. We can see that this is true because many readers report that they have read the documentation and believed they understood it, and yet were surprised when they encountered the actual behavior in practice that the documentation described. If you know the facts you are looking for in advance, you can always find the information in JetBrains docs; if you do not, however, the relevant information may not be apparent to you.

@dkhalanskyjb
Collaborator Author

conflated flows and channels require special testing care

It would be like a BigInteger library saying, "please carefully ensure that division by zero doesn't happen in your tests." Technically, yes, you should make sure your tests don't divide by zero—but it's much more important that your production doesn't divide by zero, and if you recognize division by zero as a possibility, then checking for that in tests follows naturally.

And conflation can easily occur in production as well: just take any single-threaded dispatcher.

val flow = MutableStateFlow<Int>(0)
println(withContext(Dispatchers.Main) {
  val result = mutableListOf<Int>()
  launch {
    flow.collect {
      if (it > 10) { cancel() } else { result.add(it) }
    }
  }
  repeat(100) {
    flow.value = it
  }
  result
})

If someone's tests fail because they don't understand that conflation will happen, it's good: when it happens in tests, it's much less frustrating than in production.
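The snippet above needs a platform that provides Dispatchers.Main. A minimal JVM sketch of the same situation, assuming a single-threaded executor as a stand-in for the main thread (the helper name `collectOnSingleThread` is hypothetical):

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper: runs the emitter and the collector on one thread and
// returns the values the collector managed to record.
fun collectOnSingleThread(): List<Int> {
    // Stand-in for a main-thread dispatcher: exactly one worker thread.
    val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    return try {
        runBlocking {
            withContext(dispatcher) {
                val flow = MutableStateFlow(0)
                val result = mutableListOf<Int>()
                launch {
                    flow.collect {
                        if (it > 10) cancel() else result.add(it)
                    }
                }
                // No suspension happens here, so the collector cannot run
                // until all assignments are done.
                repeat(100) { flow.value = it }
                result
            }
        }
    } finally {
        dispatcher.close() // shut down the worker thread
    }
}

fun main() {
    println(collectOnSingleThread()) // []
}
```

The collector only gets the thread after the loop has finished, so the first value it sees is 99. It cancels itself at once, and the result stays empty.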

StateFlows also effectively have a distinctUntilChanged applied to them.

Yes. From the docs of MutableStateFlow:

"all emission-related operators, such as value's setter, emit, and tryEmit, are conflated using Any.equals."

From the docs of StateFlow:

"Values in state flow are conflated using Any.equals comparison in a similar way to distinctUntilChanged operator. It is used to conflate incoming updates to value in MutableStateFlow and to suppress emission of the values to collectors when new value is equal to the previously emitted one. State flow behavior with classes that violate the contract for Any.equals is unspecified."

This behavior can be confusing and frustrating to test when combined and I think warrants documentation. What do you think?

There's a delicate balance between too little and too much documentation. We shouldn't infantilize our users: I'm pretty sure they are capable of reading when they have to. If we add extra examples to every sentence someone somewhere misreads, the documentation will become a monstrosity that's a pain to browse through. The StateFlow documentation is quite long as it is, and adding unnecessary text will make it even harder to find one's way around it.

I'm certainly not saying we shouldn't update the documentation, but we must be careful to make sure the edit is actually useful.

Maybe someone can help me enter the mindset of someone who doesn't understand the issue of StateFlow conflation in tests. Here's how I imagine the happy path: so, here I am, writing tests. Suddenly, a test fails: I see that not all emissions from StateFlow get processed. I'm surprised: hey, aren't flows guaranteed to deliver values to their subscribers? I resort to documentation. I don't remember such issues with just Flow, so I read StateFlow documentation (or MutableStateFlow, which recommends reading StateFlow documentation). In any case, I see something about "conflation"—maybe I see the phrase "So a slow collector skips fast updates, but always collects the most recently emitted value." In any case, I have plenty of pointers to learn what conflation is, as every mention of conflation links to the documentation of conflate() that explains it thoroughly.

So, where does one go off the happy path and need some railing?

The delay fix described above is not sufficient in our view, as it requires prior knowledge of the error before it happens.

You're touching on a deep concept here.

First, I'd like to argue that whether or not it's sufficient to test using a delay (or any other trick to ensure that there's a controllable suspension between one emission and the other), it's necessary, even if we forget about conflation. For example, if we slightly tweak the definition of HAS_CONTENTS, then it's trivial for an overzealous Kotlin enthusiast to introduce a bug to the example code above:

  fun startLoading() {
    _stateFlow.value = State.LOADING
    _stateFlow.value = State.HAS_CONTENTS().apply {
      scope.launch {
        contents = networkCaller.getPage("https://neocities.org/about")
      }
    }
  }

A test that collects all emissions will say that everything's completely fine here, even though HAS_CONTENTS has incorrectly become available before the contents were acquired. If there's no suspension in getPage, then from the point of view of single-threaded dispatchers like the test dispatcher, getPage and the assignment of HAS_CONTENTS have happened simultaneously, even though they must realistically be ordered in a specific way.

If we remember about conflation, then there's another argument for having a test with delay: the test that collects all emissions won't check what happens when conflation does occur. If some code relied explicitly on LOADING always happening, it would pass every test that checked all emissions, but it would crash once in a blue moon in production when the GC took too long to do its thing and HAS_CONTENTS conflated LOADING during the UI hiccup.

Whether it is useful to go the extra mile and collect all emissions to guard against spurious ones is the second question. I don't understand why this would be useful.

Let's consider two scenarios:

  1. Collection and emissions happen on the same thread. In this case, if conflation happens in the tests and you accurately place suspensions everywhere where they could also happen realistically in production, you won't have any issues in production, as conflation will happen in the same places.
  2. Emissions happen in one thread, and collection happens in another, and even though there are no suspensions between the emissions (or the emissions come from several threads), the collector may observe the intermediate not-yet-conflated values due to race conditions. That's a fair point. But if you're worried about race conditions, writing deterministic single-threaded tests won't help you much. You'd be better off recreating the scenario you're worried about as a multithreaded stress test to ensure that during thousands of executions, the test checks the cases when all emissions are conflated, or none are, or something in between.
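A stress test for the second scenario could be sketched as follows. The names `observeConcurrently` and `isValidObservation` are hypothetical, and the invariants checked (values arrive in increasing order and the final value is always delivered) are one reasonable choice for this toy emitter rather than a prescribed set. A real test would assert whatever must hold for the system under test regardless of how many values were conflated.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transformWhile
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: one round of emitting 1..last while another coroutine
// collects on the multithreaded Dispatchers.Default.
fun observeConcurrently(last: Int): List<Int> = runBlocking {
    val flow = MutableStateFlow(0)
    val collector = async(Dispatchers.Default) {
        // Record every value seen and stop once the final one arrives.
        flow.transformWhile<Int, Int> { emit(it); it < last }.toList()
    }
    launch(Dispatchers.Default) {
        for (i in 1..last) flow.value = i
    }
    collector.await()
}

// Invariants that should hold whichever values were conflated in a given round.
fun isValidObservation(seen: List<Int>, last: Int): Boolean =
    seen.isNotEmpty() &&
        seen.last() == last &&
        seen.zipWithNext().all { (a, b) -> a < b }

fun main() {
    // Many rounds give the scheduler a chance to produce different interleavings.
    repeat(1_000) {
        val seen = observeConcurrently(last = 100)
        check(isValidObservation(seen, 100)) { "Unexpected observation: $seen" }
    }
}
```

Each round may conflate everything, nothing, or something in between, so the assertions describe only what is true in all of those cases.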

JetBrains documents

I thought about this feedback hard, and I don't think I can do anything with it other than ignore it.

JetBrains produces many documents, from https://www.jetbrains.com/help/youtrack/server/Issues.html to https://www.jetbrains.com/help/kotlin-multiplatform-dev/multiplatform-setup.html to https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-test/kotlinx.coroutines.test/run-test.html. They are all written, edited, and reviewed by different teams, and there's no internal guideline for how clear or unclear the texts should be.

In the Kotlin libraries team, clarity is an explicit goal. We often don't fully understand in advance how our code will be used, so it's impossible for us to always hit the mark, but we aspire to. We often incorporate suggestions from our users about how the documentation can be written more clearly for them to find what they seek, or sometimes we do so proactively when we notice a common misunderstanding. The reason we didn't update the StateFlow documentation is that we never noticed the people struggling with understanding that conflation even happens; their question mostly was what to do about it.
