
CoroutineDispatchers-compatible ContinuationInterceptor composition #2478


Closed
yorickhenning opened this issue Jan 14, 2021 · 3 comments

@yorickhenning
Contributor

I'm integrating coroutines with an existing Java/JVM ThreadLocal-centric function tracing system.

When a plain Java stack frame launches a coroutine, it needs to get its trace from the caller. Passing it as part of the context is natural:

injectedScope.launch(
  getTrace() // CoroutineContext.Element.
) {

}

... And with a ThreadContextElement, the CoroutineScope saves and restores ThreadLocal so things are transparent to Java code the coroutine calls.

Manually passing the trace at each coroutine launch is more error-prone than I'd like. Given codebases with thousands of launch sites, trace handoff can get forgotten. I can use runtime checks and static analysis, but coroutines should make it possible to intercept resumptions and make tracing transparent rather than manual.

Omitting some frames, the call stack for a dispatched coroutine at start time is:

dispatch:93, ExecutorCoroutineDispatcherBase (kotlinx.coroutines)
resumeWith:184, DispatchedContinuation (kotlinx.coroutines.internal)
resumeCancellableWith:266, DispatchedContinuationKt (kotlinx.coroutines.internal)
startCoroutineCancellable:30, CancellableKt (kotlinx.coroutines.intrinsics)
invoke:109, CoroutineStart (kotlinx.coroutines)
start:158, AbstractCoroutine (kotlinx.coroutines)
launch:1, BuildersKt (kotlinx.coroutines)
fancyClientFunction:99, SomeTest (com.google.stuff)

I'd like to be able to write a composed interceptor to create the context for resumptions, only without reimplementing the whole dispatcher stack. So, something like:

dispatch:93, ExecutorCoroutineDispatcherBase (kotlinx.coroutines)
resumeWith:184, DispatchedContinuation (kotlinx.coroutines.internal)
resumeCancellableWith:266, DispatchedContinuationKt (kotlinx.coroutines.internal)
***resumeWith:10, UserDefinedContinuation (com.google.stuff)***
startCoroutineCancellable:30, CancellableKt (kotlinx.coroutines.intrinsics)
invoke:109, CoroutineStart (kotlinx.coroutines)
start:158, AbstractCoroutine (kotlinx.coroutines)
launch:1, BuildersKt (kotlinx.coroutines)
fancyClientFunction:99, SomeTest (com.google.stuff)

This is how I thought ContinuationInterceptor might work when I first read it, but CoroutineDispatcher inherits from ContinuationInterceptor and occupies its CoroutineContext.Key. Composition is doubly-incompatible with dispatchers, because CoroutineDispatcher.interceptContinuation() is a final method returning a library-internal class instance, and optimizations care about what the type of the continuation is.

It'd be nice if user-defined interceptors could compose with the CoroutineDispatcher interceptions, allowing transparent modification of coroutine context with dynamic (albeit static/threadlocal) state.

It'd be even nicer if continuation interceptors could modify the CoroutineContext after the continuation executes, so they could maintain state without locking. I've debugged intercept a bit, and I think the lock-free resumption/suspension in the JVM dispatch stack and (maybe?) reusable continuations have led to resumeWith executing concurrently for the same coroutine and its Continuation object. The resumeWith() stack frame can still be unwinding on one thread, while the continuation has been redispatched and the same object's resumeWith() has begun execution on another thread. That makes state handoff hard without locking.

I can make things work having each call site pass its trace to the CoroutineContext, but a properly transparent API would have been better. It's so close. I'd only have to reimplement most of kotlinx-coroutines-core/jvm. :)

@yorickhenning
Contributor Author

Naturally, we solve this the day after I upstream a bug. :)

I got nice and confused trying to carry state in the CoroutineContext. It's possible and simpler to keep it in the Continuation. The tricky part is threadsafety.

Here's a pattern for a delegating interceptor which can wrap a stock CoroutineDispatcher:

class DelegatingInterceptor(
  private val delegateInterceptor: ContinuationInterceptor // The CoroutineDispatcher, say.
) :
  AbstractCoroutineContextElement(ContinuationInterceptor.Key), ContinuationInterceptor {

  override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
    return delegateInterceptor.interceptContinuation(
      CustomContinuation(continuation)
    )
  }
}

private class CustomContinuation<T>(
  val delegateContinuation: Continuation<T>
) : Continuation<T> {
  override val context: CoroutineContext = delegateContinuation.context
  private var state: SomeMutableState? = null

  override fun resumeWith(result: Result<T>) {
    StateManager.setUp(
      state, // StateManager reapplies state, happening-before `resumeWith`.
      { state = it }, // Executed when the threadlocal state changes, which happens-before any yield.
    ).use { // Cleanup via closeable must be safe to run concurrent with/after resumption
            // of `this` on a different thread. Otherwise, mutex at a steep performance cost.
      delegateContinuation.resumeWith(result)
    }
  }
}

// DI scope creation.
fun provideCoroutineScope(dispatcher: CoroutineDispatcher): CoroutineScope =
  CoroutineScope(
    DelegatingInterceptor(
      dispatcher
    )
  )
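
To make the nesting order concrete, here is a minimal sketch that uses only the standard library's `kotlin.coroutines` package, so it runs without kotlinx.coroutines. `InlineDispatcher`, `LoggingDelegatingInterceptor`, and the `events` log are hypothetical names invented for illustration. The "dispatcher" here resumes inline rather than on another thread, so it shows the wrapping order only, not the threading behaviour.

```kotlin
import kotlin.coroutines.*

// Shared log so the nesting order of the wrappers is visible.
val events = mutableListOf<String>()

// Hypothetical stand-in for a CoroutineDispatcher: it "dispatches" inline.
object InlineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor.Key), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext get() = continuation.context
            override fun resumeWith(result: Result<T>) {
                events += "dispatch"
                continuation.resumeWith(result)
            }
        }
}

// Wraps the continuation first, then hands the wrapper to the delegate interceptor,
// so the delegate's continuation ends up outermost.
class LoggingDelegatingInterceptor(private val delegate: ContinuationInterceptor) :
    AbstractCoroutineContextElement(ContinuationInterceptor.Key), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        delegate.interceptContinuation(object : Continuation<T> {
            override val context: CoroutineContext get() = continuation.context
            override fun resumeWith(result: Result<T>) {
                events += "user wrapper"
                continuation.resumeWith(result)
            }
        })

    // Forward release notifications so the delegate can clean up what it created.
    override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
        delegate.releaseInterceptedContinuation(continuation)
    }
}

fun runNestingDemo(): List<String> {
    events.clear()
    var saved: Continuation<Unit>? = null
    val body: suspend () -> Unit = {
        events += "body start"
        suspendCoroutine<Unit> { saved = it } // Suspend; resumed manually below.
        events += "body end"
    }
    body.startCoroutine(
        Continuation<Unit>(LoggingDelegatingInterceptor(InlineDispatcher)) { events += "completed" }
    )
    saved!!.resume(Unit)
    return events.toList()
}

fun main() {
    println(runNestingDemo())
}
```

Both the initial start and the later resumption pass through the delegate's continuation and then the user-defined wrapper before reaching the coroutine body, which is the stack shape sketched at the top of the issue.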

The obvious-seeming solution, using a try-finally, is subtly very broken unless it acquires a mutex for the duration of resumeWith.

This pattern will execute correctly >99% of the time, but it's not fully correct if dispatch is multithreaded.

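private class BrokenContinuation<T>(
  val delegateContinuation: Continuation<T>
) : Continuation<T> {
  override val context: CoroutineContext = delegateContinuation.context
  private var state: SomeMutableState? = null

  override fun resumeWith(result: Result<T>) {
    SomeThreadState.set(state) // Restore the saved state on the resuming thread.
    try {
      delegateContinuation.resumeWith(result)
    } finally {
      // Racy: the coroutine may already have been resumed on another thread,
      // so this write can clobber the state that resumption depends on.
      state = SomeThreadState.get()
    }
  }
}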

By the time the inner continuation's resumeWith() has finished, the continuation might've already been made eligible for dispatch and started executing on a different thread. Presto, 1/10,000 race condition, flaky tests, and production heisenbugs.

Something that gives me pause is that I think the same threadsafety bug applies to ThreadContextElement subclasses. If they write to a mutable member variable in restoreThreadContext() in order to "pass" state to the next thread resumption by reading the member variable in updateThreadContext(), it isn't threadsafe.

The documentation isn't clear that resumeWith() needs to be threadsafe, or that it can't count on running in isolation once the inner continuation.resumeWith() call has started. If an interceptor tracks state to save and restore, the implementation must make sure that the continuation's mutable state is assigned before the coroutine could suspend/yield. Since the yield point sits inside this resumeWith() and inside every delegate continuation's resumeWith(), a happens-before relationship (or the equivalent non-JVM semantic) is non-obvious to set up.

We're solving the problem at hand with a setter callback to assign the state kept in the Continuation as a side-effect of assigning the state to the threadlocal, creating the necessary happens-before between assignment and any point at which the coroutine might suspend. It works, but it took us some time to figure this one out.
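
As a rough illustration of that setter-callback idea, here is a minimal sketch built only on the standard library's `kotlin.coroutines` package. It is not our production code: `TraceLocal`, `TracingContinuation`, `TracingInterceptor`, and the `String` trace type are hypothetical stand-ins, and the demo runs on one thread, so it shows the save/restore flow rather than proving the absence of a race.

```kotlin
import java.io.Closeable
import kotlin.coroutines.*

// Hypothetical thread-local trace holder, standing in for an existing Java tracing API.
object TraceLocal {
    private val current = ThreadLocal<String?>()
    private val listener = ThreadLocal<((String?) -> Unit)?>()

    fun get(): String? = current.get()

    // Every write is mirrored to the installed listener immediately, so the owning
    // continuation holds the value before the coroutine can reach a suspension point.
    fun set(trace: String?) {
        current.set(trace)
        listener.get()?.invoke(trace)
    }

    // Applies `initial` on this thread and returns a Closeable that restores the previous
    // values. Cleanup only touches this thread's thread-locals, never continuation state,
    // so it is harmless if the coroutine is already running elsewhere.
    fun install(initial: String?, onChange: (String?) -> Unit): Closeable {
        val previousTrace = current.get()
        val previousListener = listener.get()
        current.set(initial)
        listener.set(onChange)
        return Closeable {
            current.set(previousTrace)
            listener.set(previousListener)
        }
    }
}

class TracingContinuation<T>(private val delegate: Continuation<T>) : Continuation<T> {
    override val context: CoroutineContext get() = delegate.context

    // Written only through the listener while the coroutine is running;
    // read at the start of the next resumption.
    @Volatile private var trace: String? = null

    override fun resumeWith(result: Result<T>) {
        TraceLocal.install(trace) { trace = it }.use {
            delegate.resumeWith(result)
        }
    }
}

object TracingInterceptor :
    AbstractCoroutineContextElement(ContinuationInterceptor.Key), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        TracingContinuation(continuation)
}

fun runTraceDemo(): List<String?> {
    val seen = mutableListOf<String?>()
    var saved: Continuation<Unit>? = null
    val body: suspend () -> Unit = {
        TraceLocal.set("span-1")              // Plain, non-suspend API call.
        suspendCoroutine<Unit> { saved = it } // Suspend; resumed manually below.
        seen += TraceLocal.get()              // Value observed after resumption.
    }
    body.startCoroutine(Continuation<Unit>(TracingInterceptor) { })
    seen += TraceLocal.get() // Caller's view after the coroutine suspended.
    saved!!.resume(Unit)
    seen += TraceLocal.get() // Caller's view after the coroutine finished.
    return seen
}

fun main() {
    println(runTraceDemo())
}
```

The point of the design is that nothing is written to the continuation's field after the inner `resumeWith` returns. The only post-resumption work is restoring the current thread's thread-locals.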

I guess I'd like to see more documentation about the threadsafety requirements of classes and functions that library users are expected to override. Documented threadsafety constraints would've helped us arrive at a fully-correct design earlier. ContinuationInterceptor has no documentation about resumeWith()'s threadsafety. A reasonable reading is that resumeWith needn't be threadsafe because a coroutine framework would provide statement order consistency across all resumptions of the one Continuation object, but that's not how the (JVM?) implementation works.

@elizarov
Contributor

We planned to support "delegating interceptors" in the initial coroutines design. That was actually the original plan to support thread-locals. However, we had found that it is more painful than it is worth since those combinations become fragile when you start adding many of them, so we've ended up with a different design that you can now see in ThreadContextElement. You can read the original discussion here: #119 (comment)

Now, it should be possible to use ThreadContextElement for your use-case, but keeping a mutable state in the coroutine context is indeed an extremely tricky endeavor. One problem you've already encountered. But there's also another one.

Coroutines are phantom (see Phantom of the Coroutine) and many complex primitives launch coroutines under the hood, expecting that the caller does not care about it. If you add any kind of a context element that contains a mutable state, then you'll also face a problem that any code performing any kind of concurrency will inherit this context element and will start updating it from concurrent coroutines.

In fact, I would suggest reassessing the design of tracing integration with coroutines. It is OK to use coroutine context to keep some immutable data (like some kind of "correlation id" for tracing distributed operations, which would be perfectly and correctly inherited by children coroutines), but any kind of mutable state must be managed explicitly and separately, with a purpose-built DSL like: trace { .... }, that you'd put around blocks of code that need to trace their entry and exit points.
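
A minimal sketch of what such a split might look like, using only the standard library's `kotlin.coroutines` package. `CorrelationId`, `trace`, and `traceLog` are hypothetical names invented for this example, not part of any library. The immutable id lives in the context, and the entry/exit bookkeeping is explicit in the `trace { }` wrapper.

```kotlin
import kotlin.coroutines.*

// Immutable context element: safe to inherit because nothing ever mutates it.
class CorrelationId(val id: String) : AbstractCoroutineContextElement(CorrelationId) {
    companion object Key : CoroutineContext.Key<CorrelationId>
}

// Stand-in for a real tracing backend.
val traceLog = mutableListOf<String>()

// Explicit wrapper placed around the blocks whose entry and exit should be traced.
suspend fun <T> trace(name: String, block: suspend () -> T): T {
    val id = coroutineContext[CorrelationId]?.id ?: "unknown"
    traceLog += "enter $name [$id]"
    try {
        return block()
    } finally {
        traceLog += "exit $name [$id]"
    }
}

fun runCorrelationDemo(): List<String> {
    traceLog.clear()
    val body: suspend () -> Int = { trace("load") { 42 } }
    body.startCoroutine(
        Continuation<Int>(CorrelationId("req-1")) { r -> traceLog += "result ${r.getOrNull()}" }
    )
    return traceLog.toList()
}

fun main() {
    println(runCorrelationDemo())
}
```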

@yorickhenning
Contributor Author

> We planned to support "delegating interceptors" in the initial coroutines design. That was actually the original plan to support thread-locals. However, we had found that it is more painful than it is worth since those combinations become fragile when you start adding many of them, so we've ended up with a different design that you can now see in ThreadContextElement. You can read the original discussion here: #119 (comment)

They're fragile if they're configured dynamically. If a program sets up its CoroutineContexts/CoroutineScopes once and everything in the program uses those contexts, though, even sequences of interceptors are fine.

The previous discussion seems to focus on ad-hoc context creation. In that world, I agree that interceptors will do really unexpected stuff.

Interceptors are a great fit for dependency injection and big programs using framework execution contexts, though. ContinuationInterceptor is clunky, but I've found it's still the best option for hooking a framework into coroutine execution. There are APIs exactly like ContinuationInterceptor (or equivalent delegation idioms) for Executors, RPC dispatch/stubs, logging APIs...

We're using dependency injection to provide contexts, and statically checking that no code sneaks another dispatcher in. So, we have certainty that interceptors won't go missing mid-coroutine-stack, making them more useful.

Three things made arriving at a working ContinuationInterceptor difficult:

  1. CoroutineDispatcher inherits from ContinuationInterceptor, so installing an interceptor overrides dispatch instead of intercepting it
  2. There's no framework-supported way to set up an interceptor sequence, and the delegation idiom I've described above has to deal with the subtle threading bugs stemming from atomic-powered continuation dispatch; Delay is its own complication
  3. Interceptors/continuations need to bring their own thread safety and isolation, but KDoc on the relevant interfaces isn't clear about what execution isolation exists

> In fact, I would suggest reassessing the design of tracing integration with coroutines. It is OK to use coroutine context to keep some immutable data (like some kind of "correlation id" for tracing distributed operations, which would be perfectly and correctly inherited by children coroutines), but any kind of mutable state must be managed explicitly and separately, with a purpose-built DSL like: trace { .... }, that you'd put around blocks of code that need to trace their entry and exit points.

I wrote a coroutine-specific API first. It's simpler than a DSL, just a couple of higher-order coroutine builder functions summing a trace into the CoroutineContext.

Unfortunately, we're expanding support for an existing, widely deployed API made up of plain Java functions. That API can (and so will) be called from within coroutines. If coroutines had to call specific API functions, and malfunctioned when they called existing API functions, we'd have bugs.

Static analysis checks that work out whether a non-suspend function invocation actually sits inside a suspend block, and so should be some other call instead, have to wander around the AST a fair bit. I've written checks like that, but I'm pretty sure they're heuristic, and there's only so many O(n)-or-thereabouts-with-respect-to-AST-depth checks we can run in the compilation toolchain. APIs that don't need a static analysis check tend to be more popular, anyway.

So, we used Continuations to make existing plain function calls work for both coroutine and non-coroutine callers in order to keep a lid on defect rates. That's why we've gone deep on ContinuationInterceptor, and so why I think that API is valuable and might warrant a revisit.

> Coroutines are phantom (see Phantom of the Coroutine) and many complex primitives launch coroutines under the hood, expecting that the caller does not care about it. If you add any kind of a context element that contains a mutable state, then you'll also face a problem that any code performing any kind of concurrency will inherit this context element and will start updating it from concurrent coroutines.

Which is why in the end we're using the same ThreadLocal that Threads respect during coroutine execution, storing a coroutine's trace in the Continuation only across dispatch.

I thought for a while that a CoroutineContext.Element would work, but you're quite right that it can't because the context is shared among coroutines. A Continuation isn't shared among coroutines, though it is shared among Executor threads, which took a while to figure out and deal with.

If a coroutine is undispatched, its threadlocals are inherited by direct invocation from its parent coroutine. Whenever the coroutine suspends and goes to dispatch, unless I've made a(nother) grave error, its current trace will always already have been written into its continuation, ready for restoration at resumeWith() time. This doesn't require any call site modifications, and (importantly) doesn't require each call site to pick the correct flavour of API function to avoid silent defects.

Anyway, I don't have a real feature request here, just a story about using ContinuationInterceptor to do something that otherwise would have required writing a new CoroutineDispatcher.


2 participants