
The source channel is not cancelled when cancelling the result of produce(onCompletion = consumes()) #407


Closed
jcornaz opened this issue Jun 24, 2018 · 7 comments

@jcornaz
Contributor

jcornaz commented Jun 24, 2018

The following test hangs:

@Test
fun testCancelProduce() = runTest {
    val source = Channel<Int>()

    val produced = produce<Int>(Unconfined, onCompletion = source.consumes()) {
        source.receive()
    }

    produced.cancel()

    assertFailsWith<Exception> { runTest { source.receive() } }
}

I would expect the test to pass.

@qwwdfsad
Member

qwwdfsad commented Jun 29, 2018

It should hang.
The documentation for Unconfined states: "It executes initial continuation of the coroutine right here in the current call-frame".

So your code is roughly equivalent to

@Test
fun testCancelProduce() = runTest {
    val source = Channel<Int>()
    source.receive()
}
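
To illustrate the quoted sentence, here is a minimal sketch of how an Unconfined coroutine starts running inside the builder call itself and only hands control back at its first suspension point. It assumes the current stable kotlinx.coroutines package names (Dispatchers.Unconfined rather than the experimental Unconfined used in this thread), and the function name unconfinedStartOrder is made up for the example.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order in which things happen.
fun unconfinedStartOrder(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val source = Channel<Int>()

    // With Unconfined, the body runs in the caller's frame until it suspends.
    val job = launch(Dispatchers.Unconfined) {
        events += "body started"
        try {
            source.receive() // suspends: nobody ever sends to source
        } finally {
            events += "body finished"
        }
    }
    events += "launch returned"

    // Cancelling resumes the suspended receive with a CancellationException.
    job.cancelAndJoin()
    events
}

fun main() {
    println(unconfinedStartOrder())
    // [body started, launch returned, body finished]
}
```

"body started" is recorded before "launch returned", which is the behaviour the documentation sentence describes.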

@jcornaz
Contributor Author

jcornaz commented Jun 29, 2018

Thanks @qwwdfsad for your help.

However, I don't understand why it should hang. If Unconfined executes the initial continuation of the coroutine right here in the current call frame, then calling cancel should make the first source.receive() fail in its current call frame (and it does) and should also execute onCompletion. Because of Unconfined, we should be able to assume that by the time the second source.receive() is reached, the cancellation of the produce job is done and onCompletion has already been executed.

Consider this new code:

fun main(args: Array<String>) = runBlocking<Unit> {
  val source = Channel<Int>()

  println("start an unconfined produce job")
  val produced = produce<Int>(Unconfined, onCompletion = source.consumes()) {
    println("produce job started")
    try {
      source.receive()
    } catch (t: Throwable) {
      println("job failed") // cancel should make [receive] fail right here, in the current call frame
    }
  }

  println("cancel job")
  produced.cancel()
  println("job cancelled")

  try {
    source.receive()
  } catch (t: Throwable) {
    println("receive from source failed")
  }
}

Because of Unconfined, I would expect the following output:

start an unconfined produce job
produce job started
cancel job
job failed
job cancelled
receive from source failed

But I get:

start an unconfined produce job
produce job started
cancel job
job failed
job cancelled
Exception in thread "main" kotlinx.coroutines.experimental.CompletionHandlerException: Exception in completion handler InvokeOnCompletion[InvokeOnCompletion@5fa7e7ff] for ProducerCoroutine{Cancelled}@4ccabbaa
	at kotlinx.coroutines.experimental.JobSupport.notifyCompletion(JobSupport.kt:1009)
	at kotlinx.coroutines.experimental.JobSupport.completeUpdateState(JobSupport.kt:217)
	at kotlinx.coroutines.experimental.JobSupport.updateState(JobSupport.kt:154)
	at kotlinx.coroutines.experimental.JobSupport.makeCompletingInternal(JobSupport.kt:555)
	at kotlinx.coroutines.experimental.JobSupport.makeCompletingOnce$kotlinx_coroutines_core(JobSupport.kt:539)
	at kotlinx.coroutines.experimental.AbstractCoroutine.resume(AbstractCoroutine.kt:115)
	at kotlin.coroutines.experimental.jvm.internal.CoroutineImpl.resumeWithException(CoroutineImpl.kt:47)
	at kotlinx.coroutines.experimental.ResumeModeKt.resumeWithExceptionMode(ResumeMode.kt:64)
	at kotlinx.coroutines.experimental.DispatchedKt.dispatch(Dispatched.kt:192)
	at kotlinx.coroutines.experimental.AbstractContinuation.onCompletionInternal(AbstractContinuation.kt:206)
	at kotlinx.coroutines.experimental.AbstractContinuation.completeStateUpdate(AbstractContinuation.kt:341)
	at kotlinx.coroutines.experimental.AbstractContinuation.updateStateToFinal(AbstractContinuation.kt:324)
	at kotlinx.coroutines.experimental.AbstractContinuation.tryCancel(AbstractContinuation.kt:200)
	at kotlinx.coroutines.experimental.AbstractContinuation.cancel(AbstractContinuation.kt:121)
	at kotlinx.coroutines.experimental.ChildContinuation.invoke(JobSupport.kt:974)
	at kotlinx.coroutines.experimental.JobSupport.notifyCancellation(JobSupport.kt:1024)
	at kotlinx.coroutines.experimental.JobSupport.tryMakeCancelling(JobSupport.kt:514)
	at kotlinx.coroutines.experimental.JobSupport.makeCancelling(JobSupport.kt:490)
	at kotlinx.coroutines.experimental.JobSupport.cancel(JobSupport.kt:462)
	at kotlinx.coroutines.experimental.AbstractCoroutine.cancel(AbstractCoroutine.kt:178)
	at kotlinx.coroutines.experimental.channels.ChannelCoroutine.cancel(ChannelCoroutine.kt:37)
	at kotlinx.coroutines.experimental.channels.ReceiveChannel$DefaultImpls.cancel$default(Channel.kt:206)
	at MainKt$main$1.doResume(Main.kt:22)
	at kotlin.coroutines.experimental.jvm.internal.CoroutineImpl.resume(CoroutineImpl.kt:42)
	at kotlinx.coroutines.experimental.DispatchedTask$DefaultImpls.run(Dispatched.kt:162)
	at kotlinx.coroutines.experimental.DispatchedContinuation.run(Dispatched.kt:26)
	at kotlinx.coroutines.experimental.EventLoopBase.processNextEvent(EventLoop.kt:148)
	at kotlinx.coroutines.experimental.BlockingCoroutine.joinBlocking(Builders.kt:82)
	at kotlinx.coroutines.experimental.BuildersKt__BuildersKt.runBlocking(Builders.kt:58)
	at kotlinx.coroutines.experimental.BuildersKt.runBlocking(Unknown Source)
	at kotlinx.coroutines.experimental.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:48)
	at kotlinx.coroutines.experimental.BuildersKt.runBlocking$default(Unknown Source)
	at MainKt.main(Main.kt:8)
Caused by: java.lang.IllegalStateException: Unexpected update, state: CancelledContinuation[kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=ProducerCoroutine{Cancelled}@4ccabbaa], update: CompletedExceptionally[kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=ProducerCoroutine{Cancelled}@4ccabbaa]
	at kotlinx.coroutines.experimental.AbstractContinuation.resumeImpl(AbstractContinuation.kt:293)
	at kotlinx.coroutines.experimental.AbstractContinuation.resumeWithException(AbstractContinuation.kt:158)
	at kotlinx.coroutines.experimental.channels.AbstractChannel$ReceiveElement.resumeReceiveClosed(AbstractChannel.kt:852)
	at kotlinx.coroutines.experimental.channels.AbstractSendChannel.close(AbstractChannel.kt:262)
	at kotlinx.coroutines.experimental.channels.AbstractChannel.cancel(AbstractChannel.kt:603)
	at kotlinx.coroutines.experimental.channels.ChannelsKt__Channels_commonKt$consumes$1.invoke(Channels.common.kt:91)
	at kotlinx.coroutines.experimental.channels.ChannelsKt__Channels_commonKt$consumes$1.invoke(Channels.common.kt)
	at kotlinx.coroutines.experimental.InvokeOnCompletion.invoke(JobSupport.kt:893)
	at kotlinx.coroutines.experimental.JobSupport.notifyCompletion(JobSupport.kt:1005)
	... 32 more

@qwwdfsad
Member

qwwdfsad commented Jun 29, 2018

Thanks for the clarification, I misread the problem.

I'll annotate your code to make the problem more obvious:

val source = Channel<Int>() // You create a standalone channel
val produced = produce<Int>(Unconfined, onCompletion = source.consumes()) { // You create a producer
  source.receive() // producer suspends on source channel
}

produced.cancel() // You cancel the producer's channel, and the producer is cancelled as well. Now no one is trying to receive from the source channel
assertFailsWith<Exception> { runTest { source.receive() } } // You are trying to receive from the source channel

The source channel and producer lifecycles are not bound to each other, so receive hangs.
No one cancels source in your example.
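
A minimal sketch of that claim: receive on a channel that nobody sends to, closes, or cancels simply stays suspended. The sketch uses the current stable kotlinx.coroutines package names, and it wraps the call in withTimeoutOrNull (with an arbitrary 100 ms limit) only so that the example terminates; the function name receiveOnIdleChannel is hypothetical.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: returns null if receive() is still suspended after 100 ms.
fun receiveOnIdleChannel(): Int? = runBlocking {
    val source = Channel<Int>() // standalone channel, never closed or cancelled
    withTimeoutOrNull(100) { source.receive() }
}

fun main() {
    println(receiveOnIdleChannel()) // null
}
```

Without the timeout, the call would suspend forever, which is the hang described above.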

@jcornaz
Contributor Author

jcornaz commented Jun 29, 2018

> source channel and producer lifecycles are not bound to each other, so receive hangs.

That's the point. My understanding is that because I used Unconfined and onCompletion = source.consumes(), they should be bound.

> No one cancels source in your example

onCompletion = source.consumes() is supposed to cancel the source.
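
The expected binding can be sketched with public API only. This is an assumption-laden stand-in, not the library's consumes() implementation: it uses launch plus Job.invokeOnCompletion with the current stable package names, and the function name sourceCancelledWithConsumer is made up for the example.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: true if receiving from source fails after the consumer is cancelled.
fun sourceCancelledWithConsumer(): Boolean = runBlocking {
    val source = Channel<Int>()

    // Stand-in for the producer: a coroutine suspended on the source channel.
    val consumer = launch(Dispatchers.Unconfined) { source.receive() }

    // Stand-in for onCompletion = source.consumes(): cancel source when the job completes.
    consumer.invokeOnCompletion { source.cancel() }

    consumer.cancelAndJoin()

    try {
        source.receive() // source is cancelled by now, so this should throw
        false
    } catch (e: CancellationException) {
        true
    }
}

fun main() {
    println(sourceCancelledWithConsumer()) // true
}
```

With the lifecycles tied together like this, the second receive fails instead of hanging, which is the outcome the original test expects.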

@jcornaz
Contributor Author

jcornaz commented Jun 29, 2018

By the way, why does this code produce a CompletionHandlerException?

fun main(args: Array<String>) = runBlocking<Unit> {
  val source = Channel<Int>()

  val produced = produce<Int>(Unconfined, onCompletion = source.consumes()) {
    source.receive()
  }

  produced.cancel()
}

@qwwdfsad qwwdfsad reopened this Jun 29, 2018
@qwwdfsad qwwdfsad added the bug label Jul 2, 2018
@qwwdfsad
Member

qwwdfsad commented Jul 2, 2018

Thanks for your report.
I've investigated your problem properly. It looks like a bug in the coroutines + Unconfined machinery (or at least a poorly defined interaction between Unconfined and other parts of coroutines).
I will fix it in the next release.

@qwwdfsad
Member

qwwdfsad commented Jul 2, 2018

Thank you again for the report. Your issue revealed an even bigger problem, which may cause unexpected non-determinism (and probably heisenbugs with "Exception in completion handler") in programs without Unconfined.

I described it under #415 and will fix it there.

@qwwdfsad qwwdfsad closed this as completed Jul 2, 2018