Skip to content

Commit a77d713

Browse files
committed
Add Flow.onStart, support emit in onCompletion
* All "emitting" operators (onStart, transform, onCompletion) are moved to Emitter * All transformations in Transform.kt are rewritten via (unsafe) transforms, but all of (safe) transform, onStart and onCompletion operators collectors are safe. * Added migration for startWith and concatWith. * Consistent docs for all migration functions. * JvmMultifileClass for Migration.kt so that renamed/deprecated functions (when they moved there) continue to resolve. Fixes #1168
1 parent d100a3f commit a77d713

File tree

7 files changed

+417
-130
lines changed

7 files changed

+417
-130
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+4-1
Original file line numberDiff line numberDiff line change
@@ -865,10 +865,12 @@ public final class kotlinx/coroutines/flow/FlowKt {
865865
public static final fun launchIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/Job;
866866
public static final fun map (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
867867
public static final fun mapNotNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
868-
public static final fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
868+
public static final synthetic fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
869+
public static final fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
869870
public static final fun onEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
870871
public static final fun onErrorCollect (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
871872
public static synthetic fun onErrorCollect$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
873+
public static final fun onStart (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
872874
public static final fun produceIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
873875
public static final fun reduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
874876
public static final synthetic fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
@@ -891,6 +893,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
891893
public static synthetic fun toSet$default (Lkotlinx/coroutines/flow/Flow;Ljava/util/Set;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
892894
public static final fun transform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
893895
public static final fun unsafeFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
896+
public static final fun unsafeTransform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
894897
public static final fun zip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
895898
}
896899

kotlinx-coroutines-core/common/src/flow/Migration.kt

+136-31
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,26 @@
11
/*
22
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
4+
5+
@file:JvmMultifileClass
6+
@file:JvmName("FlowKt")
47
@file:Suppress("unused", "DeprecatedCallableAddReplaceWith", "UNUSED_PARAMETER")
8+
59
package kotlinx.coroutines.flow
610

711
import kotlin.coroutines.*
12+
import kotlin.jvm.*
813

914
/**
15+
* **GENERAL NOTE**
16+
*
1017
* These deprecations are added to improve user experience when they will start to
1118
* search for their favourite operators and/or patterns that are missing or renamed in Flow.
19+
* Deprecated functions also are moved here when they renamed. The difference is that they have
20+
* a body with their implementation while pure stubs have [noImpl].
1221
*/
22+
private fun noImpl(): Nothing =
23+
throw UnsupportedOperationException("Not implemented, should not be called")
1324

1425
/**
1526
* `observeOn` has no direct match in [Flow] API because all terminal flow operators are suspending and
@@ -33,7 +44,7 @@ import kotlin.coroutines.*
3344
* @suppress
3445
*/
3546
@Deprecated(message = "Collect flow in the desired context instead", level = DeprecationLevel.ERROR)
36-
public fun <T> Flow<T>.observeOn(context: CoroutineContext): Flow<T> = error("Should not be called")
47+
public fun <T> Flow<T>.observeOn(context: CoroutineContext): Flow<T> = noImpl()
3748

3849
/**
3950
* `publishOn` has no direct match in [Flow] API because all terminal flow operators are suspending and
@@ -57,7 +68,7 @@ public fun <T> Flow<T>.observeOn(context: CoroutineContext): Flow<T> = error("Sh
5768
* @suppress
5869
*/
5970
@Deprecated(message = "Collect flow in the desired context instead", level = DeprecationLevel.ERROR)
60-
public fun <T> Flow<T>.publishOn(context: CoroutineContext): Flow<T> = error("Should not be called")
71+
public fun <T> Flow<T>.publishOn(context: CoroutineContext): Flow<T> = noImpl()
6172

6273
/**
6374
* `subscribeOn` has no direct match in [Flow] API because [Flow] preserves its context and does not leak it.
@@ -86,44 +97,62 @@ public fun <T> Flow<T>.publishOn(context: CoroutineContext): Flow<T> = error("Sh
8697
* @suppress
8798
*/
8899
@Deprecated(message = "Use flowOn instead", level = DeprecationLevel.ERROR)
89-
public fun <T> Flow<T>.subscribeOn(context: CoroutineContext): Flow<T> = error("Should not be called")
100+
public fun <T> Flow<T>.subscribeOn(context: CoroutineContext): Flow<T> = noImpl()
90101

91-
/** @suppress **/
102+
/**
103+
* Use [BroadcastChannel][kotlinx.coroutines.channels.BroadcastChannel].asFlow().
104+
* @suppress
105+
*/
92106
@Deprecated(message = "Use BroadcastChannel.asFlow()", level = DeprecationLevel.ERROR)
93-
public fun BehaviourSubject(): Any = error("Should not be called")
107+
public fun BehaviourSubject(): Any = noImpl()
94108

95-
/** @suppress **/
109+
/**
110+
* `ReplaySubject` is not supported. The closest analogue is buffered [BroadcastChannel][kotlinx.coroutines.channels.BroadcastChannel].
111+
* @suppress
112+
*/
96113
@Deprecated(
97114
message = "ReplaySubject is not supported. The closest analogue is buffered broadcast channel",
98115
level = DeprecationLevel.ERROR)
99-
public fun ReplaySubject(): Any = error("Should not be called")
116+
public fun ReplaySubject(): Any = noImpl()
100117

101-
/** @suppress **/
118+
/**
119+
* `PublishSubject` is not supported.
120+
* @suppress
121+
*/
102122
@Deprecated(message = "PublishSubject is not supported", level = DeprecationLevel.ERROR)
103-
public fun PublishSubject(): Any = error("Should not be called")
123+
public fun PublishSubject(): Any = noImpl()
104124

105-
/** @suppress **/
125+
/**
126+
* Flow analogue of `onErrorXxx` is [catch].
127+
* Use `catch { emitAll(fallback) }`.
128+
* @suppress
129+
*/
106130
@Deprecated(
107131
level = DeprecationLevel.ERROR,
108132
message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { emitAll(fallback) }'",
109133
replaceWith = ReplaceWith("catch { emitAll(fallback) }")
110134
)
111-
public fun <T> Flow<T>.onErrorResume(fallback: Flow<T>): Flow<T> = error("Should not be called")
135+
public fun <T> Flow<T>.onErrorResume(fallback: Flow<T>): Flow<T> = noImpl()
112136

137+
/**
138+
* Flow analogue of `onErrorXxx` is [catch].
139+
* Use `catch { emitAll(fallback) }`.
140+
* @suppress
141+
*/
113142
@Deprecated(
114143
level = DeprecationLevel.ERROR,
115144
message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { emitAll(fallback) }'",
116145
replaceWith = ReplaceWith("catch { emitAll(fallback) }")
117146
)
118-
public fun <T> Flow<T>.onErrorResumeNext(fallback: Flow<T>): Flow<T> = error("Should not be called")
147+
public fun <T> Flow<T>.onErrorResumeNext(fallback: Flow<T>): Flow<T> = noImpl()
119148

120149
/**
121150
* Self-explanatory, the reason of deprecation is "context preservation" property (you can read more in [Flow] documentation)
122151
* @suppress
123152
**/
124153
@Suppress("UNUSED_PARAMETER", "UNUSED", "DeprecatedCallableAddReplaceWith")
125154
@Deprecated(message = "withContext in flow body is deprecated, use flowOn instead", level = DeprecationLevel.ERROR)
126-
public fun <T, R> FlowCollector<T>.withContext(context: CoroutineContext, block: suspend () -> R): Unit = error("Should not be called")
155+
public fun <T, R> FlowCollector<T>.withContext(context: CoroutineContext, block: suspend () -> R): Unit = noImpl()
127156

128157
/**
129158
* `subscribe` is Rx-specific API that has no direct match in flows.
@@ -153,19 +182,25 @@ public fun <T, R> FlowCollector<T>.withContext(context: CoroutineContext, block:
153182
message = "Use launchIn with onEach, onCompletion and catch operators instead",
154183
level = DeprecationLevel.ERROR
155184
)
156-
public fun <T> Flow<T>.subscribe(): Unit = error("Should not be called")
185+
public fun <T> Flow<T>.subscribe(): Unit = noImpl()
157186

158-
/** @suppress **/
187+
/**
188+
* Use [launchIn] with [onEach], [onCompletion] and [catch] operators instead.
189+
* @suppress
190+
*/
159191
@Deprecated(
160192
message = "Use launchIn with onEach, onCompletion and catch operators instead",
161193
level = DeprecationLevel.ERROR
162-
)public fun <T> Flow<T>.subscribe(onEach: suspend (T) -> Unit): Unit = error("Should not be called")
194+
)public fun <T> Flow<T>.subscribe(onEach: suspend (T) -> Unit): Unit = noImpl()
163195

164-
/** @suppress **/
196+
/**
197+
* Use [launchIn] with [onEach], [onCompletion] and [catch] operators instead.
198+
* @suppress
199+
*/
165200
@Deprecated(
166201
message = "Use launchIn with onEach, onCompletion and catch operators instead",
167202
level = DeprecationLevel.ERROR
168-
)public fun <T> Flow<T>.subscribe(onEach: suspend (T) -> Unit, onError: suspend (Throwable) -> Unit): Unit = error("Should not be called")
203+
)public fun <T> Flow<T>.subscribe(onEach: suspend (T) -> Unit, onError: suspend (Throwable) -> Unit): Unit = noImpl()
169204

170205
/**
171206
* Note that this replacement is sequential (`concat`) by default.
@@ -177,15 +212,18 @@ public fun <T> Flow<T>.subscribe(): Unit = error("Should not be called")
177212
message = "Flow analogue is named flatMapConcat",
178213
replaceWith = ReplaceWith("flatMapConcat(mapper)")
179214
)
180-
public fun <T, R> Flow<T>.flatMap(mapper: suspend (T) -> Flow<R>): Flow<R> = error("Should not be called")
215+
public fun <T, R> Flow<T>.flatMap(mapper: suspend (T) -> Flow<R>): Flow<R> = noImpl()
181216

182-
/** @suppress **/
217+
/**
218+
* Flow analogue of `concatMap` is [flatMapConcat].
219+
* @suppress
220+
*/
183221
@Deprecated(
184222
level = DeprecationLevel.ERROR,
185-
message = "Flow analogue is named flatMapConcat",
223+
message = "Flow analogue of 'concatMap' is 'flatMapConcat'",
186224
replaceWith = ReplaceWith("flatMapConcat(mapper)")
187225
)
188-
public fun <T, R> Flow<T>.concatMap(mapper: (T) -> Flow<R>): Flow<R> = error("Should not be called")
226+
public fun <T, R> Flow<T>.concatMap(mapper: (T) -> Flow<R>): Flow<R> = noImpl()
189227

190228
/**
191229
* Note that this replacement is sequential (`concat`) by default.
@@ -197,15 +235,18 @@ public fun <T, R> Flow<T>.concatMap(mapper: (T) -> Flow<R>): Flow<R> = error("Sh
197235
message = "Flow analogue of 'merge' is 'flattenConcat'",
198236
replaceWith = ReplaceWith("flattenConcat()")
199237
)
200-
public fun <T> Flow<Flow<T>>.merge(): Flow<T> = error("Should not be called")
238+
public fun <T> Flow<Flow<T>>.merge(): Flow<T> = noImpl()
201239

202-
/** @suppress **/
240+
/**
241+
* Flow analogue of `flatten` is [flattenConcat].
242+
* @suppress
243+
*/
203244
@Deprecated(
204245
level = DeprecationLevel.ERROR,
205246
message = "Flow analogue of 'flatten' is 'flattenConcat'",
206247
replaceWith = ReplaceWith("flattenConcat()")
207248
)
208-
public fun <T> Flow<Flow<T>>.flatten(): Flow<T> = error("Should not be called")
249+
public fun <T> Flow<Flow<T>>.flatten(): Flow<T> = noImpl()
209250

210251
/**
211252
* Kotlin has a built-in generic mechanism for making chained calls.
@@ -218,25 +259,25 @@ public fun <T> Flow<Flow<T>>.flatten(): Flow<T> = error("Should not be called")
218259
* ```
219260
* myFlow.let(MyFlowExtensions.ignoreErrors()).collect { ... }
220261
* ```
221-
*
222262
* @suppress
223263
*/
224264
@Deprecated(
225265
level = DeprecationLevel.ERROR,
226266
message = "Flow analogue of 'compose' is 'let'",
227267
replaceWith = ReplaceWith("let(transformer)")
228268
)
229-
public fun <T, R> Flow<T>.compose(transformer: Flow<T>.() -> Flow<R>): Flow<R> = error("Should not be called")
269+
public fun <T, R> Flow<T>.compose(transformer: Flow<T>.() -> Flow<R>): Flow<R> = noImpl()
230270

231271
/**
272+
* Flow analogue of `skip` is [drop].
232273
* @suppress
233274
*/
234275
@Deprecated(
235276
level = DeprecationLevel.ERROR,
236277
message = "Flow analogue of 'skip' is 'drop'",
237278
replaceWith = ReplaceWith("drop(count)")
238279
)
239-
public fun <T> Flow<T>.skip(count: Int): Flow<T> = error("Should not be called")
280+
public fun <T> Flow<T>.skip(count: Int): Flow<T> = noImpl()
240281

241282
/**
242283
* Flow extension to iterate over elements is [collect].
@@ -251,23 +292,38 @@ public fun <T> Flow<T>.skip(count: Int): Flow<T> = error("Should not be called")
251292
message = "Flow analogue of 'forEach' is 'collect'",
252293
replaceWith = ReplaceWith("collect(block)")
253294
)
254-
public fun <T> Flow<T>.forEach(action: suspend (value: T) -> Unit): Unit = error("Should not be called")
295+
public fun <T> Flow<T>.forEach(action: suspend (value: T) -> Unit): Unit = noImpl()
255296

297+
/**
298+
* Flow has less verbose [scan] shortcut.
299+
* @suppress
300+
*/
256301
@Deprecated(
257302
level = DeprecationLevel.ERROR,
258303
message = "Flow has less verbose 'scan' shortcut",
259304
replaceWith = ReplaceWith("scan(initial, operation)")
260305
)
261-
public fun <T, R> Flow<T>.scanFold(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = error("Should not be called")
306+
public fun <T, R> Flow<T>.scanFold(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> =
307+
noImpl()
262308

309+
/**
310+
* Flow analogue of `onErrorXxx` is [catch].
311+
* Use `catch { emit(fallback) }`.
312+
* @suppress
313+
*/
263314
@Deprecated(
264315
level = DeprecationLevel.ERROR,
265316
message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { emit(fallback) }'",
266317
replaceWith = ReplaceWith("catch { emit(fallback) }")
267318
)
268319
// Note: this version without predicate gives better "replaceWith" action
269-
public fun <T> Flow<T>.onErrorReturn(fallback: T): Flow<T> = error("Should not be called")
320+
public fun <T> Flow<T>.onErrorReturn(fallback: T): Flow<T> = noImpl()
270321

322+
/**
323+
* Flow analogue of `onErrorXxx` is [catch].
324+
* Use `catch { e -> if (predicate(e)) emit(fallback) else throw e }`.
325+
* @suppress
326+
*/
271327
@Deprecated(
272328
level = DeprecationLevel.ERROR,
273329
message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { e -> if (predicate(e)) emit(fallback) else throw e }'",
@@ -279,3 +335,52 @@ public fun <T> Flow<T>.onErrorReturn(fallback: T, predicate: (Throwable) -> Bool
279335
if (!predicate(e)) throw e
280336
emit(fallback)
281337
}
338+
339+
/**
340+
* Flow analogue of `startWith` is [onStart].
341+
* Use `onStart { emit(value) }`.
342+
* @suppress
343+
*/
344+
@Deprecated(
345+
level = DeprecationLevel.ERROR,
346+
message = "Flow analogue of 'startWith' is 'onStart'. Use 'onStart { emit(value) }'",
347+
replaceWith = ReplaceWith("onStart { emit(value) }")
348+
)
349+
public fun <T> Flow<T>.startWith(value: T): Flow<T> = noImpl()
350+
351+
/**
352+
* Flow analogue of `startWith` is [onStart].
353+
* Use `onStart { emitAll(other) }`.
354+
* @suppress
355+
*/
356+
@Deprecated(
357+
level = DeprecationLevel.ERROR,
358+
message = "Flow analogue of 'startWith' is 'onStart'. Use 'onStart { emitAll(other) }'",
359+
replaceWith = ReplaceWith("onStart { emitAll(other) }")
360+
)
361+
public fun <T> Flow<T>.startWith(other: Flow<T>): Flow<T> = noImpl()
362+
363+
/**
364+
* Flow analogue of `concatWith` is [onCompletion].
365+
* Use `onCompletion { emit(value) }`.
366+
* @suppress
367+
*/
368+
@Deprecated(
369+
level = DeprecationLevel.ERROR,
370+
message = "Flow analogue of 'concatWith' is 'onCompletion'. Use 'onCompletion { emit(value) }'",
371+
replaceWith = ReplaceWith("onCompletion { emit(value) }")
372+
)
373+
public fun <T> Flow<T>.concatWith(value: T): Flow<T> = noImpl()
374+
375+
/**
376+
* Flow analogue of `concatWith` is [onCompletion].
377+
* Use `onCompletion { emitAll(other) }`.
378+
* @suppress
379+
*/
380+
@Deprecated(
381+
level = DeprecationLevel.ERROR,
382+
message = "Flow analogue of 'concatWith' is 'onCompletion'. Use 'onCompletion { emitAll(other) }'",
383+
replaceWith = ReplaceWith("onCompletion { emitAkk(other) }")
384+
)
385+
public fun <T> Flow<T>.concatWith(other: Flow<T>): Flow<T> = noImpl()
386+

0 commit comments

Comments
 (0)