Skip to content

Commit 595bf11

Browse files
committed
~Properly handle cancellation in FlowSubscription, handle exceptions from onComplete and onError, make context injector R8-friendly, do not use flowOn for empty context
1 parent 975ed04 commit 595bf11

File tree

6 files changed

+106
-28
lines changed

6 files changed

+106
-28
lines changed

reactive/kotlinx-coroutines-reactive/src/Await.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
8585
// ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only.
8686
// If `kotlinx-coroutines-reactor` module is not included, the list is empty.
8787
private val contextInjectors: Array<ContextInjector> =
88-
ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).toList().toTypedArray()
88+
ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).iterator().asSequence().toList().toTypedArray() // R8 opto
8989

9090
private fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: CoroutineContext) =
9191
contextInjectors.fold(this) { pub, contextInjector ->

reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ import kotlin.coroutines.CoroutineContext
88
@InternalCoroutinesApi
99
public interface ContextInjector {
1010
/**
11-
* Injects the coroutine context into the context of the publisher.
11+
* Injects `ReactorContext` element from the given context into the `SubscriberContext` of the publisher.
12+
* This API used as an indirection layer between `reactive` and `reactor` modules.
1213
*/
1314
public fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T>
1415
}

reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@
77

88
package kotlinx.coroutines.reactive
99

10+
import kotlinx.atomicfu.*
1011
import kotlinx.coroutines.*
1112
import kotlinx.coroutines.flow.*
1213
import org.reactivestreams.*
13-
import java.util.concurrent.atomic.*
14+
import java.util.concurrent.atomic.AtomicLong
1415
import kotlin.coroutines.*
1516

1617
/**
@@ -40,35 +41,40 @@ public class FlowSubscription<T>(
4041
@Volatile
4142
private var canceled: Boolean = false
4243
private val requested = AtomicLong(0L)
43-
private val producer: AtomicReference<CancellableContinuation<Unit>?> = AtomicReference()
44+
private val producer = atomic<CancellableContinuation<Unit>?>(null)
4445

4546
// This is actually optimizable
4647
private var job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.LAZY) {
4748
try {
4849
consumeFlow()
4950
subscriber.onComplete()
5051
} catch (e: Throwable) {
51-
// Failed with real exception, not due to cancellation
52-
if (!coroutineContext[Job]!!.isCancelled) {
53-
subscriber.onError(e)
52+
try {
53+
if (e is CancellationException) {
54+
subscriber.onComplete()
55+
} else {
56+
subscriber.onError(e)
57+
}
58+
} catch (e: Throwable) {
59+
// Last ditch report
60+
handleCoroutineException(coroutineContext, e)
5461
}
5562
}
5663
}
5764

5865
private suspend fun consumeFlow() {
5966
flow.collect { value ->
60-
if (!coroutineContext.isActive) {
61-
subscriber.onComplete()
62-
coroutineContext.ensureActive()
63-
}
64-
67+
/*
68+
* Flow is scopeless, thus if it's not active, its subscription was cancelled.
69+
* No intermediate "child failed, but flow coroutine is not" states are allowed.
70+
*/
71+
coroutineContext.ensureActive()
6572
if (requested.get() == 0L) {
6673
suspendCancellableCoroutine<Unit> {
67-
producer.set(it)
74+
producer.value = it
6875
if (requested.get() != 0L) it.resumeSafely()
6976
}
7077
}
71-
7278
requested.decrementAndGet()
7379
subscriber.onNext(value)
7480
}
@@ -80,12 +86,9 @@ public class FlowSubscription<T>(
8086
}
8187

8288
override fun request(n: Long) {
83-
if (n <= 0) {
89+
if (n <= 0 || canceled) {
8490
return
8591
}
86-
87-
if (canceled) return
88-
8992
job.start()
9093
var snapshot: Long
9194
var newValue: Long
@@ -95,7 +98,7 @@ public class FlowSubscription<T>(
9598
if (newValue <= 0L) newValue = Long.MAX_VALUE
9699
} while (!requested.compareAndSet(snapshot, newValue))
97100

98-
val prev = producer.get()
101+
val prev = producer.value
99102
if (prev == null || !producer.compareAndSet(prev, null)) return
100103
prev.resumeSafely()
101104
}
@@ -106,4 +109,4 @@ public class FlowSubscription<T>(
106109
completeResume(token)
107110
}
108111
}
109-
}
112+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.reactive
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
9+
import org.junit.Test
10+
import org.reactivestreams.*
11+
import kotlin.test.*
12+
13+
class FlowAsPublisherTest : TestBase() {
14+
15+
@Test
16+
fun testErrorOnCancellationIsReported() {
17+
expect(1)
18+
flow<Int> {
19+
emit(2)
20+
try {
21+
hang { expect(3) }
22+
} finally {
23+
throw TestException()
24+
}
25+
}.asPublisher().subscribe(object : Subscriber<Int> {
26+
private lateinit var subscription: Subscription
27+
28+
override fun onComplete() {
29+
expectUnreached()
30+
}
31+
32+
override fun onSubscribe(s: Subscription?) {
33+
subscription = s!!
34+
subscription.request(2)
35+
}
36+
37+
override fun onNext(t: Int) {
38+
expect(t)
39+
subscription.cancel()
40+
}
41+
42+
override fun onError(t: Throwable?) {
43+
assertTrue(t is TestException)
44+
expect(4)
45+
}
46+
})
47+
finish(5)
48+
}
49+
50+
@Test
51+
fun testCancellationIsNotReported() {
52+
expect(1)
53+
flow {
54+
emit(2)
55+
hang { expect(3) }
56+
}.asPublisher().subscribe(object : Subscriber<Int> {
57+
private lateinit var subscription: Subscription
58+
59+
override fun onComplete() {
60+
expect(4)
61+
}
62+
63+
override fun onSubscribe(s: Subscription?) {
64+
subscription = s!!
65+
subscription.request(2)
66+
}
67+
68+
override fun onNext(t: Int) {
69+
expect(t)
70+
subscription.cancel()
71+
}
72+
73+
override fun onError(t: Throwable?) {
74+
expectUnreached()
75+
}
76+
})
77+
finish(5)
78+
}
79+
}

reactive/kotlinx-coroutines-reactor/src/FlowAsFlux.kt

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,8 @@ public fun <T: Any> Flow<T>.asFlux(): Flux<T> = FlowAsFlux(this)
1919
private class FlowAsFlux<T : Any>(private val flow: Flow<T>) : Flux<T>() {
2020
override fun subscribe(subscriber: CoreSubscriber<in T>?) {
2121
if (subscriber == null) throw NullPointerException()
22-
subscriber.onSubscribe(
23-
FlowSubscription(
24-
flow.flowOn(subscriber.currentContext().asCoroutineContext()),
25-
subscriber
26-
)
27-
)
22+
val hasContext = subscriber.currentContext().isEmpty
23+
val source = if (hasContext) flow.flowOn(subscriber.currentContext().asCoroutineContext()) else flow
24+
subscriber.onSubscribe(FlowSubscription(source, subscriber))
2825
}
2926
}

reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import kotlin.coroutines.*
3333
*
3434
* [CoroutineContext] of a suspendable function that awaits a value from [Mono] or [Flux] instance
3535
* is propagated into [mono] and [flux] Reactor builders:
36-
*
3736
* ```
3837
* launch(Context.of("key", "value").asCoroutineContext()) {
3938
* assertEquals(bar().awaitFirst(), "value")
@@ -43,7 +42,6 @@ import kotlin.coroutines.*
4342
* coroutineContext[ReactorContext]!!.context.get("key")
4443
* }
4544
* ```
46-
}
4745
*/
4846
@ExperimentalCoroutinesApi
4947
public class ReactorContext(val context: Context) : AbstractCoroutineContextElement(ReactorContext) {

0 commit comments

Comments
 (0)