Skip to content

Commit 7045308

Browse files
fvascoelizarov
authored andcommitted
Channel operators (#88)
1 parent 0163bf1 commit 7045308

File tree

4 files changed

+1536
-0
lines changed
  • integration/kotlinx-coroutines-jdk8/src
    • main/kotlin/kotlinx/coroutines/experimental/channels8
    • test/kotlin/kotlinx/coroutines/experimental/channels8
  • kotlinx-coroutines-core/src
    • main/kotlin/kotlinx/coroutines/experimental/channels
    • test/kotlin/kotlinx/coroutines/experimental/channels

4 files changed

+1536
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.channels8
18+
19+
import kotlinx.coroutines.experimental.CommonPool
20+
import kotlinx.coroutines.experimental.channels.ProducerJob
21+
import kotlinx.coroutines.experimental.channels.ReceiveChannel
22+
import kotlinx.coroutines.experimental.channels.consumeEach
23+
import kotlinx.coroutines.experimental.channels.produce
24+
import kotlinx.coroutines.experimental.runBlocking
25+
import java.util.*
26+
import java.util.function.BiConsumer
27+
import java.util.function.Consumer
28+
import java.util.stream.Collector
29+
import java.util.stream.Stream
30+
import java.util.stream.StreamSupport
31+
import kotlin.coroutines.experimental.CoroutineContext
32+
33+
/**
34+
* Creates a [ProducerJob] to read all element of the [Stream].
35+
*/
36+
public fun <E> Stream<E>.asReceiveChannel(context: CoroutineContext = CommonPool): ProducerJob<E> = produce(context) {
37+
for (element in this@asReceiveChannel)
38+
send(element)
39+
}
40+
41+
/**
42+
* Creates a [Stream] of elements in this [ReceiveChannel].
43+
*/
44+
public fun <E : Any> ReceiveChannel<E>.asStream(): Stream<E> = StreamSupport.stream<E>(SpliteratorAdapter(this), false)
45+
46+
/**
47+
* Applies the [collector] to the [ReceiveChannel]
48+
*/
49+
public suspend fun <T, A : Any, R> ReceiveChannel<T>.collect(collector: Collector<T, A, R>): R {
50+
val container: A = collector.supplier().get()
51+
val accumulator: BiConsumer<A, T> = collector.accumulator()
52+
consumeEach { accumulator.accept(container, it) }
53+
return collector.finisher().apply(container)
54+
}
55+
56+
private class SpliteratorAdapter<E : Any>(val channel: ReceiveChannel<E>) : Spliterator<E> {
57+
override fun estimateSize(): Long = Long.MAX_VALUE
58+
59+
override fun forEachRemaining(action: Consumer<in E>) {
60+
runBlocking {
61+
for (element in channel)
62+
action.accept(element)
63+
}
64+
}
65+
66+
override fun tryAdvance(action: Consumer<in E>): Boolean = runBlocking {
67+
val element = channel.receiveOrNull()
68+
if (element != null) {
69+
action.accept(element)
70+
true
71+
} else false
72+
}
73+
74+
override fun characteristics(): Int = characteristics
75+
76+
override fun trySplit(): Spliterator<E>? = null
77+
78+
private companion object {
79+
@JvmStatic
80+
private val characteristics = Spliterator.ORDERED or Spliterator.NONNULL
81+
}
82+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package kotlinx.coroutines.experimental.channels8
2+
3+
import kotlinx.coroutines.experimental.TestBase
4+
import kotlinx.coroutines.experimental.channels.asReceiveChannel
5+
import kotlinx.coroutines.experimental.channels.toList
6+
import kotlinx.coroutines.experimental.runBlocking
7+
import org.junit.Assert.assertEquals
8+
import org.junit.Test
9+
import java.util.stream.Collectors
10+
11+
class ChannelsTest : TestBase() {
12+
private val testList = listOf(1, 2, 3)
13+
14+
@Test
15+
fun testCollect() = runBlocking {
16+
assertEquals(testList, testList.asReceiveChannel().collect(Collectors.toList()))
17+
}
18+
19+
@Test
20+
fun testStreamAsReceiveChannel() = runBlocking {
21+
assertEquals(testList, testList.stream().asReceiveChannel().toList())
22+
}
23+
24+
@Test
25+
fun testReceiveChannelAsStream() {
26+
assertEquals(testList, testList.asReceiveChannel().asStream().collect(Collectors.toList()))
27+
}
28+
}

0 commit comments

Comments
 (0)