
kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled #1433

Closed
Thomas-Vos opened this issue Aug 9, 2019 · 5 comments

@Thomas-Vos
Contributor

Thomas-Vos commented Aug 9, 2019

Hi, in my app I randomly get a ChildCancelledException. It comes from the flatMapLatest operator (previously named switchMap). I tested it on the JVM with versions 1.3.0-M2, 1.3.0-RC, and 1.3.0-RC2. I was able to reproduce it in a test:

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.test.Test

class MyTest {

    @Test
    fun test(): Unit = runBlocking {
        val context = Dispatchers.Default

        val deferred = GlobalScope.async(context) {
            launch(context) {
                observeFlow().collect {}
            }

            while (true) {
                channel.send(channel.value)
            }
        }

        deferred.await()
    }

    val channel = ConflatedBroadcastChannel("")
    val flow = channel.asFlow()

    fun observeFlow() = flow.flatMapLatest {
        val flows = List(2) {
            flow {
                while (true) {
                    emit("")
                }
            }
        }
        combine(flows) { it.asList() }
    }
}

Exception in thread "DefaultDispatcher-worker-5 @coroutine#716" kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
	(Coroutine boundary)
	at kotlinx.coroutines.channels.AbstractChannel.registerSelectReceiveOrNull(AbstractChannel.kt:753)
	at kotlinx.coroutines.channels.AbstractChannel.access$registerSelectReceiveOrNull(AbstractChannel.kt:484)
	at kotlinx.coroutines.channels.AbstractChannel$onReceiveOrNull$1.registerSelectClause1(AbstractChannel.kt:732)
	at kotlinx.coroutines.selects.SelectBuilderImpl.invoke(Select.kt:415)
	at kotlinx.coroutines.flow.internal.CombineKt$combineInternal$2.invokeSuspend(Combine.kt:151)
Caused by: kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
	at kotlinx.coroutines.flow.internal.ChannelFlowTransformLatest$flowCollect$3$invokeSuspend$$inlined$collect$1.emit(Collect.kt:137)
	at kotlinx.coroutines.flow.FlowKt__ChannelsKt.emitAll(Channels.kt:56)
	at kotlinx.coroutines.flow.FlowKt.emitAll(Unknown Source)
	at kotlinx.coroutines.flow.FlowKt__ChannelsKt$emitAll$1.invokeSuspend(Channels.kt)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:241)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:594)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:740)

Of course, this test is not my real code, but it reproduces the exception (almost) every time it is run, usually within a few seconds. If you do not get the exception, run the test again.

Is this expected behaviour and/or is there anything I can do to fix this?

There was a discussion about this on Slack with @qwwdfsad: https://kotlinlang.slack.com/archives/C1CFAFJSK/p1563388954310700

@qwwdfsad qwwdfsad self-assigned this Aug 12, 2019
@qwwdfsad qwwdfsad added the bug label Aug 12, 2019
@qwwdfsad
Collaborator

Apparently, it is a bug in the underlying implementation mechanism of combine.
It will be fixed in 1.3.0. Thanks for the repro.

@sellmair
Member

sellmair commented Aug 20, 2019

@qwwdfsad I randomly see "kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed" when running something like:

return channel.consumeAsFlow()
        .first { capture ->
            if (this(capture)) {
                true
            } else {
                capture.close()
                false
            }
        }

The following code (using the channel directly) gives me another stack trace:

return channel
        .first { capture ->
            if (this(capture)) {
                true
            } else {
                capture.close()
                false
            }
        }

java.util.concurrent.CancellationException: RendezvousChannel was cancelled
        at kotlinx.coroutines.channels.AbstractChannel.cancel(AbstractChannel.kt:624)
        at kotlinx.coroutines.channels.ChannelsKt__Channels_commonKt.cancelConsumed(Channels.common.kt:117)
        at kotlinx.coroutines.channels.ChannelsKt.cancelConsumed(Unknown Source:1)
        at com.roche.greendot.camera.ImageSelectorKt.invoke(ImageSelector.kt:136)
        at com.roche.greendot.camera.ImageSelectorKt$invoke$4.invokeSuspend(Unknown Source:14)

Is this related, or should I create a new ticket (and even try to create a sample for it)?

@qwwdfsad
Collaborator

qwwdfsad commented Aug 20, 2019

@sellmair It's better to open a new issue with a reproducer I can run.

@psteiger

psteiger commented Nov 16, 2019

I still see this on 1.3.2. I'm not sure if I'm doing something wrong, but the use case is very similar to the one described.

The issue happens when the callback inside a channelFlow, inside a flatMapLatest, tries to offer the channel a value after the channelFlow is already cancelled.

Code:

   private val geoQueryData: Flow<HashMap<Key, GeoLocation>> = geoQueryChannel.asFlow().flatMapLatest {
        channelFlow {
            val searchRadiusChannel = searchRadiusInKms.openSubscription()
            val searchRadiusObserver = launch {
                for (searchRadius in searchRadiusChannel) {
                    delay(1000)
                    it.radius = searchRadius
                }
            }

            val map = hashMapOf<Key, GeoLocation>()
            val listener = object : GeoQueryEventListener {
                override fun onKeyEntered(key: String, location: GeoLocation) {
                    map[key] = location
                }

                override fun onKeyExited(key: String) {
                    map.remove(key)
                }

                override fun onKeyMoved(key: String, location: GeoLocation) {
                    map[key] = location
                }

                override fun onGeoQueryReady() {
                    offer(map)
                }

                override fun onGeoQueryError(error: DatabaseError) {
                    cancel(CancellationException("API Error", error.toException()))
                }
            }

            it.addGeoQueryEventListener(listener)

            awaitClose {
                searchRadiusObserver.cancel()
                searchRadiusChannel.cancel()
                it.removeGeoQueryEventListener(listener)
            }
        }
    }.conflate().flowOn(Dispatchers.IO)

    fun DatabaseReference.asChannelFlow(geoLocation: GeoLocation) = channelFlow {
        val listener = this@asChannelFlow.addValueEventListener(
            object : ValueEventListener {
                override fun onDataChange(userSnap: DataSnapshot) {
                    logd("Got snap $userSnap")
                    userSnap.getValue(User::class.java)?.apply {
                            pos = geoLocation.asLatLng()
                        }?.let {
                            logd("Got user, offering $it")
                            offer(it)
                        }
                }

                override fun onCancelled(p0: DatabaseError) {
                    cancel("OnCancelled $p0", p0.toException())
                }
            }
        )

        logd("Got - asChannelFlow")
        awaitClose {
            logd("Got - asChannelFlow closing")
            this@asChannelFlow.removeEventListener(listener)
        }
    }

    val nearbyUsers: Flow<Resource<List<User>>> = geoQueryData.flatMapLatest {
        logd("Got from GeoQuery $it")
        combine(
            it.map { (key, geoLocation) -> db.userRef(key).asChannelFlow(geoLocation) }
        ) { usersArray ->
            logd("Got usersArray $usersArray")
            Resource.Success(usersArray.asList().sortedByDistance(centerLocation))
        }
    }.conflate().flowOn(Dispatchers.IO)

The stack trace shows that the line that caused the crash was:

offer(it)

This call is inside the Firebase listener in asChannelFlow, inside the channelFlow.

2019-11-16 19:53:05.890 18263-18263/com.faztudo E/AndroidRuntime: FATAL EXCEPTION: main
    Process: com.faztudo, PID: 18263
    kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
        (Coroutine boundary)
        at kotlinx.coroutines.channels.AbstractSendChannel.offer(AbstractChannel.kt:166)
        at kotlinx.coroutines.channels.ChannelCoroutine.offer(Unknown Source:2)
        at com.faztudo.common.data.NearbyUsersFlow$asChannelFlow$1$listener$1.onDataChange(NearbyUsersFlow.kt:96)
        at com.google.firebase.database.core.ValueEventRegistration.fireEvent(com.google.firebase:firebase-database@@19.2.0:75)
        at com.google.firebase.database.core.view.DataEvent.fire(com.google.firebase:firebase-database@@19.2.0:63)
        at com.google.firebase.database.core.view.EventRaiser$1.run(com.google.firebase:firebase-database@@19.2.0:55)
        at android.os.Handler.handleCallback(Handler.java:883)
        at android.os.Handler.dispatchMessage(Handler.java:100)
        at android.os.Looper.loop(Looper.java:214)
        at android.app.ActivityThread.main(ActivityThread.java:7356)
        at java.lang.reflect.Method.invoke(Native Method)
        at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:492)
        at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:930)
     Caused by: kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
        at kotlinx.coroutines.flow.internal.ChannelFlowTransformLatest$flowCollect$3$invokeSuspend$$inlined$collect$1.emit(Collect.kt:137)
        at kotlinx.coroutines.flow.FlowKt__ChannelsKt.emitAll(Channels.kt:56)
        at kotlinx.coroutines.flow.FlowKt.emitAll(Unknown Source:1)
        at kotlinx.coroutines.flow.FlowKt__ChannelsKt$emitAll$1.invokeSuspend(Unknown Source:10)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:241)
        at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:594)
        at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:740)

Reading #1454, it seems this may be expected behaviour. Using send inside a launch coroutine in the callback seems to fix it. This is quite unexpected to me, though, and it should be mentioned clearly in the channelFlow documentation, which uses an example with offer.
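
For reference, here is a minimal sketch of the workaround described above: the callback launches a coroutine in the channelFlow scope and calls send there, instead of calling offer directly. ValueListener, CallbackSource and firstValueThenLateCallback are hypothetical names standing in for the Firebase listener API; they are not part of the code above or of any library. The last callback is invoked after the flow has been cancelled, to imitate a callback racing with cancellation.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical callback API standing in for the Firebase listeners.
interface ValueListener {
    fun onValue(value: String)
}

class CallbackSource {
    // Readable from outside so the example can simulate a late callback.
    var listener: ValueListener? = null
        private set

    fun register(l: ValueListener) { listener = l }
    fun unregister(l: ValueListener) { if (listener === l) listener = null }
}

fun CallbackSource.asChannelFlow(): Flow<String> = channelFlow {
    val listener = object : ValueListener {
        override fun onValue(value: String) {
            // launch does not throw if this scope is already cancelled;
            // the new coroutine is cancelled before its body runs.
            launch { send(value) }
        }
    }
    register(listener)
    awaitClose { unregister(listener) }
}

fun firstValueThenLateCallback(): String = runBlocking {
    val source = CallbackSource()
    val first = async { source.asChannelFlow().first() }

    // Wait until the flow has registered its listener.
    while (source.listener == null) yield()
    val captured = source.listener!!

    captured.onValue("a")
    val result = first.await() // first() cancels the channelFlow after one value

    // A callback that arrives after cancellation: no exception reaches the caller.
    captured.onValue("late")
    result
}

fun main() {
    println(firstValueThenLateCallback())
}
```

The trade-off is one extra coroutine per callback invocation, and values delivered after cancellation are silently dropped.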

@qwwdfsad
Collaborator

offer is for non-suspending contexts, while send is for suspending ones.
Unfortunately, offer is not symmetric to send in terms of propagated exceptions: a CancellationException from send is usually ignored, while a CancellationException from offer in a non-suspending context is not.
We hope to fix this in #974, either with offerOrClosed or by changing the semantics of offer.
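
As an illustration of a non-throwing send from a non-suspending context, here is a small sketch that assumes kotlinx.coroutines 1.5.0 or later, where SendChannel.trySend is available. It does not exist in the 1.3.x versions discussed in this thread. sendFromCallback is a hypothetical helper name; trySend returns a ChannelResult instead of throwing when the channel is full, closed, or cancelled.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel

// Hypothetical helper: reports the outcome as a string instead of throwing.
fun <E> SendChannel<E>.sendFromCallback(element: E): String {
    val result = trySend(element)
    return when {
        result.isSuccess -> "sent"
        result.isClosed -> "closed" // channel was closed or cancelled
        else -> "full"              // buffer full and no receiver waiting
    }
}

fun main() {
    val channel = Channel<String>(capacity = 1)
    println(channel.sendFromCallback("first"))  // sent
    println(channel.sendFromCallback("second")) // full
    channel.cancel()
    println(channel.sendFromCallback("late"))   // closed
}
```

With this shape, a listener callback can decide for itself what to do with a rejected value rather than crashing the thread that invoked it.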

4 participants