Skip to content

Commit f44942a

Browse files
qwwdfsadelizarov
authored andcommitted
Deprecate flowWith operator
1 parent daf8502 commit f44942a

File tree

10 files changed

+34
-84
lines changed

10 files changed

+34
-84
lines changed

kotlinx-coroutines-core/common/src/flow/Flow.kt

+2-3
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,8 @@ import kotlinx.coroutines.*
3030
* The flow has a context preservation property: it encapsulates its own execution context and never propagates or leaks it downstream, thus making
3131
* reasoning about the execution context of particular transformations or terminal operations trivial.
3232
*
33-
* There are two ways to change the context of a flow: [flowOn][Flow.flowOn] and [flowWith][Flow.flowWith].
34-
* The former changes the upstream context ("everything above the flowOn operator") while the latter
35-
* changes the context of the flow within [flowWith] body. For additional information refer to these operators' documentation.
33+
* There is the only way to change the context of a flow: [flowOn][Flow.flowOn] operator,
34+
* that changes the upstream context ("everything above the flowOn operator"). For additional information refer to its documentation.
3635
*
3736
* This reasoning can be demonstrated in practice:
3837
* ```

kotlinx-coroutines-core/common/src/flow/Migration.kt

+3-18
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public fun <T> Flow<T>.publishOn(context: CoroutineContext): Flow<T> = error("Sh
7171
* .doOnEach { value -> println("Processing $value in computation")
7272
* .subscribe()
7373
* ```
74-
* has the following Flow equivalents:
74+
* has the following Flow equivalent:
7575
* ```
7676
* withContext(Dispatchers.Default) {
7777
* flow
@@ -82,25 +82,10 @@ public fun <T> Flow<T>.publishOn(context: CoroutineContext): Flow<T> = error("Sh
8282
* }
8383
* }
8484
* ```
85-
* or
86-
*
87-
* ```
88-
* withContext(Dispatchers.Default) {
89-
* flow
90-
* .flowWith(Dispatchers.IO) { map { value -> println("Doing map in IO"); value } }
91-
* .collect { value ->
92-
* println("Processing $value in computation")
93-
* }
94-
* }
95-
* ```
96-
*
97-
* The difference is that [flowWith] encapsulates ("preserves") the context within its lambda
98-
* while [flowOn] changes the context of all preceding operators.
99-
* Opposed to subscribeOn, it it **possible** to use multiple `flowOn` operators in the one flow.
100-
*
85+
* Opposed to subscribeOn, it it **possible** to use multiple `flowOn` operators in the one flow
10186
* @suppress
10287
*/
103-
@Deprecated(message = "Use flowWith or flowOn instead", level = DeprecationLevel.ERROR)
88+
@Deprecated(message = "Use flowOn instead", level = DeprecationLevel.ERROR)
10489
public fun <T> Flow<T>.subscribeOn(context: CoroutineContext): Flow<T> = error("Should not be called")
10590

10691
/** @suppress **/

kotlinx-coroutines-core/common/src/flow/operators/Context.kt

+14-2
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,23 @@ public fun <T> Flow<T>.flowOn(flowContext: CoroutineContext, bufferSize: Int = 1
7777
* For more explanation of context preservation please refer to [Flow] documentation.
7878
*
7979
* This operator uses channel of the specific [bufferSize] in order to switch between contexts,
80-
* but it is not guaranteed that channel will be created, implementation is free to optimize it away in case of fusing.
80+
* but it is not guaranteed that channel will be created, implementation is free to optimize it away in case of fusing.*
8181
*
82-
* @throws [IllegalArgumentException] if provided context contains [Job] instance.
82+
* This operator is deprecated without replacement because it was discovered that it doesn't play well with coroutines and flow semantics:
83+
* 1) It doesn't prevent context elements from the downstream to leak into its body
84+
* ```
85+
* flowOf(1).flowWith(EmptyCoroutineContext) {
86+
* onEach { println(kotlin.coroutines.coroutineContext[CoroutineName]) } // Will print 42
87+
* }.flowOn(CoroutineName(42))
88+
* ```
89+
* 2) To avoid such leaks, new primitive should be introduced to `kotlinx.coroutines` -- the subtraction of contexts.
90+
* And this will become a new concept to learn, maintain and explain.
91+
* 3) It defers the execution of declarative [builder] until the moment of [collection][Flow.collect] similarly
92+
* to `Observable.defer`. But it is unexpected because nothing in the name `flowWith` reflects this fact.
93+
* 4) It can be confused with [flowOn] operator, though [flowWith] is much rarer.
8394
*/
8495
@FlowPreview
96+
@Deprecated(message = "flowWith is deprecated without replacement, please refer to its KDoc for an explanation", level = DeprecationLevel.WARNING) // Error in beta release, removal in 1.4
8597
public fun <T, R> Flow<T>.flowWith(
8698
flowContext: CoroutineContext,
8799
bufferSize: Int = 16,

kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt

+5-6
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
package kotlinx.coroutines.flow
66

77
import kotlinx.coroutines.*
8-
import kotlinx.coroutines.flow.combineLatest as combineLatestOriginal
98
import kotlin.test.*
9+
import kotlinx.coroutines.flow.combineLatest as combineLatestOriginal
1010

1111
/*
1212
* Replace: { i, j -> i + j } -> { i, j -> i + j } as soon as KT-30991 is fixed
@@ -133,12 +133,11 @@ abstract class CombineLatestTestBase : TestBase() {
133133
emit(1)
134134
assertEquals("second", NamedDispatchers.name())
135135
expect(3)
136-
}.flowOn(NamedDispatchers("second")).flowWith(NamedDispatchers("with")) {
137-
onEach {
138-
assertEquals("with", NamedDispatchers.name())
136+
}.flowOn(NamedDispatchers("second"))
137+
.onEach {
138+
assertEquals("onEach", NamedDispatchers.name())
139139
expect(4)
140-
}
141-
}
140+
}.flowOn(NamedDispatchers("onEach"))
142141

143142
val value = withContext(NamedDispatchers("main")) {
144143
f1.combineLatest(f2) { i, j ->

kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt

+1-4
Original file line numberDiff line numberDiff line change
@@ -152,12 +152,9 @@ class DebounceTest : TestBase() {
152152
emit(1)
153153
expect(2)
154154
throw TestException()
155-
}.flowWith(NamedDispatchers("unused")) {
156-
debounce(Long.MAX_VALUE).map {
155+
}.flowOn(NamedDispatchers("source")).debounce(Long.MAX_VALUE).map {
157156
expectUnreached()
158-
}
159157
}
160-
161158
assertFailsWith<TestException>(flow)
162159
finish(3)
163160
}

kotlinx-coroutines-core/common/test/flow/operators/FlatMapBaseTest.kt

+2-6
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
package kotlinx.coroutines.flow
66

77
import kotlinx.coroutines.*
8-
import kotlinx.coroutines.channels.*
9-
import kotlinx.coroutines.flow.*
108
import kotlin.test.*
119

1210
abstract class FlatMapBaseTest : TestBase() {
@@ -75,14 +73,12 @@ abstract class FlatMapBaseTest : TestBase() {
7573
fun testIsolatedContext() = runTest {
7674
val flow = flowOf(1)
7775
.flowOn(NamedDispatchers("irrelevant"))
78-
.flowWith(NamedDispatchers("inner")) {
79-
flatMap {
76+
.flatMap {
8077
flow {
8178
assertEquals("inner", NamedDispatchers.name())
8279
emit(it)
8380
}
84-
}
85-
}.flowOn(NamedDispatchers("irrelevant"))
81+
}.flowOn(NamedDispatchers("inner"))
8682
.flatMap {
8783
flow {
8884
assertEquals("outer", NamedDispatchers.name())

kotlinx-coroutines-core/common/test/flow/operators/FlowContextTest.kt

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import kotlinx.coroutines.channels.*
99
import kotlin.coroutines.*
1010
import kotlin.test.*
1111

12+
@Suppress("DEPRECATION")
1213
class FlowContextTest : TestBase() {
1314

1415
private val captured = ArrayList<String>()

kotlinx-coroutines-core/common/test/flow/operators/FlowWithTest.kt

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import kotlinx.coroutines.*
88
import kotlinx.coroutines.channels.*
99
import kotlin.test.*
1010

11+
@Suppress("DEPRECATION")
1112
class FlowWithTest : TestBase() {
1213

1314
private fun mapper(name: String, index: Int): suspend (Int) -> Int = {

kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt

+2-4
Original file line numberDiff line numberDiff line change
@@ -227,10 +227,8 @@ class SampleTest : TestBase() {
227227
emit(1)
228228
expect(2)
229229
throw TestException()
230-
}.flowWith(NamedDispatchers("unused")) {
231-
sample(Long.MAX_VALUE).map {
232-
expectUnreached()
233-
}
230+
}.flowOn(NamedDispatchers("unused")).sample(Long.MAX_VALUE).map {
231+
expectUnreached()
234232
}
235233

236234
assertFailsWith<TestException>(flow)

kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt

+3-41
Original file line numberDiff line numberDiff line change
@@ -112,52 +112,15 @@ class ZipTest : TestBase() {
112112
}
113113

114114
@Test
115-
fun testContextIsIsolated() = runTest {
115+
fun testContextIsIsolatedReversed() = runTest {
116116
val f1 = flow {
117117
emit("a")
118118
assertEquals("first", NamedDispatchers.name())
119119
expect(1)
120120
}.flowOn(NamedDispatchers("first")).onEach {
121-
assertEquals("nested", NamedDispatchers.name())
121+
assertEquals("with", NamedDispatchers.name())
122122
expect(2)
123-
}.flowOn(NamedDispatchers("nested"))
124-
125-
val f2 = flow {
126-
emit(1)
127-
assertEquals("second", NamedDispatchers.name())
128-
expect(3)
129-
}.flowOn(NamedDispatchers("second")).flowWith(NamedDispatchers("with")) {
130-
onEach {
131-
assertEquals("with", NamedDispatchers.name())
132-
expect(4)
133-
}
134-
}
135-
136-
val value = withContext(NamedDispatchers("main")) {
137-
f1.zip(f2) { i, j ->
138-
assertEquals("main", NamedDispatchers.name())
139-
expect(5)
140-
i + j
141-
}.single()
142-
}
143-
144-
assertEquals("a1", value)
145-
finish(6)
146-
}
147-
148-
@Test
149-
fun testContextIsIsolatedReversed() = runTest {
150-
val f1 = flow {
151-
emit("a")
152-
assertEquals("first", NamedDispatchers.name())
153-
expect(1)
154-
}.flowOn(NamedDispatchers("first"))
155-
.flowWith(NamedDispatchers("with")) {
156-
onEach {
157-
assertEquals("with", NamedDispatchers.name())
158-
expect(2)
159-
}
160-
}
123+
}.flowOn(NamedDispatchers("with"))
161124

162125
val f2 = flow {
163126
emit(1)
@@ -180,7 +143,6 @@ class ZipTest : TestBase() {
180143
finish(6)
181144
}
182145

183-
184146
@Test
185147
fun testErrorInDownstreamCancelsUpstream() = runTest {
186148
val f1 = flow {

0 commit comments

Comments
 (0)