Skip to content

Introduce SharedFlow and sharing operators #2069

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 32 commits into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
6e7aa7b
Introduce SharedFlow and sharing operators
elizarov May 8, 2020
67ec501
Merge branch 'develop' into shared-flow
elizarov Oct 9, 2020
3028679
~ Experimental resetReplayCache
elizarov Oct 9, 2020
8c83d34
~ No default for shareIn/stateIn started parameter
elizarov Oct 9, 2020
4196a1a
~ Deprecate Flow.broadcastIn
elizarov Oct 12, 2020
f67bbb7
~ Rephrased SharingStarted docs
elizarov Oct 12, 2020
6567aee
~ Added version to hidden declarations
elizarov Oct 12, 2020
c349fde
~ Private val flow in CancellableFlowImpl
elizarov Oct 12, 2020
1658064
~ Build: testNG task should be triggered by check, not test
elizarov Oct 12, 2020
cdb3162
~ Additional PublisherAsFlow tests for asFlow().buffer(...)
elizarov Oct 12, 2020
f0e7c6a
~ Fixed StartedWhileSubscribed.hashCode
elizarov Oct 12, 2020
3e9b12f
~ Optimized sharedIn(Lazily/Eagerly), removed DistinctFlow abstraction
elizarov Oct 12, 2020
de97928
~ Fix new DistinctUntilChanged tests on K/N
elizarov Oct 12, 2020
06cfa80
~ Bit more details in subscriptionCount docs
elizarov Oct 12, 2020
4735501
~ ArrayChannel params checks with assert
elizarov Oct 12, 2020
e86a111
~ Revert ArrayChannel params checks with assert, added comment
elizarov Oct 12, 2020
9120f28
~ Improve SharedFlow & StateFlow example code
elizarov Oct 12, 2020
cc03f3a
~ More eager cancellation check in StateFlow, better test
elizarov Oct 12, 2020
322f615
~ Updated API dump
elizarov Oct 12, 2020
22e4800
~ Optimized resume list to array
elizarov Oct 12, 2020
faaad4f
~ Optimized delay(Long.MAX_VALUE), SharingStarted docs
elizarov Oct 12, 2020
56ff4fb
~ Moved AbstractSharedFlow to internal package
elizarov Oct 12, 2020
17c8fac
~ More detailed docs on StateFlow.compareAndSet and test
elizarov Oct 12, 2020
25e08d4
~ Fixed Robolectric test for optimized delay impl
elizarov Oct 12, 2020
fe64780
~ Added comment on how freeSlot can resume coroutines
elizarov Oct 12, 2020
a0f7666
~ Code style
elizarov Oct 12, 2020
1900c9c
Merge remote-tracking branch 'origin/develop' into shared-flow
elizarov Oct 12, 2020
0fa4de6
~ One more fix for delay(MAX_VALUE) optimization
elizarov Oct 12, 2020
996e618
~ Fixed distinctUntilChanged on Kotlin/Native
elizarov Oct 12, 2020
bd32090
~ Tweaked defaults and parameter order for shareIn
elizarov Oct 13, 2020
db7e689
~ Code style
elizarov Oct 13, 2020
5db8a5d
~ Text typo
elizarov Oct 13, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ private class ReactiveSubscriber<T : Any>(
private val requestSize: Long
) : Subscriber<T> {
private lateinit var subscription: Subscription
private val channel = Channel<T>(capacity, onBufferOverflow)

// This implementation of ReactiveSubscriber always uses "offer" in its onNext implementation and it cannot
// be reliable with rendezvous channel, so a rendezvous channel is replaced with buffer=1 channel
private val channel = Channel<T>(if (capacity == Channel.RENDEZVOUS) 1 else capacity, onBufferOverflow)

suspend fun takeNextOrNull(): T? = channel.receiveOrNull()

Expand Down
83 changes: 83 additions & 0 deletions reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package kotlinx.coroutines.reactive
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import org.reactivestreams.*
import kotlin.test.*

class PublisherAsFlowTest : TestBase() {
Expand Down Expand Up @@ -181,4 +182,86 @@ class PublisherAsFlowTest : TestBase() {
}
finish(6)
}

@Test
fun testRequestRendezvous() =
testRequestSizeWithBuffer(Channel.RENDEZVOUS, BufferOverflow.SUSPEND, 1)

@Test
fun testRequestBuffer1() =
testRequestSizeWithBuffer(1, BufferOverflow.SUSPEND, 1)

@Test
fun testRequestBuffer10() =
testRequestSizeWithBuffer(10, BufferOverflow.SUSPEND, 10)

@Test
fun testRequestBufferUnlimited() =
testRequestSizeWithBuffer(Channel.UNLIMITED, BufferOverflow.SUSPEND, Long.MAX_VALUE)

@Test
fun testRequestBufferOverflowSuspend() =
testRequestSizeWithBuffer(Channel.BUFFERED, BufferOverflow.SUSPEND, 64)

@Test
fun testRequestBufferOverflowDropOldest() =
testRequestSizeWithBuffer(Channel.BUFFERED, BufferOverflow.DROP_OLDEST, Long.MAX_VALUE)

@Test
fun testRequestBufferOverflowDropLatest() =
testRequestSizeWithBuffer(Channel.BUFFERED, BufferOverflow.DROP_LATEST, Long.MAX_VALUE)

@Test
fun testRequestBuffer10OverflowDropOldest() =
testRequestSizeWithBuffer(10, BufferOverflow.DROP_OLDEST, Long.MAX_VALUE)

@Test
fun testRequestBuffer10OverflowDropLatest() =
testRequestSizeWithBuffer(10, BufferOverflow.DROP_LATEST, Long.MAX_VALUE)

/**
* Tests `publisher.asFlow.buffer(...)` chain, verifying expected requests size and that only expected
* values are delivered.
*/
private fun testRequestSizeWithBuffer(
capacity: Int,
onBufferOverflow: BufferOverflow,
expectedRequestSize: Long
) = runTest {
val m = 50
// publishers numbers from 1 to m
val publisher = Publisher<Int> { s ->
s.onSubscribe(object : Subscription {
var lastSent = 0
var remaining = 0L
override fun request(n: Long) {
assertEquals(expectedRequestSize, n)
remaining += n
check(remaining >= 0)
while (lastSent < m && remaining > 0) {
s.onNext(++lastSent)
remaining--
}
if (lastSent == m) s.onComplete()
}

override fun cancel() {}
})
}
val flow = publisher
.asFlow()
.buffer(capacity, onBufferOverflow)
val list = flow.toList()
val runSize = if (capacity == Channel.BUFFERED) 1 else capacity
val expected = when(onBufferOverflow) {
// Everything is expected to be delivered
BufferOverflow.SUSPEND -> (1..m).toList()
// Only the last one (by default) or the last "capacity" items delivered
BufferOverflow.DROP_OLDEST -> (m - runSize + 1..m).toList()
// Only the first one (by default) or the first "capacity" items delivered
BufferOverflow.DROP_LATEST -> (1..runSize).toList()
}
assertEquals(expected, list)
}

}