Skip to content

Commit 15ee8a3

Browse files
qwwdfsadelizarov
authored andcommitted
SafeCollector rework (#1196)
SafeCollector performance improvements: * Cache result of the context check thus speeding up safe collector on happy path by a factor of three * Separate fast and slow paths in JobSupport to drastically change inlining decisions of the JVM that are crucial for leaf coroutines with flows Strengthen flow context preservation invariant * Add additional check in SafeCollector with an error message pointing to channelFlow * Improve performance of the CoroutineId check in SafeCollector * Change wording in documentation Fixes #1210
1 parent b08d61c commit 15ee8a3

File tree

10 files changed

+222
-49
lines changed

10 files changed

+222
-49
lines changed

kotlinx-coroutines-core/common/src/CoroutineContext.common.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,4 @@ internal expect val DefaultDelay: Delay
2020
// countOrElement -- pre-cached value for ThreadContext.kt
2121
internal expect inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T
2222
internal expect fun Continuation<*>.toDebugString(): String
23-
internal expect val CoroutineContext.coroutineName: String?
24-
internal expect fun CoroutineContext.minusId(): CoroutineContext
23+
internal expect val CoroutineContext.coroutineName: String?

kotlinx-coroutines-core/common/src/JobSupport.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -776,6 +776,11 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
776776
if (!tryFinalizeSimpleState(state, proposedUpdate, mode)) return COMPLETING_RETRY
777777
return COMPLETING_COMPLETED
778778
}
779+
// The separate slow-path function to simplify profiling
780+
return tryMakeCompletingSlowPath(state, proposedUpdate, mode)
781+
}
782+
783+
private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?, mode: Int): Int {
779784
// get state's list or else promote to list to correctly operate on child lists
780785
val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY
781786
// promote to Finishing state if we are not in it yet
@@ -1195,7 +1200,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
11951200
* Class to represent object as the final state of the Job
11961201
*/
11971202
private class IncompleteStateBox(@JvmField val state: Incomplete)
1198-
private fun Any?.boxIncomplete(): Any? = if (this is Incomplete) IncompleteStateBox(this) else this
1203+
internal fun Any?.boxIncomplete(): Any? = if (this is Incomplete) IncompleteStateBox(this) else this
11991204
internal fun Any?.unboxState(): Any? = (this as? IncompleteStateBox)?.state ?: this
12001205

12011206
// --------------- helper classes & constants for job implementation

kotlinx-coroutines-core/common/src/flow/Flow.kt

+26-26
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import kotlinx.coroutines.*
2727
* trigger their evaluation every time [collect] is executed) or hot ones, but, conventionally, they represent cold streams.
2828
* Transitions between hot and cold streams are supported via channels and the corresponding API: [flowViaChannel], [broadcastIn], [produceIn].
2929
*
30-
* The flow has a context preserving property: it encapsulates its own execution context and never propagates or leaks it downstream, thus making
30+
* The flow has a context preservation property: it encapsulates its own execution context and never propagates or leaks it downstream, thus making
3131
* reasoning about the execution context of particular transformations or terminal operations trivial.
3232
*
3333
* There are two ways to change the context of a flow: [flowOn][Flow.flowOn] and [flowWith][Flow.flowWith].
@@ -52,24 +52,37 @@ import kotlinx.coroutines.*
5252
* }
5353
* ```
5454
*
55-
* From the implementation point of view it means that all intermediate operators on [Flow] should abide by the following constraint:
56-
* If collection or emission of a flow is to be separated into multiple coroutines, it should use [coroutineScope] or [supervisorScope] and
57-
* is not allowed to modify the coroutines' context:
55+
* From the implementation point of view it means that all intermediate operators on [Flow] should abide by the following constraints:
56+
* 1) If an operator is trivial and does not start any coroutines, regular [flow] builder should be used. Its implementation
57+
* efficiently enforces all the invariants and prevents most of the development mistakes.
58+
*
59+
* 2) If the collection and emission of the flow are to be separated into multiple coroutines, [channelFlow] should be used.
60+
* [channelFlow] encapsulates all the context preservation work and allows you to focus on your domain-specific problem,
61+
* rather than invariant implementation details. It is possible to use any combination of coroutine builders from within [channelFlow].
62+
*
63+
* 3) If you are looking for the performance and are sure that no concurrent emits and context jumps will happen, [flow] builder
64+
* alongside with [coroutineScope] or [supervisorScope] can be used instead:
65+
*
66+
* - Scoped primitive should be used to provide a [CoroutineScope]
67+
* - Changing the context of emission is prohibited, no matter whether it is `withContext(ctx)` or builder argument (e.g. `launch(ctx)`)
68+
* - Changing the context of collection is allowed, but it has the same effect as [flowOn] operator and changes the upstream context.
69+
*
70+
* These constraints are enforced by the default [flow] builder.
71+
* Example of the proper `buffer` implementation:
5872
* ```
5973
* fun <T> Flow<T>.buffer(bufferSize: Int): Flow<T> = flow {
6074
* coroutineScope { // coroutine scope is necessary, withContext is prohibited
61-
* val channel = Channel<T>(bufferSize)
62-
* // GlobalScope.launch { is prohibited
63-
* // launch(Dispatchers.IO) { is prohibited
64-
* launch { // is OK
65-
* collect { value ->
75+
* // GlobalScope.produce { is prohibited
76+
* val channel = produce(bufferSize) {
77+
* collect { value -> // Collect from started coroutine -- OK
6678
* channel.send(value)
6779
* }
68-
* channel.close()
6980
* }
7081
*
7182
* for (i in channel) {
72-
* emit(i)
83+
* emit(i) // Emission from the enclosing scope -- OK
84+
* // launch { emit(i) } -- prohibited
85+
* // withContext(Dispatchers.IO) { emit(i) }
7386
* }
7487
* }
7588
* }
@@ -87,23 +100,10 @@ public interface Flow<out T> {
87100
* A valid implementation of this method has the following constraints:
88101
* 1) It should not change the coroutine context (e.g. with `withContext(Dispatchers.IO)`) when emitting values.
89102
* The emission should happen in the context of the [collect] call.
90-
*
91-
* Only coroutine builders that inherit the context are allowed, for example:
92-
* ```
93-
* class MyFlow : Flow<Int> {
94-
* override suspend fun collect(collector: FlowCollector<Int>) {
95-
* coroutineScope {
96-
* // Context is inherited
97-
* launch { // Dispatcher is not overridden, fine as well
98-
* collector.emit(42) // Emit from the launched coroutine
99-
* }
100-
* }
101-
* }
102-
* }
103-
* ```
104-
* is a proper [Flow] implementation, but using `launch(Dispatchers.IO)` is not.
103+
* Please refer to the top-level [Flow] documentation for more details.
105104
*
106105
* 2) It should serialize calls to [emit][FlowCollector.emit] as [FlowCollector] implementations are not thread safe by default.
106+
* To automatically serialize emissions [channelFlow] builder can be used instead of [flow]
107107
*/
108108
public suspend fun collect(collector: FlowCollector<T>)
109109
}

kotlinx-coroutines-core/common/src/flow/FlowCollector.kt

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public interface FlowCollector<in T> {
1818

1919
/**
2020
* Collects the value emitted by the upstream.
21+
* This method is not thread-safe and should not be invoked concurrently.
2122
*/
2223
public suspend fun emit(value: T)
2324
}

kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt

+80-8
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,90 @@ import kotlin.coroutines.*
1212
@PublishedApi
1313
internal class SafeCollector<T>(
1414
private val collector: FlowCollector<T>,
15-
collectContext: CoroutineContext
16-
) : FlowCollector<T>, SynchronizedObject() {
15+
private val collectContext: CoroutineContext
16+
) : FlowCollector<T> {
1717

18-
private val collectContext = collectContext.minusKey(Job).minusId()
18+
// Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector
19+
private val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
20+
private var lastEmissionContext: CoroutineContext? = null
1921

2022
override suspend fun emit(value: T) {
21-
val emitContext = coroutineContext.minusKey(Job).minusId()
22-
if (emitContext != collectContext) {
23+
/*
24+
* Benign data-race here:
25+
* We read potentially racy published coroutineContext, but we only use it for
26+
* referential comparison (=> thus safe) and are not using it for structural comparisons.
27+
*/
28+
val currentContext = coroutineContext
29+
// This check is triggered once per flow on happy path.
30+
if (lastEmissionContext !== currentContext) {
31+
checkContext(currentContext)
32+
lastEmissionContext = currentContext
33+
}
34+
collector.emit(value) // TCE
35+
}
36+
37+
private fun checkContext(currentContext: CoroutineContext) {
38+
val result = currentContext.fold(0) fold@{ count, element ->
39+
val key = element.key
40+
val collectElement = collectContext[key]
41+
if (key !== Job) {
42+
return@fold if (element !== collectElement) Int.MIN_VALUE
43+
else count + 1
44+
}
45+
46+
val collectJob = collectElement as Job?
47+
val emissionParentJob = (element as Job).transitiveCoroutineParent(collectJob)
48+
/*
49+
* Things like
50+
* ```
51+
* coroutineScope {
52+
* launch {
53+
* emit(1)
54+
* }
55+
*
56+
* launch {
57+
* emit(2)
58+
* }
59+
* }
60+
* ```
61+
* are prohibited because 'emit' is not thread-safe by default. Use channelFlow instead if you need concurrent emission
62+
* or want to switch context dynamically (e.g. with `withContext`).
63+
*
64+
* Note that collecting from another coroutine is allowed, e.g.:
65+
* ```
66+
* coroutineScope {
67+
* val channel = produce {
68+
* collect { value ->
69+
* send(value)
70+
* }
71+
* }
72+
* channel.consumeEach { value ->
73+
* emit(value)
74+
* }
75+
* }
76+
* ```
77+
* is a completely valid.
78+
*/
79+
if (emissionParentJob !== collectJob) {
80+
error(
81+
"Flow invariant is violated: emission from another coroutine is detected (child of $emissionParentJob, expected child of $collectJob). " +
82+
"FlowCollector is not thread-safe and concurrent emissions are prohibited. To mitigate this restriction please use 'flowChannel' builder instead of 'flow'"
83+
)
84+
}
85+
count + 1
86+
}
87+
if (result != collectContextSize) {
2388
error(
24-
"Flow invariant is violated: flow was collected in $collectContext, but emission happened in $emitContext. " +
25-
"Please refer to 'flow' documentation or use 'flowOn' instead")
89+
"Flow invariant is violated: flow was collected in $collectContext, but emission happened in $currentContext. " +
90+
"Please refer to 'flow' documentation or use 'flowOn' instead"
91+
)
2692
}
27-
collector.emit(value)
93+
}
94+
95+
private tailrec fun Job?.transitiveCoroutineParent(collectJob: Job?): Job? {
96+
if (this === null) return null
97+
if (this === collectJob) return this
98+
if (this !is ScopeCoroutine<*>) return this
99+
return parent.transitiveCoroutineParent(collectJob)
28100
}
29101
}

kotlinx-coroutines-core/common/src/internal/Scopes.kt

+2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ internal open class ScopeCoroutine<in T>(
1919
final override fun getStackTraceElement(): StackTraceElement? = null
2020
override val defaultResumeMode: Int get() = MODE_DIRECT
2121

22+
internal val parent: Job? get() = parentContext[Job]
23+
2224
override val cancelsParent: Boolean
2325
get() = false // it throws exception to parent instead of cancelling it
2426

kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt

+104-3
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,9 @@ import kotlin.test.*
1212
class FlowInvariantsTest : TestBase() {
1313

1414
@Test
15-
fun testWithContextContract() = runTest {
15+
fun testWithContextContract() = runTest({ it is IllegalStateException }) {
1616
flow {
1717
kotlinx.coroutines.withContext(NonCancellable) {
18-
// This one cannot be prevented :(
1918
emit(1)
2019
}
2120
}.collect {
@@ -34,6 +33,27 @@ class FlowInvariantsTest : TestBase() {
3433
}
3534
}
3635

36+
@Test
37+
fun testCachedInvariantCheckResult() = runTest {
38+
flow {
39+
emit(1)
40+
41+
try {
42+
kotlinx.coroutines.withContext(NamedDispatchers("foo")) {
43+
emit(1)
44+
}
45+
fail()
46+
} catch (e: IllegalStateException) {
47+
expect(2)
48+
}
49+
50+
emit(3)
51+
}.collect {
52+
expect(it)
53+
}
54+
finish(4)
55+
}
56+
3757
@Test
3858
fun testWithNameContractViolated() = runTest({ it is IllegalStateException }) {
3959
flow {
@@ -66,7 +86,7 @@ class FlowInvariantsTest : TestBase() {
6686
}
6787

6888
@Test
69-
fun testScopedJob() = runTest {
89+
fun testScopedJob() = runTest({ it is IllegalStateException }) {
7090
flow { emit(1) }.buffer(EmptyCoroutineContext).collect {
7191
expect(1)
7292
}
@@ -83,6 +103,87 @@ class FlowInvariantsTest : TestBase() {
83103
finish(2)
84104
}
85105

106+
@Test
107+
fun testMergeViolation() = runTest {
108+
fun Flow<Int>.merge(other: Flow<Int>): Flow<Int> = flow {
109+
coroutineScope {
110+
launch {
111+
collect { value -> emit(value) }
112+
}
113+
other.collect { value -> emit(value) }
114+
}
115+
}
116+
117+
fun Flow<Int>.trickyMerge(other: Flow<Int>): Flow<Int> = flow {
118+
coroutineScope {
119+
launch {
120+
collect { value ->
121+
coroutineScope { emit(value) }
122+
}
123+
}
124+
other.collect { value -> emit(value) }
125+
}
126+
}
127+
128+
val flow = flowOf(1)
129+
assertFailsWith<IllegalStateException> { flow.merge(flow).toList() }
130+
assertFailsWith<IllegalStateException> { flow.trickyMerge(flow).toList() }
131+
}
132+
133+
// TODO merge artifact
134+
private fun <T> channelFlow(bufferSize: Int = 16, @BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
135+
flow {
136+
coroutineScope {
137+
val channel = produce(capacity = bufferSize, block = block)
138+
channel.consumeEach { value ->
139+
emit(value)
140+
}
141+
}
142+
}
143+
144+
@Test
145+
fun testNoMergeViolation() = runTest {
146+
fun Flow<Int>.merge(other: Flow<Int>): Flow<Int> = channelFlow {
147+
launch {
148+
collect { value -> send(value) }
149+
}
150+
other.collect { value -> send(value) }
151+
}
152+
153+
fun Flow<Int>.trickyMerge(other: Flow<Int>): Flow<Int> = channelFlow {
154+
coroutineScope {
155+
launch {
156+
collect { value ->
157+
coroutineScope { send(value) }
158+
}
159+
}
160+
other.collect { value -> send(value) }
161+
}
162+
}
163+
164+
val flow = flowOf(1)
165+
assertEquals(listOf(1, 1), flow.merge(flow).toList())
166+
assertEquals(listOf(1, 1), flow.trickyMerge(flow).toList())
167+
}
168+
169+
@Test
170+
fun testScopedCoroutineNoViolation() = runTest {
171+
fun Flow<Int>.buffer(): Flow<Int> = flow {
172+
coroutineScope {
173+
val channel = produce {
174+
collect {
175+
send(it)
176+
}
177+
}
178+
channel.consumeEach {
179+
emit(it)
180+
}
181+
}
182+
}
183+
184+
assertEquals(listOf(1, 1), flowOf(1, 1).buffer().toList())
185+
}
186+
86187
private fun Flow<Int>.buffer(coroutineContext: CoroutineContext): Flow<Int> = flow {
87188
coroutineScope {
88189
val channel = Channel<Int>()

kotlinx-coroutines-core/js/src/CoroutineContext.kt

+1-3
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,4 @@ public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext):
4747
// No debugging facilities on JS
4848
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T = block()
4949
internal actual fun Continuation<*>.toDebugString(): String = toString()
50-
internal actual val CoroutineContext.coroutineName: String? get() = null // not supported on JS
51-
@Suppress("NOTHING_TO_INLINE")
52-
internal actual inline fun CoroutineContext.minusId(): CoroutineContext = this
50+
internal actual val CoroutineContext.coroutineName: String? get() = null // not supported on JS

kotlinx-coroutines-core/jvm/src/CoroutineContext.kt

-3
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,6 @@ internal actual val CoroutineContext.coroutineName: String? get() {
7979
return "$coroutineName#${coroutineId.id}"
8080
}
8181

82-
@Suppress("NOTHING_TO_INLINE")
83-
internal actual inline fun CoroutineContext.minusId(): CoroutineContext = minusKey(CoroutineId)
84-
8582
private const val DEBUG_THREAD_NAME_SEPARATOR = " @"
8683

8784
internal data class CoroutineId(

kotlinx-coroutines-core/native/src/CoroutineContext.kt

+1-3
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,4 @@ public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext):
4646
// No debugging facilities on native
4747
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T = block()
4848
internal actual fun Continuation<*>.toDebugString(): String = toString()
49-
internal actual val CoroutineContext.coroutineName: String? get() = null // not supported on native
50-
@Suppress("NOTHING_TO_INLINE")
51-
internal actual inline fun CoroutineContext.minusId(): CoroutineContext = this
49+
internal actual val CoroutineContext.coroutineName: String? get() = null // not supported on native

0 commit comments

Comments
 (0)