Skip to content

Provide CoroutineScope into flowViaChannel block, but make it non-sus… #1134

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions kotlinx-coroutines-core/common/src/flow/Builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -185,18 +185,17 @@ public fun LongRange.asFlow(): Flow<Long> = flow {
/**
* Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
* that is provided to the builder's [block] of code. It allows elements to be
* produced by the code that is running in a different context,
* e.g. from a callback-based API.
* produced by the code that is running in a different context, e.g. from a callback-based API.
*
* The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
* on the resulting flow.
* on the resulting flow. The [block] is not suspending deliberately, if you need suspending scope, [flow] builder
* should be used instead.
*
* To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
* The provided channel can later be used by any external service to communicate with flow and its buffer determines
* backpressure buffer size or its behaviour (e.g. in case when [Channel.CONFLATED] was used).
*
* Example of usage:
*
* ```
* fun flowFrom(api: CallbackBasedApi): Flow<T> = flowViaChannel { channel ->
* val callback = object : Callback { // implementation of some callback interface
Expand All @@ -206,6 +205,7 @@ public fun LongRange.asFlow(): Flow<Long> = flow {
* override fun onApiError(cause: Throwable) {
* channel.cancel("API Error", CancellationException(cause))
* }
* override fun onCompleted() = channel.close()
* }
* api.register(callback)
* channel.invokeOnClose {
Expand All @@ -217,7 +217,7 @@ public fun LongRange.asFlow(): Flow<Long> = flow {
@FlowPreview
public fun <T> flowViaChannel(
bufferSize: Int = 16,
@BuilderInference block: suspend (SendChannel<T>) -> Unit
@BuilderInference block: CoroutineScope.(channel: SendChannel<T>) -> Unit
): Flow<T> {
return flow {
coroutineScope {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,20 @@ class FlowViaChannelTest : TestBase() {
@Test
fun testRegular() = runTest {
val flow = flowViaChannel<Int> {
it.send(1)
it.send(2)
assertTrue(it.offer(1))
assertTrue(it.offer(2))
assertTrue(it.offer(3))
it.close()
}
assertEquals(listOf(1, 2, 3), flow.toList())
}

@Test
fun testBuffer() = runTest {
val flow = flowViaChannel<Int>(bufferSize = 1) {
assertTrue(it.offer(1))
assertTrue(it.offer(2))
assertFalse(it.offer(3))
it.close()
}
assertEquals(listOf(1, 2), flow.toList())
Expand All @@ -22,10 +34,51 @@ class FlowViaChannelTest : TestBase() {
@Test
fun testConflated() = runTest {
val flow = flowViaChannel<Int>(bufferSize = Channel.CONFLATED) {
it.send(1)
it.send(2)
assertTrue(it.offer(1))
assertTrue(it.offer(2))
it.close()
}
assertEquals(listOf(1), flow.toList())
}

@Test
fun testFailureCancelsChannel() = runTest {
val flow = flowViaChannel<Int> {
it.offer(1)
it.invokeOnClose {
expect(2)
}
}.onEach { throw TestException() }

expect(1)
assertFailsWith<TestException>(flow)
finish(3)
}

@Test
fun testFailureInSourceCancelsConsumer() = runTest {
val flow = flowViaChannel<Int> {
expect(2)
throw TestException()
}.onEach { expectUnreached() }

expect(1)
assertFailsWith<TestException>(flow)
finish(3)
}

@Test
fun testScopedCancellation() = runTest {
val flow = flowViaChannel<Int> {
expect(2)
launch(start = CoroutineStart.ATOMIC) {
hang { expect(3) }
}
throw TestException()
}.onEach { expectUnreached() }

expect(1)
assertFailsWith<TestException>(flow)
finish(4)
}
}