Skip to content

Fixes for the reactive integrations #2617

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions reactive/kotlinx-coroutines-jdk9/src/Publish.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,34 @@ package kotlinx.coroutines.jdk9

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.reactive.*
import java.util.concurrent.*
import kotlin.coroutines.*
import org.reactivestreams.FlowAdapters

/**
* Creates cold reactive [Flow.Publisher] that runs a given [block] in a coroutine.
* Creates a cold reactive [Flow.Publisher] that runs a given [block] in a coroutine.
*
* Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
* Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete])
* when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError])
* if coroutine throws an exception or closes channel with a cause.
* Unsubscribing cancels running coroutine.
* The coroutine emits (via [Flow.Subscriber.onNext]) values with [send][ProducerScope.send],
* completes (via [Flow.Subscriber.onComplete]) when the coroutine completes or channel is explicitly closed, and emits
* errors (via [Flow.Subscriber.onError]) if the coroutine throws an exception or closes channel with a cause.
* Unsubscribing cancels the running coroutine.
*
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
* `onNext` is not invoked concurrently.
* Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to
* ensure that [onNext][Flow.Subscriber.onNext] is not invoked concurrently.
*
* Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is
* used.
*
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
* to cancellation and error handling may change in the future.
*
* @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
*/
@ExperimentalCoroutinesApi // Since 1.3.x
@ExperimentalCoroutinesApi
public fun <T> flowPublish(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Flow.Publisher<T> {
val reactivePublisher : org.reactivestreams.Publisher<T> = kotlinx.coroutines.reactive.publish<T>(context, block)
return FlowAdapters.toFlowPublisher(reactivePublisher)
}
): Flow.Publisher<T> = FlowAdapters.toFlowPublisher(publish(context, block))
36 changes: 19 additions & 17 deletions reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,41 +7,43 @@ package kotlinx.coroutines.jdk9
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.reactive.asPublisher as asReactivePublisher
import kotlinx.coroutines.reactive.collect
import kotlinx.coroutines.channels.*
import org.reactivestreams.*
import kotlin.coroutines.*
import java.util.concurrent.Flow as JFlow

/**
* Transforms the given reactive [Publisher] into [Flow].
* Use [buffer] operator on the resulting flow to specify the size of the backpressure.
* More precisely, it specifies the value of the subscription's [request][Subscription.request].
* [buffer] default capacity is used by default.
* Transforms the given reactive [Flow Publisher][JFlow.Publisher] into [Flow].
* Use the [buffer] operator on the resulting flow to specify the size of the back-pressure.
* In effect, it specifies the value of the subscription's [request][JFlow.Subscription.request].
* The [default buffer capacity][Channel.BUFFERED] for a suspending channel is used by default.
*
* If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flight elements
* are discarded.
* If any of the resulting flow transformations fails, the subscription is immediately cancelled and all the in-flight
* elements are discarded.
*/
public fun <T : Any> JFlow.Publisher<T>.asFlow(): Flow<T> =
FlowAdapters.toPublisher(this).asFlow()
FlowAdapters.toPublisher(this).asFlow()

/**
* Transforms the given flow to a reactive specification compliant [Publisher].
* Transforms the given flow into a reactive specification compliant [Flow Publisher][JFlow.Publisher].
*
* An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
* You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
* An optional [context] can be specified to control the execution context of calls to the [Flow Subscriber][Subscriber]
* methods.
* A [CoroutineDispatcher] can be set to confine them to a specific thread; various [ThreadContextElement] can be set to
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
@JvmOverloads // binary compatibility
public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): JFlow.Publisher<T> {
val reactivePublisher : org.reactivestreams.Publisher<T> = this.asPublisher<T>(context)
return FlowAdapters.toFlowPublisher(reactivePublisher)
}
public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): JFlow.Publisher<T> =
FlowAdapters.toFlowPublisher(asReactivePublisher(context))

/**
* Subscribes to this [Publisher] and performs the specified action for each received element.
* Cancels subscription if any exception happens during collect.
* Subscribes to this [Flow Publisher][JFlow.Publisher] and performs the specified action for each received element.
*
* If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
* [collect]. Also, if the publisher signals an error, that error is rethrown from [collect].
*/
public suspend inline fun <T> JFlow.Publisher<T>.collect(action: (T) -> Unit): Unit =
FlowAdapters.toPublisher(this).collect(action)
146 changes: 146 additions & 0 deletions reactive/kotlinx-coroutines-jdk9/test/PublisherCollectTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.jdk9

import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import org.junit.Test
import org.reactivestreams.*
import kotlin.test.*
import java.util.concurrent.Flow as JFlow

class PublisherCollectTest: TestBase() {

/** Tests the simple scenario where the publisher outputs a bounded stream of values to collect. */
@Test
fun testCollect() = runTest {
val x = 100
val xSum = x * (x + 1) / 2
val publisher = JFlow.Publisher<Int> { subscriber ->
var requested = 0L
var lastOutput = 0
subscriber.onSubscribe(object: JFlow.Subscription {

override fun request(n: Long) {
requested += n
if (n <= 0) {
subscriber.onError(IllegalArgumentException())
return
}
while (lastOutput < x && lastOutput < requested) {
lastOutput += 1
subscriber.onNext(lastOutput)
}
if (lastOutput == x)
subscriber.onComplete()
}

override fun cancel() {
/** According to rule 3.5 of the
* [reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#3.5),
* this method can be called by the subscriber at any point, so it's not an error if it's called
* in this scenario. */
}

})
}
var sum = 0
publisher.collect {
sum += it
}
assertEquals(xSum, sum)
}

/** Tests the behavior of [collect] when the publisher raises an error. */
@Test
fun testCollectThrowingPublisher() = runTest {
val errorString = "Too many elements requested"
val x = 100
val xSum = x * (x + 1) / 2
val publisher = Publisher<Int> { subscriber ->
var requested = 0L
var lastOutput = 0
subscriber.onSubscribe(object: Subscription {

override fun request(n: Long) {
requested += n
if (n <= 0) {
subscriber.onError(IllegalArgumentException())
return
}
while (lastOutput < x && lastOutput < requested) {
lastOutput += 1
subscriber.onNext(lastOutput)
}
if (lastOutput == x)
subscriber.onError(IllegalArgumentException(errorString))
}

override fun cancel() {
/** See the comment for the corresponding part of [testCollect]. */
}

})
}
var sum = 0
try {
publisher.collect {
sum += it
}
} catch (e: IllegalArgumentException) {
assertEquals(errorString, e.message)
}
assertEquals(xSum, sum)
}

/** Tests the behavior of [collect] when the action throws. */
@Test
fun testCollectThrowingAction() = runTest {
val errorString = "Too many elements produced"
val x = 100
val xSum = x * (x + 1) / 2
val publisher = Publisher<Int> { subscriber ->
var requested = 0L
var lastOutput = 0
subscriber.onSubscribe(object: Subscription {

override fun request(n: Long) {
requested += n
if (n <= 0) {
subscriber.onError(IllegalArgumentException())
return
}
while (lastOutput < x && lastOutput < requested) {
lastOutput += 1
subscriber.onNext(lastOutput)
}
}

override fun cancel() {
assertEquals(x, lastOutput)
expect(x + 2)
}

})
}
var sum = 0
try {
expect(1)
var i = 1
publisher.collect {
sum += it
i += 1
expect(i)
if (sum >= xSum) {
throw IllegalArgumentException(errorString)
}
}
} catch (e: IllegalArgumentException) {
expect(x + 3)
assertEquals(errorString, e.message)
}
finish(x + 4)
}
}
14 changes: 8 additions & 6 deletions reactive/kotlinx-coroutines-reactive/src/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import kotlinx.coroutines.internal.*
import org.reactivestreams.*

/**
* Subscribes to this [Publisher] and returns a channel to receive elements emitted by it.
* The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this publisher.
* Subscribes to this [Publisher] and returns a channel to receive the elements emitted by it.
* The resulting channel needs to be [cancelled][ReceiveChannel.cancel] in order to unsubscribe from this publisher.

* @param request how many items to request from publisher in advance (optional, one by default).
* @param request how many items to request from the publisher in advance (optional, a single element by default).
*
* This method is deprecated in the favor of [Flow].
* Instead of iterating over the resulting channel please use [collect][Flow.collect]:
Expand All @@ -35,7 +35,9 @@ public fun <T> Publisher<T>.openSubscription(request: Int = 1): ReceiveChannel<T

/**
* Subscribes to this [Publisher] and performs the specified action for each received element.
* Cancels subscription if any exception happens during collect.
*
* If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
* [collect]. Also, if the publisher signals an error, that error is rethrown from [collect].
*/
public suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit): Unit =
toChannel().consumeEach(action)
Expand All @@ -61,7 +63,7 @@ private class SubscriptionChannel<T>(
// can be negative if we have receivers, but no subscription yet
private val _requested = atomic(0)

// AbstractChannel overrides
// --------------------- AbstractChannel overrides -------------------------------
@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
override fun onReceiveEnqueued() {
_requested.loop { wasRequested ->
Expand Down Expand Up @@ -89,7 +91,7 @@ private class SubscriptionChannel<T>(
_subscription.getAndSet(null)?.cancel() // cancel exactly once
}

// Subscriber overrides
// --------------------- Subscriber overrides -------------------------------
override fun onSubscribe(s: Subscription) {
_subscription.value = s
while (true) { // lock-free loop on _requested
Expand Down
21 changes: 12 additions & 9 deletions reactive/kotlinx-coroutines-reactive/src/Publish.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,25 @@ import org.reactivestreams.*
import kotlin.coroutines.*

/**
* Creates cold reactive [Publisher] that runs a given [block] in a coroutine.
* Creates a cold reactive [Publisher] that runs a given [block] in a coroutine.
*
* Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
* Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete])
* when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError])
* if coroutine throws an exception or closes channel with a cause.
* Unsubscribing cancels running coroutine.
* The coroutine emits (via [Subscriber.onNext]) values with [send][ProducerScope.send],
* completes (via [Subscriber.onComplete]) when the coroutine completes or channel is explicitly closed, and emits
* errors (via [Subscriber.onError]) if the coroutine throws an exception or closes channel with a cause.
* Unsubscribing cancels the running coroutine.
*
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
* `onNext` is not invoked concurrently.
* Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to
* ensure that [onNext][Subscriber.onNext] is not invoked concurrently.
*
* Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is
* used.
*
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
* to cancellation and error handling may change in the future.
*
* @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
*/
@ExperimentalCoroutinesApi
public fun <T> publish(
Expand Down
20 changes: 10 additions & 10 deletions reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import kotlinx.coroutines.internal.*

/**
* Transforms the given reactive [Publisher] into [Flow].
* Use [buffer] operator on the resulting flow to specify the size of the backpressure.
* More precisely, it specifies the value of the subscription's [request][Subscription.request].
* [buffer] default capacity is used by default.
* Use the [buffer] operator on the resulting flow to specify the size of the back-pressure.
* In effect, it specifies the value of the subscription's [request][Subscription.request].
* The [default buffer capacity][Channel.BUFFERED] for a suspending channel is used by default.
*
* If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flight elements
* are discarded.
* If any of the resulting flow transformations fails, the subscription is immediately cancelled and all the in-flight
* elements are discarded.
*
* This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module,
* see its documentation for additional details.
Expand All @@ -31,13 +31,13 @@ public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
PublisherAsFlow(this)

/**
* Transforms the given flow to a reactive specification compliant [Publisher].
* Transforms the given flow into a reactive specification compliant [Publisher].
*
* This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module,
* see its documentation for additional details.
*
* An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
* You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
* An optional [context] can be specified to control the execution context of calls to the [Subscriber] methods.
* A [CoroutineDispatcher] can be set to confine them to a specific thread; various [ThreadContextElement] can be set to
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
Expand All @@ -55,8 +55,8 @@ private class PublisherAsFlow<T : Any>(
PublisherAsFlow(publisher, context, capacity, onBufferOverflow)

/*
* Suppress for Channel.CHANNEL_DEFAULT_CAPACITY.
* It's too counter-intuitive to be public and moving it to Flow companion
* The @Suppress is for Channel.CHANNEL_DEFAULT_CAPACITY.
* It's too counter-intuitive to be public, and moving it to Flow companion
* will also create undesired effect.
*/
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
Expand Down
Loading