Skip to content

Commit bb53f65

Browse files
committed
Adjust behavior of conflated channel to deliver last value
Fixes #1235 Fixes #332
1 parent 15ee8a3 commit bb53f65

File tree

4 files changed

+40
-47
lines changed

4 files changed

+40
-47
lines changed

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+1-31
Original file line numberDiff line numberDiff line change
@@ -104,35 +104,6 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
104104
return null
105105
}
106106

107-
/**
108-
* Queues conflated element, returns null on success or
109-
* returns node reference if it was already closed or is waiting for receive.
110-
* @suppress **This is unstable API and it is subject to change.**
111-
*/
112-
protected fun sendConflated(element: E): ReceiveOrClosed<*>? {
113-
val node = SendBuffered(element)
114-
queue.addLastIfPrev(node, { prev ->
115-
if (prev is ReceiveOrClosed<*>) return@sendConflated prev
116-
true
117-
})
118-
conflatePreviousSendBuffered(node)
119-
return null
120-
}
121-
122-
protected fun conflatePreviousSendBuffered(node: LockFreeLinkedListNode) {
123-
/*
124-
* Conflate all previous SendBuffered,
125-
* helping other sends to coflate
126-
*/
127-
var prev = node.prevNode
128-
while (prev is SendBuffered<*>) {
129-
if (!prev.remove()) {
130-
prev.helpRemove()
131-
}
132-
prev = prev.prevNode
133-
}
134-
}
135-
136107
/**
137108
* @suppress **This is unstable API and it is subject to change.**
138109
*/
@@ -331,7 +302,6 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
331302
previous as Receive<E> // type assertion
332303
previous.resumeReceiveClosed(closed)
333304
}
334-
335305
onClosedIdempotent(closed)
336306
}
337307

@@ -499,7 +469,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
499469
override fun toString(): String = "SendSelect($pollResult)[$channel, $select]"
500470
}
501471

502-
private class SendBuffered<out E>(
472+
internal class SendBuffered<out E>(
503473
@JvmField val element: E
504474
) : LockFreeLinkedListNode(), Send {
505475
override val pollResult: Any? get() = element

kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt

+29-14
Original file line numberDiff line numberDiff line change
@@ -28,27 +28,45 @@ internal open class ConflatedChannel<E> : AbstractChannel<E>() {
2828
conflatePreviousSendBuffered(closed)
2929
}
3030

31+
/**
32+
* Queues conflated element, returns null on success or
33+
* returns node reference if it was already closed or is waiting for receive.
34+
*/
35+
private fun sendConflated(element: E): ReceiveOrClosed<*>? {
36+
val node = SendBuffered(element)
37+
queue.addLastIfPrev(node) { prev ->
38+
if (prev is ReceiveOrClosed<*>) return@sendConflated prev
39+
true
40+
}
41+
conflatePreviousSendBuffered(node)
42+
return null
43+
}
44+
45+
private fun conflatePreviousSendBuffered(node: LockFreeLinkedListNode) {
46+
// Conflate all previous SendBuffered, helping other sends to conflate
47+
var prev = node.prevNode
48+
while (prev is SendBuffered<*>) {
49+
if (!prev.remove()) {
50+
prev.helpRemove()
51+
}
52+
prev = prev.prevNode
53+
}
54+
}
55+
3156
// result is always `OFFER_SUCCESS | Closed`
3257
protected override fun offerInternal(element: E): Any {
3358
while (true) {
3459
val result = super.offerInternal(element)
3560
when {
3661
result === OFFER_SUCCESS -> return OFFER_SUCCESS
3762
result === OFFER_FAILED -> { // try to buffer
38-
val sendResult = sendConflated(element)
39-
when (sendResult) {
63+
when (val sendResult = sendConflated(element)) {
4064
null -> return OFFER_SUCCESS
41-
is Closed<*> -> {
42-
conflatePreviousSendBuffered(sendResult)
43-
return sendResult
44-
}
65+
is Closed<*> -> return sendResult
4566
}
4667
// otherwise there was receiver in queue, retry super.offerInternal
4768
}
48-
result is Closed<*> -> {
49-
conflatePreviousSendBuffered(result)
50-
return result
51-
}
69+
result is Closed<*> -> return result
5270
else -> error("Invalid offerInternal result $result")
5371
}
5472
}
@@ -64,10 +82,7 @@ internal open class ConflatedChannel<E> : AbstractChannel<E>() {
6482
result === ALREADY_SELECTED -> return ALREADY_SELECTED
6583
result === OFFER_SUCCESS -> return OFFER_SUCCESS
6684
result === OFFER_FAILED -> {} // retry
67-
result is Closed<*> -> {
68-
conflatePreviousSendBuffered(result)
69-
return result
70-
}
85+
result is Closed<*> -> return result
7186
else -> error("Invalid result $result")
7287
}
7388
}

kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,13 @@ class ConflatedChannelTest : TestBase() {
3131
fun testConflatedClose() = runTest {
3232
val q = Channel<Int>(Channel.CONFLATED)
3333
q.send(1)
34-
q.close() // shall conflate sent item and become closed
34+
q.close() // shall become closed but do not conflate last sent item yet
35+
assertTrue(q.isClosedForSend)
36+
assertFalse(q.isClosedForReceive)
37+
assertEquals(1, q.receive())
38+
// not it is closed for receive, too
39+
assertTrue(q.isClosedForSend)
40+
assertTrue(q.isClosedForReceive)
3541
assertNull(q.receiveOrNull())
3642
}
3743

kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@ class ChannelFlowTest : TestBase() {
3434
val flow = channelFlow(bufferSize = Channel.CONFLATED) {
3535
assertTrue(offer(1))
3636
assertTrue(offer(2))
37+
assertTrue(offer(3))
38+
assertTrue(offer(4))
3739
}
38-
assertEquals(listOf(1), flow.toList())
40+
assertEquals(listOf(1, 4), flow.toList()) // two elements in the middle got conflated
3941
}
4042

4143
@Test

0 commit comments

Comments
 (0)