Skip to content

Commit ea99784

Browse files
committed
SafeCollector performance improvements:
* Cache result of the context check thus speeding up safe collector on happy path by a factor of three * Separate fast and slow paths in JobSupport to drastically change inlining decisions of the JVM that are crucial for leaf coroutines with flows
1 parent 3fe7bd2 commit ea99784

File tree

3 files changed

+45
-7
lines changed

3 files changed

+45
-7
lines changed

kotlinx-coroutines-core/common/src/JobSupport.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -783,6 +783,11 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
783783
if (!tryFinalizeSimpleState(state, proposedUpdate, mode)) return COMPLETING_RETRY
784784
return COMPLETING_COMPLETED
785785
}
786+
// The separate slow-path function to simplify profiling
787+
return tryMakeCompletingSlowPath(state, proposedUpdate, mode)
788+
}
789+
790+
private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?, mode: Int): Int {
786791
// get state's list or else promote to list to correctly operate on child lists
787792
val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY
788793
// promote to Finishing state if we are not in it yet
@@ -1202,7 +1207,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
12021207
* Class to represent object as the final state of the Job
12031208
*/
12041209
private class IncompleteStateBox(@JvmField val state: Incomplete)
1205-
private fun Any?.boxIncomplete(): Any? = if (this is Incomplete) IncompleteStateBox(this) else this
1210+
internal fun Any?.boxIncomplete(): Any? = if (this is Incomplete) IncompleteStateBox(this) else this
12061211
internal fun Any?.unboxState(): Any? = (this as? IncompleteStateBox)?.state ?: this
12071212

12081213
// --------------- helper classes & constants for job implementation

kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt

+18-6
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,26 @@ internal class SafeCollector<T>(
1616
) : FlowCollector<T>, SynchronizedObject() {
1717

1818
private val collectContext = collectContext.minusKey(Job).minusId()
19+
private var lastObservedContext: CoroutineContext? = null
1920

2021
override suspend fun emit(value: T) {
21-
val emitContext = coroutineContext.minusKey(Job).minusId()
22-
if (emitContext != collectContext) {
23-
error(
24-
"Flow invariant is violated: flow was collected in $collectContext, but emission happened in $emitContext. " +
25-
"Please refer to 'flow' documentation or use 'flowOn' instead")
22+
/*
23+
* Benign data-race here:
24+
* We read potentially racy published coroutineContext, but we only use it for
25+
* referential comparison (=> thus safe) and are not using it for actual comparisons.
26+
*/
27+
val currentContext = coroutineContext
28+
if (lastObservedContext !== currentContext) {
29+
val emitContext = currentContext.minusKey(Job).minusId()
30+
if (emitContext != collectContext) {
31+
error(
32+
"Flow invariant is violated: flow was collected in $collectContext, but emission happened in $emitContext. " +
33+
"Please refer to 'flow' documentation or use 'flowOn' instead"
34+
)
35+
}
36+
// Racy publication
37+
lastObservedContext = currentContext
2638
}
27-
collector.emit(value)
39+
collector.emit(value) // TCE
2840
}
2941
}

kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt

+21
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,27 @@ class FlowInvariantsTest : TestBase() {
3434
}
3535
}
3636

37+
@Test
38+
fun testCachedInvariantCheckResult() = runTest {
39+
flow {
40+
emit(1)
41+
42+
try {
43+
kotlinx.coroutines.withContext(NamedDispatchers("foo")) {
44+
emit(1)
45+
}
46+
fail()
47+
} catch (e: IllegalStateException) {
48+
expect(2)
49+
}
50+
51+
emit(3)
52+
}.collect {
53+
expect(it)
54+
}
55+
finish(4)
56+
}
57+
3758
@Test
3859
fun testWithNameContractViolated() = runTest({ it is IllegalStateException }) {
3960
flow {

0 commit comments

Comments
 (0)