Skip to content

Commit de491d2

Browse files
authored
Make SafeCollector platform-specific declaration and enforce exceptio… (#1793)
* Make SafeCollector platform-specific declaration and enforce exception transparency invariant on JVM * Make it in an allocation-free manner by using a crafty trick with casting KSuspendFunction to Function and pass a reusable object as a completion Fixes #1657
1 parent b64a23b commit de491d2

File tree

13 files changed

+446
-156
lines changed

13 files changed

+446
-156
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+1-1
Original file line numberDiff line numberDiff line change
@@ -1001,7 +1001,7 @@ public final class kotlinx/coroutines/flow/internal/FlowExceptions_commonKt {
10011001
public static final fun checkIndexOverflow (I)I
10021002
}
10031003

1004-
public final class kotlinx/coroutines/flow/internal/SafeCollectorKt {
1004+
public final class kotlinx/coroutines/flow/internal/SafeCollector_commonKt {
10051005
public static final fun unsafeFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
10061006
}
10071007

kotlinx-coroutines-core/common/src/flow/Builders.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,12 @@ public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit
5151
// Named anonymous object
5252
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : Flow<T> {
5353
override suspend fun collect(collector: FlowCollector<T>) {
54-
SafeCollector(collector, coroutineContext).block()
54+
val safeCollector = SafeCollector(collector, coroutineContext)
55+
try {
56+
safeCollector.block()
57+
} finally {
58+
safeCollector.releaseIntercepted()
59+
}
5560
}
5661
}
5762

kotlinx-coroutines-core/common/src/flow/Flow.kt

+9-4
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
package kotlinx.coroutines.flow
66

77
import kotlinx.coroutines.*
8-
import kotlinx.coroutines.flow.internal.SafeCollector
8+
import kotlinx.coroutines.flow.internal.*
99
import kotlin.coroutines.*
1010

1111
/**
@@ -149,8 +149,8 @@ import kotlin.coroutines.*
149149
* it hard to reason about the code because an exception in the `collect { ... }` could be somehow "caught"
150150
* by an upstream flow, limiting the ability of local reasoning about the code.
151151
*
152-
* Currently, the flow infrastructure does not enforce exception transparency contracts, however, it might be enforced
153-
* in the future either at run time or at compile time.
152+
* Flow machinery enforces exception transparency at runtime and throws [IllegalStateException] on any attempt to emit a value,
153+
* if an exception has been thrown on previous attempt.
154154
*
155155
* ### Reactive streams
156156
*
@@ -199,7 +199,12 @@ public abstract class AbstractFlow<T> : Flow<T> {
199199

200200
@InternalCoroutinesApi
201201
public final override suspend fun collect(collector: FlowCollector<T>) {
202-
collectSafely(SafeCollector(collector, collectContext = coroutineContext))
202+
val safeCollector = SafeCollector(collector, coroutineContext)
203+
try {
204+
collectSafely(safeCollector)
205+
} finally {
206+
safeCollector.releaseIntercepted()
207+
}
203208
}
204209

205210
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow.internal
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
9+
import kotlinx.coroutines.internal.ScopeCoroutine
10+
import kotlin.coroutines.*
11+
import kotlin.jvm.*
12+
13+
internal expect class SafeCollector<T>(
14+
collector: FlowCollector<T>,
15+
collectContext: CoroutineContext
16+
) : FlowCollector<T> {
17+
internal val collector: FlowCollector<T>
18+
internal val collectContext: CoroutineContext
19+
internal val collectContextSize: Int
20+
public fun releaseIntercepted()
21+
}
22+
23+
@JvmName("checkContext") // For prettier stack traces
24+
internal fun SafeCollector<*>.checkContext(currentContext: CoroutineContext) {
25+
val result = currentContext.fold(0) fold@{ count, element ->
26+
val key = element.key
27+
val collectElement = collectContext[key]
28+
if (key !== Job) {
29+
return@fold if (element !== collectElement) Int.MIN_VALUE
30+
else count + 1
31+
}
32+
33+
val collectJob = collectElement as Job?
34+
val emissionParentJob = (element as Job).transitiveCoroutineParent(collectJob)
35+
/*
36+
* Code like
37+
* ```
38+
* coroutineScope {
39+
* launch {
40+
* emit(1)
41+
* }
42+
*
43+
* launch {
44+
* emit(2)
45+
* }
46+
* }
47+
* ```
48+
* is prohibited because 'emit' is not thread-safe by default. Use 'channelFlow' instead if you need concurrent emission
49+
* or want to switch context dynamically (e.g. with `withContext`).
50+
*
51+
* Note that collecting from another coroutine is allowed, e.g.:
52+
* ```
53+
* coroutineScope {
54+
* val channel = produce {
55+
* collect { value ->
56+
* send(value)
57+
* }
58+
* }
59+
* channel.consumeEach { value ->
60+
* emit(value)
61+
* }
62+
* }
63+
* ```
64+
* is a completely valid.
65+
*/
66+
if (emissionParentJob !== collectJob) {
67+
error(
68+
"Flow invariant is violated:\n" +
69+
"\t\tEmission from another coroutine is detected.\n" +
70+
"\t\tChild of $emissionParentJob, expected child of $collectJob.\n" +
71+
"\t\tFlowCollector is not thread-safe and concurrent emissions are prohibited.\n" +
72+
"\t\tTo mitigate this restriction please use 'channelFlow' builder instead of 'flow'"
73+
)
74+
}
75+
76+
/*
77+
* If collect job is null (-> EmptyCoroutineContext, probably run from `suspend fun main`), then invariant is maintained
78+
* (common transitive parent is "null"), but count check will fail, so just do not count job context element when
79+
* flow is collected from EmptyCoroutineContext
80+
*/
81+
if (collectJob == null) count else count + 1
82+
}
83+
if (result != collectContextSize) {
84+
error(
85+
"Flow invariant is violated:\n" +
86+
"\t\tFlow was collected in $collectContext,\n" +
87+
"\t\tbut emission happened in $currentContext.\n" +
88+
"\t\tPlease refer to 'flow' documentation or use 'flowOn' instead"
89+
)
90+
}
91+
}
92+
93+
internal tailrec fun Job?.transitiveCoroutineParent(collectJob: Job?): Job? {
94+
if (this === null) return null
95+
if (this === collectJob) return this
96+
if (this !is ScopeCoroutine<*>) return this
97+
return parent.transitiveCoroutineParent(collectJob)
98+
}
99+
100+
/**
101+
* An analogue of the [flow] builder that does not check the context of execution of the resulting flow.
102+
* Used in our own operators where we trust the context of invocations.
103+
*/
104+
@PublishedApi
105+
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
106+
return object : Flow<T> {
107+
override suspend fun collect(collector: FlowCollector<T>) {
108+
collector.block()
109+
}
110+
}
111+
}

kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt

-124
This file was deleted.

kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt

+12-2
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,12 @@ internal inline fun <T, R> Flow<T>.unsafeTransform(
7171
public fun <T> Flow<T>.onStart(
7272
action: suspend FlowCollector<T>.() -> Unit
7373
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke start action
74-
SafeCollector<T>(this, coroutineContext).action()
74+
val safeCollector = SafeCollector<T>(this, coroutineContext)
75+
try {
76+
safeCollector.action()
77+
} finally {
78+
safeCollector.releaseIntercepted()
79+
}
7580
collect(this) // directly delegate
7681
}
7782

@@ -141,7 +146,12 @@ public fun <T> Flow<T>.onCompletion(
141146
throw e
142147
}
143148
// Exception from the upstream or normal completion
144-
SafeCollector(this, coroutineContext).invokeSafely(action, exception)
149+
val safeCollector = SafeCollector(this, coroutineContext)
150+
try {
151+
safeCollector.invokeSafely(action, exception)
152+
} finally {
153+
safeCollector.releaseIntercepted()
154+
}
145155
exception?.let { throw it }
146156
}
147157

kotlinx-coroutines-core/common/test/TestBase.common.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public suspend inline fun <reified T : Throwable> assertFailsWith(flow: Flow<*>)
5050
flow.collect()
5151
fail("Should be unreached")
5252
} catch (e: Throwable) {
53-
assertTrue(e is T)
53+
assertTrue(e is T, "Expected exception ${T::class}, but had $e instead")
5454
}
5555
}
5656

kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt

+4-23
Original file line numberDiff line numberDiff line change
@@ -59,25 +59,6 @@ class FlowInvariantsTest : TestBase() {
5959
}
6060
}
6161

62-
@Test
63-
fun testCachedInvariantCheckResult() = runParametrizedTest<Int> { flow ->
64-
flow {
65-
emit(1)
66-
try {
67-
withContext(NamedDispatchers("foo")) {
68-
emit(1)
69-
}
70-
fail()
71-
} catch (e: IllegalStateException) {
72-
expect(2)
73-
}
74-
emit(3)
75-
}.collect {
76-
expect(it)
77-
}
78-
finish(4)
79-
}
80-
8162
@Test
8263
fun testWithNameContractViolated() = runParametrizedTest<Int>(IllegalStateException::class) { flow ->
8364
flow {
@@ -146,9 +127,9 @@ class FlowInvariantsTest : TestBase() {
146127
}
147128
}
148129

149-
val flow = flowOf(1)
150-
assertFailsWith<IllegalStateException> { flow.merge(flow).toList() }
151-
assertFailsWith<IllegalStateException> { flow.trickyMerge(flow).toList() }
130+
val flowInstance = flowOf(1)
131+
assertFailsWith<IllegalStateException> { flowInstance.merge(flowInstance).toList() }
132+
assertFailsWith<IllegalStateException> { flowInstance.trickyMerge(flowInstance).toList() }
152133
}
153134

154135
@Test
@@ -237,7 +218,7 @@ class FlowInvariantsTest : TestBase() {
237218
emptyContextTest {
238219
transform {
239220
expect(it)
240-
kotlinx.coroutines.withContext(Dispatchers.Unconfined) {
221+
withContext(Dispatchers.Unconfined) {
241222
emit(it + 1)
242223
}
243224
}

0 commit comments

Comments
 (0)