Skip to content

Commit a2024f6

Browse files
committed
Adjust behavior of conflated channel to deliver last value
Fixes #1235 Fixes #332
1 parent 15ee8a3 commit a2024f6

File tree

4 files changed

+44
-48
lines changed

4 files changed

+44
-48
lines changed

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+1-31
Original file line numberDiff line numberDiff line change
@@ -104,35 +104,6 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
104104
return null
105105
}
106106

107-
/**
108-
* Queues conflated element, returns null on success or
109-
* returns node reference if it was already closed or is waiting for receive.
110-
* @suppress **This is unstable API and it is subject to change.**
111-
*/
112-
protected fun sendConflated(element: E): ReceiveOrClosed<*>? {
113-
val node = SendBuffered(element)
114-
queue.addLastIfPrev(node, { prev ->
115-
if (prev is ReceiveOrClosed<*>) return@sendConflated prev
116-
true
117-
})
118-
conflatePreviousSendBuffered(node)
119-
return null
120-
}
121-
122-
protected fun conflatePreviousSendBuffered(node: LockFreeLinkedListNode) {
123-
/*
124-
* Conflate all previous SendBuffered,
125-
* helping other sends to coflate
126-
*/
127-
var prev = node.prevNode
128-
while (prev is SendBuffered<*>) {
129-
if (!prev.remove()) {
130-
prev.helpRemove()
131-
}
132-
prev = prev.prevNode
133-
}
134-
}
135-
136107
/**
137108
* @suppress **This is unstable API and it is subject to change.**
138109
*/
@@ -331,7 +302,6 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
331302
previous as Receive<E> // type assertion
332303
previous.resumeReceiveClosed(closed)
333304
}
334-
335305
onClosedIdempotent(closed)
336306
}
337307

@@ -499,7 +469,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
499469
override fun toString(): String = "SendSelect($pollResult)[$channel, $select]"
500470
}
501471

502-
private class SendBuffered<out E>(
472+
internal class SendBuffered<out E>(
503473
@JvmField val element: E
504474
) : LockFreeLinkedListNode(), Send {
505475
override val pollResult: Any? get() = element

kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt

+33-15
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,35 @@ internal open class ConflatedChannel<E> : AbstractChannel<E>() {
2525
protected final override val isBufferFull: Boolean get() = false
2626

2727
override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
28-
conflatePreviousSendBuffered(closed)
28+
@Suppress("UNCHECKED_CAST")
29+
(closed.prevNode as? SendBuffered<E>)?.let { lastBuffered ->
30+
conflatePreviousSendBuffered(lastBuffered)
31+
}
32+
}
33+
34+
/**
35+
* Queues conflated element, returns null on success or
36+
* returns node reference if it was already closed or is waiting for receive.
37+
*/
38+
private fun sendConflated(element: E): ReceiveOrClosed<*>? {
39+
val node = SendBuffered(element)
40+
queue.addLastIfPrev(node) { prev ->
41+
if (prev is ReceiveOrClosed<*>) return@sendConflated prev
42+
true
43+
}
44+
conflatePreviousSendBuffered(node)
45+
return null
46+
}
47+
48+
private fun conflatePreviousSendBuffered(node: SendBuffered<E>) {
49+
// Conflate all previous SendBuffered, helping other sends to conflate
50+
var prev = node.prevNode
51+
while (prev is SendBuffered<*>) {
52+
if (!prev.remove()) {
53+
prev.helpRemove()
54+
}
55+
prev = prev.prevNode
56+
}
2957
}
3058

3159
// result is always `OFFER_SUCCESS | Closed`
@@ -35,20 +63,13 @@ internal open class ConflatedChannel<E> : AbstractChannel<E>() {
3563
when {
3664
result === OFFER_SUCCESS -> return OFFER_SUCCESS
3765
result === OFFER_FAILED -> { // try to buffer
38-
val sendResult = sendConflated(element)
39-
when (sendResult) {
66+
when (val sendResult = sendConflated(element)) {
4067
null -> return OFFER_SUCCESS
41-
is Closed<*> -> {
42-
conflatePreviousSendBuffered(sendResult)
43-
return sendResult
44-
}
68+
is Closed<*> -> return sendResult
4569
}
4670
// otherwise there was receiver in queue, retry super.offerInternal
4771
}
48-
result is Closed<*> -> {
49-
conflatePreviousSendBuffered(result)
50-
return result
51-
}
72+
result is Closed<*> -> return result
5273
else -> error("Invalid offerInternal result $result")
5374
}
5475
}
@@ -64,10 +85,7 @@ internal open class ConflatedChannel<E> : AbstractChannel<E>() {
6485
result === ALREADY_SELECTED -> return ALREADY_SELECTED
6586
result === OFFER_SUCCESS -> return OFFER_SUCCESS
6687
result === OFFER_FAILED -> {} // retry
67-
result is Closed<*> -> {
68-
conflatePreviousSendBuffered(result)
69-
return result
70-
}
88+
result is Closed<*> -> return result
7189
else -> error("Invalid result $result")
7290
}
7391
}

kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,13 @@ class ConflatedChannelTest : TestBase() {
3131
fun testConflatedClose() = runTest {
3232
val q = Channel<Int>(Channel.CONFLATED)
3333
q.send(1)
34-
q.close() // shall conflate sent item and become closed
34+
q.close() // shall become closed but do not conflate last sent item yet
35+
assertTrue(q.isClosedForSend)
36+
assertFalse(q.isClosedForReceive)
37+
assertEquals(1, q.receive())
38+
// not it is closed for receive, too
39+
assertTrue(q.isClosedForSend)
40+
assertTrue(q.isClosedForReceive)
3541
assertNull(q.receiveOrNull())
3642
}
3743

kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@ class ChannelFlowTest : TestBase() {
3434
val flow = channelFlow(bufferSize = Channel.CONFLATED) {
3535
assertTrue(offer(1))
3636
assertTrue(offer(2))
37+
assertTrue(offer(3))
38+
assertTrue(offer(4))
3739
}
38-
assertEquals(listOf(1), flow.toList())
40+
assertEquals(listOf(1, 4), flow.toList()) // two elements in the middle got conflated
3941
}
4042

4143
@Test

0 commit comments

Comments
 (0)