From 3b762e95e89c9ae21e1dd84d131cb88c5965a9b5 Mon Sep 17 00:00:00 2001 From: sellmair Date: Tue, 20 Aug 2019 19:49:48 +0200 Subject: [PATCH 1/3] Create failing `RendezvousChannelStressTest` --- .../channels/RendezvousChannelStressTest.kt | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 kotlinx-coroutines-core/jvm/test/channels/RendezvousChannelStressTest.kt diff --git a/kotlinx-coroutines-core/jvm/test/channels/RendezvousChannelStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/RendezvousChannelStressTest.kt new file mode 100644 index 0000000000..f855395446 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/channels/RendezvousChannelStressTest.kt @@ -0,0 +1,30 @@ +package kotlinx.coroutines.channels + +import kotlinx.coroutines.TestBase +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.flow.first +import org.junit.Test +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.concurrent.thread + +class RendezvousChannelStressTest : TestBase() { + + @Test + fun testOfferByThreadStressTest() = runTest { + val channel = Channel(Channel.RENDEZVOUS) + val valueReceived = AtomicBoolean(false) + try { + thread { + var i = 0L + while (!valueReceived.get()) { + i++ + channel.offer(i) + } + } + + channel.consumeAsFlow().first { true } + } finally { + valueReceived.set(true) + } + } +} From 9cb4d7bfd13cffb6743a4aed5b7efd03787c4b6d Mon Sep 17 00:00:00 2001 From: sellmair Date: Tue, 20 Aug 2019 20:09:59 +0200 Subject: [PATCH 2/3] `RendezvousChannelStressTest`: Add test that consumes the channel directly --- .../channels/RendezvousChannelStressTest.kt | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/jvm/test/channels/RendezvousChannelStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/RendezvousChannelStressTest.kt index f855395446..33c363d7a4 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/RendezvousChannelStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/RendezvousChannelStressTest.kt @@ -10,7 +10,7 @@ import kotlin.concurrent.thread class RendezvousChannelStressTest : TestBase() { @Test - fun testOfferByThreadStressTest() = runTest { + fun testOfferByThread_consumeAsFlow_StressTest() = runTest { val channel = Channel(Channel.RENDEZVOUS) val valueReceived = AtomicBoolean(false) try { @@ -27,4 +27,25 @@ class RendezvousChannelStressTest : TestBase() { valueReceived.set(true) } } + + + @Test + fun testOfferByThread_consumeAsChannel_StressTest() = runTest { + val channel = Channel(Channel.RENDEZVOUS) + val valueReceived = AtomicBoolean(false) + try { + thread { + var i = 0L + while (!valueReceived.get()) { + i++ + channel.offer(i) + } + } + + @Suppress("DEPRECATION") + channel.first { true } + } finally { + valueReceived.set(true) + } + } } From cfab513576b2e7274df38a81fa7ba326f21e7f2b Mon Sep 17 00:00:00 2001 From: sellmair Date: Wed, 21 Aug 2019 08:26:55 +0200 Subject: [PATCH 3/3] `RendezvousChannelStressTest`: Let thread catch `ClosedSendChannelException` --- .../channels/RendezvousChannelStressTest.kt | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/test/channels/RendezvousChannelStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/RendezvousChannelStressTest.kt index 33c363d7a4..e83c58b23f 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/RendezvousChannelStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/RendezvousChannelStressTest.kt @@ -15,10 +15,14 @@ class RendezvousChannelStressTest : TestBase() { val valueReceived = AtomicBoolean(false) try { thread { - var i = 0L - while (!valueReceived.get()) { - i++ - channel.offer(i) + try { + var i = 0L + while (!valueReceived.get()) { + i++ + channel.offer(i) + } + } catch (e: ClosedSendChannelException) { + } } @@ -35,10 +39,14 @@ class RendezvousChannelStressTest : TestBase() { val valueReceived = AtomicBoolean(false) try { thread { - var i = 0L - while (!valueReceived.get()) { - i++ - channel.offer(i) + try { + var i = 0L + while (!valueReceived.get()) { + i++ + channel.offer(i) + } + } catch (e: ClosedSendChannelException){ + } }