Skip to content

Commit 2f87fe4

Browse files
committed
~rename currentContext to currentCoroutineContext, add fusion
1 parent a247fbe commit 2f87fe4

File tree

11 files changed

+40
-34
lines changed

11 files changed

+40
-34
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+1-1
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ public final class kotlinx/coroutines/CoroutineScopeKt {
214214
public static synthetic fun cancel$default (Lkotlinx/coroutines/CoroutineScope;Ljava/lang/String;Ljava/lang/Throwable;ILjava/lang/Object;)V
215215
public static synthetic fun cancel$default (Lkotlinx/coroutines/CoroutineScope;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V
216216
public static final fun coroutineScope (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
217-
public static final fun currentContext (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
217+
public static final fun currentCoroutineContext (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
218218
public static final fun ensureActive (Lkotlinx/coroutines/CoroutineScope;)V
219219
public static final fun isActive (Lkotlinx/coroutines/CoroutineScope;)Z
220220
public static final fun plus (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/CoroutineScope;

kotlinx-coroutines-core/common/src/CoroutineScope.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -242,10 +242,10 @@ public fun CoroutineScope.ensureActive(): Unit = coroutineContext.ensureActive()
242242
* ```
243243
* launch { // this: CoroutineScope
244244
* val flow = flow<Unit> {
245-
* coroutineContext // Resolves into the context of outer launch, which is incorrect, see KT
246-
* currentContext() // Retrieves actual context whe the flow is collected
245+
* coroutineContext // Resolves into the context of outer launch, which is incorrect, see KT-38033
246+
* currentCoroutineContext() // Retrieves actual context where the flow is collected
247247
* }
248248
* }
249249
* ```
250250
*/
251-
public suspend inline fun currentContext(): CoroutineContext = coroutineContext
251+
public suspend inline fun currentCoroutineContext(): CoroutineContext = coroutineContext

kotlinx-coroutines-core/common/src/flow/Builders.kt

+3-8
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,9 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
5050
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
5151

5252
// Named anonymous object
53-
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : Flow<T> {
54-
override suspend fun collect(collector: FlowCollector<T>) {
55-
val safeCollector = SafeCollector(collector, coroutineContext)
56-
try {
57-
safeCollector.block()
58-
} finally {
59-
safeCollector.releaseIntercepted()
60-
}
53+
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
54+
override suspend fun collectSafely(collector: FlowCollector<T>) {
55+
collector.block()
6156
}
6257
}
6358

kotlinx-coroutines-core/common/src/flow/operators/Context.kt

+8-5
Original file line numberDiff line numberDiff line change
@@ -213,10 +213,13 @@ public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
213213
* the corresponding cancellation cause if flow collector was cancelled.
214214
* Note that [flow] builder is [cancellable] by default.
215215
*/
216-
public fun <T> Flow<T>.cancellable(): Flow<T> = unsafeFlow {
217-
collect {
218-
currentContext().ensureActive()
219-
emit(it)
216+
public fun <T> Flow<T>.cancellable(): Flow<T> {
217+
if (this is AbstractFlow<*>) return this // Fast-path, already cancellable
218+
return unsafeFlow {
219+
collect {
220+
currentCoroutineContext().ensureActive()
221+
emit(it)
222+
}
220223
}
221224
}
222225

@@ -268,7 +271,7 @@ public fun <T, R> Flow<T>.flowWith(
268271
* All builders are written using scoping and no global coroutines are launched, so it is safe not to provide explicit Job.
269272
* It is also necessary not to mess with cancellation if multiple flowWith are used.
270273
*/
271-
val originalContext = currentContext().minusKey(Job)
274+
val originalContext = currentCoroutineContext().minusKey(Job)
272275
val prepared = source.flowOn(originalContext).buffer(bufferSize)
273276
builder(prepared).flowOn(flowContext).buffer(bufferSize).collect { value ->
274277
return@collect emit(value)

kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt

+3-4
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ package kotlinx.coroutines.flow
1010

1111
import kotlinx.coroutines.*
1212
import kotlinx.coroutines.flow.internal.*
13-
import kotlin.coroutines.*
1413
import kotlin.jvm.*
1514

1615
// ------------------ WARNING ------------------
@@ -70,7 +69,7 @@ internal inline fun <T, R> Flow<T>.unsafeTransform(
7069
public fun <T> Flow<T>.onStart(
7170
action: suspend FlowCollector<T>.() -> Unit
7271
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke start action
73-
val safeCollector = SafeCollector<T>(this, currentContext())
72+
val safeCollector = SafeCollector<T>(this, currentCoroutineContext())
7473
try {
7574
safeCollector.action()
7675
} finally {
@@ -153,7 +152,7 @@ public fun <T> Flow<T>.onCompletion(
153152
throw e
154153
}
155154
// Normal completion
156-
SafeCollector(this, currentContext()).invokeSafely(action, null)
155+
SafeCollector(this, currentCoroutineContext()).invokeSafely(action, null)
157156
}
158157

159158
/**
@@ -178,7 +177,7 @@ public fun <T> Flow<T>.onEmpty(
178177
emit(it)
179178
}
180179
if (isEmpty) {
181-
val collector = SafeCollector(this, currentContext())
180+
val collector = SafeCollector(this, currentCoroutineContext())
182181
try {
183182
collector.action()
184183
} finally {

kotlinx-coroutines-core/common/src/flow/operators/Lint.kt

+6-6
Original file line numberDiff line numberDiff line change
@@ -45,28 +45,28 @@ public fun <T> StateFlow<T>.distinctUntilChanged(): Flow<T> = noImpl()
4545

4646
@Deprecated(
4747
message = "isActive is resolved into the extension of outer CoroutineScope which is likely to be an error." +
48-
"Use currentContext().isActive or cancellable() operator instead " +
48+
"Use currentCoroutineContext().isActive or cancellable() operator instead " +
4949
"or specify the receiver of isActive explicitly. " +
5050
"Additionally, flow {} builder emissions are cancellable by default.",
5151
level = DeprecationLevel.WARNING, // ERROR in 1.4
52-
replaceWith = ReplaceWith("currentContext().isActive")
52+
replaceWith = ReplaceWith("currentCoroutineContext().isActive")
5353
)
5454
public val FlowCollector<*>.isActive: Boolean
5555
get() = noImpl()
5656

5757
@Deprecated(
5858
message = "cancel() is resolved into the extension of outer CoroutineScope which is likely to be an error." +
59-
"Use currentContext().cancel() instead or specify the receiver of cancel() explicitly",
59+
"Use currentCoroutineContext().cancel() instead or specify the receiver of cancel() explicitly",
6060
level = DeprecationLevel.WARNING, // ERROR in 1.4
61-
replaceWith = ReplaceWith("currentContext().cancel(cause)")
61+
replaceWith = ReplaceWith("currentCoroutineContext().cancel(cause)")
6262
)
6363
public fun FlowCollector<*>.cancel(cause: CancellationException? = null): Unit = noImpl()
6464

6565
@Deprecated(
6666
message = "coroutineContext is resolved into the property of outer CoroutineScope which is likely to be an error." +
67-
"Use currentContext() instead or specify the receiver of coroutineContext explicitly",
67+
"Use currentCoroutineContext() instead or specify the receiver of coroutineContext explicitly",
6868
level = DeprecationLevel.WARNING, // ERROR in 1.4
69-
replaceWith = ReplaceWith("currentContext()")
69+
replaceWith = ReplaceWith("currentCoroutineContext()")
7070
)
7171
public val FlowCollector<*>.coroutineContext: CoroutineContext
7272
get() = noImpl()

kotlinx-coroutines-core/common/test/flow/operators/CancellableTest.kt

+11-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ class CancellableTest : TestBase() {
1515
var sum = 0
1616
val flow = (0..1000).asFlow()
1717
.onEach {
18-
if (it != 0) currentContext().cancel()
18+
if (it != 0) currentCoroutineContext().cancel()
1919
sum += it
2020
}
2121

@@ -26,4 +26,13 @@ class CancellableTest : TestBase() {
2626
flow.cancellable().launchIn(this).join()
2727
assertEquals(1, sum)
2828
}
29-
}
29+
30+
@Test
31+
fun testFastPath() {
32+
val flow = listOf(1).asFlow()
33+
assertNotSame(flow, flow.cancellable())
34+
35+
val cancellableFlow = flow { emit(42) }
36+
assertSame(cancellableFlow, cancellableFlow.cancellable())
37+
}
38+
}

kotlinx-coroutines-core/js/src/flow/internal/SafeCollector.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ internal actual class SafeCollector<T> actual constructor(
1818
private var lastEmissionContext: CoroutineContext? = null
1919

2020
override suspend fun emit(value: T) {
21-
val currentContext = currentContext()
21+
val currentContext = currentCoroutineContext()
2222
currentContext.ensureActive()
2323
if (lastEmissionContext !== currentContext) {
2424
checkContext(currentContext)

kotlinx-coroutines-core/jvm/test/flow/FlowCancellationTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class FlowCancellationTest : TestBase() {
3333
val job = flow<Unit> {
3434
expect(1)
3535
latch.send(Unit)
36-
while (currentContext().isActive) {
36+
while (currentCoroutineContext().isActive) {
3737
// Do nothing
3838
}
3939
}.launchIn(this + Dispatchers.Default)

kotlinx-coroutines-core/native/src/flow/internal/SafeCollector.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ internal actual class SafeCollector<T> actual constructor(
1818
private var lastEmissionContext: CoroutineContext? = null
1919

2020
override suspend fun emit(value: T) {
21-
val currentContext = currentContext()
21+
val currentContext = currentCoroutineContext()
2222
currentContext.ensureActive()
2323
if (lastEmissionContext !== currentContext) {
2424
checkContext(currentContext)

reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,13 @@ class BackpressureTest : TestBase() {
4141
@Test
4242
fun testCooperativeCancellation() = runTest {
4343
val flow = Flux.fromIterable((0L..Long.MAX_VALUE)).asFlow()
44-
flow.onEach { if (it > 10) currentContext().cancel() }.launchIn(this + Dispatchers.Default).join()
44+
flow.onEach { if (it > 10) currentCoroutineContext().cancel() }.launchIn(this + Dispatchers.Default).join()
4545
}
4646

4747
@Test
4848
fun testCooperativeCancellationForBuffered() = runTest(expected = { it is CancellationException }) {
4949
val flow = Flux.fromIterable((0L..Long.MAX_VALUE)).asFlow()
50-
val channel = flow.onEach { if (it > 10) currentContext().cancel() }.produceIn(this + Dispatchers.Default)
50+
val channel = flow.onEach { if (it > 10) currentCoroutineContext().cancel() }.produceIn(this + Dispatchers.Default)
5151
channel.consumeEach { /* Do nothing, just consume elements */ }
5252
}
5353
}

0 commit comments

Comments
 (0)