From 1a2e6c8f48ebe38d3a2272299a49ffe4ad8961ea Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Fri, 26 Jul 2019 15:09:03 +0300 Subject: [PATCH 1/2] Allocate underlying buffer in ArrayChannel in on-demand manner Rationale: Such change will allow us to use huge buffers in various flow operators without having a serious footprint in suspension-free scenarios --- .../common/src/channels/ArrayChannel.kt | 21 +++++++++++++-- .../common/test/channels/ArrayChannelTest.kt | 27 +++++++++++++++++++ 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt index 688125d946..4f6e442503 100644 --- a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt @@ -8,6 +8,7 @@ import kotlinx.coroutines.* import kotlinx.coroutines.internal.* import kotlinx.coroutines.selects.* import kotlin.jvm.* +import kotlin.math.* /** * Channel with array buffer of a fixed [capacity]. @@ -29,10 +30,14 @@ internal open class ArrayChannel( } private val lock = ReentrantLock() - private val buffer: Array = arrayOfNulls(capacity) + /* + * Guarded by lock. + * Allocate minimum of capacity and 16 to avoid excess memory pressure for large channels when it's not necessary. + */ + private var buffer: Array = arrayOfNulls(min(capacity, 16)) private var head: Int = 0 @Volatile - private var size: Int = 0 + private var size: Int = 0 // Invariant: size <= capacity protected final override val isBufferAlwaysEmpty: Boolean get() = false protected final override val isBufferEmpty: Boolean get() = size == 0 @@ -64,6 +69,7 @@ internal open class ArrayChannel( } } } + ensureCapacity(size) buffer[(head + size) % capacity] = element // actually queue element return OFFER_SUCCESS } @@ -112,6 +118,7 @@ internal open class ArrayChannel( this.size = size // restore size return ALREADY_SELECTED } + ensureCapacity(size) buffer[(head + size) % capacity] = element // actually queue element return OFFER_SUCCESS } @@ -123,6 +130,16 @@ internal open class ArrayChannel( return receive!!.offerResult } + // Guarded by lock + private fun ensureCapacity(currentSize: Int) { + if (currentSize == buffer.size) { + val newSize = min(buffer.size * 2, capacity) + val newBuffer = arrayOfNulls(newSize) + buffer.copyInto(newBuffer) + buffer = newBuffer + } + } + // result is `E | POLL_FAILED | Closed` protected override fun pollInternal(): Any? { var send: Send? = null diff --git a/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt index 2b948dfa25..a4fd1beac3 100644 --- a/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt @@ -144,4 +144,31 @@ class ArrayChannelTest : TestBase() { channel.cancel(TestCancellationException()) channel.receiveOrNull() } + + @Test + fun testBufferSize() = runTest { + val capacity = 42 + val channel = Channel(capacity) + launch { + expect(2) + repeat(42) { + channel.send(it) + } + expect(3) + channel.send(42) + expect(5) + channel.close() + } + + expect(1) + yield() + + expect(4) + val result = ArrayList(42) + channel.consumeEach { + result.add(it) + } + assertEquals((0..capacity).toList(), result) + finish(6) + } } From f9cbf9c980de0879247a4ef844d3ffce59c88fa2 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 5 Aug 2019 17:58:48 +0300 Subject: [PATCH 2/2] Properly resize the underlying array --- .../common/src/channels/ArrayChannel.kt | 25 +++++++++++-------- .../common/test/channels/ArrayChannelTest.kt | 22 +++++++++++++++- .../test/channels/ArrayChannelStressTest.kt | 25 +++++++++++++++++-- 3 files changed, 58 insertions(+), 14 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt index 4f6e442503..1e1c0d3ae4 100644 --- a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt @@ -34,7 +34,7 @@ internal open class ArrayChannel( * Guarded by lock. * Allocate minimum of capacity and 16 to avoid excess memory pressure for large channels when it's not necessary. */ - private var buffer: Array = arrayOfNulls(min(capacity, 16)) + private var buffer: Array = arrayOfNulls(min(capacity, 8)) private var head: Int = 0 @Volatile private var size: Int = 0 // Invariant: size <= capacity @@ -70,7 +70,7 @@ internal open class ArrayChannel( } } ensureCapacity(size) - buffer[(head + size) % capacity] = element // actually queue element + buffer[(head + size) % buffer.size] = element // actually queue element return OFFER_SUCCESS } // size == capacity: full @@ -119,7 +119,7 @@ internal open class ArrayChannel( return ALREADY_SELECTED } ensureCapacity(size) - buffer[(head + size) % capacity] = element // actually queue element + buffer[(head + size) % buffer.size] = element // actually queue element return OFFER_SUCCESS } // size == capacity: full @@ -132,11 +132,14 @@ internal open class ArrayChannel( // Guarded by lock private fun ensureCapacity(currentSize: Int) { - if (currentSize == buffer.size) { + if (currentSize >= buffer.size) { val newSize = min(buffer.size * 2, capacity) val newBuffer = arrayOfNulls(newSize) - buffer.copyInto(newBuffer) + for (i in 0 until currentSize) { + newBuffer[i] = buffer[(head + i) % buffer.size] + } buffer = newBuffer + head = 0 } } @@ -166,9 +169,9 @@ internal open class ArrayChannel( } if (replacement !== POLL_FAILED && replacement !is Closed<*>) { this.size = size // restore size - buffer[(head + size) % capacity] = replacement + buffer[(head + size) % buffer.size] = replacement } - head = (head + 1) % capacity + head = (head + 1) % buffer.size } // complete send the we're taken replacement from if (token != null) @@ -220,7 +223,7 @@ internal open class ArrayChannel( } if (replacement !== POLL_FAILED && replacement !is Closed<*>) { this.size = size // restore size - buffer[(head + size) % capacity] = replacement + buffer[(head + size) % buffer.size] = replacement } else { // failed to poll or is already closed --> let's try to select receiving this element from buffer if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock @@ -229,7 +232,7 @@ internal open class ArrayChannel( return ALREADY_SELECTED } } - head = (head + 1) % capacity + head = (head + 1) % buffer.size } // complete send the we're taken replacement from if (token != null) @@ -243,7 +246,7 @@ internal open class ArrayChannel( lock.withLock { repeat(size) { buffer[head] = 0 - head = (head + 1) % capacity + head = (head + 1) % buffer.size } size = 0 } @@ -254,5 +257,5 @@ internal open class ArrayChannel( // ------ debug ------ override val bufferDebugString: String - get() = "(buffer:capacity=${buffer.size},size=$size)" + get() = "(buffer:capacity=$capacity,size=$size)" } diff --git a/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt index a4fd1beac3..ceef21edcb 100644 --- a/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt @@ -86,7 +86,7 @@ class ArrayChannelTest : TestBase() { } @Test - fun testOfferAndPool() = runTest { + fun testOfferAndPoll() = runTest { val q = Channel(1) assertTrue(q.offer(1)) expect(1) @@ -149,6 +149,26 @@ class ArrayChannelTest : TestBase() { fun testBufferSize() = runTest { val capacity = 42 val channel = Channel(capacity) + checkBufferChannel(channel, capacity) + } + + @Test + fun testBufferSizeFromTheMiddle() = runTest { + val capacity = 42 + val channel = Channel(capacity) + repeat(4) { + channel.offer(-1) + } + repeat(4) { + channel.receiveOrNull() + } + checkBufferChannel(channel, capacity) + } + + private suspend fun CoroutineScope.checkBufferChannel( + channel: Channel, + capacity: Int + ) { launch { expect(2) repeat(42) { diff --git a/kotlinx-coroutines-core/jvm/test/channels/ArrayChannelStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ArrayChannelStressTest.kt index ccb0e8749c..74dc24c7f6 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ArrayChannelStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ArrayChannelStressTest.kt @@ -22,13 +22,13 @@ class ArrayChannelStressTest(private val capacity: Int) : TestBase() { fun testStress() = runTest { val n = 100_000 * stressTestMultiplier val q = Channel(capacity) - val sender = launch(coroutineContext) { + val sender = launch { for (i in 1..n) { q.send(i) } expect(2) } - val receiver = launch(coroutineContext) { + val receiver = launch { for (i in 1..n) { val next = q.receive() check(next == i) @@ -40,4 +40,25 @@ class ArrayChannelStressTest(private val capacity: Int) : TestBase() { receiver.join() finish(4) } + + @Test + fun testBurst() = runTest { + Assume.assumeTrue(capacity < 100_000) + repeat(10_000 * stressTestMultiplier) { + val channel = Channel(capacity) + val sender = launch(Dispatchers.Default) { + for (i in 1..capacity * 2) { + channel.send(i) + } + } + val receiver = launch(Dispatchers.Default) { + for (i in 1..capacity * 2) { + val next = channel.receive() + check(next == i) + } + } + sender.join() + receiver.join() + } + } }