Skip to content

JDK9 Flow integration #1783

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Mar 13, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ suspend fun main() = coroutineScope {
* [CoroutinesTimeout] test rule to automatically dump coroutines on test timeout.
* [reactive](reactive/README.md) — modules that provide builders and iteration support for various reactive streams libraries:
* Reactive Streams ([Publisher.collect], [Publisher.awaitSingle], [publish], etc),
RxJava 2.x ([rxFlowable], [rxSingle], etc), and
Project Reactor ([flux], [mono], etc).
* Flow (JDK 9) (the same interface as for Reactive Streams),
* RxJava 2.x ([rxFlowable], [rxSingle], etc), and
* Project Reactor ([flux], [mono], etc).
* [ui](ui/README.md) — modules that provide coroutine dispatchers for various single-threaded UI libraries:
* Android, JavaFX, and Swing.
* [integration](integration/README.md) — modules that provide integration with various asynchronous callback- and future-based libraries:
Expand Down
10 changes: 10 additions & 0 deletions reactive/kotlinx-coroutines-jdk9/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Module kotlinx-coroutines-jdk9

Utilities for [Java Flow](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.html).

Implemented as a collection of thin wrappers over [kotlinx-coroutines-reactive](../kotlinx-coroutines-reactive),
an equivalent package for the Reactive Streams.

# Package kotlinx.coroutines.jdk9

Utilities for [Java Flow](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.html).
20 changes: 20 additions & 0 deletions reactive/kotlinx-coroutines-jdk9/api/kotlinx-coroutines-jdk9.api
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
public final class kotlinx/coroutines/jdk9/AwaitKt {
public static final fun awaitFirst (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitFirstOrDefault (Ljava/util/concurrent/Flow$Publisher;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitFirstOrElse (Ljava/util/concurrent/Flow$Publisher;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitFirstOrNull (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitLast (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitSingle (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/jdk9/PublishKt {
public static final fun publish (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Ljava/util/concurrent/Flow$Publisher;
public static synthetic fun publish$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Ljava/util/concurrent/Flow$Publisher;
}

public final class kotlinx/coroutines/jdk9/ReactiveFlowKt {
public static final fun asFlow (Ljava/util/concurrent/Flow$Publisher;)Lkotlinx/coroutines/flow/Flow;
public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Ljava/util/concurrent/Flow$Publisher;
public static final fun collect (Ljava/util/concurrent/Flow$Publisher;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

24 changes: 24 additions & 0 deletions reactive/kotlinx-coroutines-jdk9/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
targetCompatibility = 9

dependencies {
compile project(":kotlinx-coroutines-reactive")
compile "org.reactivestreams:reactive-streams-flow-adapters:$reactive_streams_version"
}

compileTestKotlin {
kotlinOptions.jvmTarget = "9"
}

compileKotlin {
kotlinOptions.jvmTarget = "9"
}

tasks.withType(dokka.getClass()) {
externalDocumentationLink {
url = new URL("https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.html")
packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL()
}
}
1 change: 1 addition & 0 deletions reactive/kotlinx-coroutines-jdk9/package.list
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
java.util.concurrent.Flow
81 changes: 81 additions & 0 deletions reactive/kotlinx-coroutines-jdk9/src/Await.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.jdk9

import java.util.concurrent.Flow.Publisher
import org.reactivestreams.FlowAdapters
import kotlinx.coroutines.reactive.*

/**
* Awaits for the first value from the given publisher without blocking a thread and
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*
* @throws NoSuchElementException if publisher does not emit any value
*/
public suspend fun <T> Publisher<T>.awaitFirst(): T = FlowAdapters.toPublisher(this).awaitFirst()

/**
* Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*/
public suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T =
FlowAdapters.toPublisher(this).awaitFirstOrDefault(default)

/**
* Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*/
public suspend fun <T> Publisher<T>.awaitFirstOrNull(): T? =
FlowAdapters.toPublisher(this).awaitFirstOrNull()

/**
* Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*/
public suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T =
FlowAdapters.toPublisher(this).awaitFirstOrElse(defaultValue)

/**
* Awaits for the last value from the given publisher without blocking a thread and
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*
* @throws NoSuchElementException if publisher does not emit any value
*/
public suspend fun <T> Publisher<T>.awaitLast(): T =
FlowAdapters.toPublisher(this).awaitLast()

/**
* Awaits for the single value from the given publisher without blocking a thread and
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*
* @throws NoSuchElementException if publisher does not emit any value
* @throws IllegalArgumentException if publisher emits more than one value
*/
public suspend fun <T> Publisher<T>.awaitSingle(): T =
FlowAdapters.toPublisher(this).awaitSingle()
38 changes: 38 additions & 0 deletions reactive/kotlinx-coroutines-jdk9/src/Publish.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.jdk9

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import java.util.concurrent.Flow.*
import kotlin.coroutines.*
import org.reactivestreams.FlowAdapters

/**
* Creates cold reactive [Publisher] that runs a given [block] in a coroutine.
* Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
* Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete])
* when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError])
* if coroutine throws an exception or closes channel with a cause.
* Unsubscribing cancels running coroutine.
*
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
* `onNext` is not invoked concurrently.
*
* Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
*
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
* to cancellation and error handling may change in the future.
*/
@ExperimentalCoroutinesApi
public fun <T> publish(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Publisher<T> {
val reactivePublisher : org.reactivestreams.Publisher<T> = kotlinx.coroutines.reactive.publish<T>(context, block)
return FlowAdapters.toFlowPublisher(reactivePublisher)
}
39 changes: 39 additions & 0 deletions reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.jdk9

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.reactive.collect
import java.util.concurrent.Flow.*
import org.reactivestreams.FlowAdapters

/**
* Transforms the given reactive [Publisher] into [Flow].
* Use [buffer] operator on the resulting flow to specify the size of the backpressure.
* More precisely, it specifies the value of the subscription's [request][Subscription.request].
* [buffer] default capacity is used by default.
*
* If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flight elements
* are discarded.
*/
public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
FlowAdapters.toPublisher(this).asFlow()

/**
* Transforms the given flow to a reactive specification compliant [Publisher].
*/
public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> {
val reactivePublisher : org.reactivestreams.Publisher<T> = this.asPublisher<T>()
return FlowAdapters.toFlowPublisher(reactivePublisher)
}

/**
* Subscribes to this [Publisher] and performs the specified action for each received element.
* Cancels subscription if any exception happens during collect.
*/
public suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit) =
FlowAdapters.toPublisher(this).collect(action)
79 changes: 79 additions & 0 deletions reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.jdk9

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.junit.Test
import java.util.concurrent.Flow.*
import kotlin.test.*

class FlowAsPublisherTest : TestBase() {

@Test
fun testErrorOnCancellationIsReported() {
expect(1)
flow<Int> {
emit(2)
try {
hang { expect(3) }
} finally {
throw TestException()
}
}.asPublisher().subscribe(object : Subscriber<Int> {
private lateinit var subscription: Subscription

override fun onComplete() {
expectUnreached()
}

override fun onSubscribe(s: Subscription?) {
subscription = s!!
subscription.request(2)
}

override fun onNext(t: Int) {
expect(t)
subscription.cancel()
}

override fun onError(t: Throwable?) {
assertTrue(t is TestException)
expect(4)
}
})
finish(5)
}

@Test
fun testCancellationIsNotReported() {
expect(1)
flow<Int> {
emit(2)
hang { expect(3) }
}.asPublisher().subscribe(object : Subscriber<Int> {
private lateinit var subscription: Subscription

override fun onComplete() {
expect(4)
}

override fun onSubscribe(s: Subscription?) {
subscription = s!!
subscription.request(2)
}

override fun onNext(t: Int) {
expect(t)
subscription.cancel()
}

override fun onError(t: Throwable?) {
expectUnreached()
}
})
finish(5)
}
}
Loading