Skip to content

Commit c7e9b56

Browse files
authored
Allocate underlying buffer in ArrayChannel in on-demand manner (#1388)
* Allocate underlying buffer in ArrayChannel in on-demand manner Rationale: Such change will allow us to use huge buffers in various flow operators without having a serious footprint in suspension-free scenarios
1 parent 4e47af4 commit c7e9b56

File tree

3 files changed

+101
-13
lines changed

3 files changed

+101
-13
lines changed

kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt

+30-10
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import kotlinx.coroutines.*
88
import kotlinx.coroutines.internal.*
99
import kotlinx.coroutines.selects.*
1010
import kotlin.jvm.*
11+
import kotlin.math.*
1112

1213
/**
1314
* Channel with array buffer of a fixed [capacity].
@@ -29,10 +30,14 @@ internal open class ArrayChannel<E>(
2930
}
3031

3132
private val lock = ReentrantLock()
32-
private val buffer: Array<Any?> = arrayOfNulls<Any?>(capacity)
33+
/*
34+
* Guarded by lock.
35+
* Allocate minimum of capacity and 16 to avoid excess memory pressure for large channels when it's not necessary.
36+
*/
37+
private var buffer: Array<Any?> = arrayOfNulls<Any?>(min(capacity, 8))
3338
private var head: Int = 0
3439
@Volatile
35-
private var size: Int = 0
40+
private var size: Int = 0 // Invariant: size <= capacity
3641

3742
protected final override val isBufferAlwaysEmpty: Boolean get() = false
3843
protected final override val isBufferEmpty: Boolean get() = size == 0
@@ -64,7 +69,8 @@ internal open class ArrayChannel<E>(
6469
}
6570
}
6671
}
67-
buffer[(head + size) % capacity] = element // actually queue element
72+
ensureCapacity(size)
73+
buffer[(head + size) % buffer.size] = element // actually queue element
6874
return OFFER_SUCCESS
6975
}
7076
// size == capacity: full
@@ -112,7 +118,8 @@ internal open class ArrayChannel<E>(
112118
this.size = size // restore size
113119
return ALREADY_SELECTED
114120
}
115-
buffer[(head + size) % capacity] = element // actually queue element
121+
ensureCapacity(size)
122+
buffer[(head + size) % buffer.size] = element // actually queue element
116123
return OFFER_SUCCESS
117124
}
118125
// size == capacity: full
@@ -123,6 +130,19 @@ internal open class ArrayChannel<E>(
123130
return receive!!.offerResult
124131
}
125132

133+
// Guarded by lock
134+
private fun ensureCapacity(currentSize: Int) {
135+
if (currentSize >= buffer.size) {
136+
val newSize = min(buffer.size * 2, capacity)
137+
val newBuffer = arrayOfNulls<Any?>(newSize)
138+
for (i in 0 until currentSize) {
139+
newBuffer[i] = buffer[(head + i) % buffer.size]
140+
}
141+
buffer = newBuffer
142+
head = 0
143+
}
144+
}
145+
126146
// result is `E | POLL_FAILED | Closed`
127147
protected override fun pollInternal(): Any? {
128148
var send: Send? = null
@@ -149,9 +169,9 @@ internal open class ArrayChannel<E>(
149169
}
150170
if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
151171
this.size = size // restore size
152-
buffer[(head + size) % capacity] = replacement
172+
buffer[(head + size) % buffer.size] = replacement
153173
}
154-
head = (head + 1) % capacity
174+
head = (head + 1) % buffer.size
155175
}
156176
// complete send the we're taken replacement from
157177
if (token != null)
@@ -203,7 +223,7 @@ internal open class ArrayChannel<E>(
203223
}
204224
if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
205225
this.size = size // restore size
206-
buffer[(head + size) % capacity] = replacement
226+
buffer[(head + size) % buffer.size] = replacement
207227
} else {
208228
// failed to poll or is already closed --> let's try to select receiving this element from buffer
209229
if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
@@ -212,7 +232,7 @@ internal open class ArrayChannel<E>(
212232
return ALREADY_SELECTED
213233
}
214234
}
215-
head = (head + 1) % capacity
235+
head = (head + 1) % buffer.size
216236
}
217237
// complete send the we're taken replacement from
218238
if (token != null)
@@ -226,7 +246,7 @@ internal open class ArrayChannel<E>(
226246
lock.withLock {
227247
repeat(size) {
228248
buffer[head] = 0
229-
head = (head + 1) % capacity
249+
head = (head + 1) % buffer.size
230250
}
231251
size = 0
232252
}
@@ -237,5 +257,5 @@ internal open class ArrayChannel<E>(
237257
// ------ debug ------
238258

239259
override val bufferDebugString: String
240-
get() = "(buffer:capacity=${buffer.size},size=$size)"
260+
get() = "(buffer:capacity=$capacity,size=$size)"
241261
}

kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt

+48-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ class ArrayChannelTest : TestBase() {
8686
}
8787

8888
@Test
89-
fun testOfferAndPool() = runTest {
89+
fun testOfferAndPoll() = runTest {
9090
val q = Channel<Int>(1)
9191
assertTrue(q.offer(1))
9292
expect(1)
@@ -144,4 +144,51 @@ class ArrayChannelTest : TestBase() {
144144
channel.cancel(TestCancellationException())
145145
channel.receiveOrNull()
146146
}
147+
148+
@Test
149+
fun testBufferSize() = runTest {
150+
val capacity = 42
151+
val channel = Channel<Int>(capacity)
152+
checkBufferChannel(channel, capacity)
153+
}
154+
155+
@Test
156+
fun testBufferSizeFromTheMiddle() = runTest {
157+
val capacity = 42
158+
val channel = Channel<Int>(capacity)
159+
repeat(4) {
160+
channel.offer(-1)
161+
}
162+
repeat(4) {
163+
channel.receiveOrNull()
164+
}
165+
checkBufferChannel(channel, capacity)
166+
}
167+
168+
private suspend fun CoroutineScope.checkBufferChannel(
169+
channel: Channel<Int>,
170+
capacity: Int
171+
) {
172+
launch {
173+
expect(2)
174+
repeat(42) {
175+
channel.send(it)
176+
}
177+
expect(3)
178+
channel.send(42)
179+
expect(5)
180+
channel.close()
181+
}
182+
183+
expect(1)
184+
yield()
185+
186+
expect(4)
187+
val result = ArrayList<Int>(42)
188+
channel.consumeEach {
189+
result.add(it)
190+
}
191+
assertEquals((0..capacity).toList(), result)
192+
finish(6)
193+
}
147194
}

kotlinx-coroutines-core/jvm/test/channels/ArrayChannelStressTest.kt

+23-2
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@ class ArrayChannelStressTest(private val capacity: Int) : TestBase() {
2222
fun testStress() = runTest {
2323
val n = 100_000 * stressTestMultiplier
2424
val q = Channel<Int>(capacity)
25-
val sender = launch(coroutineContext) {
25+
val sender = launch {
2626
for (i in 1..n) {
2727
q.send(i)
2828
}
2929
expect(2)
3030
}
31-
val receiver = launch(coroutineContext) {
31+
val receiver = launch {
3232
for (i in 1..n) {
3333
val next = q.receive()
3434
check(next == i)
@@ -40,4 +40,25 @@ class ArrayChannelStressTest(private val capacity: Int) : TestBase() {
4040
receiver.join()
4141
finish(4)
4242
}
43+
44+
@Test
45+
fun testBurst() = runTest {
46+
Assume.assumeTrue(capacity < 100_000)
47+
repeat(10_000 * stressTestMultiplier) {
48+
val channel = Channel<Int>(capacity)
49+
val sender = launch(Dispatchers.Default) {
50+
for (i in 1..capacity * 2) {
51+
channel.send(i)
52+
}
53+
}
54+
val receiver = launch(Dispatchers.Default) {
55+
for (i in 1..capacity * 2) {
56+
val next = channel.receive()
57+
check(next == i)
58+
}
59+
}
60+
sender.join()
61+
receiver.join()
62+
}
63+
}
4364
}

0 commit comments

Comments
 (0)