Skip to content

Commit ff12a4a

Browse files
committed
Reword some docs in jdk9 and reactive, refactor
* Fixed the broken links in the docs * Some new links are added to the docs * More detailed descriptions of error handling for some methods * Small grammar fixes * Code formatting changes
1 parent e62f8f7 commit ff12a4a

File tree

7 files changed

+146
-129
lines changed

7 files changed

+146
-129
lines changed

reactive/kotlinx-coroutines-jdk9/src/Await.kt

+38-34
Original file line numberDiff line numberDiff line change
@@ -4,78 +4,82 @@
44

55
package kotlinx.coroutines.jdk9
66

7+
import kotlinx.coroutines.Job
78
import java.util.concurrent.*
89
import org.reactivestreams.FlowAdapters
910
import kotlinx.coroutines.reactive.*
1011

1112
/**
12-
* Awaits for the first value from the given publisher without blocking a thread and
13-
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
13+
* Awaits the first value from the given publisher without blocking the thread and returns the resulting value, or, if
14+
* the publisher has produced an error, throws the corresponding exception.
1415
*
1516
* This suspending function is cancellable.
16-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
17-
* immediately resumes with [CancellationException].
17+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
18+
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
1819
*
19-
* @throws NoSuchElementException if publisher does not emit any value
20+
* @throws NoSuchElementException if the publisher does not emit any value
2021
*/
21-
public suspend fun <T> Flow.Publisher<T>.awaitFirst(): T = FlowAdapters.toPublisher(this).awaitFirst()
22+
public suspend fun <T> Flow.Publisher<T>.awaitFirst(): T =
23+
FlowAdapters.toPublisher(this).awaitFirst()
2224

2325
/**
24-
* Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
25-
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
26+
* Awaits the first value from the given observable, or returns the [default] value if none is emitted, without blocking
27+
* the thread, and returns the resulting value, or, if this observable has produced an error, throws the corresponding
28+
* exception.
2629
*
2730
* This suspending function is cancellable.
28-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
29-
* immediately resumes with [CancellationException].
31+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
32+
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
3033
*/
3134
public suspend fun <T> Flow.Publisher<T>.awaitFirstOrDefault(default: T): T =
32-
FlowAdapters.toPublisher(this).awaitFirstOrDefault(default)
35+
FlowAdapters.toPublisher(this).awaitFirstOrDefault(default)
3336

3437
/**
35-
* Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
36-
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
38+
* Awaits the first value from the given observable, or returns `null` if none is emitted, without blocking the thread,
39+
* and returns the resulting value, or, if this observable has produced an error, throws the corresponding exception.
3740
*
3841
* This suspending function is cancellable.
39-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
40-
* immediately resumes with [CancellationException].
42+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
43+
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
4144
*/
4245
public suspend fun <T> Flow.Publisher<T>.awaitFirstOrNull(): T? =
43-
FlowAdapters.toPublisher(this).awaitFirstOrNull()
46+
FlowAdapters.toPublisher(this).awaitFirstOrNull()
4447

4548
/**
46-
* Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
47-
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
49+
* Awaits the first value from the given observable, or calls [defaultValue] to get a value if none is emitted, without
50+
* blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the
51+
* corresponding exception.
4852
*
4953
* This suspending function is cancellable.
50-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
51-
* immediately resumes with [CancellationException].
54+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
55+
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
5256
*/
5357
public suspend fun <T> Flow.Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T =
54-
FlowAdapters.toPublisher(this).awaitFirstOrElse(defaultValue)
58+
FlowAdapters.toPublisher(this).awaitFirstOrElse(defaultValue)
5559

5660
/**
57-
* Awaits for the last value from the given publisher without blocking a thread and
58-
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
61+
* Awaits the last value from the given publisher without blocking the thread and
62+
* returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
5963
*
6064
* This suspending function is cancellable.
61-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
62-
* immediately resumes with [CancellationException].
65+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
66+
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
6367
*
64-
* @throws NoSuchElementException if publisher does not emit any value
68+
* @throws NoSuchElementException if the publisher does not emit any value
6569
*/
6670
public suspend fun <T> Flow.Publisher<T>.awaitLast(): T =
67-
FlowAdapters.toPublisher(this).awaitLast()
71+
FlowAdapters.toPublisher(this).awaitLast()
6872

6973
/**
70-
* Awaits for the single value from the given publisher without blocking a thread and
71-
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
74+
* Awaits the single value from the given publisher without blocking the thread and returns the resulting value, or,
75+
* if this publisher has produced an error, throws the corresponding exception.
7276
*
7377
* This suspending function is cancellable.
74-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
75-
* immediately resumes with [CancellationException].
78+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
79+
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
7680
*
77-
* @throws NoSuchElementException if publisher does not emit any value
78-
* @throws IllegalArgumentException if publisher emits more than one value
81+
* @throws NoSuchElementException if the publisher does not emit any value
82+
* @throws IllegalArgumentException if the publisher emits more than one value
7983
*/
8084
public suspend fun <T> Flow.Publisher<T>.awaitSingle(): T =
81-
FlowAdapters.toPublisher(this).awaitSingle()
85+
FlowAdapters.toPublisher(this).awaitSingle()

reactive/kotlinx-coroutines-jdk9/src/Publish.kt

+15-14
Original file line numberDiff line numberDiff line change
@@ -6,33 +6,34 @@ package kotlinx.coroutines.jdk9
66

77
import kotlinx.coroutines.*
88
import kotlinx.coroutines.channels.*
9+
import kotlinx.coroutines.reactive.*
910
import java.util.concurrent.*
1011
import kotlin.coroutines.*
1112
import org.reactivestreams.FlowAdapters
1213

1314
/**
14-
* Creates cold reactive [Flow.Publisher] that runs a given [block] in a coroutine.
15+
* Creates a cold reactive [Flow.Publisher] that runs a given [block] in a coroutine.
16+
*
1517
* Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
16-
* Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete])
17-
* when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError])
18-
* if coroutine throws an exception or closes channel with a cause.
19-
* Unsubscribing cancels running coroutine.
18+
* The coroutine emits (via [Flow.Subscriber.onNext]) values with [send][ProducerScope.send],
19+
* completes (via [Flow.Subscriber.onComplete]) when the coroutine completes or channel is explicitly closed, and emits
20+
* errors (via [Flow.Subscriber.onError]) if the coroutine throws an exception or closes channel with a cause.
21+
* Unsubscribing cancels the running coroutine.
2022
*
21-
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
22-
* `onNext` is not invoked concurrently.
23+
* Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to
24+
* ensure that [onNext][Flow.Subscriber.onNext] is not invoked concurrently.
2325
*
2426
* Coroutine context can be specified with [context] argument.
25-
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
26-
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
27+
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is
28+
* used.
2729
*
2830
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
2931
* to cancellation and error handling may change in the future.
32+
*
33+
* @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
3034
*/
31-
@ExperimentalCoroutinesApi // Since 1.3.x
35+
@ExperimentalCoroutinesApi
3236
public fun <T> flowPublish(
3337
context: CoroutineContext = EmptyCoroutineContext,
3438
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
35-
): Flow.Publisher<T> {
36-
val reactivePublisher : org.reactivestreams.Publisher<T> = kotlinx.coroutines.reactive.publish<T>(context, block)
37-
return FlowAdapters.toFlowPublisher(reactivePublisher)
38-
}
39+
): Flow.Publisher<T> = FlowAdapters.toFlowPublisher(publish(context, block))

reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt

+19-17
Original file line numberDiff line numberDiff line change
@@ -7,41 +7,43 @@ package kotlinx.coroutines.jdk9
77
import kotlinx.coroutines.*
88
import kotlinx.coroutines.flow.*
99
import kotlinx.coroutines.reactive.asFlow
10-
import kotlinx.coroutines.reactive.asPublisher
10+
import kotlinx.coroutines.reactive.asPublisher as asReactivePublisher
1111
import kotlinx.coroutines.reactive.collect
12+
import kotlinx.coroutines.channels.*
1213
import org.reactivestreams.*
1314
import kotlin.coroutines.*
1415
import java.util.concurrent.Flow as JFlow
1516

1617
/**
17-
* Transforms the given reactive [Publisher] into [Flow].
18-
* Use [buffer] operator on the resulting flow to specify the size of the backpressure.
19-
* More precisely, it specifies the value of the subscription's [request][Subscription.request].
20-
* [buffer] default capacity is used by default.
18+
* Transforms the given reactive [Flow Publisher][JFlow.Publisher] into [Flow].
19+
* Use the [buffer] operator on the resulting flow to specify the size of the back-pressure.
20+
* More precisely, it specifies the value of the subscription's [request][JFlow.Subscription.request].
21+
* The [default buffer capacity][Channel.BUFFERED] for a suspending channel is used by default.
2122
*
22-
* If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flight elements
23-
* are discarded.
23+
* If any of the resulting flow transformations fails, the subscription is immediately cancelled and all the in-flight
24+
* elements are discarded.
2425
*/
2526
public fun <T : Any> JFlow.Publisher<T>.asFlow(): Flow<T> =
26-
FlowAdapters.toPublisher(this).asFlow()
27+
FlowAdapters.toPublisher(this).asFlow()
2728

2829
/**
29-
* Transforms the given flow to a reactive specification compliant [Publisher].
30+
* Transforms the given flow into a reactive specification compliant [Flow Publisher][JFlow.Publisher].
3031
*
31-
* An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
32-
* You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
32+
* An optional [context] can be specified to control the execution context of calls to the [Flow Subscriber][Subscriber]
33+
* methods.
34+
* A [CoroutineDispatcher] can be set to confine them to a specific thread; various [ThreadContextElement] can be set to
3335
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
3436
* is used, so calls are performed from an arbitrary thread.
3537
*/
3638
@JvmOverloads // binary compatibility
37-
public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): JFlow.Publisher<T> {
38-
val reactivePublisher : org.reactivestreams.Publisher<T> = this.asPublisher<T>(context)
39-
return FlowAdapters.toFlowPublisher(reactivePublisher)
40-
}
39+
public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): JFlow.Publisher<T> =
40+
FlowAdapters.toFlowPublisher(asReactivePublisher(context))
4141

4242
/**
43-
* Subscribes to this [Publisher] and performs the specified action for each received element.
44-
* Cancels subscription if any exception happens during collect.
43+
* Subscribes to this [Flow Publisher][JFlow.Publisher] and performs the specified action for each received element.
44+
*
45+
* If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
46+
* [collect]. Also, if the publisher signals an error, that error is rethrown from [collect].
4547
*/
4648
public suspend inline fun <T> JFlow.Publisher<T>.collect(action: (T) -> Unit): Unit =
4749
FlowAdapters.toPublisher(this).collect(action)

0 commit comments

Comments
 (0)