-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathReactorContext.kt
80 lines (72 loc) · 3.2 KB
/
ReactorContext.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.reactor
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlin.coroutines.*
import kotlinx.coroutines.reactive.*
import reactor.util.context.*
/**
* Wraps Reactor's [Context] into a [CoroutineContext] element for seamless integration between
* Reactor and kotlinx.coroutines.
* [Context.asCoroutineContext] puts Reactor's [Context] elements into a [CoroutineContext],
* which can be used to propagate the information about Reactor's [Context] through coroutines.
*
* This context element is implicitly propagated through subscribers' context by all Reactive integrations,
* such as [mono], [flux], [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux].
* Functions that subscribe to a reactive stream
* (e.g. [Publisher.awaitFirst][kotlinx.coroutines.reactive.awaitFirst]), too, propagate [ReactorContext]
* to the subscriber's [Context].
**
* ### Examples of Reactive context integration.
*
* #### Propagating ReactorContext to Reactor's Context
* ```
* val flux = myDatabaseService.getUsers()
* .contextWrite { ctx -> println(ctx); ctx }
* flux.await() // Will print "null"
*
* // Now add ReactorContext
* withContext(Context.of("answer", "42").asCoroutineContext()) {
* flux.await() // Will print "Context{'key'='value'}"
* }
* ```
*
* #### Propagating subscriber's Context to ReactorContext:
* ```
* val flow = flow {
* println("Reactor context in Flow: " + coroutineContext[ReactorContext])
* }
* // No context
* flow.asFlux()
* .subscribe() // Will print 'Reactor context in Flow: null'
* // Add subscriber's context
* flow.asFlux()
* .contextWrite { ctx -> ctx.put("answer", 42) }
* .subscribe() // Will print "Reactor context in Flow: Context{'answer'=42}"
* ```
*/
@ExperimentalCoroutinesApi
public class ReactorContext(public val context: Context) : AbstractCoroutineContextElement(ReactorContext) {
// `Context.of` is zero-cost if the argument is a `Context`
public constructor(contextView: ContextView): this(Context.of(contextView))
public companion object Key : CoroutineContext.Key<ReactorContext>
}
/**
* Wraps the given [ContextView] into [ReactorContext], so it can be added to the coroutine's context
* and later used via `coroutineContext[ReactorContext]`.
*/
@ExperimentalCoroutinesApi
public fun ContextView.asCoroutineContext(): ReactorContext = ReactorContext(this)
/**
* Wraps the given [Context] into [ReactorContext], so it can be added to the coroutine's context
* and later used via `coroutineContext[ReactorContext]`.
*/
@ExperimentalCoroutinesApi
@Deprecated("The more general version for ContextView should be used instead", level = DeprecationLevel.HIDDEN)
public fun Context.asCoroutineContext(): ReactorContext = readOnly().asCoroutineContext() // `readOnly()` is zero-cost.
/**
* Updates the Reactor context in this [CoroutineContext], adding (or possibly replacing) some values.
*/
internal fun CoroutineContext.extendReactorContext(extensions: ContextView): CoroutineContext =
(this[ReactorContext]?.context?.putAll(extensions) ?: extensions).asCoroutineContext()