Skip to content

Commit da9159c

Browse files
committed
Introduce SharedFlow and sharing operators
Summary of changes: * SharedFlow, MutableSharedFlow and its constructor. * StateFlow implements SharedFlow. * SharedFlow.onSubscription operator, clarified docs in other onXxx operators. * BufferOverflow strategy in kotlinx.coroutines.channels package. * shareIn and stateIn operators and SharingStarted strategies for them. * SharedFlow.flowOn error lint (up from StateFlow). * Precise cancellable() operator fusion. * Precise distinctUntilChanged() operator fusion. * StateFlow.compareAndSet function. * asStateFlow and asSharedFlow read-only view functions. * Consistently clarified docs on cold vs hot flows. * Future deprecation notice for BroadcastChannel, ConflatedBroadcastChannel, broadcast, and broadcastIn. This is a DRAFT. Parts that are not done yet: * BufferOverflow strategy support in channels. * Sharing operators conflation with preceding buffer(...) operator and the corresponding tests. * Better functional tests for SharingStarted strategies. * Test cancellability of shared flows. * Add reactive operator migration hints. Fixes #2034 Fixes #2047
1 parent 3a8a0ea commit da9159c

23 files changed

+2516
-153
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+62-4
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,14 @@ public final class kotlinx/coroutines/channels/BroadcastKt {
578578
public static synthetic fun broadcast$default (Lkotlinx/coroutines/channels/ReceiveChannel;ILkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
579579
}
580580

581+
public final class kotlinx/coroutines/channels/BufferOverflow : java/lang/Enum {
582+
public static final field DROP_LATEST Lkotlinx/coroutines/channels/BufferOverflow;
583+
public static final field KEEP_LATEST Lkotlinx/coroutines/channels/BufferOverflow;
584+
public static final field SUSPEND Lkotlinx/coroutines/channels/BufferOverflow;
585+
public static fun valueOf (Ljava/lang/String;)Lkotlinx/coroutines/channels/BufferOverflow;
586+
public static fun values ()[Lkotlinx/coroutines/channels/BufferOverflow;
587+
}
588+
581589
public abstract interface class kotlinx/coroutines/channels/Channel : kotlinx/coroutines/channels/ReceiveChannel, kotlinx/coroutines/channels/SendChannel {
582590
public static final field BUFFERED I
583591
public static final field CONFLATED I
@@ -842,7 +850,7 @@ public synthetic class kotlinx/coroutines/debug/internal/DebugProbesImplSequence
842850
public fun <init> (J)V
843851
}
844852

845-
public abstract class kotlinx/coroutines/flow/AbstractFlow : kotlinx/coroutines/flow/Flow {
853+
public abstract class kotlinx/coroutines/flow/AbstractFlow : kotlinx/coroutines/flow/CancellableFlow, kotlinx/coroutines/flow/Flow {
846854
public fun <init> ()V
847855
public final fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
848856
public abstract fun collectSafely (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
@@ -869,6 +877,8 @@ public final class kotlinx/coroutines/flow/FlowKt {
869877
public static final fun asFlow ([I)Lkotlinx/coroutines/flow/Flow;
870878
public static final fun asFlow ([J)Lkotlinx/coroutines/flow/Flow;
871879
public static final fun asFlow ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
880+
public static final fun asSharedFlow (Lkotlinx/coroutines/flow/MutableSharedFlow;)Lkotlinx/coroutines/flow/SharedFlow;
881+
public static final fun asStateFlow (Lkotlinx/coroutines/flow/MutableStateFlow;)Lkotlinx/coroutines/flow/StateFlow;
872882
public static final fun broadcastIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
873883
public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
874884
public static final fun buffer (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
@@ -962,6 +972,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
962972
public static final fun onErrorReturn (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
963973
public static synthetic fun onErrorReturn$default (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
964974
public static final fun onStart (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
975+
public static final fun onSubscription (Lkotlinx/coroutines/flow/SharedFlow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/SharedFlow;
965976
public static final fun produceIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
966977
public static final fun publishOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
967978
public static final fun receiveAsFlow (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/flow/Flow;
@@ -976,11 +987,16 @@ public final class kotlinx/coroutines/flow/FlowKt {
976987
public static final fun scan (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
977988
public static final fun scanFold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
978989
public static final fun scanReduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
990+
public static final fun shareIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/flow/SharingStarted;Ljava/lang/Object;)Lkotlinx/coroutines/flow/SharedFlow;
991+
public static synthetic fun shareIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/flow/SharingStarted;Ljava/lang/Object;ILjava/lang/Object;)Lkotlinx/coroutines/flow/SharedFlow;
979992
public static final fun single (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
980993
public static final fun singleOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
981994
public static final fun skip (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
982995
public static final fun startWith (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
983996
public static final fun startWith (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
997+
public static final fun stateIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
998+
public static final fun stateIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;Ljava/lang/Object;)Lkotlinx/coroutines/flow/StateFlow;
999+
public static synthetic fun stateIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;Ljava/lang/Object;ILjava/lang/Object;)Lkotlinx/coroutines/flow/StateFlow;
9841000
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;)V
9851001
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)V
9861002
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)V
@@ -1001,17 +1017,55 @@ public final class kotlinx/coroutines/flow/FlowKt {
10011017
}
10021018

10031019
public final class kotlinx/coroutines/flow/LintKt {
1020+
public static final fun cancellable (Lkotlinx/coroutines/flow/SharedFlow;)Lkotlinx/coroutines/flow/Flow;
10041021
public static final fun conflate (Lkotlinx/coroutines/flow/StateFlow;)Lkotlinx/coroutines/flow/Flow;
10051022
public static final fun distinctUntilChanged (Lkotlinx/coroutines/flow/StateFlow;)Lkotlinx/coroutines/flow/Flow;
1006-
public static final fun flowOn (Lkotlinx/coroutines/flow/StateFlow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
1023+
public static final fun flowOn (Lkotlinx/coroutines/flow/SharedFlow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
1024+
}
1025+
1026+
public abstract interface class kotlinx/coroutines/flow/MutableSharedFlow : kotlinx/coroutines/flow/FlowCollector, kotlinx/coroutines/flow/SharedFlow {
1027+
public abstract fun getSubscriptionCount ()Lkotlinx/coroutines/flow/StateFlow;
1028+
public abstract fun resetBuffer ()V
1029+
public abstract fun tryEmit (Ljava/lang/Object;)Z
10071030
}
10081031

1009-
public abstract interface class kotlinx/coroutines/flow/MutableStateFlow : kotlinx/coroutines/flow/StateFlow {
1032+
public abstract interface class kotlinx/coroutines/flow/MutableStateFlow : kotlinx/coroutines/flow/MutableSharedFlow, kotlinx/coroutines/flow/StateFlow {
1033+
public abstract fun compareAndSet (Ljava/lang/Object;Ljava/lang/Object;)Z
10101034
public abstract fun getValue ()Ljava/lang/Object;
10111035
public abstract fun setValue (Ljava/lang/Object;)V
10121036
}
10131037

1014-
public abstract interface class kotlinx/coroutines/flow/StateFlow : kotlinx/coroutines/flow/Flow {
1038+
public abstract interface class kotlinx/coroutines/flow/SharedFlow : kotlinx/coroutines/flow/Flow {
1039+
public abstract fun getReplayCache ()Ljava/util/List;
1040+
}
1041+
1042+
public final class kotlinx/coroutines/flow/SharedFlowKt {
1043+
public static final fun MutableSharedFlow (IILkotlinx/coroutines/channels/BufferOverflow;Ljava/lang/Object;)Lkotlinx/coroutines/flow/MutableSharedFlow;
1044+
public static synthetic fun MutableSharedFlow$default (IILkotlinx/coroutines/channels/BufferOverflow;Ljava/lang/Object;ILjava/lang/Object;)Lkotlinx/coroutines/flow/MutableSharedFlow;
1045+
}
1046+
1047+
public final class kotlinx/coroutines/flow/SharingCommand : java/lang/Enum {
1048+
public static final field START Lkotlinx/coroutines/flow/SharingCommand;
1049+
public static final field STOP Lkotlinx/coroutines/flow/SharingCommand;
1050+
public static final field STOP_AND_RESET_BUFFER Lkotlinx/coroutines/flow/SharingCommand;
1051+
public static fun valueOf (Ljava/lang/String;)Lkotlinx/coroutines/flow/SharingCommand;
1052+
public static fun values ()[Lkotlinx/coroutines/flow/SharingCommand;
1053+
}
1054+
1055+
public abstract interface class kotlinx/coroutines/flow/SharingStarted {
1056+
public static final field Companion Lkotlinx/coroutines/flow/SharingStarted$Companion;
1057+
public abstract fun commandFlow (Lkotlinx/coroutines/flow/StateFlow;)Lkotlinx/coroutines/flow/Flow;
1058+
}
1059+
1060+
public final class kotlinx/coroutines/flow/SharingStarted$Companion {
1061+
public final fun WhileSubscribed (JJ)Lkotlinx/coroutines/flow/SharingStarted;
1062+
public static synthetic fun WhileSubscribed$default (Lkotlinx/coroutines/flow/SharingStarted$Companion;JJILjava/lang/Object;)Lkotlinx/coroutines/flow/SharingStarted;
1063+
public final fun getEagerly ()Lkotlinx/coroutines/flow/SharingStarted;
1064+
public final fun getLazily ()Lkotlinx/coroutines/flow/SharingStarted;
1065+
public final fun getWhileSubscribed ()Lkotlinx/coroutines/flow/SharingStarted;
1066+
}
1067+
1068+
public abstract interface class kotlinx/coroutines/flow/StateFlow : kotlinx/coroutines/flow/SharedFlow {
10151069
public abstract fun getValue ()Ljava/lang/Object;
10161070
}
10171071

@@ -1037,6 +1091,10 @@ public final class kotlinx/coroutines/flow/internal/CombineKt {
10371091
public static final fun combineInternal (Lkotlinx/coroutines/flow/FlowCollector;[Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10381092
}
10391093

1094+
public final class kotlinx/coroutines/flow/internal/DistinctFlowKt {
1095+
public static final fun unsafeDistinctFlow (ZLkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
1096+
}
1097+
10401098
public final class kotlinx/coroutines/flow/internal/FlowExceptions_commonKt {
10411099
public static final fun checkIndexOverflow (I)I
10421100
}

kotlinx-coroutines-core/common/src/channels/Broadcast.kt

+4-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
1010
import kotlinx.coroutines.intrinsics.*
1111
import kotlin.coroutines.*
1212
import kotlin.coroutines.intrinsics.*
13-
import kotlin.native.concurrent.*
1413

1514
/**
1615
* Broadcasts all elements of the channel.
@@ -34,8 +33,10 @@ import kotlin.native.concurrent.*
3433
*
3534
* This function has an inappropriate result type of [BroadcastChannel] which provides
3635
* [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
37-
* the broadcasting coroutine in hard-to-specify ways. It will be replaced with
38-
* sharing operators on [Flow][kotlinx.coroutines.flow.Flow] in the future.
36+
* the broadcasting coroutine in hard-to-specify ways.
37+
*
38+
* **Note: This API is obsolete.** It will be deprecated and replaced with
39+
* [Flow.shareIn][kotlinx.coroutines.flow.shareIn] operator when it becomes stable.
3940
*
4041
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
4142
*/

kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt

+4-3
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
package kotlinx.coroutines.channels
88

99
import kotlinx.coroutines.*
10-
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
1110
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
1211
import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
12+
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
1313
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
1414

1515
/**
@@ -20,9 +20,10 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
2020
* See `BroadcastChannel()` factory function for the description of available
2121
* broadcast channel implementations.
2222
*
23-
* **Note: This is an experimental api.** It may be changed in the future updates.
23+
* **Note: This API is obsolete.** It will be deprecated and replaced by [SharedFlow][kotlinx.coroutines.flow.SharedFlow]
24+
* when it becomes stable.
2425
*/
25-
@ExperimentalCoroutinesApi
26+
@ExperimentalCoroutinesApi // not @ObsoleteCoroutinesApi to reduce burden for people who are still using it
2627
public interface BroadcastChannel<E> : SendChannel<E> {
2728
/**
2829
* Subscribes to this [BroadcastChannel] and returns a channel to receive elements from it.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.channels
6+
7+
import kotlinx.coroutines.*
8+
9+
/**
10+
* A strategy for buffer overflow handling in [channels][Channel] and [flows][kotlinx.coroutines.flow.Flow] that
11+
* controls behavior on buffer overflow and typically defaults to [SUSPEND].
12+
*/
13+
@ExperimentalCoroutinesApi
14+
public enum class BufferOverflow {
15+
/**
16+
* Suspend on buffer overflow.
17+
*/
18+
SUSPEND,
19+
20+
/**
21+
* Keep latest value on buffer overflow, drop oldest, do not suspend.
22+
*/
23+
KEEP_LATEST,
24+
25+
/**
26+
* Drop latest value on buffer overflow, keep oldest, do not suspend.
27+
*/
28+
DROP_LATEST
29+
}

kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import kotlinx.coroutines.internal.*
1010
import kotlinx.coroutines.intrinsics.*
1111
import kotlinx.coroutines.selects.*
1212
import kotlin.jvm.*
13-
import kotlin.native.concurrent.*
1413

1514
/**
1615
* Broadcasts the most recently sent element (aka [value]) to all [openSubscription] subscribers.
@@ -27,9 +26,10 @@ import kotlin.native.concurrent.*
2726
* [opening][openSubscription] and [closing][ReceiveChannel.cancel] subscription takes O(N) time, where N is the
2827
* number of subscribers.
2928
*
30-
* **Note: This API is experimental.** It may be changed in the future updates.
29+
* **Note: This API is obsolete.** It will be deprecated and replaced by [StateFlow][kotlinx.coroutines.flow.StateFlow]
30+
* when it becomes stable.
3131
*/
32-
@ExperimentalCoroutinesApi
32+
@ExperimentalCoroutinesApi // not @ObsoleteCoroutinesApi to reduce burden for people who are still using it
3333
public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
3434
/**
3535
* Creates an instance of this class that already holds a value.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.coroutines.internal.*
8+
import kotlin.coroutines.*
9+
10+
// Note: Always guarantee FIFO processing of slots by keeping a doubly-linked list of them
11+
internal abstract class AbstractSharedFlowSlot<F> {
12+
abstract fun allocateLocked(flow: F): Boolean
13+
abstract fun freeLocked(flow: F): List<Continuation<Unit>>? // returns a list of continuation to resume after lock
14+
}
15+
16+
internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() {
17+
@Suppress("UNCHECKED_CAST")
18+
protected var slots: Array<S?>? = null // allocated when needed
19+
private set
20+
protected var nCollectors = 0 // number of allocated (!free) slots
21+
private set
22+
private var nextIndex = 0 // oracle for the next free slot index
23+
private var _subscriptionCount: MutableStateFlow<Int>? = null // init on first need
24+
25+
val subscriptionCount: StateFlow<Int>
26+
get() = synchronized(this) {
27+
// allocate under lock in sync with nCollectors variable
28+
_subscriptionCount ?: MutableStateFlow(nCollectors).also {
29+
_subscriptionCount = it
30+
}
31+
}
32+
33+
protected abstract fun createSlot(): S
34+
35+
protected abstract fun createSlotArray(size: Int): Array<S?>
36+
37+
@Suppress("UNCHECKED_CAST")
38+
protected fun allocateSlot(): S {
39+
// Actually create slot under lock
40+
var subscriptionCount: MutableStateFlow<Int>? = null
41+
val slot = synchronized(this) {
42+
val slots = when(val curSlots = slots) {
43+
null -> createSlotArray(2).also { slots = it }
44+
else -> if (nCollectors >= curSlots.size) {
45+
curSlots.copyOf(2 * curSlots.size).also { slots = it }
46+
} else {
47+
curSlots
48+
}
49+
}
50+
var index = nextIndex
51+
var slot: S
52+
while (true) {
53+
slot = slots[index] ?: createSlot().also { slots[index] = it }
54+
index++
55+
if (index >= slots.size) index = 0
56+
if ((slot as AbstractSharedFlowSlot<Any>).allocateLocked(this)) break // break when found and allocated free slot
57+
}
58+
nextIndex = index
59+
nCollectors++
60+
subscriptionCount = _subscriptionCount // retrieve under lock if initialized
61+
slot
62+
}
63+
// increments subscription count
64+
subscriptionCount?.increment(1)
65+
return slot
66+
}
67+
68+
@Suppress("UNCHECKED_CAST")
69+
protected fun freeSlot(slot: S) {
70+
// Release slot under lock
71+
var subscriptionCount: MutableStateFlow<Int>? = null
72+
val resumeList = synchronized(this) {
73+
nCollectors--
74+
subscriptionCount = _subscriptionCount // retrieve under lock if initialized
75+
// Reset next index oracle if we have no more active collectors for more predictable behavior next time
76+
if (nCollectors == 0) nextIndex = 0
77+
(slot as AbstractSharedFlowSlot<Any>).freeLocked(this)
78+
}
79+
// Resume suspended coroutines
80+
resumeList?.forEach { it.resume(Unit) }
81+
// decrement subscription count
82+
subscriptionCount?.increment(-1)
83+
}
84+
85+
protected inline fun forEachSlotLocked(block: (S) -> Unit) {
86+
if (nCollectors == 0) return
87+
slots?.forEach { slot ->
88+
if (slot != null) block(slot)
89+
}
90+
}
91+
}

0 commit comments

Comments
 (0)