Skip to content

Commit 50787bf

Browse files
committed
Code style + rebase
1 parent 602ec61 commit 50787bf

File tree

4 files changed

+17
-11
lines changed

4 files changed

+17
-11
lines changed

kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,10 @@ internal abstract class ChannelFlow<T>(
6363
scope.broadcast(context, produceCapacity, start, block = collectToFun)
6464

6565
fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
66-
scope.produce(context, produceCapacity, block = collectToFun)
66+
scope.flowProduce(context, produceCapacity, block = collectToFun)
6767

6868
override suspend fun collect(collector: FlowCollector<T>) =
69-
coroutineScope { // todo: flowScope
69+
coroutineScope {
7070
val channel = produceImpl(this)
7171
channel.consumeEach { collector.emit(it) }
7272
}

kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt

+14-7
Original file line numberDiff line numberDiff line change
@@ -55,26 +55,33 @@ internal fun <R> scopedFlow(@BuilderInference block: suspend CoroutineScope.(Flo
5555
/*
5656
* Shortcut for produce { flowScope {block() } }
5757
*/
58-
internal fun <T> CoroutineScope.flowProduce(capacity: Int = 0, @BuilderInference block: suspend ProducerScope<T>.() -> Unit): ReceiveChannel<T> {
58+
internal fun <T> CoroutineScope.flowProduce(
59+
context: CoroutineContext,
60+
capacity: Int = 0, @BuilderInference block: suspend ProducerScope<T>.() -> Unit
61+
): ReceiveChannel<T> {
5962
val channel = Channel<T>(capacity)
60-
val newContext = newCoroutineContext(EmptyCoroutineContext) // To have a default dispatcher and coroutine id
63+
val newContext = newCoroutineContext(context)
6164
val coroutine = FlowProduceCoroutine(newContext, channel)
6265
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
6366
return coroutine
64-
6567
}
6668

67-
private class FlowCoroutine<T>(context: CoroutineContext, uCont: Continuation<T>) :
68-
ScopeCoroutine<T>(context, uCont) {
69+
private class FlowCoroutine<T>(
70+
context: CoroutineContext,
71+
uCont: Continuation<T>
72+
) : ScopeCoroutine<T>(context, uCont) {
6973

7074
public override fun childCancelled(cause: Throwable): Boolean {
7175
if (cause is ChildCancelledException) return true
7276
return cancelImpl(cause)
7377
}
7478
}
7579

76-
private class FlowProduceCoroutine<T>(parentContext: CoroutineContext, channel: Channel<T>) :
77-
ProducerCoroutine<T>(parentContext, channel) {
80+
private class FlowProduceCoroutine<T>(
81+
parentContext: CoroutineContext,
82+
channel: Channel<T>
83+
) : ProducerCoroutine<T>(parentContext, channel) {
84+
7885
public override fun childCancelled(cause: Throwable): Boolean {
7986
if (cause is ChildCancelledException) return true
8087
return cancelImpl(cause)

kotlinx-coroutines-core/common/src/flow/operators/Merge.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ private class ChannelFlowMerge<T>(
174174
override suspend fun flowCollect(collector: FlowCollector<T>) {
175175
// this function should not have been invoked when channel was explicitly requested
176176
check(capacity == OPTIONAL_CHANNEL)
177-
coroutineScope { // todo: flowScope
177+
flowScope {
178178
mergeImpl(this, collector.asConcurrentFlowCollector())
179179
}
180180
}

kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt

-1
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,3 @@ class FlowCallbackTest : TestBase() {
4545
finish(3)
4646
}
4747
}
48-

0 commit comments

Comments
 (0)