diff --git a/CHANGES.md b/CHANGES.md index e4b1653f8b..4db08d9d48 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,18 @@ # Change log for kotlinx.coroutines +## Version 1.3.1 + +This is a minor update with various fixes: +* Flow: Fix recursion in combineTransform (#1466). +* Fixed race in the Semaphore (#1477). +* Repaired some of ListenableFuture.kt's cancellation corner cases (#1441). +* Consistently unwrap exception in slow path of CompletionStage.asDeferred (#1479). +* Various fixes in documentation (#1496, #1476, #1470, #1468). +* Various cleanups and additions in tests. + +Note: Kotlin/Native artifacts are now published with Gradle metadata format version 1.0, so you will need +Gradle version 5.3 or later to use this version of kotlinx.coroutines in your Kotlin/Native project. + ## Version 1.3.0 ### Flow diff --git a/README.md b/README.md index bc58ab7a60..e31d6f242f 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![official JetBrains project](https://jb.gg/badges/official.svg)](https://confluence.jetbrains.com/display/ALL/JetBrains+on+GitHub) [![GitHub license](https://img.shields.io/badge/license-Apache%20License%202.0-blue.svg?style=flat)](https://www.apache.org/licenses/LICENSE-2.0) -[![Download](https://api.bintray.com/packages/kotlin/kotlinx/kotlinx.coroutines/images/download.svg?version=1.3.0) ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.3.0) +[![Download](https://api.bintray.com/packages/kotlin/kotlinx/kotlinx.coroutines/images/download.svg?version=1.3.1) ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.3.1) Library support for Kotlin coroutines with [multiplatform](#multiplatform) support. This is a companion version for Kotlin `1.3.50` release. @@ -83,7 +83,7 @@ Add dependencies (you can also add other modules that you need): org.jetbrains.kotlinx kotlinx-coroutines-core - 1.3.0 + 1.3.1 ``` @@ -101,7 +101,7 @@ Add dependencies (you can also add other modules that you need): ```groovy dependencies { - implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0' + implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.1' } ``` @@ -127,7 +127,7 @@ Add dependencies (you can also add other modules that you need): ```groovy dependencies { - implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.1") } ``` @@ -146,7 +146,7 @@ Make sure that you have either `jcenter()` or `mavenCentral()` in the list of re Core modules of `kotlinx.coroutines` are also available for [Kotlin/JS](#js) and [Kotlin/Native](#native). In common code that should get compiled for different platforms, add dependency to -[`kotlinx-coroutines-core-common`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-common/1.3.0/jar) +[`kotlinx-coroutines-core-common`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-common/1.3.1/jar) (follow the link to get the dependency declaration snippet). ### Android @@ -155,7 +155,7 @@ Add [`kotlinx-coroutines-android`](ui/kotlinx-coroutines-android) module as dependency when using `kotlinx.coroutines` on Android: ```groovy -implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.0' +implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.1' ``` This gives you access to Android [Dispatchers.Main] @@ -174,7 +174,7 @@ R8 is a replacement for ProGuard in Android ecosystem, it is enabled by default ### JS [Kotlin/JS](https://kotlinlang.org/docs/reference/js-overview.html) version of `kotlinx.coroutines` is published as -[`kotlinx-coroutines-core-js`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-js/1.3.0/jar) +[`kotlinx-coroutines-core-js`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-js/1.3.1/jar) (follow the link to get the dependency declaration snippet). You can also use [`kotlinx-coroutines-core`](https://www.npmjs.com/package/kotlinx-coroutines-core) package via NPM. @@ -182,7 +182,7 @@ You can also use [`kotlinx-coroutines-core`](https://www.npmjs.com/package/kotli ### Native [Kotlin/Native](https://kotlinlang.org/docs/reference/native-overview.html) version of `kotlinx.coroutines` is published as -[`kotlinx-coroutines-core-native`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-native/1.3.0/jar) +[`kotlinx-coroutines-core-native`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-native/1.3.1/jar) (follow the link to get the dependency declaration snippet). Only single-threaded code (JS-style) on Kotlin/Native is currently supported. diff --git a/RELEASE.md b/RELEASE.md index 3f936ac9e6..22140e68c7 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -19,7 +19,7 @@ To release new `` of `kotlinx-coroutines`: * [`ui/kotlinx-coroutines-android/animation-app/gradle.properties`](ui/kotlinx-coroutines-android/animation-app/gradle.properties) * Make sure to **exclude** `CHANGES.md` from replacements. - As an alternative approach you can use `./bump_version.sh old_version new_version` + As an alternative approach you can use `./bump-version.sh old_version new_version` 5. Write release notes in [`CHANGES.md`](CHANGES.md): * Use old releases as example of style. @@ -76,5 +76,5 @@ To release new `` of `kotlinx-coroutines`: 9. Merge release from `master`:
`git merge origin/master` -10. Push updates to `develop`:
+0. Push updates to `develop`:
`git push` diff --git a/binary-compatibility-validator/build.gradle b/binary-compatibility-validator/build.gradle index 5f985328de..c6eaffdfca 100644 --- a/binary-compatibility-validator/build.gradle +++ b/binary-compatibility-validator/build.gradle @@ -1,9 +1,10 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ configurations { testArtifacts + configureKotlinJvmPlatform(testArtifacts) } dependencies { diff --git a/build.gradle b/build.gradle index 46e8fbc32f..4fe2225e6b 100644 --- a/build.gradle +++ b/build.gradle @@ -61,7 +61,6 @@ buildscript { classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version" classpath "org.jetbrains.dokka:dokka-gradle-plugin:$dokka_version" classpath "org.jetbrains.kotlinx:atomicfu-gradle-plugin:$atomicfu_version" - classpath "com.jfrog.bintray.gradle:gradle-bintray-plugin:$bintray_version" classpath "com.moowork.gradle:gradle-node-plugin:$gradle_node_version" classpath "io.spring.gradle:dependency-management-plugin:$spring_dependency_management_version" @@ -72,6 +71,13 @@ buildscript { } } +import org.jetbrains.kotlin.gradle.plugin.KotlinPlatformType + +// todo:KLUDGE: This is needed to workaround dependency resolution between Java and MPP modules +def configureKotlinJvmPlatform(configuration) { + configuration.attributes.attribute(KotlinPlatformType.attribute, KotlinPlatformType.jvm) +} + allprojects { // the only place where HostManager could be instantiated project.ext.hostManager = new HostManager() @@ -98,6 +104,14 @@ allprojects { } ext.unpublished = unpublished + + // This project property is set during nightly stress test + def stressTest = project.properties['stressTest'] + + // Copy it to all test tasks + tasks.withType(Test) { + systemProperty 'stressTest', stressTest + } } allprojects { @@ -219,10 +233,6 @@ configure(subprojects.findAll { !unpublished.contains(it.name) }) { } } } - - if (platformOf(it) == "jvm") { - dokkaJavadoc.dependsOn project(":$coreModule").dokka - } } } @@ -230,7 +240,7 @@ configure(subprojects.findAll { !unpublished.contains(it.name) }) { println("Using Kotlin compiler version: $org.jetbrains.kotlin.config.KotlinCompilerVersion.VERSION") // --------------- Configure sub-projects that are published --------------- -task deploy(dependsOn: getTasksByName("bintrayUpload", true) + getTasksByName("publishNpm", true)) +task deploy(dependsOn: getTasksByName("publish", true) + getTasksByName("publishNpm", true)) apply plugin: 'base' diff --git a/gradle.properties b/gradle.properties index 6b3060e680..335c999977 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,15 +1,18 @@ +# +# Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. +# + # Kotlin -version=1.3.0-SNAPSHOT +version=1.3.1-SNAPSHOT group=org.jetbrains.kotlinx kotlin_version=1.3.50 # Dependencies junit_version=4.12 -atomicfu_version=0.12.10 +atomicfu_version=0.13.0 html_version=0.6.8 lincheck_version=2.0 dokka_version=0.9.16-rdev-2-mpp-hacks -bintray_version=1.8.4-jetbrains-5 byte_buddy_version=1.9.3 reactor_vesion=3.2.5.RELEASE reactive_streams_version=1.0.2 @@ -25,4 +28,6 @@ mocha_teamcity_reporter_version=2.2.2 source_map_support_version=0.5.3 spring_dependency_management_version=1.0.8.RELEASE +# Settings kotlin.incremental.multiplatform=true +kotlin.native.ignoreDisabledTargets=true diff --git a/gradle/dokka.gradle b/gradle/dokka.gradle index 051bc1c99a..f1d8f21a6c 100644 --- a/gradle/dokka.gradle +++ b/gradle/dokka.gradle @@ -1,10 +1,9 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ // Configures generation of JavaDoc & Dokka artifacts - def makeLinkMapping(dokka, projectDir) { dokka.linkMapping { def relPath = rootProject.projectDir.toPath().relativize(projectDir.toPath()) @@ -16,6 +15,7 @@ def makeLinkMapping(dokka, projectDir) { configurations { dokkaStubs.extendsFrom compileOnly + configureKotlinJvmPlatform(dokkaStubs) } apply plugin: 'org.jetbrains.dokka' @@ -83,14 +83,3 @@ if (project.name == "kotlinx-coroutines-core") { } } } - -// real xxx-javadoc.jar for JVM -task dokkaJavadoc(type: dokka.getClass()) { - outputFormat = 'javadoc' - outputDirectory = "$buildDir/javadoc" -} - -task javadocJar(type: Jar, dependsOn: dokkaJavadoc) { - classifier = 'javadoc' - from "$buildDir/javadoc" -} diff --git a/gradle/publish-bintray.gradle b/gradle/publish-bintray.gradle index 0e37c6786b..dd528fe982 100644 --- a/gradle/publish-bintray.gradle +++ b/gradle/publish-bintray.gradle @@ -1,113 +1,87 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ // Configures publishing of Maven artifacts to Bintray apply plugin: 'maven' apply plugin: 'maven-publish' -apply plugin: 'com.jfrog.bintray' apply plugin: "com.github.johnrengelman.shadow" apply from: project.rootProject.file('gradle/maven-central.gradle') // ------------- tasks -def bUser = project.hasProperty('bintrayUser') ? project.property('bintrayUser') : System.getenv('BINTRAY_USER') -def bKey = project.hasProperty('bintrayApiKey') ? project.property('bintrayApiKey') : System.getenv('BINTRAY_API_KEY') def isMultiplatform = project.name == "kotlinx-coroutines-core" def isBom = project.name == "kotlinx-coroutines-bom" -task stubSources(type: Jar) { - classifier = 'sources' -} - -task stubJavadoc(type: Jar) { - classifier = 'javadoc' -} - -task emptyJar(type: Jar) { -} +if (!isMultiplatform) { + // Regular java modules need 'java-library' plugin for proper publication + apply plugin: 'java-library' -task sourcesJar(type: Jar) { - classifier = 'sources' - if (project.name == "kotlinx-coroutines-core") { - from kotlin.sourceSets.commonMain.kotlin - } else if (!isBom) { + // MPP projects pack their sources automatically, java libraries need to explicitly pack them + task sourcesJar(type: Jar) { + archiveClassifier = 'sources' from sourceSets.main.allSource } } -def configureTransitiveDependencies = { Project project, Publication publication -> - project.configure(project) { - publication.pom.withXml { pom -> - def dependenciesNode = asNode().getAt("dependencies")[0] - if (dependenciesNode == null) return - dependenciesNode.dependency.each { - it.artifactId.each { node -> - def artifactId = node.text() - if (!artifactId.endsWith("native")) return - - switch (project.name) { - case 'metadata': - node.setValue("${artifactId[0..-8]}-common") - break - case 'js': - node.setValue("${artifactId[0..-8]}-js") - break - } - } - } - } - } +// empty xxx-javadoc.jar +task javadocJar(type: Jar) { + archiveClassifier = 'javadoc' } publishing { repositories { - maven { url = 'https://kotlin.bintray.com/kotlinx' } + maven { + def user = 'kotlin' + def repo = 'kotlinx' + def name = 'kotlinx.coroutines' + url = "https://api.bintray.com/maven/$user/$repo/$name/;publish=0" + + credentials { + username = project.hasProperty('bintrayUser') ? project.property('bintrayUser') : System.getenv('BINTRAY_USER') + password = project.hasProperty('bintrayApiKey') ? project.property('bintrayApiKey') : System.getenv('BINTRAY_API_KEY') + } + } } if (isBom) { + // Configure mavenBom publication publications { - mavenBom(MavenPublication) { - pom.withXml(configureMavenCentralMetadata) - } + mavenBom(MavenPublication) {} } - return } else if (!isMultiplatform) { + // Configure java publications for regular non-MPP modules publications { - maven(MavenPublication) { publication -> - publication.artifact javadocJar - publication.artifact sourcesJar - publication.pom.withXml(configureMavenCentralMetadata) + maven(MavenPublication) { if (project.name == "kotlinx-coroutines-debug") { - project.shadow.component(publication) - publication.pom.withXml(configureMavenDependencies) + project.shadow.component(it) } else { - publication.from components.java + from components.java } + artifact sourcesJar } } - - disableMetadataPublicationKotlinJvm() - return } - // Rename artifacts for backward compatibility publications.all { + pom.withXml(configureMavenCentralMetadata) + + // add empty javadocs (no need for MPP root publication which publishes only pom file) + if (it.name != 'kotlinMultiplatform' && !isBom) { + it.artifact(javadocJar) + } + + // Rename MPP artifacts for backward compatibility def type = it.name switch (type) { case 'kotlinMultiplatform': it.artifactId = "$project.name-native" - it.artifact emptyJar - it.artifact stubJavadoc - it.artifact sourcesJar break - case 'metadata': it.artifactId = "$project.name-common" break - case 'jvm': it.artifactId = "$project.name" break @@ -117,36 +91,9 @@ publishing { break } - pom.withXml(configureMavenCentralMetadata) - configureTransitiveDependencies(project, it) - } - - disableMetadataPublication() -} - -private void disableMetadataPublicationKotlinJvm() { - publishing.publications.each { pub -> - pub.moduleDescriptorGenerator = null - tasks.matching { it.name == "generateMetadataFileFor${pub.name.capitalize()}Publication" }.all { - onlyIf { false } - } - } -} - -private void disableMetadataPublication() { - kotlin.targets.all { target -> - def publication = publishing.publications.findByName(target.name) - - if (publication != null) { - publication.artifact stubJavadoc - if (target.platformType.name != 'native') { - publication.moduleDescriptorGenerator = null - tasks.matching { it.name == "generateMetadataFileFor${name.capitalize()}Publication" }.all { - onlyIf { false } - } - } else { - publication.artifact emptyJar - } + // disable metadata everywhere, but in native modules + if (type == 'maven' || type == 'metadata' || type == 'jvm' || type == 'js') { + moduleDescriptorGenerator = null } } } @@ -154,34 +101,9 @@ private void disableMetadataPublication() { task publishDevelopSnapshot() { def branch = System.getenv('currentBranch') if (branch == "develop") { - dependsOn(":artifactoryPublish") + dependsOn(":publish") } } -bintray { - user = bUser - key = bKey - override = true // for multi-platform Kotlin/Native publishing - publications = ['maven'] - pkg { - userOrg = 'kotlin' - repo = 'kotlinx' - name = 'kotlinx.coroutines' - version { - name = project.version - vcsTag = project.version - released = new Date() - } - } -} - -// TODO :kludge this is required for K/N publishing -bintrayUpload.dependsOn publishToMavenLocal - -// This is for easier debugging of bintray uploading problems -bintrayUpload.doFirst { - publications = project.publishing.publications.findAll { !it.name.contains('-test') }.collect { - println("Uploading artifact '$it.groupId:$it.artifactId:$it.version' from publication '$it.name'") - it - } -} \ No newline at end of file +// Compatibility with old TeamCity configurations that perform :kotlinx-coroutines-core:bintrayUpload +task bintrayUpload(dependsOn: publish) \ No newline at end of file diff --git a/gradle/publish-npm-js.gradle b/gradle/publish-npm-js.gradle index 1667a55115..a2991db492 100644 --- a/gradle/publish-npm-js.gradle +++ b/gradle/publish-npm-js.gradle @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ def prop(name, defVal) { @@ -14,7 +14,7 @@ def distTag(version) { return "latest" } -def npmTemplateDir = file("$projectDir/js/npm") +def npmTemplateDir = file("$projectDir/npm") def npmDeployDir = file("$buildDir/npm") def authToken = prop("kotlin.npmjs.auth.token", "") diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 603c4e1143..c141e3345e 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,9 @@ -#Fri Mar 15 12:06:46 CET 2019 +# +# Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. +# + distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-4.10-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.1-all.zip diff --git a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt index a924bb4aa3..de47c760a8 100644 --- a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt +++ b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt @@ -5,31 +5,40 @@ package kotlinx.coroutines.guava import com.google.common.util.concurrent.* -import kotlinx.coroutines.* +import com.google.common.util.concurrent.internal.InternalFutureFailureAccess +import com.google.common.util.concurrent.internal.InternalFutures import java.util.concurrent.* -import java.util.concurrent.CancellationException import kotlin.coroutines.* +import kotlinx.coroutines.* +import java.util.concurrent.CancellationException /** - * Starts new coroutine and returns its results an an implementation of [ListenableFuture]. - * The running coroutine is cancelled when the resulting future is cancelled or otherwise completed. + * Starts [block] in a new coroutine and returns a [ListenableFuture] pointing to its result. + * + * The coroutine is immediately started. Passing [CoroutineStart.LAZY] to [start] throws + * [IllegalArgumentException], because Futures don't have a way to start lazily. * - * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument. - * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. - * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden - * with corresponding [coroutineContext] element. + * The created coroutine is cancelled when the resulting future completes successfully, fails, or + * is cancelled. * - * By default, the coroutine is immediately scheduled for execution. - * Other options can be specified via `start` parameter. See [CoroutineStart] for details. - * A value of [CoroutineStart.LAZY] is not supported - * (since `ListenableFuture` framework does not provide the corresponding capability) and - * produces [IllegalArgumentException]. + * `CoroutineContext` is inherited from this [CoroutineScope]. Additional context elements can be + * added/overlaid by passing [context]. * - * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine. + * If the context does not have a [CoroutineDispatcher], nor any other [ContinuationInterceptor] + * member, [Dispatchers.Default] is used. * - * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine. + * The parent job is inherited from this [CoroutineScope], and can be overridden by passing + * a [Job] in [context]. + * + * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging + * facilities. + * + * Note that the error and cancellation semantics of [future] are _subtly different_ than + * [asListenableFuture]'s. See [ListenableFutureCoroutine] for details. + * + * @param context added overlaying [CoroutineScope.coroutineContext] to form the new context. * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT]. - * @param block the coroutine code. + * @param block the code to execute. */ public fun CoroutineScope.future( context: CoroutineContext = EmptyCoroutineContext, @@ -38,81 +47,98 @@ public fun CoroutineScope.future( ): ListenableFuture { require(!start.isLazy) { "$start start is not supported" } val newContext = newCoroutineContext(context) + // TODO: It'd be nice not to leak this SettableFuture reference, which is easily blind-cast. val future = SettableFuture.create() val coroutine = ListenableFutureCoroutine(newContext, future) - Futures.addCallback(future, coroutine, MoreExecutors.directExecutor()) + future.addListener( + coroutine, + MoreExecutors.directExecutor()) coroutine.start(start, coroutine, block) return future } -private class ListenableFutureCoroutine( - context: CoroutineContext, - private val future: SettableFuture -) : AbstractCoroutine(context), FutureCallback { - /* - * We register coroutine as callback to the future this coroutine completes. - * But when future is cancelled externally, we'd like to cancel coroutine, - * so we register on failure handler for this purpose - */ - override fun onSuccess(result: T?) { - // Do nothing - } - - override fun onFailure(t: Throwable) { - if (t is CancellationException) { - cancel() - } - } - - override fun onCompleted(value: T) { - future.set(value) - } - - override fun onCancelled(cause: Throwable, handled: Boolean) { - if (!future.setException(cause) && !handled) { - // prevents loss of exception that was not handled by parent & could not be set to SettableFuture - handleCoroutineException(context, cause) - } - } -} - /** - * Converts this deferred value to the instance of [ListenableFuture]. - * The deferred value is cancelled when the resulting future is cancelled or otherwise completed. + * Returns a [Deferred] that is completed or failed by `this` [ListenableFuture]. + * + * Completion is non-atomic between the two promises. + * + * Cancellation is propagated bidirectionally. + * + * When `this` `ListenableFuture` completes (either successfully or exceptionally) it will try to + * complete the returned `Deferred` with the same value or exception. This will succeed, barring a + * race with cancellation of the `Deferred`. + * + * When `this` `ListenableFuture` is [successfully cancelled][java.util.concurrent.Future.cancel], + * it will cancel the returned `Deferred`. + * + * When the returned `Deferred` is [cancelled][Deferred.cancel()], it will try to propagate the + * cancellation to `this` `ListenableFuture`. Propagation will succeed, barring a race with the + * `ListenableFuture` completing normally. This is the only case in which the returned `Deferred` + * will complete with a different outcome than `this` `ListenableFuture`. */ -public fun Deferred.asListenableFuture(): ListenableFuture = DeferredListenableFuture(this) - -private class DeferredListenableFuture( - private val deferred: Deferred -) : AbstractFuture() { - init { - deferred.invokeOnCompletion { - try { - set(deferred.getCompleted()) - } catch (t: Throwable) { - setException(t) +public fun ListenableFuture.asDeferred(): Deferred { + /* This method creates very specific behaviour as it entangles the `Deferred` and + * `ListenableFuture`. This behaviour is the best discovered compromise between the possible + * states and interface contracts of a `Future` and the states of a `Deferred`. The specific + * behaviour is described here. + * + * When `this` `ListenableFuture` is successfully cancelled - meaning + * `ListenableFuture.cancel()` returned `true` - it will synchronously cancel the returned + * `Deferred`. This can only race with cancellation of the returned `Deferred`, so the + * `Deferred` will always be put into its "cancelling" state and (barring uncooperative + * cancellation) _eventually_ reach its "cancelled" state when either promise is successfully + * cancelled. + * + * When the returned `Deferred` is cancelled, `ListenableFuture.cancel()` will be synchronously + * called on `this` `ListenableFuture`. This will attempt to cancel the `Future`, though + * cancellation may not succeed and the `ListenableFuture` may complete in a non-cancelled + * terminal state. + * + * The returned `Deferred` may receive and suppress the `true` return value from + * `ListenableFuture.cancel()` when the task is cancelled via the `Deferred` reference to it. + * This is unavoidable, so make sure no idempotent cancellation work is performed by a + * reference-holder of the `ListenableFuture` task. The idempotent work won't get done if + * cancellation was from the `Deferred` representation of the task. + * + * This is inherently a race. See `Future.cancel()` for a description of `Future` cancellation + * semantics. See `Job` for a description of coroutine cancellation semantics. + */ + // First, try the fast-fast error path for Guava ListenableFutures. This will save allocating an + // Exception by using the same instance the Future created. + if (this is InternalFutureFailureAccess) { + val t: Throwable? = InternalFutures.tryInternalFastPathGetFailure(this) + if (t != null) { + return CompletableDeferred().also { + it.completeExceptionally(t) } } } - override fun interruptTask() { deferred.cancel() } -} -/** - * Converts this listenable future to an instance of [Deferred]. - * It is cancelled when the resulting deferred is cancelled. - */ -public fun ListenableFuture.asDeferred(): Deferred { - // Fast path if already completed + // Second, try the fast path for a completed Future. The Future is known to be done, so get() + // will not block, and thus it won't be interrupted. Calling getUninterruptibly() instead of + // getDone() in this known-non-interruptible case saves the volatile read that getDone() uses to + // handle interruption. if (isDone) { return try { - @Suppress("UNCHECKED_CAST") - CompletableDeferred(get() as T) - } catch (e: Throwable) { - // unwrap original cause from ExecutionException - val original = (e as? ExecutionException)?.cause ?: e - CompletableDeferred().also { it.completeExceptionally(original) } + val value = Uninterruptibles.getUninterruptibly(this) + if (value == null) { + CompletableDeferred().also { + it.completeExceptionally(KotlinNullPointerException()) + } + } else { + CompletableDeferred(value) + } + } catch (e: CancellationException) { + CompletableDeferred().also { it.cancel(e) } + } catch (e: ExecutionException) { + // ExecutionException is the only kind of exception that can be thrown from a gotten + // Future. Anything else showing up here indicates a very fundamental bug in a + // Future implementation. + CompletableDeferred().also { it.completeExceptionally(e.nonNullCause()) } } } + + // Finally, if this isn't done yet, attach a Listener that will complete the Deferred. val deferred = CompletableDeferred() Futures.addCallback(this, object : FutureCallback { override fun onSuccess(result: T?) { @@ -124,41 +150,319 @@ public fun ListenableFuture.asDeferred(): Deferred { } }, MoreExecutors.directExecutor()) - deferred.invokeOnCompletion { cancel(false) } + // ... And cancel the Future when the deferred completes. Since the return type of this method + // is Deferred, the only interaction point from the caller is to cancel the Deferred. If this + // completion handler runs before the Future is completed, the Deferred must have been + // cancelled and should propagate its cancellation. If it runs after the Future is completed, + // this is a no-op. + deferred.invokeOnCompletion { + cancel(false) + } return deferred } /** - * Awaits for completion of the future without blocking a thread. + * Returns the cause from an [ExecutionException] thrown by a [Future.get] or similar. + * + * [ExecutionException] _always_ wraps a non-null cause when Future.get() throws. A Future cannot + * fail without a non-null `cause`, because the only way a Future _can_ fail is an uncaught + * [Exception]. + * + * If this !! throws [NullPointerException], a Future is breaking its interface contract and losing + * state - a serious fundamental bug. + */ +private fun ExecutionException.nonNullCause(): Throwable { + return this.cause!! +} + +/** + * Returns a [ListenableFuture] that is completed or failed by `this` [Deferred]. + * + * Completion is non-atomic between the two promises. + * + * When either promise successfully completes, it will attempt to synchronously complete its + * counterpart with the same value. This will succeed barring a race with cancellation. + * + * When either promise completes with an Exception, it will attempt to synchronously complete its + * counterpart with the same Exception. This will succeed barring a race with cancellation. + * + * Cancellation is propagated bidirectionally. + * + * When the returned [Future] is successfully cancelled - meaning [Future.cancel] returned true - + * [Deferred.cancel] will be synchronously called on `this` [Deferred]. This will attempt to cancel + * the `Deferred`, though cancellation may not succeed and the `Deferred` may complete in a + * non-cancelled terminal state. + * + * When `this` `Deferred` reaches its "cancelled" state with a successful cancellation - meaning it + * completes with [kotlinx.coroutines.CancellationException] - `this` `Deferred` will synchronously + * cancel the returned `Future`. This can only race with cancellation of the returned `Future`, so + * the returned `Future` will always _eventually_ reach its cancelled state when either promise is + * successfully cancelled, for their different meanings of "successfully cancelled". + * + * This is inherently a race. See [Future.cancel] for a description of `Future` cancellation + * semantics. See [Job] for a description of coroutine cancellation semantics. See + * [DeferredListenableFuture.cancel] for greater detail on the overlapped cancellation semantics and + * corner cases of this method. + */ +public fun Deferred.asListenableFuture(): ListenableFuture { + val outerFuture = OuterFuture(this) + outerFuture.afterInit() + return outerFuture +} + +/** + * Awaits completion of `this` [ListenableFuture] without blocking a thread. + * + * This suspend function is cancellable. * - * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * stops waiting for the future and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException]. * - * This method is intended to be used with one-shot futures, so on coroutine cancellation future is cancelled as well. - * If cancelling given future is undesired, `future.asDeferred().await()` should be used instead. + * This method is intended to be used with one-shot Futures, so on coroutine cancellation, the Future is cancelled as well. + * If cancelling the given future is undesired, use [Futures.nonCancellationPropagating] or + * [kotlinx.coroutines.NonCancellable]. + * */ public suspend fun ListenableFuture.await(): T { try { - if (isDone) return get() as T + if (isDone) return Uninterruptibles.getUninterruptibly(this) } catch (e: ExecutionException) { - throw e.cause ?: e // unwrap original cause from ExecutionException + // ExecutionException is the only kind of exception that can be thrown from a gotten + // Future, other than CancellationException. Cancellation is propagated upward so that + // the coroutine running this suspend function may process it. + // Any other Exception showing up here indicates a very fundamental bug in a + // Future implementation. + throw e.nonNullCause() } return suspendCancellableCoroutine { cont: CancellableContinuation -> - val callback = ContinuationCallback(cont) - Futures.addCallback(this, callback, MoreExecutors.directExecutor()) + addListener( + ToContinuation(this, cont), + MoreExecutors.directExecutor()) cont.invokeOnCancellation { cancel(false) - callback.cont = null // clear the reference to continuation from the future's callback } } } -private class ContinuationCallback( - @Volatile @JvmField var cont: Continuation? -) : FutureCallback { - @Suppress("UNCHECKED_CAST") - override fun onSuccess(result: T?) { cont?.resume(result as T) } - override fun onFailure(t: Throwable) { cont?.resumeWithException(t) } +/** + * Propagates the outcome of [futureToObserve] to [continuation] on completion. + * + * Cancellation is propagated as cancelling the continuation. If [futureToObserve] completes + * and fails, the cause of the Future will be propagated without a wrapping + * [ExecutionException] when thrown. + */ +private class ToContinuation( + val futureToObserve: ListenableFuture, + val continuation: CancellableContinuation +): Runnable { + override fun run() { + if (futureToObserve.isCancelled) { + continuation.cancel() + } else { + try { + continuation.resumeWith( + Result.success(Uninterruptibles.getUninterruptibly(futureToObserve))) + } catch (e: ExecutionException) { + // ExecutionException is the only kind of exception that can be thrown from a gotten + // Future. Anything else showing up here indicates a very fundamental bug in a + // Future implementation. + continuation.resumeWithException(e.nonNullCause()) + } + } + } +} + +/** + * An [AbstractCoroutine] intended for use directly creating a [ListenableFuture] handle to + * completion. + * + * The code in the [Runnable] portion of the class is registered as a [ListenableFuture] callback. + * See [run] for details. Both types are implemented by this object to save an allocation. + */ +private class ListenableFutureCoroutine( + context: CoroutineContext, + private val future: SettableFuture +) : AbstractCoroutine(context), Runnable { + + /** + * When registered as a [ListenableFuture] listener, cancels the returned [Coroutine] if + * [future] is successfully cancelled. By documented contract, a [Future] has been cancelled if + * and only if its `isCancelled()` method returns true. + * + * Any error that occurs after successfully cancelling a [ListenableFuture] + * created by submitting the returned object as a [Runnable] to an `Executor` will be passed + * to the [CoroutineExceptionHandler] from the context. The contract of [Future] does not permit + * it to return an error after it is successfully cancelled. + * + * By calling [asListenableFuture] on a [Deferred], any error that occurs after successfully + * cancelling the [ListenableFuture] representation of the [Deferred] will _not_ be passed to + * the [CoroutineExceptionHandler]. Cancelling a [Deferred] places that [Deferred] in the + * cancelling/cancelled states defined by [Job], which _can_ show the error. It's assumed that + * the [Deferred] pointing to the task will be used to observe any error outcome occurring after + * cancellation. + * + * This may be counterintuitive, but it maintains the error and cancellation contracts of both + * the [Deferred] and [ListenableFuture] types, while permitting both kinds of promise to point + * to the same running task. + */ + override fun run() { + if (future.isCancelled) { + cancel() + } + } + + override fun onCompleted(value: T) { + future.set(value) + } + + // TODO: This doesn't actually cancel the Future. There doesn't seem to be bidi cancellation? + override fun onCancelled(cause: Throwable, handled: Boolean) { + if (!future.setException(cause) && !handled) { + // prevents loss of exception that was not handled by parent & could not be set to SettableFuture + handleCoroutineException(context, cause) + } + } +} + +/** + * A [ListenableFuture] that delegates to an internal [DeferredListenableFuture], collaborating with + * it. + * + * This setup allows the returned [ListenableFuture] to maintain the following properties: + * + * - Correct implementation of [Future]'s happens-after semantics documented for [get], [isDone] + * and [isCancelled] methods + * - Cancellation propagation both to and from [Deferred] + * - Correct cancellation and completion semantics even when this [ListenableFuture] is combined + * with different concrete implementations of [ListenableFuture] + * - Fully correct cancellation and listener happens-after obeying [Future] and + * [ListenableFuture]'s documented and implicit contracts is surprisingly difficult to achieve. + * The best way to be correct, especially given the fun corner cases from + * [AsyncFuture.setAsync], is to just use an [AsyncFuture]. + * - To maintain sanity, this class implements [ListenableFuture] and uses an inner [AsyncFuture] + * around its input [deferred] as a state engine to establish happens-after-completion. This + * could probably be compressed into one subclass of [AsyncFuture] to save an allocation, at the + * cost of the implementation's readability. + */ +private class OuterFuture(private val deferred: Deferred): ListenableFuture { + val innerFuture = DeferredListenableFuture(deferred) + + // Adding the listener after initialization resolves partial construction hairpin problem. + // + // This invokeOnCompletion completes the innerFuture as `deferred` does. The innerFuture may + // have completed earlier if it got cancelled! See DeferredListenableFuture. + fun afterInit() { + deferred.invokeOnCompletion { + innerFuture.complete() + } + } + + /** + * Returns cancellation _in the sense of [Future]_. This is _not_ equivalent to + * [Job.isCancelled]. + * + * When done, this Future is cancelled if its innerFuture is cancelled, or if its delegate + * [deferred] is cancelled. Cancellation of [innerFuture] collaborates with this class. + * + * See [DeferredListenableFuture.cancel]. + */ + override fun isCancelled(): Boolean { + // This expression ensures that isCancelled() will *never* return true when isDone() returns false. + // In the case that the deferred has completed with cancellation, completing `this`, its + // reaching the "cancelled" state with a cause of CancellationException is treated as the + // same thing as innerFuture getting cancelled. If the Job is in the "cancelling" state and + // this Future hasn't itself been successfully cancelled, the Future will return + // isCancelled() == false. This is the only discovered way to reconcile the two different + // cancellation contracts. + return isDone + && (innerFuture.isCancelled + || deferred.getCompletionExceptionOrNull() is kotlinx.coroutines.CancellationException) + } + + /** + * Waits for [innerFuture] to complete by blocking, then uses the [deferred] returned by that + * Future to get the `T` value `this` [ListenableFuture] is pointing to. This establishes + * happens-after ordering for completion of the [Deferred] input to [OuterFuture]. + * + * `innerFuture` _must be complete_ in order for the [isDone] and [isCancelled] happens-after + * contract of [Future] to be correctly followed. If this method were to directly use + * _`this.deferred`_ instead of blocking on its `innerFuture`, the [Deferred] that this + * [ListenableFuture] is created from might be in an incomplete state when used by `get()`. + */ + override fun get(): T { + return getInternal(innerFuture.get()) + } + + /** See [get()]. */ + override fun get(timeout: Long, unit: TimeUnit): T { + return getInternal(innerFuture.get(timeout, unit)) + } + + /** See [get()]. */ + private fun getInternal(deferred: Deferred): T { + if (deferred.isCancelled) { + val exception = deferred.getCompletionExceptionOrNull() + if (exception is kotlinx.coroutines.CancellationException) { + throw exception + } else { + throw ExecutionException(exception) + } + } else { + return deferred.getCompleted() + } + } + + override fun addListener(listener: Runnable, executor: Executor) { + innerFuture.addListener(listener, executor) + } + + override fun isDone(): Boolean { + return innerFuture.isDone + } + + override fun cancel(mayInterruptIfRunning: Boolean): Boolean { + return innerFuture.cancel(mayInterruptIfRunning) + } +} + +/** + * Holds a delegate deferred, and serves as a state machine for [Future] cancellation. + * + * [AbstractFuture] has a highly-correct atomic implementation of `Future`'s completion and + * cancellation semantics. By using that type, the [OuterFuture] can delegate its semantics to + * _this_ `Future` `get()` the result in such a way that the `Deferred` is always complete when + * returned. + */ +private class DeferredListenableFuture( + private val deferred: Deferred +) : AbstractFuture>() { + + fun complete() { + set(deferred) + } + + /** + * Tries to cancel the task. This is fundamentally racy. + * + * For any given call to `cancel()`, if [deferred] is already completed, the call will complete + * this Future with it, and fail to cancel. Otherwise, the + * call to `cancel()` will try to cancel this Future: if and only if cancellation of this + * succeeds, [deferred] will have its [Deferred.cancel] called. + * + * This arrangement means that [deferred] _might not successfully cancel_, if the race resolves + * in a particular way. [deferred] may also be in its "cancelling" state while this + * ListenableFuture is complete and cancelled. + * + * [OuterFuture] collaborates with this class to present a more cohesive picture and ensure + * that certain combinations of cancelled/cancelling states can't be observed. + */ + override fun cancel(mayInterruptIfRunning: Boolean): Boolean { + return if (super.cancel(mayInterruptIfRunning)) { + deferred.cancel() + true + } else { + false + } + } } diff --git a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt index cf82318a47..f56a0be4d8 100644 --- a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt +++ b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt @@ -6,12 +6,12 @@ package kotlinx.coroutines.guava import com.google.common.util.concurrent.* import kotlinx.coroutines.* -import kotlinx.coroutines.CancellationException import org.hamcrest.core.* import org.junit.* import org.junit.Assert.* import org.junit.Test import java.util.concurrent.* +import java.util.concurrent.CancellationException class ListenableFutureTest : TestBase() { @Before @@ -127,6 +127,17 @@ class ListenableFutureTest : TestBase() { } } + @Test + fun testFutureLazyStartThrows() { + expect(1) + val e = assertFailsWith { + GlobalScope.future(start = CoroutineStart.LAZY) {} + } + + assertThat(e.message, IsEqual("LAZY start is not supported")) + finish(2) + } + @Test fun testCompletedDeferredAsListenableFuture() = runBlocking { expect(1) @@ -189,6 +200,122 @@ class ListenableFutureTest : TestBase() { finish(6) } + @Test + fun testFutureAwaitCancellationPropagatingToDeferred() = runTest { + + val latch = CountDownLatch(1) + val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) + val future = executor.submit(Callable { latch.await(); 42 }) + val deferred = async { + expect(2) + future.await() + } + expect(1) + yield() + future.cancel(/*mayInterruptIfRunning=*/true) + expect(3) + latch.countDown() + deferred.join() + assertTrue(future.isCancelled) + assertTrue(deferred.isCancelled) + assertFailsWith { future.get() } + finish(4) + } + + @Test + fun testFutureAwaitCancellationPropagatingToDeferredNoInterruption() = runTest { + + val latch = CountDownLatch(1) + val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) + val future = executor.submit(Callable { latch.await(); 42 }) + val deferred = async { + expect(2) + future.await() + } + expect(1) + yield() + future.cancel(/*mayInterruptIfRunning=*/false) + expect(3) + latch.countDown() + deferred.join() + assertTrue(future.isCancelled) + assertTrue(deferred.isCancelled) + assertFailsWith { future.get() } + finish(4) + } + + @Test + fun testAsListenableFutureCancellationPropagatingToDeferred() = runTest { + val latch = CountDownLatch(1) + val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) + val future = executor.submit(Callable { latch.await(); 42 }) + val deferred = async { + expect(2) + future.await() + } + val asListenableFuture = deferred.asListenableFuture() + expect(1) + yield() + asListenableFuture.cancel(/*mayInterruptIfRunning=*/true) + expect(3) + latch.countDown() + deferred.join() + assertTrue(future.isCancelled) + assertTrue(deferred.isCancelled) + assertTrue(asListenableFuture.isCancelled) + assertFailsWith { future.get() } + finish(4) + } + + @Test + fun testAsListenableFutureCancellationPropagatingToDeferredNoInterruption() = runTest { + val latch = CountDownLatch(1) + val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) + val future = executor.submit(Callable { latch.await(); 42 }) + val deferred = async { + expect(2) + future.await() + } + val asListenableFuture = deferred.asListenableFuture() + expect(1) + yield() + asListenableFuture.cancel(/*mayInterruptIfRunning=*/false) + expect(3) + latch.countDown() + deferred.join() + assertFailsWith { asListenableFuture.get() } + assertTrue(future.isCancelled) + assertTrue(asListenableFuture.isCancelled) + assertTrue(deferred.isCancelled) + assertFailsWith { future.get() } + finish(4) + } + + @Test + fun testAsListenableFutureCancellationThroughSetFuture() = runTest { + val latch = CountDownLatch(1) + val future = SettableFuture.create() + val deferred = async { + expect(2) + future.await() + } + val asListenableFuture = deferred.asListenableFuture() + expect(1) + yield() + future.setFuture(Futures.immediateCancelledFuture()) + expect(3) + latch.countDown() + deferred.join() + assertFailsWith { asListenableFuture.get() } + // Future was not interrupted, but also wasn't blocking, so it will be successfully + // cancelled by its parent Coroutine. + assertTrue(future.isCancelled) + assertTrue(asListenableFuture.isCancelled) + assertTrue(deferred.isCancelled) + assertFailsWith { future.get() } + finish(4) + } + @Test fun testFutureCancellation() = runTest { val future = awaitFutureWithCancel(true) @@ -197,6 +324,21 @@ class ListenableFutureTest : TestBase() { finish(4) } + @Test + fun testAsListenableDeferredCancellationCauseAndMessagePropagate() = runTest { + val deferred = CompletableDeferred() + val inputCancellationException = CancellationException("Foobar") + inputCancellationException.initCause(OutOfMemoryError("Foobaz")) + deferred.cancel(inputCancellationException) + val asFuture = deferred.asListenableFuture() + + val outputCancellationException = + assertFailsWith { asFuture.get() } + assertEquals(outputCancellationException.message, "Foobar") + assertTrue(outputCancellationException.cause is OutOfMemoryError) + assertEquals(outputCancellationException.cause?.message, "Foobaz") + } + @Test fun testNoFutureCancellation() = runTest { val future = awaitFutureWithCancel(false) @@ -205,6 +347,59 @@ class ListenableFutureTest : TestBase() { finish(4) } + @Test + fun testCancelledDeferredAsListenableFutureAwaitThrowsCancellation() = runTest { + val future = Futures.immediateCancelledFuture() + val asDeferred = future.asDeferred() + val asDeferredAsFuture = asDeferred.asListenableFuture() + + assertTrue(asDeferredAsFuture.isCancelled) + assertFailsWith { + val value: Int = asDeferredAsFuture.await() + } + } + + @Test + fun testCancelledDeferredAsListenableFutureAsDeferredPassesCancellationAlong() = runTest { + val deferred = CompletableDeferred() + deferred.completeExceptionally(CancellationException()) + val asFuture = deferred.asListenableFuture() + val asFutureAsDeferred = asFuture.asDeferred() + + assertTrue(asFutureAsDeferred.isCancelled) + assertTrue(asFutureAsDeferred.isCompleted) + // By documentation, join() shouldn't throw when asDeferred is already complete. + asFutureAsDeferred.join() + assertThat( + asFutureAsDeferred.getCompletionExceptionOrNull(), + IsInstanceOf(CancellationException::class.java)) + } + + @Test + fun testCancelledFutureAsDeferredAwaitThrowsCancellation() = runTest { + val future = Futures.immediateCancelledFuture() + val asDeferred = future.asDeferred() + + assertTrue(asDeferred.isCancelled) + assertFailsWith { + val value: Int = asDeferred.await() + } + } + + @Test + fun testCancelledFutureAsDeferredJoinDoesNotThrow() = runTest { + val future = Futures.immediateCancelledFuture() + val asDeferred = future.asDeferred() + + assertTrue(asDeferred.isCancelled) + assertTrue(asDeferred.isCompleted) + // By documentation, join() shouldn't throw when asDeferred is already complete. + asDeferred.join() + assertThat( + asDeferred.getCompletionExceptionOrNull(), + IsInstanceOf(CancellationException::class.java)) + } + @Test fun testCompletedFutureAsDeferred() = runTest { val future = SettableFuture.create() @@ -240,6 +435,22 @@ class ListenableFutureTest : TestBase() { } } + @Test + fun testFutureCompletedWithNullAsDeferred() = runTest { + val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) + val future = executor.submit(Callable { null }) + val deferred = GlobalScope.async { + future.asDeferred().await() + } + + try { + deferred.await() + expectUnreached() + } catch (e: Throwable) { + assertTrue(e is KotlinNullPointerException) + } + } + @Test fun testThrowingFutureAsDeferred() = runTest { val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) diff --git a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt index 9f38fdee4f..164ee2d2b6 100644 --- a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt +++ b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.future @@ -124,9 +124,12 @@ public fun CompletionStage.asDeferred(): Deferred { val result = CompletableDeferred() whenComplete { value, exception -> if (exception == null) { + // the future has completed normally result.complete(value) } else { - result.completeExceptionally(exception) + // the future has completed with an exception, unwrap it consistently with fast path + // Note: In the fast-path the implementation of CompletableFuture.get() does unwrapping + result.completeExceptionally((exception as? CompletionException)?.cause ?: exception) } } if (this is Future<*>) result.cancelFutureOnCompletion(this) @@ -171,7 +174,7 @@ private class ContinuationConsumer( override fun accept(result: T?, exception: Throwable?) { val cont = this.cont ?: return // atomically read current value unless null if (exception == null) { - // the future has been completed normally + // the future has completed normally cont.resume(result as T) } else { // the future has completed with an exception, unwrap it to provide consistent view of .await() result and to propagate only original exception diff --git a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt index 47af96f0a2..4649645efb 100644 --- a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt +++ b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.future @@ -264,11 +264,9 @@ class FutureTest : TestBase() { try { deferred.await() fail("deferred.await() should throw an exception") - } catch (e: CompletionException) { + } catch (e: TestException) { assertTrue(deferred.isCancelled) - val cause = e.cause?.cause!! // Stacktrace augmentation - assertTrue(cause is TestException) - assertEquals("something went wrong", cause.message) + assertEquals("something went wrong", e.message) } } @@ -437,6 +435,45 @@ class FutureTest : TestBase() { } } + /** + * Tests that both [CompletionStage.await] and [CompletionStage.asDeferred] consistently unwrap + * [CompletionException] both in their slow and fast paths. + * See [issue #1479](https://github.com/Kotlin/kotlinx.coroutines/issues/1479). + */ + @Test + fun testConsistentExceptionUnwrapping() = runTest { + expect(1) + // Check the fast path + val fFast = CompletableFuture.supplyAsync { + expect(2) + throw TestException() + } + fFast.checkFutureException() // wait until it completes + // Fast path in await and asDeferred.await() shall produce TestException + expect(3) + val dFast = fFast.asDeferred() + assertFailsWith { fFast.await() } + assertFailsWith { dFast.await() } + // Same test, but future has not completed yet, check the slow path + expect(4) + val barrier = CyclicBarrier(2) + val fSlow = CompletableFuture.supplyAsync { + barrier.await() + expect(6) + throw TestException() + } + val dSlow = fSlow.asDeferred() + launch(start = CoroutineStart.UNDISPATCHED) { + expect(5) + // Slow path on await shall produce TestException, too + assertFailsWith { fSlow.await() } // will suspend here + assertFailsWith { dSlow.await() } + finish(7) + } + barrier.await() + fSlow.checkFutureException() // now wait until it completes + } + private inline fun CompletableFuture<*>.checkFutureException(vararg suppressed: KClass) { val e = assertFailsWith { get() } val cause = e.cause!! diff --git a/kotlinx-coroutines-core/build.gradle b/kotlinx-coroutines-core/build.gradle index a6db106ff5..c329497f95 100644 --- a/kotlinx-coroutines-core/build.gradle +++ b/kotlinx-coroutines-core/build.gradle @@ -42,6 +42,10 @@ kotlin { } } +configurations { + configureKotlinJvmPlatform(kotlinCompilerPluginClasspath) +} + kotlin.sourceSets { jvmTest.dependencies { api "com.devexperts.lincheck:lincheck:$lincheck_version" @@ -72,23 +76,25 @@ jvmTest { maxHeapSize = '1g' enableAssertions = true systemProperty 'java.security.manager', 'kotlinx.coroutines.TestSecurityManager' - exclude '**/*LFTest.*' + exclude '**/*LFStressTest.*' systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test } task lockFreedomTest(type: Test, dependsOn: compileTestKotlinJvm) { classpath = files { jvmTest.classpath } testClassesDirs = files { jvmTest.testClassesDirs } - include '**/*LFTest.*' + include '**/*LFStressTest.*' + enableAssertions = true + testLogging.showStandardStreams = true } task jdk16Test(type: Test, dependsOn: [compileTestKotlinJvm, checkJdk16]) { classpath = files { jvmTest.classpath } testClassesDirs = files { jvmTest.testClassesDirs } executable = "$System.env.JDK_16/bin/java" - exclude '**/*LinearizabilityTest*.*' - exclude '**/*LFTest.*' - exclude '**/exceptions/**' + exclude '**/*LFStressTest.*' // lock-freedom tests use LockFreedomTestEnvironment which needs JDK8 + exclude '**/*LCStressTest.*' // lic-check tests use LinChecker which needs JDK8 + exclude '**/exceptions/**' // exceptions tests check suppressed exception which needs JDK8 exclude '**/ExceptionsGuideTest.*' } diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt index 37ba0d39c3..da5e288a66 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt @@ -80,7 +80,7 @@ public fun Flow.onEach(action: suspend (T) -> Unit): Flow = transform * Note that initial value should be immutable (or should not be mutated) as it is shared between different collectors. * For example: * ``` - * flowOf(1, 2, 3).accumulate(emptyList()) { acc, value -> acc + value }.toList() + * flowOf(1, 2, 3).scan(emptyList()) { acc, value -> acc + value }.toList() * ``` * will produce `[], [1], [1, 2], [1, 2, 3]]`. */ @@ -101,7 +101,7 @@ public fun Flow.scan(initial: R, @BuilderInference operation: suspend * * For example: * ``` - * flowOf(1, 2, 3, 4).scan { (v1, v2) -> v1 + v2 }.toList() + * flowOf(1, 2, 3, 4).scanReduce { (v1, v2) -> v1 + v2 }.toList() * ``` * will produce `[1, 3, 6, 10]` */ diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt index ba4f0520a5..ebc1dcd9d8 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt @@ -106,7 +106,12 @@ public fun combineTransform( flow: Flow, flow2: Flow, @BuilderInference transform: suspend FlowCollector.(a: T1, b: T2) -> Unit -): Flow = combineTransform(flow, flow2, transform) +): Flow = combineTransform(flow, flow2) { args: Array<*> -> + transform( + args[0] as T1, + args[1] as T2 + ) +} /** * Returns a [Flow] whose values are generated with [transform] function by combining diff --git a/kotlinx-coroutines-core/common/src/internal/ThreadSafeHeap.common.kt b/kotlinx-coroutines-core/common/src/internal/ThreadSafeHeap.kt similarity index 98% rename from kotlinx-coroutines-core/common/src/internal/ThreadSafeHeap.common.kt rename to kotlinx-coroutines-core/common/src/internal/ThreadSafeHeap.kt index fa8e051cbd..0ee570d154 100644 --- a/kotlinx-coroutines-core/common/src/internal/ThreadSafeHeap.common.kt +++ b/kotlinx-coroutines-core/common/src/internal/ThreadSafeHeap.kt @@ -33,7 +33,7 @@ public open class ThreadSafeHeap : SynchronizedObject() where T: ThreadSafeHe public val isEmpty: Boolean get() = size == 0 public fun clear() = synchronized(this) { - a?.let { clear(it) } + a?.fill(null) _size.value = 0 } @@ -157,5 +157,3 @@ public open class ThreadSafeHeap : SynchronizedObject() where T: ThreadSafeHe nj.index = j } } - -internal expect fun clear(a: Array) \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index 6ab377da16..a9df15cf49 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -166,7 +166,8 @@ private class CancelSemaphoreAcquisitionHandler( private val index: Int ) : CancelHandler() { override fun invoke(cause: Throwable?) { - semaphore.incPermits() + val p = semaphore.incPermits() + if (p >= 0) return if (segment.cancel(index)) return semaphore.resumeNextFromQueue() } diff --git a/common/kotlinx-coroutines-core-common/test/channels/ChannelReceiveOrClosedTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelReceiveOrClosedTest.kt similarity index 86% rename from common/kotlinx-coroutines-core-common/test/channels/ChannelReceiveOrClosedTest.kt rename to kotlinx-coroutines-core/common/test/channels/ChannelReceiveOrClosedTest.kt index 303e6d15e2..e58b0deed5 100644 --- a/common/kotlinx-coroutines-core-common/test/channels/ChannelReceiveOrClosedTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ChannelReceiveOrClosedTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -17,7 +17,6 @@ class ChannelReceiveOrClosedTest : TestBase() { } val element = channel.receiveOrClosed() - assertTrue(element.isValue) assertTrue(element.value is TestException1) assertTrue(element.valueOrNull is TestException1) @@ -42,7 +41,6 @@ class ChannelReceiveOrClosedTest : TestBase() { expect(1) val element = channel.receiveOrClosed() - assertTrue(element.isValue) assertEquals(1, element.value) assertEquals(1, element.valueOrNull) assertEquals("Value(1)", element.toString()) @@ -50,7 +48,6 @@ class ChannelReceiveOrClosedTest : TestBase() { expect(4) val nullElement = channel.receiveOrClosed() - assertTrue(nullElement.isValue) assertNull(nullElement.value) assertNull(nullElement.valueOrNull) assertEquals("Value(null)", nullElement.toString()) @@ -62,7 +59,7 @@ class ChannelReceiveOrClosedTest : TestBase() { val closed2 = channel.receiveOrClosed() assertTrue(closed2.isClosed) - assertTrue(closed2.closeCause is ClosedReceiveChannelException) + assertNull(closed2.closeCause) finish(7) } @@ -100,7 +97,6 @@ class ChannelReceiveOrClosedTest : TestBase() { expect(1) val closed = channel.receiveOrClosed() assertTrue(closed.isClosed) - assertTrue(closed.closeCause is ClosedReceiveChannelException) finish(3) } @@ -115,27 +111,25 @@ class ChannelReceiveOrClosedTest : TestBase() { } val intResult = channel.receiveOrClosed() - assertTrue(intResult.isValue) assertEquals(1u, intResult.value.value) val closeCauseResult = channel.receiveOrClosed() - assertTrue(closeCauseResult.isValue) assertTrue(closeCauseResult.value.closeCause is TestException1) val closeCause = channel.receiveOrClosed() assertTrue(closeCause.isClosed) assertTrue(closeCause.closeCause is TestException2) - assertFailsWith { closeCause.valueOrThrow } } @Test fun testToString() = runTest { val channel = Channel(1) channel.send("message") - channel.close(TestException1()) + channel.close(TestException1("OK")) assertEquals("Value(message)", channel.receiveOrClosed().toString()) // toString implementation for exception differs on every platform val str = channel.receiveOrClosed().toString() - assertTrue(str.matches("Closed\\(.*TestException1\\)".toRegex())) + if (!str.matches("Closed\\(.*TestException1: OK\\)".toRegex())) + error("Unexpected string: '$str'") } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt index 637cb3d697..a619355b68 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt @@ -260,6 +260,11 @@ class CombineIterableTest : CombineTestBase() { combineOriginal(listOf(this, other)) { args -> transform(args[0] as T1, args[1] as T2) } } +class CombineTransformAdapterTest : CombineTestBase() { + override fun Flow.combineLatest(other: Flow, transform: suspend (T1, T2) -> R): Flow = + combineTransformOriginal(flow = this, flow2 = other) { a1, a2 -> emit(transform(a1, a2)) } +} + class CombineTransformVarargAdapterTest : CombineTestBase() { override fun Flow.combineLatest(other: Flow, transform: suspend (T1, T2) -> R): Flow = combineTransformOriginal(this, other) { args: Array -> emit(transform(args[0] as T1, args[1] as T2)) } diff --git a/kotlinx-coroutines-core/js/src/internal/ThreadSafeHeap.kt b/kotlinx-coroutines-core/js/src/internal/ThreadSafeHeap.kt deleted file mode 100644 index f966c999b4..0000000000 --- a/kotlinx-coroutines-core/js/src/internal/ThreadSafeHeap.kt +++ /dev/null @@ -1,10 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.internal - -@Suppress("NOTHING_TO_INLINE") -internal actual inline fun clear(a: Array) { - for (i in a.indices) a[i] = null -} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/src/internal/ThreadSafeHeap.kt b/kotlinx-coroutines-core/jvm/src/internal/ThreadSafeHeap.kt deleted file mode 100644 index 661a6bcc08..0000000000 --- a/kotlinx-coroutines-core/jvm/src/internal/ThreadSafeHeap.kt +++ /dev/null @@ -1,8 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.internal - -@Suppress("NOTHING_TO_INLINE") -internal actual inline fun clear(a: Array) = a.fill(null) \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/TestBase.kt b/kotlinx-coroutines-core/jvm/test/TestBase.kt index 32007a0482..01daa4a8a8 100644 --- a/kotlinx-coroutines-core/jvm/test/TestBase.kt +++ b/kotlinx-coroutines-core/jvm/test/TestBase.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -37,7 +37,7 @@ public actual open class TestBase actual constructor() { /** * Is `true` when running in a nightly stress test mode. */ - public actual val isStressTest = System.getProperty("stressTest") != null + public actual val isStressTest = System.getProperty("stressTest")?.toBoolean() ?: false public val stressTestMultiplierSqrt = if (isStressTest) 5 else 1 diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelLFStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelLFStressTest.kt new file mode 100644 index 0000000000..67bd68ac14 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelLFStressTest.kt @@ -0,0 +1,110 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.channels + +import kotlinx.atomicfu.* +import kotlinx.coroutines.* +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.atomic.AtomicLongArray +import kotlin.math.* +import kotlin.test.* + +/** + * Tests lock-freedom of send and receive operations on rendezvous and conflated channels. + * There is a single channel with two sender and two receiver threads. + * When one sender or receiver gets suspended at most one other operation is allowed to cease having progress + * (`allowSuspendedThreads = 1`). + * + * **Note**: In the current implementation buffered channels are not lock-free, so this test would fail + * if channel is created with a buffer. + */ +class ChannelLFStressTest : TestBase() { + private val nSeconds = 5 * stressTestMultiplier + private val env = LockFreedomTestEnvironment("ChannelLFStressTest", allowSuspendedThreads = 1) + private lateinit var channel: Channel + + private val sendIndex = AtomicLong() + private val receiveCount = AtomicLong() + private val duplicateCount = AtomicLong() + + private val nCheckedSize = 10_000_000 + private val nChecked = (nCheckedSize * Long.SIZE_BITS).toLong() + private val receivedBits = AtomicLongArray(nCheckedSize) // bit set of received values + + @Test + fun testRendezvousLockFreedom() { + channel = Channel() + performLockFreedomTest() + // ensure that all sent were received + checkAllReceived() + } + + @Test + fun testConflatedLockFreedom() { + // This test does not really verify that all sent elements were received + // and checks only LF property + channel = Channel(Channel.CONFLATED) + performLockFreedomTest() + } + + private fun performLockFreedomTest() { + env.onCompletion { channel.close() } + repeat(2) { env.testThread { sender() } } + repeat(2) { env.testThread { receiver() } } + env.performTest(nSeconds) { + println("Sent: $sendIndex, Received: $receiveCount, dups: $duplicateCount") + } + // ensure no duplicates + assertEquals(0L, duplicateCount.get()) + } + + private fun checkAllReceived() { + for (i in 0 until min(sendIndex.get(), nChecked)) { + assertTrue(isReceived(i)) + } + } + + private suspend fun sender() { + val value = sendIndex.getAndIncrement() + try { + channel.send(value) + } catch (e: ClosedSendChannelException) { + check(env.isCompleted) // expected when test was completed + markReceived(value) // fake received (actually failed to send) + } + } + + private suspend fun receiver() { + val value = try { + channel.receive() + } catch (e: ClosedReceiveChannelException) { + check(env.isCompleted) // expected when test was completed + return + } + receiveCount.incrementAndGet() + markReceived(value) + } + + private fun markReceived(value: Long) { + if (value >= nChecked) return // too big + val index = (value / Long.SIZE_BITS).toInt() + val mask = 1L shl (value % Long.SIZE_BITS).toInt() + while (true) { + val bits = receivedBits.get(index) + if (bits and mask != 0L) { + duplicateCount.incrementAndGet() + break + } + if (receivedBits.compareAndSet(index, bits, bits or mask)) break + } + } + + private fun isReceived(value: Long): Boolean { + val index = (value / Long.SIZE_BITS).toInt() + val mask = 1L shl (value % Long.SIZE_BITS).toInt() + val bits = receivedBits.get(index) + return bits and mask != 0L + } +} diff --git a/kotlinx-coroutines-core/jvm/test/guide/test/TestUtil.kt b/kotlinx-coroutines-core/jvm/test/guide/test/TestUtil.kt index c72b60bd53..bd7159fb08 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/test/TestUtil.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/test/TestUtil.kt @@ -26,7 +26,7 @@ private const val SHUTDOWN_TIMEOUT = 5000L // 5 sec at most to wait private val OUT_ENABLED = systemProp("guide.tests.sout", false) @Suppress("DEPRECATION") -fun test(name: String, block: () -> Unit): List = outputException(name) { +fun test(name: String, block: () -> R): List = outputException(name) { val sout = System.out val oldOut = if (OUT_ENABLED) System.out else NullOut val oldErr = System.err @@ -45,7 +45,8 @@ fun test(name: String, block: () -> Unit): List = outputException(name) var bytes = ByteArray(0) withVirtualTimeSource(oldOut) { try { - block() + val result = block() + require(result === Unit) { "Test 'main' shall return Unit" } } catch (e: Throwable) { System.err.print("Exception in thread \"main\" ") e.printStackTrace() diff --git a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListAtomicStressLFTest.kt b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListAtomicLFStressTest.kt similarity index 97% rename from kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListAtomicStressLFTest.kt rename to kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListAtomicLFStressTest.kt index 46f81d64b1..af2de24e1a 100644 --- a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListAtomicStressLFTest.kt +++ b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListAtomicLFStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.internal @@ -16,8 +16,8 @@ import java.util.concurrent.atomic.AtomicReference * This stress test has 4 threads adding randomly to the list and them immediately undoing * this addition by remove, and 4 threads trying to remove nodes from two lists simultaneously (atomically). */ -class LockFreeLinkedListAtomicStressLFTest : TestBase() { - private val env = LockFreedomTestEnvironment("LockFreeLinkedListAtomicStressLFTest") +class LockFreeLinkedListAtomicLFStressTest : TestBase() { + private val env = LockFreedomTestEnvironment("LockFreeLinkedListAtomicLFStressTest") data class IntNode(val i: Int) : LockFreeLinkedListNode() diff --git a/kotlinx-coroutines-core/jvm/test/internal/SegmentQueueLFTest.kt b/kotlinx-coroutines-core/jvm/test/internal/SegmentQueueLCStressTest.kt similarity index 73% rename from kotlinx-coroutines-core/jvm/test/internal/SegmentQueueLFTest.kt rename to kotlinx-coroutines-core/jvm/test/internal/SegmentQueueLCStressTest.kt index b6faf683fb..c8493f6f30 100644 --- a/kotlinx-coroutines-core/jvm/test/internal/SegmentQueueLFTest.kt +++ b/kotlinx-coroutines-core/jvm/test/internal/SegmentQueueLCStressTest.kt @@ -1,3 +1,7 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + package kotlinx.coroutines.internal import com.devexperts.dxlab.lincheck.LinChecker @@ -8,7 +12,7 @@ import com.devexperts.dxlab.lincheck.strategy.stress.StressCTest import org.junit.Test @StressCTest -class SegmentQueueLFTest { +class SegmentQueueLCStressTest { private val q = SegmentBasedQueue() @Operation @@ -21,6 +25,6 @@ class SegmentQueueLFTest { @Test fun test() { - LinChecker.check(SegmentQueueLFTest::class.java) + LinChecker.check(SegmentQueueLCStressTest::class.java) } } \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/ChannelIsClosedLinearizabilityTest.kt b/kotlinx-coroutines-core/jvm/test/linearizability/ChannelIsClosedLCStressTest.kt similarity index 88% rename from kotlinx-coroutines-core/jvm/test/linearizability/ChannelIsClosedLinearizabilityTest.kt rename to kotlinx-coroutines-core/jvm/test/linearizability/ChannelIsClosedLCStressTest.kt index 9b0d9950f5..44ba182dd3 100644 --- a/kotlinx-coroutines-core/jvm/test/linearizability/ChannelIsClosedLinearizabilityTest.kt +++ b/kotlinx-coroutines-core/jvm/test/linearizability/ChannelIsClosedLCStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ @file:Suppress("unused") @@ -15,7 +15,7 @@ import org.junit.* import java.io.* @Param(name = "value", gen = IntGen::class, conf = "1:3") -class ChannelIsClosedLinearizabilityTest : TestBase() { +class ChannelIsClosedLCStressTest : TestBase() { private val lt = LinTesting() private val channel = Channel() @@ -49,6 +49,6 @@ class ChannelIsClosedLinearizabilityTest : TestBase() { .threads(3) .verifier(LinVerifier::class.java) - LinChecker.check(ChannelIsClosedLinearizabilityTest::class.java, options) + LinChecker.check(ChannelIsClosedLCStressTest::class.java, options) } } diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/ChannelLinearizabilityTest.kt b/kotlinx-coroutines-core/jvm/test/linearizability/ChannelLCStressTest.kt similarity index 89% rename from kotlinx-coroutines-core/jvm/test/linearizability/ChannelLinearizabilityTest.kt rename to kotlinx-coroutines-core/jvm/test/linearizability/ChannelLCStressTest.kt index 4060e5523c..f4b775631f 100644 --- a/kotlinx-coroutines-core/jvm/test/linearizability/ChannelLinearizabilityTest.kt +++ b/kotlinx-coroutines-core/jvm/test/linearizability/ChannelLCStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ @file:Suppress("unused") @@ -15,7 +15,7 @@ import org.junit.* import java.io.* @Param(name = "value", gen = IntGen::class, conf = "1:3") -class ChannelLinearizabilityTest : TestBase() { +class ChannelLCStressTest : TestBase() { private companion object { // Emulating ctor argument for lincheck @@ -62,12 +62,12 @@ class ChannelLinearizabilityTest : TestBase() { fun testUnlimitedChannelLinearizability() = runTest(Channel.UNLIMITED) private fun runTest(capacity: Int) { - ChannelLinearizabilityTest.capacity = capacity + ChannelLCStressTest.capacity = capacity val options = StressOptions() .iterations(50 * stressTestMultiplierSqrt) .invocationsPerIteration(500 * stressTestMultiplierSqrt) .threads(3) .verifier(LinVerifier::class.java) - LinChecker.check(ChannelLinearizabilityTest::class.java, options) + LinChecker.check(ChannelLCStressTest::class.java, options) } } diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/LockFreeListLinearizabilityTest.kt b/kotlinx-coroutines-core/jvm/test/linearizability/LockFreeListLCStressTest.kt similarity index 89% rename from kotlinx-coroutines-core/jvm/test/linearizability/LockFreeListLinearizabilityTest.kt rename to kotlinx-coroutines-core/jvm/test/linearizability/LockFreeListLCStressTest.kt index 36b4c04329..546615489b 100644 --- a/kotlinx-coroutines-core/jvm/test/linearizability/LockFreeListLinearizabilityTest.kt +++ b/kotlinx-coroutines-core/jvm/test/linearizability/LockFreeListLCStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ @file:Suppress("unused") @@ -14,7 +14,7 @@ import kotlinx.coroutines.internal.* import kotlin.test.* @Param(name = "value", gen = IntGen::class, conf = "1:3") -class LockFreeListLinearizabilityTest : TestBase() { +class LockFreeListLCStressTest : TestBase() { class Node(val value: Int): LockFreeLinkedListNode() private val q: LockFreeLinkedListHead = LockFreeLinkedListHead() @@ -49,7 +49,7 @@ class LockFreeListLinearizabilityTest : TestBase() { .iterations(100 * stressTestMultiplierSqrt) .invocationsPerIteration(1000 * stressTestMultiplierSqrt) .threads(3) - LinChecker.check(LockFreeListLinearizabilityTest::class.java, options) + LinChecker.check(LockFreeListLCStressTest::class.java, options) } private var _curElements: ArrayList? = null @@ -62,7 +62,7 @@ class LockFreeListLinearizabilityTest : TestBase() { } override fun equals(other: Any?): Boolean { - other as LockFreeListLinearizabilityTest + other as LockFreeListLCStressTest return curElements == other.curElements } diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/LockFreeTaskQueueLinearizabilityTest.kt b/kotlinx-coroutines-core/jvm/test/linearizability/LockFreeTaskQueueLCStressTest.kt similarity index 85% rename from kotlinx-coroutines-core/jvm/test/linearizability/LockFreeTaskQueueLinearizabilityTest.kt rename to kotlinx-coroutines-core/jvm/test/linearizability/LockFreeTaskQueueLCStressTest.kt index 78ad2627be..ea2afa1019 100644 --- a/kotlinx-coroutines-core/jvm/test/linearizability/LockFreeTaskQueueLinearizabilityTest.kt +++ b/kotlinx-coroutines-core/jvm/test/linearizability/LockFreeTaskQueueLCStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ @file:Suppress("unused") @@ -19,7 +19,7 @@ internal data class Snapshot(val elements: List, val isClosed: Boolean) { @OpGroupConfig.OpGroupConfigs(OpGroupConfig(name = "consumer", nonParallel = true)) @Param(name = "value", gen = IntGen::class, conf = "1:3") -class LockFreeTaskQueueLinearizabilityTestSC : LockFreeTaskQueueLinearizabilityTestBase() { +class SCLockFreeTaskQueueLCStressTest : LockFreeTaskQueueLCTestBase() { private val q: LockFreeTaskQueue = LockFreeTaskQueue(singleConsumer = true) @Operation @@ -42,7 +42,7 @@ class LockFreeTaskQueueLinearizabilityTestSC : LockFreeTaskQueueLinearizabilityT if (this === other) return true if (javaClass != other?.javaClass) return false - other as LockFreeTaskQueueLinearizabilityTestSC + other as SCLockFreeTaskQueueLCStressTest return Snapshot(q) == Snapshot(other.q) } @@ -51,7 +51,7 @@ class LockFreeTaskQueueLinearizabilityTestSC : LockFreeTaskQueueLinearizabilityT } @Param(name = "value", gen = IntGen::class, conf = "1:3") -class LockFreeTaskQueueLinearizabilityTestMC : LockFreeTaskQueueLinearizabilityTestBase() { +class MCLockFreeTaskQueueLCStressTest : LockFreeTaskQueueLCTestBase() { private val q: LockFreeTaskQueue = LockFreeTaskQueue(singleConsumer = false) @Operation @@ -74,7 +74,7 @@ class LockFreeTaskQueueLinearizabilityTestMC : LockFreeTaskQueueLinearizabilityT if (this === other) return true if (javaClass != other?.javaClass) return false - other as LockFreeTaskQueueLinearizabilityTestMC + other as MCLockFreeTaskQueueLCStressTest return Snapshot(q) == Snapshot(other.q) } @@ -82,7 +82,7 @@ class LockFreeTaskQueueLinearizabilityTestMC : LockFreeTaskQueueLinearizabilityT override fun hashCode(): Int = Snapshot(q).hashCode() } -open class LockFreeTaskQueueLinearizabilityTestBase : TestBase() { +open class LockFreeTaskQueueLCTestBase : TestBase() { fun linTest() { val options = StressOptions() .iterations(100 * stressTestMultiplierSqrt) diff --git a/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt b/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt index cdfcc6bded..e872b52708 100644 --- a/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt @@ -2,6 +2,7 @@ package kotlinx.coroutines.sync import kotlinx.coroutines.* import org.junit.Test +import org.junit.After import kotlin.test.assertEquals class SemaphoreStressTest : TestBase() { @@ -57,4 +58,39 @@ class SemaphoreStressTest : TestBase() { semaphore.release() assertEquals(1, semaphore.availablePermits) } -} \ No newline at end of file + + /** + * This checks if repeated releases that race with cancellations put + * the semaphore into an incorrect state where permits are leaked. + */ + @Test + fun stressReleaseCancelRace() = runTest { + val n = 10_000 * stressTestMultiplier + val semaphore = Semaphore(1, 1) + newSingleThreadContext("SemaphoreStressTest").use { pool -> + repeat (n) { + // Initially, we hold the permit and no one else can `acquire`, + // otherwise it's a bug. + assertEquals(0, semaphore.availablePermits) + var job1_entered_critical_section = false + val job1 = launch(start = CoroutineStart.UNDISPATCHED) { + semaphore.acquire() + job1_entered_critical_section = true + semaphore.release() + } + // check that `job1` didn't finish the call to `acquire()` + assertEquals(false, job1_entered_critical_section) + val job2 = launch(pool) { + semaphore.release() + } + // Because `job2` executes in a separate thread, this + // cancellation races with the call to `release()`. + job1.cancelAndJoin() + job2.join() + assertEquals(1, semaphore.availablePermits) + semaphore.acquire() + } + } + } + +} diff --git a/kotlinx-coroutines-core/native/src/internal/ThreadSafeHeap.kt b/kotlinx-coroutines-core/native/src/internal/ThreadSafeHeap.kt deleted file mode 100644 index f966c999b4..0000000000 --- a/kotlinx-coroutines-core/native/src/internal/ThreadSafeHeap.kt +++ /dev/null @@ -1,10 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.internal - -@Suppress("NOTHING_TO_INLINE") -internal actual inline fun clear(a: Array) { - for (i in a.indices) a[i] = null -} \ No newline at end of file diff --git a/kotlinx-coroutines-core/npm/README.md b/kotlinx-coroutines-core/npm/README.md new file mode 100644 index 0000000000..7f88ea393f --- /dev/null +++ b/kotlinx-coroutines-core/npm/README.md @@ -0,0 +1,19 @@ +# kotlinx.coroutines + +Library support for Kotlin coroutines in +[Kotlin/JS](https://kotlinlang.org/docs/reference/js-overview.html). + +```kotlin +suspend fun main() = coroutineScope { + launch { + delay(1000) + println("Kotlin Coroutines World!") + } + println("Hello") +} +``` + +## Documentation + +* [Guide to kotlinx.coroutines by example on JVM](https://kotlinlang.org/docs/reference/coroutines/coroutines-guide.html) (**read it first**) +* [Full kotlinx.coroutines API reference](https://kotlin.github.io/kotlinx.coroutines) diff --git a/kotlinx-coroutines-core/npm/package.json b/kotlinx-coroutines-core/npm/package.json new file mode 100644 index 0000000000..5dda39433d --- /dev/null +++ b/kotlinx-coroutines-core/npm/package.json @@ -0,0 +1,26 @@ +{ + "name": "kotlinx-coroutines-core", + "version" : "$version", + "description" : "Library support for Kotlin coroutines", + "main" : "kotlinx-coroutines-core.js", + "author": "JetBrains", + "license": "Apache-2.0", + "homepage": "https://github.com/Kotlin/kotlinx.coroutines", + "bugs": { + "url": "https://github.com/Kotlin/kotlinx.coroutines/issues" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/Kotlin/kotlinx.coroutines.git" + }, + "keywords": [ + "Kotlin", + "async", + "coroutines", + "JavaScript", + "JetBrains" + ], + "peerDependencies": { + $kotlinDependency + } +} diff --git a/kotlinx-coroutines-debug/README.md b/kotlinx-coroutines-debug/README.md index fbbf8eaf06..ca2a05b628 100644 --- a/kotlinx-coroutines-debug/README.md +++ b/kotlinx-coroutines-debug/README.md @@ -18,7 +18,7 @@ of coroutines hierarchy referenced by a [Job] or [CoroutineScope] instances usin Add `kotlinx-coroutines-debug` to your project test dependencies: ``` dependencies { - testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-debug:1.3.0' + testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-debug:1.3.1' } ``` @@ -57,7 +57,7 @@ stacktraces will be dumped to the console. ### Using as JVM agent It is possible to use this module as a standalone JVM agent to enable debug probes on the application startup. -You can run your application with an additional argument: `-javaagent:kotlinx-coroutines-debug-1.3.0.jar`. +You can run your application with an additional argument: `-javaagent:kotlinx-coroutines-debug-1.3.1.jar`. Additionally, on Linux and Mac OS X you can use `kill -5 $pid` command in order to force your application to print all alive coroutines. diff --git a/kotlinx-coroutines-debug/build.gradle b/kotlinx-coroutines-debug/build.gradle index 296566b645..420a88bd38 100644 --- a/kotlinx-coroutines-debug/build.gradle +++ b/kotlinx-coroutines-debug/build.gradle @@ -1,15 +1,27 @@ -import org.w3c.dom.Element - /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ apply plugin: "com.github.johnrengelman.shadow" +configurations { + shadowDeps // shaded dependencies, not included into the resulting .pom file + compileOnly.extendsFrom(shadowDeps) + runtimeOnly.extendsFrom(shadowDeps) + + /* + * It is possible to extend a particular configuration with shadow, + * but in that case it changes dependency type to "runtime" and resolves it + * (so it cannot be further modified). Otherwise, shadow just ignores all dependencies. + */ + shadow.extendsFrom(compile) // shadow - resulting configuration with shaded jar file + configureKotlinJvmPlatform(shadow) +} + dependencies { compileOnly "junit:junit:$junit_version" - compile "net.bytebuddy:byte-buddy:$byte_buddy_version" - compile "net.bytebuddy:byte-buddy-agent:$byte_buddy_version" + shadowDeps "net.bytebuddy:byte-buddy:$byte_buddy_version" + shadowDeps "net.bytebuddy:byte-buddy-agent:$byte_buddy_version" } jar { @@ -18,42 +30,10 @@ jar { attributes "Can-Redefine-Classes": "true" } } -/* - * It is possible to extend a particular configuration with shadow, - * but in that case it changes dependency type to "runtime" and resolves it - * (so it cannot be further modified). Otherwise, shadow just ignores all dependencies. - */ -configurations.shadow.extendsFrom(configurations.compile) - -/* - * Thus we are rewriting the POM. I am really question my existence at this point. - */ -project.ext.configureMavenDependencies = { - def root = it.asElement() as Element - def dependencies = root.getChildNodes().find { it.nodeName == "dependencies" }.childNodes - def childrenToRemove = [] - for (i in 0..dependencies.length - 1) { - def dependency = dependencies.item(i) as Element - def scope = dependency.getChildNodes().find { it.nodeName == "scope" } as Element - def groupId = dependency.getChildNodes().find { it.nodeName == "groupId" } as Element - if (groupId != null && groupId.firstChild.nodeValue == "net.bytebuddy") { - childrenToRemove.add(dependency) - } else if (scope != null) { - scope.firstChild.setNodeValue("compile") - } - } - - childrenToRemove.each { - root.getChildNodes().find { it.nodeName == "dependencies" }.removeChild(it) - } -} shadowJar { classifier null // Shadow only byte buddy, do not package kotlin stdlib - dependencies { - include(dependency("net.bytebuddy:byte-buddy:$byte_buddy_version")) - include(dependency("net.bytebuddy:byte-buddy-agent:$byte_buddy_version")) - } + configurations = [project.configurations.shadowDeps] relocate 'net.bytebuddy', 'kotlinx.coroutines.repackaged.net.bytebuddy' } diff --git a/kotlinx-coroutines-test/README.md b/kotlinx-coroutines-test/README.md index 23ec0d71fd..014c53b702 100644 --- a/kotlinx-coroutines-test/README.md +++ b/kotlinx-coroutines-test/README.md @@ -9,7 +9,7 @@ This package provides testing utilities for effectively testing coroutines. Add `kotlinx-coroutines-test` to your project test dependencies: ``` dependencies { - testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.3.0' + testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.3.1' } ``` diff --git a/ui/coroutines-guide-ui.md b/ui/coroutines-guide-ui.md index 011ed6b682..049c944121 100644 --- a/ui/coroutines-guide-ui.md +++ b/ui/coroutines-guide-ui.md @@ -165,7 +165,7 @@ Add dependencies on `kotlinx-coroutines-android` module to the `dependencies { . `app/build.gradle` file: ```groovy -implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.0" +implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.1" ``` You can clone [kotlinx.coroutines](https://github.com/Kotlin/kotlinx.coroutines) project from GitHub onto your diff --git a/ui/kotlinx-coroutines-android/animation-app/gradle.properties b/ui/kotlinx-coroutines-android/animation-app/gradle.properties index e4fedb2a12..8e119d7159 100644 --- a/ui/kotlinx-coroutines-android/animation-app/gradle.properties +++ b/ui/kotlinx-coroutines-android/animation-app/gradle.properties @@ -19,5 +19,5 @@ org.gradle.jvmargs=-Xmx1536m kotlin.coroutines=enable kotlin_version=1.3.50 -coroutines_version=1.3.0 +coroutines_version=1.3.1 diff --git a/ui/kotlinx-coroutines-android/example-app/gradle.properties b/ui/kotlinx-coroutines-android/example-app/gradle.properties index e4fedb2a12..8e119d7159 100644 --- a/ui/kotlinx-coroutines-android/example-app/gradle.properties +++ b/ui/kotlinx-coroutines-android/example-app/gradle.properties @@ -19,5 +19,5 @@ org.gradle.jvmargs=-Xmx1536m kotlin.coroutines=enable kotlin_version=1.3.50 -coroutines_version=1.3.0 +coroutines_version=1.3.1