diff --git a/CHANGES.md b/CHANGES.md
index cd920aa2d2..ecf2852c4c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,5 +1,33 @@
# Change log for kotlinx.coroutines
+## Version 1.3.0-RC2
+
+### Flow improvements
+* Operators for UI programming are reworked for the sake of consistency, naming scheme for operator overloads is introduced:
+ * `combineLatest` is deprecated in the favor of `combine`.
+ * `combineTransform` operator for non-trivial transformations (#1224).
+ * Top-level `combine` and `combineTransform` overloads for multiple flows (#1262).
+ * `switchMap` is deprecated. `flatMapLatest`, `mapLatest` and `transformLatest` are introduced instead (#1335).
+ * `collectLatest` terminal operator (#1269).
+
+* Improved cancellation support in `flattenMerge` (#1392).
+* `channelFlow` cancellation does not leak to the parent (#1334).
+* Fixed flow invariant enforcement for `suspend fun main` (#1421).
+* `delayEach` and `delayFlow` are deprecated (#1429).
+
+### General changes
+* Integration with Reactor context
+ * Propagation of the coroutine context of `await` calls into Mono/Flux builder.
+ * Publisher.asFlow propagates coroutine context from `collect` call to the Publisher.
+ * New `Flow.asFlux ` builder.
+
+* ServiceLoader-code is adjusted to avoid I/O on the Main thread on newer (3.6.0+) Android toolchain.
+* Stacktrace recovery support for minified builds on Android (#1416).
+* Guava version in `kotlinx-coroutines-guava` updated to `28.0`.
+* `setTimeout`-based JS dispatcher for platforms where `process` is unavailable (#1404).
+* Native, JS and common modules are added to `kotlinx-coroutines-bom`.
+* Fixed bug with ignored `acquiredPermits` in `Semaphore` (#1423).
+
## Version 1.3.0-RC
### Flow
diff --git a/README.md b/README.md
index 84083a843b..c73595a8ea 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
[](https://confluence.jetbrains.com/display/ALL/JetBrains+on+GitHub)
[](https://www.apache.org/licenses/LICENSE-2.0)
-[ ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.3.0-RC)
+[ ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.3.0-RC2)
Library support for Kotlin coroutines with [multiplatform](#multiplatform) support.
This is a companion version for Kotlin `1.3.41` release.
@@ -81,7 +81,7 @@ Add dependencies (you can also add other modules that you need):
org.jetbrains.kotlinx
kotlinx-coroutines-core
- 1.3.0-RC
+ 1.3.0-RC2
```
@@ -99,7 +99,7 @@ Add dependencies (you can also add other modules that you need):
```groovy
dependencies {
- implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0-RC'
+ implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0-RC2'
}
```
@@ -125,7 +125,7 @@ Add dependencies (you can also add other modules that you need):
```groovy
dependencies {
- implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0-RC")
+ implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0-RC2")
}
```
@@ -144,7 +144,7 @@ Make sure that you have either `jcenter()` or `mavenCentral()` in the list of re
Core modules of `kotlinx.coroutines` are also available for
[Kotlin/JS](#js) and [Kotlin/Native](#native).
In common code that should get compiled for different platforms, add dependency to
-[`kotlinx-coroutines-core-common`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-common/1.3.0-RC/jar)
+[`kotlinx-coroutines-core-common`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-common/1.3.0-RC2/jar)
(follow the link to get the dependency declaration snippet).
### Android
@@ -153,7 +153,7 @@ Add [`kotlinx-coroutines-android`](ui/kotlinx-coroutines-android)
module as dependency when using `kotlinx.coroutines` on Android:
```groovy
-implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.0-RC'
+implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.0-RC2'
```
This gives you access to Android [Dispatchers.Main](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-android/kotlinx.coroutines.android/kotlinx.coroutines.-dispatchers/index.html)
@@ -172,7 +172,7 @@ R8 is a replacement for ProGuard in Android ecosystem, it is enabled by default
### JS
[Kotlin/JS](https://kotlinlang.org/docs/reference/js-overview.html) version of `kotlinx.coroutines` is published as
-[`kotlinx-coroutines-core-js`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-js/1.3.0-RC/jar)
+[`kotlinx-coroutines-core-js`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-js/1.3.0-RC2/jar)
(follow the link to get the dependency declaration snippet).
You can also use [`kotlinx-coroutines-core`](https://www.npmjs.com/package/kotlinx-coroutines-core) package via NPM.
@@ -180,7 +180,7 @@ You can also use [`kotlinx-coroutines-core`](https://www.npmjs.com/package/kotli
### Native
[Kotlin/Native](https://kotlinlang.org/docs/reference/native-overview.html) version of `kotlinx.coroutines` is published as
-[`kotlinx-coroutines-core-native`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-native/1.3.0-RC/jar)
+[`kotlinx-coroutines-core-native`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-native/1.3.0-RC2/jar)
(follow the link to get the dependency declaration snippet).
Only single-threaded code (JS-style) on Kotlin/Native is currently supported.
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
index 3e20e88bba..a277169065 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
@@ -826,9 +826,6 @@ public abstract interface class kotlinx/coroutines/flow/FlowCollector {
public final class kotlinx/coroutines/flow/FlowKt {
public static final field DEFAULT_CONCURRENCY_PROPERTY_NAME Ljava/lang/String;
- public static final fun BehaviourSubject ()Ljava/lang/Object;
- public static final fun PublishSubject ()Ljava/lang/Object;
- public static final fun ReplaySubject ()Ljava/lang/Object;
public static final fun asFlow (Ljava/lang/Iterable;)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow (Ljava/util/Iterator;)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow (Lkotlin/jvm/functions/Function0;)Lkotlinx/coroutines/flow/Flow;
@@ -850,12 +847,23 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collectIndexed (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static final fun collectLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static final synthetic fun combine (Ljava/lang/Iterable;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function6;)Lkotlinx/coroutines/flow/Flow;
+ public static final synthetic fun combine ([Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function6;)Lkotlinx/coroutines/flow/Flow;
- public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;[Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
- public static final synthetic fun combineLatest (Lkotlinx/coroutines/flow/Flow;[Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+ public static final synthetic fun combineTransform (Ljava/lang/Iterable;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun combineTransform (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun combineTransform (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun combineTransform (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function6;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun combineTransform (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function7;)Lkotlinx/coroutines/flow/Flow;
+ public static final synthetic fun combineTransform ([Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun compose (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static final fun concatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static final fun concatWith (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
@@ -883,6 +891,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun flatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun flatMapConcat (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun flatMapLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun flatMapMerge (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flatMapMerge$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flatten (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
@@ -890,6 +899,8 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun flattenMerge (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flattenMerge$default (Lkotlinx/coroutines/flow/Flow;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun flowCombine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun flowCombineTransform (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowOf (Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowOf ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
@@ -902,6 +913,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun getDEFAULT_CONCURRENCY ()I
public static final fun launchIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/Job;
public static final fun map (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun mapLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun mapNotNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
@@ -946,6 +958,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun toSet (Lkotlinx/coroutines/flow/Flow;Ljava/util/Set;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun toSet$default (Lkotlinx/coroutines/flow/Flow;Ljava/util/Set;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun transform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun transformLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun unsafeTransform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun withContext (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)V
public static final fun withIndex (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
@@ -967,6 +980,10 @@ public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/cor
public static synthetic fun update$default (Lkotlinx/coroutines/flow/internal/ChannelFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/ChannelFlow;
}
+public final class kotlinx/coroutines/flow/internal/CombineKt {
+ public static final fun combineInternal (Lkotlinx/coroutines/flow/FlowCollector;[Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+}
+
public final class kotlinx/coroutines/flow/internal/FlowExceptions_commonKt {
public static final fun checkIndexOverflow (I)I
}
@@ -980,6 +997,10 @@ public final class kotlinx/coroutines/flow/internal/SendingCollector : kotlinx/c
public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}
+public final class kotlinx/coroutines/intrinsics/CancellableKt {
+ public static final fun startCoroutineCancellable (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)V
+}
+
public class kotlinx/coroutines/scheduling/ExperimentalCoroutineDispatcher : kotlinx/coroutines/ExecutorCoroutineDispatcher {
public synthetic fun (II)V
public synthetic fun (IIILkotlin/jvm/internal/DefaultConstructorMarker;)V
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-debug.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-debug.txt
index 604e6cd253..79f5b75d15 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-debug.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-debug.txt
@@ -1,13 +1,9 @@
public final class kotlinx/coroutines/debug/CoroutineInfo {
- public final fun component1 ()Lkotlin/coroutines/CoroutineContext;
- public final fun copy (Lkotlin/coroutines/CoroutineContext;Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;J)Lkotlinx/coroutines/debug/CoroutineInfo;
- public static synthetic fun copy$default (Lkotlinx/coroutines/debug/CoroutineInfo;Lkotlin/coroutines/CoroutineContext;Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;JILjava/lang/Object;)Lkotlinx/coroutines/debug/CoroutineInfo;
- public fun equals (Ljava/lang/Object;)Z
+ public final fun copy ()Lkotlinx/coroutines/debug/CoroutineInfo;
public final fun getContext ()Lkotlin/coroutines/CoroutineContext;
public final fun getCreationStackTrace ()Ljava/util/List;
public final fun getJob ()Lkotlinx/coroutines/Job;
public final fun getState ()Lkotlinx/coroutines/debug/State;
- public fun hashCode ()I
public final fun lastObservedStackTrace ()Ljava/util/List;
public fun toString ()Ljava/lang/String;
}
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt
index 643f64170d..fb24c874f9 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt
@@ -14,11 +14,29 @@ public final class kotlinx/coroutines/reactive/ChannelKt {
public static synthetic fun openSubscription$default (Lorg/reactivestreams/Publisher;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
}
+public abstract interface class kotlinx/coroutines/reactive/ContextInjector {
+ public abstract fun injectCoroutineContext (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher;
+}
+
public final class kotlinx/coroutines/reactive/ConvertKt {
public static final fun asPublisher (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher;
public static synthetic fun asPublisher$default (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lorg/reactivestreams/Publisher;
}
+public final class kotlinx/coroutines/reactive/FlowKt {
+ public static final fun asFlow (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun asFlow (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow;
+ public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Lorg/reactivestreams/Publisher;
+}
+
+public final class kotlinx/coroutines/reactive/FlowSubscription : kotlinx/coroutines/AbstractCoroutine, org/reactivestreams/Subscription {
+ public final field flow Lkotlinx/coroutines/flow/Flow;
+ public final field subscriber Lorg/reactivestreams/Subscriber;
+ public fun (Lkotlinx/coroutines/flow/Flow;Lorg/reactivestreams/Subscriber;)V
+ public fun cancel ()V
+ public fun request (J)V
+}
+
public final class kotlinx/coroutines/reactive/PublishKt {
public static final fun publish (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
public static final fun publish (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
@@ -44,12 +62,3 @@ public final class kotlinx/coroutines/reactive/PublisherCoroutine : kotlinx/coro
public fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}
-public final class kotlinx/coroutines/reactive/flow/FlowAsPublisherKt {
- public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lorg/reactivestreams/Publisher;
-}
-
-public final class kotlinx/coroutines/reactive/flow/PublisherAsFlowKt {
- public static final fun from (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow;
- public static final fun from (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow;
-}
-
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt
index 46b35ed71f..20e20baad0 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt
@@ -5,6 +5,10 @@ public final class kotlinx/coroutines/reactor/ConvertKt {
public static final fun asMono (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lreactor/core/publisher/Mono;
}
+public final class kotlinx/coroutines/reactor/FlowKt {
+ public static final fun asFlux (Lkotlinx/coroutines/flow/Flow;)Lreactor/core/publisher/Flux;
+}
+
public final class kotlinx/coroutines/reactor/FluxKt {
public static final fun flux (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Flux;
public static final fun flux (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Flux;
diff --git a/build.gradle b/build.gradle
index 4bccce7a69..c05c07afc0 100644
--- a/build.gradle
+++ b/build.gradle
@@ -147,12 +147,10 @@ if (build_snapshot_train) {
allprojects {
tasks.withType(Test).all {
exclude '**/*LinearizabilityTest*'
- exclude '**/*PublicApiTest*' // KT-30956
exclude '**/*LFTest*'
exclude '**/*StressTest*'
exclude '**/*scheduling*'
exclude '**/*Timeout*'
- exclude '**/*coroutines/debug*' // Unmute after 1.3.31 where inlining was fixed
exclude '**/*definitely/not/kotlinx*'
}
}
diff --git a/docs/coroutine-context-and-dispatchers.md b/docs/coroutine-context-and-dispatchers.md
index 4e366ddee2..4769c1e23d 100644
--- a/docs/coroutine-context-and-dispatchers.md
+++ b/docs/coroutine-context-and-dispatchers.md
@@ -221,7 +221,8 @@ The `log` function prints the name of the thread in square brackets, and you can
thread with the identifier of the currently executing coroutine appended to it. This identifier
is consecutively assigned to all created coroutines when the debugging mode is on.
-You can read more about debugging facilities in the documentation of the [newCoroutineContext] function.
+> Debugging mode is also turned on when JVM is run with `-ea` option.
+You can read more about debugging facilities in the documentation of the [DEBUG_PROPERTY_NAME] property.
### Jumping between threads
@@ -696,7 +697,7 @@ that should be implemented.
[ExecutorCoroutineDispatcher.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-executor-coroutine-dispatcher/close.html
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/delay.html
-[newCoroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/new-coroutine-context.html
+[DEBUG_PROPERTY_NAME]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-d-e-b-u-g_-p-r-o-p-e-r-t-y_-n-a-m-e.html
[withContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html
[isActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/is-active.html
[CoroutineScope.coroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/coroutine-context.html
diff --git a/docs/debugging.md b/docs/debugging.md
index fc8570126d..e2c7ec1e07 100644
--- a/docs/debugging.md
+++ b/docs/debugging.md
@@ -45,7 +45,7 @@ It is easy to demonstrate with actual stacktraces of the same program that await
The only downside of this approach is losing referential transparency of the exception.
-### Stacktrace recovery machinery
+### Stacktrace recovery machinery
This section explains the inner mechanism of stacktrace recovery and can be skipped.
@@ -56,6 +56,7 @@ and then throws the resulting exception instead of the original one.
Exception copy logic is straightforward:
1) If the exception class implements [CopyableThrowable], [CopyableThrowable.createCopy] is used.
+ `null` can be returned from `createCopy` to opt-out specific exception from being recovered.
2) If the exception class has class-specific fields not inherited from Throwable, the exception is not copied.
3) Otherwise, one of the public exception's constructor is invoked reflectively with an optional `initCause` call.
diff --git a/gradle.properties b/gradle.properties
index 60c65a827d..1fd92b1e16 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,5 +1,5 @@
# Kotlin
-version=1.3.0-RC-SNAPSHOT
+version=1.3.0-RC2-SNAPSHOT
group=org.jetbrains.kotlinx
kotlin_version=1.3.41
diff --git a/integration/kotlinx-coroutines-guava/build.gradle b/integration/kotlinx-coroutines-guava/build.gradle
index 48fd0f56b1..9e44b99864 100644
--- a/integration/kotlinx-coroutines-guava/build.gradle
+++ b/integration/kotlinx-coroutines-guava/build.gradle
@@ -2,7 +2,7 @@
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-ext.guava_version = '24.0-jre'
+ext.guava_version = '28.0-jre'
dependencies {
compile "com.google.guava:guava:$guava_version"
diff --git a/kotlinx-coroutines-bom/build.gradle b/kotlinx-coroutines-bom/build.gradle
index 9ec43b2a15..c6675dd33a 100644
--- a/kotlinx-coroutines-bom/build.gradle
+++ b/kotlinx-coroutines-bom/build.gradle
@@ -10,8 +10,14 @@ def name = project.name
dependencyManagement {
dependencies {
rootProject.subprojects.each {
- if (!ext.unpublished.contains(it.name) && it.name != name) {
- dependency(group: it.group, name: it.name, version: it.version)
+ if (ext.unpublished.contains(it.name)) return
+ if (it.name == name) return
+ if (!it.plugins.hasPlugin('maven-publish')) return
+ evaluationDependsOn(it.path)
+ it.publishing.publications.all {
+ if (it.artifactId.endsWith("-kotlinMultiplatform")) return
+ if (it.artifactId.endsWith("-metadata")) return
+ dependency(group: it.groupId, name: it.artifactId, version: it.version)
}
}
}
diff --git a/kotlinx-coroutines-core/common/README.md b/kotlinx-coroutines-core/common/README.md
index a0cc809127..e59392ee66 100644
--- a/kotlinx-coroutines-core/common/README.md
+++ b/kotlinx-coroutines-core/common/README.md
@@ -65,7 +65,7 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio
This module provides debugging facilities for coroutines (run JVM with `-ea` or `-Dkotlinx.coroutines.debug` options)
and [newCoroutineContext] function to write user-defined coroutine builders that work with these
-debugging facilities.
+debugging facilities. See [DEBUG_PROPERTY_NAME] for more details.
This module provides a special CoroutineContext type [TestCoroutineCoroutineContext][kotlinx.coroutines.test.TestCoroutineContext] that
allows the writer of code that contains Coroutines with delays and timeouts to write non-flaky unit-tests for that code allowing these tests to
@@ -124,6 +124,7 @@ Low-level primitives for finer-grained control of coroutines.
[Deferred.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-deferred/await.html
[Deferred.onAwait]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-deferred/on-await.html
[newCoroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/new-coroutine-context.html
+[DEBUG_PROPERTY_NAME]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-d-e-b-u-g_-p-r-o-p-e-r-t-y_-n-a-m-e.html
[kotlinx.coroutines.sync.Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html
[kotlinx.coroutines.sync.Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html
diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt
index 63e34fda81..d7ca5f6750 100644
--- a/kotlinx-coroutines-core/common/src/JobSupport.kt
+++ b/kotlinx-coroutines-core/common/src/JobSupport.kt
@@ -326,6 +326,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
* may leak to the [CoroutineExceptionHandler].
*/
private fun cancelParent(cause: Throwable): Boolean {
+ // Is scoped coroutine -- don't propagate, will be rethrown
+ if (isScopedCoroutine) return true
+
/* CancellationException is considered "normal" and parent usually is not cancelled when child produces it.
* This allow parent to cancel its children (normally) without being cancelled itself, unless
* child crashes and produce some other exception during its completion.
@@ -337,8 +340,6 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
return isCancellation
}
- // Is scoped coroutine -- don't propagate, will be rethrown
- if (isScopedCoroutine) return isCancellation
// Notify parent but don't forget to check cancellation
return parent.childCancelled(cause) || isCancellation
}
diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
index 688125d946..1e1c0d3ae4 100644
--- a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
@@ -8,6 +8,7 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*
import kotlin.jvm.*
+import kotlin.math.*
/**
* Channel with array buffer of a fixed [capacity].
@@ -29,10 +30,14 @@ internal open class ArrayChannel(
}
private val lock = ReentrantLock()
- private val buffer: Array = arrayOfNulls(capacity)
+ /*
+ * Guarded by lock.
+ * Allocate minimum of capacity and 16 to avoid excess memory pressure for large channels when it's not necessary.
+ */
+ private var buffer: Array = arrayOfNulls(min(capacity, 8))
private var head: Int = 0
@Volatile
- private var size: Int = 0
+ private var size: Int = 0 // Invariant: size <= capacity
protected final override val isBufferAlwaysEmpty: Boolean get() = false
protected final override val isBufferEmpty: Boolean get() = size == 0
@@ -64,7 +69,8 @@ internal open class ArrayChannel(
}
}
}
- buffer[(head + size) % capacity] = element // actually queue element
+ ensureCapacity(size)
+ buffer[(head + size) % buffer.size] = element // actually queue element
return OFFER_SUCCESS
}
// size == capacity: full
@@ -112,7 +118,8 @@ internal open class ArrayChannel(
this.size = size // restore size
return ALREADY_SELECTED
}
- buffer[(head + size) % capacity] = element // actually queue element
+ ensureCapacity(size)
+ buffer[(head + size) % buffer.size] = element // actually queue element
return OFFER_SUCCESS
}
// size == capacity: full
@@ -123,6 +130,19 @@ internal open class ArrayChannel(
return receive!!.offerResult
}
+ // Guarded by lock
+ private fun ensureCapacity(currentSize: Int) {
+ if (currentSize >= buffer.size) {
+ val newSize = min(buffer.size * 2, capacity)
+ val newBuffer = arrayOfNulls(newSize)
+ for (i in 0 until currentSize) {
+ newBuffer[i] = buffer[(head + i) % buffer.size]
+ }
+ buffer = newBuffer
+ head = 0
+ }
+ }
+
// result is `E | POLL_FAILED | Closed`
protected override fun pollInternal(): Any? {
var send: Send? = null
@@ -149,9 +169,9 @@ internal open class ArrayChannel(
}
if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
this.size = size // restore size
- buffer[(head + size) % capacity] = replacement
+ buffer[(head + size) % buffer.size] = replacement
}
- head = (head + 1) % capacity
+ head = (head + 1) % buffer.size
}
// complete send the we're taken replacement from
if (token != null)
@@ -203,7 +223,7 @@ internal open class ArrayChannel(
}
if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
this.size = size // restore size
- buffer[(head + size) % capacity] = replacement
+ buffer[(head + size) % buffer.size] = replacement
} else {
// failed to poll or is already closed --> let's try to select receiving this element from buffer
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
@@ -212,7 +232,7 @@ internal open class ArrayChannel(
return ALREADY_SELECTED
}
}
- head = (head + 1) % capacity
+ head = (head + 1) % buffer.size
}
// complete send the we're taken replacement from
if (token != null)
@@ -226,7 +246,7 @@ internal open class ArrayChannel(
lock.withLock {
repeat(size) {
buffer[head] = 0
- head = (head + 1) % capacity
+ head = (head + 1) % buffer.size
}
size = 0
}
@@ -237,5 +257,5 @@ internal open class ArrayChannel(
// ------ debug ------
override val bufferDebugString: String
- get() = "(buffer:capacity=${buffer.size},size=$size)"
+ get() = "(buffer:capacity=$capacity,size=$size)"
}
diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt
index a579d7a247..bf88b6a062 100644
--- a/kotlinx-coroutines-core/common/src/channels/Produce.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt
@@ -126,7 +126,7 @@ public fun CoroutineScope.produce(
return coroutine
}
-internal open class ProducerCoroutine(
+private class ProducerCoroutine(
parentContext: CoroutineContext, channel: Channel
) : ChannelCoroutine(parentContext, channel, active = true), ProducerScope {
override val isActive: Boolean
diff --git a/kotlinx-coroutines-core/common/src/flow/Flow.kt b/kotlinx-coroutines-core/common/src/flow/Flow.kt
index bda326f85d..6d87c2b9aa 100644
--- a/kotlinx-coroutines-core/common/src/flow/Flow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Flow.kt
@@ -22,7 +22,7 @@ import kotlin.coroutines.*
* or [launchIn] operator that starts collection of the flow in the given scope.
* They are applied to the upstream flow and trigger execution of all operations.
* Execution of the flow is also called _collecting the flow_ and is always performed in a suspending manner
- * without actual blocking. Terminal operator complete normally or exceptionally depending on successful or failed
+ * without actual blocking. Terminal operators complete normally or exceptionally depending on successful or failed
* execution of all the flow operations in the upstream. The most basic terminal operator is [collect], for example:
*
* ```
@@ -37,10 +37,10 @@ import kotlin.coroutines.*
*
* By default, flows are _sequential_ and all flow operations are executed sequentially in the same coroutine,
* with an exception for a few operations specifically designed to introduce concurrency into flow
- * the execution such a [buffer] and [flatMapMerge]. See their documentation for details.
+ * execution such as [buffer] and [flatMapMerge]. See their documentation for details.
*
- * Flow interface does not carry information whether a flow is a truly a cold stream that can be collected repeatedly and
- * triggers execution of the same code every time it is collected or if it is a hot stream that emits different
+ * The `Flow` interface does not carry information whether a flow truly is a cold stream that can be collected repeatedly and
+ * triggers execution of the same code every time it is collected, or if it is a hot stream that emits different
* values from the same running source on each collection. However, conventionally flows represent cold streams.
* Transitions between hot and cold streams are supported via channels and the corresponding API:
* [channelFlow], [produceIn], [broadcastIn].
@@ -54,18 +54,18 @@ import kotlin.coroutines.*
* * [flow { ... }][flow] builder function to construct arbitrary flows from
* sequential calls to [emit][FlowCollector.emit] function.
* * [channelFlow { ... }][channelFlow] builder function to construct arbitrary flows from
- * potentially concurrent calls to [send][kotlinx.coroutines.channels.SendChannel.send] function.
+ * potentially concurrent calls to the [send][kotlinx.coroutines.channels.SendChannel.send] function.
*
* ### Flow constraints
*
- * All implementations of `Flow` interface must adhere to two key properties that are described in detail below:
+ * All implementations of the `Flow` interface must adhere to two key properties described in detail below:
*
* * Context preservation.
* * Exception transparency.
*
* These properties ensure the ability to perform local reasoning about the code with flows and modularize the code
- * in such a way so that upstream flow emitters can be developed separately from downstream flow collectors.
- * A user of the flow does not needs to know implementation details of the upstream flows it uses.
+ * in such a way that upstream flow emitters can be developed separately from downstream flow collectors.
+ * A user of a flow does not need to be aware of implementation details of the upstream flows it uses.
*
* ### Context preservation
*
@@ -73,8 +73,8 @@ import kotlin.coroutines.*
* it downstream, thus making reasoning about the execution context of particular transformations or terminal
* operations trivial.
*
- * There is the only way to change the context of a flow: [flowOn][Flow.flowOn] operator,
- * that changes the upstream context ("everything above the flowOn operator").
+ * There is only one way to change the context of a flow: the [flowOn][Flow.flowOn] operator
+ * that changes the upstream context ("everything above the `flowOn` operator").
* For additional information refer to its documentation.
*
* This reasoning can be demonstrated in practice:
@@ -97,7 +97,7 @@ import kotlin.coroutines.*
* ```
*
* From the implementation point of view, it means that all flow implementations should
- * emit only from the same coroutine.
+ * only emit from the same coroutine.
* This constraint is efficiently enforced by the default [flow] builder.
* The [flow] builder should be used if flow implementation does not start any coroutines.
* Its implementation prevents most of the development mistakes:
@@ -114,27 +114,27 @@ import kotlin.coroutines.*
* }
* ```
*
- * Use [channelFlow] if the collection and emission of the flow are to be separated into multiple coroutines.
+ * Use [channelFlow] if the collection and emission of a flow are to be separated into multiple coroutines.
* It encapsulates all the context preservation work and allows you to focus on your
* domain-specific problem, rather than invariant implementation details.
* It is possible to use any combination of coroutine builders from within [channelFlow].
*
- * If you are looking for the performance and are sure that no concurrent emits and context jumps will happen,
- * [flow] builder alongside with [coroutineScope] or [supervisorScope] can be used instead:
+ * If you are looking for performance and are sure that no concurrent emits and context jumps will happen,
+ * the [flow] builder can be used alongside a [coroutineScope] or [supervisorScope] instead:
* - Scoped primitive should be used to provide a [CoroutineScope].
* - Changing the context of emission is prohibited, no matter whether it is `withContext(ctx)` or
- * builder argument (e.g. `launch(ctx)`).
+ * a builder argument (e.g. `launch(ctx)`).
* - Collecting another flow from a separate context is allowed, but it has the same effect as
- * [flowOn] operator on that flow, which is more efficient.
+ * applying the [flowOn] operator to that flow, which is more efficient.
*
* ### Exception transparency
*
* Flow implementations never catch or handle exceptions that occur in downstream flows. From the implementation standpoint
* it means that calls to [emit][FlowCollector.emit] and [emitAll] shall never be wrapped into
* `try { ... } catch { ... }` blocks. Exception handling in flows shall be performed with
- * [catch][Flow.catch] operator and it is designed to catch only exception coming from upstream flow while passing
- * all the downstream exceptions. Similarly, terminal operators like [collect][Flow.collect]
- * throw any unhandled exception that occurs in its code or in upstream flows, for example:
+ * [catch][Flow.catch] operator and it is designed to only catch exceptions coming from upstream flows while passing
+ * all downstream exceptions. Similarly, terminal operators like [collect][Flow.collect]
+ * throw any unhandled exceptions that occur in their code or in upstream flows, for example:
*
* ```
* flow { emitData() }
@@ -143,13 +143,13 @@ import kotlin.coroutines.*
* .map { computeTwo(it) }
* .collect { process(it) } // throws exceptions from process and computeTwo
* ```
- * The same reasoning can be applied to [onCompletion] operator that is a declarative replacement for `finally` block.
+ * The same reasoning can be applied to the [onCompletion] operator that is a declarative replacement for the `finally` block.
*
- * Failure to adhere to the exception transparency requirement would result in strange behaviours that would make
+ * Failure to adhere to the exception transparency requirement can lead to strange behaviors which make
* it hard to reason about the code because an exception in the `collect { ... }` could be somehow "caught"
- * by the upstream flow, limiting the ability of local reasoning about the code.
+ * by an upstream flow, limiting the ability of local reasoning about the code.
*
- * Currently, flow infrastructure does not enforce exception transparency contracts, however, it might be enforced
+ * Currently, the flow infrastructure does not enforce exception transparency contracts, however, it might be enforced
* in the future either at run time or at compile time.
*
* ### Reactive streams
@@ -162,9 +162,9 @@ public interface Flow {
* Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
* This method should never be implemented or used directly.
*
- * The only way to implement flow interface directly is to extend [AbstractFlow].
- * To collect it into the specific collector, either `collector.emitAll(flow)` or `collect { ... }` extension
- * should be used. Such limitation ensures that context preservation property is not violated and prevents most
+ * The only way to implement the `Flow` interface directly is to extend [AbstractFlow].
+ * To collect it into a specific collector, either `collector.emitAll(flow)` or `collect { ... }` extension
+ * should be used. Such limitation ensures that the context preservation property is not violated and prevents most
* of the developer mistakes related to concurrency, inconsistent flow dispatchers and cancellation.
*/
@InternalCoroutinesApi
@@ -172,8 +172,8 @@ public interface Flow {
}
/**
- * Base class to extend to have a stateful implementation of the flow.
- * It tracks all the properties required for context preservation and throws [IllegalStateException]
+ * Base class for stateful implementations of `Flow`.
+ * It tracks all the properties required for context preservation and throws an [IllegalStateException]
* if any of the properties are violated.
*
* Example of the implementation:
diff --git a/kotlinx-coroutines-core/common/src/flow/Migration.kt b/kotlinx-coroutines-core/common/src/flow/Migration.kt
index b7e91f50ce..16769ad806 100644
--- a/kotlinx-coroutines-core/common/src/flow/Migration.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Migration.kt
@@ -8,6 +8,9 @@
package kotlinx.coroutines.flow
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.internal.*
+import kotlinx.coroutines.flow.internal.unsafeFlow
import kotlin.coroutines.*
import kotlin.jvm.*
@@ -99,29 +102,6 @@ public fun Flow.publishOn(context: CoroutineContext): Flow = noImpl()
@Deprecated(message = "Use flowOn instead", level = DeprecationLevel.ERROR)
public fun Flow.subscribeOn(context: CoroutineContext): Flow = noImpl()
-/**
- * Use [BroadcastChannel][kotlinx.coroutines.channels.BroadcastChannel].asFlow().
- * @suppress
- */
-@Deprecated(message = "Use BroadcastChannel.asFlow()", level = DeprecationLevel.ERROR)
-public fun BehaviourSubject(): Any = noImpl()
-
-/**
- * `ReplaySubject` is not supported. The closest analogue is buffered [BroadcastChannel][kotlinx.coroutines.channels.BroadcastChannel].
- * @suppress
- */
-@Deprecated(
- message = "ReplaySubject is not supported. The closest analogue is buffered broadcast channel",
- level = DeprecationLevel.ERROR)
-public fun ReplaySubject(): Any = noImpl()
-
-/**
- * `PublishSubject` is not supported.
- * @suppress
- */
-@Deprecated(message = "PublishSubject is not supported", level = DeprecationLevel.ERROR)
-public fun PublishSubject(): Any = noImpl()
-
/**
* Flow analogue of `onErrorXxx` is [catch].
* Use `catch { emitAll(fallback) }`.
@@ -380,7 +360,81 @@ public fun Flow.concatWith(value: T): Flow = noImpl()
@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'concatWith' is 'onCompletion'. Use 'onCompletion { emitAll(other) }'",
- replaceWith = ReplaceWith("onCompletion { emitAkk(other) }")
+ replaceWith = ReplaceWith("onCompletion { emitAll(other) }")
)
public fun Flow.concatWith(other: Flow): Flow = noImpl()
+@Deprecated(
+ level = DeprecationLevel.ERROR,
+ message = "Flow analogue of 'combineLatest' is 'combine'",
+ replaceWith = ReplaceWith("this.combine(other, transform)")
+)
+public fun Flow.combineLatest(other: Flow, transform: suspend (T1, T2) -> R): Flow =
+ combine(this, other, transform)
+
+@Deprecated(
+ level = DeprecationLevel.ERROR,
+ message = "Flow analogue of 'combineLatest' is 'combine'",
+ replaceWith = ReplaceWith("combine(this, other, other2, transform)")
+)
+public inline fun Flow.combineLatest(
+ other: Flow,
+ other2: Flow,
+ crossinline transform: suspend (T1, T2, T3) -> R
+) = combine(this, other, other2, transform)
+
+@Deprecated(
+ level = DeprecationLevel.ERROR,
+ message = "Flow analogue of 'combineLatest' is 'combine'",
+ replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)")
+)
+public inline fun Flow.combineLatest(
+ other: Flow,
+ other2: Flow,
+ other3: Flow,
+ crossinline transform: suspend (T1, T2, T3, T4) -> R
+) = combine(this, other, other2, other3, transform)
+
+@Deprecated(
+ level = DeprecationLevel.ERROR,
+ message = "Flow analogue of 'combineLatest' is 'combine'",
+ replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)")
+)
+public inline fun Flow.combineLatest(
+ other: Flow,
+ other2: Flow,
+ other3: Flow,
+ other4: Flow,
+ crossinline transform: suspend (T1, T2, T3, T4, T5) -> R
+): Flow = combine(this, other, other2, other3, other4, transform)
+
+/**
+ * Delays the emission of values from this flow for the given [timeMillis].
+ * Use `onStart { delay(timeMillis) }`.
+ * @suppress
+ */
+@Deprecated(
+ level = DeprecationLevel.WARNING, // since 1.3.0, error in 1.4.0
+ message = "Use 'onStart { delay(timeMillis) }'",
+ replaceWith = ReplaceWith("onStart { delay(timeMillis) }")
+)
+public fun Flow.delayFlow(timeMillis: Long): Flow = onStart { delay(timeMillis) }
+
+/**
+ * Delays each element emitted by the given flow for the given [timeMillis].
+ * Use `onEach { delay(timeMillis) }`.
+ * @suppress
+ */
+@Deprecated(
+ level = DeprecationLevel.WARNING, // since 1.3.0, error in 1.4.0
+ message = "Use 'onEach { delay(timeMillis) }'",
+ replaceWith = ReplaceWith("onEach { delay(timeMillis) }")
+)
+public fun Flow.delayEach(timeMillis: Long): Flow = onEach { delay(timeMillis) }
+
+@Deprecated(
+ level = DeprecationLevel.ERROR,
+ message = "Flow analogues of 'switchMap' are 'transformLatest', 'flatMapLatest' and 'mapLatest'",
+ replaceWith = ReplaceWith("this.flatMapLatest(transform)")
+)
+public fun Flow.switchMap(transform: suspend (value: T) -> Flow): Flow = flatMapLatest(transform)
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
index 99a3bdc655..3bae2ebd38 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
@@ -68,7 +68,7 @@ public abstract class ChannelFlow(
scope.broadcast(context, produceCapacity, start, block = collectToFun)
open fun produceImpl(scope: CoroutineScope): ReceiveChannel =
- scope.flowProduce(context, produceCapacity, block = collectToFun)
+ scope.produce(context, produceCapacity, block = collectToFun)
override suspend fun collect(collector: FlowCollector) =
coroutineScope {
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
new file mode 100644
index 0000000000..f7edad08db
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+@file:Suppress("UNCHECKED_CAST", "NON_APPLICABLE_CALL_FOR_BUILDER_INFERENCE") // KT-32203
+
+package kotlinx.coroutines.flow.internal
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.internal.*
+import kotlinx.coroutines.selects.*
+
+internal fun getNull(): Symbol = NULL // Workaround for JS BE bug
+
+internal suspend fun FlowCollector.combineTransformInternal(
+ first: Flow, second: Flow,
+ transform: suspend FlowCollector.(a: T1, b: T2) -> Unit
+) {
+ coroutineScope {
+ val firstChannel = asFairChannel(first)
+ val secondChannel = asFairChannel(second)
+ var firstValue: Any? = null
+ var secondValue: Any? = null
+ var firstIsClosed = false
+ var secondIsClosed = false
+ while (!firstIsClosed || !secondIsClosed) {
+ select {
+ onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value ->
+ firstValue = value
+ if (secondValue !== null) {
+ transform(getNull().unbox(firstValue), getNull().unbox(secondValue) as T2)
+ }
+ }
+
+ onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value ->
+ secondValue = value
+ if (firstValue !== null) {
+ transform(getNull().unbox(firstValue) as T1, getNull().unbox(secondValue) as T2)
+ }
+ }
+ }
+ }
+ }
+}
+
+@PublishedApi
+internal suspend fun FlowCollector.combineInternal(
+ flows: Array>,
+ arrayFactory: () -> Array,
+ transform: suspend FlowCollector.(Array) -> Unit
+) {
+ coroutineScope {
+ val size = flows.size
+ val channels =
+ Array(size) { asFairChannel(flows[it]) }
+ val latestValues = arrayOfNulls(size)
+ val isClosed = Array(size) { false }
+
+ // See flow.combine(other) for explanation.
+ while (!isClosed.all { it }) {
+ select {
+ for (i in 0 until size) {
+ onReceive(isClosed[i], channels[i], { isClosed[i] = true }) { value ->
+ latestValues[i] = value
+ if (latestValues.all { it !== null }) {
+ val arguments = arrayFactory()
+ for (index in 0 until size) {
+ arguments[index] = NULL.unbox(latestValues[index])
+ }
+ transform(arguments as Array)
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+private inline fun SelectBuilder.onReceive(
+ isClosed: Boolean,
+ channel: ReceiveChannel,
+ crossinline onClosed: () -> Unit,
+ noinline onReceive: suspend (value: Any) -> Unit
+) {
+ if (isClosed) return
+ channel.onReceiveOrNull {
+ // TODO onReceiveOrClosed when boxing issues are fixed
+ if (it === null) onClosed()
+ else onReceive(it)
+ }
+}
+
+// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
+private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel = produce {
+ val channel = channel as ChannelCoroutine
+ flow.collect { value ->
+ return@collect channel.sendFair(value ?: NULL)
+ }
+}
+
+internal fun zipImpl(flow: Flow, flow2: Flow, transform: suspend (T1, T2) -> R): Flow = unsafeFlow {
+ coroutineScope {
+ val first = asChannel(flow)
+ val second = asChannel(flow2)
+ /*
+ * This approach only works with rendezvous channel and is required to enforce correctness
+ * in the following scenario:
+ * ```
+ * val f1 = flow { emit(1); delay(Long.MAX_VALUE) }
+ * val f2 = flowOf(1)
+ * f1.zip(f2) { ... }
+ * ```
+ *
+ * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
+ */
+ (second as SendChannel<*>).invokeOnClose {
+ if (!first.isClosedForReceive) first.cancel(AbortFlowException())
+ }
+
+ val otherIterator = second.iterator()
+ try {
+ first.consumeEach { value ->
+ if (!otherIterator.hasNext()) {
+ return@consumeEach
+ }
+ emit(transform(NULL.unbox(value), NULL.unbox(otherIterator.next())))
+ }
+ } catch (e: AbortFlowException) {
+ // complete
+ } finally {
+ if (!second.isClosedForReceive) second.cancel(AbortFlowException())
+ }
+ }
+}
+
+// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
+private fun CoroutineScope.asChannel(flow: Flow<*>): ReceiveChannel = produce {
+ flow.collect { value ->
+ return@collect channel.send(value ?: NULL)
+ }
+}
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
index f0b5b391fa..adc3a17d16 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
@@ -52,20 +52,6 @@ internal fun scopedFlow(@BuilderInference block: suspend CoroutineScope.(Flo
flowScope { block(collector) }
}
-/*
- * Shortcut for produce { flowScope {block() } }
- */
-internal fun CoroutineScope.flowProduce(
- context: CoroutineContext,
- capacity: Int = 0, @BuilderInference block: suspend ProducerScope.() -> Unit
-): ReceiveChannel {
- val channel = Channel(capacity)
- val newContext = newCoroutineContext(context)
- val coroutine = FlowProduceCoroutine(newContext, channel)
- coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
- return coroutine
-}
-
private class FlowCoroutine(
context: CoroutineContext,
uCont: Continuation
@@ -75,13 +61,3 @@ private class FlowCoroutine(
return cancelImpl(cause)
}
}
-
-private class FlowProduceCoroutine(
- parentContext: CoroutineContext,
- channel: Channel
-) : ProducerCoroutine(parentContext, channel) {
- public override fun childCancelled(cause: Throwable): Boolean {
- if (cause is ChildCancelledException) return true
- return cancelImpl(cause)
- }
-}
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt b/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt
new file mode 100644
index 0000000000..f621be034e
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow.internal
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.sync.*
+import kotlin.coroutines.*
+
+internal class ChannelFlowTransformLatest(
+ private val transform: suspend FlowCollector.(value: T) -> Unit,
+ flow: Flow,
+ context: CoroutineContext = EmptyCoroutineContext,
+ capacity: Int = Channel.BUFFERED
+) : ChannelFlowOperator(flow, context, capacity) {
+ override fun create(context: CoroutineContext, capacity: Int): ChannelFlow =
+ ChannelFlowTransformLatest(transform, flow, context, capacity)
+
+ override suspend fun flowCollect(collector: FlowCollector) {
+ assert { collector is SendingCollector } // So cancellation behaviour is not leaking into the downstream
+ flowScope {
+ var previousFlow: Job? = null
+ flow.collect { value ->
+ previousFlow?.apply {
+ cancel(ChildCancelledException())
+ join()
+ }
+ // Do not pay for dispatch here, it's never necessary
+ previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
+ collector.transform(value)
+ }
+ }
+ }
+ }
+}
+
+internal class ChannelFlowMerge(
+ flow: Flow>,
+ private val concurrency: Int,
+ context: CoroutineContext = EmptyCoroutineContext,
+ capacity: Int = Channel.OPTIONAL_CHANNEL
+) : ChannelFlowOperator, T>(flow, context, capacity) {
+ override fun create(context: CoroutineContext, capacity: Int): ChannelFlow =
+ ChannelFlowMerge(flow, concurrency, context, capacity)
+
+ // The actual merge implementation with concurrency limit
+ private suspend fun mergeImpl(scope: CoroutineScope, collector: ConcurrentFlowCollector) {
+ val semaphore = Semaphore(concurrency)
+ val job: Job? = coroutineContext[Job]
+ flow.collect { inner ->
+ /*
+ * We launch a coroutine on each emitted element and the only potential
+ * suspension point in this collector is `semaphore.acquire` that rarely suspends,
+ * so we manually check for cancellation to propagate it to the upstream in time.
+ */
+ job?.ensureActive()
+ semaphore.acquire()
+ scope.launch {
+ try {
+ inner.collect(collector)
+ } finally {
+ semaphore.release() // Release concurrency permit
+ }
+ }
+ }
+ }
+
+ // Fast path in ChannelFlowOperator calls this function (channel was not created yet)
+ override suspend fun flowCollect(collector: FlowCollector) {
+ // this function should not have been invoked when channel was explicitly requested
+ assert { capacity == Channel.OPTIONAL_CHANNEL }
+ flowScope {
+ mergeImpl(this, collector.asConcurrentFlowCollector())
+ }
+ }
+
+ // Slow path when output channel is required (and was created)
+ override suspend fun collectTo(scope: ProducerScope) =
+ mergeImpl(scope, SendingCollector(scope))
+
+ override fun additionalToStringProps(): String =
+ "concurrency=$concurrency, "
+}
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt b/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt
index dbd7120e27..c6ff12fc4e 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt
@@ -14,3 +14,11 @@ import kotlin.jvm.*
@JvmField
@SharedImmutable
internal val NULL = Symbol("NULL")
+
+/*
+ * Symbol used to indicate that the flow is complete.
+ * It should never leak to the outside world.
+ */
+@JvmField
+@SharedImmutable
+internal val DONE = Symbol("DONE")
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt b/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt
index 09a63781f0..8761058e71 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt
@@ -81,7 +81,13 @@ internal class SafeCollector(
"FlowCollector is not thread-safe and concurrent emissions are prohibited. To mitigate this restriction please use 'channelFlow' builder instead of 'flow'"
)
}
- count + 1
+
+ /*
+ * If collect job is null (-> EmptyCoroutineContext, probably run from `suspend fun main`), then invariant is maintained
+ * (common transitive parent is "null"), but count check will fail, so just do not count job context element when
+ * flow is collected from EmptyCoroutineContext
+ */
+ if (collectJob == null) count else count + 1
}
if (result != collectContextSize) {
error(
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt
index 8f3325c508..043c839fff 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt
@@ -238,7 +238,7 @@ public fun Flow.flowOn(context: CoroutineContext): Flow {
* 4) It can be confused with [flowOn] operator, though [flowWith] is much rarer.
*/
@FlowPreview
-@Deprecated(message = "flowWith is deprecated without replacement, please refer to its KDoc for an explanation", level = DeprecationLevel.WARNING) // Error in beta release, removal in 1.4
+@Deprecated(message = "flowWith is deprecated without replacement, please refer to its KDoc for an explanation", level = DeprecationLevel.ERROR) // Error in beta release, removal in 1.4
public fun Flow.flowWith(
flowContext: CoroutineContext,
bufferSize: Int = BUFFERED,
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
index 8d74be5584..85b9b07c6b 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
@@ -14,26 +14,6 @@ import kotlinx.coroutines.selects.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
-/**
- * Delays the emission of values from this flow for the given [timeMillis].
- */
-@ExperimentalCoroutinesApi
-public fun Flow.delayFlow(timeMillis: Long): Flow = flow {
- delay(timeMillis)
- collect(this@flow)
-}
-
-/**
- * Delays each element emitted by the given flow for the given [timeMillis].
- */
-@ExperimentalCoroutinesApi
-public fun Flow.delayEach(timeMillis: Long): Flow = flow {
- collect { value ->
- delay(timeMillis)
- emit(value)
- }
-}
-
/**
* Returns a flow that mirrors the original flow, but filters out values
* that are followed by the newer values within the given [timeout][timeoutMillis].
@@ -62,18 +42,21 @@ public fun Flow.delayEach(timeMillis: Long): Flow = flow {
public fun Flow.debounce(timeoutMillis: Long): Flow {
require(timeoutMillis > 0) { "Debounce timeout should be positive" }
return scopedFlow { downstream ->
- val values = Channel(Channel.CONFLATED) // Actually Any, KT-30796
- // Channel is not closed deliberately as there is no close with value
- val collector = async {
- collect { value -> values.send(value ?: NULL) }
+ // Actually Any, KT-30796
+ val values = produce(capacity = Channel.CONFLATED) {
+ collect { value -> send(value ?: NULL) }
}
-
- var isDone = false
var lastValue: Any? = null
- while (!isDone) {
+ while (lastValue !== DONE) {
select {
- values.onReceive {
- lastValue = it
+ // Should be receiveOrClosed when boxing issues are fixed
+ values.onReceiveOrNull {
+ if (it == null) {
+ if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
+ lastValue = DONE
+ } else {
+ lastValue = it
+ }
}
lastValue?.let { value ->
@@ -83,12 +66,6 @@ public fun Flow.debounce(timeoutMillis: Long): Flow {
downstream.emit(NULL.unbox(value))
}
}
-
- // Close with value 'idiom'
- collector.onAwait {
- if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
- isDone = true
- }
}
}
}
@@ -118,16 +95,14 @@ public fun Flow.sample(periodMillis: Long): Flow {
// Actually Any, KT-30796
collect { value -> send(value ?: NULL) }
}
-
- var isDone = false
var lastValue: Any? = null
val ticker = fixedPeriodTicker(periodMillis)
- while (!isDone) {
+ while (lastValue !== DONE) {
select {
values.onReceiveOrNull {
if (it == null) {
ticker.cancel(ChildCancelledException())
- isDone = true
+ lastValue = DONE
} else {
lastValue = it
}
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
index e593d0355f..dccc1cd8af 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
@@ -10,11 +10,8 @@ package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
-import kotlinx.coroutines.channels.Channel.Factory.OPTIONAL_CHANNEL
import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.internal.*
-import kotlinx.coroutines.sync.*
-import kotlin.coroutines.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
@@ -105,9 +102,34 @@ public fun Flow>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY
return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency)
}
+/**
+ * Returns a flow that produces element by [transform] function every time the original flow emits a value.
+ * When the original flow emits a new value, the previous `transform` block is cancelled, thus the name `transformLatest`.
+ *
+ * For example, the following flow:
+ * ```
+ * flow {
+ * emit("a")
+ * delay(100)
+ * emit("b")
+ * }.transformLatest { value ->
+ * emit(value)
+ * delay(200)
+ * emit(value + "_last")
+ * }
+ * ```
+ * produces `a b b_last`.
+ *
+ * This operator is [buffered][buffer] by default
+ * and size of its output buffer can be changed by applying subsequent [buffer] operator.
+ */
+@ExperimentalCoroutinesApi
+public fun Flow.transformLatest(@BuilderInference transform: suspend FlowCollector.(value: T) -> Unit): Flow =
+ ChannelFlowTransformLatest(transform, this)
+
/**
* Returns a flow that switches to a new flow produced by [transform] function every time the original flow emits a value.
- * When switch on the a flow is performed, the previous one is cancelled.
+ * When the original flow emits a new value, the previous flow produced by `transform` block is cancelled.
*
* For example, the following flow:
* ```
@@ -115,69 +137,42 @@ public fun Flow>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY
* emit("a")
* delay(100)
* emit("b")
- * }.switchMap { value ->
+ * }.flatMapLatest { value ->
* flow {
- * emit(value + value)
+ * emit(value)
* delay(200)
* emit(value + "_last")
* }
* }
* ```
- * produces `aa bb b_last`
+ * produces `a b b_last`
+ *
+ * This operator is [buffered][buffer] by default and size of its output buffer can be changed by applying subsequent [buffer] operator.
*/
-@FlowPreview
-public fun Flow.switchMap(transform: suspend (value: T) -> Flow): Flow = scopedFlow { downstream ->
- var previousFlow: Job? = null
- collect { value ->
- // Linearize calls to emit as alternative to the channel. Bonus points for never-overlapping channels.
- previousFlow?.cancel(ChildCancelledException())
- previousFlow?.join()
- // Undispatched to have better user experience in case of synchronous flows
- previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
- downstream.emitAll(transform(value))
- }
- }
-}
-
-private class ChannelFlowMerge(
- flow: Flow>,
- private val concurrency: Int,
- context: CoroutineContext = EmptyCoroutineContext,
- capacity: Int = OPTIONAL_CHANNEL
-) : ChannelFlowOperator, T>(flow, context, capacity) {
- override fun create(context: CoroutineContext, capacity: Int): ChannelFlow =
- ChannelFlowMerge(flow, concurrency, context, capacity)
-
- // The actual merge implementation with concurrency limit
- private suspend fun mergeImpl(scope: CoroutineScope, collector: ConcurrentFlowCollector) {
- val semaphore = Semaphore(concurrency)
- @Suppress("UNCHECKED_CAST")
- flow.collect { inner ->
- semaphore.acquire() // Acquire concurrency permit
- scope.launch {
- try {
- inner.collect(collector)
- } finally {
- semaphore.release() // Release concurrency permit
- }
- }
- }
- }
-
- // Fast path in ChannelFlowOperator calls this function (channel was not created yet)
- override suspend fun flowCollect(collector: FlowCollector) {
- // this function should not have been invoked when channel was explicitly requested
- assert { capacity == OPTIONAL_CHANNEL }
- flowScope {
- mergeImpl(this, collector.asConcurrentFlowCollector())
- }
- }
-
- // Slow path when output channel is required (and was created)
- override suspend fun collectTo(scope: ProducerScope) =
- mergeImpl(scope, SendingCollector(scope))
-
- override fun additionalToStringProps(): String =
- "concurrency=$concurrency, "
-}
+@ExperimentalCoroutinesApi
+public inline fun Flow.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow): Flow =
+ transformLatest { emitAll(transform(it)) }
+/**
+ * Returns a flow that emits elements from the original flow transformed by [transform] function.
+ * When the original flow emits a new value, computation of the [transform] block for previous value is cancelled.
+ *
+ * For example, the following flow:
+ * ```
+ * flow {
+ * emit("a")
+ * delay(100)
+ * emit("b")
+ * }.mapLatest { value ->
+ * println("Started computing $value")
+ * delay(200)
+ * "Computed $value"
+ * }
+ * ```
+ * will print "Started computing 1" and "Started computing 2", but the resulting flow will contain only "Computed 2" value.
+ *
+ * This operator is [buffered][buffer] by default and size of its output buffer can be changed by applying subsequent [buffer] operator.
+ */
+@ExperimentalCoroutinesApi
+public fun Flow.mapLatest(@BuilderInference transform: suspend (value: T) -> R): Flow =
+ transformLatest { emit(transform(it)) }
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
index 72822bbe4c..ba4f0520a5 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
@@ -4,15 +4,14 @@
@file:JvmMultifileClass
@file:JvmName("FlowKt")
-@file:Suppress("UNCHECKED_CAST")
+@file:Suppress("UNCHECKED_CAST", "NON_APPLICABLE_CALL_FOR_BUILDER_INFERENCE") // KT-32203
package kotlinx.coroutines.flow
import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
-import kotlinx.coroutines.selects.*
import kotlin.jvm.*
+import kotlinx.coroutines.flow.flow as safeFlow
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
/**
@@ -23,69 +22,123 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
* ```
* val flow = flowOf(1, 2).delayEach(10)
* val flow2 = flowOf("a", "b", "c").delayEach(15)
- * flow.combineLatest(flow2) { i, s -> i.toString() + s }.collect {
+ * flow.combine(flow2) { i, s -> i.toString() + s }.collect {
* println(it) // Will print "1a 2a 2b 2c"
* }
* ```
+ *
+ * This function is a shorthand for `flow.combineTransform(flow2) { a, b -> emit(transform(a, b)) }
*/
-@FlowPreview
-public fun Flow.combineLatest(other: Flow, transform: suspend (T1, T2) -> R): Flow = flow {
- coroutineScope {
- val firstChannel = asFairChannel(this@combineLatest)
- val secondChannel = asFairChannel(other)
- var firstValue: Any? = null
- var secondValue: Any? = null
- var firstIsClosed = false
- var secondIsClosed = false
-
- /*
- * Fun fact, this select **semantically** equivalent of the following:
- * ```
- * selectWhile {
- * channel.onReceive {
- * emitCombined(...)
- * }
- * channel2.onReceive {
- * emitCombined(...)
- * }
- * }
- * ```
- * but we are waiting for `channels` branch to get merged where we will change semantics of the select
- * to ignore finished clauses.
- *
- * Instead (especially in the face of non-fair channels) we are using our own hand-rolled select emulation
- * on top of previous select.
- */
- while (!firstIsClosed || !secondIsClosed) {
- select {
- onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value ->
- firstValue = value
- if (secondValue !== null) {
- emit(transform(NULL.unbox(firstValue), NULL.unbox(secondValue)))
- }
- }
-
- onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value ->
- secondValue = value
- if (firstValue !== null) {
- emit(transform(NULL.unbox(firstValue), NULL.unbox(secondValue)))
- }
- }
- }
- }
+@JvmName("flowCombine")
+@ExperimentalCoroutinesApi
+public fun Flow.combine(flow: Flow, transform: suspend (a: T1, b: T2) -> R): Flow = flow {
+ combineTransformInternal(this@combine, flow) { a, b ->
+ emit(transform(a, b))
}
}
/**
* Returns a [Flow] whose values are generated with [transform] function by combining
* the most recently emitted values by each flow.
+ *
+ * It can be demonstrated with the following example:
+ * ```
+ * val flow = flowOf(1, 2).delayEach(10)
+ * val flow2 = flowOf("a", "b", "c").delayEach(15)
+ * combine(flow, flow2) { i, s -> i.toString() + s }.collect {
+ * println(it) // Will print "1a 2a 2b 2c"
+ * }
+ * ```
+ *
+ * This function is a shorthand for `combineTransform(flow, flow2) { a, b -> emit(transform(a, b)) }
*/
-@FlowPreview
-public inline fun Flow.combineLatest(
- other: Flow,
- other2: Flow,
- crossinline transform: suspend (T1, T2, T3) -> R
-): Flow = (this as Flow<*>).combineLatest(other, other2) { args: Array<*> ->
+@ExperimentalCoroutinesApi
+public fun combine(flow: Flow, flow2: Flow, transform: suspend (a: T1, b: T2) -> R): Flow =
+ flow.combine(flow2, transform)
+
+/**
+ * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
+ *
+ * Its usage can be demonstrated with the following example:
+ * ```
+ * val flow = requestFlow()
+ * val flow2 = searchEngineFlow()
+ * flow.combineTransform(flow2) { request, searchEngine ->
+ * emit("Downloading in progress")
+ * val result = download(request, searchEngine)
+ * emit(result)
+ * }
+ * ```
+ */
+@JvmName("flowCombineTransform")
+@ExperimentalCoroutinesApi
+public fun Flow.combineTransform(
+ flow: Flow,
+ @BuilderInference transform: suspend FlowCollector.(a: T1, b: T2) -> Unit
+): Flow = safeFlow {
+ combineTransformInternal(this@combineTransform, flow) { a, b ->
+ transform(a, b)
+ }
+}
+
+/**
+ * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
+ *
+ * Its usage can be demonstrated with the following example:
+ * ```
+ * val flow = requestFlow()
+ * val flow2 = searchEngineFlow()
+ * combineTransform(flow, flow2) { request, searchEngine ->
+ * emit("Downloading in progress")
+ * val result = download(request, searchEngine)
+ * emit(result)
+ * }
+ * ```
+ */
+@ExperimentalCoroutinesApi
+public fun combineTransform(
+ flow: Flow,
+ flow2: Flow,
+ @BuilderInference transform: suspend FlowCollector.(a: T1, b: T2) -> Unit
+): Flow = combineTransform(flow, flow2, transform)
+
+/**
+ * Returns a [Flow] whose values are generated with [transform] function by combining
+ * the most recently emitted values by each flow.
+ */
+@ExperimentalCoroutinesApi
+public inline fun combine(
+ flow: Flow,
+ flow2: Flow,
+ flow3: Flow,
+ @BuilderInference crossinline transform: suspend (T1, T2, T3) -> R
+): Flow = combine(flow, flow2, flow3) { args: Array<*> ->
+ transform(
+ args[0] as T1,
+ args[1] as T2,
+ args[2] as T3
+ )
+}
+
+/**
+ * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
+ */
+@ExperimentalCoroutinesApi
+public inline fun combineTransform(
+ flow: Flow,
+ flow2: Flow,
+ flow3: Flow,
+ @BuilderInference crossinline transform: suspend FlowCollector.(T1, T2, T3) -> Unit
+): Flow = combineTransform(flow, flow2, flow3) { args: Array<*> ->
transform(
args[0] as T1,
args[1] as T2,
@@ -97,13 +150,36 @@ public inline fun Flow.combineLatest(
* Returns a [Flow] whose values are generated with [transform] function by combining
* the most recently emitted values by each flow.
*/
-@FlowPreview
-public inline fun Flow.combineLatest(
- other: Flow,
- other2: Flow,
- other3: Flow,
+@ExperimentalCoroutinesApi
+public inline fun combine(
+ flow: Flow,
+ flow2: Flow,
+ flow3: Flow,
+ flow4: Flow,
crossinline transform: suspend (T1, T2, T3, T4) -> R
-): Flow = (this as Flow<*>).combineLatest(other, other2, other3) { args: Array<*> ->
+): Flow = combine(flow, flow2, flow3, flow4) { args: Array<*> ->
+ transform(
+ args[0] as T1,
+ args[1] as T2,
+ args[2] as T3,
+ args[3] as T4
+ )
+}
+
+/**
+ * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
+ */
+@ExperimentalCoroutinesApi
+public inline fun combineTransform(
+ flow: Flow,
+ flow2: Flow,
+ flow3: Flow,
+ flow4: Flow,
+ @BuilderInference crossinline transform: suspend FlowCollector.(T1, T2, T3, T4) -> Unit
+): Flow = combineTransform(flow, flow2, flow3, flow4) { args: Array<*> ->
transform(
args[0] as T1,
args[1] as T2,
@@ -116,14 +192,15 @@ public inline fun Flow.combineLatest(
* Returns a [Flow] whose values are generated with [transform] function by combining
* the most recently emitted values by each flow.
*/
-@FlowPreview
-public inline fun Flow.combineLatest(
- other: Flow,
- other2: Flow,
- other3: Flow,
- other4: Flow,
+@ExperimentalCoroutinesApi
+public inline fun combine(
+ flow: Flow,
+ flow2: Flow,
+ flow3: Flow,
+ flow4: Flow,
+ flow5: Flow,
crossinline transform: suspend (T1, T2, T3, T4, T5) -> R
-): Flow = (this as Flow<*>).combineLatest(other, other2, other3, other4) { args: Array<*> ->
+): Flow = combine(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
transform(
args[0] as T1,
args[1] as T2,
@@ -134,67 +211,89 @@ public inline fun Flow.combineLatest(
}
/**
- * Returns a [Flow] whose values are generated with [transform] function by combining
- * the most recently emitted values by each flow.
+ * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
*/
-@FlowPreview
-public inline fun Flow.combineLatest(vararg others: Flow