Skip to content

Commit 836bc56

Browse files
elizarov1zamanThomas-Vostwyatt
authored andcommitted
Introduce SharedFlow and sharing operators (Kotlin#2069)
* Introduce SharedFlow and sharing operators Summary of changes: * SharedFlow, MutableSharedFlow and its constructor. * StateFlow implements SharedFlow. * SharedFlow.onSubscription operator, clarified docs in other onXxx operators. * BufferOverflow strategy in kotlinx.coroutines.channels package. * shareIn and stateIn operators and SharingStarted strategies for them. * SharedFlow.flowOn error lint (up from StateFlow). * Precise cancellable() operator fusion. * Precise distinctUntilChanged() operator fusion. * StateFlow.compareAndSet function. * asStateFlow and asSharedFlow read-only view functions. * Consistently clarified docs on cold vs hot flows. * Future deprecation notice for BroadcastChannel, ConflatedBroadcastChannel, broadcast, and broadcastIn. * Channel(...) constructor function has onBufferOverflow parameter. * buffer(...) operator has onBufferOverflow parameter. * shareIn/stateIn buffer and overflow strategy are configured via upstream buffer operators. * shareIn/stateIn fuse with upstream flowOn for more efficient execution. * conflate() is implemented as buffer(onBufferOverflow=KEEP_LATEST), non-suspending strategies are reasonably supported with 0 and default capacities. * Added reactive operator migration hints. * WhileSubscribed with kotlin.time.Duration params Fixes Kotlin#2034 Fixes Kotlin#2047 Co-authored-by: Ibraheem Zaman <[email protected]> Co-authored-by: Thomas Vos <[email protected]> Co-authored-by: Travis Wyatt <[email protected]>
1 parent 24edb02 commit 836bc56

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+4879
-422
lines changed

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ suspend fun main() = coroutineScope {
4646
* [DebugProbes] API to probe, keep track of, print and dump active coroutines;
4747
* [CoroutinesTimeout] test rule to automatically dump coroutines on test timeout.
4848
* [reactive](reactive/README.md) &mdash; modules that provide builders and iteration support for various reactive streams libraries:
49-
* Reactive Streams ([Publisher.collect], [Publisher.awaitSingle], [publish], etc),
49+
* Reactive Streams ([Publisher.collect], [Publisher.awaitSingle], [kotlinx.coroutines.reactive.publish], etc),
5050
* Flow (JDK 9) (the same interface as for Reactive Streams),
5151
* RxJava 2.x ([rxFlowable], [rxSingle], etc), and
5252
* RxJava 3.x ([rxFlowable], [rxSingle], etc), and
@@ -302,7 +302,7 @@ The `develop` branch is pushed to `master` during release.
302302
<!--- INDEX kotlinx.coroutines.reactive -->
303303
[Publisher.collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/collect.html
304304
[Publisher.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/await-single.html
305-
[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/publish.html
305+
[kotlinx.coroutines.reactive.publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/publish.html
306306
<!--- MODULE kotlinx-coroutines-rx2 -->
307307
<!--- INDEX kotlinx.coroutines.rx2 -->
308308
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+79-13
Large diffs are not rendered by default.

kotlinx-coroutines-core/common/src/CompletableDeferred.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import kotlinx.coroutines.selects.*
1919
* All functions on this interface are **thread-safe** and can
2020
* be safely invoked from concurrent coroutines without external synchronization.
2121
*
22-
* **`CompletableDeferred` interface is not stable for inheritance in 3rd party libraries**,
22+
* **The `CompletableDeferred` interface is not stable for inheritance in 3rd party libraries**,
2323
* as new methods might be added to this interface in the future, but is stable for use.
2424
*/
2525
public interface CompletableDeferred<T> : Deferred<T> {

kotlinx-coroutines-core/common/src/CompletableJob.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ package kotlinx.coroutines
1111
* All functions on this interface are **thread-safe** and can
1212
* be safely invoked from concurrent coroutines without external synchronization.
1313
*
14-
* **`CompletableJob` interface is not stable for inheritance in 3rd party libraries**,
14+
* **The `CompletableJob` interface is not stable for inheritance in 3rd party libraries**,
1515
* as new methods might be added to this interface in the future, but is stable for use.
1616
*/
1717
public interface CompletableJob : Job {

kotlinx-coroutines-core/common/src/Delay.kt

+4-1
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,10 @@ public suspend fun awaitCancellation(): Nothing = suspendCancellableCoroutine {}
117117
public suspend fun delay(timeMillis: Long) {
118118
if (timeMillis <= 0) return // don't delay
119119
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
120-
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
120+
// if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.
121+
if (timeMillis < Long.MAX_VALUE) {
122+
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
123+
}
121124
}
122125
}
123126

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -1019,23 +1019,23 @@ internal val EMPTY = Symbol("EMPTY") // marker for Conflated & Buffered channels
10191019

10201020
@JvmField
10211021
@SharedImmutable
1022-
internal val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")
1022+
internal val OFFER_SUCCESS = Symbol("OFFER_SUCCESS")
10231023

10241024
@JvmField
10251025
@SharedImmutable
1026-
internal val OFFER_FAILED: Any = Symbol("OFFER_FAILED")
1026+
internal val OFFER_FAILED = Symbol("OFFER_FAILED")
10271027

10281028
@JvmField
10291029
@SharedImmutable
1030-
internal val POLL_FAILED: Any = Symbol("POLL_FAILED")
1030+
internal val POLL_FAILED = Symbol("POLL_FAILED")
10311031

10321032
@JvmField
10331033
@SharedImmutable
1034-
internal val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
1034+
internal val ENQUEUE_FAILED = Symbol("ENQUEUE_FAILED")
10351035

10361036
@JvmField
10371037
@SharedImmutable
1038-
internal val HANDLER_INVOKED: Any = Symbol("ON_CLOSE_HANDLER_INVOKED")
1038+
internal val HANDLER_INVOKED = Symbol("ON_CLOSE_HANDLER_INVOKED")
10391039

10401040
internal typealias Handler = (Throwable?) -> Unit
10411041

kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt

+78-55
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,18 @@ internal open class ArrayChannel<E>(
2323
/**
2424
* Buffer capacity.
2525
*/
26-
val capacity: Int,
26+
private val capacity: Int,
27+
private val onBufferOverflow: BufferOverflow,
2728
onUndeliveredElement: OnUndeliveredElement<E>?
2829
) : AbstractChannel<E>(onUndeliveredElement) {
2930
init {
31+
// This check is actually used by the Channel(...) constructor function which checks only for known
32+
// capacities and calls ArrayChannel constructor for everything else.
3033
require(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" }
3134
}
3235

3336
private val lock = ReentrantLock()
37+
3438
/*
3539
* Guarded by lock.
3640
* Allocate minimum of capacity and 16 to avoid excess memory pressure for large channels when it's not necessary.
@@ -43,7 +47,7 @@ internal open class ArrayChannel<E>(
4347
protected final override val isBufferAlwaysEmpty: Boolean get() = false
4448
protected final override val isBufferEmpty: Boolean get() = size.value == 0
4549
protected final override val isBufferAlwaysFull: Boolean get() = false
46-
protected final override val isBufferFull: Boolean get() = size.value == capacity
50+
protected final override val isBufferFull: Boolean get() = size.value == capacity && onBufferOverflow == BufferOverflow.SUSPEND
4751

4852
override val isFull: Boolean get() = lock.withLock { isFullImpl }
4953
override val isEmpty: Boolean get() = lock.withLock { isEmptyImpl }
@@ -55,31 +59,26 @@ internal open class ArrayChannel<E>(
5559
lock.withLock {
5660
val size = this.size.value
5761
closedForSend?.let { return it }
58-
if (size < capacity) {
59-
// tentatively put element to buffer
60-
this.size.value = size + 1 // update size before checking queue (!!!)
61-
// check for receivers that were waiting on empty queue
62-
if (size == 0) {
63-
loop@ while (true) {
64-
receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
65-
if (receive is Closed) {
66-
this.size.value = size // restore size
67-
return receive!!
68-
}
69-
val token = receive!!.tryResumeReceive(element, null)
70-
if (token != null) {
71-
assert { token === RESUME_TOKEN }
72-
this.size.value = size // restore size
73-
return@withLock
74-
}
62+
// update size before checking queue (!!!)
63+
updateBufferSize(size)?.let { return it }
64+
// check for receivers that were waiting on empty queue
65+
if (size == 0) {
66+
loop@ while (true) {
67+
receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
68+
if (receive is Closed) {
69+
this.size.value = size // restore size
70+
return receive!!
71+
}
72+
val token = receive!!.tryResumeReceive(element, null)
73+
if (token != null) {
74+
assert { token === RESUME_TOKEN }
75+
this.size.value = size // restore size
76+
return@withLock
7577
}
7678
}
77-
ensureCapacity(size)
78-
buffer[(head + size) % buffer.size] = element // actually queue element
79-
return OFFER_SUCCESS
8079
}
81-
// size == capacity: full
82-
return OFFER_FAILED
80+
enqueueElement(size, element)
81+
return OFFER_SUCCESS
8382
}
8483
// breaks here if offer meets receiver
8584
receive!!.completeResumeReceive(element)
@@ -92,41 +91,36 @@ internal open class ArrayChannel<E>(
9291
lock.withLock {
9392
val size = this.size.value
9493
closedForSend?.let { return it }
95-
if (size < capacity) {
96-
// tentatively put element to buffer
97-
this.size.value = size + 1 // update size before checking queue (!!!)
98-
// check for receivers that were waiting on empty queue
99-
if (size == 0) {
100-
loop@ while (true) {
101-
val offerOp = describeTryOffer(element)
102-
val failure = select.performAtomicTrySelect(offerOp)
103-
when {
104-
failure == null -> { // offered successfully
105-
this.size.value = size // restore size
106-
receive = offerOp.result
107-
return@withLock
108-
}
109-
failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
110-
failure === RETRY_ATOMIC -> {} // retry
111-
failure === ALREADY_SELECTED || failure is Closed<*> -> {
112-
this.size.value = size // restore size
113-
return failure
114-
}
115-
else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
94+
// update size before checking queue (!!!)
95+
updateBufferSize(size)?.let { return it }
96+
// check for receivers that were waiting on empty queue
97+
if (size == 0) {
98+
loop@ while (true) {
99+
val offerOp = describeTryOffer(element)
100+
val failure = select.performAtomicTrySelect(offerOp)
101+
when {
102+
failure == null -> { // offered successfully
103+
this.size.value = size // restore size
104+
receive = offerOp.result
105+
return@withLock
106+
}
107+
failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
108+
failure === RETRY_ATOMIC -> {} // retry
109+
failure === ALREADY_SELECTED || failure is Closed<*> -> {
110+
this.size.value = size // restore size
111+
return failure
116112
}
113+
else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
117114
}
118115
}
119-
// let's try to select sending this element to buffer
120-
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
121-
this.size.value = size // restore size
122-
return ALREADY_SELECTED
123-
}
124-
ensureCapacity(size)
125-
buffer[(head + size) % buffer.size] = element // actually queue element
126-
return OFFER_SUCCESS
127116
}
128-
// size == capacity: full
129-
return OFFER_FAILED
117+
// let's try to select sending this element to buffer
118+
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
119+
this.size.value = size // restore size
120+
return ALREADY_SELECTED
121+
}
122+
enqueueElement(size, element)
123+
return OFFER_SUCCESS
130124
}
131125
// breaks here if offer meets receiver
132126
receive!!.completeResumeReceive(element)
@@ -137,6 +131,35 @@ internal open class ArrayChannel<E>(
137131
super.enqueueSend(send)
138132
}
139133

134+
// Guarded by lock
135+
// Result is `OFFER_SUCCESS | OFFER_FAILED | null`
136+
private fun updateBufferSize(currentSize: Int): Symbol? {
137+
if (currentSize < capacity) {
138+
size.value = currentSize + 1 // tentatively put it into the buffer
139+
return null // proceed
140+
}
141+
// buffer is full
142+
return when (onBufferOverflow) {
143+
BufferOverflow.SUSPEND -> OFFER_FAILED
144+
BufferOverflow.DROP_LATEST -> OFFER_SUCCESS
145+
BufferOverflow.DROP_OLDEST -> null // proceed, will drop oldest in enqueueElement
146+
}
147+
}
148+
149+
// Guarded by lock
150+
private fun enqueueElement(currentSize: Int, element: E) {
151+
if (currentSize < capacity) {
152+
ensureCapacity(currentSize)
153+
buffer[(head + currentSize) % buffer.size] = element // actually queue element
154+
} else {
155+
// buffer is full
156+
assert { onBufferOverflow == BufferOverflow.DROP_OLDEST } // the only way we can get here
157+
buffer[head % buffer.size] = null // drop oldest element
158+
buffer[(head + currentSize) % buffer.size] = element // actually queue element
159+
head = (head + 1) % buffer.size
160+
}
161+
}
162+
140163
// Guarded by lock
141164
private fun ensureCapacity(currentSize: Int) {
142165
if (currentSize >= buffer.size) {

kotlinx-coroutines-core/common/src/channels/Broadcast.kt

+4-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
1010
import kotlinx.coroutines.intrinsics.*
1111
import kotlin.coroutines.*
1212
import kotlin.coroutines.intrinsics.*
13-
import kotlin.native.concurrent.*
1413

1514
/**
1615
* Broadcasts all elements of the channel.
@@ -34,8 +33,10 @@ import kotlin.native.concurrent.*
3433
*
3534
* This function has an inappropriate result type of [BroadcastChannel] which provides
3635
* [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
37-
* the broadcasting coroutine in hard-to-specify ways. It will be replaced with
38-
* sharing operators on [Flow][kotlinx.coroutines.flow.Flow] in the future.
36+
* the broadcasting coroutine in hard-to-specify ways.
37+
*
38+
* **Note: This API is obsolete.** It will be deprecated and replaced with the
39+
* [Flow.shareIn][kotlinx.coroutines.flow.shareIn] operator when it becomes stable.
3940
*
4041
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
4142
*/

kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt

+4-3
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
package kotlinx.coroutines.channels
88

99
import kotlinx.coroutines.*
10-
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
1110
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
1211
import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
12+
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
1313
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
1414

1515
/**
@@ -20,9 +20,10 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
2020
* See `BroadcastChannel()` factory function for the description of available
2121
* broadcast channel implementations.
2222
*
23-
* **Note: This is an experimental api.** It may be changed in the future updates.
23+
* **Note: This API is obsolete.** It will be deprecated and replaced by [SharedFlow][kotlinx.coroutines.flow.SharedFlow]
24+
* when it becomes stable.
2425
*/
25-
@ExperimentalCoroutinesApi
26+
@ExperimentalCoroutinesApi // not @ObsoleteCoroutinesApi to reduce burden for people who are still using it
2627
public interface BroadcastChannel<E> : SendChannel<E> {
2728
/**
2829
* Subscribes to this [BroadcastChannel] and returns a channel to receive elements from it.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.channels
6+
7+
import kotlinx.coroutines.*
8+
9+
/**
10+
* A strategy for buffer overflow handling in [channels][Channel] and [flows][kotlinx.coroutines.flow.Flow] that
11+
* controls what is going to be sacrificed on buffer overflow:
12+
*
13+
* * [SUSPEND] &mdash; the upstream that is [sending][SendChannel.send] or
14+
* is [emitting][kotlinx.coroutines.flow.FlowCollector.emit] a value is **suspended** while the buffer is full.
15+
* * [DROP_OLDEST] &mdash; drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
16+
* * [DROP_LATEST] &mdash; drop **the latest** value that is being added to the buffer right now on buffer overflow
17+
* (so that buffer contents stay the same), do not suspend.
18+
*/
19+
@ExperimentalCoroutinesApi
20+
public enum class BufferOverflow {
21+
/**
22+
* Suspend on buffer overflow.
23+
*/
24+
SUSPEND,
25+
26+
/**
27+
* Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
28+
*/
29+
DROP_OLDEST,
30+
31+
/**
32+
* Drop **the latest** value that is being added to the buffer right now on buffer overflow
33+
* (so that buffer contents stay the same), do not suspend.
34+
*/
35+
DROP_LATEST
36+
}

0 commit comments

Comments
 (0)