@@ -7,41 +7,43 @@ package kotlinx.coroutines.jdk9
7
7
import kotlinx.coroutines.*
8
8
import kotlinx.coroutines.flow.*
9
9
import kotlinx.coroutines.reactive.asFlow
10
- import kotlinx.coroutines.reactive.asPublisher
10
+ import kotlinx.coroutines.reactive.asPublisher as asReactivePublisher
11
11
import kotlinx.coroutines.reactive.collect
12
+ import kotlinx.coroutines.channels.*
12
13
import org.reactivestreams.*
13
14
import kotlin.coroutines.*
14
15
import java.util.concurrent.Flow as JFlow
15
16
16
17
/* *
17
- * Transforms the given reactive [Publisher] into [Flow].
18
- * Use [buffer] operator on the resulting flow to specify the size of the backpressure .
19
- * More precisely , it specifies the value of the subscription's [request][Subscription.request].
20
- * [ buffer] default capacity is used by default.
18
+ * Transforms the given reactive [Flow Publisher][JFlow. Publisher] into [Flow].
19
+ * Use the [buffer] operator on the resulting flow to specify the size of the back-pressure .
20
+ * In effect , it specifies the value of the subscription's [request][JFlow. Subscription.request].
21
+ * The [default buffer capacity][Channel.BUFFERED] for a suspending channel is used by default.
21
22
*
22
- * If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flight elements
23
- * are discarded.
23
+ * If any of the resulting flow transformations fails, the subscription is immediately cancelled and all the in-flight
24
+ * elements are discarded.
24
25
*/
25
26
public fun <T : Any > JFlow.Publisher<T>.asFlow (): Flow <T > =
26
- FlowAdapters .toPublisher(this ).asFlow()
27
+ FlowAdapters .toPublisher(this ).asFlow()
27
28
28
29
/* *
29
- * Transforms the given flow to a reactive specification compliant [Publisher].
30
+ * Transforms the given flow into a reactive specification compliant [Flow Publisher][JFlow. Publisher].
30
31
*
31
- * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
32
- * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
32
+ * An optional [context] can be specified to control the execution context of calls to the [Flow Subscriber][Subscriber]
33
+ * methods.
34
+ * A [CoroutineDispatcher] can be set to confine them to a specific thread; various [ThreadContextElement] can be set to
33
35
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
34
36
* is used, so calls are performed from an arbitrary thread.
35
37
*/
36
38
@JvmOverloads // binary compatibility
37
- public fun <T : Any > Flow<T>.asPublisher (context : CoroutineContext = EmptyCoroutineContext ): JFlow .Publisher <T > {
38
- val reactivePublisher : org.reactivestreams.Publisher <T > = this .asPublisher<T >(context)
39
- return FlowAdapters .toFlowPublisher(reactivePublisher)
40
- }
39
+ public fun <T : Any > Flow<T>.asPublisher (context : CoroutineContext = EmptyCoroutineContext ): JFlow .Publisher <T > =
40
+ FlowAdapters .toFlowPublisher(asReactivePublisher(context))
41
41
42
42
/* *
43
- * Subscribes to this [Publisher] and performs the specified action for each received element.
44
- * Cancels subscription if any exception happens during collect.
43
+ * Subscribes to this [Flow Publisher][JFlow.Publisher] and performs the specified action for each received element.
44
+ *
45
+ * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
46
+ * [collect]. Also, if the publisher signals an error, that error is rethrown from [collect].
45
47
*/
46
48
public suspend inline fun <T > JFlow.Publisher<T>.collect (action : (T ) -> Unit ): Unit =
47
49
FlowAdapters .toPublisher(this ).collect(action)
0 commit comments