From 19f4b7398d98684aec593bd6afd65b83794978fb Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Tue, 7 Jul 2020 12:23:44 +0300 Subject: [PATCH 1/4] Fix race in Flow.asPublisher The race was leading to emitting more items via onNext than requested, the corresponding stress-test was added, too Fixes #2109 --- .../src/ReactiveFlow.kt | 15 ++- .../test/FlowAsPublisherTest.kt | 9 +- .../test/PublisherRequestStressTest.kt | 119 ++++++++++++++++++ 3 files changed, 130 insertions(+), 13 deletions(-) create mode 100644 reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt index 96ae6287c1..ea35d1a9e3 100644 --- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt @@ -197,19 +197,18 @@ public class FlowSubscription( */ private suspend fun consumeFlow() { flow.collect { value -> - /* - * Flow is scopeless, thus if it's not active, its subscription was cancelled. - * No intermediate "child failed, but flow coroutine is not" states are allowed. 
- */ - coroutineContext.ensureActive() - if (requested.value <= 0L) { + // Emit the value + subscriber.onNext(value) + // Suspend if needed before requesting the next value + if (requested.decrementAndGet() <= 0) { suspendCancellableCoroutine { producer.value = it if (requested.value != 0L) it.resumeSafely() } + } else { + // check for cancellation if we don't suspend + coroutineContext.ensureActive() } - requested.decrementAndGet() - subscriber.onNext(value) } } diff --git a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt index 8633492810..c044d92725 100644 --- a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt @@ -16,10 +16,10 @@ class FlowAsPublisherTest : TestBase() { fun testErrorOnCancellationIsReported() { expect(1) flow { - emit(2) try { - hang { expect(3) } + emit(2) } finally { + expect(3) throw TestException() } }.asPublisher().subscribe(object : Subscriber { @@ -52,12 +52,11 @@ class FlowAsPublisherTest : TestBase() { expect(1) flow { emit(2) - hang { expect(3) } }.asPublisher().subscribe(object : Subscriber { private lateinit var subscription: Subscription override fun onComplete() { - expect(4) + expect(3) } override fun onSubscribe(s: Subscription?) { @@ -74,6 +73,6 @@ class FlowAsPublisherTest : TestBase() { expectUnreached() } }) - finish(5) + finish(4) } } diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt new file mode 100644 index 0000000000..981013c17c --- /dev/null +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt @@ -0,0 +1,119 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 
+ */ + +package kotlinx.coroutines.reactive + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.flow.Flow +import org.junit.* +import org.reactivestreams.* +import java.util.concurrent.* +import java.util.concurrent.atomic.* +import kotlin.coroutines.* + +@Suppress("ReactiveStreamsSubscriberImplementation") +class PublisherRequestStressTest : TestBase() { + private val testDurationSec = 3 * stressTestMultiplier + + private val minDemand = 8L + private val maxDemand = 10L + private val nEmitThreads = 4 + + private val emitThreadNo = AtomicInteger() + + private val emitPool = Executors.newFixedThreadPool(nEmitThreads) { r -> + Thread(r, "PublisherRequestStressTest-emit-${emitThreadNo.incrementAndGet()}") + } + + private val reqPool = Executors.newSingleThreadExecutor { r -> + Thread(r, "PublisherRequestStressTest-req") + } + + private val nextValue = AtomicLong(0) + + @After + fun tearDown() { + emitPool.shutdown() + reqPool.shutdown() + emitPool.awaitTermination(10, TimeUnit.SECONDS) + reqPool.awaitTermination(10, TimeUnit.SECONDS) + } + + private lateinit var subscription: Subscription + + @Test + fun testRequestStress() { + val expectedValue = AtomicLong(0) + val requestedTill = AtomicLong(0) + val completionLatch = CountDownLatch(1) + val callingOnNext = AtomicInteger() + + val publisher = mtFlow().asPublisher() + var error = false + + publisher.subscribe(object : Subscriber { + private var demand = 0L // only updated from reqPool + + override fun onComplete() { + completionLatch.countDown() + } + + override fun onSubscribe(sub: Subscription) { + subscription = sub + maybeRequestMore() + } + + private fun maybeRequestMore() { + if (demand >= minDemand) return + val more = maxDemand - demand + demand = maxDemand + requestedTill.addAndGet(more) + subscription.request(more) + } + + override fun onNext(value: Long) { + check(callingOnNext.getAndIncrement() == 0) // make sure it is not concurrent + // check for expected value + 
check(value == expectedValue.get()) + // check that it does not exceed requested values + check(value < requestedTill.get()) + val nextExpected = value + 1 + expectedValue.set(nextExpected) + // send more requests from request thread + reqPool.execute { + demand-- // processed an item + maybeRequestMore() + } + callingOnNext.decrementAndGet() + } + + override fun onError(ex: Throwable?) { + error = true + error("Failed", ex) + } + }) + for (second in 1..testDurationSec) { + if (error) break + Thread.sleep(1000) + println("$second: nextValue = ${nextValue.get()}, expectedValue = ${expectedValue.get()}") + } + if (!error) { + subscription.cancel() + completionLatch.await() + } + } + + private fun mtFlow(): Flow = flow { + while (currentCoroutineContext().isActive) { + emit(aWait()) + } + } + + private suspend fun aWait(): Long = suspendCancellableCoroutine { cont -> + emitPool.execute(Runnable { + cont.resume(nextValue.getAndIncrement()) + }) + } +} \ No newline at end of file From 9d9b3fff0f2cfc7f5c91511ac96dc0df1bca7df1 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Tue, 7 Jul 2020 13:39:03 +0300 Subject: [PATCH 2/4] ~ Also fix test in JDK9 integration module --- .../kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt index 8017ee5b4f..488695dea2 100644 --- a/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt +++ b/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt @@ -16,10 +16,10 @@ class FlowAsPublisherTest : TestBase() { fun testErrorOnCancellationIsReported() { expect(1) flow { - emit(2) try { - hang { expect(3) } + emit(2) } finally { + expect(3) throw TestException() } }.asPublisher().subscribe(object : JFlow.Subscriber { @@ -52,12 +52,11 @@ class FlowAsPublisherTest : TestBase() { expect(1) flow { emit(2) - hang { expect(3) } 
}.asPublisher().subscribe(object : JFlow.Subscriber { private lateinit var subscription: JFlow.Subscription override fun onComplete() { - expect(4) + expect(3) } override fun onSubscribe(s: JFlow.Subscription?) { @@ -74,6 +73,6 @@ class FlowAsPublisherTest : TestBase() { expectUnreached() } }) - finish(5) + finish(4) } } From 87c80b895f08ccc5660942f9b9fc7fae7e55e3ed Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Tue, 7 Jul 2020 13:50:50 +0300 Subject: [PATCH 3/4] ~ Added explanation to stress-test --- .../test/PublisherRequestStressTest.kt | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt index 981013c17c..ce2d270036 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt @@ -13,12 +13,28 @@ import java.util.concurrent.* import java.util.concurrent.atomic.* import kotlin.coroutines.* +/** + * This stress-test is a self-contained reproducer for the race in the [Flow.asPublisher] extension + * that was originally reported in the issue + * [#2109](https://github.com/Kotlin/kotlinx.coroutines/issues/2109). + * The original reproducer used a flow that loads a file using AsynchronousFileChannel + * (that issues completion callbacks from multiple threads) + * and uploads it to S3 via Amazon SDK, which internally uses netty for I/O + * (which uses a single thread for connection-related callbacks). + * + * This stress-test essentially mimics the logic in multiple interacting threads: several emitter threads that form + * the flow and a single requesting thread that works on the subscriber's side to periodically request more + * values when the number of items requested drops below the threshold. 
+ */ @Suppress("ReactiveStreamsSubscriberImplementation") class PublisherRequestStressTest : TestBase() { private val testDurationSec = 3 * stressTestMultiplier + // Original code in Amazon SDK uses 4 and 16 as low/high watermarks. + // These constants were chosen so that the problem reproduces asap with this particular code. private val minDemand = 8L private val maxDemand = 10L + private val nEmitThreads = 4 private val emitThreadNo = AtomicInteger() From 00486564210c7be06958ff72d06eda9e03fbf0e7 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Tue, 7 Jul 2020 15:00:21 +0300 Subject: [PATCH 4/4] ~ It was still racy, this fixes it --- .../src/ReactiveFlow.kt | 31 +++++++++---------- .../test/PublisherRequestStressTest.kt | 14 ++++++--- 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt index ea35d1a9e3..efa9c9c9f1 100644 --- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt @@ -166,11 +166,12 @@ private class FlowAsPublisher(private val flow: Flow) : Publisher public class FlowSubscription( @JvmField public val flow: Flow, @JvmField public val subscriber: Subscriber -) : Subscription, AbstractCoroutine(Dispatchers.Unconfined, false) { +) : Subscription, AbstractCoroutine(Dispatchers.Unconfined, true) { private val requested = atomic(0L) - private val producer = atomic?>(null) + private val producer = atomic?>(createInitialContinuation()) - override fun onStart() { + // This code wraps startCoroutineCancellable into a continuation + private fun createInitialContinuation(): Continuation = Continuation(coroutineContext) { ::flowProcessing.startCoroutineCancellable(this) } @@ -203,7 +204,6 @@ public class FlowSubscription( if (requested.decrementAndGet() <= 0) { suspendCancellableCoroutine { producer.value = it - if (requested.value != 0L) it.resumeSafely() } } else { // 
check for cancellation if we don't suspend @@ -217,22 +217,19 @@ public class FlowSubscription( } override fun request(n: Long) { - if (n <= 0) { - return - } - start() - requested.update { value -> + if (n <= 0) return + val old = requested.getAndUpdate { value -> val newValue = value + n if (newValue <= 0L) Long.MAX_VALUE else newValue } - val producer = producer.getAndSet(null) ?: return - producer.resumeSafely() - } - - private fun CancellableContinuation.resumeSafely() { - val token = tryResume(Unit) - if (token != null) { - completeResume(token) + if (old <= 0L) { + assert(old == 0L) + // Emitter is not started yet or has suspended -- spin on race with suspendCancellableCoroutine + while(true) { + val producer = producer.getAndSet(null) ?: continue // spin if not set yet + producer.resume(Unit) + break + } } } } diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt index ce2d270036..736a66404f 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt @@ -12,6 +12,7 @@ import org.reactivestreams.* import java.util.concurrent.* import java.util.concurrent.atomic.* import kotlin.coroutines.* +import kotlin.random.* /** * This stress-test is a self-contained reproducer for the race in the [Flow.asPublisher] extension @@ -33,7 +34,7 @@ class PublisherRequestStressTest : TestBase() { // Original code in Amazon SDK uses 4 and 16 as low/high watermarks. // These constants were chosen so that the problem reproduces asap with this particular code. 
private val minDemand = 8L - private val maxDemand = 10L + private val maxDemand = 16L private val nEmitThreads = 4 @@ -83,8 +84,9 @@ class PublisherRequestStressTest : TestBase() { private fun maybeRequestMore() { if (demand >= minDemand) return - val more = maxDemand - demand - demand = maxDemand + val nextDemand = Random.nextLong(minDemand + 1..maxDemand) + val more = nextDemand - demand + demand = nextDemand requestedTill.addAndGet(more) subscription.request(more) } @@ -110,10 +112,14 @@ class PublisherRequestStressTest : TestBase() { error("Failed", ex) } }) + var prevExpected = -1L for (second in 1..testDurationSec) { if (error) break Thread.sleep(1000) - println("$second: nextValue = ${nextValue.get()}, expectedValue = ${expectedValue.get()}") + val expected = expectedValue.get() + println("$second: expectedValue = $expected") + check(expected > prevExpected) // should have progress + prevExpected = expected } if (!error) { subscription.cancel()