Skip to content

Commit 684a97b

Browse files
committed
Use regular produce instead of flowProduce in channelFlow
Concurrent work is already properly decomposed and does not expose an "partial cancellation" behaviour as other operators may do Fixes #1334
1 parent 131c320 commit 684a97b

File tree

5 files changed

+25
-29
lines changed

5 files changed

+25
-29
lines changed

kotlinx-coroutines-core/common/src/channels/Produce.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public fun <E> CoroutineScope.produce(
126126
return coroutine
127127
}
128128

129-
internal open class ProducerCoroutine<E>(
129+
private class ProducerCoroutine<E>(
130130
parentContext: CoroutineContext, channel: Channel<E>
131131
) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E> {
132132
override val isActive: Boolean

kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public abstract class ChannelFlow<T>(
6868
scope.broadcast(context, produceCapacity, start, block = collectToFun)
6969

7070
open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
71-
scope.flowProduce(context, produceCapacity, block = collectToFun)
71+
scope.produce(context, produceCapacity, block = collectToFun)
7272

7373
override suspend fun collect(collector: FlowCollector<T>) =
7474
coroutineScope {

kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt

-24
Original file line numberDiff line numberDiff line change
@@ -52,20 +52,6 @@ internal fun <R> scopedFlow(@BuilderInference block: suspend CoroutineScope.(Flo
5252
flowScope { block(collector) }
5353
}
5454

55-
/*
56-
* Shortcut for produce { flowScope {block() } }
57-
*/
58-
internal fun <T> CoroutineScope.flowProduce(
59-
context: CoroutineContext,
60-
capacity: Int = 0, @BuilderInference block: suspend ProducerScope<T>.() -> Unit
61-
): ReceiveChannel<T> {
62-
val channel = Channel<T>(capacity)
63-
val newContext = newCoroutineContext(context)
64-
val coroutine = FlowProduceCoroutine(newContext, channel)
65-
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
66-
return coroutine
67-
}
68-
6955
private class FlowCoroutine<T>(
7056
context: CoroutineContext,
7157
uCont: Continuation<T>
@@ -75,13 +61,3 @@ private class FlowCoroutine<T>(
7561
return cancelImpl(cause)
7662
}
7763
}
78-
79-
private class FlowProduceCoroutine<T>(
80-
parentContext: CoroutineContext,
81-
channel: Channel<T>
82-
) : ProducerCoroutine<T>(parentContext, channel) {
83-
public override fun childCancelled(cause: Throwable): Boolean {
84-
if (cause is ChildCancelledException) return true
85-
return cancelImpl(cause)
86-
}
87-
}

kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt

+21-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ class ChannelFlowTest : TestBase() {
2929
assertEquals(listOf(1, 2), flow.toList())
3030
}
3131

32-
// todo: this is pretty useless behavior
3332
@Test
3433
fun testConflated() = runTest {
3534
val flow = channelFlow {
@@ -114,6 +113,7 @@ class ChannelFlowTest : TestBase() {
114113
}
115114

116115
@Test
116+
@Ignore // #1374
117117
fun testBufferWithTimeout() = runTest {
118118
fun Flow<Int>.bufferWithTimeout(): Flow<Int> = channelFlow {
119119
expect(2)
@@ -140,4 +140,24 @@ class ChannelFlowTest : TestBase() {
140140
assertFailsWith<TimeoutCancellationException>(flow)
141141
finish(6)
142142
}
143+
144+
@Test
145+
fun testChildCancellation() = runTest {
146+
channelFlow {
147+
val job = launch {
148+
expect(2)
149+
hang { expect(4) }
150+
}
151+
expect(1)
152+
yield()
153+
expect(3)
154+
job.cancelAndJoin()
155+
send(5)
156+
157+
}.collect {
158+
expect(it)
159+
}
160+
161+
finish(6)
162+
}
143163
}

kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ class CallbackFlowTest : TestBase() {
8383
}
8484
}
8585

86-
val flow = channelFlow<Int> {
86+
val flow = callbackFlow<Int>() {
8787
api.start(channel)
8888
awaitClose {
8989
api.stop()
@@ -118,7 +118,7 @@ class CallbackFlowTest : TestBase() {
118118
}
119119
}
120120

121-
private fun Flow<Int>.merge(other: Flow<Int>): Flow<Int> = channelFlow {
121+
private fun Flow<Int>.merge(other: Flow<Int>): Flow<Int> = callbackFlow {
122122
launch {
123123
collect { send(it) }
124124
}

0 commit comments

Comments
 (0)