Skip to content

Commit e710048

Browse files
authored
SharedFlow: Fix scenario with concurrent emitters and cancellation of subscriber (#2359)
* Added a specific test for a problematic scenario. * Added stress test with concurrent emitters and subscribers that come and go. Fixes #2356
1 parent 4ea4078 commit e710048

File tree

3 files changed

+135
-0
lines changed

3 files changed

+135
-0
lines changed

kotlinx-coroutines-core/common/src/flow/SharedFlow.kt

+6
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,12 @@ private class SharedFlowImpl<T>(
498498
}
499499
// Compute new buffer size -> how many values we now actually have after resume
500500
val newBufferSize1 = (newBufferEndIndex - head).toInt()
501+
// Note: When nCollectors == 0 we resume ALL queued emitters and we might have resumed more than bufferCapacity,
502+
// and newMinCollectorIndex might pointing the wrong place because of that. The easiest way to fix it is by
503+
// forcing newMinCollectorIndex = newBufferEndIndex. We do not needed to update newBufferSize1 (which could be
504+
// too big), because the only use of newBufferSize1 in the below code is in the minOf(replay, newBufferSize1)
505+
// expression, which coerces values that are too big anyway.
506+
if (nCollectors == 0) newMinCollectorIndex = newBufferEndIndex
501507
// Compute new replay size -> limit to replay the number of items we need, take into account that it can only grow
502508
var newReplayIndex = maxOf(replayIndex, newBufferEndIndex - minOf(replay, newBufferSize1))
503509
// adjustment for synchronous case with cancelled emitter (NO_VALUE)

kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowScenarioTest.kt

+42
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,48 @@ class SharedFlowScenarioTest : TestBase() {
201201
emitResumes(e3); expectReplayOf(3)
202202
}
203203

204+
@Test
205+
fun testSuspendedConcurrentEmitAndCancelSubscriberReplay1() =
206+
testSharedFlow<Int>(MutableSharedFlow(1)) {
207+
val a = subscribe("a");
208+
emitRightNow(0); expectReplayOf(0)
209+
collect(a, 0)
210+
emitRightNow(1); expectReplayOf(1)
211+
val e2 = emitSuspends(2) // suspends until 1 is collected
212+
val e3 = emitSuspends(3) // suspends until 1 is collected, too
213+
cancel(a) // must resume emitters 2 & 3
214+
emitResumes(e2)
215+
emitResumes(e3)
216+
expectReplayOf(3) // but replay size is 1 so only 3 should be kept
217+
// Note: originally, SharedFlow was in a broken state here with 3 elements in the buffer
218+
val b = subscribe("b")
219+
collect(b, 3)
220+
emitRightNow(4); expectReplayOf(4)
221+
collect(b, 4)
222+
}
223+
224+
@Test
225+
fun testSuspendedConcurrentEmitAndCancelSubscriberReplay1ExtraBuffer1() =
226+
testSharedFlow<Int>(MutableSharedFlow( replay = 1, extraBufferCapacity = 1)) {
227+
val a = subscribe("a");
228+
emitRightNow(0); expectReplayOf(0)
229+
collect(a, 0)
230+
emitRightNow(1); expectReplayOf(1)
231+
emitRightNow(2); expectReplayOf(2)
232+
val e3 = emitSuspends(3) // suspends until 1 is collected
233+
val e4 = emitSuspends(4) // suspends until 1 is collected, too
234+
val e5 = emitSuspends(5) // suspends until 1 is collected, too
235+
cancel(a) // must resume emitters 3, 4, 5
236+
emitResumes(e3)
237+
emitResumes(e4)
238+
emitResumes(e5)
239+
expectReplayOf(5)
240+
val b = subscribe("b")
241+
collect(b, 5)
242+
emitRightNow(6); expectReplayOf(6)
243+
collect(b, 6)
244+
}
245+
204246
private fun <T> testSharedFlow(
205247
sharedFlow: MutableSharedFlow<T>,
206248
scenario: suspend ScenarioDsl<T>.() -> Unit
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.atomicfu.*
8+
import kotlinx.coroutines.*
9+
import org.junit.*
10+
import org.junit.Test
11+
import kotlin.collections.ArrayList
12+
import kotlin.test.*
13+
import kotlin.time.*
14+
15+
@ExperimentalTime
16+
class SharedFlowStressTest : TestBase() {
17+
private val nProducers = 5
18+
private val nConsumers = 3
19+
private val nSeconds = 3 * stressTestMultiplier
20+
21+
private lateinit var sf: MutableSharedFlow<Long>
22+
private lateinit var view: SharedFlow<Long>
23+
24+
@get:Rule
25+
val producerDispatcher = ExecutorRule(nProducers)
26+
@get:Rule
27+
val consumerDispatcher = ExecutorRule(nConsumers)
28+
29+
private val totalProduced = atomic(0L)
30+
private val totalConsumed = atomic(0L)
31+
32+
@Test
33+
fun testStressReplay1() =
34+
testStress(1, 0)
35+
36+
@Test
37+
fun testStressReplay1ExtraBuffer1() =
38+
testStress(1, 1)
39+
40+
@Test
41+
fun testStressReplay2ExtraBuffer1() =
42+
testStress(2, 1)
43+
44+
private fun testStress(replay: Int, extraBufferCapacity: Int) = runTest {
45+
sf = MutableSharedFlow(replay, extraBufferCapacity)
46+
view = sf.asSharedFlow()
47+
val jobs = ArrayList<Job>()
48+
jobs += List(nProducers) { producerIndex ->
49+
launch(producerDispatcher) {
50+
var cur = producerIndex.toLong()
51+
while (isActive) {
52+
sf.emit(cur)
53+
totalProduced.incrementAndGet()
54+
cur += nProducers
55+
}
56+
}
57+
}
58+
jobs += List(nConsumers) { consumerIndex ->
59+
launch(consumerDispatcher) {
60+
while (isActive) {
61+
view
62+
.dropWhile { it % nConsumers != consumerIndex.toLong() }
63+
.take(1)
64+
.collect {
65+
check(it % nConsumers == consumerIndex.toLong())
66+
totalConsumed.incrementAndGet()
67+
}
68+
}
69+
}
70+
}
71+
var lastProduced = 0L
72+
var lastConsumed = 0L
73+
for (sec in 1..nSeconds) {
74+
delay(1.seconds)
75+
val produced = totalProduced.value
76+
val consumed = totalConsumed.value
77+
println("$sec sec: produced = $produced; consumed = $consumed")
78+
assertNotEquals(lastProduced, produced)
79+
assertNotEquals(lastConsumed, consumed)
80+
lastProduced = produced
81+
lastConsumed = consumed
82+
}
83+
jobs.forEach { it.cancel() }
84+
jobs.forEach { it.join() }
85+
println("total: produced = ${totalProduced.value}; consumed = ${totalConsumed.value}")
86+
}
87+
}

0 commit comments

Comments
 (0)