Skip to content

Commit 19f4b73

Browse files
committed
Fix race in Flow.asPublisher
The race was leading to emitting more items via onNext than requested, the corresponding stress-test was added, too Fixes #2109
1 parent 3744f8e commit 19f4b73

File tree

3 files changed

+130
-13
lines changed

3 files changed

+130
-13
lines changed

reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt

+7-8
Original file line numberDiff line numberDiff line change
@@ -197,19 +197,18 @@ public class FlowSubscription<T>(
197197
*/
198198
private suspend fun consumeFlow() {
199199
flow.collect { value ->
200-
/*
201-
* Flow is scopeless, thus if it's not active, its subscription was cancelled.
202-
* No intermediate "child failed, but flow coroutine is not" states are allowed.
203-
*/
204-
coroutineContext.ensureActive()
205-
if (requested.value <= 0L) {
200+
// Emit the value
201+
subscriber.onNext(value)
202+
// Suspend if needed before requesting the next value
203+
if (requested.decrementAndGet() <= 0) {
206204
suspendCancellableCoroutine<Unit> {
207205
producer.value = it
208206
if (requested.value != 0L) it.resumeSafely()
209207
}
208+
} else {
209+
// check for cancellation if we don't suspend
210+
coroutineContext.ensureActive()
210211
}
211-
requested.decrementAndGet()
212-
subscriber.onNext(value)
213212
}
214213
}
215214

reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt

+4-5
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ class FlowAsPublisherTest : TestBase() {
1616
fun testErrorOnCancellationIsReported() {
1717
expect(1)
1818
flow<Int> {
19-
emit(2)
2019
try {
21-
hang { expect(3) }
20+
emit(2)
2221
} finally {
22+
expect(3)
2323
throw TestException()
2424
}
2525
}.asPublisher().subscribe(object : Subscriber<Int> {
@@ -52,12 +52,11 @@ class FlowAsPublisherTest : TestBase() {
5252
expect(1)
5353
flow<Int> {
5454
emit(2)
55-
hang { expect(3) }
5655
}.asPublisher().subscribe(object : Subscriber<Int> {
5756
private lateinit var subscription: Subscription
5857

5958
override fun onComplete() {
60-
expect(4)
59+
expect(3)
6160
}
6261

6362
override fun onSubscribe(s: Subscription?) {
@@ -74,6 +73,6 @@ class FlowAsPublisherTest : TestBase() {
7473
expectUnreached()
7574
}
7675
})
77-
finish(5)
76+
finish(4)
7877
}
7978
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.reactive
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
9+
import kotlinx.coroutines.flow.Flow
10+
import org.junit.*
11+
import org.reactivestreams.*
12+
import java.util.concurrent.*
13+
import java.util.concurrent.atomic.*
14+
import kotlin.coroutines.*
15+
16+
@Suppress("ReactiveStreamsSubscriberImplementation")
17+
class PublisherRequestStressTest : TestBase() {
18+
private val testDurationSec = 3 * stressTestMultiplier
19+
20+
private val minDemand = 8L
21+
private val maxDemand = 10L
22+
private val nEmitThreads = 4
23+
24+
private val emitThreadNo = AtomicInteger()
25+
26+
private val emitPool = Executors.newFixedThreadPool(nEmitThreads) { r ->
27+
Thread(r, "PublisherRequestStressTest-emit-${emitThreadNo.incrementAndGet()}")
28+
}
29+
30+
private val reqPool = Executors.newSingleThreadExecutor { r ->
31+
Thread(r, "PublisherRequestStressTest-req")
32+
}
33+
34+
private val nextValue = AtomicLong(0)
35+
36+
@After
37+
fun tearDown() {
38+
emitPool.shutdown()
39+
reqPool.shutdown()
40+
emitPool.awaitTermination(10, TimeUnit.SECONDS)
41+
reqPool.awaitTermination(10, TimeUnit.SECONDS)
42+
}
43+
44+
private lateinit var subscription: Subscription
45+
46+
@Test
47+
fun testRequestStress() {
48+
val expectedValue = AtomicLong(0)
49+
val requestedTill = AtomicLong(0)
50+
val completionLatch = CountDownLatch(1)
51+
val callingOnNext = AtomicInteger()
52+
53+
val publisher = mtFlow().asPublisher()
54+
var error = false
55+
56+
publisher.subscribe(object : Subscriber<Long> {
57+
private var demand = 0L // only updated from reqPool
58+
59+
override fun onComplete() {
60+
completionLatch.countDown()
61+
}
62+
63+
override fun onSubscribe(sub: Subscription) {
64+
subscription = sub
65+
maybeRequestMore()
66+
}
67+
68+
private fun maybeRequestMore() {
69+
if (demand >= minDemand) return
70+
val more = maxDemand - demand
71+
demand = maxDemand
72+
requestedTill.addAndGet(more)
73+
subscription.request(more)
74+
}
75+
76+
override fun onNext(value: Long) {
77+
check(callingOnNext.getAndIncrement() == 0) // make sure it is not concurrent
78+
// check for expected value
79+
check(value == expectedValue.get())
80+
// check that it does not exceed requested values
81+
check(value < requestedTill.get())
82+
val nextExpected = value + 1
83+
expectedValue.set(nextExpected)
84+
// send more requests from request thread
85+
reqPool.execute {
86+
demand-- // processed an item
87+
maybeRequestMore()
88+
}
89+
callingOnNext.decrementAndGet()
90+
}
91+
92+
override fun onError(ex: Throwable?) {
93+
error = true
94+
error("Failed", ex)
95+
}
96+
})
97+
for (second in 1..testDurationSec) {
98+
if (error) break
99+
Thread.sleep(1000)
100+
println("$second: nextValue = ${nextValue.get()}, expectedValue = ${expectedValue.get()}")
101+
}
102+
if (!error) {
103+
subscription.cancel()
104+
completionLatch.await()
105+
}
106+
}
107+
108+
private fun mtFlow(): Flow<Long> = flow {
109+
while (currentCoroutineContext().isActive) {
110+
emit(aWait())
111+
}
112+
}
113+
114+
private suspend fun aWait(): Long = suspendCancellableCoroutine { cont ->
115+
emitPool.execute(Runnable {
116+
cont.resume(nextValue.getAndIncrement())
117+
})
118+
}
119+
}

0 commit comments

Comments
 (0)