Skip to content

Commit 05d3018

Browse files
authored
Fixes for the reactive integrations (#2617)
Reworked the comments, added new tests. Fixed a bug where `Maybe.collect()` would hang on success.
1 parent 835ee18 commit 05d3018

File tree

13 files changed

+610
-63
lines changed

13 files changed

+610
-63
lines changed

reactive/kotlinx-coroutines-jdk9/src/Publish.kt

+15-14
Original file line numberDiff line numberDiff line change
@@ -6,33 +6,34 @@ package kotlinx.coroutines.jdk9
66

77
import kotlinx.coroutines.*
88
import kotlinx.coroutines.channels.*
9+
import kotlinx.coroutines.reactive.*
910
import java.util.concurrent.*
1011
import kotlin.coroutines.*
1112
import org.reactivestreams.FlowAdapters
1213

1314
/**
14-
* Creates cold reactive [Flow.Publisher] that runs a given [block] in a coroutine.
15+
* Creates a cold reactive [Flow.Publisher] that runs a given [block] in a coroutine.
16+
*
1517
* Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
16-
* Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete])
17-
* when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError])
18-
* if coroutine throws an exception or closes channel with a cause.
19-
* Unsubscribing cancels running coroutine.
18+
* The coroutine emits (via [Flow.Subscriber.onNext]) values with [send][ProducerScope.send],
19+
* completes (via [Flow.Subscriber.onComplete]) when the coroutine completes or channel is explicitly closed, and emits
20+
* errors (via [Flow.Subscriber.onError]) if the coroutine throws an exception or closes channel with a cause.
21+
* Unsubscribing cancels the running coroutine.
2022
*
21-
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
22-
* `onNext` is not invoked concurrently.
23+
* Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to
24+
* ensure that [onNext][Flow.Subscriber.onNext] is not invoked concurrently.
2325
*
2426
* Coroutine context can be specified with [context] argument.
25-
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
26-
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
27+
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is
28+
* used.
2729
*
2830
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
2931
* to cancellation and error handling may change in the future.
32+
*
33+
* @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
3034
*/
31-
@ExperimentalCoroutinesApi // Since 1.3.x
35+
@ExperimentalCoroutinesApi
3236
public fun <T> flowPublish(
3337
context: CoroutineContext = EmptyCoroutineContext,
3438
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
35-
): Flow.Publisher<T> {
36-
val reactivePublisher : org.reactivestreams.Publisher<T> = kotlinx.coroutines.reactive.publish<T>(context, block)
37-
return FlowAdapters.toFlowPublisher(reactivePublisher)
38-
}
39+
): Flow.Publisher<T> = FlowAdapters.toFlowPublisher(publish(context, block))

reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt

+19-17
Original file line numberDiff line numberDiff line change
@@ -7,41 +7,43 @@ package kotlinx.coroutines.jdk9
77
import kotlinx.coroutines.*
88
import kotlinx.coroutines.flow.*
99
import kotlinx.coroutines.reactive.asFlow
10-
import kotlinx.coroutines.reactive.asPublisher
10+
import kotlinx.coroutines.reactive.asPublisher as asReactivePublisher
1111
import kotlinx.coroutines.reactive.collect
12+
import kotlinx.coroutines.channels.*
1213
import org.reactivestreams.*
1314
import kotlin.coroutines.*
1415
import java.util.concurrent.Flow as JFlow
1516

1617
/**
17-
* Transforms the given reactive [Publisher] into [Flow].
18-
* Use [buffer] operator on the resulting flow to specify the size of the backpressure.
19-
* More precisely, it specifies the value of the subscription's [request][Subscription.request].
20-
* [buffer] default capacity is used by default.
18+
* Transforms the given reactive [Flow Publisher][JFlow.Publisher] into [Flow].
19+
* Use the [buffer] operator on the resulting flow to specify the size of the back-pressure.
20+
* In effect, it specifies the value of the subscription's [request][JFlow.Subscription.request].
21+
* The [default buffer capacity][Channel.BUFFERED] for a suspending channel is used by default.
2122
*
22-
* If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flight elements
23-
* are discarded.
23+
* If any of the resulting flow transformations fails, the subscription is immediately cancelled and all the in-flight
24+
* elements are discarded.
2425
*/
2526
public fun <T : Any> JFlow.Publisher<T>.asFlow(): Flow<T> =
26-
FlowAdapters.toPublisher(this).asFlow()
27+
FlowAdapters.toPublisher(this).asFlow()
2728

2829
/**
29-
* Transforms the given flow to a reactive specification compliant [Publisher].
30+
* Transforms the given flow into a reactive specification compliant [Flow Publisher][JFlow.Publisher].
3031
*
31-
* An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
32-
* You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
32+
* An optional [context] can be specified to control the execution context of calls to the [Flow Subscriber][Subscriber]
33+
* methods.
34+
* A [CoroutineDispatcher] can be set to confine them to a specific thread; various [ThreadContextElement] can be set to
3335
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
3436
* is used, so calls are performed from an arbitrary thread.
3537
*/
3638
@JvmOverloads // binary compatibility
37-
public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): JFlow.Publisher<T> {
38-
val reactivePublisher : org.reactivestreams.Publisher<T> = this.asPublisher<T>(context)
39-
return FlowAdapters.toFlowPublisher(reactivePublisher)
40-
}
39+
public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): JFlow.Publisher<T> =
40+
FlowAdapters.toFlowPublisher(asReactivePublisher(context))
4141

4242
/**
43-
* Subscribes to this [Publisher] and performs the specified action for each received element.
44-
* Cancels subscription if any exception happens during collect.
43+
* Subscribes to this [Flow Publisher][JFlow.Publisher] and performs the specified action for each received element.
44+
*
45+
* If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
46+
* [collect]. Also, if the publisher signals an error, that error is rethrown from [collect].
4547
*/
4648
public suspend inline fun <T> JFlow.Publisher<T>.collect(action: (T) -> Unit): Unit =
4749
FlowAdapters.toPublisher(this).collect(action)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.jdk9
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.reactive.*
9+
import org.junit.Test
10+
import org.reactivestreams.*
11+
import kotlin.test.*
12+
import java.util.concurrent.Flow as JFlow
13+
14+
class PublisherCollectTest: TestBase() {
15+
16+
/** Tests the simple scenario where the publisher outputs a bounded stream of values to collect. */
17+
@Test
18+
fun testCollect() = runTest {
19+
val x = 100
20+
val xSum = x * (x + 1) / 2
21+
val publisher = JFlow.Publisher<Int> { subscriber ->
22+
var requested = 0L
23+
var lastOutput = 0
24+
subscriber.onSubscribe(object: JFlow.Subscription {
25+
26+
override fun request(n: Long) {
27+
requested += n
28+
if (n <= 0) {
29+
subscriber.onError(IllegalArgumentException())
30+
return
31+
}
32+
while (lastOutput < x && lastOutput < requested) {
33+
lastOutput += 1
34+
subscriber.onNext(lastOutput)
35+
}
36+
if (lastOutput == x)
37+
subscriber.onComplete()
38+
}
39+
40+
override fun cancel() {
41+
/** According to rule 3.5 of the
42+
* [reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#3.5),
43+
* this method can be called by the subscriber at any point, so it's not an error if it's called
44+
* in this scenario. */
45+
}
46+
47+
})
48+
}
49+
var sum = 0
50+
publisher.collect {
51+
sum += it
52+
}
53+
assertEquals(xSum, sum)
54+
}
55+
56+
/** Tests the behavior of [collect] when the publisher raises an error. */
57+
@Test
58+
fun testCollectThrowingPublisher() = runTest {
59+
val errorString = "Too many elements requested"
60+
val x = 100
61+
val xSum = x * (x + 1) / 2
62+
val publisher = Publisher<Int> { subscriber ->
63+
var requested = 0L
64+
var lastOutput = 0
65+
subscriber.onSubscribe(object: Subscription {
66+
67+
override fun request(n: Long) {
68+
requested += n
69+
if (n <= 0) {
70+
subscriber.onError(IllegalArgumentException())
71+
return
72+
}
73+
while (lastOutput < x && lastOutput < requested) {
74+
lastOutput += 1
75+
subscriber.onNext(lastOutput)
76+
}
77+
if (lastOutput == x)
78+
subscriber.onError(IllegalArgumentException(errorString))
79+
}
80+
81+
override fun cancel() {
82+
/** See the comment for the corresponding part of [testCollect]. */
83+
}
84+
85+
})
86+
}
87+
var sum = 0
88+
try {
89+
publisher.collect {
90+
sum += it
91+
}
92+
} catch (e: IllegalArgumentException) {
93+
assertEquals(errorString, e.message)
94+
}
95+
assertEquals(xSum, sum)
96+
}
97+
98+
/** Tests the behavior of [collect] when the action throws. */
99+
@Test
100+
fun testCollectThrowingAction() = runTest {
101+
val errorString = "Too many elements produced"
102+
val x = 100
103+
val xSum = x * (x + 1) / 2
104+
val publisher = Publisher<Int> { subscriber ->
105+
var requested = 0L
106+
var lastOutput = 0
107+
subscriber.onSubscribe(object: Subscription {
108+
109+
override fun request(n: Long) {
110+
requested += n
111+
if (n <= 0) {
112+
subscriber.onError(IllegalArgumentException())
113+
return
114+
}
115+
while (lastOutput < x && lastOutput < requested) {
116+
lastOutput += 1
117+
subscriber.onNext(lastOutput)
118+
}
119+
}
120+
121+
override fun cancel() {
122+
assertEquals(x, lastOutput)
123+
expect(x + 2)
124+
}
125+
126+
})
127+
}
128+
var sum = 0
129+
try {
130+
expect(1)
131+
var i = 1
132+
publisher.collect {
133+
sum += it
134+
i += 1
135+
expect(i)
136+
if (sum >= xSum) {
137+
throw IllegalArgumentException(errorString)
138+
}
139+
}
140+
} catch (e: IllegalArgumentException) {
141+
expect(x + 3)
142+
assertEquals(errorString, e.message)
143+
}
144+
finish(x + 4)
145+
}
146+
}

reactive/kotlinx-coroutines-reactive/src/Channel.kt

+8-6
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ import kotlinx.coroutines.internal.*
1111
import org.reactivestreams.*
1212

1313
/**
14-
* Subscribes to this [Publisher] and returns a channel to receive elements emitted by it.
15-
* The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this publisher.
14+
* Subscribes to this [Publisher] and returns a channel to receive the elements emitted by it.
15+
* The resulting channel needs to be [cancelled][ReceiveChannel.cancel] in order to unsubscribe from this publisher.
1616
17-
* @param request how many items to request from publisher in advance (optional, one by default).
17+
* @param request how many items to request from the publisher in advance (optional, a single element by default).
1818
*
1919
* This method is deprecated in the favor of [Flow].
2020
* Instead of iterating over the resulting channel please use [collect][Flow.collect]:
@@ -35,7 +35,9 @@ public fun <T> Publisher<T>.openSubscription(request: Int = 1): ReceiveChannel<T
3535

3636
/**
3737
* Subscribes to this [Publisher] and performs the specified action for each received element.
38-
* Cancels subscription if any exception happens during collect.
38+
*
39+
* If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
40+
* [collect]. Also, if the publisher signals an error, that error is rethrown from [collect].
3941
*/
4042
public suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit): Unit =
4143
toChannel().consumeEach(action)
@@ -61,7 +63,7 @@ private class SubscriptionChannel<T>(
6163
// can be negative if we have receivers, but no subscription yet
6264
private val _requested = atomic(0)
6365

64-
// AbstractChannel overrides
66+
// --------------------- AbstractChannel overrides -------------------------------
6567
@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
6668
override fun onReceiveEnqueued() {
6769
_requested.loop { wasRequested ->
@@ -89,7 +91,7 @@ private class SubscriptionChannel<T>(
8991
_subscription.getAndSet(null)?.cancel() // cancel exactly once
9092
}
9193

92-
// Subscriber overrides
94+
// --------------------- Subscriber overrides -------------------------------
9395
override fun onSubscribe(s: Subscription) {
9496
_subscription.value = s
9597
while (true) { // lock-free loop on _requested

reactive/kotlinx-coroutines-reactive/src/Publish.kt

+12-9
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,25 @@ import org.reactivestreams.*
1212
import kotlin.coroutines.*
1313

1414
/**
15-
* Creates cold reactive [Publisher] that runs a given [block] in a coroutine.
15+
* Creates a cold reactive [Publisher] that runs a given [block] in a coroutine.
16+
*
1617
* Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
17-
* Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete])
18-
* when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError])
19-
* if coroutine throws an exception or closes channel with a cause.
20-
* Unsubscribing cancels running coroutine.
18+
* The coroutine emits (via [Subscriber.onNext]) values with [send][ProducerScope.send],
19+
* completes (via [Subscriber.onComplete]) when the coroutine completes or channel is explicitly closed, and emits
20+
* errors (via [Subscriber.onError]) if the coroutine throws an exception or closes channel with a cause.
21+
* Unsubscribing cancels the running coroutine.
2122
*
22-
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
23-
* `onNext` is not invoked concurrently.
23+
* Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to
24+
* ensure that [onNext][Subscriber.onNext] is not invoked concurrently.
2425
*
2526
* Coroutine context can be specified with [context] argument.
26-
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
27-
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
27+
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is
28+
* used.
2829
*
2930
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
3031
* to cancellation and error handling may change in the future.
32+
*
33+
* @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
3134
*/
3235
@ExperimentalCoroutinesApi
3336
public fun <T> publish(

reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt

+10-10
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ import kotlinx.coroutines.internal.*
1717

1818
/**
1919
* Transforms the given reactive [Publisher] into [Flow].
20-
* Use [buffer] operator on the resulting flow to specify the size of the backpressure.
21-
* More precisely, it specifies the value of the subscription's [request][Subscription.request].
22-
* [buffer] default capacity is used by default.
20+
* Use the [buffer] operator on the resulting flow to specify the size of the back-pressure.
21+
* In effect, it specifies the value of the subscription's [request][Subscription.request].
22+
* The [default buffer capacity][Channel.BUFFERED] for a suspending channel is used by default.
2323
*
24-
* If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flight elements
25-
* are discarded.
24+
* If any of the resulting flow transformations fails, the subscription is immediately cancelled and all the in-flight
25+
* elements are discarded.
2626
*
2727
* This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module,
2828
* see its documentation for additional details.
@@ -31,13 +31,13 @@ public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
3131
PublisherAsFlow(this)
3232

3333
/**
34-
* Transforms the given flow to a reactive specification compliant [Publisher].
34+
* Transforms the given flow into a reactive specification compliant [Publisher].
3535
*
3636
* This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module,
3737
* see its documentation for additional details.
3838
*
39-
* An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
40-
* You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
39+
* An optional [context] can be specified to control the execution context of calls to the [Subscriber] methods.
40+
* A [CoroutineDispatcher] can be set to confine them to a specific thread; various [ThreadContextElement] can be set to
4141
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
4242
* is used, so calls are performed from an arbitrary thread.
4343
*/
@@ -55,8 +55,8 @@ private class PublisherAsFlow<T : Any>(
5555
PublisherAsFlow(publisher, context, capacity, onBufferOverflow)
5656

5757
/*
58-
* Suppress for Channel.CHANNEL_DEFAULT_CAPACITY.
59-
* It's too counter-intuitive to be public and moving it to Flow companion
58+
* The @Suppress is for Channel.CHANNEL_DEFAULT_CAPACITY.
59+
* It's too counter-intuitive to be public, and moving it to Flow companion
6060
* will also create undesired effect.
6161
*/
6262
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")

0 commit comments

Comments
 (0)