Skip to content

Introduce SharedFlow and sharing operators #2069

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 32 commits into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
6e7aa7b
Introduce SharedFlow and sharing operators
elizarov May 8, 2020
67ec501
Merge branch 'develop' into shared-flow
elizarov Oct 9, 2020
3028679
~ Experimental resetReplayCache
elizarov Oct 9, 2020
8c83d34
~ No default for shareIn/stateIn started parameter
elizarov Oct 9, 2020
4196a1a
~ Deprecate Flow.broadcastIn
elizarov Oct 12, 2020
f67bbb7
~ Rephrased SharingStarted docs
elizarov Oct 12, 2020
6567aee
~ Added version to hidden declarations
elizarov Oct 12, 2020
c349fde
~ Private val flow in CancellableFlowImpl
elizarov Oct 12, 2020
1658064
~ Build: testNG task should be triggered by check, not test
elizarov Oct 12, 2020
cdb3162
~ Additional PublisherAsFlow tests for asFlow().buffer(...)
elizarov Oct 12, 2020
f0e7c6a
~ Fixed StartedWhileSubscribed.hashCode
elizarov Oct 12, 2020
3e9b12f
~ Optimized sharedIn(Lazily/Eagerly), removed DistinctFlow abstraction
elizarov Oct 12, 2020
de97928
~ Fix new DistinctUntilChanged tests on K/N
elizarov Oct 12, 2020
06cfa80
~ Bit more details in subscriptionCount docs
elizarov Oct 12, 2020
4735501
~ ArrayChannel params checks with assert
elizarov Oct 12, 2020
e86a111
~ Revert ArrayChannel params checks with assert, added comment
elizarov Oct 12, 2020
9120f28
~ Improve SharedFlow & StateFlow example code
elizarov Oct 12, 2020
cc03f3a
~ More eager cancellation check in StateFlow, better test
elizarov Oct 12, 2020
322f615
~ Updated API dump
elizarov Oct 12, 2020
22e4800
~ Optimized resume list to array
elizarov Oct 12, 2020
faaad4f
~ Optimized delay(Long.MAX_VALUE), SharingStarted docs
elizarov Oct 12, 2020
56ff4fb
~ Moved AbstractSharedFlow to internal package
elizarov Oct 12, 2020
17c8fac
~ More detailed docs on StateFlow.compareAndSet and test
elizarov Oct 12, 2020
25e08d4
~ Fixed Robolectric test for optimized delay impl
elizarov Oct 12, 2020
fe64780
~ Added comment on how freeSlot can resume coroutines
elizarov Oct 12, 2020
a0f7666
~ Code style
elizarov Oct 12, 2020
1900c9c
Merge remote-tracking branch 'origin/develop' into shared-flow
elizarov Oct 12, 2020
0fa4de6
~ One more fix for delay(MAX_VALUE) optimization
elizarov Oct 12, 2020
996e618
~ Fixed distinctUntilChanged on Kotlin/Native
elizarov Oct 12, 2020
bd32090
~ Tweaked defaults and parameter order for shareIn
elizarov Oct 13, 2020
db7e689
~ Code style
elizarov Oct 13, 2020
5db8a5d
~ Text typo
elizarov Oct 13, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ suspend fun main() = coroutineScope {
* [DebugProbes] API to probe, keep track of, print and dump active coroutines;
* [CoroutinesTimeout] test rule to automatically dump coroutines on test timeout.
* [reactive](reactive/README.md) — modules that provide builders and iteration support for various reactive streams libraries:
* Reactive Streams ([Publisher.collect], [Publisher.awaitSingle], [publish], etc),
* Reactive Streams ([Publisher.collect], [Publisher.awaitSingle], [kotlinx.coroutines.reactive.publish], etc),
* Flow (JDK 9) (the same interface as for Reactive Streams),
* RxJava 2.x ([rxFlowable], [rxSingle], etc), and
* RxJava 3.x ([rxFlowable], [rxSingle], etc), and
Expand Down Expand Up @@ -302,7 +302,7 @@ The `develop` branch is pushed to `master` during release.
<!--- INDEX kotlinx.coroutines.reactive -->
[Publisher.collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/collect.html
[Publisher.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/await-single.html
[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/publish.html
[kotlinx.coroutines.reactive.publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/publish.html
<!--- MODULE kotlinx-coroutines-rx2 -->
<!--- INDEX kotlinx.coroutines.rx2 -->
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html
Expand Down
92 changes: 79 additions & 13 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/CompletableDeferred.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import kotlinx.coroutines.selects.*
* All functions on this interface are **thread-safe** and can
* be safely invoked from concurrent coroutines without external synchronization.
*
* **`CompletableDeferred` interface is not stable for inheritance in 3rd party libraries**,
* **The `CompletableDeferred` interface is not stable for inheritance in 3rd party libraries**,
* as new methods might be added to this interface in the future, but is stable for use.
*/
public interface CompletableDeferred<T> : Deferred<T> {
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/CompletableJob.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ package kotlinx.coroutines
* All functions on this interface are **thread-safe** and can
* be safely invoked from concurrent coroutines without external synchronization.
*
* **`CompletableJob` interface is not stable for inheritance in 3rd party libraries**,
* **The `CompletableJob` interface is not stable for inheritance in 3rd party libraries**,
* as new methods might be added to this interface in the future, but is stable for use.
*/
public interface CompletableJob : Job {
Expand Down
5 changes: 4 additions & 1 deletion kotlinx-coroutines-core/common/src/Delay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ public suspend fun awaitCancellation(): Nothing = suspendCancellableCoroutine {}
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
// if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.
if (timeMillis < Long.MAX_VALUE) {
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
}

Expand Down
10 changes: 5 additions & 5 deletions kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -1019,23 +1019,23 @@ internal val EMPTY = Symbol("EMPTY") // marker for Conflated & Buffered channels

@JvmField
@SharedImmutable
internal val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")
internal val OFFER_SUCCESS = Symbol("OFFER_SUCCESS")

@JvmField
@SharedImmutable
internal val OFFER_FAILED: Any = Symbol("OFFER_FAILED")
internal val OFFER_FAILED = Symbol("OFFER_FAILED")

@JvmField
@SharedImmutable
internal val POLL_FAILED: Any = Symbol("POLL_FAILED")
internal val POLL_FAILED = Symbol("POLL_FAILED")

@JvmField
@SharedImmutable
internal val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
internal val ENQUEUE_FAILED = Symbol("ENQUEUE_FAILED")

@JvmField
@SharedImmutable
internal val HANDLER_INVOKED: Any = Symbol("ON_CLOSE_HANDLER_INVOKED")
internal val HANDLER_INVOKED = Symbol("ON_CLOSE_HANDLER_INVOKED")

internal typealias Handler = (Throwable?) -> Unit

Expand Down
133 changes: 78 additions & 55 deletions kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,18 @@ internal open class ArrayChannel<E>(
/**
* Buffer capacity.
*/
val capacity: Int,
private val capacity: Int,
private val onBufferOverflow: BufferOverflow,
onUndeliveredElement: OnUndeliveredElement<E>?
) : AbstractChannel<E>(onUndeliveredElement) {
init {
// This check is actually used by the Channel(...) constructor function which checks only for known
// capacities and calls ArrayChannel constructor for everything else.
require(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" }
}

private val lock = ReentrantLock()

/*
* Guarded by lock.
* Allocate minimum of capacity and 16 to avoid excess memory pressure for large channels when it's not necessary.
Expand All @@ -43,7 +47,7 @@ internal open class ArrayChannel<E>(
protected final override val isBufferAlwaysEmpty: Boolean get() = false
protected final override val isBufferEmpty: Boolean get() = size.value == 0
protected final override val isBufferAlwaysFull: Boolean get() = false
protected final override val isBufferFull: Boolean get() = size.value == capacity
protected final override val isBufferFull: Boolean get() = size.value == capacity && onBufferOverflow == BufferOverflow.SUSPEND

override val isFull: Boolean get() = lock.withLock { isFullImpl }
override val isEmpty: Boolean get() = lock.withLock { isEmptyImpl }
Expand All @@ -55,31 +59,26 @@ internal open class ArrayChannel<E>(
lock.withLock {
val size = this.size.value
closedForSend?.let { return it }
if (size < capacity) {
// tentatively put element to buffer
this.size.value = size + 1 // update size before checking queue (!!!)
// check for receivers that were waiting on empty queue
if (size == 0) {
loop@ while (true) {
receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
if (receive is Closed) {
this.size.value = size // restore size
return receive!!
}
val token = receive!!.tryResumeReceive(element, null)
if (token != null) {
assert { token === RESUME_TOKEN }
this.size.value = size // restore size
return@withLock
}
// update size before checking queue (!!!)
updateBufferSize(size)?.let { return it }
// check for receivers that were waiting on empty queue
if (size == 0) {
loop@ while (true) {
receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
if (receive is Closed) {
this.size.value = size // restore size
return receive!!
}
val token = receive!!.tryResumeReceive(element, null)
if (token != null) {
assert { token === RESUME_TOKEN }
this.size.value = size // restore size
return@withLock
}
}
ensureCapacity(size)
buffer[(head + size) % buffer.size] = element // actually queue element
return OFFER_SUCCESS
}
// size == capacity: full
return OFFER_FAILED
enqueueElement(size, element)
return OFFER_SUCCESS
}
// breaks here if offer meets receiver
receive!!.completeResumeReceive(element)
Expand All @@ -92,41 +91,36 @@ internal open class ArrayChannel<E>(
lock.withLock {
val size = this.size.value
closedForSend?.let { return it }
if (size < capacity) {
// tentatively put element to buffer
this.size.value = size + 1 // update size before checking queue (!!!)
// check for receivers that were waiting on empty queue
if (size == 0) {
loop@ while (true) {
val offerOp = describeTryOffer(element)
val failure = select.performAtomicTrySelect(offerOp)
when {
failure == null -> { // offered successfully
this.size.value = size // restore size
receive = offerOp.result
return@withLock
}
failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
failure === RETRY_ATOMIC -> {} // retry
failure === ALREADY_SELECTED || failure is Closed<*> -> {
this.size.value = size // restore size
return failure
}
else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
// update size before checking queue (!!!)
updateBufferSize(size)?.let { return it }
// check for receivers that were waiting on empty queue
if (size == 0) {
loop@ while (true) {
val offerOp = describeTryOffer(element)
val failure = select.performAtomicTrySelect(offerOp)
when {
failure == null -> { // offered successfully
this.size.value = size // restore size
receive = offerOp.result
return@withLock
}
failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
failure === RETRY_ATOMIC -> {} // retry
failure === ALREADY_SELECTED || failure is Closed<*> -> {
this.size.value = size // restore size
return failure
}
else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
}
}
// let's try to select sending this element to buffer
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
this.size.value = size // restore size
return ALREADY_SELECTED
}
ensureCapacity(size)
buffer[(head + size) % buffer.size] = element // actually queue element
return OFFER_SUCCESS
}
// size == capacity: full
return OFFER_FAILED
// let's try to select sending this element to buffer
if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
this.size.value = size // restore size
return ALREADY_SELECTED
}
enqueueElement(size, element)
return OFFER_SUCCESS
}
// breaks here if offer meets receiver
receive!!.completeResumeReceive(element)
Expand All @@ -137,6 +131,35 @@ internal open class ArrayChannel<E>(
super.enqueueSend(send)
}

// Guarded by lock
// Result is `OFFER_SUCCESS | OFFER_FAILED | null`
private fun updateBufferSize(currentSize: Int): Symbol? {
if (currentSize < capacity) {
size.value = currentSize + 1 // tentatively put it into the buffer
return null // proceed
}
// buffer is full
return when (onBufferOverflow) {
BufferOverflow.SUSPEND -> OFFER_FAILED
BufferOverflow.DROP_LATEST -> OFFER_SUCCESS
BufferOverflow.DROP_OLDEST -> null // proceed, will drop oldest in enqueueElement
}
}

// Guarded by lock
private fun enqueueElement(currentSize: Int, element: E) {
if (currentSize < capacity) {
ensureCapacity(currentSize)
buffer[(head + currentSize) % buffer.size] = element // actually queue element
} else {
// buffer is full
assert { onBufferOverflow == BufferOverflow.DROP_OLDEST } // the only way we can get here
buffer[head % buffer.size] = null // drop oldest element
buffer[(head + currentSize) % buffer.size] = element // actually queue element
head = (head + 1) % buffer.size
}
}

// Guarded by lock
private fun ensureCapacity(currentSize: Int) {
if (currentSize >= buffer.size) {
Expand Down
7 changes: 4 additions & 3 deletions kotlinx-coroutines-core/common/src/channels/Broadcast.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.intrinsics.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.native.concurrent.*

/**
* Broadcasts all elements of the channel.
Expand All @@ -34,8 +33,10 @@ import kotlin.native.concurrent.*
*
* This function has an inappropriate result type of [BroadcastChannel] which provides
* [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
* the broadcasting coroutine in hard-to-specify ways. It will be replaced with
* sharing operators on [Flow][kotlinx.coroutines.flow.Flow] in the future.
* the broadcasting coroutine in hard-to-specify ways.
*
* **Note: This API is obsolete.** It will be deprecated and replaced with the
* [Flow.shareIn][kotlinx.coroutines.flow.shareIn] operator when it becomes stable.
*
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
package kotlinx.coroutines.channels

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED

/**
Expand All @@ -20,9 +20,10 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
* See `BroadcastChannel()` factory function for the description of available
* broadcast channel implementations.
*
* **Note: This is an experimental api.** It may be changed in the future updates.
* **Note: This API is obsolete.** It will be deprecated and replaced by [SharedFlow][kotlinx.coroutines.flow.SharedFlow]
* when it becomes stable.
*/
@ExperimentalCoroutinesApi
@ExperimentalCoroutinesApi // not @ObsoleteCoroutinesApi to reduce burden for people who are still using it
public interface BroadcastChannel<E> : SendChannel<E> {
/**
* Subscribes to this [BroadcastChannel] and returns a channel to receive elements from it.
Expand Down
36 changes: 36 additions & 0 deletions kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.channels

import kotlinx.coroutines.*

/**
* A strategy for buffer overflow handling in [channels][Channel] and [flows][kotlinx.coroutines.flow.Flow] that
* controls what is going to be sacrificed on buffer overflow:
*
* * [SUSPEND] &mdash; the upstream that is [sending][SendChannel.send] or
* is [emitting][kotlinx.coroutines.flow.FlowCollector.emit] a value is **suspended** while the buffer is full.
* * [DROP_OLDEST] &mdash; drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
* * [DROP_LATEST] &mdash; drop **the latest** value that is being added to the buffer right now on buffer overflow
* (so that buffer contents stay the same), do not suspend.
*/
@ExperimentalCoroutinesApi
public enum class BufferOverflow {
/**
* Suspend on buffer overflow.
*/
SUSPEND,

/**
* Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
*/
DROP_OLDEST,

/**
* Drop **the latest** value that is being added to the buffer right now on buffer overflow
* (so that buffer contents stay the same), do not suspend.
*/
DROP_LATEST
}
Loading