Skip to content

Commit 99f0b2b

Browse files
committed
Implement Java9 reactive streams integration
This was accomplished by copying `kotlinx-coroutines-reactive`. Judging by the fact that https://github.com/reactive-streams/reactive-streams-jvm/blob/master/api/src/main/java9/org/reactivestreams/FlowAdapters.java defines converters between Java9 Flow and Reactive Streams as thin wrappers that simply redirect methods, this should be enough. Divergences from `kotlinx-coroutines-reactive` are as follows: * The target bytecode is set to JDK9. * JDK11 is required to build the module. * Automated change of all occurrences `org.reactivestreams` to `java.util.concurrent.Flow`. * Removal of `RangePublisherTest`, `RangePublisherBufferdTest`, and `UnboundedIntegerIncrementPublisherTest`. They all are heavily based on the examples provided for the Reactive Streams, and to use them for testing this integration would only be possible with heavy use of `FlowAdapters`, which seems redundant as the integration with the examples themselves is already tested in `kotlinx-coroutines-reactive`, and the correctness of the wrappers is probably a given. * Inclusion of 2020 in copyright notices. * Use of `FlowAdapters` where needed to make everything valid code.
1 parent c37b5f9 commit 99f0b2b

24 files changed

+1962
-2
lines changed

README.md

+3-2
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,9 @@ suspend fun main() = coroutineScope {
4545
* [CoroutinesTimeout] test rule to automatically dump coroutines on test timeout.
4646
* [reactive](reactive/README.md) — modules that provide builders and iteration support for various reactive streams libraries:
4747
* Reactive Streams ([Publisher.collect], [Publisher.awaitSingle], [publish], etc),
48-
RxJava 2.x ([rxFlowable], [rxSingle], etc), and
49-
Project Reactor ([flux], [mono], etc).
48+
* Flow (JDK 9) (the same interface as for Reactive Streams),
49+
* RxJava 2.x ([rxFlowable], [rxSingle], etc), and
50+
* Project Reactor ([flux], [mono], etc).
5051
* [ui](ui/README.md) — modules that provide coroutine dispatchers for various single-threaded UI libraries:
5152
* Android, JavaFX, and Swing.
5253
* [integration](integration/README.md) — modules that provide integration with various asynchronous callback- and future-based libraries:

binary-compatibility-validator/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ dependencies {
1919

2020
testArtifacts project(':kotlinx-coroutines-reactive')
2121
testArtifacts project(':kotlinx-coroutines-reactor')
22+
testArtifacts project(':kotlinx-coroutines-jdk9')
2223
testArtifacts project(':kotlinx-coroutines-rx2')
2324

2425
testArtifacts project(':kotlinx-coroutines-guava')
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
public final class kotlinx/coroutines/jdk9/AwaitKt {
2+
public static final fun awaitFirst (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
3+
public static final fun awaitFirstOrDefault (Ljava/util/concurrent/Flow$Publisher;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
4+
public static final fun awaitFirstOrElse (Ljava/util/concurrent/Flow$Publisher;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
5+
public static final fun awaitFirstOrNull (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
6+
public static final fun awaitLast (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
7+
public static final fun awaitSingle (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
8+
}
9+
10+
public final class kotlinx/coroutines/jdk9/ChannelKt {
11+
public static final fun collect (Ljava/util/concurrent/Flow$Publisher;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
12+
public static final fun consumeEach (Ljava/util/concurrent/Flow$Publisher;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
13+
public static final fun openSubscription (Ljava/util/concurrent/Flow$Publisher;I)Lkotlinx/coroutines/channels/ReceiveChannel;
14+
public static synthetic fun openSubscription$default (Ljava/util/concurrent/Flow$Publisher;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
15+
}
16+
17+
public abstract interface class kotlinx/coroutines/jdk9/ContextInjector {
18+
public abstract fun injectCoroutineContext (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/CoroutineContext;)Ljava/util/concurrent/Flow$Publisher;
19+
}
20+
21+
public final class kotlinx/coroutines/jdk9/ConvertKt {
22+
public static final fun asPublisher (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Ljava/util/concurrent/Flow$Publisher;
23+
public static synthetic fun asPublisher$default (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Ljava/util/concurrent/Flow$Publisher;
24+
}
25+
26+
public final class kotlinx/coroutines/jdk9/FlowKt {
27+
public static final fun asFlow (Ljava/util/concurrent/Flow$Publisher;)Lkotlinx/coroutines/flow/Flow;
28+
public static final fun asFlow (Ljava/util/concurrent/Flow$Publisher;I)Lkotlinx/coroutines/flow/Flow;
29+
public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Ljava/util/concurrent/Flow$Publisher;
30+
}
31+
32+
public final class kotlinx/coroutines/jdk9/FlowSubscription : kotlinx/coroutines/AbstractCoroutine, java/util/concurrent/Flow$Subscription {
33+
public final field flow Lkotlinx/coroutines/flow/Flow;
34+
public final field subscriber Ljava/util/concurrent/Flow$Subscriber;
35+
public fun <init> (Lkotlinx/coroutines/flow/Flow;Ljava/util/concurrent/Flow$Subscriber;)V
36+
public fun cancel ()V
37+
public fun request (J)V
38+
}
39+
40+
public final class kotlinx/coroutines/jdk9/PublishKt {
41+
public static final fun publish (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Ljava/util/concurrent/Flow$Publisher;
42+
public static final fun publish (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Ljava/util/concurrent/Flow$Publisher;
43+
public static synthetic fun publish$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Ljava/util/concurrent/Flow$Publisher;
44+
public static synthetic fun publish$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Ljava/util/concurrent/Flow$Publisher;
45+
public static final fun publishInternal (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)Ljava/util/concurrent/Flow$Publisher;
46+
}
47+
48+
public final class kotlinx/coroutines/jdk9/PublisherCoroutine : kotlinx/coroutines/AbstractCoroutine, java/util/concurrent/Flow$Subscription, kotlinx/coroutines/channels/ProducerScope, kotlinx/coroutines/selects/SelectClause2 {
49+
public fun <init> (Lkotlin/coroutines/CoroutineContext;Ljava/util/concurrent/Flow$Subscriber;Lkotlin/jvm/functions/Function2;)V
50+
public fun cancel ()V
51+
public fun close (Ljava/lang/Throwable;)Z
52+
public fun getChannel ()Lkotlinx/coroutines/channels/SendChannel;
53+
public fun getOnSend ()Lkotlinx/coroutines/selects/SelectClause2;
54+
public fun invokeOnClose (Lkotlin/jvm/functions/Function1;)Ljava/lang/Void;
55+
public synthetic fun invokeOnClose (Lkotlin/jvm/functions/Function1;)V
56+
public fun isClosedForSend ()Z
57+
public fun isFull ()Z
58+
public fun offer (Ljava/lang/Object;)Z
59+
public synthetic fun onCompleted (Ljava/lang/Object;)V
60+
public fun registerSelectClause2 (Lkotlinx/coroutines/selects/SelectInstance;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V
61+
public fun request (J)V
62+
public fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
63+
}
64+
65+
public final class kotlinx/coroutines/jdk9/ReactiveFlowKt {
66+
public static final fun asFlow (Ljava/util/concurrent/Flow$Publisher;)Lkotlinx/coroutines/flow/Flow;
67+
public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Ljava/util/concurrent/Flow$Publisher;
68+
}
69+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# Module kotlinx-coroutines-jdk9
2+
3+
Utilities for [Java Flow](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.html).
4+
5+
Replicates [kotlinx-coroutines-reactive](../kotlinx-coroutines-reactive), an equivalent package for the Reactive Streams.
6+
7+
# Package kotlinx.coroutines.jdk9
8+
9+
Utilities for [Java Flow](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.html).
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
targetCompatibility = 9
5+
6+
7+
dependencies {
8+
testCompile "org.reactivestreams:reactive-streams-tck:$reactive_streams_version"
9+
testCompile "org.reactivestreams:reactive-streams-flow-adapters:$reactive_streams_version"
10+
}
11+
12+
task testNG(type: Test) {
13+
useTestNG()
14+
reports.html.destination = file("$buildDir/reports/testng")
15+
include '**/*ReactiveStreamTckTest.*'
16+
// Skip testNG when tests are filtered with --tests, otherwise it simply fails
17+
onlyIf {
18+
filter.includePatterns.isEmpty()
19+
}
20+
doFirst {
21+
// Classic gradle, nothing works without doFirst
22+
println "TestNG tests: ($includes)"
23+
}
24+
}
25+
26+
task checkJdk11() {
27+
// only fail w/o JDK_11 when actually trying to compile, not during project setup phase
28+
doLast {
29+
if (!System.env.JDK_11) {
30+
throw new GradleException("JDK_11 environment variable is not defined. " +
31+
"Can't build against JDK 11 runtime and run JDK 11 compatibility tests. " +
32+
"Please ensure JDK 11 is installed and that JDK_11 points to it.")
33+
}
34+
}
35+
}
36+
37+
compileTestKotlin {
38+
kotlinOptions.jvmTarget = "9"
39+
}
40+
41+
compileKotlin {
42+
kotlinOptions.jvmTarget = "9"
43+
kotlinOptions.jdkHome = System.env.JDK_11
44+
dependsOn(checkJdk11)
45+
}
46+
47+
test {
48+
dependsOn(testNG)
49+
reports.html.destination = file("$buildDir/reports/junit")
50+
}
51+
52+
tasks.withType(dokka.getClass()) {
53+
externalDocumentationLink {
54+
url = new URL("https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.html")
55+
packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL()
56+
}
57+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
java.util.concurrent.Flow
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.jdk9
6+
7+
import kotlinx.coroutines.CancellationException
8+
import kotlinx.coroutines.Job
9+
import kotlinx.coroutines.suspendCancellableCoroutine
10+
import java.util.concurrent.Flow.Publisher
11+
import java.util.concurrent.Flow.Subscriber
12+
import java.util.concurrent.Flow.Subscription
13+
import java.util.*
14+
import kotlin.coroutines.*
15+
16+
/**
17+
* Awaits for the first value from the given publisher without blocking a thread and
18+
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
19+
*
20+
* This suspending function is cancellable.
21+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
22+
* immediately resumes with [CancellationException].
23+
*
24+
* @throws NoSuchElementException if publisher does not emit any value
25+
*/
26+
public suspend fun <T> Publisher<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
27+
28+
/**
29+
* Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
30+
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
31+
*
32+
* This suspending function is cancellable.
33+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
34+
* immediately resumes with [CancellationException].
35+
*/
36+
public suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
37+
38+
/**
39+
* Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
40+
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
41+
*
42+
* This suspending function is cancellable.
43+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
44+
* immediately resumes with [CancellationException].
45+
*/
46+
public suspend fun <T> Publisher<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
47+
48+
/**
49+
* Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
50+
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
51+
*
52+
* This suspending function is cancellable.
53+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
54+
* immediately resumes with [CancellationException].
55+
*/
56+
public suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
57+
58+
/**
59+
* Awaits for the last value from the given publisher without blocking a thread and
60+
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
61+
*
62+
* This suspending function is cancellable.
63+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
64+
* immediately resumes with [CancellationException].
65+
*
66+
* @throws NoSuchElementException if publisher does not emit any value
67+
*/
68+
public suspend fun <T> Publisher<T>.awaitLast(): T = awaitOne(Mode.LAST)
69+
70+
/**
71+
* Awaits for the single value from the given publisher without blocking a thread and
72+
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
73+
*
74+
* This suspending function is cancellable.
75+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
76+
* immediately resumes with [CancellationException].
77+
*
78+
* @throws NoSuchElementException if publisher does not emit any value
79+
* @throws IllegalArgumentException if publisher emits more than one value
80+
*/
81+
public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
82+
83+
// ------------------------ private ------------------------
84+
85+
// ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only.
86+
// If `kotlinx-coroutines-reactor` module is not included, the list is empty.
87+
private val contextInjectors: Array<ContextInjector> =
88+
ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).iterator().asSequence().toList().toTypedArray() // R8 opto
89+
90+
private fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: CoroutineContext) =
91+
contextInjectors.fold(this) { pub, contextInjector ->
92+
contextInjector.injectCoroutineContext(pub, coroutineContext)
93+
}
94+
95+
private enum class Mode(val s: String) {
96+
FIRST("awaitFirst"),
97+
FIRST_OR_DEFAULT("awaitFirstOrDefault"),
98+
LAST("awaitLast"),
99+
SINGLE("awaitSingle");
100+
override fun toString(): String = s
101+
}
102+
103+
private suspend fun <T> Publisher<T>.awaitOne(
104+
mode: Mode,
105+
default: T? = null
106+
): T = suspendCancellableCoroutine { cont ->
107+
injectCoroutineContext(cont.context).subscribe(object : Subscriber<T> {
108+
private lateinit var subscription: Subscription
109+
private var value: T? = null
110+
private var seenValue = false
111+
112+
override fun onSubscribe(sub: Subscription) {
113+
subscription = sub
114+
cont.invokeOnCancellation { sub.cancel() }
115+
sub.request(if (mode == Mode.FIRST) 1 else Long.MAX_VALUE)
116+
}
117+
118+
override fun onNext(t: T) {
119+
when (mode) {
120+
Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
121+
if (!seenValue) {
122+
seenValue = true
123+
subscription.cancel()
124+
cont.resume(t)
125+
}
126+
}
127+
Mode.LAST, Mode.SINGLE -> {
128+
if (mode == Mode.SINGLE && seenValue) {
129+
subscription.cancel()
130+
if (cont.isActive)
131+
cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
132+
} else {
133+
value = t
134+
seenValue = true
135+
}
136+
}
137+
}
138+
}
139+
140+
@Suppress("UNCHECKED_CAST")
141+
override fun onComplete() {
142+
if (seenValue) {
143+
if (cont.isActive) cont.resume(value as T)
144+
return
145+
}
146+
when {
147+
mode == Mode.FIRST_OR_DEFAULT -> {
148+
cont.resume(default as T)
149+
}
150+
cont.isActive -> {
151+
cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
152+
}
153+
}
154+
}
155+
156+
override fun onError(e: Throwable) {
157+
cont.resumeWithException(e)
158+
}
159+
})
160+
}
161+

0 commit comments

Comments
 (0)