Skip to content

Commit 103e3d5

Browse files
qwwdfsaddkhalanskyjb
authored andcommitted
Introduce SharedFlow collect overload and override that return Nothing (Kotlin#2803)
* Introduce SharedFlow collect overload and override that return Nothing * Override will ensure the proper implementation of the interface * collect extension is added as the very basic lint helper Fixes Kotlin#2789 Fixes Kotlin#2502 Co-authored-by: dkhalanskyjb <[email protected]>
1 parent b485e1b commit 103e3d5

File tree

4 files changed

+30
-2
lines changed

4 files changed

+30
-2
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+2
Original file line numberDiff line numberDiff line change
@@ -902,6 +902,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
902902
public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
903903
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
904904
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
905+
public static final fun collect (Lkotlinx/coroutines/flow/SharedFlow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
905906
public static final fun collectIndexed (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
906907
public static final fun collectLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
907908
public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
@@ -1063,6 +1064,7 @@ public abstract interface class kotlinx/coroutines/flow/MutableStateFlow : kotli
10631064
}
10641065

10651066
public abstract interface class kotlinx/coroutines/flow/SharedFlow : kotlinx/coroutines/flow/Flow {
1067+
public abstract fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10661068
public abstract fun getReplayCache ()Ljava/util/List;
10671069
}
10681070

kotlinx-coroutines-core/common/src/flow/SharedFlow.kt

+14-1
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,19 @@ public interface SharedFlow<out T> : Flow<T> {
129129
* A snapshot of the replay cache.
130130
*/
131131
public val replayCache: List<T>
132+
133+
/**
134+
* Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
135+
* This method should never be used directly. To emit values from a shared flow into a specific collector, either `collector.emitAll(flow)` or `collect { ... }` extension
136+
* should be used.
137+
*
138+
* **A shared flow never completes**. A call to [Flow.collect] or any other terminal operator
139+
* on a shared flow never completes normally.
140+
*
141+
* @see [Flow.collect]
142+
*/
143+
@InternalCoroutinesApi
144+
override suspend fun collect(collector: FlowCollector<T>): Nothing
132145
}
133146

134147
/**
@@ -344,7 +357,7 @@ internal open class SharedFlowImpl<T>(
344357
get() = buffer!!.getBufferAt(replayIndex + replaySize - 1) as T
345358

346359
@Suppress("UNCHECKED_CAST")
347-
override suspend fun collect(collector: FlowCollector<T>) {
360+
override suspend fun collect(collector: FlowCollector<T>): Nothing {
348361
val slot = allocateSlot()
349362
try {
350363
if (collector is SubscribedFlowCollector) collector.onSubscription()

kotlinx-coroutines-core/common/src/flow/StateFlow.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ private class StateFlowImpl<T>(
380380
throw UnsupportedOperationException("MutableStateFlow.resetReplayCache is not supported")
381381
}
382382

383-
override suspend fun collect(collector: FlowCollector<T>) {
383+
override suspend fun collect(collector: FlowCollector<T>): Nothing {
384384
val slot = allocateSlot()
385385
try {
386386
if (collector is SubscribedFlowCollector) collector.onSubscription()

kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt

+13
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,19 @@ public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value
7373
override suspend fun emit(value: T) = action(value)
7474
})
7575

76+
/**
77+
* Terminal flow operator that collects the given [SharedFlow] with the provided [action].
78+
* If any exception occurs during `collect` or in the provided flow, this exception is rethrown from this method.
79+
*
80+
* This is a counterpart of a regular [Flow.collect] extension, only different in the return type
81+
* so that any code below `collect` produces a compilation warning.
82+
*/
83+
public suspend inline fun <T> SharedFlow<T>.collect(crossinline action: suspend (value: T) -> Unit): Nothing {
84+
collect(object : FlowCollector<T> {
85+
override suspend fun emit(value: T) = action(value)
86+
})
87+
}
88+
7689
/**
7790
* Terminal flow operator that collects the given flow with a provided [action] that takes the index of an element (zero-based) and the element.
7891
* If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.

0 commit comments

Comments
 (0)