Skip to content

Commit 1652bb9

Browse files
committed
Fix ReactorFlow integration bug exposed by TestBase
1 parent 9a2eb38 commit 1652bb9

File tree

5 files changed

+55
-44
lines changed

5 files changed

+55
-44
lines changed

reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public fun <T: Any> Flow<T>.asFlux(): Flux<T> = FlowAsFlux(this)
2121
private class FlowAsFlux<T : Any>(private val flow: Flow<T>) : Flux<T>() {
2222
override fun subscribe(subscriber: CoreSubscriber<in T>?) {
2323
if (subscriber == null) throw NullPointerException()
24-
val hasContext = subscriber.currentContext().isEmpty
24+
val hasContext = !subscriber.currentContext().isEmpty
2525
val source = if (hasContext) flow.flowOn(subscriber.currentContext().asCoroutineContext()) else flow
2626
subscriber.onSubscribe(FlowSubscription(source, subscriber))
2727
}

reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt

+7-7
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,18 @@ import kotlin.test.assertEquals
1010

1111
class FlowAsFluxTest : TestBase() {
1212
@Test
13-
fun testFlowToFluxContextPropagation() = runBlocking<Unit> {
13+
fun testFlowToFluxContextPropagation() {
1414
val flux = flow<String> {
15-
(1..4).forEach { i -> emit(m(i).awaitFirst()) }
15+
(1..4).forEach { i -> emit(createMono(i).awaitFirst()) }
1616
} .asFlux()
1717
.subscriberContext(Context.of(1, "1"))
1818
.subscriberContext(Context.of(2, "2", 3, "3", 4, "4"))
19-
var i = 0
20-
flux.subscribe { str -> i++; println(str); assertEquals(str, i.toString()) }
19+
val list = flux.collectList().block()!!
20+
assertEquals(listOf("1", "2", "3", "4"), list)
2121
}
2222

23-
private fun m(i: Int): Mono<String> = mono {
24-
val ctx = coroutineContext[ReactorContext]?.context
25-
ctx?.getOrDefault(i, "noValue")
23+
private fun createMono(i: Int): Mono<String> = mono {
24+
val ctx = coroutineContext[ReactorContext]!!.context
25+
ctx.getOrDefault(i, "noValue")
2626
}
2727
}

reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@ import reactor.core.publisher.*
1212
import java.time.Duration.*
1313

1414
class FluxSingleTest : TestBase() {
15+
16+
@Before
17+
fun setup() {
18+
ignoreLostThreads("parallel-")
19+
}
20+
1521
@Test
1622
fun testSingleNoWait() {
1723
val flux = flux {
@@ -167,7 +173,7 @@ class FluxSingleTest : TestBase() {
167173
@Test
168174
fun testExceptionFromCoroutine() {
169175
val flux = flux<String> {
170-
error(Flux.just("O").awaitSingle() + "K")
176+
throw IllegalStateException(Flux.just("O").awaitSingle() + "K")
171177
}
172178

173179
checkErroneous(flux) {

reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt

+34-34
Original file line numberDiff line numberDiff line change
@@ -7,38 +7,40 @@ import kotlinx.coroutines.reactive.*
77
import org.junit.Test
88
import reactor.core.publisher.*
99
import reactor.util.context.*
10+
import kotlin.coroutines.*
1011
import kotlin.test.*
1112

1213
class ReactorContextTest : TestBase() {
1314
@Test
1415
fun testMonoHookedContext() = runBlocking {
1516
val mono = mono(Context.of(1, "1", 7, "7").asCoroutineContext()) {
16-
val ctx = coroutineContext[ReactorContext]?.context
17+
val ctx = reactorContext()
1718
buildString {
18-
(1..7).forEach { append(ctx?.getOrDefault(it, "noValue")) }
19+
(1..7).forEach { append(ctx.getOrDefault(it, "noValue")) }
1920
}
2021
} .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
2122
.subscriberContext { ctx -> ctx.put(6, "6") }
2223
assertEquals(mono.awaitFirst(), "1234567")
2324
}
2425

2526
@Test
26-
fun testFluxContext() = runBlocking<Unit> {
27+
fun testFluxContext() {
2728
val flux = flux(Context.of(1, "1", 7, "7").asCoroutineContext()) {
28-
val ctx = coroutineContext[ReactorContext]!!.context
29+
val ctx = reactorContext()
2930
(1..7).forEach { send(ctx.getOrDefault(it, "noValue")) }
30-
} .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
31+
}
32+
.subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
3133
.subscriberContext { ctx -> ctx.put(6, "6") }
32-
var i = 0
33-
flux.subscribe { str -> i++; assertEquals(str, i.toString()) }
34+
val list = flux.collectList().block()!!
35+
assertEquals((1..7).map { it.toString() }, list)
3436
}
3537

3638
@Test
3739
fun testAwait() = runBlocking(Context.of(3, "3").asCoroutineContext()) {
3840
val result = mono(Context.of(1, "1").asCoroutineContext()) {
39-
val ctx = coroutineContext[ReactorContext]?.context
41+
val ctx = reactorContext()
4042
buildString {
41-
(1..3).forEach { append(ctx?.getOrDefault(it, "noValue")) }
43+
(1..3).forEach { append(ctx.getOrDefault(it, "noValue")) }
4244
}
4345
} .subscriberContext(Context.of(2, "2"))
4446
.awaitFirst()
@@ -47,36 +49,34 @@ class ReactorContextTest : TestBase() {
4749

4850
@Test
4951
fun testMonoAwaitContextPropagation() = runBlocking(Context.of(7, "7").asCoroutineContext()) {
50-
assertEquals(m().awaitFirst(), "7")
51-
assertEquals(m().awaitFirstOrDefault("noValue"), "7")
52-
assertEquals(m().awaitFirstOrNull(), "7")
53-
assertEquals(m().awaitFirstOrElse { "noValue" }, "7")
54-
assertEquals(m().awaitLast(), "7")
55-
assertEquals(m().awaitSingle(), "7")
52+
assertEquals(createMono().awaitFirst(), "7")
53+
assertEquals(createMono().awaitFirstOrDefault("noValue"), "7")
54+
assertEquals(createMono().awaitFirstOrNull(), "7")
55+
assertEquals(createMono().awaitFirstOrElse { "noValue" }, "7")
56+
assertEquals(createMono().awaitLast(), "7")
57+
assertEquals(createMono().awaitSingle(), "7")
5658
}
5759

5860
@Test
5961
fun testFluxAwaitContextPropagation() = runBlocking<Unit>(
6062
Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
6163
) {
62-
assertEquals(f().awaitFirst(), "1")
63-
assertEquals(f().awaitFirstOrDefault("noValue"), "1")
64-
assertEquals(f().awaitFirstOrNull(), "1")
65-
assertEquals(f().awaitFirstOrElse { "noValue" }, "1")
66-
assertEquals(f().awaitLast(), "3")
67-
var i = 0
68-
f().subscribe { str -> i++; assertEquals(str, i.toString()) }
64+
assertEquals(createFlux().awaitFirst(), "1")
65+
assertEquals(createFlux().awaitFirstOrDefault("noValue"), "1")
66+
assertEquals(createFlux().awaitFirstOrNull(), "1")
67+
assertEquals(createFlux().awaitFirstOrElse { "noValue" }, "1")
68+
assertEquals(createFlux().awaitLast(), "3")
6969
}
7070

71-
private fun m(): Mono<String> = mono {
72-
val ctx = coroutineContext[ReactorContext]?.context
73-
ctx?.getOrDefault(7, "noValue")
71+
private fun createMono(): Mono<String> = mono {
72+
val ctx = reactorContext()
73+
ctx.getOrDefault(7, "noValue")
7474
}
7575

7676

77-
private fun f(): Flux<String?> = flux {
78-
val ctx = coroutineContext[ReactorContext]?.context
79-
(1..3).forEach { send(ctx?.getOrDefault(it, "noValue")) }
77+
private fun createFlux(): Flux<String?> = flux {
78+
val ctx = reactorContext()
79+
(1..3).forEach { send(ctx.getOrDefault(it, "noValue")) }
8080
}
8181

8282
@Test
@@ -95,17 +95,17 @@ class ReactorContextTest : TestBase() {
9595
fun testFlowToFluxDirectContextPropagation() = runBlocking(
9696
Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
9797
) {
98-
var i = 0
9998
// convert resulting flow to channel using "produceIn"
10099
val channel = bar().produceIn(this)
101-
channel.consumeEach { str ->
102-
i++; assertEquals(str, i.toString())
103-
}
104-
assertEquals(i, 3)
100+
val list = channel.toList()
101+
assertEquals(listOf("1", "2", "3"), list)
105102
}
106103

107104
private fun bar(): Flow<String> = flux {
108-
val ctx = coroutineContext[ReactorContext]!!.context
105+
val ctx = reactorContext()
109106
(1..3).forEach { send(ctx.getOrDefault(it, "noValue")) }
110107
}.asFlow()
108+
109+
private suspend fun reactorContext() =
110+
coroutineContext[ReactorContext]!!.context
111111
}

reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ import org.junit.Assert.*
1111
import java.util.concurrent.*
1212

1313
class ObservableSingleTest : TestBase() {
14+
@Before
15+
fun setup() {
16+
ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
17+
}
18+
1419
@Test
1520
fun testSingleNoWait() {
1621
val observable = rxObservable {
@@ -166,7 +171,7 @@ class ObservableSingleTest : TestBase() {
166171
@Test
167172
fun testExceptionFromCoroutine() {
168173
val observable = rxObservable<String> {
169-
error(Observable.just("O").awaitSingle() + "K")
174+
throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
170175
}
171176

172177
checkErroneous(observable) {

0 commit comments

Comments
 (0)