Skip to content

Commit 5378b80

Browse files
committed
Deprecate obsolete reactive API, improve existing Reactive documentation
* Remove obsolete rx-example module * Properly document interoperability with Reactor context * Update reactive readme
1 parent 52b97b9 commit 5378b80

File tree

16 files changed

+104
-144
lines changed

16 files changed

+104
-144
lines changed

binary-compatibility-validator/resources/api.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@
44

55
module.roots=/ integration reactive ui
66
module.marker=build.gradle
7-
module.ignore=kotlinx-coroutines-rx-example stdlib-stubs benchmarks knit binary-compatibility-validator site publication-validator kotlinx-coroutines-bom
7+
module.ignore=stdlib-stubs benchmarks knit binary-compatibility-validator site publication-validator kotlinx-coroutines-bom
88

99
packages.internal=kotlinx.coroutines.internal

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ def coreModule = "kotlinx-coroutines-core"
1111
def sourceless = ['kotlinx.coroutines', 'site', 'kotlinx-coroutines-bom']
1212
def internal = ['kotlinx.coroutines', 'site', 'benchmarks', 'knit', 'js-stub', 'stdlib-stubs', 'binary-compatibility-validator']
1313
// Not published
14-
def unpublished = internal + ['kotlinx-coroutines-rx-example', 'example-frontend-js', 'android-unit-tests']
14+
def unpublished = internal + ['example-frontend-js', 'android-unit-tests']
1515

1616
static def platformOf(project) {
1717
def name = project.name

reactive/kotlinx-coroutines-reactive/README.md

+14-10
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,16 @@ Coroutine builders:
88
| --------------- | ----------------------------- | ---------------- | ---------------
99
| [publish] | `Publisher` | [ProducerScope] | Cold reactive publisher that starts coroutine on subscribe
1010

11+
Integration with [Flow]:
12+
13+
| **Name** | **Result** | **Description**
14+
| --------------- | -------------- | ---------------
15+
| [Publisher.asFlow] | `Flow` | Converts the given publisher to flow
16+
| [Flow.asPublisher] | `Publisher` | Converts the given flow to the TCK-compliant publisher
17+
18+
If these adapters are used along with `kotlinx-coroutines-reactor` in the classpath, then Reactor's `Context` is properly
19+
propagated as coroutine context element (`ReactorContext`) and vice versa.
20+
1121
Suspending extension functions and suspending iteration:
1222

1323
| **Name** | **Description**
@@ -18,29 +28,23 @@ Suspending extension functions and suspending iteration:
1828
| [Publisher.awaitFirstOrNull][org.reactivestreams.Publisher.awaitFirstOrNull] | Returns the first value from the given publisher or null
1929
| [Publisher.awaitLast][org.reactivestreams.Publisher.awaitFirst] | Returns the last value from the given publisher
2030
| [Publisher.awaitSingle][org.reactivestreams.Publisher.awaitSingle] | Returns the single value from the given publisher
21-
| [Publisher.openSubscription][org.reactivestreams.Publisher.openSubscription] | Subscribes to publisher and returns [ReceiveChannel]
22-
23-
Conversion functions:
24-
25-
| **Name** | **Description**
26-
| -------- | ---------------
27-
| [ReceiveChannel.asPublisher][kotlinx.coroutines.channels.ReceiveChannel.asPublisher] | Converts streaming channel to hot publisher
2831

2932
<!--- MODULE kotlinx-coroutines-core -->
3033
<!--- INDEX kotlinx.coroutines -->
34+
<!--- INDEX kotlinx.coroutines.flow -->
35+
[Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
3136
<!--- INDEX kotlinx.coroutines.channels -->
3237
[ProducerScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-producer-scope/index.html
33-
[ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/index.html
3438
<!--- MODULE kotlinx-coroutines-reactive -->
3539
<!--- INDEX kotlinx.coroutines.reactive -->
3640
[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/publish.html
41+
[Publisher.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/as-flow.html
42+
[Flow.asPublisher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/kotlinx.coroutines.flow.-flow/as-publisher.html
3743
[org.reactivestreams.Publisher.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/await-first.html
3844
[org.reactivestreams.Publisher.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/await-first-or-default.html
3945
[org.reactivestreams.Publisher.awaitFirstOrElse]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/await-first-or-else.html
4046
[org.reactivestreams.Publisher.awaitFirstOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/await-first-or-null.html
4147
[org.reactivestreams.Publisher.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/await-single.html
42-
[org.reactivestreams.Publisher.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/open-subscription.html
43-
[kotlinx.coroutines.channels.ReceiveChannel.asPublisher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/kotlinx.coroutines.channels.-receive-channel/as-publisher.html
4448
<!--- END -->
4549

4650
# Package kotlinx.coroutines.reactive

reactive/kotlinx-coroutines-reactive/src/Channel.kt

+13-6
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,28 @@
55
package kotlinx.coroutines.reactive
66

77
import kotlinx.atomicfu.*
8-
import kotlinx.coroutines.*
98
import kotlinx.coroutines.channels.*
9+
import kotlinx.coroutines.flow.*
1010
import kotlinx.coroutines.internal.*
1111
import org.reactivestreams.*
1212

1313
/**
1414
* Subscribes to this [Publisher] and returns a channel to receive elements emitted by it.
1515
* The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this publisher.
16-
*
17-
* **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
18-
* See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
19-
*
16+
2017
* @param request how many items to request from publisher in advance (optional, one by default).
18+
*
19+
* This method is deprecated in the favor of [Flow].
20+
* Instead of iterating over the resulting channel please use [collect][Flow.collect]:
21+
* ```
22+
* asFlow().collect { value ->
23+
* // process value
24+
* }
25+
* ```
2126
*/
22-
@ObsoleteCoroutinesApi
27+
@Deprecated(
28+
message = "Transforming publisher to channel is deprecated, use asFlow() instead",
29+
level = DeprecationLevel.WARNING) // Will be error in 1.4
2330
public fun <T> Publisher<T>.openSubscription(request: Int = 1): ReceiveChannel<T> {
2431
val channel = SubscriptionChannel<T>(request)
2532
subscribe(channel)

reactive/kotlinx-coroutines-reactive/src/Convert.kt

+3-6
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
package kotlinx.coroutines.reactive
66

7-
import kotlinx.coroutines.*
87
import kotlinx.coroutines.channels.*
98
import org.reactivestreams.*
109
import kotlin.coroutines.*
@@ -14,13 +13,11 @@ import kotlin.coroutines.*
1413
*
1514
* Every subscriber receives values from this channel in **fan-out** fashion. If the are multiple subscribers,
1615
* they'll receive values in round-robin way.
17-
*
18-
* **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
19-
* See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
20-
*
2116
* @param context -- the coroutine context from which the resulting observable is going to be signalled
2217
*/
23-
@ObsoleteCoroutinesApi
18+
@Deprecated(message = "Deprecated in the favour of consumeAsFlow()",
19+
level = DeprecationLevel.WARNING,
20+
replaceWith = ReplaceWith("this.consumeAsFlow().asPublisher()"))
2421
public fun <T> ReceiveChannel<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T> = publish(context) {
2522
for (t in this@asPublisher)
2623
send(t)

reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt

+8-2
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,23 @@ import kotlin.coroutines.*
1717
/**
1818
* Transforms the given reactive [Publisher] into [Flow].
1919
* Use [buffer] operator on the resulting flow to specify the size of the backpressure.
20-
* More precisely, to it specifies the value of the subscription's [request][Subscription.request].
20+
* More precisely, it specifies the value of the subscription's [request][Subscription.request].
2121
* `1` is used by default.
2222
*
2323
* If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flights elements
2424
* are discarded.
25+
*
26+
* This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module,
27+
* see its documentation for additional details.
2528
*/
2629
public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
2730
PublisherAsFlow(this, 1)
2831

2932
/**
30-
* Transforms the given flow to a spec-compliant [Publisher].
33+
* Transforms the given flow to a reactive specification compliant [Publisher].
34+
*
35+
* This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module,
36+
* see its documentation for additional details.
3137
*/
3238
public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> = FlowAsPublisher(this)
3339

reactive/kotlinx-coroutines-reactor/README.md

+13-4
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,23 @@ Utilities for [Reactor](https://projectreactor.io).
44

55
Coroutine builders:
66

7-
| **Name** | **Result** | **Scope** | **Description**
8-
| --------------- | -------------------------------------- | ---------------- | ---------------
9-
| [mono] | `Mono` | [CoroutineScope] | Cold mono that starts coroutine on subscribe
10-
| [flux] | `Flux` | [CoroutineScope] | Cold flux that starts coroutine on subscribe
7+
| **Name** | **Result** | **Scope** | **Description**
8+
| --------------- | ------------| ---------------- | ---------------
9+
| [mono] | `Mono` | [CoroutineScope] | Cold mono that starts coroutine on subscribe
10+
| [flux] | `Flux` | [CoroutineScope] | Cold flux that starts coroutine on subscribe
1111

1212
Note that `Mono` and `Flux` are a subclass of [Reactive Streams](https://www.reactive-streams.org)
1313
`Publisher` and extensions for it are covered by
1414
[kotlinx-coroutines-reactive](../kotlinx-coroutines-reactive) module.
1515

16+
Integration with [Flow]:
17+
18+
| **Name** | **Result** | **Description**
19+
| --------------- | -------------- | ---------------
20+
| [Flow.asFlux] | `Flux` | Converts the given flow to the TCK-compliant Flux.
21+
22+
This adapter is integrated with Reactor's `Context` and coroutines [ReactiveContext].
23+
1624
Conversion functions:
1725

1826
| **Name** | **Description**
@@ -31,6 +39,7 @@ Conversion functions:
3139
<!--- INDEX kotlinx.coroutines.reactor -->
3240
[mono]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/mono.html
3341
[flux]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/flux.html
42+
[Flow.asFlux]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/kotlinx.coroutines.flow.-flow/as-flux.html
3443
[kotlinx.coroutines.Job.asMono]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/kotlinx.coroutines.-job/as-mono.html
3544
[kotlinx.coroutines.Deferred.asMono]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/kotlinx.coroutines.-deferred/as-mono.html
3645
[kotlinx.coroutines.channels.ReceiveChannel.asFlux]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/kotlinx.coroutines.channels.-receive-channel/as-flux.html

reactive/kotlinx-coroutines-reactor/src/Convert.kt

+3-5
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,11 @@ public fun <T> Deferred<T?>.asMono(context: CoroutineContext): Mono<T> = mono(co
4343
*
4444
* Every subscriber receives values from this channel in **fan-out** fashion. If the are multiple subscribers,
4545
* they'll receive values in round-robin way.
46-
*
47-
* **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
48-
* See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
49-
*
5046
* @param context -- the coroutine context from which the resulting flux is going to be signalled
5147
*/
52-
@ObsoleteCoroutinesApi
48+
@Deprecated(message = "Deprecated in the favour of consumeAsFlow()",
49+
level = DeprecationLevel.WARNING,
50+
replaceWith = ReplaceWith("this.consumeAsFlow().asFlux()"))
5351
public fun <T> ReceiveChannel<T>.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux<T> = flux(context) {
5452
for (t in this@asFlux)
5553
send(t)

reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt

+27-27
Original file line numberDiff line numberDiff line change
@@ -3,48 +3,48 @@ package kotlinx.coroutines.reactor
33
import kotlinx.coroutines.ExperimentalCoroutinesApi
44
import reactor.util.context.Context
55
import kotlin.coroutines.*
6+
import kotlinx.coroutines.reactive.*
67

78
/**
89
* Wraps Reactor's [Context] into [CoroutineContext] element for seamless integration Reactor and kotlinx.coroutines.
9-
*
1010
* [Context.asCoroutineContext] is defined to add Reactor's [Context] elements as part of [CoroutineContext].
11+
* Coroutine context element that propagates information about Reactor's [Context] through coroutines.
1112
*
12-
* Reactor builders [mono] and [flux] use this context element to enhance the resulting `subscriberContext`.
13+
* This context element is implicitly propagated through subscriber's context by all Reactive integrations, such as [mono], [flux],
14+
* [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux].
15+
* Functions that subscribe to the reactive stream (e.g. [Publisher.awaitFirst][awaitFirst]) also propagate [ReactorContext] to the
16+
* subscriber's [Context].
17+
**
18+
* ### Examples of Reactive context integration.
1319
*
14-
* ### Usages
15-
* Passing reactor context from coroutine builder to reactor entity:
16-
* ```
17-
* launch(Context.of("key", "value").asCoroutineContext()) {
18-
* mono {
19-
* println(coroutineContext[ReactorContext]) // Prints { "key": "value" }
20-
* }.subscribe()
21-
* }
20+
* #### Propagating ReactorContext to Reactor's Context
2221
* ```
22+
* val flux = myDatabaseService.getUsers()
23+
* .subscriberContext() { ctx -> println(ctx); ctx }
24+
* flux.await() // Will print "null"
2325
*
24-
* Accessing modified reactor context enriched from the downstream:
25-
* ```
26-
* launch {
27-
* mono {
28-
* println(coroutineContext[ReactorContext]) // Prints { "key": "value" }
29-
* }.subscriberContext(Context.of("key", "value"))
30-
* .subscribe()
26+
* // Now add ReactorContext
27+
* withContext(Context.of("answer", "42").asCoroutineContext()) {
28+
* flux.await() // Will print "Context{'key'='value'}"
3129
* }
3230
* ```
3331
*
34-
* [CoroutineContext] of a suspendable function that awaits a value from [Mono] or [Flux] instance
35-
* is propagated into [mono] and [flux] Reactor builders:
32+
* #### Propagating subscriber's Context to ReactorContext:
3633
* ```
37-
* launch(Context.of("key", "value").asCoroutineContext()) {
38-
* assertEquals(bar().awaitFirst(), "value")
39-
* }
40-
*
41-
* fun bar(): Mono<String> = mono {
42-
* coroutineContext[ReactorContext]!!.context.get("key")
34+
* val flow = flow {
35+
* println("Reactor context in Flow: " + coroutineContext[ReactorContext])
4336
* }
37+
* // No context
38+
* flow.asFlux()
39+
* .subscribe() // Will print 'Reactor context in Flow: null'
40+
* // Add subscriber's context
41+
* flow.asFlux()
42+
* .subscriberContext { ctx -> ctx.put("answer", 42) }
43+
* .subscribe() // Will print "Reactor context in Flow: Context{'answer'=42}"
4444
* ```
4545
*/
4646
@ExperimentalCoroutinesApi
47-
public class ReactorContext(val context: Context) : AbstractCoroutineContextElement(ReactorContext) {
47+
public class ReactorContext(public val context: Context) : AbstractCoroutineContextElement(ReactorContext) {
4848
companion object Key : CoroutineContext.Key<ReactorContext>
4949
}
5050

@@ -53,4 +53,4 @@ public class ReactorContext(val context: Context) : AbstractCoroutineContextElem
5353
* and later used via `coroutineContext[ReactorContext]`.
5454
*/
5555
@ExperimentalCoroutinesApi
56-
public fun Context.asCoroutineContext(): ReactorContext = ReactorContext(this)
56+
public fun Context.asCoroutineContext(): ReactorContext = ReactorContext(this)

reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import reactor.core.publisher.Flux
1313
/**
1414
* Converts the given flow to a cold flux.
1515
* The original flow is cancelled when the flux subscriber is disposed.
16+
*
17+
* This function is integrated with [ReactorContext], see its documentation for additional details.
1618
*/
1719
public fun <T: Any> Flow<T>.asFlux(): Flux<T> = FlowAsFlux(this)
1820

reactive/kotlinx-coroutines-rx-example/build.gradle

-15
This file was deleted.

reactive/kotlinx-coroutines-rx-example/src/main.kt

-52
This file was deleted.

0 commit comments

Comments
 (0)