Skip to content

Commit 996b838

Browse files
committed
Introduce Stream.consumeAsFlow
* Even though java.lang.Stream is collected rather than consumed, collectAsFlow will clash with Flow terminology where collect is a terminal operator * Close the stream in the end of collection despite the fact that regular terminal operations don't do that. We are already in suspending world (empty close() call won't make any difference in a common case) and "consume" implies closing the underlying resource (note that we already do it for channels) * Remove obsolete examples from the module Fixes #1601
1 parent e2a72a0 commit 996b838

File tree

11 files changed

+75
-224
lines changed

11 files changed

+75
-224
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-jdk8.txt

+4
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ public final class kotlinx/coroutines/future/FutureKt {
77
public static synthetic fun future$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Ljava/util/concurrent/CompletableFuture;
88
}
99

10+
public final class kotlinx/coroutines/stream/StreamKt {
11+
public static final fun consumeAsFlow (Ljava/util/stream/Stream;)Lkotlinx/coroutines/flow/Flow;
12+
}
13+
1014
public final class kotlinx/coroutines/time/TimeKt {
1115
public static final fun delay (Ljava/time/Duration;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
1216
public static final fun onTimeout (Lkotlinx/coroutines/selects/SelectBuilder;Ljava/time/Duration;Lkotlin/jvm/functions/Function1;)V
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.stream
6+
7+
import kotlinx.atomicfu.*
8+
import kotlinx.coroutines.*
9+
import kotlinx.coroutines.flow.*
10+
import java.util.stream.*
11+
12+
/**
13+
* Represents the given stream as a flow and [closes][Stream.close] the stream afterwards.
14+
* The resulting flow can be [collected][Flow.collect] only once
15+
* and throws [IllegalStateException] when trying to collect it more than once.
16+
*/
17+
public fun <T> Stream<T>.consumeAsFlow(): Flow<T> = StreamFlow(this)
18+
19+
private class StreamFlow<T>(private val stream: Stream<T>) : Flow<T> {
20+
private val consumed = atomic(false)
21+
22+
@InternalCoroutinesApi
23+
override suspend fun collect(collector: FlowCollector<T>) {
24+
if (!consumed.compareAndSet(false, true)) error("Stream.consumeAsFlow can be collected only once")
25+
try {
26+
for (value in stream.iterator()) {
27+
collector.emit(value)
28+
}
29+
} finally {
30+
stream.close()
31+
}
32+
}
33+
}

integration/kotlinx-coroutines-jdk8/test/examples/CancelFuture-example.kt

-31
This file was deleted.

integration/kotlinx-coroutines-jdk8/test/examples/ExplicitJob-example.kt

-36
This file was deleted.

integration/kotlinx-coroutines-jdk8/test/examples/ToFuture-example.kt

-23
This file was deleted.

integration/kotlinx-coroutines-jdk8/test/examples/Try.kt

-26
This file was deleted.

integration/kotlinx-coroutines-jdk8/test/examples/simple-example-1.kt

-22
This file was deleted.

integration/kotlinx-coroutines-jdk8/test/examples/simple-example-2.kt

-22
This file was deleted.

integration/kotlinx-coroutines-jdk8/test/examples/simple-example-3.kt

-26
This file was deleted.

integration/kotlinx-coroutines-jdk8/test/examples/withTimeout-example.kt

-38
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.stream
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
9+
import org.junit.Test
10+
import java.lang.IllegalStateException
11+
import kotlin.test.*
12+
13+
class ConsumeAsFlowTest : TestBase() {
14+
15+
@Test
16+
fun testCollect() = runTest {
17+
val list = listOf(1, 2, 3)
18+
assertEquals(list, list.stream().consumeAsFlow().toList())
19+
}
20+
21+
@Test
22+
fun testCollectInvokesClose() = runTest {
23+
val list = listOf(3, 4, 5)
24+
expect(1)
25+
assertEquals(list, list.stream().onClose { expect(2) }.consumeAsFlow().toList())
26+
finish(3)
27+
}
28+
29+
@Test
30+
fun testCollectTwice() = runTest {
31+
val list = listOf(2, 3, 9)
32+
val flow = list.stream().onClose { expect(2) } .consumeAsFlow()
33+
expect(1)
34+
assertEquals(list, flow.toList())
35+
assertFailsWith<IllegalStateException> { flow.collect() }
36+
finish(3)
37+
}
38+
}

0 commit comments

Comments
 (0)