From e710048362179c93fdb2dfb2000eaffe5eadfbf3 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Tue, 3 Nov 2020 23:04:19 +0300 Subject: [PATCH 1/2] SharedFlow: Fix scenario with concurrent emitters and cancellation of subscriber (#2359) * Added a specific test for a problematic scenario. * Added stress test with concurrent emitters and subscribers that come and go. Fixes #2356 --- .../common/src/flow/SharedFlow.kt | 6 ++ .../flow/sharing/SharedFlowScenarioTest.kt | 42 +++++++++ .../jvm/test/flow/SharedFlowStressTest.kt | 87 +++++++++++++++++++ 3 files changed, 135 insertions(+) create mode 100644 kotlinx-coroutines-core/jvm/test/flow/SharedFlowStressTest.kt diff --git a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt index 427041a7bb..feb2749595 100644 --- a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt @@ -498,6 +498,12 @@ private class SharedFlowImpl( } // Compute new buffer size -> how many values we now actually have after resume val newBufferSize1 = (newBufferEndIndex - head).toInt() + // Note: When nCollectors == 0 we resume ALL queued emitters and we might have resumed more than bufferCapacity, + // and newMinCollectorIndex might pointing the wrong place because of that. The easiest way to fix it is by + // forcing newMinCollectorIndex = newBufferEndIndex. We do not needed to update newBufferSize1 (which could be + // too big), because the only use of newBufferSize1 in the below code is in the minOf(replay, newBufferSize1) + // expression, which coerces values that are too big anyway. + if (nCollectors == 0) newMinCollectorIndex = newBufferEndIndex // Compute new replay size -> limit to replay the number of items we need, take into account that it can only grow var newReplayIndex = maxOf(replayIndex, newBufferEndIndex - minOf(replay, newBufferSize1)) // adjustment for synchronous case with cancelled emitter (NO_VALUE) diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowScenarioTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowScenarioTest.kt index c3eb2dac04..794553b482 100644 --- a/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowScenarioTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowScenarioTest.kt @@ -201,6 +201,48 @@ class SharedFlowScenarioTest : TestBase() { emitResumes(e3); expectReplayOf(3) } + @Test + fun testSuspendedConcurrentEmitAndCancelSubscriberReplay1() = + testSharedFlow(MutableSharedFlow(1)) { + val a = subscribe("a"); + emitRightNow(0); expectReplayOf(0) + collect(a, 0) + emitRightNow(1); expectReplayOf(1) + val e2 = emitSuspends(2) // suspends until 1 is collected + val e3 = emitSuspends(3) // suspends until 1 is collected, too + cancel(a) // must resume emitters 2 & 3 + emitResumes(e2) + emitResumes(e3) + expectReplayOf(3) // but replay size is 1 so only 3 should be kept + // Note: originally, SharedFlow was in a broken state here with 3 elements in the buffer + val b = subscribe("b") + collect(b, 3) + emitRightNow(4); expectReplayOf(4) + collect(b, 4) + } + + @Test + fun testSuspendedConcurrentEmitAndCancelSubscriberReplay1ExtraBuffer1() = + testSharedFlow(MutableSharedFlow( replay = 1, extraBufferCapacity = 1)) { + val a = subscribe("a"); + emitRightNow(0); expectReplayOf(0) + collect(a, 0) + emitRightNow(1); expectReplayOf(1) + emitRightNow(2); expectReplayOf(2) + val e3 = emitSuspends(3) // suspends until 1 is collected + val e4 = emitSuspends(4) // suspends until 1 is collected, too + val e5 = emitSuspends(5) // suspends until 1 is collected, too + cancel(a) // must resume emitters 3, 4, 5 + emitResumes(e3) + emitResumes(e4) + emitResumes(e5) + expectReplayOf(5) + val b = subscribe("b") + collect(b, 5) + emitRightNow(6); expectReplayOf(6) + collect(b, 6) + } + private fun testSharedFlow( sharedFlow: MutableSharedFlow, scenario: suspend ScenarioDsl.() -> Unit diff --git a/kotlinx-coroutines-core/jvm/test/flow/SharedFlowStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/SharedFlowStressTest.kt new file mode 100644 index 0000000000..349b7c8121 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/flow/SharedFlowStressTest.kt @@ -0,0 +1,87 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.atomicfu.* +import kotlinx.coroutines.* +import org.junit.* +import org.junit.Test +import kotlin.collections.ArrayList +import kotlin.test.* +import kotlin.time.* + +@ExperimentalTime +class SharedFlowStressTest : TestBase() { + private val nProducers = 5 + private val nConsumers = 3 + private val nSeconds = 3 * stressTestMultiplier + + private lateinit var sf: MutableSharedFlow + private lateinit var view: SharedFlow + + @get:Rule + val producerDispatcher = ExecutorRule(nProducers) + @get:Rule + val consumerDispatcher = ExecutorRule(nConsumers) + + private val totalProduced = atomic(0L) + private val totalConsumed = atomic(0L) + + @Test + fun testStressReplay1() = + testStress(1, 0) + + @Test + fun testStressReplay1ExtraBuffer1() = + testStress(1, 1) + + @Test + fun testStressReplay2ExtraBuffer1() = + testStress(2, 1) + + private fun testStress(replay: Int, extraBufferCapacity: Int) = runTest { + sf = MutableSharedFlow(replay, extraBufferCapacity) + view = sf.asSharedFlow() + val jobs = ArrayList() + jobs += List(nProducers) { producerIndex -> + launch(producerDispatcher) { + var cur = producerIndex.toLong() + while (isActive) { + sf.emit(cur) + totalProduced.incrementAndGet() + cur += nProducers + } + } + } + jobs += List(nConsumers) { consumerIndex -> + launch(consumerDispatcher) { + while (isActive) { + view + .dropWhile { it % nConsumers != consumerIndex.toLong() } + .take(1) + .collect { + check(it % nConsumers == consumerIndex.toLong()) + totalConsumed.incrementAndGet() + } + } + } + } + var lastProduced = 0L + var lastConsumed = 0L + for (sec in 1..nSeconds) { + delay(1.seconds) + val produced = totalProduced.value + val consumed = totalConsumed.value + println("$sec sec: produced = $produced; consumed = $consumed") + assertNotEquals(lastProduced, produced) + assertNotEquals(lastConsumed, consumed) + lastProduced = produced + lastConsumed = consumed + } + jobs.forEach { it.cancel() } + jobs.forEach { it.join() } + println("total: produced = ${totalProduced.value}; consumed = ${totalConsumed.value}") + } +} \ No newline at end of file From c35ce7e509ff284f4b33ed20a0026e5149ca17a9 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Tue, 3 Nov 2020 23:18:14 +0300 Subject: [PATCH 2/2] Version 1.4.1 --- CHANGES.md | 6 ++++++ README.md | 16 ++++++++-------- RELEASE.md | 16 ++++++++++------ gradle.properties | 2 +- kotlinx-coroutines-debug/README.md | 2 +- kotlinx-coroutines-test/README.md | 2 +- ui/coroutines-guide-ui.md | 2 +- .../animation-app/gradle.properties | 2 +- .../example-app/gradle.properties | 2 +- 9 files changed, 30 insertions(+), 20 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index bce941c70b..baee6c4340 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,11 @@ # Change log for kotlinx.coroutines +## Version 1.4.1 + +This is a patch release with an important fix to the `SharedFlow` implementation. + +* SharedFlow: Fix scenario with concurrent emitters and cancellation of subscriber (#2359, thanks to @vehovsky for the bug report). + ## Version 1.4.0 ### Improvements diff --git a/README.md b/README.md index 2bc45cf451..7bd8e5a74b 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![official JetBrains project](https://jb.gg/badges/official.svg)](https://confluence.jetbrains.com/display/ALL/JetBrains+on+GitHub) [![GitHub license](https://img.shields.io/badge/license-Apache%20License%202.0-blue.svg?style=flat)](https://www.apache.org/licenses/LICENSE-2.0) -[![Download](https://api.bintray.com/packages/kotlin/kotlinx/kotlinx.coroutines/images/download.svg?version=1.4.0) ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.4.0) +[![Download](https://api.bintray.com/packages/kotlin/kotlinx/kotlinx.coroutines/images/download.svg?version=1.4.1) ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.4.1) [![Kotlin](https://img.shields.io/badge/kotlin-1.4.0-blue.svg?logo=kotlin)](http://kotlinlang.org) [![Slack channel](https://img.shields.io/badge/chat-slack-green.svg?logo=slack)](https://kotlinlang.slack.com/messages/coroutines/) @@ -86,7 +86,7 @@ Add dependencies (you can also add other modules that you need): org.jetbrains.kotlinx kotlinx-coroutines-core - 1.4.0 + 1.4.1 ``` @@ -104,7 +104,7 @@ Add dependencies (you can also add other modules that you need): ```groovy dependencies { - implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.0' + implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.1' } ``` @@ -130,7 +130,7 @@ Add dependencies (you can also add other modules that you need): ```groovy dependencies { - implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.0") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.1") } ``` @@ -152,7 +152,7 @@ In common code that should get compiled for different platforms, you can add dep ```groovy commonMain { dependencies { - implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.0") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.1") } } ``` @@ -163,7 +163,7 @@ Add [`kotlinx-coroutines-android`](ui/kotlinx-coroutines-android) module as dependency when using `kotlinx.coroutines` on Android: ```groovy -implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.0' +implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.1' ``` This gives you access to Android [Dispatchers.Main] @@ -190,7 +190,7 @@ packagingOptions { ### JS [Kotlin/JS](https://kotlinlang.org/docs/reference/js-overview.html) version of `kotlinx.coroutines` is published as -[`kotlinx-coroutines-core-js`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-js/1.4.0/jar) +[`kotlinx-coroutines-core-js`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-js/1.4.1/jar) (follow the link to get the dependency declaration snippet). You can also use [`kotlinx-coroutines-core`](https://www.npmjs.com/package/kotlinx-coroutines-core) package via NPM. @@ -198,7 +198,7 @@ You can also use [`kotlinx-coroutines-core`](https://www.npmjs.com/package/kotli ### Native [Kotlin/Native](https://kotlinlang.org/docs/reference/native-overview.html) version of `kotlinx.coroutines` is published as -[`kotlinx-coroutines-core-native`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-native/1.4.0/jar) +[`kotlinx-coroutines-core-native`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-native/1.4.1/jar) (follow the link to get the dependency declaration snippet). Only single-threaded code (JS-style) on Kotlin/Native is currently supported. diff --git a/RELEASE.md b/RELEASE.md index 22cb61c42f..b2a08b6757 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -12,11 +12,15 @@ To release new `` of `kotlinx-coroutines`: `git merge origin/master` 4. Search & replace `` with `` across the project files. Should replace in: - * [`README.md`](README.md) (native, core, test, debug, modules) - * [`coroutines-guide.md`](docs/coroutines-guide.md) - * [`gradle.properties`](gradle.properties) - * [`ui/kotlinx-coroutines-android/example-app/gradle.properties`](ui/kotlinx-coroutines-android/example-app/gradle.properties) - * [`ui/kotlinx-coroutines-android/animation-app/gradle.properties`](ui/kotlinx-coroutines-android/animation-app/gradle.properties) + * Docs + * [`README.md`](README.md) (native, core, test, debug, modules) + * [`kotlinx-coroutines-debug/README.md`](kotlinx-coroutines-debug/README.md) + * [`kotlinx-coroutines-test/README.md`](kotlinx-coroutines-test/README.md) + * [`coroutines-guide-ui.md`](ui/coroutines-guide-ui.md) + * Properties + * [`gradle.properties`](gradle.properties) + * [`ui/kotlinx-coroutines-android/example-app/gradle.properties`](ui/kotlinx-coroutines-android/example-app/gradle.properties) + * [`ui/kotlinx-coroutines-android/animation-app/gradle.properties`](ui/kotlinx-coroutines-android/animation-app/gradle.properties) * Make sure to **exclude** `CHANGES.md` from replacements. As an alternative approach you can use `./bump-version.sh old_version new_version` @@ -26,7 +30,7 @@ To release new `` of `kotlinx-coroutines`: * Write each change on a single line (don't wrap with CR). * Study commit message from previous release. -6. Create branch for this release: +6. Create the branch for this release: `git checkout -b version-` 7. Commit updated files to a new version branch:
diff --git a/gradle.properties b/gradle.properties index 75c07d3833..1ffa02d1ae 100644 --- a/gradle.properties +++ b/gradle.properties @@ -3,7 +3,7 @@ # # Kotlin -version=1.4.0-SNAPSHOT +version=1.4.1-SNAPSHOT group=org.jetbrains.kotlinx kotlin_version=1.4.0 diff --git a/kotlinx-coroutines-debug/README.md b/kotlinx-coroutines-debug/README.md index 0c01400f1f..5525f9129f 100644 --- a/kotlinx-coroutines-debug/README.md +++ b/kotlinx-coroutines-debug/README.md @@ -61,7 +61,7 @@ stacktraces will be dumped to the console. ### Using as JVM agent Debug module can also be used as a standalone JVM agent to enable debug probes on the application startup. -You can run your application with an additional argument: `-javaagent:kotlinx-coroutines-debug-1.4.0.jar`. +You can run your application with an additional argument: `-javaagent:kotlinx-coroutines-debug-1.4.1.jar`. Additionally, on Linux and Mac OS X you can use `kill -5 $pid` command in order to force your application to print all alive coroutines. When used as Java agent, `"kotlinx.coroutines.debug.enable.creation.stack.trace"` system property can be used to control [DebugProbes.enableCreationStackTraces] along with agent startup. diff --git a/kotlinx-coroutines-test/README.md b/kotlinx-coroutines-test/README.md index 0b1c23938e..afcd4a3b3b 100644 --- a/kotlinx-coroutines-test/README.md +++ b/kotlinx-coroutines-test/README.md @@ -9,7 +9,7 @@ This package provides testing utilities for effectively testing coroutines. Add `kotlinx-coroutines-test` to your project test dependencies: ``` dependencies { - testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.4.0' + testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.4.1' } ``` diff --git a/ui/coroutines-guide-ui.md b/ui/coroutines-guide-ui.md index 297b1fbc98..9c1251fe21 100644 --- a/ui/coroutines-guide-ui.md +++ b/ui/coroutines-guide-ui.md @@ -110,7 +110,7 @@ Add dependencies on `kotlinx-coroutines-android` module to the `dependencies { . `app/build.gradle` file: ```groovy -implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.0" +implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.1" ``` You can clone [kotlinx.coroutines](https://github.com/Kotlin/kotlinx.coroutines) project from GitHub onto your diff --git a/ui/kotlinx-coroutines-android/animation-app/gradle.properties b/ui/kotlinx-coroutines-android/animation-app/gradle.properties index 5ee7794f17..c4aa67585e 100644 --- a/ui/kotlinx-coroutines-android/animation-app/gradle.properties +++ b/ui/kotlinx-coroutines-android/animation-app/gradle.properties @@ -21,7 +21,7 @@ org.gradle.jvmargs=-Xmx1536m # org.gradle.parallel=true kotlin_version=1.4.0 -coroutines_version=1.4.0 +coroutines_version=1.4.1 android.useAndroidX=true android.enableJetifier=true diff --git a/ui/kotlinx-coroutines-android/example-app/gradle.properties b/ui/kotlinx-coroutines-android/example-app/gradle.properties index 5ee7794f17..c4aa67585e 100644 --- a/ui/kotlinx-coroutines-android/example-app/gradle.properties +++ b/ui/kotlinx-coroutines-android/example-app/gradle.properties @@ -21,7 +21,7 @@ org.gradle.jvmargs=-Xmx1536m # org.gradle.parallel=true kotlin_version=1.4.0 -coroutines_version=1.4.0 +coroutines_version=1.4.1 android.useAndroidX=true android.enableJetifier=true