Skip to content

Consume as flow #1343

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,9 @@ public final class kotlinx/coroutines/channels/ChannelsKt {
public static final fun minWith (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/Comparator;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun none (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun none (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun onReceiveOrNull (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/selects/SelectClause1;
public static final fun partition (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun receiveOrNull (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun reduce (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun reduceIndexed (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun requireNoNulls (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/channels/ReceiveChannel;
Expand Down Expand Up @@ -743,12 +745,14 @@ public abstract interface class kotlinx/coroutines/channels/ReceiveChannel {
public abstract synthetic fun cancel (Ljava/lang/Throwable;)Z
public abstract fun cancel (Ljava/util/concurrent/CancellationException;)V
public abstract fun getOnReceive ()Lkotlinx/coroutines/selects/SelectClause1;
public abstract fun getOnReceiveOrClosed ()Lkotlinx/coroutines/selects/SelectClause1;
public abstract fun getOnReceiveOrNull ()Lkotlinx/coroutines/selects/SelectClause1;
public abstract fun isClosedForReceive ()Z
public abstract fun isEmpty ()Z
public abstract fun iterator ()Lkotlinx/coroutines/channels/ChannelIterator;
public abstract fun poll ()Ljava/lang/Object;
public abstract fun receive (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun receiveOrClosed (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun receiveOrNull (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Expand Down Expand Up @@ -784,6 +788,23 @@ public final class kotlinx/coroutines/channels/TickerMode : java/lang/Enum {
public static fun values ()[Lkotlinx/coroutines/channels/TickerMode;
}

public final class kotlinx/coroutines/channels/ValueOrClosed {
public static final field Companion Lkotlinx/coroutines/channels/ValueOrClosed$Companion;
public static final synthetic fun box-impl (Ljava/lang/Object;)Lkotlinx/coroutines/channels/ValueOrClosed;
public fun equals (Ljava/lang/Object;)Z
public static fun equals-impl (Ljava/lang/Object;Ljava/lang/Object;)Z
public static final fun equals-impl0 (Ljava/lang/Object;Ljava/lang/Object;)Z
public static final fun getCloseCause-impl (Ljava/lang/Object;)Ljava/lang/Throwable;
public static final fun getValue-impl (Ljava/lang/Object;)Ljava/lang/Object;
public static final fun getValueOrNull-impl (Ljava/lang/Object;)Ljava/lang/Object;
public fun hashCode ()I
public static fun hashCode-impl (Ljava/lang/Object;)I
public static final fun isClosed-impl (Ljava/lang/Object;)Z
public fun toString ()Ljava/lang/String;
public static fun toString-impl (Ljava/lang/Object;)Ljava/lang/String;
public final synthetic fun unbox-impl ()Ljava/lang/Object;
}

public abstract class kotlinx/coroutines/flow/AbstractFlow : kotlinx/coroutines/flow/Flow {
public fun <init> ()V
public final fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down Expand Up @@ -827,6 +848,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;[Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final synthetic fun combineLatest (Lkotlinx/coroutines/flow/Flow;[Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun conflate (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun consumeAsFlow (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/flow/Flow;
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun debounce (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
Expand All @@ -836,6 +858,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun distinctUntilChangedBy (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static final fun drop (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static final fun dropWhile (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun emitAll (Lkotlinx/coroutines/flow/FlowCollector;Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun emitAll (Lkotlinx/coroutines/flow/FlowCollector;Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun emptyFlow ()Lkotlinx/coroutines/flow/Flow;
public static final fun filter (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
Expand Down Expand Up @@ -923,7 +946,7 @@ public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/cor
public final field context Lkotlin/coroutines/CoroutineContext;
public fun <init> (Lkotlin/coroutines/CoroutineContext;I)V
public fun additionalToStringProps ()Ljava/lang/String;
public final fun broadcastImpl (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
public fun broadcastImpl (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
public fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
protected abstract fun collectTo (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
protected abstract fun create (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow;
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ allprojects {
def platform = platformOf(it)
apply from: rootProject.file("gradle/compile-${platform}.gradle")


dependencies {
// See comment below for rationale, it will be replaced with "project" dependency
compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:$version"
Expand All @@ -135,6 +134,7 @@ allprojects {
tasks.withType(org.jetbrains.kotlin.gradle.tasks.AbstractKotlinCompile).all {
kotlinOptions.freeCompilerArgs += experimentalAnnotations.collect { "-Xuse-experimental=" + it }
kotlinOptions.freeCompilerArgs += "-progressive"
kotlinOptions.freeCompilerArgs += "-XXLanguage:+InlineClasses"
// Binary compatibility support
kotlinOptions.freeCompilerArgs += ["-Xdump-declarations-to=${buildDir}/visibilities.json"]
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.channels

import kotlinx.coroutines.*
import kotlin.test.*

class ChannelReceiveOrClosedTest : TestBase() {
@Test
fun testChannelOfThrowables() = runTest {
val channel = Channel<Throwable>()
launch {
channel.send(TestException1())
channel.close(TestException2())
}

val element = channel.receiveOrClosed()
assertTrue(element.isValue)
assertTrue(element.value is TestException1)
assertTrue(element.valueOrNull is TestException1)

val closed = channel.receiveOrClosed()
assertTrue(closed.isClosed)
assertTrue(closed.closeCause is TestException2)
}

@Test
@Suppress("ReplaceAssertBooleanWithAssertEquality") // inline classes test
fun testNullableIntChanel() = runTest {
val channel = Channel<Int?>()
launch {
expect(2)
channel.send(1)
expect(3)
channel.send(null)

expect(6)
channel.close()
}

expect(1)
val element = channel.receiveOrClosed()
assertTrue(element.isValue)
assertEquals(1, element.value)
assertEquals(1, element.valueOrNull)
assertEquals("Value(1)", element.toString())
assertTrue(ValueOrClosed.value(1) == element) // Don't box

expect(4)
val nullElement = channel.receiveOrClosed()
assertTrue(nullElement.isValue)
assertNull(nullElement.value)
assertNull(nullElement.valueOrNull)
assertEquals("Value(null)", nullElement.toString())
assertTrue(ValueOrClosed.value(null) == nullElement) // Don't box

expect(5)
val closed = channel.receiveOrClosed()
assertTrue(closed.isClosed)

val closed2 = channel.receiveOrClosed()
assertTrue(closed2.isClosed)
assertTrue(closed2.closeCause is ClosedReceiveChannelException)
finish(7)
}

@Test
@ExperimentalUnsignedTypes
fun testUIntChannel() = runTest {
val channel = Channel<UInt>()
launch {
expect(2)
channel.send(1u)
yield()
expect(4)
channel.send((Long.MAX_VALUE - 1).toUInt())
expect(5)
}

expect(1)
val element = channel.receiveOrClosed()
assertEquals(1u, element.value)

expect(3)
val element2 = channel.receiveOrClosed()
assertEquals((Long.MAX_VALUE - 1).toUInt(), element2.value)
finish(6)
}

@Test
fun testCancelChannel() = runTest {
val channel = Channel<Boolean>()
launch {
expect(2)
channel.cancel()
}

expect(1)
val closed = channel.receiveOrClosed()
assertTrue(closed.isClosed)
assertTrue(closed.closeCause is ClosedReceiveChannelException)
finish(3)
}

@Test
@ExperimentalUnsignedTypes
fun testReceiveResultChannel() = runTest {
val channel = Channel<ValueOrClosed<UInt>>()
launch {
channel.send(ValueOrClosed.value(1u))
channel.send(ValueOrClosed.closed(TestException1()))
channel.close(TestException2())
}

val intResult = channel.receiveOrClosed()
assertTrue(intResult.isValue)
assertEquals(1u, intResult.value.value)

val closeCauseResult = channel.receiveOrClosed()
assertTrue(closeCauseResult.isValue)
assertTrue(closeCauseResult.value.closeCause is TestException1)

val closeCause = channel.receiveOrClosed()
assertTrue(closeCause.isClosed)
assertTrue(closeCause.closeCause is TestException2)
assertFailsWith<TestException2> { closeCause.valueOrThrow }
}

@Test
fun testToString() = runTest {
val channel = Channel<String>(1)
channel.send("message")
channel.close(TestException1())
assertEquals("Value(message)", channel.receiveOrClosed().toString())
// toString implementation for exception differs on every platform
val str = channel.receiveOrClosed().toString()
assertTrue(str.matches("Closed\\(.*TestException1\\)".toRegex()))
}
}
12 changes: 8 additions & 4 deletions docs/select-expression.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ buzz -> 'Buzz!'
### Selecting on close

The [onReceive][ReceiveChannel.onReceive] clause in `select` fails when the channel is closed causing the corresponding
`select` to throw an exception. We can use [onReceiveOrNull][ReceiveChannel.onReceiveOrNull] clause to perform a
`select` to throw an exception. We can use [onReceiveOrNull][onReceiveOrNull] clause to perform a
specific action when the channel is closed. The following example also shows that `select` is an expression that returns
the result of its selected clause:

Expand All @@ -189,6 +189,10 @@ suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): St

</div>

Note that [onReceiveOrNull][onReceiveOrNull] is an extension function defined only
for channels with non-nullable elements so that there is no accidental confusion between a closed channel
and a null value.

Let's use it with channel `a` that produces "Hello" string four times and
channel `b` that produces "World" four times:

Expand Down Expand Up @@ -259,7 +263,7 @@ the first one among them gets selected. Here, both channels are constantly produ
being the first clause in select, wins. However, because we are using unbuffered channel, the `a` gets suspended from
time to time on its [send][SendChannel.send] invocation and gives a chance for `b` to send, too.

The second observation, is that [onReceiveOrNull][ReceiveChannel.onReceiveOrNull] gets immediately selected when the
The second observation, is that [onReceiveOrNull][onReceiveOrNull] gets immediately selected when the
channel is already closed.

### Selecting to send
Expand Down Expand Up @@ -433,7 +437,7 @@ Deferred 4 produced answer 'Waited for 128 ms'

Let us write a channel producer function that consumes a channel of deferred string values, waits for each received
deferred value, but only until the next deferred value comes over or the channel is closed. This example puts together
[onReceiveOrNull][ReceiveChannel.onReceiveOrNull] and [onAwait][Deferred.onAwait] clauses in the same `select`:
[onReceiveOrNull][onReceiveOrNull] and [onAwait][Deferred.onAwait] clauses in the same `select`:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

Expand Down Expand Up @@ -556,7 +560,7 @@ Channel was closed
<!--- INDEX kotlinx.coroutines.channels -->
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
[ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive.html
[ReceiveChannel.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive-or-null.html
[onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/on-receive-or-null.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html
[SendChannel.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/on-send.html
<!--- INDEX kotlinx.coroutines.selects -->
Expand Down
6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio
| [Deferred] | [await][Deferred.await] | [onAwait][Deferred.onAwait] | [isCompleted][Job.isCompleted]
| [SendChannel][kotlinx.coroutines.channels.SendChannel] | [send][kotlinx.coroutines.channels.SendChannel.send] | [onSend][kotlinx.coroutines.channels.SendChannel.onSend] | [offer][kotlinx.coroutines.channels.SendChannel.offer]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receive][kotlinx.coroutines.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.channels.ReceiveChannel.onReceive] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.channels.ReceiveChannel.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.channels.ReceiveChannel.onReceiveOrNull] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.channels.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.channels.onReceiveOrNull] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll]
| [Mutex][kotlinx.coroutines.sync.Mutex] | [lock][kotlinx.coroutines.sync.Mutex.lock] | [onLock][kotlinx.coroutines.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.sync.Mutex.tryLock]
| none | [delay] | [onTimeout][kotlinx.coroutines.selects.SelectBuilder.onTimeout] | none

Expand Down Expand Up @@ -131,8 +131,8 @@ Obsolete and deprecated module to test coroutines. Replaced with `kotlinx-corout
[kotlinx.coroutines.channels.SendChannel.offer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/offer.html
[kotlinx.coroutines.channels.ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive.html
[kotlinx.coroutines.channels.ReceiveChannel.poll]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/poll.html
[kotlinx.coroutines.channels.ReceiveChannel.receiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive-or-null.html
[kotlinx.coroutines.channels.ReceiveChannel.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive-or-null.html
[kotlinx.coroutines.channels.receiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/receive-or-null.html
[kotlinx.coroutines.channels.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/on-receive-or-null.html
<!--- INDEX kotlinx.coroutines.selects -->
[kotlinx.coroutines.selects.select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html
[kotlinx.coroutines.selects.SelectBuilder.onTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/-select-builder/on-timeout.html
Expand Down
Loading