Skip to content

Commit 28ce4fd

Browse files
committed
Make SafeCollector platform-specific declaration and enforce exception transparency invariant on JVM
* Make it in allocation-free manner by using crafty trick with casting KSuspendFunction to Function and pass reusable object as a completion Fixes #1657
1 parent 09cb4bf commit 28ce4fd

File tree

9 files changed

+398
-126
lines changed

9 files changed

+398
-126
lines changed

kotlinx-coroutines-core/common/src/flow/Builders.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,12 @@ public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit
5151
// Named anonymous object
5252
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : Flow<T> {
5353
override suspend fun collect(collector: FlowCollector<T>) {
54-
SafeCollector(collector, coroutineContext).block()
54+
val safeCollector = SafeCollector(collector, coroutineContext)
55+
try {
56+
safeCollector.block()
57+
} finally {
58+
safeCollector.release()
59+
}
5560
}
5661
}
5762

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow.internal
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
9+
import kotlinx.coroutines.internal.ScopeCoroutine
10+
import kotlin.coroutines.*
11+
import kotlin.jvm.*
12+
13+
internal expect class SafeCollector<T>(
14+
collector: FlowCollector<T>,
15+
collectContext: CoroutineContext
16+
) : FlowCollector<T> {
17+
internal val collector: FlowCollector<T>
18+
internal val collectContext: CoroutineContext
19+
internal val collectContextSize: Int
20+
public fun release()
21+
}
22+
23+
@JvmName("checkContext") // For prettier stack traces
24+
internal fun SafeCollector<*>.checkContext(currentContext: CoroutineContext) {
25+
val result = currentContext.fold(0) fold@{ count, element ->
26+
val key = element.key
27+
val collectElement = collectContext[key]
28+
if (key !== Job) {
29+
return@fold if (element !== collectElement) Int.MIN_VALUE
30+
else count + 1
31+
}
32+
33+
val collectJob = collectElement as Job?
34+
val emissionParentJob = (element as Job).transitiveCoroutineParent(collectJob)
35+
/*
36+
* Code like
37+
* ```
38+
* coroutineScope {
39+
* launch {
40+
* emit(1)
41+
* }
42+
*
43+
* launch {
44+
* emit(2)
45+
* }
46+
* }
47+
* ```
48+
* is prohibited because 'emit' is not thread-safe by default. Use 'channelFlow' instead if you need concurrent emission
49+
* or want to switch context dynamically (e.g. with `withContext`).
50+
*
51+
* Note that collecting from another coroutine is allowed, e.g.:
52+
* ```
53+
* coroutineScope {
54+
* val channel = produce {
55+
* collect { value ->
56+
* send(value)
57+
* }
58+
* }
59+
* channel.consumeEach { value ->
60+
* emit(value)
61+
* }
62+
* }
63+
* ```
64+
* is a completely valid.
65+
*/
66+
if (emissionParentJob !== collectJob) {
67+
error(
68+
"Flow invariant is violated:\n" +
69+
"\t\tEmission from another coroutine is detected.\n" +
70+
"\t\tChild of $emissionParentJob, expected child of $collectJob.\n" +
71+
"\t\tFlowCollector is not thread-safe and concurrent emissions are prohibited.\n" +
72+
"\t\tTo mitigate this restriction please use 'channelFlow' builder instead of 'flow'"
73+
)
74+
}
75+
76+
/*
77+
* If collect job is null (-> EmptyCoroutineContext, probably run from `suspend fun main`), then invariant is maintained
78+
* (common transitive parent is "null"), but count check will fail, so just do not count job context element when
79+
* flow is collected from EmptyCoroutineContext
80+
*/
81+
if (collectJob == null) count else count + 1
82+
}
83+
if (result != collectContextSize) {
84+
error(
85+
"Flow invariant is violated:\n" +
86+
"\t\tFlow was collected in $collectContext,\n" +
87+
"\t\tbut emission happened in $currentContext.\n" +
88+
"\t\tPlease refer to 'flow' documentation or use 'flowOn' instead"
89+
)
90+
}
91+
}
92+
93+
@JvmName("checkContext") // For prettier stack traces
94+
internal tailrec fun Job?.transitiveCoroutineParent(collectJob: Job?): Job? {
95+
if (this === null) return null
96+
if (this === collectJob) return this
97+
if (this !is ScopeCoroutine<*>) return this
98+
return parent.transitiveCoroutineParent(collectJob)
99+
}
100+
101+
/**
102+
* An analogue of the [flow] builder that does not check the context of execution of the resulting flow.
103+
* Used in our own operators where we trust the context of invocations.
104+
*/
105+
@PublishedApi
106+
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
107+
return object : Flow<T> {
108+
override suspend fun collect(collector: FlowCollector<T>) {
109+
collector.block()
110+
}
111+
}
112+
}

kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt

-124
This file was deleted.

kotlinx-coroutines-core/common/test/TestBase.common.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public suspend inline fun <reified T : Throwable> assertFailsWith(flow: Flow<*>)
5050
flow.collect()
5151
fail("Should be unreached")
5252
} catch (e: Throwable) {
53-
assertTrue(e is T)
53+
assertTrue(e is T, "Expected exception ${T::class}, but had $e instead")
5454
}
5555
}
5656

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.coroutines.*
8+
import kotlin.test.*
9+
10+
class SafeFlowTest : TestBase() {
11+
12+
@Test
13+
fun testEmissionsFromDifferentStateMachine() = runTest {
14+
val result = flow<Int> {
15+
emit1(1)
16+
emit2(2)
17+
}.onEach { yield() }.toList()
18+
assertEquals(listOf(1, 2), result)
19+
finish(3)
20+
}
21+
22+
private suspend fun FlowCollector<Int>.emit1(expect: Int) {
23+
emit(expect)
24+
expect(expect)
25+
}
26+
27+
private suspend fun FlowCollector<Int>.emit2(expect: Int) {
28+
emit(expect)
29+
expect(expect)
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow.internal
6+
7+
import kotlinx.coroutines.flow.*
8+
import kotlin.coroutines.*
9+
10+
internal actual class SafeCollector<T> actual constructor(
11+
internal actual val collector: FlowCollector<T>,
12+
internal actual val collectContext: CoroutineContext
13+
) : FlowCollector<T> {
14+
15+
// Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector
16+
internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
17+
private var lastEmissionContext: CoroutineContext? = null
18+
19+
override suspend fun emit(value: T) {
20+
val currentContext = coroutineContext
21+
if (lastEmissionContext !== currentContext) {
22+
checkContext(currentContext)
23+
lastEmissionContext = currentContext
24+
}
25+
collector.emit(value)
26+
}
27+
28+
public actual fun release() {
29+
}
30+
}

0 commit comments

Comments
 (0)