@@ -23,14 +23,18 @@ internal open class ArrayChannel<E>(
23
23
/* *
24
24
* Buffer capacity.
25
25
*/
26
- val capacity : Int ,
26
+ private val capacity : Int ,
27
+ private val onBufferOverflow : BufferOverflow ,
27
28
onUndeliveredElement : OnUndeliveredElement <E >?
28
29
) : AbstractChannel<E>(onUndeliveredElement) {
29
30
init {
31
+ // This check is actually used by the Channel(...) constructor function which checks only for known
32
+ // capacities and calls ArrayChannel constructor for everything else.
30
33
require(capacity >= 1 ) { " ArrayChannel capacity must be at least 1, but $capacity was specified" }
31
34
}
32
35
33
36
private val lock = ReentrantLock ()
37
+
34
38
/*
35
39
* Guarded by lock.
36
40
* Allocate minimum of capacity and 16 to avoid excess memory pressure for large channels when it's not necessary.
@@ -43,7 +47,7 @@ internal open class ArrayChannel<E>(
43
47
protected final override val isBufferAlwaysEmpty: Boolean get() = false
44
48
protected final override val isBufferEmpty: Boolean get() = size.value == 0
45
49
protected final override val isBufferAlwaysFull: Boolean get() = false
46
- protected final override val isBufferFull: Boolean get() = size.value == capacity
50
+ protected final override val isBufferFull: Boolean get() = size.value == capacity && onBufferOverflow == BufferOverflow . SUSPEND
47
51
48
52
override val isFull: Boolean get() = lock.withLock { isFullImpl }
49
53
override val isEmpty: Boolean get() = lock.withLock { isEmptyImpl }
@@ -55,31 +59,26 @@ internal open class ArrayChannel<E>(
55
59
lock.withLock {
56
60
val size = this .size.value
57
61
closedForSend?.let { return it }
58
- if (size < capacity) {
59
- // tentatively put element to buffer
60
- this .size.value = size + 1 // update size before checking queue (!!!)
61
- // check for receivers that were waiting on empty queue
62
- if (size == 0 ) {
63
- loop@ while (true ) {
64
- receive = takeFirstReceiveOrPeekClosed() ? : break @loop // break when no receivers queued
65
- if (receive is Closed ) {
66
- this .size.value = size // restore size
67
- return receive!!
68
- }
69
- val token = receive!! .tryResumeReceive(element, null )
70
- if (token != null ) {
71
- assert { token == = RESUME_TOKEN }
72
- this .size.value = size // restore size
73
- return @withLock
74
- }
62
+ // update size before checking queue (!!!)
63
+ updateBufferSize(size)?.let { return it }
64
+ // check for receivers that were waiting on empty queue
65
+ if (size == 0 ) {
66
+ loop@ while (true ) {
67
+ receive = takeFirstReceiveOrPeekClosed() ? : break @loop // break when no receivers queued
68
+ if (receive is Closed ) {
69
+ this .size.value = size // restore size
70
+ return receive!!
71
+ }
72
+ val token = receive!! .tryResumeReceive(element, null )
73
+ if (token != null ) {
74
+ assert { token == = RESUME_TOKEN }
75
+ this .size.value = size // restore size
76
+ return @withLock
75
77
}
76
78
}
77
- ensureCapacity(size)
78
- buffer[(head + size) % buffer.size] = element // actually queue element
79
- return OFFER_SUCCESS
80
79
}
81
- // size == capacity: full
82
- return OFFER_FAILED
80
+ enqueueElement( size, element)
81
+ return OFFER_SUCCESS
83
82
}
84
83
// breaks here if offer meets receiver
85
84
receive!! .completeResumeReceive(element)
@@ -92,41 +91,36 @@ internal open class ArrayChannel<E>(
92
91
lock.withLock {
93
92
val size = this .size.value
94
93
closedForSend?.let { return it }
95
- if (size < capacity) {
96
- // tentatively put element to buffer
97
- this .size.value = size + 1 // update size before checking queue (!!!)
98
- // check for receivers that were waiting on empty queue
99
- if (size == 0 ) {
100
- loop@ while (true ) {
101
- val offerOp = describeTryOffer(element)
102
- val failure = select.performAtomicTrySelect(offerOp)
103
- when {
104
- failure == null -> { // offered successfully
105
- this .size.value = size // restore size
106
- receive = offerOp.result
107
- return @withLock
108
- }
109
- failure == = OFFER_FAILED -> break @loop // cannot offer -> Ok to queue to buffer
110
- failure == = RETRY_ATOMIC -> {} // retry
111
- failure == = ALREADY_SELECTED || failure is Closed <* > -> {
112
- this .size.value = size // restore size
113
- return failure
114
- }
115
- else -> error(" performAtomicTrySelect(describeTryOffer) returned $failure " )
94
+ // update size before checking queue (!!!)
95
+ updateBufferSize(size)?.let { return it }
96
+ // check for receivers that were waiting on empty queue
97
+ if (size == 0 ) {
98
+ loop@ while (true ) {
99
+ val offerOp = describeTryOffer(element)
100
+ val failure = select.performAtomicTrySelect(offerOp)
101
+ when {
102
+ failure == null -> { // offered successfully
103
+ this .size.value = size // restore size
104
+ receive = offerOp.result
105
+ return @withLock
106
+ }
107
+ failure == = OFFER_FAILED -> break @loop // cannot offer -> Ok to queue to buffer
108
+ failure == = RETRY_ATOMIC -> {} // retry
109
+ failure == = ALREADY_SELECTED || failure is Closed <* > -> {
110
+ this .size.value = size // restore size
111
+ return failure
116
112
}
113
+ else -> error(" performAtomicTrySelect(describeTryOffer) returned $failure " )
117
114
}
118
115
}
119
- // let's try to select sending this element to buffer
120
- if (! select.trySelect()) { // :todo: move trySelect completion outside of lock
121
- this .size.value = size // restore size
122
- return ALREADY_SELECTED
123
- }
124
- ensureCapacity(size)
125
- buffer[(head + size) % buffer.size] = element // actually queue element
126
- return OFFER_SUCCESS
127
116
}
128
- // size == capacity: full
129
- return OFFER_FAILED
117
+ // let's try to select sending this element to buffer
118
+ if (! select.trySelect()) { // :todo: move trySelect completion outside of lock
119
+ this .size.value = size // restore size
120
+ return ALREADY_SELECTED
121
+ }
122
+ enqueueElement(size, element)
123
+ return OFFER_SUCCESS
130
124
}
131
125
// breaks here if offer meets receiver
132
126
receive!! .completeResumeReceive(element)
@@ -137,6 +131,35 @@ internal open class ArrayChannel<E>(
137
131
super .enqueueSend(send)
138
132
}
139
133
134
+ // Guarded by lock
135
+ // Result is `OFFER_SUCCESS | OFFER_FAILED | null`
136
+ private fun updateBufferSize (currentSize : Int ): Symbol ? {
137
+ if (currentSize < capacity) {
138
+ size.value = currentSize + 1 // tentatively put it into the buffer
139
+ return null // proceed
140
+ }
141
+ // buffer is full
142
+ return when (onBufferOverflow) {
143
+ BufferOverflow .SUSPEND -> OFFER_FAILED
144
+ BufferOverflow .DROP_LATEST -> OFFER_SUCCESS
145
+ BufferOverflow .DROP_OLDEST -> null // proceed, will drop oldest in enqueueElement
146
+ }
147
+ }
148
+
149
+ // Guarded by lock
150
+ private fun enqueueElement (currentSize : Int , element : E ) {
151
+ if (currentSize < capacity) {
152
+ ensureCapacity(currentSize)
153
+ buffer[(head + currentSize) % buffer.size] = element // actually queue element
154
+ } else {
155
+ // buffer is full
156
+ assert { onBufferOverflow == BufferOverflow .DROP_OLDEST } // the only way we can get here
157
+ buffer[head % buffer.size] = null // drop oldest element
158
+ buffer[(head + currentSize) % buffer.size] = element // actually queue element
159
+ head = (head + 1 ) % buffer.size
160
+ }
161
+ }
162
+
140
163
// Guarded by lock
141
164
private fun ensureCapacity (currentSize : Int ) {
142
165
if (currentSize >= buffer.size) {
0 commit comments