Skip to content

Commit 1901a16

Browse files
committed
Initial prototype for public SafeCollector with Throwable.isFromDownstream (JVM-only yet) with explicit guarantees on context/exception transparency enforcement
Pre-design for #2860
1 parent e123c8a commit 1901a16

File tree

12 files changed

+100
-25
lines changed

12 files changed

+100
-25
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+11-1
Original file line numberDiff line numberDiff line change
@@ -861,7 +861,12 @@ public final class kotlinx/coroutines/debug/internal/DebuggerInfo : java/io/Seri
861861
public abstract class kotlinx/coroutines/flow/AbstractFlow : kotlinx/coroutines/flow/CancellableFlow, kotlinx/coroutines/flow/Flow {
862862
public fun <init> ()V
863863
public final fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
864-
public abstract fun collectSafely (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
864+
public abstract fun collectSafely (Lkotlinx/coroutines/flow/SafeCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
865+
}
866+
867+
public abstract interface class kotlinx/coroutines/flow/ExceptionAwareFlowCollector : kotlinx/coroutines/flow/FlowCollector {
868+
public abstract synthetic fun doNotImplementMe ()V
869+
public abstract fun isFromDownstream (Ljava/lang/Throwable;)Z
865870
}
866871

867872
public abstract interface class kotlinx/coroutines/flow/Flow {
@@ -1060,6 +1065,11 @@ public abstract interface class kotlinx/coroutines/flow/MutableStateFlow : kotli
10601065
public abstract fun setValue (Ljava/lang/Object;)V
10611066
}
10621067

1068+
public abstract interface class kotlinx/coroutines/flow/SafeCollector : kotlinx/coroutines/flow/FlowCollector {
1069+
public abstract synthetic fun doNotImplementMe ()V
1070+
public abstract fun isFromDownstream (Ljava/lang/Throwable;)Z
1071+
}
1072+
10631073
public abstract interface class kotlinx/coroutines/flow/SharedFlow : kotlinx/coroutines/flow/Flow {
10641074
public abstract fun getReplayCache ()Ljava/util/List;
10651075
}

kotlinx-coroutines-core/common/src/flow/Builders.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,11 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
5353
*
5454
* If you want to switch the context of execution of a flow, use the [flowOn] operator.
5555
*/
56-
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
56+
public fun <T> flow(@BuilderInference block: suspend SafeCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
5757

5858
// Named anonymous object
59-
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
60-
override suspend fun collectSafely(collector: FlowCollector<T>) {
59+
private class SafeFlow<T>(private val block: suspend SafeCollector<T>.() -> Unit) : AbstractFlow<T>() {
60+
override suspend fun collectSafely(collector: SafeCollector<T>) {
6161
collector.block()
6262
}
6363
}

kotlinx-coroutines-core/common/src/flow/Flow.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
207207

208208
@InternalCoroutinesApi
209209
public final override suspend fun collect(collector: FlowCollector<T>) {
210-
val safeCollector = SafeCollector(collector, coroutineContext)
210+
val safeCollector = SafeCollectorImpl(collector, coroutineContext)
211211
try {
212212
collectSafely(safeCollector)
213213
} finally {
@@ -228,5 +228,5 @@ public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
228228
*
229229
* @throws IllegalStateException if any of the invariants are violated.
230230
*/
231-
public abstract suspend fun collectSafely(collector: FlowCollector<T>)
231+
public abstract suspend fun collectSafely(collector: SafeCollector<T>)
232232
}

kotlinx-coroutines-core/common/src/flow/FlowCollector.kt

+10
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,13 @@ public interface FlowCollector<in T> {
1919
*/
2020
public suspend fun emit(value: T)
2121
}
22+
23+
// Name TBD
24+
// Contracts: exception transparency preservation, context transparency preservation, isFromDownstream
25+
public interface SafeCollector<in T> : FlowCollector<T> {
26+
27+
public fun Throwable.isFromDownstream(): Boolean
28+
29+
@Deprecated(message = "", level = DeprecationLevel.HIDDEN)
30+
public fun doNotImplementMe()
31+
}

kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.common.kt

+9-3
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,18 @@ import kotlinx.coroutines.internal.ScopeCoroutine
1010
import kotlin.coroutines.*
1111
import kotlin.jvm.*
1212

13-
internal expect class SafeCollector<T>(
13+
internal expect class SafeCollectorImpl<T>(
1414
collector: FlowCollector<T>,
1515
collectContext: CoroutineContext
16-
) : FlowCollector<T> {
16+
) : SafeCollector<T> {
1717
internal val collector: FlowCollector<T>
1818
internal val collectContext: CoroutineContext
1919
internal val collectContextSize: Int
2020
public fun releaseIntercepted()
2121
}
2222

2323
@JvmName("checkContext") // For prettier stack traces
24-
internal fun SafeCollector<*>.checkContext(currentContext: CoroutineContext) {
24+
internal fun SafeCollectorImpl<*>.checkContext(currentContext: CoroutineContext) {
2525
val result = currentContext.fold(0) fold@{ count, element ->
2626
val key = element.key
2727
val collectElement = collectContext[key]
@@ -109,3 +109,9 @@ internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend
109109
}
110110
}
111111
}
112+
113+
private class DownstreamExceptionElement(@JvmField val e: Throwable) : CoroutineContext.Element {
114+
companion object Key : CoroutineContext.Key<DownstreamExceptionElement>
115+
116+
override val key: CoroutineContext.Key<*> = Key
117+
}

kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ internal inline fun <T, R> Flow<T>.unsafeTransform(
7474
public fun <T> Flow<T>.onStart(
7575
action: suspend FlowCollector<T>.() -> Unit
7676
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke start action
77-
val safeCollector = SafeCollector<T>(this, currentCoroutineContext())
77+
val safeCollector = SafeCollectorImpl<T>(this, currentCoroutineContext())
7878
try {
7979
safeCollector.action()
8080
} finally {
@@ -156,7 +156,7 @@ public fun <T> Flow<T>.onCompletion(
156156
throw e
157157
}
158158
// Normal completion
159-
val sc = SafeCollector(this, currentCoroutineContext())
159+
val sc = SafeCollectorImpl(this, currentCoroutineContext())
160160
try {
161161
sc.action(null)
162162
} finally {
@@ -185,7 +185,7 @@ public fun <T> Flow<T>.onEmpty(
185185
emit(it)
186186
}
187187
if (isEmpty) {
188-
val collector = SafeCollector(this, currentCoroutineContext())
188+
val collector = SafeCollectorImpl(this, currentCoroutineContext())
189189
try {
190190
collector.action()
191191
} finally {

kotlinx-coroutines-core/common/src/flow/operators/Share.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ internal class SubscribedFlowCollector<T>(
405405
private val action: suspend FlowCollector<T>.() -> Unit
406406
) : FlowCollector<T> by collector {
407407
suspend fun onSubscription() {
408-
val safeCollector = SafeCollector(collector, currentCoroutineContext())
408+
val safeCollector = SafeCollectorImpl(collector, currentCoroutineContext())
409409
try {
410410
safeCollector.action()
411411
} finally {

kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class FlowInvariantsTest : TestBase() {
2525
}
2626

2727
private fun <T> abstractFlow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> = object : AbstractFlow<T>() {
28-
override suspend fun collectSafely(collector: FlowCollector<T>) {
28+
override suspend fun collectSafely(collector: SafeCollector<T>) {
2929
collector.block()
3030
}
3131
}

kotlinx-coroutines-core/js/src/flow/internal/SafeCollector.kt renamed to kotlinx-coroutines-core/js/src/flow/internal/SafeCollectorImpl.kt

+11-2
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ import kotlinx.coroutines.*
88
import kotlinx.coroutines.flow.*
99
import kotlin.coroutines.*
1010

11-
internal actual class SafeCollector<T> actual constructor(
11+
internal actual class SafeCollectorImpl<T> actual constructor(
1212
internal actual val collector: FlowCollector<T>,
1313
internal actual val collectContext: CoroutineContext
14-
) : FlowCollector<T> {
14+
) : SafeCollector<T> {
1515

1616
// Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector
1717
internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
@@ -27,6 +27,15 @@ internal actual class SafeCollector<T> actual constructor(
2727
collector.emit(value)
2828
}
2929

30+
override fun Throwable.isFromDownstream(): Boolean {
31+
// TODO
32+
return true
33+
}
34+
35+
override fun doNotImplementMe() {
36+
TODO()
37+
}
38+
3039
public actual fun releaseIntercepted() {
3140
}
3241
}

kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt renamed to kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollectorImpl.kt

+9-7
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ private val emitFun =
1818
* in order to properly control 'intercepted()' lifecycle.
1919
*/
2020
@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER", "INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "UNCHECKED_CAST")
21-
internal actual class SafeCollector<T> actual constructor(
21+
internal actual class SafeCollectorImpl<T> actual constructor(
2222
@JvmField internal actual val collector: FlowCollector<T>,
2323
@JvmField internal actual val collectContext: CoroutineContext
24-
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
24+
) : SafeCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
2525

2626
override val callerFrame: CoroutineStackFrame? get() = completion as? CoroutineStackFrame
2727

@@ -123,12 +123,14 @@ internal actual class SafeCollector<T> actual constructor(
123123
""".trimIndent())
124124
}
125125

126-
}
127-
128-
internal class DownstreamExceptionElement(@JvmField val e: Throwable) : CoroutineContext.Element {
129-
companion object Key : CoroutineContext.Key<DownstreamExceptionElement>
126+
override fun Throwable.isFromDownstream(): Boolean {
127+
val ctx = lastEmissionContext
128+
return ctx is DownstreamExceptionElement && ctx.e == this // TODO also check stracktrace recovery
129+
}
130130

131-
override val key: CoroutineContext.Key<*> = Key
131+
override fun doNotImplementMe() {
132+
TODO()
133+
}
132134
}
133135

134136
private object NoOpContinuation : Continuation<Any?> {

kotlinx-coroutines-core/jvm/test/flow/ExceptionTransparencyTest.kt

+28
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,32 @@ class ExceptionTransparencyTest : TestBase() {
7474
assertTrue { e.message!!.contains("channelFlow") }
7575
finish(3)
7676
}
77+
78+
@Test
79+
fun testExamples() = runTest {
80+
try {
81+
flow {
82+
try {
83+
emit(42)
84+
throw TestException()
85+
} catch (e: Throwable) {
86+
println("E1 " + e.isFromDownstream())
87+
}
88+
}.collect()
89+
} catch (e: TestException) {
90+
println("Caught")
91+
}
92+
93+
try {
94+
flow {
95+
try {
96+
emit(42)
97+
} catch (e: Throwable) {
98+
println("E2 " + e.isFromDownstream())
99+
}
100+
}.onEach { throw TestException() }.collect()
101+
} catch (e: TestException) {
102+
println("Caught 2")
103+
}
104+
}
77105
}

kotlinx-coroutines-core/native/src/flow/internal/SafeCollector.kt renamed to kotlinx-coroutines-core/native/src/flow/internal/SafeCollectorImpl.kt

+12-2
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ import kotlinx.coroutines.*
88
import kotlinx.coroutines.flow.*
99
import kotlin.coroutines.*
1010

11-
internal actual class SafeCollector<T> actual constructor(
11+
internal actual class SafeCollectorImpl<T> actual constructor(
1212
internal actual val collector: FlowCollector<T>,
1313
internal actual val collectContext: CoroutineContext
14-
) : FlowCollector<T> {
14+
) : SafeCollector<T> {
1515

1616
// Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector
1717
internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
@@ -24,9 +24,19 @@ internal actual class SafeCollector<T> actual constructor(
2424
checkContext(currentContext)
2525
lastEmissionContext = currentContext
2626
}
27+
2728
collector.emit(value)
2829
}
2930

31+
override fun Throwable.isFromDownstream(): Boolean {
32+
// TODO
33+
return true
34+
}
35+
36+
override fun doNotImplementMe() {
37+
TODO()
38+
}
39+
3040
public actual fun releaseIntercepted() {
3141
}
3242
}

0 commit comments

Comments
 (0)