Skip to content

Commit c9de9de

Browse files
committed
Simplify JDK9 Flow integration by calling ReactiveStreams integration
1 parent 4a036fe commit c9de9de

File tree

6 files changed

+32
-671
lines changed

6 files changed

+32
-671
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-jdk9.txt

-28
Original file line numberDiff line numberDiff line change
@@ -7,37 +7,9 @@ public final class kotlinx/coroutines/jdk9/AwaitKt {
77
public static final fun awaitSingle (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
88
}
99

10-
public final class kotlinx/coroutines/jdk9/FlowSubscription : kotlinx/coroutines/AbstractCoroutine, java/util/concurrent/Flow$Subscription {
11-
public final field flow Lkotlinx/coroutines/flow/Flow;
12-
public final field subscriber Ljava/util/concurrent/Flow$Subscriber;
13-
public fun <init> (Lkotlinx/coroutines/flow/Flow;Ljava/util/concurrent/Flow$Subscriber;)V
14-
public fun cancel ()V
15-
public fun request (J)V
16-
}
17-
1810
public final class kotlinx/coroutines/jdk9/PublishKt {
1911
public static final fun publish (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Ljava/util/concurrent/Flow$Publisher;
20-
public static final fun publish (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Ljava/util/concurrent/Flow$Publisher;
2112
public static synthetic fun publish$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Ljava/util/concurrent/Flow$Publisher;
22-
public static synthetic fun publish$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Ljava/util/concurrent/Flow$Publisher;
23-
public static final fun publishInternal (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)Ljava/util/concurrent/Flow$Publisher;
24-
}
25-
26-
public final class kotlinx/coroutines/jdk9/PublisherCoroutine : kotlinx/coroutines/AbstractCoroutine, java/util/concurrent/Flow$Subscription, kotlinx/coroutines/channels/ProducerScope, kotlinx/coroutines/selects/SelectClause2 {
27-
public fun <init> (Lkotlin/coroutines/CoroutineContext;Ljava/util/concurrent/Flow$Subscriber;Lkotlin/jvm/functions/Function2;)V
28-
public fun cancel ()V
29-
public fun close (Ljava/lang/Throwable;)Z
30-
public fun getChannel ()Lkotlinx/coroutines/channels/SendChannel;
31-
public fun getOnSend ()Lkotlinx/coroutines/selects/SelectClause2;
32-
public fun invokeOnClose (Lkotlin/jvm/functions/Function1;)Ljava/lang/Void;
33-
public synthetic fun invokeOnClose (Lkotlin/jvm/functions/Function1;)V
34-
public fun isClosedForSend ()Z
35-
public fun isFull ()Z
36-
public fun offer (Ljava/lang/Object;)Z
37-
public synthetic fun onCompleted (Ljava/lang/Object;)V
38-
public fun registerSelectClause2 (Lkotlinx/coroutines/selects/SelectInstance;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V
39-
public fun request (J)V
40-
public fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
4113
}
4214

4315
public final class kotlinx/coroutines/jdk9/ReactiveFlowKt {

reactive/kotlinx-coroutines-jdk9/build.gradle

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ targetCompatibility = 9
55

66

77
dependencies {
8+
compile project(":kotlinx-coroutines-reactive")
9+
compile "org.reactivestreams:reactive-streams-flow-adapters:$reactive_streams_version"
810
testCompile "org.reactivestreams:reactive-streams-tck:$reactive_streams_version"
9-
testCompile "org.reactivestreams:reactive-streams-flow-adapters:$reactive_streams_version"
1011
}
1112

1213
task testNG(type: Test) {

reactive/kotlinx-coroutines-jdk9/src/Await.kt

+13-83
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,9 @@
44

55
package kotlinx.coroutines.jdk9
66

7-
import kotlinx.coroutines.CancellationException
8-
import kotlinx.coroutines.Job
9-
import kotlinx.coroutines.suspendCancellableCoroutine
107
import java.util.concurrent.Flow.Publisher
11-
import java.util.concurrent.Flow.Subscriber
12-
import java.util.concurrent.Flow.Subscription
13-
import java.util.*
14-
import kotlin.coroutines.*
8+
import org.reactivestreams.FlowAdapters
9+
import kotlinx.coroutines.reactive.*
1510

1611
/**
1712
* Awaits for the first value from the given publisher without blocking a thread and
@@ -23,7 +18,7 @@ import kotlin.coroutines.*
2318
*
2419
* @throws NoSuchElementException if publisher does not emit any value
2520
*/
26-
public suspend fun <T> Publisher<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
21+
public suspend fun <T> Publisher<T>.awaitFirst(): T = FlowAdapters.toPublisher(this).awaitFirst()
2722

2823
/**
2924
* Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
@@ -33,7 +28,8 @@ public suspend fun <T> Publisher<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
3328
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
3429
* immediately resumes with [CancellationException].
3530
*/
36-
public suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
31+
public suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T =
32+
FlowAdapters.toPublisher(this).awaitFirstOrDefault(default)
3733

3834
/**
3935
* Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
@@ -43,7 +39,8 @@ public suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T = awaitOn
4339
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
4440
* immediately resumes with [CancellationException].
4541
*/
46-
public suspend fun <T> Publisher<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
42+
public suspend fun <T> Publisher<T>.awaitFirstOrNull(): T? =
43+
FlowAdapters.toPublisher(this).awaitFirstOrNull()
4744

4845
/**
4946
* Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
@@ -53,7 +50,8 @@ public suspend fun <T> Publisher<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST
5350
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
5451
* immediately resumes with [CancellationException].
5552
*/
56-
public suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
53+
public suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T =
54+
FlowAdapters.toPublisher(this).awaitFirstOrElse(defaultValue)
5755

5856
/**
5957
* Awaits for the last value from the given publisher without blocking a thread and
@@ -65,7 +63,8 @@ public suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T =
6563
*
6664
* @throws NoSuchElementException if publisher does not emit any value
6765
*/
68-
public suspend fun <T> Publisher<T>.awaitLast(): T = awaitOne(Mode.LAST)
66+
public suspend fun <T> Publisher<T>.awaitLast(): T =
67+
FlowAdapters.toPublisher(this).awaitLast()
6968

7069
/**
7170
* Awaits for the single value from the given publisher without blocking a thread and
@@ -78,74 +77,5 @@ public suspend fun <T> Publisher<T>.awaitLast(): T = awaitOne(Mode.LAST)
7877
* @throws NoSuchElementException if publisher does not emit any value
7978
* @throws IllegalArgumentException if publisher emits more than one value
8079
*/
81-
public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
82-
83-
// ------------------------ private ------------------------
84-
85-
private enum class Mode(val s: String) {
86-
FIRST("awaitFirst"),
87-
FIRST_OR_DEFAULT("awaitFirstOrDefault"),
88-
LAST("awaitLast"),
89-
SINGLE("awaitSingle");
90-
override fun toString(): String = s
91-
}
92-
93-
private suspend fun <T> Publisher<T>.awaitOne(
94-
mode: Mode,
95-
default: T? = null
96-
): T = suspendCancellableCoroutine { cont ->
97-
subscribe(object : Subscriber<T> {
98-
private lateinit var subscription: Subscription
99-
private var value: T? = null
100-
private var seenValue = false
101-
102-
override fun onSubscribe(sub: Subscription) {
103-
subscription = sub
104-
cont.invokeOnCancellation { sub.cancel() }
105-
sub.request(if (mode == Mode.FIRST) 1 else Long.MAX_VALUE)
106-
}
107-
108-
override fun onNext(t: T) {
109-
when (mode) {
110-
Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
111-
if (!seenValue) {
112-
seenValue = true
113-
subscription.cancel()
114-
cont.resume(t)
115-
}
116-
}
117-
Mode.LAST, Mode.SINGLE -> {
118-
if (mode == Mode.SINGLE && seenValue) {
119-
subscription.cancel()
120-
if (cont.isActive)
121-
cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
122-
} else {
123-
value = t
124-
seenValue = true
125-
}
126-
}
127-
}
128-
}
129-
130-
@Suppress("UNCHECKED_CAST")
131-
override fun onComplete() {
132-
if (seenValue) {
133-
if (cont.isActive) cont.resume(value as T)
134-
return
135-
}
136-
when {
137-
mode == Mode.FIRST_OR_DEFAULT -> {
138-
cont.resume(default as T)
139-
}
140-
cont.isActive -> {
141-
cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
142-
}
143-
}
144-
}
145-
146-
override fun onError(e: Throwable) {
147-
cont.resumeWithException(e)
148-
}
149-
})
150-
}
151-
80+
public suspend fun <T> Publisher<T>.awaitSingle(): T =
81+
FlowAdapters.toPublisher(this).awaitSingle()

reactive/kotlinx-coroutines-jdk9/src/Channel.kt

-106
This file was deleted.

0 commit comments

Comments
 (0)