-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Coroutine context propagation for Reactor to coroutines API migration #1377
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
1bf0674
to
6947e0d
Compare
reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt
Outdated
Show resolved
Hide resolved
09fb1b9
to
740a5e6
Compare
reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt
Outdated
Show resolved
Hide resolved
reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt
Outdated
Show resolved
Hide resolved
reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt
Outdated
Show resolved
Hide resolved
* Propagation of the coroutine context of await calls into Mono/Flux builder * Publisher.asFlow propagates coroutine context from `collect` call to the Publisher * Flow.asFlux transform Fixes #284
740a5e6
to
4c069fc
Compare
I've resolved some of the issues, squashed commits, dropped Force-pushed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Integration in produceImpl
is missing
This comment has been minimized.
This comment has been minimized.
fb0d9e3
to
975ed04
Compare
…from onComplete and onError, make context injector R8-friendly, do not use flowOn for empty context
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Look great now.
We've supported the following cases of the coroutine context propagation for Reactor to coroutines API migration:
kotlinx.coroutines.reactive
contains extension functions for (Mono
/Flux
) to suspending function:suspend fun <T> Publisher<T>.await*(): T?
. Coroutine context of theseawait
functions is propagated intoPublisher
's context. Consider the use case:Publisher
toFlow
: suspendingcollect
call propagates it's coroutine context to the source Publisher context.This propagation is implemented the following way:
ContextInjector
service API that contains the only function to inject the given coroutine context into thePublisher
:reactor
module contains the implementation of this interface (ReactorContextInjector
) that enriches Reactor'sMono
/Flux
contexts with values fromcoroutineContext[ReactorContext]
, instances of otherPublisher
types are returned unchanged.Publisher<T>.await*()
functions search for the availableContextInjector
services (ifreactor
module is included -ReactorContextInjector
is available, otherwise no services found) and enrichPublisher
's context if possible.PublisherAsFlow
injects the coroutine context ofcollect
call the same way.Available services are searched once, if
reactor
module is not included - the list of services is empty, so no overhead added.