Skip to content

Commit d3cc25f

Browse files
committed
Rework purity concept:
* Purity is a way too polluted word and it is hard to tell what it means in Flow * Rename "purity" to "context preservation" * Prohibit changing the context, ignore only Job and CoroutineId (for debug mode) in SafeCollector * Reword documentation, add more samples * Add explanation for deprecated Rx-like methods
1 parent 8788488 commit d3cc25f

File tree

12 files changed

+255
-67
lines changed

12 files changed

+255
-67
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -864,7 +864,7 @@ public final class kotlinx/coroutines/flow/MigrationKt {
864864
}
865865

866866
public final class kotlinx/coroutines/flow/internal/SafeCollector : kotlinx/coroutines/flow/FlowCollector {
867-
public fun <init> (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/ContinuationInterceptor;)V
867+
public fun <init> (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;)V
868868
public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
869869
}
870870

kotlinx-coroutines-core/common/src/CoroutineContext.common.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,5 @@ internal expect val DefaultDelay: Delay
2020
// countOrElement -- pre-cached value for ThreadContext.kt
2121
internal expect inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T
2222
internal expect fun Continuation<*>.toDebugString(): String
23-
internal expect val CoroutineContext.coroutineName: String?
23+
internal expect val CoroutineContext.coroutineName: String?
24+
internal expect fun CoroutineContext.minusId(): CoroutineContext

kotlinx-coroutines-core/common/src/flow/Builders.kt

+15-15
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,26 @@ import kotlin.jvm.*
1919
* Example of usage:
2020
* ```
2121
* fun fibonacci(): Flow<Long> = flow {
22-
* emit(1L)
23-
* var f1 = 1L
24-
* var f2 = 1L
25-
* repeat(100) {
26-
* var tmp = f1
27-
* f1 = f2
28-
* f2 += tmp
29-
* emit(f1)
30-
* }
22+
* emit(1L)
23+
* var f1 = 1L
24+
* var f2 = 1L
25+
* repeat(100) {
26+
* var tmp = f1
27+
* f1 = f2
28+
* f2 += tmp
29+
* emit(f1)
30+
* }
3131
* }
3232
* ```
3333
*
34-
* `emit` should happen strictly in the dispatchers of the [block] in order to preserve flow purity.
34+
* `emit` should happen strictly in the dispatchers of the [block] in order to preserve flow context.
3535
* For example, the following code will produce [IllegalStateException]:
3636
* ```
3737
* flow {
38-
* emit(1) // Ok
39-
* withContext(Dispatcher.IO) {
40-
* emit(2) // Will fail with ISE
41-
* }
38+
* emit(1) // Ok
39+
* withContext(Dispatcher.IO) {
40+
* emit(2) // Will fail with ISE
41+
* }
4242
* }
4343
* ```
4444
* If you want to switch the context where this flow is executed use [flowOn] operator.
@@ -47,7 +47,7 @@ import kotlin.jvm.*
4747
public fun <T> flow(@BuilderInference block: suspend FlowCollector<in T>.() -> Unit): Flow<T> {
4848
return object : Flow<T> {
4949
override suspend fun collect(collector: FlowCollector<in T>) {
50-
SafeCollector(collector, coroutineContext[ContinuationInterceptor]).block()
50+
SafeCollector(collector, coroutineContext).block()
5151
}
5252
}
5353
}

kotlinx-coroutines-core/common/src/flow/Flow.kt

+37-15
Original file line numberDiff line numberDiff line change
@@ -7,30 +7,29 @@ package kotlinx.coroutines.flow
77
import kotlinx.coroutines.*
88

99
/**
10-
* A cold asynchronous stream of the data, that emits from zero to N (where N can be unbounded)
11-
* values and completes normally or with an exception.
10+
* A cold asynchronous stream of the data, that emits from zero to N (where N can be unbounded) values and completes normally or with an exception.
1211
*
13-
* All transformations on the flow, such as [map] and [filter] do not trigger flow collection or execution, only
14-
* terminal operators (e.g. [single])do trigger it.
12+
* All transformations on the flow, such as [map] and [filter] do not trigger flow collection or execution, only terminal operators (e.g. [single]) do trigger it.
1513
*
16-
* Flow can be collected in a suspending manner, without actual blocking using [collect] extension that will complete normally or exceptionally:
14+
* Flow can be collected in a suspending manner, without actual blocking, using [collect] extension that will complete normally or exceptionally:
1715
* ```
1816
* try {
19-
* flow.collect { value ->
20-
* println("Received $value")
21-
* }
17+
* flow.collect { value ->
18+
* println("Received $value")
19+
* }
2220
* } catch (e: Exception) {
23-
* println("Flow has thrown an exception: $e")
21+
* println("Flow has thrown an exception: $e")
2422
* }
2523
* ```
2624
* Additionally, the library provides a rich set of terminal operators such as [single], [reduce] and others.
2725
*
2826
* Flow does not carry information whether it is a cold stream (that can be collected multiple times and
29-
* triggers its evaluation every time collection is executed) or hot one, but conventionally flow represents a cold stream.
30-
* Transitions between hot and cold streams are supported via channels and corresponding API: [flowViaChannel], [broadcastIn], [produceIn].
27+
* triggers its evaluation every time [collect] is executed) or a hot one, but conventionally flow represents a cold stream.
28+
* Transitions between hot and cold streams are supported via channels and the corresponding API: [flowViaChannel], [broadcastIn], [produceIn].
3129
*
32-
* Flow is a **pure** concept: it encapsulates its own execution context and never propagates it to the downstream, thus making
30+
* Flow has a context preserving property: it encapsulates its own execution context and never propagates or leaks it to the downstream, thus making
3331
* reasoning about execution context of particular transformations or terminal operations trivial.
32+
*
3433
* There are two ways of changing the flow's context: [flowOn][Flow.flowOn] and [flowWith][Flow.flowWith].
3534
* The former changes the upstream context ("everything above the flowOn operator") while the latter
3635
* changes the context of the flow within [flowWith] body. For additional information refer to these operators documentation.
@@ -41,10 +40,10 @@ import kotlinx.coroutines.*
4140
* .map { it + 1 } // Will be executed in ctx_1
4241
* .flowOn(ctx_1) // Changes upstream context: flowOf and map
4342
*
44-
* // Now we have flow that is pure: it is executed somewhere but this information is encapsulated in the flow itself
43+
* // Now we have flow that is context-preserving: it is executed somewhere but this information is encapsulated in the flow itself
4544
*
46-
* val filtered = flow
47-
* .filter { it == 3 } // Pure operator without a context
45+
* val filtered = flow // ctx_1 is inaccessible
46+
* .filter { it == 3 } // Pure operator without a context yet
4847
*
4948
* withContext(Dispatchers.Main) {
5049
* // All not encapsulated operators will be executed in Main: filter and single
@@ -53,6 +52,29 @@ import kotlinx.coroutines.*
5352
* }
5453
* ```
5554
*
55+
* From the implementation point of view it means that all intermediate operators on [Flow] should use the following constraint:
56+
* If one wants to separate collection or emission into multiple coroutines, it should use [coroutineScope] or [supervisorScope] and
57+
* is not allowed to modify coroutines context:
58+
* ```
59+
* fun <T> Flow<T>.buffer(bufferSize: Int): Flow<T> = flow {
60+
* coroutineScope { // coroutine scope is necessary, withContext is prohibited
61+
* val channel = Channel<T>(bufferSize)
62+
* // GlobalScope.launch { is prohibited
63+
* // launch(Dispatchers.IO) { is prohibited
64+
* launch { // is OK
65+
* collect { value ->
66+
* channel.send(value)
67+
* }
68+
* channel.close()
69+
* }
70+
*
71+
* for (i in channel) {
72+
* emit(i)
73+
* }
74+
* }
75+
* }
76+
* ```
77+
*
5678
* Flow is [Reactive Streams](http://www.reactive-streams.org/) compliant, you can safely interop it with reactive streams using [Flow.asPublisher] and [Publisher.asFlow] from
5779
* kotlinx-coroutines-reactive module.
5880
*/

kotlinx-coroutines-core/common/src/flow/Migration.kt

+117-13
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,98 @@ import kotlin.coroutines.*
1111
* search for their favourite operators and/or patterns that are missing or renamed in Flow.
1212
*/
1313

14-
/** @suppress **/
15-
@Deprecated(message = "Use flowWith or flowOn instead", level = DeprecationLevel.ERROR)
16-
public fun <T> Flow<T>.subscribeOn(context: CoroutineContext): Flow<T> = error("Should not be called")
17-
18-
/** @suppress **/
19-
@Deprecated(message = "Use flowWith or flowOn instead", level = DeprecationLevel.ERROR)
14+
/**
15+
* `observeOn` has no direct match in [Flow] API because all terminal flow operators are suspending and
16+
* thus use the context of the caller.
17+
*
18+
* For example, the following code:
19+
* ```
20+
* flowable
21+
* .observeOn(Schedulers.io())
22+
* .doOnEach { value -> println("Received $value") }
23+
* .subscribe()
24+
* ```
25+
*
26+
* has the following Flow equivalent:
27+
* ```
28+
* withContext(Dispatchers.IO) {
29+
* flow.collect { value -> println("Received $value") }
30+
* }
31+
*
32+
* ```
33+
* @suppress
34+
*/
35+
@Deprecated(message = "Collect flow in the desired context instead", level = DeprecationLevel.ERROR)
2036
public fun <T> Flow<T>.observeOn(context: CoroutineContext): Flow<T> = error("Should not be called")
2137

22-
/** @suppress **/
23-
@Deprecated(message = "Use flowWith or flowOn instead", level = DeprecationLevel.ERROR)
38+
/**
39+
* `publishOn` has no direct match in [Flow] API because all terminal flow operators are suspending and
40+
* thus use the context of the caller.
41+
*
42+
* For example, the following code:
43+
* ```
44+
* flux
45+
* .publishOn(Schedulers.io())
46+
* .doOnEach { value -> println("Received $value") }
47+
* .subscribe()
48+
* ```
49+
*
50+
* has the following Flow equivalent:
51+
* ```
52+
* withContext(Dispatchers.IO) {
53+
* flow.collect { value -> println("Received $value") }
54+
* }
55+
*
56+
* ```
57+
* @suppress
58+
*/
59+
@Deprecated(message = "Collect flow in the desired context instead", level = DeprecationLevel.ERROR)
2460
public fun <T> Flow<T>.publishOn(context: CoroutineContext): Flow<T> = error("Should not be called")
2561

62+
/**
63+
* `subscribeOn` has no direct match in [Flow] API because [Flow] preserves its context and does not leak it.
64+
*
65+
* For example, the following code:
66+
* ```
67+
* flowable
68+
* .map { value -> println("Doing map in IO"); value }
69+
* .subscribeOn(Schedulers.io())
70+
* .observeOn(Schedulers.computation())
71+
* .doOnEach { value -> println("Processing $value in computation")
72+
* .subscribe()
73+
* ```
74+
* has the following Flow equivalents:
75+
* ```
76+
* withContext(Dispatchers.Default) {
77+
* flow
78+
* .map { value -> println("Doing map in IO"); value }
79+
* .flowOn(Dispatchers.IO) // Works upstream, doesn't change downstream
80+
* .collect { value ->
81+
* println("Processing $value in computation")
82+
* }
83+
* }
84+
* ```
85+
* or
86+
*
87+
* ```
88+
* withContext(Dispatchers.Default) {
89+
* flow
90+
* .flowWith(Dispatchers.IO) { map { value -> println("Doing map in IO"); value } }
91+
* .collect { value ->
92+
* println("Processing $value in computation")
93+
* }
94+
* }
95+
* ```
96+
*
97+
* The difference is that [flowWith] encapsulates ("preserves") the context within its lambda
98+
* while [flowOn] changes the context of all preceding operators.
99+
* Opposed to subscribeOn, it it **possible** to use multiple `flowOn` operators in the one flow.
100+
*
101+
* @suppress
102+
*/
103+
@Deprecated(message = "Use flowWith or flowOn instead", level = DeprecationLevel.ERROR)
104+
public fun <T> Flow<T>.subscribeOn(context: CoroutineContext): Flow<T> = error("Should not be called")
105+
26106
/** @suppress **/
27107
@Deprecated(message = "Use BroadcastChannel.asFlow()", level = DeprecationLevel.ERROR)
28108
public fun BehaviourSubject(): Any = error("Should not be called")
@@ -45,14 +125,40 @@ public fun PublishSubject(): Any = error("Should not be called")
45125
)
46126
public fun <T> Flow<T>.onErrorResume(fallback: Flow<T>): Flow<T> = error("Should not be called")
47127

48-
49-
/** @suppress **/
128+
/**
129+
* Self-explanatory, the reason of deprecation is "context preservation" property (you can read more in [Flow] documentation)
130+
* @suppress
131+
**/
50132
@Suppress("UNUSED_PARAMETER", "UNUSED", "DeprecatedCallableAddReplaceWith")
51133
@Deprecated(message = "withContext in flow body is deprecated, use flowOn instead", level = DeprecationLevel.ERROR)
52134
public fun <T, R> FlowCollector<T>.withContext(context: CoroutineContext, block: suspend () -> R): Unit = error("Should not be called")
53135

54136

55-
/** @suppress **/
137+
/**
138+
* `subscribe` is Rx-specific API that has no direct match in flows.
139+
* One can use `launch` instead, for example the following:
140+
* ```
141+
* flowable
142+
* .observeOn(Schedulers.io())
143+
* .subscribe({ println("Received $it") }, { println("Exception $it happened") }, { println("Flowable is completed successfully") }
144+
* ```
145+
*
146+
* has the following Flow equivalent:
147+
* ```
148+
* launch(Dispatchers.IO) {
149+
* try {
150+
* flow.collect { value ->
151+
* println("Received $value")
152+
* }
153+
* println("Flow is completed successfully")
154+
* } catch (e: Throwable) {
155+
* println("Exception $e happened")
156+
* }
157+
* }
158+
* ```
159+
* But most of the time it is better to use terminal operators like [single] instead of [collect].
160+
* @suppress
161+
*/
56162
@Deprecated(message = "Use launch + collect instead", level = DeprecationLevel.ERROR)
57163
public fun <T> Flow<T>.subscribe(): Unit = error("Should not be called")
58164

@@ -64,7 +170,6 @@ public fun <T> Flow<T>.subscribe(onEach: (T) -> Unit): Unit = error("Should not
64170
@Deprecated(message = "Use launch + collect instead", level = DeprecationLevel.ERROR)
65171
public fun <T> Flow<T>.subscribe(onEach: (T) -> Unit, onError: (Throwable) -> Unit): Unit = error("Should not be called")
66172

67-
68173
/**
69174
* Note that this replacement is sequential (`concat`) by default.
70175
* For concurrent flatMap [flatMapMerge] can be used instead.
@@ -85,7 +190,6 @@ public fun <T, R> Flow<T>.flatMap(mapper: suspend (T) -> Flow<R>): Flow<R> = err
85190
)
86191
public fun <T, R> Flow<T>.concatMap(mapper: (T) -> Flow<R>): Flow<R> = error("Should not be called")
87192

88-
89193
/**
90194
* Note that this replacement is sequential (`concat`) by default.
91195
* For concurrent flatMap [flattenMerge] can be used instead.

kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt

+8-5
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,25 @@
44

55
package kotlinx.coroutines.flow.internal
66

7+
import kotlinx.coroutines.*
78
import kotlinx.coroutines.flow.*
89
import kotlinx.coroutines.internal.*
910
import kotlin.coroutines.*
1011

1112
@PublishedApi
1213
internal class SafeCollector<T>(
1314
private val collector: FlowCollector<T>,
14-
private val interceptor: ContinuationInterceptor?
15+
collectContext: CoroutineContext
1516
) : FlowCollector<T>, SynchronizedObject() {
1617

18+
private val collectContext = collectContext.minusKey(Job).minusId()
19+
1720
override suspend fun emit(value: T) {
18-
if (interceptor != coroutineContext[ContinuationInterceptor]) {
21+
val emitContext = coroutineContext.minusKey(Job).minusId()
22+
if (emitContext != collectContext) {
1923
error(
20-
"Flow invariant is violated: flow was collected in $interceptor, but emission happened in ${coroutineContext[ContinuationInterceptor]}. " +
21-
"Please refer to 'flow' documentation or use 'flowOn' instead"
22-
)
24+
"Flow invariant is violated: flow was collected in $collectContext, but emission happened in $emitContext. " +
25+
"Please refer to 'flow' documentation or use 'flowOn' instead")
2326
}
2427
collector.emit(value)
2528
}

0 commit comments

Comments
 (0)