diff --git a/CHANGES.md b/CHANGES.md
index bce941c70b..baee6c4340 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,5 +1,11 @@
# Change log for kotlinx.coroutines
+## Version 1.4.1
+
+This is a patch release with an important fix to the `SharedFlow` implementation.
+
+* SharedFlow: Fix scenario with concurrent emitters and cancellation of subscriber (#2359, thanks to @vehovsky for the bug report).
+
## Version 1.4.0
### Improvements
diff --git a/README.md b/README.md
index 2bc45cf451..7bd8e5a74b 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
[](https://confluence.jetbrains.com/display/ALL/JetBrains+on+GitHub)
[](https://www.apache.org/licenses/LICENSE-2.0)
-[ ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.4.0)
+[ ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.4.1)
[](http://kotlinlang.org)
[](https://kotlinlang.slack.com/messages/coroutines/)
@@ -86,7 +86,7 @@ Add dependencies (you can also add other modules that you need):
org.jetbrains.kotlinx
kotlinx-coroutines-core
- 1.4.0
+ 1.4.1
```
@@ -104,7 +104,7 @@ Add dependencies (you can also add other modules that you need):
```groovy
dependencies {
- implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.0'
+ implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.1'
}
```
@@ -130,7 +130,7 @@ Add dependencies (you can also add other modules that you need):
```groovy
dependencies {
- implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.0")
+ implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.1")
}
```
@@ -152,7 +152,7 @@ In common code that should get compiled for different platforms, you can add dep
```groovy
commonMain {
dependencies {
- implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.0")
+ implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.1")
}
}
```
@@ -163,7 +163,7 @@ Add [`kotlinx-coroutines-android`](ui/kotlinx-coroutines-android)
module as dependency when using `kotlinx.coroutines` on Android:
```groovy
-implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.0'
+implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.1'
```
This gives you access to Android [Dispatchers.Main]
@@ -190,7 +190,7 @@ packagingOptions {
### JS
[Kotlin/JS](https://kotlinlang.org/docs/reference/js-overview.html) version of `kotlinx.coroutines` is published as
-[`kotlinx-coroutines-core-js`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-js/1.4.0/jar)
+[`kotlinx-coroutines-core-js`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-js/1.4.1/jar)
(follow the link to get the dependency declaration snippet).
You can also use [`kotlinx-coroutines-core`](https://www.npmjs.com/package/kotlinx-coroutines-core) package via NPM.
@@ -198,7 +198,7 @@ You can also use [`kotlinx-coroutines-core`](https://www.npmjs.com/package/kotli
### Native
[Kotlin/Native](https://kotlinlang.org/docs/reference/native-overview.html) version of `kotlinx.coroutines` is published as
-[`kotlinx-coroutines-core-native`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-native/1.4.0/jar)
+[`kotlinx-coroutines-core-native`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-native/1.4.1/jar)
(follow the link to get the dependency declaration snippet).
Only single-threaded code (JS-style) on Kotlin/Native is currently supported.
diff --git a/RELEASE.md b/RELEASE.md
index 22cb61c42f..b2a08b6757 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -12,11 +12,15 @@ To release new `` of `kotlinx-coroutines`:
`git merge origin/master`
4. Search & replace `` with `` across the project files. Should replace in:
- * [`README.md`](README.md) (native, core, test, debug, modules)
- * [`coroutines-guide.md`](docs/coroutines-guide.md)
- * [`gradle.properties`](gradle.properties)
- * [`ui/kotlinx-coroutines-android/example-app/gradle.properties`](ui/kotlinx-coroutines-android/example-app/gradle.properties)
- * [`ui/kotlinx-coroutines-android/animation-app/gradle.properties`](ui/kotlinx-coroutines-android/animation-app/gradle.properties)
+ * Docs
+ * [`README.md`](README.md) (native, core, test, debug, modules)
+ * [`kotlinx-coroutines-debug/README.md`](kotlinx-coroutines-debug/README.md)
+ * [`kotlinx-coroutines-test/README.md`](kotlinx-coroutines-test/README.md)
+ * [`coroutines-guide-ui.md`](ui/coroutines-guide-ui.md)
+ * Properties
+ * [`gradle.properties`](gradle.properties)
+ * [`ui/kotlinx-coroutines-android/example-app/gradle.properties`](ui/kotlinx-coroutines-android/example-app/gradle.properties)
+ * [`ui/kotlinx-coroutines-android/animation-app/gradle.properties`](ui/kotlinx-coroutines-android/animation-app/gradle.properties)
* Make sure to **exclude** `CHANGES.md` from replacements.
As an alternative approach you can use `./bump-version.sh old_version new_version`
@@ -26,7 +30,7 @@ To release new `` of `kotlinx-coroutines`:
* Write each change on a single line (don't wrap with CR).
* Study commit message from previous release.
-6. Create branch for this release:
+6. Create the branch for this release:
`git checkout -b version-`
7. Commit updated files to a new version branch:
diff --git a/gradle.properties b/gradle.properties
index 75c07d3833..1ffa02d1ae 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -3,7 +3,7 @@
#
# Kotlin
-version=1.4.0-SNAPSHOT
+version=1.4.1-SNAPSHOT
group=org.jetbrains.kotlinx
kotlin_version=1.4.0
diff --git a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
index 427041a7bb..feb2749595 100644
--- a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
@@ -498,6 +498,12 @@ private class SharedFlowImpl(
}
// Compute new buffer size -> how many values we now actually have after resume
val newBufferSize1 = (newBufferEndIndex - head).toInt()
+ // Note: When nCollectors == 0 we resume ALL queued emitters and we might have resumed more than bufferCapacity,
+ // and newMinCollectorIndex might pointing the wrong place because of that. The easiest way to fix it is by
+ // forcing newMinCollectorIndex = newBufferEndIndex. We do not needed to update newBufferSize1 (which could be
+ // too big), because the only use of newBufferSize1 in the below code is in the minOf(replay, newBufferSize1)
+ // expression, which coerces values that are too big anyway.
+ if (nCollectors == 0) newMinCollectorIndex = newBufferEndIndex
// Compute new replay size -> limit to replay the number of items we need, take into account that it can only grow
var newReplayIndex = maxOf(replayIndex, newBufferEndIndex - minOf(replay, newBufferSize1))
// adjustment for synchronous case with cancelled emitter (NO_VALUE)
diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowScenarioTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowScenarioTest.kt
index c3eb2dac04..794553b482 100644
--- a/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowScenarioTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowScenarioTest.kt
@@ -201,6 +201,48 @@ class SharedFlowScenarioTest : TestBase() {
emitResumes(e3); expectReplayOf(3)
}
+ @Test
+ fun testSuspendedConcurrentEmitAndCancelSubscriberReplay1() =
+ testSharedFlow(MutableSharedFlow(1)) {
+ val a = subscribe("a");
+ emitRightNow(0); expectReplayOf(0)
+ collect(a, 0)
+ emitRightNow(1); expectReplayOf(1)
+ val e2 = emitSuspends(2) // suspends until 1 is collected
+ val e3 = emitSuspends(3) // suspends until 1 is collected, too
+ cancel(a) // must resume emitters 2 & 3
+ emitResumes(e2)
+ emitResumes(e3)
+ expectReplayOf(3) // but replay size is 1 so only 3 should be kept
+ // Note: originally, SharedFlow was in a broken state here with 3 elements in the buffer
+ val b = subscribe("b")
+ collect(b, 3)
+ emitRightNow(4); expectReplayOf(4)
+ collect(b, 4)
+ }
+
+ @Test
+ fun testSuspendedConcurrentEmitAndCancelSubscriberReplay1ExtraBuffer1() =
+ testSharedFlow(MutableSharedFlow( replay = 1, extraBufferCapacity = 1)) {
+ val a = subscribe("a");
+ emitRightNow(0); expectReplayOf(0)
+ collect(a, 0)
+ emitRightNow(1); expectReplayOf(1)
+ emitRightNow(2); expectReplayOf(2)
+ val e3 = emitSuspends(3) // suspends until 1 is collected
+ val e4 = emitSuspends(4) // suspends until 1 is collected, too
+ val e5 = emitSuspends(5) // suspends until 1 is collected, too
+ cancel(a) // must resume emitters 3, 4, 5
+ emitResumes(e3)
+ emitResumes(e4)
+ emitResumes(e5)
+ expectReplayOf(5)
+ val b = subscribe("b")
+ collect(b, 5)
+ emitRightNow(6); expectReplayOf(6)
+ collect(b, 6)
+ }
+
private fun testSharedFlow(
sharedFlow: MutableSharedFlow,
scenario: suspend ScenarioDsl.() -> Unit
diff --git a/kotlinx-coroutines-core/jvm/test/flow/SharedFlowStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/SharedFlowStressTest.kt
new file mode 100644
index 0000000000..349b7c8121
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/flow/SharedFlowStressTest.kt
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.atomicfu.*
+import kotlinx.coroutines.*
+import org.junit.*
+import org.junit.Test
+import kotlin.collections.ArrayList
+import kotlin.test.*
+import kotlin.time.*
+
+@ExperimentalTime
+class SharedFlowStressTest : TestBase() {
+ private val nProducers = 5
+ private val nConsumers = 3
+ private val nSeconds = 3 * stressTestMultiplier
+
+ private lateinit var sf: MutableSharedFlow
+ private lateinit var view: SharedFlow
+
+ @get:Rule
+ val producerDispatcher = ExecutorRule(nProducers)
+ @get:Rule
+ val consumerDispatcher = ExecutorRule(nConsumers)
+
+ private val totalProduced = atomic(0L)
+ private val totalConsumed = atomic(0L)
+
+ @Test
+ fun testStressReplay1() =
+ testStress(1, 0)
+
+ @Test
+ fun testStressReplay1ExtraBuffer1() =
+ testStress(1, 1)
+
+ @Test
+ fun testStressReplay2ExtraBuffer1() =
+ testStress(2, 1)
+
+ private fun testStress(replay: Int, extraBufferCapacity: Int) = runTest {
+ sf = MutableSharedFlow(replay, extraBufferCapacity)
+ view = sf.asSharedFlow()
+ val jobs = ArrayList()
+ jobs += List(nProducers) { producerIndex ->
+ launch(producerDispatcher) {
+ var cur = producerIndex.toLong()
+ while (isActive) {
+ sf.emit(cur)
+ totalProduced.incrementAndGet()
+ cur += nProducers
+ }
+ }
+ }
+ jobs += List(nConsumers) { consumerIndex ->
+ launch(consumerDispatcher) {
+ while (isActive) {
+ view
+ .dropWhile { it % nConsumers != consumerIndex.toLong() }
+ .take(1)
+ .collect {
+ check(it % nConsumers == consumerIndex.toLong())
+ totalConsumed.incrementAndGet()
+ }
+ }
+ }
+ }
+ var lastProduced = 0L
+ var lastConsumed = 0L
+ for (sec in 1..nSeconds) {
+ delay(1.seconds)
+ val produced = totalProduced.value
+ val consumed = totalConsumed.value
+ println("$sec sec: produced = $produced; consumed = $consumed")
+ assertNotEquals(lastProduced, produced)
+ assertNotEquals(lastConsumed, consumed)
+ lastProduced = produced
+ lastConsumed = consumed
+ }
+ jobs.forEach { it.cancel() }
+ jobs.forEach { it.join() }
+ println("total: produced = ${totalProduced.value}; consumed = ${totalConsumed.value}")
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-debug/README.md b/kotlinx-coroutines-debug/README.md
index 0c01400f1f..5525f9129f 100644
--- a/kotlinx-coroutines-debug/README.md
+++ b/kotlinx-coroutines-debug/README.md
@@ -61,7 +61,7 @@ stacktraces will be dumped to the console.
### Using as JVM agent
Debug module can also be used as a standalone JVM agent to enable debug probes on the application startup.
-You can run your application with an additional argument: `-javaagent:kotlinx-coroutines-debug-1.4.0.jar`.
+You can run your application with an additional argument: `-javaagent:kotlinx-coroutines-debug-1.4.1.jar`.
Additionally, on Linux and Mac OS X you can use `kill -5 $pid` command in order to force your application to print all alive coroutines.
When used as Java agent, `"kotlinx.coroutines.debug.enable.creation.stack.trace"` system property can be used to control
[DebugProbes.enableCreationStackTraces] along with agent startup.
diff --git a/kotlinx-coroutines-test/README.md b/kotlinx-coroutines-test/README.md
index 0b1c23938e..afcd4a3b3b 100644
--- a/kotlinx-coroutines-test/README.md
+++ b/kotlinx-coroutines-test/README.md
@@ -9,7 +9,7 @@ This package provides testing utilities for effectively testing coroutines.
Add `kotlinx-coroutines-test` to your project test dependencies:
```
dependencies {
- testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.4.0'
+ testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.4.1'
}
```
diff --git a/ui/coroutines-guide-ui.md b/ui/coroutines-guide-ui.md
index 297b1fbc98..9c1251fe21 100644
--- a/ui/coroutines-guide-ui.md
+++ b/ui/coroutines-guide-ui.md
@@ -110,7 +110,7 @@ Add dependencies on `kotlinx-coroutines-android` module to the `dependencies { .
`app/build.gradle` file:
```groovy
-implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.0"
+implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.1"
```
You can clone [kotlinx.coroutines](https://github.com/Kotlin/kotlinx.coroutines) project from GitHub onto your
diff --git a/ui/kotlinx-coroutines-android/animation-app/gradle.properties b/ui/kotlinx-coroutines-android/animation-app/gradle.properties
index 5ee7794f17..c4aa67585e 100644
--- a/ui/kotlinx-coroutines-android/animation-app/gradle.properties
+++ b/ui/kotlinx-coroutines-android/animation-app/gradle.properties
@@ -21,7 +21,7 @@ org.gradle.jvmargs=-Xmx1536m
# org.gradle.parallel=true
kotlin_version=1.4.0
-coroutines_version=1.4.0
+coroutines_version=1.4.1
android.useAndroidX=true
android.enableJetifier=true
diff --git a/ui/kotlinx-coroutines-android/example-app/gradle.properties b/ui/kotlinx-coroutines-android/example-app/gradle.properties
index 5ee7794f17..c4aa67585e 100644
--- a/ui/kotlinx-coroutines-android/example-app/gradle.properties
+++ b/ui/kotlinx-coroutines-android/example-app/gradle.properties
@@ -21,7 +21,7 @@ org.gradle.jvmargs=-Xmx1536m
# org.gradle.parallel=true
kotlin_version=1.4.0
-coroutines_version=1.4.0
+coroutines_version=1.4.1
android.useAndroidX=true
android.enableJetifier=true