diff --git a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt index 88dc775842..7167971429 100644 --- a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt @@ -326,7 +326,7 @@ private class SharedFlowImpl( var resumes: Array?> = EMPTY_RESUMES val emitted = synchronized(this) { if (tryEmitLocked(value)) { - resumes = findSlotsToResumeLocked() + resumes = findSlotsToResumeLocked(resumes) true } else { false @@ -422,7 +422,7 @@ private class SharedFlowImpl( // recheck buffer under lock again (make sure it is really full) if (tryEmitLocked(value)) { cont.resume(Unit) - resumes = findSlotsToResumeLocked() + resumes = findSlotsToResumeLocked(resumes) return@lock null } // add suspended emitter to the buffer @@ -430,7 +430,7 @@ private class SharedFlowImpl( enqueueLocked(it) queueSize++ // added to queue of waiting emitters // synchronous shared flow might rendezvous with waiting emitter - if (bufferCapacity == 0) resumes = findSlotsToResumeLocked() + if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes) } } // outside of the lock: register dispose on cancellation @@ -512,6 +512,8 @@ private class SharedFlowImpl( updateBufferLocked(newReplayIndex, newMinCollectorIndex, newBufferEndIndex, newQueueEndIndex) // just in case we've moved all buffered emitters and have NO_VALUE's at the tail now cleanupTailLocked() + // We need to waken up suspended collectors if any emitters were resumed here + if (resumes.isNotEmpty()) resumes = findSlotsToResumeLocked(resumes) return resumes } @@ -598,9 +600,9 @@ private class SharedFlowImpl( } } - private fun findSlotsToResumeLocked(): Array?> { - var resumes: Array?> = EMPTY_RESUMES - var resumeCount = 0 + private fun findSlotsToResumeLocked(resumesIn: Array?>): Array?> { + var resumes: Array?> = resumesIn + var resumeCount = resumesIn.size forEachSlotLocked loop@{ slot -> val cont = slot.cont ?: return@loop // only waiting slots if (tryPeekLocked(slot) < 0) return@loop // only slots that can peek a value diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowScenarioTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowScenarioTest.kt index f716389fb7..c3eb2dac04 100644 --- a/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowScenarioTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowScenarioTest.kt @@ -176,6 +176,31 @@ class SharedFlowScenarioTest : TestBase() { collect(b, 15) } + @Test // https://github.com/Kotlin/kotlinx.coroutines/issues/2320 + fun testResumeFastSubscriberOnResumedEmitter() = + testSharedFlow(MutableSharedFlow(1)) { + // create two subscribers and start collecting + val s1 = subscribe("s1"); resumeCollecting(s1) + val s2 = subscribe("s2"); resumeCollecting(s2) + // now emit 0, make sure it is collected + emitRightNow(0); expectReplayOf(0) + awaitCollected(s1, 0) + awaitCollected(s2, 0) + // now emit 1, and only first subscriber continues and collects it + emitRightNow(1); expectReplayOf(1) + collect(s1, 1) + // now emit 2, it suspend (s2 is blocking it) + val e2 = emitSuspends(2) + resumeCollecting(s1) // resume, but does not collect (e2 is still queued) + collect(s2, 1) // resume + collect next --> resumes emitter, thus resumes s1 + awaitCollected(s1, 2) // <-- S1 collects value from the newly resumed emitter here !!! + emitResumes(e2); expectReplayOf(2) + // now emit 3, it suspends (s2 blocks it) + val e3 = emitSuspends(3) + collect(s2, 2) + emitResumes(e3); expectReplayOf(3) + } + private fun testSharedFlow( sharedFlow: MutableSharedFlow, scenario: suspend ScenarioDsl.() -> Unit @@ -305,14 +330,23 @@ class SharedFlowScenarioTest : TestBase() { return TestJob(job, name) } + // collect ~== resumeCollecting + awaitCollected (for each value) suspend fun collect(job: TestJob, vararg a: T) { for (value in a) { checkReplay() // should not have changed - addAction(ResumeCollecting(job)) - awaitAction(Collected(job, value)) + resumeCollecting(job) + awaitCollected(job, value) } } + suspend fun resumeCollecting(job: TestJob) { + addAction(ResumeCollecting(job)) + } + + suspend fun awaitCollected(job: TestJob, value: T) { + awaitAction(Collected(job, value)) + } + fun stop() { log("--- stop") scope.cancel()