Skip to content

Commit 53f007f

Browse files
authored
Fix SharedFlow with replay for subscribers working at different speed (#2325)
Problematic scenario: * Emitter suspends because there is a slow subscriber * Fast subscriber collects all the values and suspend * Slow subscriber resumes, collects value, causes emitter to be resume * Fast subscribers must be resumed in this case, too Fixes #2320
1 parent 45ba58e commit 53f007f

File tree

2 files changed

+44
-8
lines changed

2 files changed

+44
-8
lines changed

kotlinx-coroutines-core/common/src/flow/SharedFlow.kt

+8-6
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ private class SharedFlowImpl<T>(
326326
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
327327
val emitted = synchronized(this) {
328328
if (tryEmitLocked(value)) {
329-
resumes = findSlotsToResumeLocked()
329+
resumes = findSlotsToResumeLocked(resumes)
330330
true
331331
} else {
332332
false
@@ -422,15 +422,15 @@ private class SharedFlowImpl<T>(
422422
// recheck buffer under lock again (make sure it is really full)
423423
if (tryEmitLocked(value)) {
424424
cont.resume(Unit)
425-
resumes = findSlotsToResumeLocked()
425+
resumes = findSlotsToResumeLocked(resumes)
426426
return@lock null
427427
}
428428
// add suspended emitter to the buffer
429429
Emitter(this, head + totalSize, value, cont).also {
430430
enqueueLocked(it)
431431
queueSize++ // added to queue of waiting emitters
432432
// synchronous shared flow might rendezvous with waiting emitter
433-
if (bufferCapacity == 0) resumes = findSlotsToResumeLocked()
433+
if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
434434
}
435435
}
436436
// outside of the lock: register dispose on cancellation
@@ -512,6 +512,8 @@ private class SharedFlowImpl<T>(
512512
updateBufferLocked(newReplayIndex, newMinCollectorIndex, newBufferEndIndex, newQueueEndIndex)
513513
// just in case we've moved all buffered emitters and have NO_VALUE's at the tail now
514514
cleanupTailLocked()
515+
// We need to waken up suspended collectors if any emitters were resumed here
516+
if (resumes.isNotEmpty()) resumes = findSlotsToResumeLocked(resumes)
515517
return resumes
516518
}
517519

@@ -598,9 +600,9 @@ private class SharedFlowImpl<T>(
598600
}
599601
}
600602

601-
private fun findSlotsToResumeLocked(): Array<Continuation<Unit>?> {
602-
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
603-
var resumeCount = 0
603+
private fun findSlotsToResumeLocked(resumesIn: Array<Continuation<Unit>?>): Array<Continuation<Unit>?> {
604+
var resumes: Array<Continuation<Unit>?> = resumesIn
605+
var resumeCount = resumesIn.size
604606
forEachSlotLocked loop@{ slot ->
605607
val cont = slot.cont ?: return@loop // only waiting slots
606608
if (tryPeekLocked(slot) < 0) return@loop // only slots that can peek a value

kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowScenarioTest.kt

+36-2
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,31 @@ class SharedFlowScenarioTest : TestBase() {
176176
collect(b, 15)
177177
}
178178

179+
@Test // https://github.com/Kotlin/kotlinx.coroutines/issues/2320
180+
fun testResumeFastSubscriberOnResumedEmitter() =
181+
testSharedFlow<Int>(MutableSharedFlow(1)) {
182+
// create two subscribers and start collecting
183+
val s1 = subscribe("s1"); resumeCollecting(s1)
184+
val s2 = subscribe("s2"); resumeCollecting(s2)
185+
// now emit 0, make sure it is collected
186+
emitRightNow(0); expectReplayOf(0)
187+
awaitCollected(s1, 0)
188+
awaitCollected(s2, 0)
189+
// now emit 1, and only first subscriber continues and collects it
190+
emitRightNow(1); expectReplayOf(1)
191+
collect(s1, 1)
192+
// now emit 2, it suspend (s2 is blocking it)
193+
val e2 = emitSuspends(2)
194+
resumeCollecting(s1) // resume, but does not collect (e2 is still queued)
195+
collect(s2, 1) // resume + collect next --> resumes emitter, thus resumes s1
196+
awaitCollected(s1, 2) // <-- S1 collects value from the newly resumed emitter here !!!
197+
emitResumes(e2); expectReplayOf(2)
198+
// now emit 3, it suspends (s2 blocks it)
199+
val e3 = emitSuspends(3)
200+
collect(s2, 2)
201+
emitResumes(e3); expectReplayOf(3)
202+
}
203+
179204
private fun <T> testSharedFlow(
180205
sharedFlow: MutableSharedFlow<T>,
181206
scenario: suspend ScenarioDsl<T>.() -> Unit
@@ -305,14 +330,23 @@ class SharedFlowScenarioTest : TestBase() {
305330
return TestJob(job, name)
306331
}
307332

333+
// collect ~== resumeCollecting + awaitCollected (for each value)
308334
suspend fun collect(job: TestJob, vararg a: T) {
309335
for (value in a) {
310336
checkReplay() // should not have changed
311-
addAction(ResumeCollecting(job))
312-
awaitAction(Collected(job, value))
337+
resumeCollecting(job)
338+
awaitCollected(job, value)
313339
}
314340
}
315341

342+
suspend fun resumeCollecting(job: TestJob) {
343+
addAction(ResumeCollecting(job))
344+
}
345+
346+
suspend fun awaitCollected(job: TestJob, value: T) {
347+
awaitAction(Collected(job, value))
348+
}
349+
316350
fun stop() {
317351
log("--- stop")
318352
scope.cancel()

0 commit comments

Comments
 (0)