diff --git a/CHANGES.md b/CHANGES.md
index 4d9509982f..0d9b19e14f 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,5 +1,21 @@
# Change log for kotlinx.coroutines
+## Version 1.2.0
+
+ * Kotlin updated to 1.3.30.
+ * New API: `CancellableContinuation.resume` with `onCancelling` lambda (#1044) to consistently handle closeable resources.
+ * Play services task version updated to 16.0.1.
+ * `ReceiveChannel.isEmpty` is no longer deprecated
+
+A lot of `Flow` improvements:
+ * Purity property is renamed to context preservation and became more restrictive.
+ * `zip` and `combineLatest` operators.
+ * Integration with RxJava2
+ * `flatMap`, `merge` and `concatenate` are replaced with `flattenConcat`, `flattenMerge`, `flatMapConcat` and `flatMapMerge`.
+ * Various documentation improvements and minor bug fixes.
+
+Note that `Flow` **is not** leaving its [preview status](/docs/compatibility.md#flow-preview-api).
+
## Version 1.2.0-alpha-2
This release contains major [feature preview](/docs/compatibility.md#flow-preview-api): cold streams aka `Flow` (#254).
diff --git a/README.md b/README.md
index d07ab3446d..cfb92b54bf 100644
--- a/README.md
+++ b/README.md
@@ -2,10 +2,10 @@
[](https://confluence.jetbrains.com/display/ALL/JetBrains+on+GitHub)
[](https://www.apache.org/licenses/LICENSE-2.0)
-[ ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.2.0-alpha-2)
+[ ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.2.0)
Library support for Kotlin coroutines with [multiplatform](#multiplatform) support.
-This is a companion version for Kotlin `1.3.21` release.
+This is a companion version for Kotlin `1.3.30` release.
```kotlin
GlobalScope.launch {
@@ -75,7 +75,7 @@ Add dependencies (you can also add other modules that you need):
org.jetbrains.kotlinx
kotlinx-coroutines-core
- 1.2.0-alpha-2
+ 1.2.0
```
@@ -83,7 +83,7 @@ And make sure that you use the latest Kotlin version:
```xml
- 1.3.21
+ 1.3.30
```
@@ -93,7 +93,7 @@ Add dependencies (you can also add other modules that you need):
```groovy
dependencies {
- implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.0-alpha-2'
+ implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.0'
}
```
@@ -101,7 +101,7 @@ And make sure that you use the latest Kotlin version:
```groovy
buildscript {
- ext.kotlin_version = '1.3.21'
+ ext.kotlin_version = '1.3.30'
}
```
@@ -119,7 +119,7 @@ Add dependencies (you can also add other modules that you need):
```groovy
dependencies {
- implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.0-alpha-2")
+ implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.0")
}
```
@@ -127,7 +127,7 @@ And make sure that you use the latest Kotlin version:
```groovy
plugins {
- kotlin("jvm") version "1.3.21"
+ kotlin("jvm") version "1.3.30"
}
```
@@ -147,7 +147,7 @@ Add [`kotlinx-coroutines-android`](ui/kotlinx-coroutines-android)
module as dependency when using `kotlinx.coroutines` on Android:
```groovy
-implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.2.0-alpha-2'
+implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.2.0'
```
This gives you access to Android [Dispatchers.Main](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-android/kotlinx.coroutines.android/kotlinx.coroutines.-dispatchers/index.html)
coroutine dispatcher and also makes sure that in case of crashed coroutine with unhandled exception this
diff --git a/RELEASE.md b/RELEASE.md
index 3ef2f7cff8..653bda9253 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -63,14 +63,18 @@ To release new `` of `kotlinx-coroutines`:
5. Announce new release in [Slack](https://kotlinlang.slack.com)
-6. Switch into `develop` branch:
+6. Create a ticket to update coroutines version on [try.kotlinlang.org](try.kotlinlang.org).
+ * Use [KT-30870](https://youtrack.jetbrains.com/issue/KT-30870) as a template
+ * This step should be skipped for eap versions that are not merged to `master`
+
+7. Switch into `develop` branch:
`git checkout develop`
-7. Fetch the latest `master`:
+8. Fetch the latest `master`:
`git fetch`
-8. Merge release from `master`:
+9. Merge release from `master`:
`git merge origin/master`
-9. Push updates to `develop`:
+10. Push updates to `develop`:
`git push`
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
index f35ef54158..8379ae1551 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
@@ -40,6 +40,7 @@ public abstract interface class kotlinx/coroutines/CancellableContinuation : kot
public abstract fun isActive ()Z
public abstract fun isCancelled ()Z
public abstract fun isCompleted ()Z
+ public abstract fun resume (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)V
public abstract fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
public abstract fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
@@ -58,20 +59,18 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/
public fun getCallerFrame ()Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;
public fun getContext ()Lkotlin/coroutines/CoroutineContext;
public fun getContinuationCancellationCause (Lkotlinx/coroutines/Job;)Ljava/lang/Throwable;
- public final fun getDelegate ()Lkotlin/coroutines/Continuation;
public final fun getResult ()Ljava/lang/Object;
public fun getStackTraceElement ()Ljava/lang/StackTraceElement;
- public fun getSuccessfulResult (Ljava/lang/Object;)Ljava/lang/Object;
public synthetic fun initCancellability ()V
public fun invokeOnCancellation (Lkotlin/jvm/functions/Function1;)V
public fun isActive ()Z
public fun isCancelled ()Z
public fun isCompleted ()Z
protected fun nameString ()Ljava/lang/String;
+ public fun resume (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)V
public fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
public fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
public fun resumeWith (Ljava/lang/Object;)V
- public fun takeState ()Ljava/lang/Object;
public fun toString ()Ljava/lang/String;
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
public fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
@@ -794,8 +793,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun broadcastIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
- public static final fun concatenate (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
- public static final fun concatenate (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun delayEach (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
@@ -808,8 +806,12 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun filter (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun filterNot (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun filterNotNull (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
- public static final fun flatMap (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
- public static synthetic fun flatMap$default (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun flatMapConcat (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun flatMapMerge (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+ public static synthetic fun flatMapMerge$default (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun flattenConcat (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun flattenMerge (Lkotlinx/coroutines/flow/Flow;II)Lkotlinx/coroutines/flow/Flow;
+ public static synthetic fun flattenMerge$default (Lkotlinx/coroutines/flow/Flow;IIILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowOf ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/Flow;
@@ -821,8 +823,6 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun fold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun map (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun mapNotNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
- public static final fun merge (Ljava/lang/Iterable;II)Lkotlinx/coroutines/flow/Flow;
- public static synthetic fun merge$default (Ljava/lang/Iterable;IIILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun onEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun onErrorCollect (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun onErrorCollect$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
@@ -844,14 +844,17 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static synthetic fun toSet$default (Lkotlinx/coroutines/flow/Flow;Ljava/util/Set;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun transform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun unsafeFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun zip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
}
public final class kotlinx/coroutines/flow/MigrationKt {
public static final fun BehaviourSubject ()Ljava/lang/Object;
public static final fun PublishSubject ()Ljava/lang/Object;
public static final fun ReplaySubject ()Ljava/lang/Object;
- public static final fun concat (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun concatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun flatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun flatten (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
public static final fun onErrorResume (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun publishOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
@@ -863,7 +866,7 @@ public final class kotlinx/coroutines/flow/MigrationKt {
}
public final class kotlinx/coroutines/flow/internal/SafeCollector : kotlinx/coroutines/flow/FlowCollector {
- public fun (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/ContinuationInterceptor;)V
+ public fun (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;)V
public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt
index 791690cd22..3a99ada4a1 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt
@@ -28,6 +28,8 @@ public final class kotlinx/coroutines/rx2/RxConvertKt {
public static final fun asMaybe (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Maybe;
public static final fun asObservable (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Single;
+ public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Flowable;
+ public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Observable;
}
public final class kotlinx/coroutines/rx2/RxFlowableKt {
diff --git a/gradle.properties b/gradle.properties
index 745fb5befc..bec0c8a6a6 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,17 +1,18 @@
# Kotlin
-version=1.2.0-alpha-2-SNAPSHOT
+version=1.2.0-SNAPSHOT
group=org.jetbrains.kotlinx
-kotlin_version=1.3.21
+kotlin_version=1.3.30
# Dependencies
junit_version=4.12
-atomicFU_version=0.12.2
+atomicFU_version=0.12.3
html_version=0.6.8
lincheck_version=2.0
dokka_version=0.9.16-rdev-2-mpp-hacks
bintray_version=1.8.4-jetbrains-5
byte_buddy_version=1.9.3
reactor_vesion=3.2.5.RELEASE
+reactive_streams_version=1.0.2
artifactory_plugin_version=4.7.3
# JS
diff --git a/integration/kotlinx-coroutines-play-services/build.gradle b/integration/kotlinx-coroutines-play-services/build.gradle
index 51cce3f278..61201faeb7 100644
--- a/integration/kotlinx-coroutines-play-services/build.gradle
+++ b/integration/kotlinx-coroutines-play-services/build.gradle
@@ -7,7 +7,7 @@ import java.util.zip.ZipFile
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-ext.tasks_version = '15.0.1'
+ext.tasks_version = '16.0.1'
def attr = Attribute.of("artifactType", String.class)
configurations {
diff --git a/integration/kotlinx-coroutines-play-services/src/Tasks.kt b/integration/kotlinx-coroutines-play-services/src/Tasks.kt
index 14fd961f82..4952daa7c4 100644
--- a/integration/kotlinx-coroutines-play-services/src/Tasks.kt
+++ b/integration/kotlinx-coroutines-play-services/src/Tasks.kt
@@ -50,7 +50,8 @@ public fun Task.asDeferred(): Deferred {
if (isComplete) {
val e = exception
return if (e == null) {
- CompletableDeferred().apply { if (isCanceled) cancel() else complete(result) }
+ @Suppress("UNCHECKED_CAST")
+ CompletableDeferred().apply { if (isCanceled) cancel() else complete(result as T) }
} else {
CompletableDeferred().apply { completeExceptionally(e) }
}
@@ -60,7 +61,8 @@ public fun Task.asDeferred(): Deferred {
addOnCompleteListener {
val e = it.exception
if (e == null) {
- if (isCanceled) result.cancel() else result.complete(it.result)
+ @Suppress("UNCHECKED_CAST")
+ if (isCanceled) result.cancel() else result.complete(it.result as T)
} else {
result.completeExceptionally(e)
}
@@ -83,7 +85,8 @@ public suspend fun Task.await(): T {
if (isCanceled) {
throw CancellationException("Task $this was cancelled normally.")
} else {
- result
+ @Suppress("UNCHECKED_CAST")
+ result as T
}
} else {
throw e
@@ -94,7 +97,8 @@ public suspend fun Task.await(): T {
addOnCompleteListener {
val e = exception
if (e == null) {
- if (isCanceled) cont.cancel() else cont.resume(result)
+ @Suppress("UNCHECKED_CAST")
+ if (isCanceled) cont.cancel() else cont.resume(result as T)
} else {
cont.resumeWithException(e)
}
diff --git a/integration/kotlinx-coroutines-play-services/test/TaskTest.kt b/integration/kotlinx-coroutines-play-services/test/TaskTest.kt
index 15c302bb1f..b87a295449 100644
--- a/integration/kotlinx-coroutines-play-services/test/TaskTest.kt
+++ b/integration/kotlinx-coroutines-play-services/test/TaskTest.kt
@@ -92,6 +92,11 @@ class TaskTest : TestBase() {
assertEquals(42, deferred.await())
}
+ @Test
+ fun testNullResultTaskAsDeferred() = runTest {
+ assertNull(Tasks.forResult(null).asDeferred().await())
+ }
+
@Test
fun testCancelledTaskAsDeferred() = runTest {
val deferred = Tasks.forCanceled().asDeferred()
diff --git a/kotlinx-coroutines-core/README.md b/kotlinx-coroutines-core/README.md
index c1113e165e..f930794c87 100644
--- a/kotlinx-coroutines-core/README.md
+++ b/kotlinx-coroutines-core/README.md
@@ -70,7 +70,11 @@ Synchronization primitives (mutex).
# Package kotlinx.coroutines.channels
-Channels -- non-blocking primitives for communicating a stream of elements between coroutines.
+Channels — non-blocking primitives for communicating a stream of elements between coroutines.
+
+# Package kotlinx.coroutines.flow
+
+Flow — asynchronous cold stream of elements.
# Package kotlinx.coroutines.selects
diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt
index 4fffcd906f..139ef0403d 100644
--- a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt
+++ b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt
@@ -123,10 +123,16 @@ public interface CancellableContinuation : Continuation {
* with cancellation exception. Otherwise, the handler will be invoked once on cancellation if this
* continuation is cancelled.
*
- * Installed [handler] should not throw any exceptions. If it does, they will get caught,
- * wrapped into [CompletionHandlerException], and rethrown, potentially causing the crash of unrelated code.
+ * Installed [handler] should not throw any exceptions.
+ * If it does, they will get caught, wrapped into [CompletionHandlerException] and
+ * processed as uncaught exception in the context of the current coroutine
+ * (see [CoroutineExceptionHandler]).
*
* At most one [handler] can be installed on one continuation.
+ *
+ * **Note**: Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe.
+ * This handler can be invoked concurrently with the surrounding code.
+ * There is no guarantee on the execution context in which the [handler] is invoked.
*/
public fun invokeOnCancellation(handler: CompletionHandler)
@@ -151,6 +157,34 @@ public interface CancellableContinuation : Continuation {
*/
@ExperimentalCoroutinesApi
public fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable)
+
+ /**
+ * Resumes this continuation with a given [value] and calls the specified [onCancellation]
+ * handler when resumed too late (when continuation was already cancelled) or when resumed
+ * successfully (before cancellation), but coroutine's job was cancelled before it had a
+ * chance to run in its dispatcher, so that suspended function threw an exception
+ * instead of returning this value.
+ *
+ * Installed [onCancellation] handler should not throw any exceptions.
+ * If it does, they will get caught, wrapped into [CompletionHandlerException] and
+ * processed as uncaught exception in the context of the current coroutine
+ * (see [CoroutineExceptionHandler]).
+ *
+ * This function shall be used when resuming with a resource that must be closed by the
+ * code that had called the corresponding suspending function, e.g.:
+ *
+ * ```
+ * continuation.resume(resource) {
+ * resource.close()
+ * }
+ * ```
+ *
+ * **Note**: Implementation of [onCancellation] handler must be fast, non-blocking, and thread-safe.
+ * This handler can be invoked concurrently with the surrounding code.
+ * There is no guarantee on the execution context in which the [onCancellation] handler is invoked.
+ */
+ @ExperimentalCoroutinesApi // since 1.2.0, tentatively graduates in 1.3.0
+ public fun resume(value: T, onCancellation: (cause: Throwable) -> Unit)
}
/**
diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
index 813276f8ed..1686330c5a 100644
--- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
+++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
@@ -19,7 +19,7 @@ private const val RESUMED = 2
*/
@PublishedApi
internal open class CancellableContinuationImpl(
- public final override val delegate: Continuation,
+ final override val delegate: Continuation,
resumeMode: Int
) : DispatchedTask(resumeMode), CancellableContinuation, CoroutineStackFrame {
public override val context: CoroutineContext = delegate.context
@@ -102,6 +102,14 @@ internal open class CancellableContinuationImpl(
override fun takeState(): Any? = state
+ override fun cancelResult(state: Any?, cause: Throwable) {
+ if (state is CompletedWithCancellation) {
+ invokeHandlerSafely {
+ state.onCancellation(cause)
+ }
+ }
+ }
+
public override fun cancel(cause: Throwable?): Boolean {
_state.loop { state ->
if (state !is NotCompleted) return false // false if already complete or cancelling
@@ -165,8 +173,19 @@ internal open class CancellableContinuationImpl(
return getSuccessfulResult(state)
}
- override fun resumeWith(result: Result) =
+ override fun resumeWith(result: Result) {
resumeImpl(result.toState(), resumeMode)
+ }
+
+ override fun resume(value: T, onCancellation: (cause: Throwable) -> Unit) {
+ val cancelled = resumeImpl(CompletedWithCancellation(value, onCancellation), resumeMode)
+ if (cancelled != null) {
+ // too late to resume (was cancelled) -- call handler
+ invokeHandlerSafely {
+ onCancellation(cancelled.cause)
+ }
+ }
+ }
internal fun resumeWithExceptionMode(exception: Throwable, mode: Int) =
resumeImpl(CompletedExceptionally(exception), mode)
@@ -219,14 +238,15 @@ internal open class CancellableContinuationImpl(
dispatch(mode)
}
- private fun resumeImpl(proposedUpdate: Any?, resumeMode: Int) {
+ // returns null when successfully dispatched resumed, CancelledContinuation if too late (was already cancelled)
+ private fun resumeImpl(proposedUpdate: Any?, resumeMode: Int): CancelledContinuation? {
_state.loop { state ->
when (state) {
is NotCompleted -> {
if (!_state.compareAndSet(state, proposedUpdate)) return@loop // retry on cas failure
disposeParentHandle()
dispatchResume(resumeMode)
- return
+ return null
}
is CancelledContinuation -> {
/*
@@ -234,7 +254,7 @@ internal open class CancellableContinuationImpl(
* because cancellation is asynchronous and may race with resume.
* Racy exceptions will be lost, too.
*/
- if (state.makeResumed()) return // ok -- resumed just once
+ if (state.makeResumed()) return state // tried to resume just once, but was cancelled
}
}
alreadyResumedError(proposedUpdate) // otherwise -- an error (second resume attempt)
@@ -307,7 +327,11 @@ internal open class CancellableContinuationImpl(
@Suppress("UNCHECKED_CAST")
override fun getSuccessfulResult(state: Any?): T =
- if (state is CompletedIdempotentResult) state.result as T else state as T
+ when (state) {
+ is CompletedIdempotentResult -> state.result as T
+ is CompletedWithCancellation -> state.result as T
+ else -> state as T
+ }
// For nicer debugging
public override fun toString(): String =
@@ -344,3 +368,11 @@ private class CompletedIdempotentResult(
) {
override fun toString(): String = "CompletedIdempotentResult[$result]"
}
+
+private class CompletedWithCancellation(
+ @JvmField val result: Any?,
+ @JvmField val onCancellation: (cause: Throwable) -> Unit
+) {
+ override fun toString(): String = "CompletedWithCancellation[$result]"
+}
+
diff --git a/kotlinx-coroutines-core/common/src/CoroutineContext.common.kt b/kotlinx-coroutines-core/common/src/CoroutineContext.common.kt
index 785e8a7691..ae9cb73201 100644
--- a/kotlinx-coroutines-core/common/src/CoroutineContext.common.kt
+++ b/kotlinx-coroutines-core/common/src/CoroutineContext.common.kt
@@ -20,4 +20,5 @@ internal expect val DefaultDelay: Delay
// countOrElement -- pre-cached value for ThreadContext.kt
internal expect inline fun withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T
internal expect fun Continuation<*>.toDebugString(): String
-internal expect val CoroutineContext.coroutineName: String?
\ No newline at end of file
+internal expect val CoroutineContext.coroutineName: String?
+internal expect fun CoroutineContext.minusId(): CoroutineContext
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/common/src/Dispatched.kt b/kotlinx-coroutines-core/common/src/Dispatched.kt
index f656b22b84..ffccf21368 100644
--- a/kotlinx-coroutines-core/common/src/Dispatched.kt
+++ b/kotlinx-coroutines-core/common/src/Dispatched.kt
@@ -203,15 +203,17 @@ internal fun Continuation.resumeDirectWithException(exception: Throwable)
internal abstract class DispatchedTask(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
- public abstract val delegate: Continuation
+ internal abstract val delegate: Continuation
- public abstract fun takeState(): Any?
+ internal abstract fun takeState(): Any?
+
+ internal open fun cancelResult(state: Any?, cause: Throwable) {}
@Suppress("UNCHECKED_CAST")
- public open fun getSuccessfulResult(state: Any?): T =
+ internal open fun getSuccessfulResult(state: Any?): T =
state as T
- public fun getExceptionalResult(state: Any?): Throwable? =
+ internal fun getExceptionalResult(state: Any?): Throwable? =
(state as? CompletedExceptionally)?.cause
public final override fun run() {
@@ -224,9 +226,11 @@ internal abstract class DispatchedTask(
val job = if (resumeMode.isCancellableMode) context[Job] else null
val state = takeState() // NOTE: Must take state in any case, even if cancelled
withCoroutineContext(context, delegate.countOrElement) {
- if (job != null && !job.isActive)
- continuation.resumeWithException(job.getCancellationException())
- else {
+ if (job != null && !job.isActive) {
+ val cause = job.getCancellationException()
+ cancelResult(state, cause)
+ continuation.resumeWithException(cause)
+ } else {
val exception = getExceptionalResult(state)
if (exception != null)
continuation.resumeWithStackTrace(exception)
diff --git a/kotlinx-coroutines-core/common/src/Job.kt b/kotlinx-coroutines-core/common/src/Job.kt
index df615156f4..e0e34590e0 100644
--- a/kotlinx-coroutines-core/common/src/Job.kt
+++ b/kotlinx-coroutines-core/common/src/Job.kt
@@ -273,7 +273,9 @@ public interface Job : CoroutineContext.Element {
* Installed [handler] should not throw any exceptions. If it does, they will get caught,
* wrapped into [CompletionHandlerException], and rethrown, potentially causing crash of unrelated code.
*
- * **Note**: Implementations of `CompletionHandler` must be fast and _lock-free_.
+ * **Note**: Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe.
+ * This handler can be invoked concurrently with the surrounding code.
+ * There is no guarantee on the execution context in which the [handler] is invoked.
*/
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
@@ -304,7 +306,9 @@ public interface Job : CoroutineContext.Element {
* **Note**: This function is a part of internal machinery that supports parent-child hierarchies
* and allows for implementation of suspending functions that wait on the Job's state.
* This function should not be used in general application code.
- * Implementations of `CompletionHandler` must be fast and _lock-free_.
+ * Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe.
+ * This handler can be invoked concurrently with the surrounding code.
+ * There is no guarantee on the execution context in which the [handler] is invoked.
*
* @param onCancelling when `true`, then the [handler] is invoked as soon as this job transitions to _cancelling_ state;
* when `false` then the [handler] is invoked only when it transitions to _completed_ state.
diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
index 38f7c3bf16..05bfbca98d 100644
--- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
@@ -177,6 +177,14 @@ internal abstract class AbstractSendChannel : SendChannel {
return sendSuspend(element)
}
+ internal suspend fun sendFair(element: E) {
+ if (offer(element)) {
+ yield() // Works only on fast path to properly work in sequential use-cases
+ return
+ }
+ return sendSuspend(element)
+ }
+
public final override fun offer(element: E): Boolean {
val result = offerInternal(element)
return when {
@@ -562,8 +570,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel : AbstractSendChannel(), Channel registerSelectReceive(select: SelectInstance, block: suspend (E) -> R) {
while (true) {
if (select.isSelected) return
- if (empty) {
+ if (isEmpty) {
val enqueueOp = TryEnqueueReceiveDesc(select, block as (suspend (E?) -> R), nullOnClose = false)
val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
when {
@@ -784,7 +791,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel registerSelectReceiveOrNull(select: SelectInstance, block: suspend (E?) -> R) {
while (true) {
if (select.isSelected) return
- if (empty) {
+ if (isEmpty) {
val enqueueOp = TryEnqueueReceiveDesc(select, block, nullOnClose = true)
val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
when {
diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt
index 286196dc7b..212ed5554d 100644
--- a/kotlinx-coroutines-core/common/src/channels/Channel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt
@@ -151,11 +151,8 @@ public interface ReceiveChannel {
/**
* Returns `true` if the channel is empty (contains no elements) and the [receive] attempt will suspend.
* This function returns `false` for [isClosedForReceive] channel.
- *
- * @suppress **Will be removed in next releases, no replacement.**
*/
@ExperimentalCoroutinesApi
- @Deprecated(level = DeprecationLevel.ERROR, message = "Will be removed in next releases without replacement")
public val isEmpty: Boolean
/**
diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt
index c495d6e671..6fbd1400ee 100644
--- a/kotlinx-coroutines-core/common/src/flow/Builders.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt
@@ -19,35 +19,35 @@ import kotlin.jvm.*
* Example of usage:
* ```
* fun fibonacci(): Flow = flow {
- * emit(1L)
- * var f1 = 1L
- * var f2 = 1L
- * repeat(100) {
- * var tmp = f1
- * f1 = f2
- * f2 += tmp
- * emit(f1)
- * }
+ * emit(1L)
+ * var f1 = 1L
+ * var f2 = 1L
+ * repeat(100) {
+ * var tmp = f1
+ * f1 = f2
+ * f2 += tmp
+ * emit(f1)
+ * }
* }
* ```
*
- * `emit` should happen strictly in the dispatchers of the [block] in order to preserve flow purity.
+ * `emit` should happen strictly in the dispatchers of the [block] in order to preserve flow context.
* For example, the following code will produce [IllegalStateException]:
* ```
* flow {
- * emit(1) // Ok
- * withContext(Dispatcher.IO) {
- * emit(2) // Will fail with ISE
- * }
+ * emit(1) // Ok
+ * withContext(Dispatcher.IO) {
+ * emit(2) // Will fail with ISE
+ * }
* }
* ```
* If you want to switch the context where this flow is executed use [flowOn] operator.
*/
@FlowPreview
-public fun flow(@BuilderInference block: suspend FlowCollector.() -> Unit): Flow {
+public fun flow(@BuilderInference block: suspend FlowCollector.() -> Unit): Flow {
return object : Flow {
- override suspend fun collect(collector: FlowCollector) {
- SafeCollector(collector, coroutineContext[ContinuationInterceptor]).block()
+ override suspend fun collect(collector: FlowCollector) {
+ SafeCollector(collector, coroutineContext).block()
}
}
}
@@ -58,9 +58,9 @@ public fun flow(@BuilderInference block: suspend FlowCollector.() -> U
*/
@FlowPreview
@PublishedApi
-internal fun unsafeFlow(@BuilderInference block: suspend FlowCollector.() -> Unit): Flow {
+internal fun unsafeFlow(@BuilderInference block: suspend FlowCollector.() -> Unit): Flow {
return object : Flow {
- override suspend fun collect(collector: FlowCollector) {
+ override suspend fun collect(collector: FlowCollector) {
collector.block()
}
}
@@ -129,7 +129,7 @@ public fun flowOf(vararg elements: T): Flow = unsafeFlow {
public fun emptyFlow(): Flow = EmptyFlow
private object EmptyFlow : Flow {
- override suspend fun collect(collector: FlowCollector) = Unit
+ override suspend fun collect(collector: FlowCollector) = Unit
}
/**
@@ -183,19 +183,33 @@ public fun LongRange.asFlow(): Flow = flow {
}
/**
- * Creates an instance of the cold [Flow] from a supplied [SendChannel].
+ * Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
+ * that is provided to the builder's [block] of code. It allows elements to be
+ * produced by the code that is running in a different context,
+ * e.g. from a callback-based API.
+ *
+ * The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
+ * on the resulting flow.
*
* To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
* The provided channel can later be used by any external service to communicate with flow and its buffer determines
* backpressure buffer size or its behaviour (e.g. in case when [Channel.CONFLATED] was used).
*
* Example of usage:
+ *
* ```
- * fun flowFrom(api: CallbackBasedApi): Flow = flowViaChannel { channel ->
- * val adapter = FlowSinkAdapter(channel) // implementation of callback interface
- * api.register(adapter)
+ * fun flowFrom(api: CallbackBasedApi): Flow = flowViaChannel { channel ->
+ * val callback = object : Callback { // implementation of some callback interface
+ * override fun onNextValue(value: T) {
+ * channel.offer(value) // Note: offer drops value when buffer is full
+ * }
+ * override fun onApiError(cause: Throwable) {
+ * channel.cancel("API Error", CancellationException(cause))
+ * }
+ * }
+ * api.register(callback)
* channel.invokeOnClose {
- * api.unregister(adapter)
+ * api.unregister(callback)
* }
* }
* ```
diff --git a/kotlinx-coroutines-core/common/src/flow/Flow.kt b/kotlinx-coroutines-core/common/src/flow/Flow.kt
index 19c9eac233..5b1c6ec965 100644
--- a/kotlinx-coroutines-core/common/src/flow/Flow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Flow.kt
@@ -7,30 +7,29 @@ package kotlinx.coroutines.flow
import kotlinx.coroutines.*
/**
- * A cold asynchronous stream of the data, that emits from zero to N (where N can be unbounded)
- * values and completes normally or with an exception.
+ * A cold asynchronous stream of the data, that emits from zero to N (where N can be unbounded) values and completes normally or with an exception.
*
- * All transformations on the flow, such as [map] and [filter] do not trigger flow collection or execution, only
- * terminal operators (e.g. [single]) do trigger it.
+ * All transformations on the flow, such as [map] and [filter] do not trigger flow collection or execution, only terminal operators (e.g. [single]) do trigger it.
*
- * Flow can be collected in a suspending manner, without actual blocking using [collect] extension that will complete normally or exceptionally:
+ * Flow can be collected in a suspending manner, without actual blocking, using [collect] extension that will complete normally or exceptionally:
* ```
* try {
- * flow.collect { value ->
- * println("Received $value")
- * }
+ * flow.collect { value ->
+ * println("Received $value")
+ * }
* } catch (e: Exception) {
- * println("Flow has thrown an exception: $e")
+ * println("Flow has thrown an exception: $e")
* }
* ```
* Additionally, the library provides a rich set of terminal operators such as [single], [reduce] and others.
*
* Flow does not carry information whether it is a cold stream (that can be collected multiple times and
- * triggers its evaluation every time collection is executed) or hot one, but conventionally flow represents a cold stream.
- * Transitions between hot and cold streams are support via channels and corresponding API: [flowViaChannel], [broadcastIn], [produceIn].
+ * triggers its evaluation every time [collect] is executed) or a hot one, but conventionally flow represents a cold stream.
+ * Transitions between hot and cold streams are supported via channels and the corresponding API: [flowViaChannel], [broadcastIn], [produceIn].
*
- * Flow is a **pure** concept: it encapsulates its own execution context and never propagates it to the downstream, thus making
+ * Flow has a context preserving property: it encapsulates its own execution context and never propagates or leaks it to the downstream, thus making
* reasoning about execution context of particular transformations or terminal operations trivial.
+ *
* There are two ways of changing the flow's context: [flowOn][Flow.flowOn] and [flowWith][Flow.flowWith].
* The former changes the upstream context ("everything above the flowOn operator") while the latter
* changes the context of the flow within [flowWith] body. For additional information refer to these operators documentation.
@@ -41,10 +40,10 @@ import kotlinx.coroutines.*
* .map { it + 1 } // Will be executed in ctx_1
* .flowOn(ctx_1) // Changes upstream context: flowOf and map
*
- * // Now we have flow that is pure: it is executed somewhere but this information is encapsulated in the flow itself
+ * // Now we have flow that is context-preserving: it is executed somewhere but this information is encapsulated in the flow itself
*
- * val filtered = flow
- * .filter { it == 3 } // Pure operator without a context
+ * val filtered = flow // ctx_1 is inaccessible
+ * .filter { it == 3 } // Pure operator without a context yet
*
* withContext(Dispatchers.Main) {
* // All not encapsulated operators will be executed in Main: filter and single
@@ -53,6 +52,29 @@ import kotlinx.coroutines.*
* }
* ```
*
+ * From the implementation point of view it means that all intermediate operators on [Flow] should use the following constraint:
+ * If one wants to separate collection or emission into multiple coroutines, it should use [coroutineScope] or [supervisorScope] and
+ * is not allowed to modify coroutines context:
+ * ```
+ * fun Flow.buffer(bufferSize: Int): Flow = flow {
+ * coroutineScope { // coroutine scope is necessary, withContext is prohibited
+ * val channel = Channel(bufferSize)
+ * // GlobalScope.launch { is prohibited
+ * // launch(Dispatchers.IO) { is prohibited
+ * launch { // is OK
+ * collect { value ->
+ * channel.send(value)
+ * }
+ * channel.close()
+ * }
+ *
+ * for (i in channel) {
+ * emit(i)
+ * }
+ * }
+ * }
+ * ```
+ *
* Flow is [Reactive Streams](http://www.reactive-streams.org/) compliant, you can safely interop it with reactive streams using [Flow.asPublisher] and [Publisher.asFlow] from
* kotlinx-coroutines-reactive module.
*/
@@ -66,16 +88,22 @@ public interface Flow {
* 1) It should not change the coroutine context (e.g. with `withContext(Dispatchers.IO)`) when emitting values.
* The emission should happen in the context of the [collect] call.
*
- * Only coroutine builders that inherit the context are allowed, for example the following code is fine:
+ * Only coroutine builders that inherit the context are allowed, for example:
* ```
- * coroutineScope { // Context is inherited
- * launch { // Dispatcher is not overridden, fine as well
- * collector.emit(someValue)
+ * class MyFlow : Flow {
+ * override suspend fun collect(collector: FlowCollector) {
+ * coroutineScope {
+ * // Context is inherited
+ * launch { // Dispatcher is not overridden, fine as well
+ * collector.emit(42) // Emit from the launched coroutine
+ * }
+ * }
* }
* }
* ```
+ * is a proper [Flow] implementation, but using `launch(Dispatchers.IO)` is not.
*
* 2) It should serialize calls to [emit][FlowCollector.emit] as [FlowCollector] implementations are not thread safe by default.
*/
- public suspend fun collect(collector: FlowCollector)
+ public suspend fun collect(collector: FlowCollector)
}
diff --git a/kotlinx-coroutines-core/common/src/flow/FlowCollector.kt b/kotlinx-coroutines-core/common/src/flow/FlowCollector.kt
index b7f231fbd5..8c63b9afdb 100644
--- a/kotlinx-coroutines-core/common/src/flow/FlowCollector.kt
+++ b/kotlinx-coroutines-core/common/src/flow/FlowCollector.kt
@@ -14,7 +14,7 @@ import kotlinx.coroutines.*
* Implementations of this interface are not thread-safe.
*/
@FlowPreview
-public interface FlowCollector {
+public interface FlowCollector {
/**
* Collects the value emitted by the upstream.
diff --git a/kotlinx-coroutines-core/common/src/flow/Migration.kt b/kotlinx-coroutines-core/common/src/flow/Migration.kt
index 5ab2dcf738..7570911006 100644
--- a/kotlinx-coroutines-core/common/src/flow/Migration.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Migration.kt
@@ -11,18 +11,98 @@ import kotlin.coroutines.*
* search for their favourite operators and/or patterns that are missing or renamed in Flow.
*/
-/** @suppress **/
-@Deprecated(message = "Use flowWith or flowOn instead", level = DeprecationLevel.ERROR)
-public fun Flow.subscribeOn(context: CoroutineContext): Flow = error("Should not be called")
-
-/** @suppress **/
-@Deprecated(message = "Use flowWith or flowOn instead", level = DeprecationLevel.ERROR)
+/**
+ * `observeOn` has no direct match in [Flow] API because all terminal flow operators are suspending and
+ * thus use the context of the caller.
+ *
+ * For example, the following code:
+ * ```
+ * flowable
+ * .observeOn(Schedulers.io())
+ * .doOnEach { value -> println("Received $value") }
+ * .subscribe()
+ * ```
+ *
+ * has the following Flow equivalent:
+ * ```
+ * withContext(Dispatchers.IO) {
+ * flow.collect { value -> println("Received $value") }
+ * }
+ *
+ * ```
+ * @suppress
+ */
+@Deprecated(message = "Collect flow in the desired context instead", level = DeprecationLevel.ERROR)
public fun Flow.observeOn(context: CoroutineContext): Flow = error("Should not be called")
-/** @suppress **/
-@Deprecated(message = "Use flowWith or flowOn instead", level = DeprecationLevel.ERROR)
+/**
+ * `publishOn` has no direct match in [Flow] API because all terminal flow operators are suspending and
+ * thus use the context of the caller.
+ *
+ * For example, the following code:
+ * ```
+ * flux
+ * .publishOn(Schedulers.io())
+ * .doOnEach { value -> println("Received $value") }
+ * .subscribe()
+ * ```
+ *
+ * has the following Flow equivalent:
+ * ```
+ * withContext(Dispatchers.IO) {
+ * flow.collect { value -> println("Received $value") }
+ * }
+ *
+ * ```
+ * @suppress
+ */
+@Deprecated(message = "Collect flow in the desired context instead", level = DeprecationLevel.ERROR)
public fun Flow.publishOn(context: CoroutineContext): Flow = error("Should not be called")
+/**
+ * `subscribeOn` has no direct match in [Flow] API because [Flow] preserves its context and does not leak it.
+ *
+ * For example, the following code:
+ * ```
+ * flowable
+ * .map { value -> println("Doing map in IO"); value }
+ * .subscribeOn(Schedulers.io())
+ * .observeOn(Schedulers.computation())
+ * .doOnEach { value -> println("Processing $value in computation")
+ * .subscribe()
+ * ```
+ * has the following Flow equivalents:
+ * ```
+ * withContext(Dispatchers.Default) {
+ * flow
+ * .map { value -> println("Doing map in IO"); value }
+ * .flowOn(Dispatchers.IO) // Works upstream, doesn't change downstream
+ * .collect { value ->
+ * println("Processing $value in computation")
+ * }
+ * }
+ * ```
+ * or
+ *
+ * ```
+ * withContext(Dispatchers.Default) {
+ * flow
+ * .flowWith(Dispatchers.IO) { map { value -> println("Doing map in IO"); value } }
+ * .collect { value ->
+ * println("Processing $value in computation")
+ * }
+ * }
+ * ```
+ *
+ * The difference is that [flowWith] encapsulates ("preserves") the context within its lambda
+ * while [flowOn] changes the context of all preceding operators.
+ * Opposed to subscribeOn, it it **possible** to use multiple `flowOn` operators in the one flow.
+ *
+ * @suppress
+ */
+@Deprecated(message = "Use flowWith or flowOn instead", level = DeprecationLevel.ERROR)
+public fun Flow.subscribeOn(context: CoroutineContext): Flow = error("Should not be called")
+
/** @suppress **/
@Deprecated(message = "Use BroadcastChannel.asFlow()", level = DeprecationLevel.ERROR)
public fun BehaviourSubject(): Any = error("Should not be called")
@@ -45,14 +125,40 @@ public fun PublishSubject(): Any = error("Should not be called")
)
public fun Flow.onErrorResume(fallback: Flow): Flow = error("Should not be called")
-
-/** @suppress **/
+/**
+ * Self-explanatory, the reason of deprecation is "context preservation" property (you can read more in [Flow] documentation)
+ * @suppress
+ **/
@Suppress("UNUSED_PARAMETER", "UNUSED", "DeprecatedCallableAddReplaceWith")
@Deprecated(message = "withContext in flow body is deprecated, use flowOn instead", level = DeprecationLevel.ERROR)
public fun FlowCollector.withContext(context: CoroutineContext, block: suspend () -> R): Unit = error("Should not be called")
-/** @suppress **/
+/**
+ * `subscribe` is Rx-specific API that has no direct match in flows.
+ * One can use `launch` instead, for example the following:
+ * ```
+ * flowable
+ * .observeOn(Schedulers.io())
+ * .subscribe({ println("Received $it") }, { println("Exception $it happened") }, { println("Flowable is completed successfully") }
+ * ```
+ *
+ * has the following Flow equivalent:
+ * ```
+ * launch(Dispatchers.IO) {
+ * try {
+ * flow.collect { value ->
+ * println("Received $value")
+ * }
+ * println("Flow is completed successfully")
+ * } catch (e: Throwable) {
+ * println("Exception $e happened")
+ * }
+ * }
+ * ```
+ * But most of the time it is better to use terminal operators like [single] instead of [collect].
+ * @suppress
+ */
@Deprecated(message = "Use launch + collect instead", level = DeprecationLevel.ERROR)
public fun Flow.subscribe(): Unit = error("Should not be called")
@@ -64,18 +170,42 @@ public fun Flow.subscribe(onEach: (T) -> Unit): Unit = error("Should not
@Deprecated(message = "Use launch + collect instead", level = DeprecationLevel.ERROR)
public fun Flow.subscribe(onEach: (T) -> Unit, onError: (Throwable) -> Unit): Unit = error("Should not be called")
-/** @suppress **/
+/**
+ * Note that this replacement is sequential (`concat`) by default.
+ * For concurrent flatMap [flatMapMerge] can be used instead.
+ * @suppress
+ */
@Deprecated(
level = DeprecationLevel.ERROR,
- message = "Flow analogue is named concatenate",
- replaceWith = ReplaceWith("concatenate()")
+ message = "Flow analogue is named flatMapConcat",
+ replaceWith = ReplaceWith("flatMapConcat(mapper)")
)
-public fun Flow.concat(): Flow = error("Should not be called")
+public fun Flow.flatMap(mapper: suspend (T) -> Flow): Flow = error("Should not be called")
/** @suppress **/
@Deprecated(
level = DeprecationLevel.ERROR,
- message = "Flow analogue is named concatenate",
- replaceWith = ReplaceWith("concatenate(mapper)")
+ message = "Flow analogue is named flatMapConcat",
+ replaceWith = ReplaceWith("flatMapConcat(mapper)")
)
public fun Flow.concatMap(mapper: (T) -> Flow): Flow = error("Should not be called")
+
+/**
+ * Note that this replacement is sequential (`concat`) by default.
+ * For concurrent flatMap [flattenMerge] can be used instead.
+ * @suppress
+ */
+@Deprecated(
+ level = DeprecationLevel.ERROR,
+ message = "Flow analogue is named flattenConcat",
+ replaceWith = ReplaceWith("flattenConcat()")
+)
+public fun Flow>.merge(): Flow = error("Should not be called")
+
+/** @suppress **/
+@Deprecated(
+ level = DeprecationLevel.ERROR,
+ message = "Flow analogue is named flattenConcat",
+ replaceWith = ReplaceWith("flattenConcat()")
+)
+public fun Flow>.flatten(): Flow = error("Should not be called")
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt b/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt
index dc728f98d7..dc1cf2f3a2 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt
@@ -4,4 +4,11 @@
package kotlinx.coroutines.flow.internal
-internal object NullSurrogate
+import kotlin.jvm.*
+
+internal object NullSurrogate {
+
+ @JvmStatic
+ @Suppress("UNCHECKED_CAST")
+ internal fun unbox(value: Any?): T = if (value === NullSurrogate) null as T else value as T
+}
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt b/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt
index d723dcdec9..17306e2ebf 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt
@@ -4,6 +4,7 @@
package kotlinx.coroutines.flow.internal
+import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
@@ -11,15 +12,17 @@ import kotlin.coroutines.*
@PublishedApi
internal class SafeCollector(
private val collector: FlowCollector,
- private val interceptor: ContinuationInterceptor?
+ collectContext: CoroutineContext
) : FlowCollector, SynchronizedObject() {
+ private val collectContext = collectContext.minusKey(Job).minusId()
+
override suspend fun emit(value: T) {
- if (interceptor != coroutineContext[ContinuationInterceptor]) {
+ val emitContext = coroutineContext.minusKey(Job).minusId()
+ if (emitContext != collectContext) {
error(
- "Flow invariant is violated: flow was collected in $interceptor, but emission happened in ${coroutineContext[ContinuationInterceptor]}. " +
- "Please refer to 'flow' documentation or use 'flowOn' instead"
- )
+ "Flow invariant is violated: flow was collected in $collectContext, but emission happened in $emitContext. " +
+ "Please refer to 'flow' documentation or use 'flowOn' instead")
}
collector.emit(value)
}
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt
index 13e7db7cef..754dc43e1f 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt
@@ -15,19 +15,21 @@ import kotlinx.coroutines.flow.unsafeFlow as flow
/**
* The operator that changes the context where this flow is executed to the given [flowContext].
- * This operator is composable and affects only precedent operators that do not have its own context.
- * This operator is pure: [flowContext] **does not** leak into the downstream flow.
+ * This operator is composable and affects only preceding operators that do not have its own context.
+ * This operator is context preserving: [flowContext] **does not** leak into the downstream flow.
*
* For example:
* ```
- * val singleValue = intFlow // will be executed on IO if context wasn't specified before
- * .map { ... } // Will be executed in IO
- * .flowOn(Dispatchers.IO)
- * .filter { ... } // Will be executed in Default
- * .flowOn(Dispatchers.Default)
- * .single() // Will be executed in the context of the caller
+ * withContext(Dispatchers.Main) {
+ * val singleValue = intFlow // will be executed on IO if context wasn't specified before
+ * .map { ... } // Will be executed in IO
+ * .flowOn(Dispatchers.IO)
+ * .filter { ... } // Will be executed in Default
+ * .flowOn(Dispatchers.Default)
+ * .single() // Will be executed in the Main
+ * }
* ```
- * For more explanation of purity concept please refer to [Flow] documentation.
+ * For more explanation of context preservation please refer to [Flow] documentation.
*
* This operator uses a channel of the specific [bufferSize] in order to switch between contexts,
* but it is not guaranteed that the channel will be created, implementation is free to optimize it away in case of fusing.
@@ -69,7 +71,7 @@ public fun Flow.flowOn(flowContext: CoroutineContext, bufferSize: Int = 1
/**
* The operator that changes the context where all transformations applied to the given flow within a [builder] are executed.
- * This operator is pure and does not affect the context of the precedent and subsequent operations.
+ * This operator is context preserving and does not affect the context of the preceding and subsequent operations.
*
* Example:
* ```
@@ -81,7 +83,7 @@ public fun Flow.flowOn(flowContext: CoroutineContext, bufferSize: Int = 1
* }
* .map { ... } // Not affected
* ```
- * For more explanation of purity concept please refer to [Flow] documentation.
+ * For more explanation of context preservation please refer to [Flow] documentation.
*
* This operator uses channel of the specific [bufferSize] in order to switch between contexts,
* but it is not guaranteed that channel will be created, implementation is free to optimize it away in case of fusing.
@@ -98,9 +100,9 @@ public fun Flow.flowWith(
val source = this
return flow {
/**
- * Here we should subtract Job instance from the context.
+ * Here we should remove a Job instance from the context.
* All builders are written using scoping and no global coroutines are launched, so it is safe not to provide explicit Job.
- * It is also necessary not to mess with cancellations if multiple flowWith are used.
+ * It is also necessary not to mess with cancellation if multiple flowWith are used.
*/
val originalContext = coroutineContext.minusKey(Job)
val prepared = source.flowOn(originalContext, bufferSize)
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt
index a08eabbe14..ae8436a748 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt
@@ -61,7 +61,7 @@ public fun Flow.retry(
try {
emit(value)
} catch (e: Throwable) {
- fromDownstream = predicate(e)
+ fromDownstream = true
throw e
}
}
@@ -74,7 +74,7 @@ public fun Flow.retry(
}
}
-private fun Flow.collectSafely(onException: suspend FlowCollector.(Throwable) -> Unit): Flow =
+private fun Flow.collectSafely(onException: suspend FlowCollector.(Throwable) -> Unit): Flow =
flow {
// Note that exception may come from the downstream operators, we should not switch on that
var fromDownstream = false
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
index 34b89aa7e2..5f3d167e58 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
@@ -7,36 +7,59 @@
@file:Suppress("unused")
package kotlinx.coroutines.flow
+
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
-import kotlinx.coroutines.flow.unsafeFlow as flow
import kotlin.jvm.*
+import kotlinx.coroutines.flow.unsafeFlow as flow
/**
- * Transforms elements emitted by the original flow by applying [mapper], that returns another flow, and then merging and flattening these flows.
+ * Transforms elements emitted by the original flow by applying [transform], that returns another flow, and then concatenating and flattening these flows.
+ * This method is identical to `flatMapMerge(concurrency = 1, bufferSize = 1)`
+ *
+ * Note that even though this operator looks very familiar, we discourage its usage in a regular application-specific flows.
+ * Most likely, suspending operation in [map] operator will be sufficient and linear transformations are much easier to reason about.
+ */
+@FlowPreview
+public fun Flow.flatMapConcat(transform: suspend (value: T) -> Flow): Flow = flow {
+ collect { value ->
+ transform(value).collect { innerValue ->
+ emit(innerValue)
+ }
+ }
+}
+
+/**
+ * Transforms elements emitted by the original flow by applying [transform], that returns another flow, and then merging and flattening these flows.
*
* Note that even though this operator looks very familiar, we discourage its usage in a regular application-specific flows.
* Most likely, suspending operation in [map] operator will be sufficient and linear transformations are much easier to reason about.
*
* [bufferSize] parameter controls the size of backpressure aka the amount of queued in-flight elements.
- * [concurrency] parameter controls the size of in-flight flows.
+ * [concurrency] parameter controls the size of in-flight flows, at most [concurrency] flows are collected at the same time.
*/
@FlowPreview
-public fun Flow.flatMap(concurrency: Int = 16, bufferSize: Int = 16, mapper: suspend (value: T) -> Flow): Flow {
+public fun Flow.flatMapMerge(concurrency: Int = 16, bufferSize: Int = 16, transform: suspend (value: T) -> Flow): Flow {
+ require(bufferSize >= 0) { "Expected non-negative buffer size, but had $bufferSize" }
+ require(concurrency >= 0) { "Expected non-negative concurrency level, but had $concurrency" }
return flow {
val semaphore = Channel(concurrency)
val flatMap = SerializingFlatMapCollector(this, bufferSize)
coroutineScope {
collect { outerValue ->
+ // TODO real semaphore (#94)
semaphore.send(Unit) // Acquire concurrency permit
- val inner = mapper(outerValue)
+ val inner = transform(outerValue)
launch {
- inner.collect { value ->
- flatMap.push(value)
+ try {
+ inner.collect { value ->
+ flatMap.emit(value)
+ }
+ } finally {
+ semaphore.receive() // Release concurrency permit
}
- semaphore.receive() // Release concurrency permit
}
}
}
@@ -44,39 +67,28 @@ public fun Flow.flatMap(concurrency: Int = 16, bufferSize: Int = 16, m
}
/**
- * Merges given sequence of flows into a single flow with no guarantees on the order.
- *
- * [bufferSize] parameter controls the size of backpressure aka the amount of queued in-flight elements.
- * [concurrency] parameter controls the size of in-flight flows.
+ * Flattens the given flow of flows into a single flow in a sequentially manner, without interleaving nested flows.
+ * This method is identical to `flattenMerge(concurrency = 1, bufferSize = 1)
*/
@FlowPreview
-public fun Iterable>.merge(concurrency: Int = 16, bufferSize: Int = 16): Flow = asFlow().flatMap(concurrency, bufferSize) { it }
-
-/**
- * Concatenates values of each flow sequentially, without interleaving them.
- */
-@FlowPreview
-public fun Flow>.concatenate(): Flow = flow {
- collect {
- val inner = it
- inner.collect { value ->
- emit(value)
+public fun Flow>.flattenConcat(): Flow = flow {
+ collect { value ->
+ value.collect { innerValue ->
+ emit(innerValue)
}
}
}
/**
- * Transforms each value of the given flow into flow of another type and then flattens these flows
- * sequentially, without interleaving them.
+ * Flattens the given flow of flows into a single flow.
+ * This method is identical to `flatMapMerge(concurrency, bufferSize) { it }`
+ *
+ * [bufferSize] parameter controls the size of backpressure aka the amount of queued in-flight elements.
+ * [concurrency] parameter controls the size of in-flight flows, at most [concurrency] flows are collected at the same time.
*/
@FlowPreview
-public fun Flow.concatenate(mapper: suspend (T) -> Flow): Flow = flow {
- collect { value ->
- mapper(value).collect { innerValue ->
- emit(innerValue)
- }
- }
-}
+public fun Flow>.flattenMerge(concurrency: Int = 16, bufferSize: Int = 16): Flow = flatMapMerge(concurrency, bufferSize) { it }
+
// Effectively serializes access to downstream collector from flatMap
private class SerializingFlatMapCollector(
@@ -84,32 +96,42 @@ private class SerializingFlatMapCollector(
private val bufferSize: Int
) {
- // Let's try to leverage the fact that flatMap is never contended
- private val channel: Channel by lazy { Channel(bufferSize) }
- private val inProgress = atomic(false)
+ // Let's try to leverage the fact that flatMapMerge is never contended
+ // TODO 1.2.1 do not allocate channel
+ private val channel = Channel(bufferSize) // Should be any, but KT-30796
+ private val inProgressLock = atomic(false)
- public suspend fun push(value: T) {
- if (!inProgress.compareAndSet(false, true)) {
+ public suspend fun emit(value: T) {
+ if (!inProgressLock.tryAcquire()) {
channel.send(value ?: NullSurrogate)
- if (inProgress.compareAndSet(false, true)) {
- helpPush()
+ if (inProgressLock.tryAcquire()) {
+ helpEmit()
}
return
}
downstream.emit(value)
- helpPush()
+ helpEmit()
}
@Suppress("UNCHECKED_CAST")
- private suspend fun helpPush() {
- var element = channel.poll()
- while (element != null) { // TODO receive or closed
- if (element === NullSurrogate) downstream.emit(null as T)
- else downstream.emit(element as T)
- element = channel.poll()
- }
+ private suspend fun helpEmit() {
+ while (true) {
+ var element = channel.poll()
+ while (element != null) { // TODO receive or closed (#330)
+ downstream.emit(NullSurrogate.unbox(element))
+ element = channel.poll()
+ }
- inProgress.value = false
+ inProgressLock.release()
+ // Enforce liveness
+ if (channel.isEmpty || !inProgressLock.tryAcquire()) break
+ }
}
}
+
+private fun AtomicBoolean.tryAcquire(): Boolean = compareAndSet(false, true)
+
+private fun AtomicBoolean.release() {
+ value = false
+}
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt
index 6f39e3b732..f75f3df8fb 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt
@@ -26,7 +26,7 @@ import kotlinx.coroutines.flow.unsafeFlow as flow
* ```
*/
@FlowPreview
-public fun Flow.transform(@BuilderInference transformer: suspend FlowCollector.(value: T) -> Unit): Flow {
+public fun Flow.transform(@BuilderInference transformer: suspend FlowCollector.(value: T) -> Unit): Flow {
return flow {
collect { value ->
transformer(value)
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
new file mode 100644
index 0000000000..ab0bee3f96
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
@@ -0,0 +1,158 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+@file:JvmMultifileClass
+@file:JvmName("FlowKt")
+@file:Suppress("UNCHECKED_CAST")
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.internal.*
+import kotlinx.coroutines.selects.*
+import kotlin.jvm.*
+import kotlinx.coroutines.flow.unsafeFlow as flow
+
+/**
+ * Returns a [Flow] whose values are generated with [transform] function by combining
+ * the most recently emitted values by each flow.
+ *
+ * It can be demonstrated with the following example:
+ * ```
+ * val flow = flowOf(1, 2).delayEach(10)
+ * val flow2 = flowOf("a", "b", "c").delayEach(15)
+ * flow.combineLatest(flow2) { i, s -> i.toString() + s }.collect {
+ * println(it) // Will print "1a 2a 2b 2c"
+ * }
+ * ```
+ */
+public fun Flow.combineLatest(other: Flow, transform: suspend (T1, T2) -> R): Flow = flow {
+ coroutineScope {
+ val firstChannel = asFairChannel(this@combineLatest)
+ val secondChannel = asFairChannel(other)
+ var firstValue: Any? = null
+ var secondValue: Any? = null
+ var firstIsClosed = false
+ var secondIsClosed = false
+
+ /*
+ * Fun fact, this select **semantically** equivalent of the following:
+ * ```
+ * selectWhile {
+ * channel.onReceive {
+ * emitCombined(...)
+ * }
+ * channel2.onReceive {
+ * emitCombined(...)
+ * }
+ * }
+ * ```
+ * but we are waiting for `channels` branch to get merged where we will change semantics of the select
+ * to ignore finished clauses.
+ *
+ * Instead (especially in the face of non-fair channels) we are using our own hand-rolled select emulation
+ * on top of previous select.
+ */
+ while (!firstIsClosed || !secondIsClosed) {
+ select {
+ onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value ->
+ firstValue = value
+ if (secondValue !== null) {
+ emit(transform(NullSurrogate.unbox(firstValue), NullSurrogate.unbox(secondValue)))
+ }
+ }
+
+ onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value ->
+ secondValue = value
+ if (firstValue !== null) {
+ emit(transform(NullSurrogate.unbox(firstValue), NullSurrogate.unbox(secondValue)))
+ }
+ }
+ }
+ }
+ }
+}
+
+
+private inline fun SelectBuilder.onReceive(
+ isClosed: Boolean,
+ channel: Channel,
+ crossinline onClosed: () -> Unit,
+ noinline onReceive: suspend (value: Any) -> Unit
+) {
+ if (isClosed) return
+ channel.onReceiveOrNull {
+ if (it === null) onClosed()
+ else onReceive(it)
+ }
+}
+
+// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
+private fun CoroutineScope.asFairChannel(flow: Flow<*>): Channel {
+ val channel = RendezvousChannel() // Explicit type
+ launch {
+ try {
+ flow.collect { value ->
+ channel.sendFair(value ?: NullSurrogate)
+ }
+ } finally {
+ channel.close()
+ }
+ }
+ return channel
+}
+
+
+/**
+ * Zips values from the current flow (`this`) with [other] flow using provided [transform] function applied to each pair of values.
+ * The resulting flow completes as soon as one of the flows completes and cancel is called on the remaining flow.
+ *
+ * It can be demonstrated with the following example:
+ * ```
+ * val flow = flowOf(1, 2, 3).delayEach(10)
+ * val flow2 = flowOf("a", "b", "c", "d").delayEach(15)
+ * flow.zip(flow2) { i, s -> i.toString() + s }.collect {
+ * println(it) // Will print "1a 2b 3c"
+ * }
+ * ```
+ */
+public fun Flow.zip(other: Flow, transform: suspend (T1, T2) -> R): Flow = flow {
+ coroutineScope {
+ val first = asChannel(this@zip)
+ val second = asChannel(other)
+ /*
+ * This approach only works with rendezvous channel and is required to enforce correctness
+ * in the following scenario:
+ * ```
+ * val f1 = flow { emit(1); delay(Long.MAX_VALUE) }
+ * val f2 = flowOf(1)
+ * f1.zip(f2) { ... }
+ * ```
+ *
+ * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
+ */
+ (second as SendChannel<*>).invokeOnClose { first.cancel() }
+
+ val otherIterator = second.iterator()
+ try {
+ first.consumeEach { value ->
+ if (!otherIterator.hasNext()) {
+ return@consumeEach
+ }
+ val secondValue = NullSurrogate.unbox(otherIterator.next())
+ emit(transform(NullSurrogate.unbox(value), NullSurrogate.unbox(secondValue)))
+ }
+ } finally {
+ second.cancel()
+ }
+ }
+}
+
+// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
+private fun CoroutineScope.asChannel(flow: Flow<*>): ReceiveChannel = produce {
+ flow.collect { value ->
+ channel.send(value ?: NullSurrogate)
+ }
+}
diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
index 47333933f7..d0e04ff286 100644
--- a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
+++ b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
@@ -18,12 +18,12 @@ import kotlin.jvm.*
* ```
* val flow = getMyEvents()
* try {
- * flow.collect { value ->
- * println("Received $value")
- * }
- * println("My events are consumed successfully")
+ * flow.collect { value ->
+ * println("Received $value")
+ * }
+ * println("My events are consumed successfully")
* } catch (e: Throwable) {
- * println("Exception from the flow: $e")
+ * println("Exception from the flow: $e")
* }
* ```
*/
diff --git a/kotlinx-coroutines-core/common/test/CancellableResumeTest.kt b/kotlinx-coroutines-core/common/test/CancellableResumeTest.kt
new file mode 100644
index 0000000000..b2cde6b978
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/CancellableResumeTest.kt
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913
+
+package kotlinx.coroutines
+
+import kotlin.test.*
+
+/**
+ * Test for [CancellableContinuation.resume] with `onCancellation` parameter.
+ */
+class CancellableResumeTest : TestBase() {
+ @Test
+ fun testResumeImmediateNormally() = runTest {
+ expect(1)
+ val ok = suspendCancellableCoroutine { cont ->
+ expect(2)
+ cont.invokeOnCancellation { expectUnreached() }
+ cont.resume("OK") { expectUnreached() }
+ expect(3)
+ }
+ assertEquals("OK", ok)
+ finish(4)
+ }
+
+ @Test
+ fun testResumeImmediateAfterCancel() = runTest(
+ expected = { it is TestException }
+ ) {
+ expect(1)
+ val ok = suspendCancellableCoroutine { cont ->
+ expect(2)
+ cont.invokeOnCancellation { expect(3) }
+ cont.cancel(TestException("FAIL"))
+ expect(4)
+ cont.resume("OK") { cause ->
+ expect(5)
+ assertTrue(cause is TestException)
+ }
+ finish(6)
+ }
+ expectUnreached()
+ }
+
+ @Test
+ fun testResumeLaterNormally() = runTest {
+ expect(1)
+ lateinit var cc: CancellableContinuation
+ launch(start = CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ val ok = suspendCancellableCoroutine { cont ->
+ expect(3)
+ cont.invokeOnCancellation { expectUnreached() }
+ cc = cont
+ }
+ assertEquals("OK", ok)
+ finish(6)
+ }
+ expect(4)
+ cc.resume("OK") { expectUnreached() }
+ expect(5)
+ }
+
+ @Test
+ fun testResumeLaterAfterCancel() = runTest {
+ expect(1)
+ lateinit var cc: CancellableContinuation
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ try {
+ suspendCancellableCoroutine { cont ->
+ expect(3)
+ cont.invokeOnCancellation { expect(5) }
+ cc = cont
+ }
+ expectUnreached()
+ } catch (e: CancellationException) {
+ finish(9)
+ }
+ }
+ expect(4)
+ job.cancel(TestCancellationException())
+ expect(6)
+ cc.resume("OK") { cause ->
+ expect(7)
+ assertTrue(cause is TestCancellationException)
+ }
+ expect(8)
+ }
+
+ @Test
+ fun testResumeCancelWhileDispatched() = runTest {
+ expect(1)
+ lateinit var cc: CancellableContinuation
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ try {
+ suspendCancellableCoroutine { cont ->
+ expect(3)
+ // resumed first, then cancelled, so no invokeOnCancellation call
+ cont.invokeOnCancellation { expectUnreached() }
+ cc = cont
+ }
+ expectUnreached()
+ } catch (e: CancellationException) {
+ expect(8)
+ }
+ }
+ expect(4)
+ cc.resume("OK") { cause ->
+ expect(7)
+ assertTrue(cause is TestCancellationException)
+ }
+ expect(5)
+ job.cancel(TestCancellationException()) // cancel while execution is dispatched
+ expect(6)
+ yield() // to coroutine -- throws cancellation exception
+ finish(9)
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt
index 308f8f3c2a..bcff1edfa0 100644
--- a/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt
@@ -11,10 +11,12 @@ class ArrayChannelTest : TestBase() {
@Test
fun testSimple() = runTest {
val q = Channel(1)
+ check(q.isEmpty)
expect(1)
val sender = launch {
expect(4)
q.send(1) // success -- buffered
+ check(!q.isEmpty)
expect(5)
q.send(2) // suspends (buffer full)
expect(9)
@@ -23,6 +25,7 @@ class ArrayChannelTest : TestBase() {
val receiver = launch {
expect(6)
check(q.receive() == 1) // does not suspend -- took from buffer
+ check(!q.isEmpty) // waiting sender's element moved to buffer
expect(7)
check(q.receive() == 2) // does not suspend (takes from sender)
expect(8)
@@ -30,20 +33,21 @@ class ArrayChannelTest : TestBase() {
expect(3)
sender.join()
receiver.join()
+ check(q.isEmpty)
finish(10)
}
@Test
fun testClosedBufferedReceiveOrNull() = runTest {
val q = Channel(1)
- check(!q.isClosedForSend && !q.isClosedForReceive)
+ check(q.isEmpty && !q.isClosedForSend && !q.isClosedForReceive)
expect(1)
launch {
expect(5)
- check(q.isClosedForSend && !q.isClosedForReceive)
+ check(!q.isEmpty && q.isClosedForSend && !q.isClosedForReceive)
assertEquals(42, q.receiveOrNull())
expect(6)
- check(q.isClosedForSend && q.isClosedForReceive)
+ check(!q.isEmpty && q.isClosedForSend && q.isClosedForReceive)
assertEquals(null, q.receiveOrNull())
expect(7)
}
@@ -52,9 +56,9 @@ class ArrayChannelTest : TestBase() {
expect(3)
q.close() // goes on
expect(4)
- check(q.isClosedForSend && !q.isClosedForReceive)
+ check(!q.isEmpty && q.isClosedForSend && !q.isClosedForReceive)
yield()
- check(q.isClosedForSend && q.isClosedForReceive)
+ check(!q.isEmpty && q.isClosedForSend && q.isClosedForReceive)
finish(8)
}
diff --git a/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt
index fafda0d7da..d7ca753e0b 100644
--- a/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt
@@ -11,6 +11,7 @@ class RendezvousChannelTest : TestBase() {
@Test
fun testSimple() = runTest {
val q = Channel(Channel.RENDEZVOUS)
+ check(q.isEmpty)
expect(1)
val sender = launch {
expect(4)
@@ -30,13 +31,14 @@ class RendezvousChannelTest : TestBase() {
expect(3)
sender.join()
receiver.join()
+ check(q.isEmpty)
finish(10)
}
@Test
fun testClosedReceiveOrNull() = runTest {
val q = Channel(Channel.RENDEZVOUS)
- check(!q.isClosedForSend && !q.isClosedForReceive)
+ check(q.isEmpty && !q.isClosedForSend && !q.isClosedForReceive)
expect(1)
launch {
expect(3)
@@ -49,9 +51,9 @@ class RendezvousChannelTest : TestBase() {
q.send(42)
expect(5)
q.close()
- check(q.isClosedForSend && q.isClosedForReceive)
+ check(!q.isEmpty && q.isClosedForSend && q.isClosedForReceive)
yield()
- check(q.isClosedForSend && q.isClosedForReceive)
+ check(!q.isEmpty && q.isClosedForSend && q.isClosedForReceive)
finish(7)
}
@@ -252,7 +254,7 @@ class RendezvousChannelTest : TestBase() {
expect(1)
send(bad)
}
- assertTrue(c.receive() === bad)
+ assertSame(c.receive(), bad)
finish(2)
}
diff --git a/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt b/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt
index 6e1ee2bf69..465699e27c 100644
--- a/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt
+++ b/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt
@@ -54,7 +54,6 @@ private class ChannelViaBroadcast(
val sub = broadcast.openSubscription()
override val isClosedForReceive: Boolean get() = sub.isClosedForReceive
- @Suppress("DEPRECATION_ERROR")
override val isEmpty: Boolean get() = sub.isEmpty
override suspend fun receive(): E = sub.receive()
diff --git a/kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt b/kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt
index 86f685181c..0befdc2227 100644
--- a/kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt
@@ -5,6 +5,8 @@
package kotlinx.coroutines.flow
import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlin.coroutines.*
import kotlin.test.*
class FlowInvariantsTest : TestBase() {
@@ -22,7 +24,7 @@ class FlowInvariantsTest : TestBase() {
}
@Test
- fun testWithContextContractViolated() = runTest({ it is IllegalStateException }) {
+ fun testWithDispatcherContractViolated() = runTest({ it is IllegalStateException }) {
flow {
kotlinx.coroutines.withContext(NamedDispatchers("foo")) {
emit(1)
@@ -32,6 +34,17 @@ class FlowInvariantsTest : TestBase() {
}
}
+ @Test
+ fun testWithNameContractViolated() = runTest({ it is IllegalStateException }) {
+ flow {
+ kotlinx.coroutines.withContext(CoroutineName("foo")) {
+ emit(1)
+ }
+ }.collect {
+ fail()
+ }
+ }
+
@Test
fun testWithContextDoesNotChangeExecution() = runTest {
val flow = flow {
@@ -51,4 +64,40 @@ class FlowInvariantsTest : TestBase() {
assertEquals("original", result)
}
+
+ @Test
+ fun testScopedJob() = runTest {
+ flow { emit(1) }.buffer(EmptyCoroutineContext).collect {
+ expect(1)
+ }
+
+ finish(2)
+ }
+
+ @Test
+ fun testScopedJobWithViolation() = runTest({ it is IllegalStateException }) {
+ flow { emit(1) }.buffer(Dispatchers.Unconfined).collect {
+ expect(1)
+ }
+
+ finish(2)
+ }
+
+ private fun Flow.buffer(coroutineContext: CoroutineContext): Flow = flow {
+ coroutineScope {
+ val channel = Channel()
+ launch {
+ collect { value ->
+ channel.send(value)
+ }
+ channel.close()
+ }
+
+ launch(coroutineContext) {
+ for (i in channel) {
+ emit(i)
+ }
+ }
+ }
+ }
}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt
new file mode 100644
index 0000000000..5954e211ad
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt
@@ -0,0 +1,200 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlin.test.*
+
+/*
+ * Replace: { i, j -> i + j } -> { i, j -> i + j } as soon as KT-30991 is fixed
+ */
+class CombineLatestTest : TestBase() {
+
+ @Test
+ fun testCombineLatest() = runTest {
+ val flow = flowOf("a", "b", "c")
+ val flow2 = flowOf(1, 2, 3)
+ val list = flow.combineLatest(flow2, { i, j -> i + j }).toList()
+ assertEquals(listOf("a1", "b1", "b2", "c2", "c3"), list)
+ }
+
+ @Test
+ fun testNulls() = runTest {
+ val flow = flowOf("a", null, null)
+ val flow2 = flowOf(1, 2, 3)
+ val list = flow.combineLatest(flow2, { i, j -> i + j }).toList()
+ assertEquals(listOf("a1", "null1", "null2", "null2", "null3"), list)
+ }
+
+ @Test
+ fun testNullsOther() = runTest {
+ val flow = flowOf("a", "b", "c")
+ val flow2 = flowOf(null, 2, null)
+ val list = flow.combineLatest(flow2, { i, j -> i + j }).toList()
+ assertEquals(listOf("anull", "bnull", "b2", "c2", "cnull"), list)
+ }
+
+ @Test
+ fun testEmptyFlow() = runTest {
+ val flow = emptyFlow().combineLatest(emptyFlow(), { i, j -> i + j })
+ assertNull(flow.singleOrNull())
+ }
+
+ @Test
+ fun testFirstIsEmpty() = runTest {
+ val f1 = emptyFlow()
+ val f2 = flowOf(1)
+ assertEquals(emptyList(), f1.combineLatest(f2, { i, j -> i + j }).toList())
+ }
+
+ @Test
+ fun testSecondIsEmpty() = runTest {
+ val f1 = flowOf("a")
+ val f2 = emptyFlow()
+ assertEquals(emptyList(), f1.combineLatest(f2, { i, j -> i + j }).toList())
+ }
+
+ @Test
+ fun testPreservingOrder() = runTest {
+ val f1 = flow {
+ expect(1)
+ emit("a")
+ expect(3)
+ emit("b")
+ emit("c")
+ expect(4)
+ }
+
+ val f2 = flow {
+ expect(2)
+ emit(1)
+ yield()
+ yield()
+ expect(5)
+ emit(2)
+ expect(6)
+ yield()
+ expect(7)
+ emit(3)
+ }
+
+ val result = f1.combineLatest(f2, { i, j -> i + j }).toList()
+ assertEquals(listOf("a1", "b1", "c1", "c2", "c3"), result)
+ finish(8)
+ }
+
+ @Test
+ fun testPreservingOrderReversed() = runTest {
+ val f1 = flow {
+ expect(1)
+ emit("a")
+ expect(3)
+ emit("b")
+ emit("c")
+ expect(4)
+ }
+
+ val f2 = flow {
+ yield() // One more yield because now this flow starts first
+ expect(2)
+ emit(1)
+ yield()
+ yield()
+ expect(5)
+ emit(2)
+ expect(6)
+ yield()
+ expect(7)
+ emit(3)
+ }
+
+ val result = f2.combineLatest(f1) { i, j -> j + i }.toList()
+ assertEquals(listOf("a1", "b1", "c1", "c2", "c3"), result)
+ finish(8)
+ }
+
+ @Test
+ fun testContextIsIsolated() = runTest {
+ val f1 = flow {
+ emit("a")
+ assertEquals("first", NamedDispatchers.name())
+ expect(1)
+ }.flowOn(NamedDispatchers("first")).onEach {
+ assertEquals("nested", NamedDispatchers.name())
+ expect(2)
+ }.flowOn(NamedDispatchers("nested"))
+
+ val f2 = flow {
+ emit(1)
+ assertEquals("second", NamedDispatchers.name())
+ expect(3)
+ }.flowOn(NamedDispatchers("second")).flowWith(NamedDispatchers("with")) {
+ onEach {
+ assertEquals("with", NamedDispatchers.name())
+ expect(4)
+ }
+ }
+
+ val value = withContext(NamedDispatchers("main")) {
+ f1.combineLatest(f2) { i, j ->
+ assertEquals("main", NamedDispatchers.name())
+ expect(5)
+ i + j
+ }.single()
+ }
+
+ assertEquals("a1", value)
+ finish(6)
+ }
+
+ @Test
+ fun testErrorInDownstreamCancelsUpstream() = runTest {
+ val f1 = flow {
+ emit("a")
+ hang {
+ expect(2)
+ }
+ }.flowOn(NamedDispatchers("first"))
+
+ val f2 = flow {
+ emit(1)
+ hang {
+ expect(3)
+ }
+ }.flowOn(NamedDispatchers("second"))
+
+ val flow = f1.combineLatest(f2) { i, j ->
+ assertEquals("combine", NamedDispatchers.name())
+ expect(1)
+ i + j
+ }.flowOn(NamedDispatchers("combine")).onEach {
+ throw TestException()
+ }
+
+ assertFailsWith(flow)
+ finish(4)
+ }
+
+ @Test
+ fun testErrorCancelsSibling() = runTest {
+ val f1 = flow {
+ emit("a")
+ hang {
+ expect(1)
+ }
+ }.flowOn(NamedDispatchers("first"))
+
+ val f2 = flow {
+ emit(1)
+ throw TestException()
+ }.flowOn(NamedDispatchers("second"))
+
+ val flow = f1.combineLatest(f2) { _, _ -> 1 }
+ assertFailsWith(flow)
+ finish(2)
+ }
+
+ private suspend fun sum(s: String?, i: Int?) = s + i
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ConcatenateMapTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ConcatenateMapTest.kt
deleted file mode 100644
index d4e15a8667..0000000000
--- a/kotlinx-coroutines-core/common/test/flow/operators/ConcatenateMapTest.kt
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.flow
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.*
-import kotlin.test.*
-
-class ConcatenateMapTest : TestBase() {
- @Test
- fun testConcatenate() = runTest {
- val n = 100
- val sum = flow {
- repeat(n) {
- emit(it + 1) // 1..100
- }
- }.concatenate { value ->
- // 1 + (1 + 2) + (1 + 2 + 3) + ... (1 + .. + n)
- flow {
- repeat(value) {
- emit(it + 1)
- }
- }
- }.sum()
-
- assertEquals(n * (n + 1) * (n + 2) / 6, sum)
- }
-
- @Test
- fun testSingle() = runTest {
- val flow = flow {
- repeat(100) {
- emit(it)
- }
- }.concatenate { value ->
- if (value == 99) flowOf(42)
- else flowOf()
- }
-
- val value = flow.single()
- assertEquals(42, value)
- }
-
- @Test
- fun testFailure() = runTest {
- var finally = false
- val latch = Channel()
- val flow = flow {
- coroutineScope {
- launch {
- latch.send(Unit)
- hang { finally = true }
- }
-
- emit(1)
- }
- }.concatenate {
- flow {
- latch.receive()
- throw TestException()
- }
- }
-
- assertFailsWith { flow.count() }
- assertTrue(finally)
- }
-
- @Test
- fun testFailureInMapOperation() = runTest {
- val latch = Channel()
- val flow = flow {
- coroutineScope {
- launch {
- latch.send(Unit)
- hang { expect(3) }
- }
-
- expect(1)
- emit(1)
- }
- }.concatenate {
- latch.receive()
- expect(2)
- throw TestException()
- flowOf() // Workaround for KT-30642, return type should not be Nothing
- }
-
- assertFailsWith { flow.count() }
- finish(4)
- }
-
- @Test
- fun testContext() = runTest {
- val captured = ArrayList()
- val flow = flowOf(1)
- .flowOn(NamedDispatchers("irrelevant"))
- .concatenate {
- flow {
- captured += NamedDispatchers.name()
- emit(it)
- }
- }
-
- flow.flowOn(NamedDispatchers("1")).sum()
- flow.flowOn(NamedDispatchers("2")).sum()
- assertEquals(listOf("1", "2"), captured)
- }
-
-
- @Test
- fun testIsolatedContext() = runTest {
- val flow = flowOf(1)
- .flowOn(NamedDispatchers("irrelevant"))
- .flowWith(NamedDispatchers("inner")) {
- concatenate {
- flow {
- expect(2)
- assertEquals("inner", NamedDispatchers.name())
- emit(it)
- }
- }
- }.flowOn(NamedDispatchers("irrelevant"))
- .concatenate {
- flow {
- expect(3)
- assertEquals("outer", NamedDispatchers.name())
- emit(it)
- }
- }.flowOn(NamedDispatchers("outer"))
-
- expect(1)
- assertEquals(1, flow.single())
- finish(4)
- }
-}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ConcatenateTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ConcatenateTest.kt
deleted file mode 100644
index 6a50bc92dc..0000000000
--- a/kotlinx-coroutines-core/common/test/flow/operators/ConcatenateTest.kt
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.flow
-
-import kotlinx.coroutines.*
-import kotlin.test.*
-
-class ConcatenateTest : TestBase() {
- @Test
- fun testConcatenate() = runTest {
- val n = 100
- val sum = (1..n).asFlow()
- .map { value ->
- flow {
- repeat(value) {
- emit(it + 1)
- }
- }
- }.concatenate().sum()
- assertEquals(n * (n + 1) * (n + 2) / 6, sum)
- }
-
- @Test
- fun testSingle() = runTest {
- val flows = flow {
- repeat(100) {
- if (it == 99) emit(flowOf(42))
- else emit(flowOf())
- }
- }
-
- val value = flows.concatenate().single()
- assertEquals(42, value)
- }
-
-
- @Test
- fun testContext() = runTest {
- val flow = flow {
- emit(flow {
- expect(2)
- assertEquals("first", NamedDispatchers.name())
- emit(1)
- }.flowOn(NamedDispatchers("first")))
-
- emit(flow {
- expect(3)
- assertEquals("second", NamedDispatchers.name())
- emit(1)
- }.flowOn(NamedDispatchers("second")))
- }.concatenate().flowOn(NamedDispatchers("first"))
-
- expect(1)
- assertEquals(2, flow.sum())
- finish(4)
- }
-}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapBaseTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapBaseTest.kt
new file mode 100644
index 0000000000..4c3f646d42
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapBaseTest.kt
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlin.test.*
+
+abstract class FlatMapBaseTest : TestBase() {
+
+ abstract fun Flow.flatMap(mapper: suspend (T) -> Flow): Flow