Skip to content

Commit 221a6ae

Browse files
committed
Adjust behavior of conflated channel to deliver last value
Fixes #1235 Fixes #332
1 parent 15ee8a3 commit 221a6ae

File tree

4 files changed

+14
-21
lines changed

4 files changed

+14
-21
lines changed

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

-1
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,6 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
331331
previous as Receive<E> // type assertion
332332
previous.resumeReceiveClosed(closed)
333333
}
334-
335334
onClosedIdempotent(closed)
336335
}
337336

kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt

+4-18
Original file line numberDiff line numberDiff line change
@@ -24,31 +24,20 @@ internal open class ConflatedChannel<E> : AbstractChannel<E>() {
2424
protected final override val isBufferAlwaysFull: Boolean get() = false
2525
protected final override val isBufferFull: Boolean get() = false
2626

27-
override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
28-
conflatePreviousSendBuffered(closed)
29-
}
30-
3127
// result is always `OFFER_SUCCESS | Closed`
3228
protected override fun offerInternal(element: E): Any {
3329
while (true) {
3430
val result = super.offerInternal(element)
3531
when {
3632
result === OFFER_SUCCESS -> return OFFER_SUCCESS
3733
result === OFFER_FAILED -> { // try to buffer
38-
val sendResult = sendConflated(element)
39-
when (sendResult) {
34+
when (val sendResult = sendConflated(element)) {
4035
null -> return OFFER_SUCCESS
41-
is Closed<*> -> {
42-
conflatePreviousSendBuffered(sendResult)
43-
return sendResult
44-
}
36+
is Closed<*> -> return sendResult
4537
}
4638
// otherwise there was receiver in queue, retry super.offerInternal
4739
}
48-
result is Closed<*> -> {
49-
conflatePreviousSendBuffered(result)
50-
return result
51-
}
40+
result is Closed<*> -> return result
5241
else -> error("Invalid offerInternal result $result")
5342
}
5443
}
@@ -64,10 +53,7 @@ internal open class ConflatedChannel<E> : AbstractChannel<E>() {
6453
result === ALREADY_SELECTED -> return ALREADY_SELECTED
6554
result === OFFER_SUCCESS -> return OFFER_SUCCESS
6655
result === OFFER_FAILED -> {} // retry
67-
result is Closed<*> -> {
68-
conflatePreviousSendBuffered(result)
69-
return result
70-
}
56+
result is Closed<*> -> return result
7157
else -> error("Invalid result $result")
7258
}
7359
}

kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,13 @@ class ConflatedChannelTest : TestBase() {
3131
fun testConflatedClose() = runTest {
3232
val q = Channel<Int>(Channel.CONFLATED)
3333
q.send(1)
34-
q.close() // shall conflate sent item and become closed
34+
q.close() // shall become closed but do not conflate last sent item yet
35+
assertTrue(q.isClosedForSend)
36+
assertFalse(q.isClosedForReceive)
37+
assertEquals(1, q.receive())
38+
// not it is closed for receive, too
39+
assertTrue(q.isClosedForSend)
40+
assertTrue(q.isClosedForReceive)
3541
assertNull(q.receiveOrNull())
3642
}
3743

kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@ class ChannelFlowTest : TestBase() {
3434
val flow = channelFlow(bufferSize = Channel.CONFLATED) {
3535
assertTrue(offer(1))
3636
assertTrue(offer(2))
37+
assertTrue(offer(3))
38+
assertTrue(offer(4))
3739
}
38-
assertEquals(listOf(1), flow.toList())
40+
assertEquals(listOf(1, 4), flow.toList()) // two elements in the middle got conflated
3941
}
4042

4143
@Test

0 commit comments

Comments
 (0)