diff --git a/coroutines-guide.md b/coroutines-guide.md index b2be6120b6..e1a1a9eea2 100644 --- a/coroutines-guide.md +++ b/coroutines-guide.md @@ -1184,6 +1184,35 @@ fun main(args: Array) = runBlocking { Done! --> +Instead of a `for` loop you can also use convenient extension functions, such as [collectList], +[collectMap], [collectSequence] or [collectSet], +which create a collection of elements received from `ReceiveChannel`: + +```kotlin +fun main(args: Array) = runBlocking { + val channel = Channel() + launch(CommonPool) { + for (x in 1..5) channel.send(x * x) + channel.close() // we're done sending + } + + val list = channel.collectList() + list.forEach { println(it) } + println("Done!") +} +``` + +> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-10.kt) + + + ### Building channel producers The pattern where a coroutine is producing a sequence of elements is quite common. @@ -2219,6 +2248,10 @@ Channel was closed [SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/close.html [produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html [consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/consume-each.html +[collectList]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/collect-list.html +[collectMap]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/collect-map.html +[collectSequence]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/collect-sequence.html +[collectSet]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/collect-set.html [Channel.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/invoke.html [actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/actor.html diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt index ec9ec7d072..fcc95d9147 100644 --- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt +++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt @@ -35,3 +35,42 @@ public suspend fun BroadcastChannel.consumeEach(action: suspend (E) -> Un for (x in channel) action(x) } } + +/** + * Collects all received elements into a [List]. + */ +public suspend fun ReceiveChannel.collectList(): List { + val list = mutableListOf() + + consumeEach { list += it } + + return list +} + +/** + * Collects all received elements into a [Map] using specified [keyExtractor] to extract key from element. + */ +public suspend fun ReceiveChannel.collectMap(keyExtractor: (E) -> K): Map { + val map = mutableMapOf() + + consumeEach { map += keyExtractor(it) to it } + + return map +} + +/** + * Collects all received elements into a [Sequence]. + */ +public suspend fun ReceiveChannel.collectSequence(): Sequence = collectList().asSequence() + +/** + * Collects all received elements into a [Set]. + */ +public suspend fun ReceiveChannel.collectSet(): Set { + val set = mutableSetOf() + + consumeEach { set += it } + + return set +} + diff --git a/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-10.kt b/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-10.kt new file mode 100644 index 0000000000..599a0a034c --- /dev/null +++ b/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-10.kt @@ -0,0 +1,33 @@ +/* + * Copyright 2016-2017 JetBrains s.r.o. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit. +package guide.channel.example10 + +import kotlinx.coroutines.experimental.* +import kotlinx.coroutines.experimental.channels.* + +fun main(args: Array) = runBlocking { + val channel = Channel() + launch(CommonPool) { + for (x in 1..5) channel.send(x * x) + channel.close() // we're done sending + } + + val list = channel.collectList() + list.forEach { println(it) } + println("Done!") +} diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt new file mode 100644 index 0000000000..a395ff5046 --- /dev/null +++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt @@ -0,0 +1,107 @@ +/* + * Copyright 2016-2017 JetBrains s.r.o. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kotlinx.coroutines.experimental.channels + +import kotlinx.coroutines.experimental.runBlocking +import org.junit.Assert.assertEquals +import org.junit.Assert.assertTrue +import org.junit.Test + +class ChannelsTest { + @Test + fun testEmptyList() = runBlocking { + val channel = Channel() + channel.close() + + assertTrue(channel.collectList().isEmpty()) + } + + @Test + fun testCollectList() = runBlocking { + val values = listOf("A", "B", "F") + val channel = Channel(values.size) + values.forEach { + channel.send(it) + } + channel.close() + + assertEquals(channel.collectList(), values) + } + + @Test + fun testEmptySet() = runBlocking { + val channel = Channel() + channel.close() + + assertTrue(channel.collectSet().isEmpty()) + } + + @Test + fun testCollectSet() = runBlocking { + val values = setOf("A", "B", "F") + val channel = Channel(values.size) + values.forEach { + channel.send(it) + } + channel.close() + + assertEquals(channel.collectSet(), values) + } + + @Test + fun testEmptySequence() = runBlocking { + val channel = Channel() + channel.close() + + assertTrue(channel.collectSequence().count() == 0) + } + + @Test + fun testCollectSequence() = runBlocking { + val values = listOf("A", "B", "F") + val channel = Channel(values.size) + values.forEach { + channel.send(it) + } + channel.close() + + assertEquals(channel.collectSequence().toList(), values.toList()) + } + + @Test + fun testEmptyMap() = runBlocking { + val channel = Channel() + channel.close() + + assertTrue(channel.collectMap { it }.isEmpty()) + } + + @Test + fun testCollectMap() = runBlocking { + val values = mapOf("A" to 1, "B" to 2, "F" to 3) + val channel = Channel>(values.size) + values.entries.forEach { + channel.send(it.key to it.value) + } + channel.close() + + val expected = values.mapValues { (k, v) -> k to v } + + assertEquals(expected, channel.collectMap { it.first }) + } + +} \ No newline at end of file