Skip to content

Commit fcf229a

Browse files
committed
Allocate underlying buffer in ArrayChannel in on-demand manner
Rationale: Such change will allow us to use huge buffers in various flow operators without having a serious footprint in suspension-free scenarios
1 parent b37ca3a commit fcf229a

File tree

2 files changed

+46
-2
lines changed

2 files changed

+46
-2
lines changed

kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt

+19-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import kotlinx.coroutines.*
88
import kotlinx.coroutines.internal.*
99
import kotlinx.coroutines.selects.*
1010
import kotlin.jvm.*
11+
import kotlin.math.*
1112

1213
/**
1314
* Channel with array buffer of a fixed [capacity].
@@ -29,10 +30,14 @@ internal open class ArrayChannel<E>(
2930
}
3031

3132
private val lock = ReentrantLock()
32-
private val buffer: Array<Any?> = arrayOfNulls<Any?>(capacity)
33+
/*
34+
* Guarded by lock.
35+
* Allocate minimum of capacity and 16 to avoid excess memory pressure for large channels when it's not necessary.
36+
*/
37+
private var buffer: Array<Any?> = arrayOfNulls<Any?>(min(capacity, 16))
3338
private var head: Int = 0
3439
@Volatile
35-
private var size: Int = 0
40+
private var size: Int = 0 // Invariant: size <= capacity
3641

3742
protected final override val isBufferAlwaysEmpty: Boolean get() = false
3843
protected final override val isBufferEmpty: Boolean get() = size == 0
@@ -64,6 +69,7 @@ internal open class ArrayChannel<E>(
6469
}
6570
}
6671
}
72+
ensureCapacity(size)
6773
buffer[(head + size) % capacity] = element // actually queue element
6874
return OFFER_SUCCESS
6975
}
@@ -112,6 +118,7 @@ internal open class ArrayChannel<E>(
112118
this.size = size // restore size
113119
return ALREADY_SELECTED
114120
}
121+
ensureCapacity(size)
115122
buffer[(head + size) % capacity] = element // actually queue element
116123
return OFFER_SUCCESS
117124
}
@@ -123,6 +130,16 @@ internal open class ArrayChannel<E>(
123130
return receive!!.offerResult
124131
}
125132

133+
// Guarded by lock
134+
private fun ensureCapacity(currentSize: Int) {
135+
if (currentSize == buffer.size ) {
136+
val newSize = min(buffer.size * 2, capacity)
137+
val newBuffer = arrayOfNulls<Any?>(newSize)
138+
buffer.copyInto(newBuffer)
139+
buffer = newBuffer
140+
}
141+
}
142+
126143
// result is `E | POLL_FAILED | Closed`
127144
protected override fun pollInternal(): Any? {
128145
var send: Send? = null

kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt

+27
Original file line numberDiff line numberDiff line change
@@ -144,4 +144,31 @@ class ArrayChannelTest : TestBase() {
144144
channel.cancel(TestCancellationException())
145145
channel.receiveOrNull()
146146
}
147+
148+
@Test
149+
fun testBufferSize() = runTest {
150+
val capacity = 42
151+
val channel = Channel<Int>(capacity)
152+
launch {
153+
expect(2)
154+
repeat(42) {
155+
channel.send(it)
156+
}
157+
expect(3)
158+
channel.send(42)
159+
expect(5)
160+
channel.close()
161+
}
162+
163+
expect(1)
164+
yield()
165+
166+
expect(4)
167+
val result = ArrayList<Int>(42)
168+
channel.consumeEach {
169+
result.add(it)
170+
}
171+
assertEquals((0..capacity).toList(), result)
172+
finish(6)
173+
}
147174
}

0 commit comments

Comments
 (0)