CoroutineDispatchers-compatible ContinuationInterceptor composition #2478
Comments
Naturally, we solve this the day after I upstream a bug. :) I got nice and confused trying to carry state in the … Here's a pattern for a delegating interceptor which can wrap a stock …:

import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope

class DelegatingInterceptor(
    private val delegateInterceptor: ContinuationInterceptor // The CoroutineDispatcher, say.
) : AbstractCoroutineContextElement(ContinuationInterceptor.Key), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
        return delegateInterceptor.interceptContinuation(
            CustomContinuation(continuation)
        )
    }
}
private class CustomContinuation<T> constructor(
    val delegateContinuation: Continuation<T>
) : Continuation<T> {
    override val context: CoroutineContext = delegateContinuation.context
    private var state: SomeMutableState? = null
    override fun resumeWith(result: Result<T>) {
        StateManager.setUp(
            state, // StateManager reapplies state, happening-before `resumeWith`.
            { state = it }, // Executed when the threadlocal state changes, happening-before any yield.
        ).use { // Cleanup via closeable must be safe to run concurrently with/after resumption
            // of `this` on a different thread. Otherwise, mutex at a steep performance cost.
            delegateContinuation.resumeWith(result)
        }
    }
}
// DI scope creation.
fun provideCoroutineScope(dispatcher: CoroutineDispatcher): CoroutineScope =
    CoroutineScope(
        DelegatingInterceptor(
            dispatcher
        )
    )

The obvious-seeming solution, using a … This pattern will execute correctly >99% of the time, but it's not fully correct if dispatch is multithreaded.

private class BrokenContinuation<T> constructor(
    val delegateContinuation: Continuation<T>
) : Continuation<T> {
    override val context: CoroutineContext = delegateContinuation.context
    private var state: SomeMutableState? = null
    override fun resumeWith(result: Result<T>) {
        state = SomeThreadState.get()
        try {
            delegateContinuation.resumeWith(result)
        } finally {
            // Racy: by now the coroutine may already have been resumed on another thread.
            state = SomeThreadState.get()
        }
    }
}

By the time the inner continuation's …

Something that gives me pause is that I think the same thread-safety bug applies to …

The documentation isn't clear that …

We're solving the problem at hand with a setter callback to assign the state kept in the …

I guess I'd like to see more documentation about the thread-safety requirements of classes and functions that library users are expected to override. Documented thread-safety constraints would've helped us arrive at a fully correct design earlier.
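To make the setter-callback idea concrete, here is a minimal sketch that uses only the Kotlin standard library, not the poster's actual code. `TraceSlot`, `TracingContinuation`, `TracingInterceptor`, and `traceHandoffDemo` are hypothetical names, and `TraceSlot` stands in for the unspecified `StateManager`. The saved trace is written by the running coroutine at the moment the thread-local changes, so the wrapper never has to read thread state after the delegate's `resumeWith` returns. The cleanup only touches the current thread's slots.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical thread-local trace slot with a change hook (stand-in for StateManager).
object TraceSlot {
    private val value = ThreadLocal<String?>()
    private val onChange = ThreadLocal<((String?) -> Unit)?>()

    fun get(): String? = value.get()

    fun set(trace: String?) {
        value.set(trace)
        onChange.get()?.invoke(trace) // write through to the owning continuation
    }

    // Installs `initial` and `hook` on this thread while `block` runs, then restores
    // whatever was there before. Only this thread's slots are touched.
    fun <R> withInstalled(initial: String?, hook: (String?) -> Unit, block: () -> R): R {
        val previousValue = value.get()
        val previousHook = onChange.get()
        value.set(initial)
        onChange.set(hook)
        try {
            return block()
        } finally {
            value.set(previousValue)
            onChange.set(previousHook)
        }
    }
}

class TracingContinuation<T>(private val delegate: Continuation<T>) : Continuation<T> {
    override val context: CoroutineContext get() = delegate.context

    // Updated only through the hook, i.e. by code running inside the coroutine.
    @Volatile private var trace: String? = null

    override fun resumeWith(result: Result<T>) {
        TraceSlot.withInstalled(trace, { trace = it }) {
            delegate.resumeWith(result)
        }
        // Deliberately no write to `trace` here: the coroutine may already be
        // running again on another thread while this frame unwinds.
    }
}

class TracingInterceptor :
    AbstractCoroutineContextElement(ContinuationInterceptor.Key), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        TracingContinuation(continuation)
}

// Runs one coroutine that sets a trace, suspends, and is resumed later.
fun traceHandoffDemo(): List<String?> {
    val seen = mutableListOf<String?>()
    var parked: Continuation<Unit>? = null
    val body: suspend () -> Unit = {
        seen += TraceSlot.get()                  // nothing installed yet
        TraceSlot.set("trace-1")                 // hook stores it in the continuation
        suspendCoroutine<Unit> { parked = it }   // suspend; thread-local is cleared
        seen += TraceSlot.get()                  // restored on resumption
    }
    body.startCoroutine(Continuation(TracingInterceptor()) { it.getOrThrow() })
    seen += TraceSlot.get()                      // outside the coroutine
    parked!!.resume(Unit)
    return seen
}
```

Calling `traceHandoffDemo()` records the trace seen at coroutine start, the trace seen by the caller while the coroutine is suspended, and the trace seen after resumption. This single-threaded demo only shows the handoff; it does not by itself exercise the concurrent-unwinding case.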
We planned to support "delegating interceptors" in the initial coroutines design. That was actually the original plan to support thread-locals. However, we found that it is more painful than it is worth, since those combinations become fragile when you start adding many of them, so we ended up with a different design that you can now see in …

Now, it should be possible to use …

Coroutines are phantom (see Phantom of the Coroutine) and many complex primitives launch coroutines under the hood, expecting that the caller does not care about it. If you add any kind of context element that contains mutable state, then you'll also face the problem that any code performing any kind of concurrency will inherit this context element and will start updating it from concurrent coroutines.

In fact, I would suggest reassessing the design of tracing integration with coroutines. It is OK to use the coroutine context to keep some immutable data (like some kind of "correlation id" for tracing distributed operations, which would be perfectly and correctly inherited by children coroutines), but any kind of mutable state must be managed explicitly and separately, with a purpose-built DSL like: …
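The immutable half of that suggestion can be sketched with the standard library alone. This is an illustration, not the DSL referred to in the comment: `CorrelationId`, `currentCorrelationId`, and `correlationIdDemo` are hypothetical names. The element holds a read-only value, so nothing can update it from concurrent coroutines.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical immutable context element carrying a correlation id.
class CorrelationId(val id: String) : AbstractCoroutineContextElement(CorrelationId) {
    companion object Key : CoroutineContext.Key<CorrelationId>
}

// Any suspending code can read the id from its own context.
suspend fun currentCorrelationId(): String? = coroutineContext[CorrelationId]?.id

// Starts a coroutine with the element in its context and reads the id back.
fun correlationIdDemo(): String? {
    var observed: String? = null
    val body: suspend () -> Unit = { observed = currentCorrelationId() }
    body.startCoroutine(Continuation(CorrelationId("req-42")) { it.getOrThrow() })
    return observed
}
```

With the kotlinx.coroutines builders, an element placed in a scope's context is inherited by the coroutines launched from it, which is why a read-only value of this kind is safe to share.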
They're fragile if they're configured dynamically. If a program sets up its …

The previous discussion seems to focus on ad-hoc context creation. In that world, I agree that interceptors will do really unexpected stuff. Interceptors are a great fit for dependency injection and big programs using framework execution contexts, though. We're using dependency injection to provide contexts, and statically checking that no code sneaks another dispatcher in. So, we have certainty that interceptors won't go missing mid-coroutine-stack, making them more useful.

Three things made arriving at a working …
I wrote a coroutine-specific API first. It's simpler than a DSL, just a couple of higher-order coroutine builder functions summing a trace into the …

Unfortunately, we're expanding support for an existing, widely deployed API made up of plain Java functions. That API can (and so will) be called from within coroutines. If coroutines had to call specific API functions, and malfunctioned when they called existing API functions, we'd have bugs. Static analysis checks to figure out if a non- …

So, we used …
Which is why in the end we're using the same …

I thought for a while that a …

If a coroutine is undispatched, its threadlocals are inherited by direct invocation from its parent coroutine. Whenever the coroutine suspends and goes to dispatch, unless I've made a(nother) grave error, its current trace will always already have been written into its continuation, ready for restoration at …

Anyway, I don't have a real feature request here, just a story about using …
I'm integrating coroutines with an existing Java/JVM ThreadLocal-centric function tracing system.

When a plain Java stack frame launches a coroutine, it needs to get its trace from the caller. Passing it as part of the context is natural:

injectedScope.launch(
    getTrace() // CoroutineContext.Element.
) {
}

... And with a ThreadContextElement, the CoroutineScope saves and restores ThreadLocal so things are transparent to Java code the coroutine calls.

Manually passing the trace at each coroutine launch is more error-prone than I'd like. Given codebases with thousands of launch sites, trace handoff can get forgotten. I can use runtime checks and static analysis, but coroutines should make it possible to intercept resumptions and make tracing transparent rather than manual.
Omitting some frames, the call stack for a dispatched coroutine at start time is:
I'd like to be able to write a composed interceptor to create the context for resumptions, only without reimplementing the whole dispatcher stack. So, something like:
This is how I thought ContinuationInterceptor might work when I first read it, but CoroutineDispatcher inherits from ContinuationInterceptor and occupies its CoroutineContext.Key. Composition is doubly-incompatible with dispatchers, because CoroutineDispatcher.interceptContinuation() is a final method returning a library-internal class instance, and optimizations care about what the type of the continuation is.

It'd be nice if user-defined interceptors could compose with the CoroutineDispatcher interceptions, allowing transparent modification of coroutine context with dynamic (albeit static/threadlocal) state.

It'd be even nicer if continuation interceptors could modify the CoroutineContext after the continuation executes, so they could maintain state without locking. I've debugged intercept a bit, and I think the lock-free resumption/suspension in the JVM dispatch stack and (maybe?) reusable continuations have led to resumeWith executing concurrently for the same coroutine and its Continuation object. The resumeWith() stack frame can still be unwinding on one thread, while the continuation has been redispatched and the same object's resumeWith() has begun execution on another thread. That makes state handoff hard without locking.

I can make things work having each call site pass its trace to the CoroutineContext, but a properly transparent API would have been better. It's so close. I'd only have to reimplement most of kotlinx-coroutines-core/jvm. :)