Skip to content

Commit 8c83d34

Browse files
committed
~ No default for shareIn/stateIn started parameter
1 parent 3028679 commit 8c83d34

File tree

8 files changed

+46
-50
lines changed

8 files changed

+46
-50
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

-2
Original file line numberDiff line numberDiff line change
@@ -1025,15 +1025,13 @@ public final class kotlinx/coroutines/flow/FlowKt {
10251025
public static final fun scanFold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
10261026
public static final fun scanReduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
10271027
public static final fun shareIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/flow/SharingStarted;)Lkotlinx/coroutines/flow/SharedFlow;
1028-
public static synthetic fun shareIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/flow/SharingStarted;ILjava/lang/Object;)Lkotlinx/coroutines/flow/SharedFlow;
10291028
public static final fun single (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10301029
public static final fun singleOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10311030
public static final fun skip (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
10321031
public static final fun startWith (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
10331032
public static final fun startWith (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
10341033
public static final fun stateIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10351034
public static final fun stateIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;Ljava/lang/Object;)Lkotlinx/coroutines/flow/StateFlow;
1036-
public static synthetic fun stateIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;Ljava/lang/Object;ILjava/lang/Object;)Lkotlinx/coroutines/flow/StateFlow;
10371035
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;)V
10381036
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)V
10391037
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)V

kotlinx-coroutines-core/common/src/flow/operators/Share.kt

+9-12
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,12 @@ import kotlin.jvm.*
2121
* and replaying a specified number of [replay] values to new subscribers. See the [SharedFlow] documentation
2222
* for the general concepts of shared flows.
2323
*
24-
* The starting of the sharing coroutine is controlled by the [started] parameter. By default, the sharing coroutine is started
25-
* [Eagerly][SharingStarted.Eagerly], so the upstream flow is started even before the first subscribers appear. Note
26-
* that in this case all values emitted by the upstream beyond the most recent values as specified by
27-
* [replay] parameter **will be immediately discarded**.
28-
*
29-
* Additional options for the [started] parameter are:
24+
* The starting of the sharing coroutine is controlled by the [started] parameter. The following options
25+
* are supported.
3026
*
27+
* * [Eagerly][SharingStarted.Eagerly] — the upstream flow is started even before the first subscriber appears. Note
28+
* that in this case all values emitted by the upstream beyond the most recent values as specified by
29+
* [replay] parameter **will be immediately discarded**.
3130
* * [Lazily][SharingStarted.Lazily] — starts the upstream flow after the first subscriber appears, which guarantees
3231
* that this first subscriber gets all the emitted values, while subsequent subscribers are only guaranteed to
3332
* get the most recent [replay] values. The upstream flow continues to be active even when all subscribers
@@ -131,14 +130,13 @@ import kotlin.jvm.*
131130
*
132131
* @param scope the coroutine scope in which sharing is started.
133132
* @param replay the number of values replayed to new subscribers (cannot be negative).
134-
* @param started the strategy that controls when sharing is started and stopped
135-
* (optional, default to [Eagerly][SharingStarted.Eagerly] starting the sharing without waiting for subscribers).
133+
* @param started the strategy that controls when sharing is started and stopped.
136134
*/
137135
@ExperimentalCoroutinesApi
138136
public fun <T> Flow<T>.shareIn(
139137
scope: CoroutineScope,
140138
replay: Int,
141-
started: SharingStarted = SharingStarted.Eagerly
139+
started: SharingStarted
142140
): SharedFlow<T> {
143141
val config = configureSharing(replay)
144142
val shared = MutableSharedFlow<T>(
@@ -279,16 +277,15 @@ private fun <T> CoroutineScope.launchSharing(
279277
* [distinctUntilChanged][Flow.distinctUntilChanged], or [cancellable] operators to a state flow has no effect.
280278
*
281279
* @param scope the coroutine scope in which sharing is started.
282-
* @param started the strategy that controls when sharing is started and stopped
283-
* (optional, default to [Eagerly][SharingStarted.Eagerly] starting the sharing without waiting for subscribers).
280+
* @param started the strategy that controls when sharing is started and stopped.
284281
* @param initialValue the initial value of the state flow.
285282
* This value is also used when the state flow is reset using the [SharingStarted.WhileSubscribed] strategy
286283
* with the `replayExpirationMillis` parameter.
287284
*/
288285
@ExperimentalCoroutinesApi
289286
public fun <T> Flow<T>.stateIn(
290287
scope: CoroutineScope,
291-
started: SharingStarted = SharingStarted.Eagerly,
288+
started: SharingStarted,
292289
initialValue: T
293290
): StateFlow<T> {
294291
val config = configureSharing(1)

kotlinx-coroutines-core/common/test/flow/sharing/ShareInBufferTest.kt

+8-8
Original file line numberDiff line numberDiff line change
@@ -51,48 +51,48 @@ class ShareInBufferTest : TestBase() {
5151
@Test
5252
fun testReplay0DefaultBuffer() =
5353
checkBuffer(defaultBufferSize) {
54-
shareIn(it, 0)
54+
shareIn(it, 0, SharingStarted.Eagerly)
5555
}
5656

5757
@Test
5858
fun testReplay1DefaultBuffer() =
5959
checkBuffer(defaultBufferSize) {
60-
shareIn(it, 1)
60+
shareIn(it, 1, SharingStarted.Eagerly)
6161
}
6262

6363
@Test // buffer is padded to default size as needed
6464
fun testReplay10DefaultBuffer() =
6565
checkBuffer(maxOf(10, defaultBufferSize)) {
66-
shareIn(it, 10)
66+
shareIn(it, 10, SharingStarted.Eagerly)
6767
}
6868

6969
@Test // buffer is padded to default size as needed
7070
fun testReplay100DefaultBuffer() =
7171
checkBuffer( maxOf(100, defaultBufferSize)) {
72-
shareIn(it, 100)
72+
shareIn(it, 100, SharingStarted.Eagerly)
7373
}
7474

7575
@Test
7676
fun testDefaultBufferKeepsDefault() =
7777
checkBuffer(defaultBufferSize) {
78-
buffer().shareIn(it, 0)
78+
buffer().shareIn(it, 0, SharingStarted.Eagerly)
7979
}
8080

8181
@Test
8282
fun testOverrideDefaultBuffer0() =
8383
checkBuffer(0) {
84-
buffer(0).shareIn(it, 0)
84+
buffer(0).shareIn(it, 0, SharingStarted.Eagerly)
8585
}
8686

8787
@Test
8888
fun testOverrideDefaultBuffer10() =
8989
checkBuffer(10) {
90-
buffer(10).shareIn(it, 0)
90+
buffer(10).shareIn(it, 0, SharingStarted.Eagerly)
9191
}
9292

9393
@Test // buffer and replay sizes add up
9494
fun testBufferReplaySum() =
9595
checkBuffer(41) {
96-
buffer(10).buffer(20).shareIn(it, 11)
96+
buffer(10).buffer(20).shareIn(it, 11, SharingStarted.Eagerly)
9797
}
9898
}

kotlinx-coroutines-core/common/test/flow/sharing/ShareInConflationTest.kt

+19-19
Original file line numberDiff line numberDiff line change
@@ -49,114 +49,114 @@ class ShareInConflationTest : TestBase() {
4949
@Test
5050
fun testConflateReplay1() =
5151
checkConflation(1) {
52-
conflate().shareIn(it, 1)
52+
conflate().shareIn(it, 1, SharingStarted.Eagerly)
5353
}
5454

5555
@Test // still looks like conflating the last value for the first subscriber (will not replay to others though)
5656
fun testConflateReplay0() =
5757
checkConflation(1) {
58-
conflate().shareIn(it, 0)
58+
conflate().shareIn(it, 0, SharingStarted.Eagerly)
5959
}
6060

6161
@Test
6262
fun testConflateReplay5() =
6363
checkConflation(5) {
64-
conflate().shareIn(it, 5)
64+
conflate().shareIn(it, 5, SharingStarted.Eagerly)
6565
}
6666

6767
@Test
6868
fun testBufferDropOldestReplay1() =
6969
checkConflation(1) {
70-
buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST).shareIn(it, 1)
70+
buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST).shareIn(it, 1, SharingStarted.Eagerly)
7171
}
7272

7373
@Test
7474
fun testBufferDropOldestReplay0() =
7575
checkConflation(1) {
76-
buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST).shareIn(it, 0)
76+
buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST).shareIn(it, 0, SharingStarted.Eagerly)
7777
}
7878

7979
@Test
8080
fun testBufferDropOldestReplay10() =
8181
checkConflation(10) {
82-
buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST).shareIn(it, 10)
82+
buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST).shareIn(it, 10, SharingStarted.Eagerly)
8383
}
8484

8585
@Test
8686
fun testBuffer20DropOldestReplay0() =
8787
checkConflation(20) {
88-
buffer(20, onBufferOverflow = BufferOverflow.DROP_OLDEST).shareIn(it, 0)
88+
buffer(20, onBufferOverflow = BufferOverflow.DROP_OLDEST).shareIn(it, 0, SharingStarted.Eagerly)
8989
}
9090

9191
@Test
9292
fun testBuffer7DropOldestReplay11() =
9393
checkConflation(18) {
94-
buffer(7, onBufferOverflow = BufferOverflow.DROP_OLDEST).shareIn(it, 11)
94+
buffer(7, onBufferOverflow = BufferOverflow.DROP_OLDEST).shareIn(it, 11, SharingStarted.Eagerly)
9595
}
9696

9797
@Test // a preceding buffer() gets overridden by conflate()
9898
fun testBufferConflateOverride() =
9999
checkConflation(1) {
100-
buffer(23).conflate().shareIn(it, 1)
100+
buffer(23).conflate().shareIn(it, 1, SharingStarted.Eagerly)
101101
}
102102

103103
@Test // a preceding buffer() gets overridden by buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST)
104104
fun testBufferDropOldestOverride() =
105105
checkConflation(1) {
106-
buffer(23).buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST).shareIn(it, 1)
106+
buffer(23).buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST).shareIn(it, 1, SharingStarted.Eagerly)
107107
}
108108

109109
@Test
110110
fun testBufferDropLatestReplay0() =
111111
checkConflation(1, BufferOverflow.DROP_LATEST) {
112-
buffer(onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, 0)
112+
buffer(onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, 0, SharingStarted.Eagerly)
113113
}
114114

115115
@Test
116116
fun testBufferDropLatestReplay1() =
117117
checkConflation(1, BufferOverflow.DROP_LATEST) {
118-
buffer(onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, 1)
118+
buffer(onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, 1, SharingStarted.Eagerly)
119119
}
120120

121121
@Test
122122
fun testBufferDropLatestReplay10() =
123123
checkConflation(10, BufferOverflow.DROP_LATEST) {
124-
buffer(onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, 10)
124+
buffer(onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, 10, SharingStarted.Eagerly)
125125
}
126126

127127
@Test
128128
fun testBuffer0DropLatestReplay0() =
129129
checkConflation(1, BufferOverflow.DROP_LATEST) {
130-
buffer(0, onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, 0)
130+
buffer(0, onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, 0, SharingStarted.Eagerly)
131131
}
132132

133133
@Test
134134
fun testBuffer0DropLatestReplay1() =
135135
checkConflation(1, BufferOverflow.DROP_LATEST) {
136-
buffer(0, onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, 1)
136+
buffer(0, onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, 1, SharingStarted.Eagerly)
137137
}
138138

139139
@Test
140140
fun testBuffer0DropLatestReplay10() =
141141
checkConflation(10, BufferOverflow.DROP_LATEST) {
142-
buffer(0, onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, 10)
142+
buffer(0, onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, 10, SharingStarted.Eagerly)
143143
}
144144

145145
@Test
146146
fun testBuffer5DropLatestReplay0() =
147147
checkConflation(5, BufferOverflow.DROP_LATEST) {
148-
buffer(5, onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, 0)
148+
buffer(5, onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, 0, SharingStarted.Eagerly)
149149
}
150150

151151
@Test
152152
fun testBuffer5DropLatestReplay10() =
153153
checkConflation(15, BufferOverflow.DROP_LATEST) {
154-
buffer(5, onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, 10)
154+
buffer(5, onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, 10, SharingStarted.Eagerly)
155155
}
156156

157157
@Test // a preceding buffer() gets overridden by buffer(onBufferOverflow = BufferOverflow.DROP_LATEST)
158158
fun testBufferDropLatestOverride() =
159159
checkConflation(1, BufferOverflow.DROP_LATEST) {
160-
buffer(23).buffer(onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, 0)
160+
buffer(23).buffer(onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, 0, SharingStarted.Eagerly)
161161
}
162162
}

kotlinx-coroutines-core/common/test/flow/sharing/ShareInFusionTest.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class ShareInFusionTest : TestBase() {
1414
*/
1515
@Test
1616
fun testOperatorFusion() = runTest {
17-
val sh = emptyFlow<Int>().shareIn(this, 0)
17+
val sh = emptyFlow<Int>().shareIn(this, 0, SharingStarted.Eagerly)
1818
assertTrue(sh !is MutableSharedFlow<*>) // cannot be cast to mutable shared flow!!!
1919
assertSame(sh, (sh as Flow<*>).cancellable())
2020
assertSame(sh, (sh as Flow<*>).flowOn(Dispatchers.Default))
@@ -24,11 +24,11 @@ class ShareInFusionTest : TestBase() {
2424

2525
@Test
2626
fun testFlowOnContextFusion() = runTest {
27-
val flow = flow<String> {
27+
val flow = flow {
2828
assertEquals("FlowCtx", currentCoroutineContext()[CoroutineName]?.name)
2929
emit("OK")
3030
}.flowOn(CoroutineName("FlowCtx"))
31-
assertEquals("OK", flow.shareIn(this, 1).first())
31+
assertEquals("OK", flow.shareIn(this, 1, SharingStarted.Eagerly).first())
3232
coroutineContext.cancelChildren()
3333
}
3434

@@ -39,15 +39,15 @@ class ShareInFusionTest : TestBase() {
3939
@Test
4040
fun testChannelFlowBufferShareIn() = runTest {
4141
expect(1)
42-
val flow = channelFlow<Int> {
42+
val flow = channelFlow {
4343
// send a batch of 10 elements using [offer]
4444
for (i in 1..10) {
4545
assertTrue(offer(i)) // offer must succeed, because buffer
4646
}
4747
send(0) // done
4848
}.buffer(10) // request a buffer of 10
4949
// ^^^^^^^^^ buffer stays here
50-
val shared = flow.shareIn(this, 0)
50+
val shared = flow.shareIn(this, 0, SharingStarted.Eagerly)
5151
shared
5252
.takeWhile { it > 0 }
5353
.collect { i -> expect(i + 1) }

kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt

+3-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class ShareInTest : TestBase() {
1313
fun testReplay0Eager() = runTest {
1414
expect(1)
1515
val flow = flowOf("OK")
16-
val shared = flow.shareIn(this, 0)
16+
val shared = flow.shareIn(this, 0, SharingStarted.Eagerly)
1717
yield() // actually start sharing
1818
// all subscribers miss "OK"
1919
val jobs = List(10) {
@@ -95,7 +95,7 @@ class ShareInTest : TestBase() {
9595
terminate.join()
9696
if (failed) throw TestException()
9797
}
98-
val shared = upstream.shareIn(this + sharingJob, 1)
98+
val shared = upstream.shareIn(this + sharingJob, 1, SharingStarted.Eagerly)
9999
assertEquals(emptyList(), shared.replayCache)
100100
emitted.join() // should start sharing, emit & cache
101101
assertEquals(listOf("OK"), shared.replayCache)
@@ -186,6 +186,7 @@ class ShareInTest : TestBase() {
186186
finish(2)
187187
}
188188

189+
@Suppress("TestFunctionName")
189190
private fun SharingStarted.Companion.WhileSubscribedAtLeast(threshold: Int): SharingStarted =
190191
object : SharingStarted {
191192
override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =

kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -710,7 +710,7 @@ class SharedFlowTest : TestBase() {
710710
}
711711
}
712712
}
713-
repeat(1000) { index ->
713+
repeat(1000) {
714714
val value = if (rnd.nextBoolean()) null else rnd.nextData()
715715
if (rnd.nextInt(20) == 0) {
716716
result.add("resetReplayCache & emit: $value")

kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class StateInTest : TestBase() {
5757
val scope = this + sharingJob
5858
val shared: StateFlow<String?>
5959
if (iv) {
60-
shared = upstream.stateIn(scope, initialValue = null)
60+
shared = upstream.stateIn(scope, SharingStarted.Eagerly, null)
6161
assertEquals(null, shared.value)
6262
} else {
6363
shared = upstream.stateIn(scope)

0 commit comments

Comments
 (0)