@@ -18,22 +18,22 @@ import kotlin.native.concurrent.*
18
18
* presence of collectors. This is opposed to a regular [Flow], such as defined by the [`flow { ... }`][flow] function,
19
19
* which is _cold_ and is started separately for each collector.
20
20
*
21
- * **Shared flow never completes**. A call to [Flow.collect] on a shared flow never completes normally and
22
- * so does a coroutine started by [Flow.launchIn] function. An active collector of a shared flow is called a _subscriber_.
21
+ * **Shared flow never completes**. A call to [Flow.collect] on a shared flow never completes normally, and
22
+ * neither does a coroutine started by the [Flow.launchIn] function. An active collector of a shared flow is called a _subscriber_.
23
23
*
24
- * A subscriber of a shared flow can be cancelled, which usually happens when the scope the coroutine is running
25
- * in is cancelled. A subscriber to a shared flow in always [cancellable][Flow.cancellable] and checks for
26
- * cancellation before each emission. Note that most terminal operators like [Flow.toList] would not complete, too ,
24
+ * A subscriber of a shared flow can be cancelled. This usually happens when the scope in which the coroutine is running
25
+ * is cancelled. A subscriber to a shared flow is always [cancellable][Flow.cancellable], and checks for
26
+ * cancellation before each emission. Note that most terminal operators like [Flow.toList] would also not complete,
27
27
* when applied to a shared flow, but flow-truncating operators like [Flow.take] and [Flow.takeWhile] can be used on a
28
28
* shared flow to turn it into a completing one.
29
29
*
30
- * A [mutable shared flow][MutableSharedFlow] is created using [MutableSharedFlow(...)] constructor function.
30
+ * A [mutable shared flow][MutableSharedFlow] is created using the [MutableSharedFlow(...)] constructor function.
31
31
* Its state can be updated by [emitting][MutableSharedFlow.emit] values to it and performing other operations.
32
- * See [MutableSharedFlow] documentation for details.
32
+ * See the [MutableSharedFlow] documentation for details.
33
33
*
34
- * [SharedFlow] is useful to broadcast events that happens inside application to subscribers that can come and go.
34
+ * [SharedFlow] is useful for broadcasting events that happen inside an application to subscribers that can come and go.
35
35
* For example, the following class encapsulates an event bus that distributes events to all subscribers
36
- * in _rendezvous_ manner, suspending until all subscribers process each event:
36
+ * in a _rendezvous_ manner, suspending until all subscribers process each event:
37
37
*
38
38
* ```
39
39
* class EventBus {
@@ -46,27 +46,27 @@ import kotlin.native.concurrent.*
46
46
* }
47
47
* ```
48
48
*
49
- * As an alternative to the above usage with `MutableSharedFlow(...)` constructor function,
50
- * any _cold_ [Flow] can be converted to a shared flow using [shareIn] operator.
49
+ * As an alternative to the above usage with the `MutableSharedFlow(...)` constructor function,
50
+ * any _cold_ [Flow] can be converted to a shared flow using the [shareIn] operator.
51
51
*
52
- * There is a specialized implementation of shared flow for a case where the most recent state value needs
52
+ * There is a specialized implementation of shared flow for the case where the most recent state value needs
53
53
* to be shared. See [StateFlow] for details.
54
54
*
55
55
* ### Replay cache and buffer
56
56
*
57
- * A shared flow keeps a specific number of the most recent values in its _replay cache_. Every new subscribers first
58
- * gets the values from the replay cache and then gets new emitted values. The maximal size of the replay cache is
57
+ * A shared flow keeps a specific number of the most recent values in its _replay cache_. Every new subscriber first
58
+ * gets the values from the replay cache and then gets new emitted values. The maximum size of the replay cache is
59
59
* specified when the shared flow is created by the `replay` parameter. A snapshot of the current replay cache
60
- * is available via [replayCache] property and it can be reset with [MutableSharedFlow.resetReplayCache] function.
60
+ * is available via the [replayCache] property and it can be reset with the [MutableSharedFlow.resetReplayCache] function.
61
61
*
62
- * A replay cache provides buffer for emissions to the shared flow. Buffer space allows slow subscribers to
62
+ * A replay cache also provides buffer for emissions to the shared flow, allowing slow subscribers to
63
63
* get values from the buffer without suspending emitters. The buffer space determines how much slow subscribers
64
- * can lag from the fast ones. When creating a shared flow additional buffer capacity beyond replay can be reserved
65
- * using `extraBufferCapacity` parameter.
64
+ * can lag from the fast ones. When creating a shared flow, additional buffer capacity beyond replay can be reserved
65
+ * using the `extraBufferCapacity` parameter.
66
66
*
67
67
* A shared flow with a buffer can be configured to avoid suspension of emitters on buffer overflow using
68
- * `onBufferOverflow` parameter, which is equal to one of the entries of [BufferOverflow] enum. When a strategy other
69
- * than [SUSPENDED][BufferOverflow.SUSPEND] is configured emissions to the shared flow never suspend.
68
+ * the `onBufferOverflow` parameter, which is equal to one of the entries of the [BufferOverflow] enum. When a strategy other
69
+ * than [SUSPENDED][BufferOverflow.SUSPEND] is configured, emissions to the shared flow never suspend.
70
70
*
71
71
* ### SharedFlow vs BroadcastChannel
72
72
*
@@ -79,9 +79,9 @@ import kotlin.native.concurrent.*
79
79
* * `SharedFlow` supports configurable replay and buffer overflow strategy.
80
80
* * `SharedFlow` has a clear separation into a read-only `SharedFlow` interface and a [MutableSharedFlow].
81
81
* * `SharedFlow` cannot be closed like `BroadcastChannel` and can never represent a failure.
82
- * All errors and completion signals shall be explicitly _materialized_ if needed.
82
+ * All errors and completion signals should be explicitly _materialized_ if needed.
83
83
*
84
- * To migrate [BroadcastChannel] usage to [SharedFlow] start by replacing `BroadcastChannel(capacity)`
84
+ * To migrate [BroadcastChannel] usage to [SharedFlow], start by replacing usages of the `BroadcastChannel(capacity)`
85
85
* constructor with `MutableSharedFlow(0, extraBufferCapacity=capacity)` (broadcast channel does not replay
86
86
* values to new subscribers). Replace [send][BroadcastChannel.send] and [offer][BroadcastChannel.offer] calls
87
87
* with [emit][MutableStateFlow.emit] and [tryEmit][MutableStateFlow.tryEmit], and convert subscribers' code to flow operators.
@@ -106,7 +106,7 @@ import kotlin.native.concurrent.*
106
106
*
107
107
* **`SharedFlow` interface is not stable for inheritance in 3rd party libraries**, as new methods
108
108
* might be added to this interface in the future, but is stable for use.
109
- * Use `MutableSharedFlow(replay, ...)` constructor function to create an implementation.
109
+ * Use the `MutableSharedFlow(replay, ...)` constructor function to create an implementation.
110
110
*/
111
111
@ExperimentalCoroutinesApi
112
112
public interface SharedFlow <out T > : Flow <T > {
@@ -121,9 +121,9 @@ public interface SharedFlow<out T> : Flow<T> {
121
121
* Its instance with the given configuration parameters can be created using `MutableSharedFlow(...)`
122
122
* constructor function.
123
123
*
124
- * See [SharedFlow] documentation for details on shared flows.
124
+ * See the [SharedFlow] documentation for details on shared flows.
125
125
*
126
- * `MutableSharedFlow` is a [SharedFlow] that also provides abilities to [emit] a value,
126
+ * `MutableSharedFlow` is a [SharedFlow] that also provides the abilities to [emit] a value,
127
127
* to [tryEmit] without suspension if possible, to track the [subscriptionCount],
128
128
* and to [resetReplayCache].
129
129
*
@@ -142,17 +142,17 @@ public interface SharedFlow<out T> : Flow<T> {
142
142
public interface MutableSharedFlow <T > : SharedFlow <T >, FlowCollector <T > {
143
143
/* *
144
144
* Tries to emit a [value] to this shared flow without suspending. It returns `true` if the value was
145
- * emitted successfully. When this function returns `false` it means that the call to a plain [emit]
145
+ * emitted successfully. When this function returns `false`, it means that the call to a plain [emit]
146
146
* function will suspend until there is a buffer space available.
147
147
*
148
- * A shared flow configured with [BufferOverflow] strategy other than [SUSPEND][BufferOverflow.SUSPEND]
148
+ * A shared flow configured with a [BufferOverflow] strategy other than [SUSPEND][BufferOverflow.SUSPEND]
149
149
* (either [DROP_OLDEST][BufferOverflow.DROP_OLDEST] or [DROP_LATEST][BufferOverflow.DROP_LATEST]) never
150
- * suspends on [emit] and thus `tryEmit` to such a shared flow always returns `true`.
150
+ * suspends on [emit], and thus `tryEmit` to such a shared flow always returns `true`.
151
151
*/
152
152
public fun tryEmit (value : T ): Boolean
153
153
154
154
/* *
155
- * A number of subscribers (active collectors) to this shared flow.
155
+ * The number of subscribers (active collectors) to this shared flow.
156
156
*
157
157
* This state can be used to react to changes in the number of subscriptions to this shared flow.
158
158
* For example, if you need to call `onActive` function when the first subscriber appears and `onInactive`
@@ -171,29 +171,28 @@ public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
171
171
public val subscriptionCount: StateFlow <Int >
172
172
173
173
/* *
174
- * Resets [replayCache] of this shared flow to an empty state.
175
- * New subscribers will be receiving only the values emitted after this call,
174
+ * Resets the [replayCache] of this shared flow to an empty state.
175
+ * New subscribers will be receiving only the values that were emitted after this call,
176
176
* while old subscribers will still be receiving previously buffered values.
177
- * To reset a shared flow to an initial value, emit this value after this call.
177
+ * To reset a shared flow to an initial value, emit the value after this call.
178
178
*
179
179
* On a [MutableStateFlow], which always contains a single value, this function is not
180
- * supported and throws [UnsupportedOperationException]. To reset a [MutableStateFlow]
181
- * to an initial value just update its [value][MutableStateFlow.value].
180
+ * supported, and throws an [UnsupportedOperationException]. To reset a [MutableStateFlow]
181
+ * to an initial value, just update its [value][MutableStateFlow.value].
182
182
*/
183
- @Suppress(" UNCHECKED_CAST" )
184
183
public fun resetReplayCache ()
185
184
}
186
185
187
186
/* *
188
187
* Creates a [MutableSharedFlow] with the given configuration parameters.
189
188
*
190
- * This function throws [IllegalArgumentException] on unsupported values of parameters of combinations thereof.
189
+ * This function throws [IllegalArgumentException] on unsupported values of parameters or combinations thereof.
191
190
*
192
191
* @param replay the number of values replayed to new subscribers (cannot be negative).
193
192
* @param extraBufferCapacity the number of values buffered in addition to `replay`.
194
193
* [emit][MutableSharedFlow.emit] does not suspend while there is a buffer space remaining (optional, cannot be negative, defaults to zero).
195
194
* @param onBufferOverflow configures an action on buffer overflow (optional, defaults to
196
- * [suspending][BufferOverflow.SUSPEND] attempt to [emit][MutableSharedFlow.emit] a value,
195
+ * [suspending][BufferOverflow.SUSPEND] attempts to [emit][MutableSharedFlow.emit] a value,
197
196
* supported only when `replay > 0` or `extraBufferCapacity > 0`).
198
197
*/
199
198
@Suppress(" FunctionName" , " UNCHECKED_CAST" )
@@ -341,7 +340,7 @@ private class SharedFlowImpl<T>(
341
340
// Fast path without collectors -> no buffering
342
341
if (nCollectors == 0 ) return tryEmitNoCollectorsLocked(value) // always returns true
343
342
// With collectors we'll have to buffer
344
- // cannot emit now if buffer is full && blocked by slow collectors
343
+ // cannot emit now if buffer is full & blocked by slow collectors
345
344
if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
346
345
when (onBufferOverflow) {
347
346
BufferOverflow .SUSPEND -> return false // will suspend
@@ -448,7 +447,7 @@ private class SharedFlowImpl<T>(
448
447
return index
449
448
}
450
449
451
- // Is called when collector disappears of changes index, returns a list of continuations to resume after lock
450
+ // Is called when a collector disappears or changes index, returns a list of continuations to resume after lock
452
451
internal fun updateCollectorIndexLocked (oldIndex : Long ): List <Continuation <Unit >>? {
453
452
assert { oldIndex >= minCollectorIndex }
454
453
if (oldIndex > minCollectorIndex) return null // nothing changes, it was not min
@@ -520,7 +519,7 @@ private class SharedFlowImpl<T>(
520
519
assert { newHead >= head }
521
520
// cleanup items we don't have to buffer anymore (because head is about to move)
522
521
for (index in head until newHead) buffer!! .setBufferAt(index, null )
523
- // update all state variable to newly computed values
522
+ // update all state variables to newly computed values
524
523
replayIndex = newReplayIndex
525
524
minCollectorIndex = newMinCollectorIndex
526
525
bufferSize = (newBufferEndIndex - newHead).toInt()
@@ -531,7 +530,7 @@ private class SharedFlowImpl<T>(
531
530
assert { replayIndex <= this .head + bufferSize }
532
531
}
533
532
534
- // :todo:
533
+ // Removes all the NO_VALUE items from the end of the queue and reduces its size
535
534
private fun cleanupTailLocked () {
536
535
// If we have synchronous case, then keep one emitter queued
537
536
if (bufferCapacity == 0 && queueSize <= 1 ) return // return, don't clear it
@@ -563,7 +562,7 @@ private class SharedFlowImpl<T>(
563
562
564
563
// returns -1 if cannot peek value without suspension
565
564
private fun tryPeekLocked (slot : SharedFlowSlot ): Long {
566
- // return buffered value is possible
565
+ // return buffered value if possible
567
566
val index = slot.index
568
567
if (index < bufferEndIndex) return index
569
568
if (bufferCapacity > 0 ) return - 1L // if there's a buffer, never try to rendezvous with emitters
0 commit comments