diff --git a/.gitignore b/.gitignore index 33ebe3f3a1..aed7103292 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,9 @@ **/.idea/* !/.idea/icon.png !/.idea/vcs.xml +!/.idea/copyright +!/.idea/codeStyleSettings.xml +!/.idea/codeStyles *.iml .gradle .gradletasknamecache diff --git a/.idea/codeStyleSettings.xml b/.idea/codeStyleSettings.xml new file mode 100644 index 0000000000..2287edb44c --- /dev/null +++ b/.idea/codeStyleSettings.xml @@ -0,0 +1,8 @@ + + + + + + \ No newline at end of file diff --git a/.idea/codeStyles/Project.xml b/.idea/codeStyles/Project.xml new file mode 100644 index 0000000000..62fd5c7dfd --- /dev/null +++ b/.idea/codeStyles/Project.xml @@ -0,0 +1,15 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml new file mode 100644 index 0000000000..79ee123c2b --- /dev/null +++ b/.idea/codeStyles/codeStyleConfig.xml @@ -0,0 +1,5 @@ + + + + \ No newline at end of file diff --git a/.idea/copyright/kotlinx_coroutines.xml b/.idea/copyright/kotlinx_coroutines.xml new file mode 100644 index 0000000000..f175624311 --- /dev/null +++ b/.idea/copyright/kotlinx_coroutines.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/copyright/profiles_settings.xml b/.idea/copyright/profiles_settings.xml new file mode 100644 index 0000000000..5e22a9977e --- /dev/null +++ b/.idea/copyright/profiles_settings.xml @@ -0,0 +1,3 @@ + + + \ No newline at end of file diff --git a/CHANGES.md b/CHANGES.md index ee507237c3..4fa05cb122 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,14 @@ # Change log for kotlinx.coroutines +## Version 1.3.7 + +* Fixed problem that triggered Android Lint failure (#2004). +* New `Flow.cancellable()` operator for cooperative cancellation (#2026). +* Emissions from `flow` builder now check cancellation status and are properly cancellable (#2026). +* New `currentCoroutineContext` function to use unambiguously in the contexts with `CoroutineScope` in receiver position (#2026). +* `EXACTLY_ONCE` contract support in coroutine builders. +* Various documentation improvements. + ## Version 1.3.6 ### Flow diff --git a/README.md b/README.md index 5d655b274d..795616c88d 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![official JetBrains project](https://jb.gg/badges/official.svg)](https://confluence.jetbrains.com/display/ALL/JetBrains+on+GitHub) [![GitHub license](https://img.shields.io/badge/license-Apache%20License%202.0-blue.svg?style=flat)](https://www.apache.org/licenses/LICENSE-2.0) -[![Download](https://api.bintray.com/packages/kotlin/kotlinx/kotlinx.coroutines/images/download.svg?version=1.3.6) ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.3.6) +[![Download](https://api.bintray.com/packages/kotlin/kotlinx/kotlinx.coroutines/images/download.svg?version=1.3.7) ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.3.7) Library support for Kotlin coroutines with [multiplatform](#multiplatform) support. This is a companion version for Kotlin `1.3.71` release. @@ -84,7 +84,7 @@ Add dependencies (you can also add other modules that you need): org.jetbrains.kotlinx kotlinx-coroutines-core - 1.3.6 + 1.3.7 ``` @@ -102,7 +102,7 @@ Add dependencies (you can also add other modules that you need): ```groovy dependencies { - implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.6' + implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.7' } ``` @@ -128,7 +128,7 @@ Add dependencies (you can also add other modules that you need): ```groovy dependencies { - implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.6") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.7") } ``` @@ -147,7 +147,7 @@ Make sure that you have either `jcenter()` or `mavenCentral()` in the list of re Core modules of `kotlinx.coroutines` are also available for [Kotlin/JS](#js) and [Kotlin/Native](#native). In common code that should get compiled for different platforms, add dependency to -[`kotlinx-coroutines-core-common`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-common/1.3.6/jar) +[`kotlinx-coroutines-core-common`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-common/1.3.7/jar) (follow the link to get the dependency declaration snippet). ### Android @@ -156,7 +156,7 @@ Add [`kotlinx-coroutines-android`](ui/kotlinx-coroutines-android) module as dependency when using `kotlinx.coroutines` on Android: ```groovy -implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.6' +implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.7' ``` This gives you access to Android [Dispatchers.Main] @@ -172,7 +172,7 @@ For more details see ["Optimization" section for Android](ui/kotlinx-coroutines- ### JS [Kotlin/JS](https://kotlinlang.org/docs/reference/js-overview.html) version of `kotlinx.coroutines` is published as -[`kotlinx-coroutines-core-js`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-js/1.3.6/jar) +[`kotlinx-coroutines-core-js`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-js/1.3.7/jar) (follow the link to get the dependency declaration snippet). You can also use [`kotlinx-coroutines-core`](https://www.npmjs.com/package/kotlinx-coroutines-core) package via NPM. @@ -180,7 +180,7 @@ You can also use [`kotlinx-coroutines-core`](https://www.npmjs.com/package/kotli ### Native [Kotlin/Native](https://kotlinlang.org/docs/reference/native-overview.html) version of `kotlinx.coroutines` is published as -[`kotlinx-coroutines-core-native`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-native/1.3.6/jar) +[`kotlinx-coroutines-core-native`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-native/1.3.7/jar) (follow the link to get the dependency declaration snippet). Only single-threaded code (JS-style) on Kotlin/Native is currently supported. diff --git a/benchmarks/build.gradle.kts b/benchmarks/build.gradle.kts index fa651346ce..7df4510bf4 100644 --- a/benchmarks/build.gradle.kts +++ b/benchmarks/build.gradle.kts @@ -2,24 +2,31 @@ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -import org.jetbrains.kotlin.gradle.tasks.KotlinCompile +import me.champeau.gradle.* +import org.jetbrains.kotlin.gradle.tasks.* plugins { id("net.ltgt.apt") id("com.github.johnrengelman.shadow") - id("me.champeau.gradle.jmh") + id("me.champeau.gradle.jmh") apply false } repositories { maven("https://repo.typesafe.com/typesafe/releases/") } -tasks.withType().configureEach { - kotlinOptions.jvmTarget = "1.8" +java { + sourceCompatibility = JavaVersion.VERSION_1_8 + targetCompatibility = JavaVersion.VERSION_1_8 } -tasks.compileJmhKotlin { - kotlinOptions.freeCompilerArgs += "-Xjvm-default=enable" +apply(plugin="me.champeau.gradle.jmh") + +tasks.named("compileJmhKotlin") { + kotlinOptions { + jvmTarget = "1.8" + freeCompilerArgs += "-Xjvm-default=enable" + } } /* @@ -39,20 +46,20 @@ val removeRedundantFiles = tasks.register("removeRedundantFiles") { delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$histoOfLetters\$1\$\$special\$\$inlined\$fold\$1\$1.class") delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleBase\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class") delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleBase\$play\$histoOfLetters\$1\$\$special\$\$inlined\$fold\$1\$1.class") - delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble//SaneFlowPlaysScrabble\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class") + delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/SaneFlowPlaysScrabble\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class") // Primes delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/misc/Numbers\$\$special\$\$inlined\$filter\$1\$2\$1.class") delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/misc/Numbers\$\$special\$\$inlined\$filter\$1\$1.class") } -tasks.jmhRunBytecodeGenerator { +tasks.named("jmhRunBytecodeGenerator") { dependsOn(removeRedundantFiles) } // It is better to use the following to run benchmarks, otherwise you may get unexpected errors: // ./gradlew --no-daemon cleanJmhJar jmh -Pjmh="MyBenchmark" -jmh { +extensions.configure("jmh") { jmhVersion = "1.21" duplicateClassesStrategy = DuplicatesStrategy.INCLUDE failOnError = true @@ -63,7 +70,7 @@ jmh { // includeTests = false } -tasks.jmhJar { +tasks.named("jmhJar") { baseName = "benchmarks" classifier = null version = null @@ -76,10 +83,9 @@ dependencies { compile("io.reactivex.rxjava2:rxjava:2.1.9") compile("com.github.akarnokd:rxjava2-extensions:0.20.8") - compile("org.openjdk.jmh:jmh-core:1.21") compile("com.typesafe.akka:akka-actor_2.12:2.5.0") compile(project(":kotlinx-coroutines-core")) // add jmh dependency on main - jmhImplementation(sourceSets.main.get().runtimeClasspath) + "jmhImplementation"(sourceSets.main.get().runtimeClasspath) } diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/FlowFlattenMergeBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/FlowFlattenMergeBenchmark.kt index 3fff2697cc..149898881b 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/flow/FlowFlattenMergeBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/FlowFlattenMergeBenchmark.kt @@ -8,7 +8,7 @@ import benchmarks.common.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import org.openjdk.jmh.annotations.* -import java.util.concurrent.* +import java.util.concurrent.TimeUnit /** * Benchmark to measure performance of [kotlinx.coroutines.flow.FlowKt.flattenMerge]. diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/NumbersBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/NumbersBenchmark.kt index 0cb31056bb..4ebb3d07ff 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/flow/NumbersBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/NumbersBenchmark.kt @@ -11,7 +11,8 @@ import io.reactivex.functions.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import org.openjdk.jmh.annotations.* -import java.util.concurrent.* +import java.util.concurrent.TimeUnit +import java.util.concurrent.Callable @Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) @Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/TakeBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/TakeBenchmark.kt index 1c469a69b9..1e12e2c33e 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/flow/TakeBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/TakeBenchmark.kt @@ -7,7 +7,7 @@ package benchmarks.flow import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import org.openjdk.jmh.annotations.* -import java.util.concurrent.* +import java.util.concurrent.TimeUnit import java.util.concurrent.CancellationException import kotlin.coroutines.* import kotlin.coroutines.intrinsics.* diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleBase.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleBase.kt index 9e39b43b8b..3501bdfed3 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleBase.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleBase.kt @@ -9,7 +9,7 @@ import kotlinx.coroutines.flow.* import org.openjdk.jmh.annotations.* import java.lang.Long.max import java.util.* -import java.util.concurrent.* +import java.util.concurrent.TimeUnit import kotlin.math.* @Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SaneFlowPlaysScrabble.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SaneFlowPlaysScrabble.kt index 0a4f69672f..ad97dfa373 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SaneFlowPlaysScrabble.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SaneFlowPlaysScrabble.kt @@ -9,7 +9,7 @@ import kotlinx.coroutines.flow.* import org.openjdk.jmh.annotations.* import java.lang.Long.* import java.util.* -import java.util.concurrent.* +import java.util.concurrent.TimeUnit @Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) @Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) diff --git a/build.gradle b/build.gradle index 032c4e798b..bc4dd36afc 100644 --- a/build.gradle +++ b/build.gradle @@ -3,6 +3,7 @@ */ import org.jetbrains.kotlin.konan.target.HostManager +apply plugin: 'jdk-convention' apply from: rootProject.file("gradle/experimental.gradle") def rootModule = "kotlinx.coroutines" @@ -69,7 +70,6 @@ buildscript { classpath "org.jetbrains.kotlinx:atomicfu-gradle-plugin:$atomicfu_version" classpath "org.jetbrains.kotlinx:kotlinx-knit:$knit_version" classpath "com.moowork.gradle:gradle-node-plugin:$gradle_node_version" - classpath "org.openjfx:javafx-plugin:$javafx_plugin_version" classpath "org.jetbrains.kotlinx:binary-compatibility-validator:$binary_compatibility_validator_version" // JMH plugins @@ -264,22 +264,9 @@ configure(subprojects.findAll { !unpublished.contains(it.name) }) { // Report Kotlin compiler version when building project println("Using Kotlin compiler version: $org.jetbrains.kotlin.config.KotlinCompilerVersion.VERSION") -// --------------- Publish only from under JDK11+ --------------- -task checkJdkForPublish { - doFirst { - if (JavaVersionKt.javaVersionMajor < 11) { - throw new GradleException("Project can be build for publishing only under JDK 11+, but found ${JavaVersionKt.javaVersion}") - } - } -} - // --------------- Configure sub-projects that are published --------------- def publishTasks = getTasksByName("publish", true) + getTasksByName("publishNpm", true) -publishTasks.each { - it.dependsOn checkJdkForPublish -} - task deploy(dependsOn: publishTasks) apply plugin: 'base' diff --git a/buildSrc/src/main/kotlin/Idea.kt b/buildSrc/src/main/kotlin/Idea.kt new file mode 100644 index 0000000000..802b387b0d --- /dev/null +++ b/buildSrc/src/main/kotlin/Idea.kt @@ -0,0 +1,4 @@ +object Idea { + val active: Boolean + get() = System.getProperty("idea.active") == "true" +} diff --git a/buildSrc/src/main/kotlin/JavaVersion.kt b/buildSrc/src/main/kotlin/JavaVersion.kt deleted file mode 100644 index 2fbefce5e3..0000000000 --- a/buildSrc/src/main/kotlin/JavaVersion.kt +++ /dev/null @@ -1,7 +0,0 @@ -val javaVersion: String - get() = System.getProperty("java.version")!! - -val javaVersionMajor: Int - get() = javaVersion - .substringBefore(".") - .toInt() diff --git a/buildSrc/src/main/kotlin/jdk-convention.gradle.kts b/buildSrc/src/main/kotlin/jdk-convention.gradle.kts new file mode 100644 index 0000000000..8bc1abf17d --- /dev/null +++ b/buildSrc/src/main/kotlin/jdk-convention.gradle.kts @@ -0,0 +1,10 @@ +import org.gradle.api.JavaVersion + +if (!JavaVersion.current().isJava11Compatible) { + val message = "Project required JDK 11+, but found ${JavaVersion.current()}" + if (Idea.active) { + logger.error(message) + } else { + throw GradleException(message) + } +} diff --git a/gradle.properties b/gradle.properties index 8e611251eb..0a45ecd693 100644 --- a/gradle.properties +++ b/gradle.properties @@ -3,7 +3,7 @@ # # Kotlin -version=1.3.6-SNAPSHOT +version=1.3.7-SNAPSHOT group=org.jetbrains.kotlinx kotlin_version=1.3.71 diff --git a/integration/kotlinx-coroutines-jdk8/src/time/Time.kt b/integration/kotlinx-coroutines-jdk8/src/time/Time.kt index 807a3bbc3b..acff1d21cb 100644 --- a/integration/kotlinx-coroutines-jdk8/src/time/Time.kt +++ b/integration/kotlinx-coroutines-jdk8/src/time/Time.kt @@ -1,6 +1,8 @@ /* * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ +@file:OptIn(ExperimentalContracts::class) + package kotlinx.coroutines.time import kotlinx.coroutines.* @@ -8,6 +10,7 @@ import kotlinx.coroutines.flow.* import kotlinx.coroutines.selects.* import java.time.* import java.time.temporal.* +import kotlin.contracts.* /** * "java.time" adapter method for [kotlinx.coroutines.delay]. @@ -35,8 +38,12 @@ public fun SelectBuilder.onTimeout(duration: Duration, block: suspend () /** * "java.time" adapter method for [kotlinx.coroutines.withTimeout]. */ -public suspend fun withTimeout(duration: Duration, block: suspend CoroutineScope.() -> T): T = - kotlinx.coroutines.withTimeout(duration.coerceToMillis(), block) +public suspend fun withTimeout(duration: Duration, block: suspend CoroutineScope.() -> T): T { + contract { + callsInPlace(block, InvocationKind.EXACTLY_ONCE) + } + return kotlinx.coroutines.withTimeout(duration.coerceToMillis(), block) +} /** * "java.time" adapter method for [kotlinx.coroutines.withTimeoutOrNull]. diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index c91d329465..6a24b6a23a 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -214,6 +214,7 @@ public final class kotlinx/coroutines/CoroutineScopeKt { public static synthetic fun cancel$default (Lkotlinx/coroutines/CoroutineScope;Ljava/lang/String;Ljava/lang/Throwable;ILjava/lang/Object;)V public static synthetic fun cancel$default (Lkotlinx/coroutines/CoroutineScope;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V public static final fun coroutineScope (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun currentCoroutineContext (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun ensureActive (Lkotlinx/coroutines/CoroutineScope;)V public static final fun isActive (Lkotlinx/coroutines/CoroutineScope;)Z public static final fun plus (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/CoroutineScope; @@ -873,6 +874,7 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun buffer (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow; public static synthetic fun buffer$default (Lkotlinx/coroutines/flow/Flow;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; public static final fun callbackFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; + public static final fun cancellable (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; public static final fun catch (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; diff --git a/kotlinx-coroutines-core/build.gradle b/kotlinx-coroutines-core/build.gradle index 451293c5ff..0222098405 100644 --- a/kotlinx-coroutines-core/build.gradle +++ b/kotlinx-coroutines-core/build.gradle @@ -55,6 +55,10 @@ compileKotlinMetadata { } kotlin.sourceSets { + jvm.dependencies { + compileOnly "com.google.android:annotations:4.1.1.4" + } + jvmTest.dependencies { api "org.jetbrains.kotlinx:lincheck:$lincheck_version" api "org.jetbrains.kotlinx:kotlinx-knit-test:$knit_version" diff --git a/kotlinx-coroutines-core/common/src/Builders.common.kt b/kotlinx-coroutines-core/common/src/Builders.common.kt index 7dd1b174ee..64bff500dc 100644 --- a/kotlinx-coroutines-core/common/src/Builders.common.kt +++ b/kotlinx-coroutines-core/common/src/Builders.common.kt @@ -4,6 +4,7 @@ @file:JvmMultifileClass @file:JvmName("BuildersKt") +@file:OptIn(ExperimentalContracts::class) package kotlinx.coroutines @@ -11,6 +12,7 @@ import kotlinx.atomicfu.* import kotlinx.coroutines.internal.* import kotlinx.coroutines.intrinsics.* import kotlinx.coroutines.selects.* +import kotlin.contracts.* import kotlin.coroutines.* import kotlin.coroutines.intrinsics.* import kotlin.jvm.* @@ -134,31 +136,36 @@ private class LazyDeferredCoroutine( public suspend fun withContext( context: CoroutineContext, block: suspend CoroutineScope.() -> T -): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont -> - // compute new context - val oldContext = uCont.context - val newContext = oldContext + context - // always check for cancellation of new context - newContext.checkCompletion() - // FAST PATH #1 -- new context is the same as the old one - if (newContext === oldContext) { - val coroutine = ScopeCoroutine(newContext, uCont) - return@sc coroutine.startUndispatchedOrReturn(coroutine, block) +): T { + contract { + callsInPlace(block, InvocationKind.EXACTLY_ONCE) } - // FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed) - // `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher) - if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) { - val coroutine = UndispatchedCoroutine(newContext, uCont) - // There are changes in the context, so this thread needs to be updated - withCoroutineContext(newContext, null) { + return suspendCoroutineUninterceptedOrReturn sc@ { uCont -> + // compute new context + val oldContext = uCont.context + val newContext = oldContext + context + // always check for cancellation of new context + newContext.checkCompletion() + // FAST PATH #1 -- new context is the same as the old one + if (newContext === oldContext) { + val coroutine = ScopeCoroutine(newContext, uCont) return@sc coroutine.startUndispatchedOrReturn(coroutine, block) } + // FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed) + // `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher) + if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) { + val coroutine = UndispatchedCoroutine(newContext, uCont) + // There are changes in the context, so this thread needs to be updated + withCoroutineContext(newContext, null) { + return@sc coroutine.startUndispatchedOrReturn(coroutine, block) + } + } + // SLOW PATH -- use new dispatcher + val coroutine = DispatchedCoroutine(newContext, uCont) + coroutine.initParentJob() + block.startCoroutineCancellable(coroutine, coroutine) + coroutine.getResult() } - // SLOW PATH -- use new dispatcher - val coroutine = DispatchedCoroutine(newContext, uCont) - coroutine.initParentJob() - block.startCoroutineCancellable(coroutine, coroutine) - coroutine.getResult() } /** diff --git a/kotlinx-coroutines-core/common/src/CompletableDeferred.kt b/kotlinx-coroutines-core/common/src/CompletableDeferred.kt index cba8a63929..d24f1837cd 100644 --- a/kotlinx-coroutines-core/common/src/CompletableDeferred.kt +++ b/kotlinx-coroutines-core/common/src/CompletableDeferred.kt @@ -16,8 +16,11 @@ import kotlinx.coroutines.selects.* * * An instance of completable deferred can be created by `CompletableDeferred()` function in _active_ state. * - * All functions on this interface and on all interfaces derived from it are **thread-safe** and can + * All functions on this interface are **thread-safe** and can * be safely invoked from concurrent coroutines without external synchronization. + * + * **`CompletableDeferred` interface is not stable for inheritance in 3rd party libraries**, + * as new methods might be added to this interface in the future, but is stable for use. */ public interface CompletableDeferred : Deferred { /** diff --git a/kotlinx-coroutines-core/common/src/CompletableJob.kt b/kotlinx-coroutines-core/common/src/CompletableJob.kt index 4b4d16bc53..8e6b1ab02f 100644 --- a/kotlinx-coroutines-core/common/src/CompletableJob.kt +++ b/kotlinx-coroutines-core/common/src/CompletableJob.kt @@ -7,6 +7,12 @@ package kotlinx.coroutines /** * A job that can be completed using [complete()] function. * It is returned by [Job()][Job] and [SupervisorJob()][SupervisorJob] constructor functions. + * + * All functions on this interface are **thread-safe** and can + * be safely invoked from concurrent coroutines without external synchronization. + * + * **`CompletableJob` interface is not stable for inheritance in 3rd party libraries**, + * as new methods might be added to this interface in the future, but is stable for use. */ public interface CompletableJob : Job { /** diff --git a/kotlinx-coroutines-core/common/src/CoroutineScope.kt b/kotlinx-coroutines-core/common/src/CoroutineScope.kt index 7b5c645d1f..7dbd6a6d7b 100644 --- a/kotlinx-coroutines-core/common/src/CoroutineScope.kt +++ b/kotlinx-coroutines-core/common/src/CoroutineScope.kt @@ -1,11 +1,13 @@ /* * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ +@file:OptIn(ExperimentalContracts::class) package kotlinx.coroutines import kotlinx.coroutines.internal.* import kotlinx.coroutines.intrinsics.* +import kotlin.contracts.* import kotlin.coroutines.* import kotlin.coroutines.intrinsics.* @@ -183,11 +185,15 @@ public object GlobalScope : CoroutineScope { * or may throw a corresponding unhandled [Throwable] if there is any unhandled exception in this scope * (for example, from a crashed coroutine that was started with [launch][CoroutineScope.launch] in this scope). */ -public suspend fun coroutineScope(block: suspend CoroutineScope.() -> R): R = - suspendCoroutineUninterceptedOrReturn { uCont -> +public suspend fun coroutineScope(block: suspend CoroutineScope.() -> R): R { + contract { + callsInPlace(block, InvocationKind.EXACTLY_ONCE) + } + return suspendCoroutineUninterceptedOrReturn { uCont -> val coroutine = ScopeCoroutine(uCont.context, uCont) coroutine.startUndispatchedOrReturn(coroutine, block) } +} /** * Creates a [CoroutineScope] that wraps the given coroutine [context]. @@ -233,3 +239,19 @@ public fun CoroutineScope.cancel(message: String, cause: Throwable? = null): Uni * ``` */ public fun CoroutineScope.ensureActive(): Unit = coroutineContext.ensureActive() + + +/** + * Returns the current [CoroutineContext] retrieved by using [kotlin.coroutines.coroutineContext]. + * This function is an alias to avoid name clash with [CoroutineScope.coroutineContext] in a receiver position: + * + * ``` + * launch { // this: CoroutineScope + * val flow = flow { + * coroutineContext // Resolves into the context of outer launch, which is incorrect, see KT-38033 + * currentCoroutineContext() // Retrieves actual context where the flow is collected + * } + * } + * ``` + */ +public suspend inline fun currentCoroutineContext(): CoroutineContext = coroutineContext diff --git a/kotlinx-coroutines-core/common/src/Deferred.kt b/kotlinx-coroutines-core/common/src/Deferred.kt index f05abbdb43..72f3fde141 100644 --- a/kotlinx-coroutines-core/common/src/Deferred.kt +++ b/kotlinx-coroutines-core/common/src/Deferred.kt @@ -30,6 +30,9 @@ import kotlinx.coroutines.selects.* * * All functions on this interface and on all interfaces derived from it are **thread-safe** and can * be safely invoked from concurrent coroutines without external synchronization. + * + * **`Deferred` interface and all its derived interfaces are not stable for inheritance in 3rd party libraries**, + * as new methods might be added to this interface in the future, but is stable for use. */ public interface Deferred : Job { diff --git a/kotlinx-coroutines-core/common/src/Job.kt b/kotlinx-coroutines-core/common/src/Job.kt index 4dd783ca21..e10b138e08 100644 --- a/kotlinx-coroutines-core/common/src/Job.kt +++ b/kotlinx-coroutines-core/common/src/Job.kt @@ -19,17 +19,22 @@ import kotlin.jvm.* * culminates in its completion. * * Jobs can be arranged into parent-child hierarchies where cancellation - * of a parent leads to immediate cancellation of all its [children]. Failure or cancellation of a child - * with an exception other than [CancellationException] immediately cancels its parent. This way, a parent - * can [cancel] its own children (including all their children recursively) without cancelling itself. + * of a parent leads to immediate cancellation of all its [children] recursively. + * Failure of a child with an exception other than [CancellationException] immediately cancels its parent and, + * consequently, all its other children. This behavior can be customized using [SupervisorJob]. * - * The most basic instances of [Job] are created with [launch][CoroutineScope.launch] coroutine builder or with a - * `Job()` factory function. By default, a failure of any of the job's children leads to an immediate failure - * of its parent and cancellation of the rest of its children. This behavior can be customized using [SupervisorJob]. + * The most basic instances of `Job` interface are created like this: * - * Conceptually, an execution of the job does not produce a result value. Jobs are launched solely for their + * * **Coroutine job** is created with [launch][CoroutineScope.launch] coroutine builder. + * It runs a specified block of code and completes on completion of this block. + * * **[CompletableJob]** is created with a `Job()` factory function. + * It is completed by calling [CompletableJob.complete]. + * + * Conceptually, an execution of a job does not produce a result value. Jobs are launched solely for their * side-effects. See [Deferred] interface for a job that produces a result. * + * ### Job states + * * A job has the following states: * * | **State** | [isActive] | [isCompleted] | [isCancelled] | @@ -41,13 +46,23 @@ import kotlin.jvm.* * | _Cancelled_ (final state) | `false` | `true` | `true` | * | _Completed_ (final state) | `false` | `true` | `false` | * - * Usually, a job is created in _active_ state (it is created and started). However, coroutine builders - * that provide an optional `start` parameter create a coroutine in _new_ state when this parameter is set to + * Usually, a job is created in the _active_ state (it is created and started). However, coroutine builders + * that provide an optional `start` parameter create a coroutine in the _new_ state when this parameter is set to * [CoroutineStart.LAZY]. Such a job can be made _active_ by invoking [start] or [join]. * - * A job is _active_ while the coroutine is working. Failure of the job with exception makes it _cancelling_. + * A job is _active_ while the coroutine is working or until [CompletableJob] is completed, + * or until it fails or cancelled. + * + * Failure of an _active_ job with an exception makes it _cancelling_. * A job can be cancelled at any time with [cancel] function that forces it to transition to - * _cancelling_ state immediately. The job becomes _cancelled_ when it finishes executing its work. + * the _cancelling_ state immediately. The job becomes _cancelled_ when it finishes executing its work and + * all its children complete. + * + * Completion of an _active_ coroutine's body or a call to [CompletableJob.complete] transitions the job to + * the _completing_ state. It waits in the _completing_ state for all its children to complete before + * transitioning to the _completed_ state. + * Note that _completing_ state is purely internal to the job. For an outside observer a _completing_ job is still + * active, while internally it is waiting for its children. * * ``` * wait children @@ -67,19 +82,32 @@ import kotlin.jvm.* * [coroutineContext](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines/coroutine-context.html) * represents the coroutine itself. * - * A job can have a _parent_ job. A job with a parent is cancelled when its parent is cancelled. - * Parent job waits in _completing_ or _cancelling_ state for all its children to complete before finishing. - * Note that _completing_ state is purely internal to the job. For an outside observer a _completing_ job is still - * active, while internally it is waiting for its children. + * ### Cancellation cause + * + * A coroutine job is said to _complete exceptionally_ when its body throws an exception; + * a [CompletableJob] is completed exceptionally by calling [CompletableJob.completeExceptionally]. + * An exceptionally completed job is cancelled and the corresponding exception becomes the _cancellation cause_ of the job. + * + * Normal cancellation of a job is distinguished from its failure by the type of this exception that caused its cancellation. + * A coroutine that threw [CancellationException] is considered to be _cancelled normally_. + * If a cancellation cause is a different exception type, then the job is considered to have _failed_. + * When a job has _failed_, then its parent gets cancelled with the exception of the same type, + * thus ensuring transparency in delegating parts of the job to its children. * - * Normal cancellation of a job is distinguished from its failure by the type of its cancellation exception cause. - * If the cause of cancellation is [CancellationException], then the job is considered to be _cancelled normally_. - * This usually happens when [cancel] is invoked without additional parameters. If the cause of cancellation is - * a different exception, then the job is considered to have _failed_. This usually happens when the code of the job - * encounters some problem and throws an exception. + * Note, that [cancel] function on a job only accepts [CancellationException] as a cancellation cause, thus + * calling [cancel] always results in a normal cancellation of a job, which does not lead to cancellation + * of its parent. This way, a parent can [cancel] its own children (cancelling all their children recursively, too) + * without cancelling itself. + * + * ### Concurrency and synchronization * * All functions on this interface and on all interfaces derived from it are **thread-safe** and can * be safely invoked from concurrent coroutines without external synchronization. + * + * ### Not stable for inheritance + * + * **`Job` interface and all its derived interfaces are not stable for inheritance in 3rd party libraries**, + * as new methods might be added to this interface in the future, but is stable for use. */ public interface Job : CoroutineContext.Element { /** diff --git a/kotlinx-coroutines-core/common/src/Supervisor.kt b/kotlinx-coroutines-core/common/src/Supervisor.kt index 1991119053..542e4fef48 100644 --- a/kotlinx-coroutines-core/common/src/Supervisor.kt +++ b/kotlinx-coroutines-core/common/src/Supervisor.kt @@ -1,13 +1,14 @@ /* * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ - +@file:OptIn(ExperimentalContracts::class) @file:Suppress("DEPRECATION_ERROR") package kotlinx.coroutines import kotlinx.coroutines.internal.* import kotlinx.coroutines.intrinsics.* +import kotlin.contracts.* import kotlin.coroutines.* import kotlin.coroutines.intrinsics.* import kotlin.jvm.* @@ -47,11 +48,15 @@ public fun SupervisorJob0(parent: Job? = null) : Job = SupervisorJob(parent) * A failure of the scope itself (exception thrown in the [block] or cancellation) fails the scope with all its children, * but does not cancel parent job. */ -public suspend fun supervisorScope(block: suspend CoroutineScope.() -> R): R = - suspendCoroutineUninterceptedOrReturn { uCont -> +public suspend fun supervisorScope(block: suspend CoroutineScope.() -> R): R { + contract { + callsInPlace(block, InvocationKind.EXACTLY_ONCE) + } + return suspendCoroutineUninterceptedOrReturn { uCont -> val coroutine = SupervisorCoroutine(uCont.context, uCont) coroutine.startUndispatchedOrReturn(coroutine, block) } +} private class SupervisorJobImpl(parent: Job?) : JobImpl(parent) { override fun childCancelled(cause: Throwable): Boolean = false diff --git a/kotlinx-coroutines-core/common/src/Timeout.kt b/kotlinx-coroutines-core/common/src/Timeout.kt index 87fe733773..c8e4455c92 100644 --- a/kotlinx-coroutines-core/common/src/Timeout.kt +++ b/kotlinx-coroutines-core/common/src/Timeout.kt @@ -1,12 +1,14 @@ /* * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ +@file:OptIn(ExperimentalContracts::class) package kotlinx.coroutines import kotlinx.coroutines.internal.* import kotlinx.coroutines.intrinsics.* import kotlinx.coroutines.selects.* +import kotlin.contracts.* import kotlin.coroutines.* import kotlin.coroutines.intrinsics.* import kotlin.jvm.* @@ -27,6 +29,9 @@ import kotlin.time.* * @param timeMillis timeout time in milliseconds. */ public suspend fun withTimeout(timeMillis: Long, block: suspend CoroutineScope.() -> T): T { + contract { + callsInPlace(block, InvocationKind.EXACTLY_ONCE) + } if (timeMillis <= 0L) throw TimeoutCancellationException("Timed out immediately") return suspendCoroutineUninterceptedOrReturn { uCont -> setupTimeout(TimeoutCoroutine(timeMillis, uCont), block) @@ -46,8 +51,12 @@ public suspend fun withTimeout(timeMillis: Long, block: suspend CoroutineSco * Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher]. */ @ExperimentalTime -public suspend fun withTimeout(timeout: Duration, block: suspend CoroutineScope.() -> T): T = - withTimeout(timeout.toDelayMillis(), block) +public suspend fun withTimeout(timeout: Duration, block: suspend CoroutineScope.() -> T): T { + contract { + callsInPlace(block, InvocationKind.EXACTLY_ONCE) + } + return withTimeout(timeout.toDelayMillis(), block) +} /** * Runs a given suspending block of code inside a coroutine with a specified [timeout][timeMillis] and returns diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt index 4157576aae..74a0ad2583 100644 --- a/kotlinx-coroutines-core/common/src/flow/Builders.kt +++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt @@ -33,6 +33,7 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow * * fibonacci().take(100).collect { println(it) } * ``` + * Emissions from [flow] builder are [cancellable] by default. * * `emit` should happen strictly in the dispatchers of the [block] in order to preserve the flow context. * For example, the following code will result in an [IllegalStateException]: @@ -49,14 +50,9 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow public fun flow(@BuilderInference block: suspend FlowCollector.() -> Unit): Flow = SafeFlow(block) // Named anonymous object -private class SafeFlow(private val block: suspend FlowCollector.() -> Unit) : Flow { - override suspend fun collect(collector: FlowCollector) { - val safeCollector = SafeCollector(collector, coroutineContext) - try { - safeCollector.block() - } finally { - safeCollector.releaseIntercepted() - } +private class SafeFlow(private val block: suspend FlowCollector.() -> Unit) : AbstractFlow() { + override suspend fun collectSafely(collector: FlowCollector) { + collector.block() } } diff --git a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt index f14a1ac45e..b2bbb6d3ae 100644 --- a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt @@ -109,7 +109,7 @@ public interface StateFlow : Flow { } /** - * A mutable [StateFlow] that provides a setter for [value] and a method to [close] the flow. + * A mutable [StateFlow] that provides a setter for [value]. * * See [StateFlow] documentation for details. * diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt index 4cb17428cc..afdcd9ed18 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt @@ -208,6 +208,21 @@ public fun Flow.flowOn(context: CoroutineContext): Flow { } } +/** + * Returns a flow which checks cancellation status on each emission and throws + * the corresponding cancellation cause if flow collector was cancelled. + * Note that [flow] builder is [cancellable] by default. + */ +public fun Flow.cancellable(): Flow { + if (this is AbstractFlow<*>) return this // Fast-path, already cancellable + return unsafeFlow { + collect { + currentCoroutineContext().ensureActive() + emit(it) + } + } +} + /** * The operator that changes the context where all transformations applied to the given flow within a [builder] are executed. * This operator is context preserving and does not affect the context of the preceding and subsequent operations. @@ -256,7 +271,7 @@ public fun Flow.flowWith( * All builders are written using scoping and no global coroutines are launched, so it is safe not to provide explicit Job. * It is also necessary not to mess with cancellation if multiple flowWith are used. */ - val originalContext = coroutineContext.minusKey(Job) + val originalContext = currentCoroutineContext().minusKey(Job) val prepared = source.flowOn(originalContext).buffer(bufferSize) builder(prepared).flowOn(flowContext).buffer(bufferSize).collect { value -> return@collect emit(value) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt index d296517bdb..1ffbf94a98 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt @@ -10,11 +10,10 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.* import kotlinx.coroutines.flow.internal.* -import kotlin.coroutines.* import kotlin.jvm.* // ------------------ WARNING ------------------ -// These emitting operators must use safe flow builder, because their allow +// These emitting operators must use safe flow builder, because they allow // user code to directly emit to the underlying FlowCollector. /** @@ -70,7 +69,7 @@ internal inline fun Flow.unsafeTransform( public fun Flow.onStart( action: suspend FlowCollector.() -> Unit ): Flow = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke start action - val safeCollector = SafeCollector(this, coroutineContext) + val safeCollector = SafeCollector(this, currentCoroutineContext()) try { safeCollector.action() } finally { @@ -153,7 +152,7 @@ public fun Flow.onCompletion( throw e } // Normal completion - SafeCollector(this, coroutineContext).invokeSafely(action, null) + SafeCollector(this, currentCoroutineContext()).invokeSafely(action, null) } /** @@ -178,7 +177,7 @@ public fun Flow.onEmpty( emit(it) } if (isEmpty) { - val collector = SafeCollector(this, coroutineContext) + val collector = SafeCollector(this, currentCoroutineContext()) try { collector.action() } finally { diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Lint.kt b/kotlinx-coroutines-core/common/src/flow/operators/Lint.kt index 23bc1f84cb..5500034e9f 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Lint.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Lint.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines.flow +import kotlinx.coroutines.* import kotlin.coroutines.* /** @@ -41,3 +42,31 @@ public fun StateFlow.conflate(): Flow = noImpl() replaceWith = ReplaceWith("this") ) public fun StateFlow.distinctUntilChanged(): Flow = noImpl() + +//@Deprecated( +// message = "isActive is resolved into the extension of outer CoroutineScope which is likely to be an error." + +// "Use currentCoroutineContext().isActive or cancellable() operator instead " + +// "or specify the receiver of isActive explicitly. " + +// "Additionally, flow {} builder emissions are cancellable by default.", +// level = DeprecationLevel.WARNING, // ERROR in 1.4 +// replaceWith = ReplaceWith("currentCoroutineContext().isActive") +//) +//public val FlowCollector<*>.isActive: Boolean +// get() = noImpl() +// +//@Deprecated( +// message = "cancel() is resolved into the extension of outer CoroutineScope which is likely to be an error." + +// "Use currentCoroutineContext().cancel() instead or specify the receiver of cancel() explicitly", +// level = DeprecationLevel.WARNING, // ERROR in 1.4 +// replaceWith = ReplaceWith("currentCoroutineContext().cancel(cause)") +//) +//public fun FlowCollector<*>.cancel(cause: CancellationException? = null): Unit = noImpl() +// +//@Deprecated( +// message = "coroutineContext is resolved into the property of outer CoroutineScope which is likely to be an error." + +// "Use currentCoroutineContext() instead or specify the receiver of coroutineContext explicitly", +// level = DeprecationLevel.WARNING, // ERROR in 1.4 +// replaceWith = ReplaceWith("currentCoroutineContext()") +//) +//public val FlowCollector<*>.coroutineContext: CoroutineContext +// get() = noImpl() \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt index 0595341f92..e744a0c724 100644 --- a/kotlinx-coroutines-core/common/src/selects/Select.kt +++ b/kotlinx-coroutines-core/common/src/selects/Select.kt @@ -1,6 +1,7 @@ /* * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ +@file:OptIn(ExperimentalContracts::class) package kotlinx.coroutines.selects @@ -10,6 +11,7 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.internal.* import kotlinx.coroutines.intrinsics.* import kotlinx.coroutines.sync.* +import kotlin.contracts.* import kotlin.coroutines.* import kotlin.coroutines.intrinsics.* import kotlin.jvm.* @@ -199,8 +201,11 @@ public interface SelectInstance { * Note that this function does not check for cancellation when it is not suspended. * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. */ -public suspend inline fun select(crossinline builder: SelectBuilder.() -> Unit): R = - suspendCoroutineUninterceptedOrReturn { uCont -> +public suspend inline fun select(crossinline builder: SelectBuilder.() -> Unit): R { + contract { + callsInPlace(builder, InvocationKind.EXACTLY_ONCE) + } + return suspendCoroutineUninterceptedOrReturn { uCont -> val scope = SelectBuilderImpl(uCont) try { builder(scope) @@ -209,6 +214,7 @@ public suspend inline fun select(crossinline builder: SelectBuilder.() -> } scope.getResult() } +} @SharedImmutable diff --git a/kotlinx-coroutines-core/common/src/sync/Mutex.kt b/kotlinx-coroutines-core/common/src/sync/Mutex.kt index 769c9f1168..61e046c77a 100644 --- a/kotlinx-coroutines-core/common/src/sync/Mutex.kt +++ b/kotlinx-coroutines-core/common/src/sync/Mutex.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.* import kotlinx.coroutines.internal.* import kotlinx.coroutines.intrinsics.* import kotlinx.coroutines.selects.* +import kotlin.contracts.* import kotlin.coroutines.* import kotlin.jvm.* import kotlin.native.concurrent.* @@ -106,7 +107,12 @@ public fun Mutex(locked: Boolean = false): Mutex = * * @return the return value of the action. */ +@OptIn(ExperimentalContracts::class) public suspend inline fun Mutex.withLock(owner: Any? = null, action: () -> T): T { + contract { + callsInPlace(action, InvocationKind.EXACTLY_ONCE) + } + lock(owner) try { return action() diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index 125bbaaeb5..27c976ce3f 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -7,6 +7,7 @@ package kotlinx.coroutines.sync import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.internal.* +import kotlin.contracts.* import kotlin.coroutines.* import kotlin.math.* import kotlin.native.concurrent.SharedImmutable @@ -74,7 +75,12 @@ public fun Semaphore(permits: Int, acquiredPermits: Int = 0): Semaphore = Semaph * * @return the return value of the [action]. */ +@OptIn(ExperimentalContracts::class) public suspend inline fun Semaphore.withPermit(action: () -> T): T { + contract { + callsInPlace(action, InvocationKind.EXACTLY_ONCE) + } + acquire() try { return action() diff --git a/kotlinx-coroutines-core/common/test/BuilderContractsTest.kt b/kotlinx-coroutines-core/common/test/BuilderContractsTest.kt new file mode 100644 index 0000000000..b20dd6b1d2 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/BuilderContractsTest.kt @@ -0,0 +1,52 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import kotlinx.coroutines.selects.* +import kotlin.test.* + +class BuilderContractsTest : TestBase() { + + @Test + fun testContracts() = runTest { + // Coroutine scope + val cs: Int + coroutineScope { + cs = 42 + } + consume(cs) + + // Supervisor scope + val svs: Int + supervisorScope { + svs = 21 + } + consume(svs) + + // with context scope + val wctx: Int + withContext(Dispatchers.Unconfined) { + wctx = 239 + } + consume(wctx) + + val wt: Int + withTimeout(Long.MAX_VALUE) { + wt = 123 + } + consume(wt) + + val s: Int + select { + s = 42 + Job().apply { complete() }.onJoin {} + } + consume(s) + } + + private fun consume(a: Int) { + a.hashCode() // BE codegen verification + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CancellableTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CancellableTest.kt new file mode 100644 index 0000000000..b1b14c4c40 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/operators/CancellableTest.kt @@ -0,0 +1,38 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow.operators + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlin.test.* + +class CancellableTest : TestBase() { + + @Test + fun testCancellable() = runTest { + var sum = 0 + val flow = (0..1000).asFlow() + .onEach { + if (it != 0) currentCoroutineContext().cancel() + sum += it + } + + flow.launchIn(this).join() + assertEquals(500500, sum) + + sum = 0 + flow.cancellable().launchIn(this).join() + assertEquals(1, sum) + } + + @Test + fun testFastPath() { + val flow = listOf(1).asFlow() + assertNotSame(flow, flow.cancellable()) + + val cancellableFlow = flow { emit(42) } + assertSame(cancellableFlow, cancellableFlow.cancellable()) + } +} diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt index 684923c861..7470289ece 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt @@ -58,6 +58,7 @@ class FlatMapMergeTest : FlatMapMergeBaseTest() { } launch { expect(2) + yield() job.cancel() } } diff --git a/kotlinx-coroutines-core/js/src/flow/internal/SafeCollector.kt b/kotlinx-coroutines-core/js/src/flow/internal/SafeCollector.kt index 78f1bdb2a9..ace633ccc6 100644 --- a/kotlinx-coroutines-core/js/src/flow/internal/SafeCollector.kt +++ b/kotlinx-coroutines-core/js/src/flow/internal/SafeCollector.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines.flow.internal +import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlin.coroutines.* @@ -17,7 +18,8 @@ internal actual class SafeCollector actual constructor( private var lastEmissionContext: CoroutineContext? = null override suspend fun emit(value: T) { - val currentContext = coroutineContext + val currentContext = currentCoroutineContext() + currentContext.ensureActive() if (lastEmissionContext !== currentContext) { checkContext(currentContext) lastEmissionContext = currentContext diff --git a/kotlinx-coroutines-core/jvm/src/Builders.kt b/kotlinx-coroutines-core/jvm/src/Builders.kt index b8a250fef6..e4504ccdd4 100644 --- a/kotlinx-coroutines-core/jvm/src/Builders.kt +++ b/kotlinx-coroutines-core/jvm/src/Builders.kt @@ -4,10 +4,12 @@ @file:JvmMultifileClass @file:JvmName("BuildersKt") +@file:OptIn(ExperimentalContracts::class) package kotlinx.coroutines import java.util.concurrent.locks.* +import kotlin.contracts.* import kotlin.coroutines.* /** @@ -34,6 +36,9 @@ import kotlin.coroutines.* */ @Throws(InterruptedException::class) public fun runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T { + contract { + callsInPlace(block, InvocationKind.EXACTLY_ONCE) + } val currentThread = Thread.currentThread() val contextInterceptor = context[ContinuationInterceptor] val eventLoop: EventLoop? diff --git a/kotlinx-coroutines-core/jvm/src/debug/AgentPremain.kt b/kotlinx-coroutines-core/jvm/src/debug/AgentPremain.kt index 6493077593..c4d6f530b7 100644 --- a/kotlinx-coroutines-core/jvm/src/debug/AgentPremain.kt +++ b/kotlinx-coroutines-core/jvm/src/debug/AgentPremain.kt @@ -9,8 +9,14 @@ import sun.misc.* import java.lang.instrument.* import java.lang.instrument.ClassFileTransformer import java.security.* +import android.annotation.* +/* + * This class is loaded if and only if kotlinx-coroutines-core was used as -javaagent argument, + * but Android complains anyway (java.lang.instrument.*), so we suppress all lint checks here + */ @Suppress("unused") +@SuppressLint("all") internal object AgentPremain { public var isInstalledStatically = false diff --git a/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt b/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt index 4647a14245..a8e04f0f16 100644 --- a/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt +++ b/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines.flow.internal +import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlin.coroutines.* import kotlin.coroutines.intrinsics.* @@ -62,6 +63,7 @@ internal actual class SafeCollector actual constructor( private fun emit(uCont: Continuation, value: T): Any? { val currentContext = uCont.context + currentContext.ensureActive() // This check is triggered once per flow on happy path. val previousContext = lastEmissionContext if (previousContext !== currentContext) { diff --git a/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt b/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt index d21a9f895b..e20362ff93 100644 --- a/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt +++ b/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt @@ -162,4 +162,13 @@ class RunBlockingTest : TestBase() { handle.dispose() } + + @Test + fun testContract() { + val rb: Int + runBlocking { + rb = 42 + } + rb.hashCode() // unused + } } diff --git a/kotlinx-coroutines-core/jvm/test/flow/FlowCancellationTest.kt b/kotlinx-coroutines-core/jvm/test/flow/FlowCancellationTest.kt new file mode 100644 index 0000000000..b6ba3514f6 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/flow/FlowCancellationTest.kt @@ -0,0 +1,46 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlin.test.* + +class FlowCancellationTest : TestBase() { + + @Test + fun testEmitIsCooperative() = runTest { + val latch = Channel(1) + val job = flow { + expect(1) + latch.send(Unit) + while (true) { + emit(42) + } + }.launchIn(this + Dispatchers.Default) + + latch.receive() + expect(2) + job.cancelAndJoin() + finish(3) + } + + @Test + fun testIsActiveOnCurrentContext() = runTest { + val latch = Channel(1) + val job = flow { + expect(1) + latch.send(Unit) + while (currentCoroutineContext().isActive) { + // Do nothing + } + }.launchIn(this + Dispatchers.Default) + + latch.receive() + expect(2) + job.cancelAndJoin() + finish(3) + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/native/src/flow/internal/SafeCollector.kt b/kotlinx-coroutines-core/native/src/flow/internal/SafeCollector.kt index 78f1bdb2a9..ace633ccc6 100644 --- a/kotlinx-coroutines-core/native/src/flow/internal/SafeCollector.kt +++ b/kotlinx-coroutines-core/native/src/flow/internal/SafeCollector.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines.flow.internal +import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlin.coroutines.* @@ -17,7 +18,8 @@ internal actual class SafeCollector actual constructor( private var lastEmissionContext: CoroutineContext? = null override suspend fun emit(value: T) { - val currentContext = coroutineContext + val currentContext = currentCoroutineContext() + currentContext.ensureActive() if (lastEmissionContext !== currentContext) { checkContext(currentContext) lastEmissionContext = currentContext diff --git a/kotlinx-coroutines-debug/README.md b/kotlinx-coroutines-debug/README.md index a6cab20475..bd025159b9 100644 --- a/kotlinx-coroutines-debug/README.md +++ b/kotlinx-coroutines-debug/README.md @@ -23,7 +23,7 @@ https://github.com/reactor/BlockHound/blob/1.0.2.RELEASE/docs/quick_start.md). Add `kotlinx-coroutines-debug` to your project test dependencies: ``` dependencies { - testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-debug:1.3.6' + testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-debug:1.3.7' } ``` @@ -61,7 +61,7 @@ stacktraces will be dumped to the console. ### Using as JVM agent Debug module can also be used as a standalone JVM agent to enable debug probes on the application startup. -You can run your application with an additional argument: `-javaagent:kotlinx-coroutines-debug-1.3.6.jar`. +You can run your application with an additional argument: `-javaagent:kotlinx-coroutines-debug-1.3.7.jar`. Additionally, on Linux and Mac OS X you can use `kill -5 $pid` command in order to force your application to print all alive coroutines. When used as Java agent, `"kotlinx.coroutines.debug.enable.creation.stack.trace"` system property can be used to control [DebugProbes.enableCreationStackTraces] along with agent startup. diff --git a/kotlinx-coroutines-test/README.md b/kotlinx-coroutines-test/README.md index 8b5329b141..ec37aaac51 100644 --- a/kotlinx-coroutines-test/README.md +++ b/kotlinx-coroutines-test/README.md @@ -9,7 +9,7 @@ This package provides testing utilities for effectively testing coroutines. Add `kotlinx-coroutines-test` to your project test dependencies: ``` dependencies { - testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.3.6' + testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.3.7' } ``` diff --git a/reactive/kotlinx-coroutines-reactive/src/Convert.kt b/reactive/kotlinx-coroutines-reactive/src/Convert.kt index fb06ca3a3c..727eff8b48 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Convert.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Convert.kt @@ -16,7 +16,7 @@ import kotlin.coroutines.* * @param context -- the coroutine context from which the resulting observable is going to be signalled */ @Deprecated(message = "Deprecated in the favour of consumeAsFlow()", - level = DeprecationLevel.WARNING, + level = DeprecationLevel.WARNING, // Error in 1.4 replaceWith = ReplaceWith("this.consumeAsFlow().asPublisher()")) public fun ReceiveChannel.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher = publish(context) { for (t in this@asPublisher) diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt index 20e59f6a3a..96ae6287c1 100644 --- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt @@ -85,6 +85,7 @@ private class PublisherAsFlow( var consumed = 0L while (true) { val value = subscriber.takeNextOrNull() ?: break + coroutineContext.ensureActive() collector.emit(value) if (++consumed == requestSize) { consumed = 0L diff --git a/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt b/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt index 80feaeb865..3d6398aa13 100644 --- a/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines.reactor import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* import org.junit.Test @@ -36,4 +37,17 @@ class BackpressureTest : TestBase() { } finish(3) } + + @Test + fun testCooperativeCancellation() = runTest { + val flow = Flux.fromIterable((0L..Long.MAX_VALUE)).asFlow() + flow.onEach { if (it > 10) currentCoroutineContext().cancel() }.launchIn(this + Dispatchers.Default).join() + } + + @Test + fun testCooperativeCancellationForBuffered() = runTest(expected = { it is CancellationException }) { + val flow = Flux.fromIterable((0L..Long.MAX_VALUE)).asFlow() + val channel = flow.onEach { if (it > 10) currentCoroutineContext().cancel() }.produceIn(this + Dispatchers.Default) + channel.consumeEach { /* Do nothing, just consume elements */ } + } } \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 2f0e661b4c..d22d65fd25 100644 --- a/settings.gradle +++ b/settings.gradle @@ -4,8 +4,11 @@ pluginManagement { plugins { + id "org.openjfx.javafxplugin" version javafx_plugin_version + + // JMH id "net.ltgt.apt" version "0.21" - id "me.champeau.gradle.jmh" version "0.5.0-rc-2" + id "me.champeau.gradle.jmh" version "0.5.0" } } @@ -44,7 +47,9 @@ module('reactive/kotlinx-coroutines-rx2') module('reactive/kotlinx-coroutines-rx3') module('ui/kotlinx-coroutines-android') module('ui/kotlinx-coroutines-android/android-unit-tests') -module('ui/kotlinx-coroutines-javafx') +if (JavaVersion.current().isJava11Compatible()) { + module('ui/kotlinx-coroutines-javafx') +} module('ui/kotlinx-coroutines-swing') module('js/js-stub') diff --git a/ui/coroutines-guide-ui.md b/ui/coroutines-guide-ui.md index 2c6d309ee8..ac3a0a5d9c 100644 --- a/ui/coroutines-guide-ui.md +++ b/ui/coroutines-guide-ui.md @@ -110,7 +110,7 @@ Add dependencies on `kotlinx-coroutines-android` module to the `dependencies { . `app/build.gradle` file: ```groovy -implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.6" +implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.7" ``` You can clone [kotlinx.coroutines](https://github.com/Kotlin/kotlinx.coroutines) project from GitHub onto your diff --git a/ui/kotlinx-coroutines-android/animation-app/gradle.properties b/ui/kotlinx-coroutines-android/animation-app/gradle.properties index 28ed356afd..9a5627f24e 100644 --- a/ui/kotlinx-coroutines-android/animation-app/gradle.properties +++ b/ui/kotlinx-coroutines-android/animation-app/gradle.properties @@ -21,7 +21,7 @@ org.gradle.jvmargs=-Xmx1536m # org.gradle.parallel=true kotlin_version=1.3.71 -coroutines_version=1.3.6 +coroutines_version=1.3.7 android.useAndroidX=true android.enableJetifier=true diff --git a/ui/kotlinx-coroutines-android/example-app/gradle.properties b/ui/kotlinx-coroutines-android/example-app/gradle.properties index 28ed356afd..9a5627f24e 100644 --- a/ui/kotlinx-coroutines-android/example-app/gradle.properties +++ b/ui/kotlinx-coroutines-android/example-app/gradle.properties @@ -21,7 +21,7 @@ org.gradle.jvmargs=-Xmx1536m # org.gradle.parallel=true kotlin_version=1.3.71 -coroutines_version=1.3.6 +coroutines_version=1.3.7 android.useAndroidX=true android.enableJetifier=true diff --git a/ui/kotlinx-coroutines-javafx/build.gradle b/ui/kotlinx-coroutines-javafx/build.gradle index daabda40ff..77f1b09650 100644 --- a/ui/kotlinx-coroutines-javafx/build.gradle +++ b/ui/kotlinx-coroutines-javafx/build.gradle @@ -2,15 +2,14 @@ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -// JDK11+ does not bundle JavaFx and the plugin for JavaFx support is compiled with class file version 55.0 (JDK 11) -if (JavaVersionKt.javaVersionMajor >= 11) { - apply plugin: 'org.openjfx.javafxplugin' +plugins { + id 'org.openjfx.javafxplugin' +} - javafx { - version = javafx_version - modules = ['javafx.controls'] - configuration = 'compile' - } +javafx { + version = javafx_version + modules = ['javafx.controls'] + configuration = 'compile' } task checkJdk8() {