Skip to content

Commit 2fb4016

Browse files
committed
Introduce SharedFlow and sharing operators
Summary of changes: * SharedFlow, MutableSharedFlow and its constructor. * StateFlow implements SharedFlow. * SharedFlow.onSubscription operator, clarified docs in other onXxx operators. * BufferOverflow strategy in kotlinx.coroutines.channels package. * shareIn and stateIn operators and SharingStarted strategies for them. * SharedFlow.flowOn error lint (up from StateFlow). * Precise cancellable() operator fusion. * Precise distinctUntilChanged() operator fusion. * StateFlow.compareAndSet function. * asStateFlow and asSharedFlow read-only view functions. * Consistently clarified docs on cold vs hot flows. * Future deprecation notice for BroadcastChannel, ConflatedBroadcastChannel, broadcast, and broadcastIn. * Channel(...) constructor function has onBufferOverflow parameter. * buffer(...) operator has onBufferOverflow parameter. * shareIn/stateIn buffer and overflow strategy are configured via upstream buffer operators. * shareIn/stateIn fuse with upstream flowOn for more efficient execution. * conflate() is implemented as buffer(onBufferOverflow=KEEP_LATEST), non-suspending strategies are reasonably supported with 0 and default capacities. * Added reactive operator migration hints. * WhileSubscribed with kotlin.time.Duration params Fixes #2034 Fixes #2047
1 parent 4b16e1b commit 2fb4016

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+4767
-499
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+85-12
Large diffs are not rendered by default.

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -959,23 +959,23 @@ internal const val RECEIVE_RESULT = 2
959959

960960
@JvmField
961961
@SharedImmutable
962-
internal val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")
962+
internal val OFFER_SUCCESS = Symbol("OFFER_SUCCESS")
963963

964964
@JvmField
965965
@SharedImmutable
966-
internal val OFFER_FAILED: Any = Symbol("OFFER_FAILED")
966+
internal val OFFER_FAILED = Symbol("OFFER_FAILED")
967967

968968
@JvmField
969969
@SharedImmutable
970-
internal val POLL_FAILED: Any = Symbol("POLL_FAILED")
970+
internal val POLL_FAILED = Symbol("POLL_FAILED")
971971

972972
@JvmField
973973
@SharedImmutable
974-
internal val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
974+
internal val ENQUEUE_FAILED = Symbol("ENQUEUE_FAILED")
975975

976976
@JvmField
977977
@SharedImmutable
978-
internal val HANDLER_INVOKED: Any = Symbol("ON_CLOSE_HANDLER_INVOKED")
978+
internal val HANDLER_INVOKED = Symbol("ON_CLOSE_HANDLER_INVOKED")
979979

980980
internal typealias Handler = (Throwable?) -> Unit
981981

kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt

+78-56
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,15 @@ internal open class ArrayChannel<E>(
2323
/**
2424
* Buffer capacity.
2525
*/
26-
val capacity: Int
26+
private val capacity: Int,
27+
private val onBufferOverflow: BufferOverflow
2728
) : AbstractChannel<E>() {
2829
init {
2930
require(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" }
3031
}
3132

3233
private val lock = ReentrantLock()
34+
3335
/*
3436
* Guarded by lock.
3537
* Allocate minimum of capacity and 16 to avoid excess memory pressure for large channels when it's not necessary.
@@ -41,7 +43,7 @@ internal open class ArrayChannel<E>(
4143
protected final override val isBufferAlwaysEmpty: Boolean get() = false
4244
protected final override val isBufferEmpty: Boolean get() = size.value == 0
4345
protected final override val isBufferAlwaysFull: Boolean get() = false
44-
protected final override val isBufferFull: Boolean get() = size.value == capacity
46+
protected final override val isBufferFull: Boolean get() = size.value == capacity && onBufferOverflow == BufferOverflow.SUSPEND
4547

4648
override val isFull: Boolean get() = lock.withLock { isFullImpl }
4749
override val isEmpty: Boolean get() = lock.withLock { isEmptyImpl }
@@ -53,31 +55,26 @@ internal open class ArrayChannel<E>(
5355
lock.withLock {
5456
val size = this.size.value
5557
closedForSend?.let { return it }
56-
if (size < capacity) {
57-
// tentatively put element to buffer
58-
this.size.value = size + 1 // update size before checking queue (!!!)
59-
// check for receivers that were waiting on empty queue
60-
if (size == 0) {
61-
loop@ while (true) {
62-
receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
63-
if (receive is Closed) {
64-
this.size.value = size // restore size
65-
return receive!!
66-
}
67-
val token = receive!!.tryResumeReceive(element, null)
68-
if (token != null) {
69-
assert { token === RESUME_TOKEN }
70-
this.size.value = size // restore size
71-
return@withLock
72-
}
58+
// update size before checking queue (!!!)
59+
updateBufferSize(size)?.let { return it }
60+
// check for receivers that were waiting on empty queue
61+
if (size == 0) {
62+
loop@ while (true) {
63+
receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
64+
if (receive is Closed) {
65+
this.size.value = size // restore size
66+
return receive!!
67+
}
68+
val token = receive!!.tryResumeReceive(element, null)
69+
if (token != null) {
70+
assert { token === RESUME_TOKEN }
71+
this.size.value = size // restore size
72+
return@withLock
7373
}
7474
}
75-
ensureCapacity(size)
76-
buffer[(head + size) % buffer.size] = element // actually queue element
77-
return OFFER_SUCCESS
7875
}
79-
// size == capacity: full
80-
return OFFER_FAILED
76+
enqueueElement(size, element)
77+
return OFFER_SUCCESS
8178
}
8279
// breaks here if offer meets receiver
8380
receive!!.completeResumeReceive(element)
@@ -90,41 +87,36 @@ internal open class ArrayChannel<E>(
9087
lock.withLock {
9188
val size = this.size.value
9289
closedForSend?.let { return it }
93-
if (size < capacity) {
94-
// tentatively put element to buffer
95-
this.size.value = size + 1 // update size before checking queue (!!!)
96-
// check for receivers that were waiting on empty queue
97-
if (size == 0) {
98-
loop@ while (true) {
99-
val offerOp = describeTryOffer(element)
100-
val failure = select.performAtomicTrySelect(offerOp)
101-
when {
102-
failure == null -> { // offered successfully
103-
this.size.value = size // restore size
104-
receive = offerOp.result
105-
return@withLock
106-
}
107-
failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
108-
failure === RETRY_ATOMIC -> {} // retry
109-
failure === ALREADY_SELECTED || failure is Closed<*> -> {
110-
this.size.value = size // restore size
111-
return failure
112-
}
113-
else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
90+
// update size before checking queue (!!!)
91+
updateBufferSize(size)?.let { return it }
92+
// check for receivers that were waiting on empty queue
93+
if (size == 0) {
94+
loop@ while (true) {
95+
val offerOp = describeTryOffer(element)
96+
val failure = select.performAtomicTrySelect(offerOp)
97+
when {
98+
failure == null -> { // offered successfully
99+
this.size.value = size // restore size
100+
receive = offerOp.result
101+
return@withLock
102+
}
103+
failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
104+
failure === RETRY_ATOMIC -> {} // retry
105+
failure === ALREADY_SELECTED || failure is Closed<*> -> {
106+
this.size.value = size // restore size
107+
return failure
114108
}
109+
else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
115110
}
116111
}
117-
// let's try to select sending this element to buffer
118-
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
119-
this.size.value = size // restore size
120-
return ALREADY_SELECTED
121-
}
122-
ensureCapacity(size)
123-
buffer[(head + size) % buffer.size] = element // actually queue element
124-
return OFFER_SUCCESS
125112
}
126-
// size == capacity: full
127-
return OFFER_FAILED
113+
// let's try to select sending this element to buffer
114+
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
115+
this.size.value = size // restore size
116+
return ALREADY_SELECTED
117+
}
118+
enqueueElement(size, element)
119+
return OFFER_SUCCESS
128120
}
129121
// breaks here if offer meets receiver
130122
receive!!.completeResumeReceive(element)
@@ -135,6 +127,35 @@ internal open class ArrayChannel<E>(
135127
super.enqueueSend(send)
136128
}
137129

130+
// Guarded by lock
131+
// Result is `OFFER_SUCCESS | OFFER_FAILED | null`
132+
private fun updateBufferSize(currentSize: Int): Symbol? {
133+
if (currentSize < capacity) {
134+
size.value = currentSize + 1 // tentatively put it into the buffer
135+
return null // proceed
136+
}
137+
// buffer is full
138+
return when (onBufferOverflow) {
139+
BufferOverflow.SUSPEND -> OFFER_FAILED
140+
BufferOverflow.DROP_LATEST -> OFFER_SUCCESS
141+
BufferOverflow.DROP_OLDEST -> null // proceed, will drop oldest in enqueueElement
142+
}
143+
}
144+
145+
// Guarded by lock
146+
private fun enqueueElement(currentSize: Int, element: E) {
147+
if (currentSize < capacity) {
148+
ensureCapacity(currentSize)
149+
buffer[(head + currentSize) % buffer.size] = element // actually queue element
150+
} else {
151+
// buffer is full
152+
assert { onBufferOverflow == BufferOverflow.DROP_OLDEST } // the only way we can get here
153+
buffer[head % buffer.size] = null // drop oldest element
154+
buffer[(head + currentSize) % buffer.size] = element // actually queue element
155+
head = (head + 1) % buffer.size
156+
}
157+
}
158+
138159
// Guarded by lock
139160
private fun ensureCapacity(currentSize: Int) {
140161
if (currentSize >= buffer.size) {
@@ -212,7 +233,8 @@ internal open class ArrayChannel<E>(
212233
break@loop
213234
}
214235
failure === POLL_FAILED -> break@loop // cannot poll -> Ok to take from buffer
215-
failure === RETRY_ATOMIC -> {} // retry
236+
failure === RETRY_ATOMIC -> {
237+
} // retry
216238
failure === ALREADY_SELECTED -> {
217239
this.size.value = size // restore size
218240
buffer[head] = result // restore head

kotlinx-coroutines-core/common/src/channels/Broadcast.kt

+4-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
1010
import kotlinx.coroutines.intrinsics.*
1111
import kotlin.coroutines.*
1212
import kotlin.coroutines.intrinsics.*
13-
import kotlin.native.concurrent.*
1413

1514
/**
1615
* Broadcasts all elements of the channel.
@@ -34,8 +33,10 @@ import kotlin.native.concurrent.*
3433
*
3534
* This function has an inappropriate result type of [BroadcastChannel] which provides
3635
* [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
37-
* the broadcasting coroutine in hard-to-specify ways. It will be replaced with
38-
* sharing operators on [Flow][kotlinx.coroutines.flow.Flow] in the future.
36+
* the broadcasting coroutine in hard-to-specify ways.
37+
*
38+
* **Note: This API is obsolete.** It will be deprecated and replaced with
39+
* [Flow.shareIn][kotlinx.coroutines.flow.shareIn] operator when it becomes stable.
3940
*
4041
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
4142
*/

kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt

+4-3
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
package kotlinx.coroutines.channels
88

99
import kotlinx.coroutines.*
10-
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
1110
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
1211
import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
12+
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
1313
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
1414

1515
/**
@@ -20,9 +20,10 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
2020
* See `BroadcastChannel()` factory function for the description of available
2121
* broadcast channel implementations.
2222
*
23-
* **Note: This is an experimental api.** It may be changed in the future updates.
23+
* **Note: This API is obsolete.** It will be deprecated and replaced by [SharedFlow][kotlinx.coroutines.flow.SharedFlow]
24+
* when it becomes stable.
2425
*/
25-
@ExperimentalCoroutinesApi
26+
@ExperimentalCoroutinesApi // not @ObsoleteCoroutinesApi to reduce burden for people who are still using it
2627
public interface BroadcastChannel<E> : SendChannel<E> {
2728
/**
2829
* Subscribes to this [BroadcastChannel] and returns a channel to receive elements from it.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.channels
6+
7+
import kotlinx.coroutines.*
8+
9+
/**
10+
* A strategy for buffer overflow handling in [channels][Channel] and [flows][kotlinx.coroutines.flow.Flow] that
11+
* controls what is going to be sacrificed on buffer overflow:
12+
*
13+
* * [SUSPEND] &mdash; upstream that is [sending][SendChannel.send] or
14+
* is [emitting][kotlinx.coroutines.flow.FlowCollector.emit] a value is **suspended** while the buffer is full.
15+
* * [DROP_OLDEST] &mdash; drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
16+
* * [DROP_LATEST] &mdash; drop **the latest** value that is being added to the buffer right now on buffer overflow,
17+
* so that buffer contents stay the same, do not suspend.
18+
*/
19+
@ExperimentalCoroutinesApi
20+
public enum class BufferOverflow {
21+
/**
22+
* Suspend on buffer overflow.
23+
*/
24+
SUSPEND,
25+
26+
/**
27+
* Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
28+
*/
29+
DROP_OLDEST,
30+
31+
/**
32+
* Drop **the latest** value that is being added to the buffer right now on buffer overflow,
33+
* so that buffer contents stay the same, do not suspend.
34+
*/
35+
DROP_LATEST
36+
}

0 commit comments

Comments
 (0)