Skip to content

Commit 4ee5557

Browse files
committed
SharedFlow: Fix scenario with concurrent emitters and cancellation of subscriber
Fixes #2356
1 parent 4ea4078 commit 4ee5557

File tree

2 files changed

+27
-1
lines changed

2 files changed

+27
-1
lines changed

kotlinx-coroutines-core/common/src/flow/SharedFlow.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,13 @@ private class SharedFlowImpl<T>(
497497
}
498498
}
499499
// Compute new buffer size -> how many values we now actually have after resume
500-
val newBufferSize1 = (newBufferEndIndex - head).toInt()
500+
var newBufferSize1 = (newBufferEndIndex - head).toInt()
501+
// Note: When nCollectors == 0 we resume all queued emitted and might have resumed more that max size of
502+
// the buffer, so here is why we take coerce the resulting size to the buffer capacity
503+
if (nCollectors == 0 && newBufferSize1 > bufferCapacity) {
504+
newMinCollectorIndex += newBufferSize1 - bufferCapacity // adjust minCollectorIndex, too, to skip items
505+
newBufferSize1 = bufferCapacity
506+
}
501507
// Compute new replay size -> limit to replay the number of items we need, take into account that it can only grow
502508
var newReplayIndex = maxOf(replayIndex, newBufferEndIndex - minOf(replay, newBufferSize1))
503509
// adjustment for synchronous case with cancelled emitter (NO_VALUE)

kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowScenarioTest.kt

+20
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,26 @@ class SharedFlowScenarioTest : TestBase() {
201201
emitResumes(e3); expectReplayOf(3)
202202
}
203203

204+
@Test
205+
fun testSuspendedConcurrentEmitAndCancelSubscriber() =
206+
testSharedFlow<Int>(MutableSharedFlow(1)) {
207+
val a = subscribe("a");
208+
emitRightNow(0); expectReplayOf(0)
209+
collect(a, 0)
210+
emitRightNow(1); expectReplayOf(1)
211+
val e2 = emitSuspends(2) // suspends until 1 is collected
212+
val e3 = emitSuspends(3) // suspends until 1 is collected, too
213+
cancel(a) // must resume emitters 2 & 3
214+
emitResumes(e2)
215+
emitResumes(e3)
216+
expectReplayOf(3) // but replay size is 1 so only 3 should be kept
217+
// Note: originally, SharedFlow was in a broken state here with 3 elements in the buffer
218+
val b = subscribe("b")
219+
collect(b, 3)
220+
emitRightNow(4); expectReplayOf(4)
221+
collect(b, 4)
222+
}
223+
204224
private fun <T> testSharedFlow(
205225
sharedFlow: MutableSharedFlow<T>,
206226
scenario: suspend ScenarioDsl<T>.() -> Unit

0 commit comments

Comments
 (0)