-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathReactorContextTest.kt
111 lines (98 loc) · 3.77 KB
/
ReactorContextTest.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package kotlinx.coroutines.reactor
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.junit.Test
import reactor.core.publisher.*
import reactor.util.context.*
import kotlin.coroutines.*
import kotlin.test.*
class ReactorContextTest : TestBase() {
@Test
fun testMonoHookedContext() = runBlocking {
val mono = mono(Context.of(1, "1", 7, "7").asCoroutineContext()) {
val ctx = reactorContext()
buildString {
(1..7).forEach { append(ctx.getOrDefault(it, "noValue")) }
}
} .contextWrite(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
.contextWrite { ctx -> ctx.put(6, "6") }
assertEquals(mono.awaitFirst(), "1234567")
}
@Test
fun testFluxContext() {
val flux = flux(Context.of(1, "1", 7, "7").asCoroutineContext()) {
val ctx = reactorContext()
(1..7).forEach { send(ctx.getOrDefault(it, "noValue")) }
}
.contextWrite(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
.contextWrite { ctx -> ctx.put(6, "6") }
val list = flux.collectList().block()!!
assertEquals((1..7).map { it.toString() }, list)
}
@Test
fun testAwait() = runBlocking(Context.of(3, "3").asCoroutineContext()) {
val result = mono(Context.of(1, "1").asCoroutineContext()) {
val ctx = reactorContext()
buildString {
(1..3).forEach { append(ctx.getOrDefault(it, "noValue")) }
}
} .contextWrite(Context.of(2, "2"))
.awaitFirst()
assertEquals(result, "123")
}
@Test
fun testMonoAwaitContextPropagation() = runBlocking(Context.of(7, "7").asCoroutineContext()) {
assertEquals(createMono().awaitFirst(), "7")
assertEquals(createMono().awaitFirstOrDefault("noValue"), "7")
assertEquals(createMono().awaitFirstOrNull(), "7")
assertEquals(createMono().awaitFirstOrElse { "noValue" }, "7")
assertEquals(createMono().awaitLast(), "7")
assertEquals(createMono().awaitSingle(), "7")
}
@Test
fun testFluxAwaitContextPropagation() = runBlocking<Unit>(
Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
) {
assertEquals(createFlux().awaitFirst(), "1")
assertEquals(createFlux().awaitFirstOrDefault("noValue"), "1")
assertEquals(createFlux().awaitFirstOrNull(), "1")
assertEquals(createFlux().awaitFirstOrElse { "noValue" }, "1")
assertEquals(createFlux().awaitLast(), "3")
}
private fun createMono(): Mono<String> = mono {
val ctx = reactorContext()
ctx.getOrDefault(7, "noValue")
}
private fun createFlux(): Flux<String> = flux {
val ctx = reactorContext()
(1..3).forEach { send(ctx.getOrDefault(it, "noValue")) }
}
@Test
fun testFlowToFluxContextPropagation() = runBlocking(
Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
) {
var i = 0
// call "collect" on the converted Flow
bar().collect { str ->
i++; assertEquals(str, i.toString())
}
assertEquals(i, 3)
}
@Test
fun testFlowToFluxDirectContextPropagation() = runBlocking(
Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
) {
// convert resulting flow to channel using "produceIn"
val channel = bar().produceIn(this)
val list = channel.toList()
assertEquals(listOf("1", "2", "3"), list)
}
private fun bar(): Flow<String> = flux {
val ctx = reactorContext()
(1..3).forEach { send(ctx.getOrDefault(it, "noValue")) }
}.asFlow()
private suspend fun reactorContext() =
coroutineContext[ReactorContext]!!.context
}