Define awaitCancel helper function to produce callback-induced streams #289

Closed
elizarov opened this issue Mar 15, 2018 · 8 comments

Comments

@elizarov
Contributor

The implementation is trivial:

suspend fun awaitCancel(): Nothing = suspendCancellableCoroutine {}

The intended usage is to convert callback-based multi-fire APIs to channels like this:

fun produceXxx(): ReceiveChannel<Xxx> =
    // here context is an appropriate context where registerXxxCallback is allowed
    produce(context, capacity = Channel.CONFLATED) { // or Channel.UNLIMITED, -- depends on use-case
        try {
            registerXxxCallback { offer(it) } 
            awaitCancel()
        } finally { 
            unregisterXxxCallback()
        }
    }
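
For anyone who wants to try the pattern end to end, here is a minimal runnable sketch. `XxxSource` is a hypothetical stand-in for whatever API provides `registerXxxCallback`/`unregisterXxxCallback`, and `demoAwaitCancel` is an illustrative name, neither is part of kotlinx.coroutines. It also assumes a recent library version, where `trySend` replaces the now-deprecated `offer`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical callback source standing in for registerXxxCallback/unregisterXxxCallback.
class XxxSource {
    var callback: ((Int) -> Unit)? = null
    fun register(cb: (Int) -> Unit) { callback = cb }
    fun unregister() { callback = null }
    fun fire(value: Int) { callback?.invoke(value) }
}

// Suspends until the coroutine is cancelled; it never resumes normally.
suspend fun awaitCancel(): Nothing = suspendCancellableCoroutine<Nothing> { }

@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.produceXxx(source: XxxSource): ReceiveChannel<Int> =
    produce(capacity = Channel.UNLIMITED) {
        try {
            source.register { trySend(it) } // trySend never suspends, so it is safe in a callback
            awaitCancel()                   // keep the producer alive until it is cancelled
        } finally {
            source.unregister()             // runs when the producer is cancelled
        }
    }

fun demoAwaitCancel(): Pair<List<Int>, Boolean> = runBlocking {
    val source = XxxSource()
    val channel = produceXxx(source)
    while (source.callback == null) yield() // wait for the producer to register
    source.fire(1)
    source.fire(2)
    val received = listOf(channel.receive(), channel.receive())
    channel.cancel()                        // cancels the producer coroutine
    coroutineContext.job.children.toList().forEach { it.join() }
    received to (source.callback == null)   // second component: was the callback removed?
}

fun main() {
    println(demoAwaitCancel())
}
```

The cleanup lives in `finally`, so it runs inside the producer coroutine itself, in whatever context was passed to `produce`.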
@fvasco
Contributor

fvasco commented Mar 15, 2018

Why not use the job property (#260)?

fun produceXxx(): ReceiveChannel<Xxx> =
        Channel<Xxx>(Channel.UNLIMITED).apply {
            registerXxxCallback { offer(it) }
            job.invokeOnCompletion { unregisterXxxCallback() }
        }

@elizarov
Contributor Author

elizarov commented Mar 15, 2018

@fvasco Thanks. The job property can subsume the need for awaitCancel in most (all?) cases. With the job property you should also be able to use it from inside of produce. (No! Inside of produce you need to wait.)

@fvasco
Contributor

fvasco commented Mar 16, 2018

I correct myself, the right way is:

fun produceXxx(): ReceiveChannel<Xxx> =
        Channel<Xxx>(Channel.UNLIMITED).apply {
            val callback = XxxCallback { offer(it) }
            registerXxxCallback(callback)
            job.invokeOnCompletion { unregisterXxxCallback(callback) }
        }

@jcornaz
Contributor

jcornaz commented Sep 17, 2018

Is it planned to add this? Or should we prefer to use invokeOnCompletion of the channel instead (as in @fvasco's example)?

Personally, I prefer the code style using awaitCancel. I find it more natural, and less error-prone when callback registration/deregistration has to be done on a specific thread (often the case with UI frameworks).

Here's how one can open a click subscription in Swing using awaitCancel:

fun Component.openMouseClickedSubscription(): ReceiveChannel<MouseEvent> =
    GlobalScope.produce(Dispatchers.Swing, Channel.UNLIMITED) {
      val listener = object : MouseAdapter() {
        override fun mouseClicked(e: MouseEvent) {
          offer(e)
        }
      }

      addMouseListener(listener) // called in EDT

      try {
        awaitCancel()
      } finally {
        removeMouseListener(listener) // called in EDT
      }
    }

And the same example using invokeOnClose instead of awaitCancel:

fun Component.openMouseClickedSubscription(): ReceiveChannel<MouseEvent> {
  val channel = Channel<MouseEvent>(Channel.UNLIMITED)

  val listener = object : MouseAdapter() {
    override fun mouseClicked(e: MouseEvent) {
      channel.offer(e)
    }
  }

  // this is needed if we want `openMouseClickedSubscription` to be callable from any thread
  SwingUtilities.invokeLater {

    addMouseListener(listener) // Has to be called in EDT

    channel.invokeOnClose {       // We cannot make any assumption about in which thread this will be executed

      SwingUtilities.invokeLater {
        removeMouseListener(listener) // Has to be called in EDT
      }
    }
  }

  return channel
}

Of course, it is a matter of personal taste. But I prefer the first one (using awaitCancel).

@Dico200

Dico200 commented Dec 6, 2018

awaitCancellation might be a slightly better name.

The only point of invoking awaitCancel is when you want to keep the coroutine alive until cancellation, and then run some code.

When the coroutine resumes, an exception will be thrown, so you need to wrap the call in a try/finally block (which is better than invokeOnCompletion because the coroutine is able to resume in the same thread).

Maybe the two should be combined into one function.
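
To make the thread-affinity point concrete, here is a small sketch that records which thread the setup and the `finally` cleanup run on. The single-thread executor named `xxx-thread` and the function `demoCleanupThread` are made up for illustration; the sketch assumes the coroutine is cancelled while its dispatcher is still open.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

// Suspends until the coroutine is cancelled; it never resumes normally.
suspend fun awaitCancel(): Nothing = suspendCancellableCoroutine<Nothing> { }

fun demoCleanupThread(): Pair<String, String> {
    // Hypothetical stand-in for a UI thread such as the Swing EDT.
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "xxx-thread") }
    return executor.asCoroutineDispatcher().use { dispatcher ->
        runBlocking {
            val registeredOn = CompletableDeferred<String>()
            val cleanedOn = CompletableDeferred<String>()
            val job = launch(dispatcher) {
                registeredOn.complete(Thread.currentThread().name) // "register" step
                try {
                    awaitCancel()
                } finally {
                    cleanedOn.complete(Thread.currentThread().name) // "unregister" step
                }
            }
            val first = registeredOn.await()
            job.cancelAndJoin() // cancellation is requested from the runBlocking thread
            first to cleanedOn.await()
        }
    }
}

fun main() {
    println(demoCleanupThread())
}
```

Both names should come back as `xxx-thread`, because the cancelled coroutine is resumed through its own dispatcher before the `finally` block executes.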

@streetsofboston

streetsofboston commented Dec 6, 2018

I suggest we name it suspendForever :-)

I like @Dico200's suggestion to combine the two into one function:

suspend fun suspendForever(onCancellation: (Throwable?) -> Unit): Nothing
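
One possible way to implement that signature, as a sketch only, is to register the handler with `invokeOnCancellation` on the continuation. `demoSuspendForever` is an illustrative name, not an existing API.

```kotlin
import kotlinx.coroutines.*

// Sketch of the proposed helper: suspend until cancelled, then invoke the handler.
suspend fun suspendForever(onCancellation: (Throwable?) -> Unit): Nothing =
    suspendCancellableCoroutine<Nothing> { cont ->
        cont.invokeOnCancellation(onCancellation)
    }

fun demoSuspendForever(): Boolean = runBlocking {
    var cleanedUp = false
    val job = launch {
        suspendForever { cleanedUp = true }
    }
    yield()             // let the child start and suspend
    job.cancelAndJoin() // triggers the cancellation handler
    cleanedUp
}

fun main() {
    println(demoSuspendForever())
}
```

One caveat with this variant: the handler is invoked by whichever thread performs the cancellation, not necessarily on the coroutine's dispatcher, so it loses the thread guarantee that try/finally around awaitCancel gives.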

@LouisCAD
Contributor

LouisCAD commented Dec 6, 2018 via email

@qwwdfsad
Collaborator

qwwdfsad commented Dec 13, 2019

awaitClose has been available since 1.3.0.
The actual pattern is even more elegant:

produce(context, capacity = Channel.CONFLATED) { // or Channel.UNLIMITED
    registerXxxCallback { offer(it) } 
    awaitClose { unregisterXxxCallback() }
}
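
For completeness, a runnable sketch of the awaitClose pattern. It uses `callbackFlow`, a related builder that exposes the same `awaitClose` call, and `trySend` in place of `offer`; both assume a recent kotlinx.coroutines release. `XxxSource` and `demoAwaitClose` are hypothetical names introduced for the example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical callback source standing in for registerXxxCallback/unregisterXxxCallback.
class XxxSource {
    var callback: ((Int) -> Unit)? = null
    fun register(cb: (Int) -> Unit) { callback = cb }
    fun unregister() { callback = null }
    fun fire(value: Int) { callback?.invoke(value) }
}

fun xxxFlow(source: XxxSource): Flow<Int> = callbackFlow {
    source.register { trySend(it) }
    // Suspends until the flow is closed or cancelled, then runs the cleanup block.
    awaitClose { source.unregister() }
}

fun demoAwaitClose(): Pair<List<Int>, Boolean> = runBlocking {
    val source = XxxSource()
    val collected = mutableListOf<Int>()
    val job = launch { xxxFlow(source).take(2).toList(collected) }
    while (source.callback == null) yield() // wait until the callback is registered
    source.fire(1)
    source.fire(2)
    job.join()                              // take(2) completes and cancels the producer
    collected to (source.callback == null)  // second component: was the callback removed?
}

fun main() {
    println(demoAwaitClose())
}
```

Compared with the try/finally version, there is no explicit exception handling left in user code: the cleanup block is passed straight to `awaitClose`.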
