Skip to content

Commit b6bfa4c

Browse files
committed
Review fixes
Signed-off-by: Nikita Koval <[email protected]>
1 parent 6eed6a9 commit b6bfa4c

File tree

1 file changed

+13
-23
lines changed

1 file changed

+13
-23
lines changed

kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt

+13-23
Original file line numberDiff line numberDiff line change
@@ -902,7 +902,7 @@ internal open class BufferedChannel<E>(
902902
while (true) {
903903
// Similar to the `send(e)` operation, `receive()` first checks
904904
// whether the channel is already closed for receiving.
905-
if (isClosedForReceiveImpl) {
905+
if (isClosedForReceive) {
906906
if (receiversCounter < sendersCounter) segment.cleanPrev()
907907
return onClosed()
908908
}
@@ -1958,7 +1958,7 @@ internal open class BufferedChannel<E>(
19581958
* Completes the started [close] or [cancel] procedure.
19591959
*/
19601960
private fun completeCloseOrCancel() {
1961-
isClosedForSendImpl // must finish the started close/cancel if one is detected.
1961+
isClosedForSend // must finish the started close/cancel if one is detected.
19621962
}
19631963

19641964
protected open val isConflatedDropOldest get() = false
@@ -2009,7 +2009,9 @@ internal open class BufferedChannel<E>(
20092009
*/
20102010
private fun closeLinkedList(): ChannelSegment<E> {
20112011
// Choose the last segment.
2012-
val lastSegment = listOf(bufferEndSegment.value, sendSegment.value, receiveSegment.value).maxBy { it.id }
2012+
var lastSegment = bufferEndSegment.value
2013+
sendSegment.value.let { if (it.id > lastSegment.id) lastSegment = it }
2014+
receiveSegment.value.let { if (it.id > lastSegment.id) lastSegment = it }
20132015
// Close the linked list of segment for new segment addition
20142016
// and return the last segment in the linked list.
20152017
return lastSegment.close()
@@ -2235,27 +2237,20 @@ internal open class BufferedChannel<E>(
22352237

22362238
@ExperimentalCoroutinesApi
22372239
override val isClosedForSend: Boolean
2238-
get() = isClosedForSendImpl
2239-
2240-
private val isClosedForSendImpl: Boolean
22412240
get() = sendersAndCloseStatus.value.isClosedForSend0
22422241

22432242
private val Long.isClosedForSend0 get() =
2244-
isClosed(this, sendersCur = this.sendersCounter, isClosedForReceive = false)
2243+
isClosed(this, isClosedForReceive = false)
22452244

22462245
@ExperimentalCoroutinesApi
22472246
override val isClosedForReceive: Boolean
2248-
get() = isClosedForReceiveImpl
2249-
2250-
private val isClosedForReceiveImpl: Boolean
22512247
get() = sendersAndCloseStatus.value.isClosedForReceive0
22522248

22532249
private val Long.isClosedForReceive0 get() =
2254-
isClosed(this, sendersCur = this.sendersCounter, isClosedForReceive = true)
2250+
isClosed(this, isClosedForReceive = true)
22552251

22562252
private fun isClosed(
22572253
sendersAndCloseStatusCur: Long,
2258-
sendersCur: Long,
22592254
isClosedForReceive: Boolean
22602255
) = when (sendersAndCloseStatusCur.sendersCloseStatus) {
22612256
// This channel is active and has not been closed.
@@ -2270,7 +2265,7 @@ internal open class BufferedChannel<E>(
22702265
// for senders or the flag whether there still
22712266
// exist elements to retrieve for receivers.
22722267
CLOSE_STATUS_CLOSED -> {
2273-
completeClose(sendersCur)
2268+
completeClose(sendersAndCloseStatusCur.sendersCounter)
22742269
// When `isClosedForReceive` is `false`, always return `true`.
22752270
// Otherwise, it is possible that the channel is closed but
22762271
// still has elements to retrieve.
@@ -2280,7 +2275,7 @@ internal open class BufferedChannel<E>(
22802275
// Help to complete the cancellation procedure to
22812276
// guarantee linearizability and return `true`.
22822277
CLOSE_STATUS_CANCELLED -> {
2283-
completeCancel(sendersCur)
2278+
completeCancel(sendersAndCloseStatusCur.sendersCounter)
22842279
true
22852280
}
22862281
else -> error("unexpected close status: ${sendersAndCloseStatusCur.sendersCloseStatus}")
@@ -2290,12 +2285,12 @@ internal open class BufferedChannel<E>(
22902285
override val isEmpty: Boolean get() {
22912286
// This function should return `false` if
22922287
// this channel is closed for `receive`.
2293-
if (isClosedForReceiveImpl) return false
2288+
if (isClosedForReceive) return false
22942289
// Does this channel has elements to retrieve?
22952290
if (hasElements()) return false
22962291
// This channel does not have elements to retrieve;
22972292
// Check that it is still not closed for `receive`.
2298-
return !isClosedForReceiveImpl
2293+
return !isClosedForReceive
22992294
}
23002295

23012296
/**
@@ -2796,23 +2791,18 @@ internal class ChannelSegment<E>(id: Long, prev: ChannelSegment<E>?, channel: Bu
27962791
// Perform the cancellation; `onCancellationImpl(..)` return `true` if the
27972792
// cancelled operation had not been resumed. In this case, the `onUndeliveredElement`
27982793
// lambda should be called.
2799-
if (onCancellationImpl(index)) {
2794+
if (onCancellation(index)) {
28002795
channel.onUndeliveredElement!!.callUndeliveredElement(element, context)
28012796
}
28022797
}
28032798

2804-
fun onCancellation(index: Int) {
2805-
onCancellationImpl(index)
2806-
}
2807-
2808-
28092799
/**
28102800
* Returns `true` if the request is successfully cancelled,
28112801
* and no rendezvous has happened. We need this knowledge
28122802
* to keep [BufferedChannel.onUndeliveredElement] correct.
28132803
*/
28142804
@Suppress("ConvertTwoComparisonsToRangeCheck")
2815-
private fun onCancellationImpl(index: Int): Boolean {
2805+
fun onCancellation(index: Int): Boolean {
28162806
// Count the global index of this cell and read
28172807
// the current counters of send and receive operations.
28182808
val globalIndex = id * SEGMENT_SIZE + index

0 commit comments

Comments
 (0)