Skip to content

Commit a25bf36

Browse files
authored
JDK9 Flow integration (#1783)
JDK9 Flow integration is implemented as thin wrappers around the Reactive Streams integration.
1 parent 8aaf2f7 commit a25bf36

18 files changed

+933
-45
lines changed

README.md

+3-2
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,9 @@ suspend fun main() = coroutineScope {
4545
* [CoroutinesTimeout] test rule to automatically dump coroutines on test timeout.
4646
* [reactive](reactive/README.md) — modules that provide builders and iteration support for various reactive streams libraries:
4747
* Reactive Streams ([Publisher.collect], [Publisher.awaitSingle], [publish], etc),
48-
RxJava 2.x ([rxFlowable], [rxSingle], etc), and
49-
Project Reactor ([flux], [mono], etc).
48+
* Flow (JDK 9) (the same interface as for Reactive Streams),
49+
* RxJava 2.x ([rxFlowable], [rxSingle], etc), and
50+
* Project Reactor ([flux], [mono], etc).
5051
* [ui](ui/README.md) — modules that provide coroutine dispatchers for various single-threaded UI libraries:
5152
* Android, JavaFX, and Swing.
5253
* [integration](integration/README.md) — modules that provide integration with various asynchronous callback- and future-based libraries:
+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Module kotlinx-coroutines-jdk9
2+
3+
Utilities for [Java Flow](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.html).
4+
5+
Implemented as a collection of thin wrappers over [kotlinx-coroutines-reactive](../kotlinx-coroutines-reactive),
6+
an equivalent package for the Reactive Streams.
7+
8+
# Package kotlinx.coroutines.jdk9
9+
10+
Utilities for [Java Flow](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.html).
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
public final class kotlinx/coroutines/jdk9/AwaitKt {
2+
public static final fun awaitFirst (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
3+
public static final fun awaitFirstOrDefault (Ljava/util/concurrent/Flow$Publisher;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
4+
public static final fun awaitFirstOrElse (Ljava/util/concurrent/Flow$Publisher;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
5+
public static final fun awaitFirstOrNull (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
6+
public static final fun awaitLast (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
7+
public static final fun awaitSingle (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
8+
}
9+
10+
public final class kotlinx/coroutines/jdk9/PublishKt {
11+
public static final fun flowPublish (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Ljava/util/concurrent/Flow$Publisher;
12+
public static synthetic fun flowPublish$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Ljava/util/concurrent/Flow$Publisher;
13+
}
14+
15+
public final class kotlinx/coroutines/jdk9/ReactiveFlowKt {
16+
public static final fun asFlow (Ljava/util/concurrent/Flow$Publisher;)Lkotlinx/coroutines/flow/Flow;
17+
public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Ljava/util/concurrent/Flow$Publisher;
18+
public static final fun collect (Ljava/util/concurrent/Flow$Publisher;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
19+
}
20+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
targetCompatibility = 9
5+
6+
dependencies {
7+
compile project(":kotlinx-coroutines-reactive")
8+
compile "org.reactivestreams:reactive-streams-flow-adapters:$reactive_streams_version"
9+
}
10+
11+
compileTestKotlin {
12+
kotlinOptions.jvmTarget = "9"
13+
}
14+
15+
compileKotlin {
16+
kotlinOptions.jvmTarget = "9"
17+
}
18+
19+
tasks.withType(dokka.getClass()) {
20+
externalDocumentationLink {
21+
url = new URL("https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.html")
22+
packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL()
23+
}
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
java.util.concurrent.Flow
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.jdk9
6+
7+
import java.util.concurrent.*
8+
import org.reactivestreams.FlowAdapters
9+
import kotlinx.coroutines.reactive.*
10+
11+
/**
12+
* Awaits for the first value from the given publisher without blocking a thread and
13+
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
14+
*
15+
* This suspending function is cancellable.
16+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
17+
* immediately resumes with [CancellationException].
18+
*
19+
* @throws NoSuchElementException if publisher does not emit any value
20+
*/
21+
public suspend fun <T> Flow.Publisher<T>.awaitFirst(): T = FlowAdapters.toPublisher(this).awaitFirst()
22+
23+
/**
24+
* Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
25+
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
26+
*
27+
* This suspending function is cancellable.
28+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
29+
* immediately resumes with [CancellationException].
30+
*/
31+
public suspend fun <T> Flow.Publisher<T>.awaitFirstOrDefault(default: T): T =
32+
FlowAdapters.toPublisher(this).awaitFirstOrDefault(default)
33+
34+
/**
35+
* Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
36+
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
37+
*
38+
* This suspending function is cancellable.
39+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
40+
* immediately resumes with [CancellationException].
41+
*/
42+
public suspend fun <T> Flow.Publisher<T>.awaitFirstOrNull(): T? =
43+
FlowAdapters.toPublisher(this).awaitFirstOrNull()
44+
45+
/**
46+
* Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
47+
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
48+
*
49+
* This suspending function is cancellable.
50+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
51+
* immediately resumes with [CancellationException].
52+
*/
53+
public suspend fun <T> Flow.Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T =
54+
FlowAdapters.toPublisher(this).awaitFirstOrElse(defaultValue)
55+
56+
/**
57+
* Awaits for the last value from the given publisher without blocking a thread and
58+
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
59+
*
60+
* This suspending function is cancellable.
61+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
62+
* immediately resumes with [CancellationException].
63+
*
64+
* @throws NoSuchElementException if publisher does not emit any value
65+
*/
66+
public suspend fun <T> Flow.Publisher<T>.awaitLast(): T =
67+
FlowAdapters.toPublisher(this).awaitLast()
68+
69+
/**
70+
* Awaits for the single value from the given publisher without blocking a thread and
71+
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
72+
*
73+
* This suspending function is cancellable.
74+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
75+
* immediately resumes with [CancellationException].
76+
*
77+
* @throws NoSuchElementException if publisher does not emit any value
78+
* @throws IllegalArgumentException if publisher emits more than one value
79+
*/
80+
public suspend fun <T> Flow.Publisher<T>.awaitSingle(): T =
81+
FlowAdapters.toPublisher(this).awaitSingle()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.jdk9
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.channels.*
9+
import java.util.concurrent.*
10+
import kotlin.coroutines.*
11+
import org.reactivestreams.FlowAdapters
12+
13+
/**
14+
* Creates cold reactive [Flow.Publisher] that runs a given [block] in a coroutine.
15+
* Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
16+
* Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete])
17+
* when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError])
18+
* if coroutine throws an exception or closes channel with a cause.
19+
* Unsubscribing cancels running coroutine.
20+
*
21+
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
22+
* `onNext` is not invoked concurrently.
23+
*
24+
* Coroutine context can be specified with [context] argument.
25+
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
26+
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
27+
*
28+
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
29+
* to cancellation and error handling may change in the future.
30+
*/
31+
@ExperimentalCoroutinesApi
32+
public fun <T> flowPublish(
33+
context: CoroutineContext = EmptyCoroutineContext,
34+
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
35+
): Flow.Publisher<T> {
36+
val reactivePublisher : org.reactivestreams.Publisher<T> = kotlinx.coroutines.reactive.publish<T>(context, block)
37+
return FlowAdapters.toFlowPublisher(reactivePublisher)
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.jdk9
6+
7+
import kotlinx.coroutines.flow.*
8+
import kotlinx.coroutines.reactive.asFlow
9+
import kotlinx.coroutines.reactive.asPublisher
10+
import kotlinx.coroutines.reactive.collect
11+
import java.util.concurrent.Flow as JFlow
12+
import org.reactivestreams.FlowAdapters
13+
14+
/**
15+
* Transforms the given reactive [Publisher] into [Flow].
16+
* Use [buffer] operator on the resulting flow to specify the size of the backpressure.
17+
* More precisely, it specifies the value of the subscription's [request][Subscription.request].
18+
* [buffer] default capacity is used by default.
19+
*
20+
* If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flight elements
21+
* are discarded.
22+
*/
23+
public fun <T : Any> JFlow.Publisher<T>.asFlow(): Flow<T> =
24+
FlowAdapters.toPublisher(this).asFlow()
25+
26+
/**
27+
* Transforms the given flow to a reactive specification compliant [Publisher].
28+
*/
29+
public fun <T : Any> Flow<T>.asPublisher(): JFlow.Publisher<T> {
30+
val reactivePublisher : org.reactivestreams.Publisher<T> = this.asPublisher<T>()
31+
return FlowAdapters.toFlowPublisher(reactivePublisher)
32+
}
33+
34+
/**
35+
* Subscribes to this [Publisher] and performs the specified action for each received element.
36+
* Cancels subscription if any exception happens during collect.
37+
*/
38+
public suspend inline fun <T> JFlow.Publisher<T>.collect(action: (T) -> Unit) =
39+
FlowAdapters.toPublisher(this).collect(action)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.jdk9
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
9+
import org.junit.Test
10+
import java.util.concurrent.Flow as JFlow
11+
import kotlin.test.*
12+
13+
class FlowAsPublisherTest : TestBase() {
14+
15+
@Test
16+
fun testErrorOnCancellationIsReported() {
17+
expect(1)
18+
flow<Int> {
19+
emit(2)
20+
try {
21+
hang { expect(3) }
22+
} finally {
23+
throw TestException()
24+
}
25+
}.asPublisher().subscribe(object : JFlow.Subscriber<Int> {
26+
private lateinit var subscription: JFlow.Subscription
27+
28+
override fun onComplete() {
29+
expectUnreached()
30+
}
31+
32+
override fun onSubscribe(s: JFlow.Subscription?) {
33+
subscription = s!!
34+
subscription.request(2)
35+
}
36+
37+
override fun onNext(t: Int) {
38+
expect(t)
39+
subscription.cancel()
40+
}
41+
42+
override fun onError(t: Throwable?) {
43+
assertTrue(t is TestException)
44+
expect(4)
45+
}
46+
})
47+
finish(5)
48+
}
49+
50+
@Test
51+
fun testCancellationIsNotReported() {
52+
expect(1)
53+
flow<Int> {
54+
emit(2)
55+
hang { expect(3) }
56+
}.asPublisher().subscribe(object : JFlow.Subscriber<Int> {
57+
private lateinit var subscription: JFlow.Subscription
58+
59+
override fun onComplete() {
60+
expect(4)
61+
}
62+
63+
override fun onSubscribe(s: JFlow.Subscription?) {
64+
subscription = s!!
65+
subscription.request(2)
66+
}
67+
68+
override fun onNext(t: Int) {
69+
expect(t)
70+
subscription.cancel()
71+
}
72+
73+
override fun onError(t: Throwable?) {
74+
expectUnreached()
75+
}
76+
})
77+
finish(5)
78+
}
79+
}

0 commit comments

Comments
 (0)