Skip to content

Commit c6f3c35

Browse files
committed
Adjust behavior of conflated channel (WIP)
Fixes #1235 Fixes #332
1 parent 15ee8a3 commit c6f3c35

File tree

4 files changed

+14
-29
lines changed

4 files changed

+14
-29
lines changed

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

-8
Original file line numberDiff line numberDiff line change
@@ -331,16 +331,8 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
331331
previous as Receive<E> // type assertion
332332
previous.resumeReceiveClosed(closed)
333333
}
334-
335-
onClosedIdempotent(closed)
336334
}
337335

338-
/**
339-
* Invoked when channel is closed as the last action of [close] invocation.
340-
* This method should be idempotent and can be called multiple times.
341-
*/
342-
protected open fun onClosedIdempotent(closed: LockFreeLinkedListNode) {}
343-
344336
/**
345337
* Retrieves first receiving waiter from the queue or returns closed token.
346338
* @suppress **This is unstable API and it is subject to change.**

kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt

+4-19
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package kotlinx.coroutines.channels
66

77
import kotlinx.coroutines.selects.*
8-
import kotlinx.coroutines.internal.*
98

109
/**
1110
* Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations,
@@ -24,31 +23,20 @@ internal open class ConflatedChannel<E> : AbstractChannel<E>() {
2423
protected final override val isBufferAlwaysFull: Boolean get() = false
2524
protected final override val isBufferFull: Boolean get() = false
2625

27-
override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
28-
conflatePreviousSendBuffered(closed)
29-
}
30-
3126
// result is always `OFFER_SUCCESS | Closed`
3227
protected override fun offerInternal(element: E): Any {
3328
while (true) {
3429
val result = super.offerInternal(element)
3530
when {
3631
result === OFFER_SUCCESS -> return OFFER_SUCCESS
3732
result === OFFER_FAILED -> { // try to buffer
38-
val sendResult = sendConflated(element)
39-
when (sendResult) {
33+
when (val sendResult = sendConflated(element)) {
4034
null -> return OFFER_SUCCESS
41-
is Closed<*> -> {
42-
conflatePreviousSendBuffered(sendResult)
43-
return sendResult
44-
}
35+
is Closed<*> -> return sendResult
4536
}
4637
// otherwise there was receiver in queue, retry super.offerInternal
4738
}
48-
result is Closed<*> -> {
49-
conflatePreviousSendBuffered(result)
50-
return result
51-
}
39+
result is Closed<*> -> return result
5240
else -> error("Invalid offerInternal result $result")
5341
}
5442
}
@@ -64,10 +52,7 @@ internal open class ConflatedChannel<E> : AbstractChannel<E>() {
6452
result === ALREADY_SELECTED -> return ALREADY_SELECTED
6553
result === OFFER_SUCCESS -> return OFFER_SUCCESS
6654
result === OFFER_FAILED -> {} // retry
67-
result is Closed<*> -> {
68-
conflatePreviousSendBuffered(result)
69-
return result
70-
}
55+
result is Closed<*> -> return result
7156
else -> error("Invalid result $result")
7257
}
7358
}

kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,13 @@ class ConflatedChannelTest : TestBase() {
3131
fun testConflatedClose() = runTest {
3232
val q = Channel<Int>(Channel.CONFLATED)
3333
q.send(1)
34-
q.close() // shall conflate sent item and become closed
34+
q.close() // shall become closed but do not conflate last sent item yet
35+
assertTrue(q.isClosedForSend)
36+
assertFalse(q.isClosedForReceive)
37+
assertEquals(1, q.receive())
38+
// not it is closed for receive, too
39+
assertTrue(q.isClosedForSend)
40+
assertTrue(q.isClosedForReceive)
3541
assertNull(q.receiveOrNull())
3642
}
3743

kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@ class ChannelFlowTest : TestBase() {
3434
val flow = channelFlow(bufferSize = Channel.CONFLATED) {
3535
assertTrue(offer(1))
3636
assertTrue(offer(2))
37+
assertTrue(offer(3))
38+
assertTrue(offer(4))
3739
}
38-
assertEquals(listOf(1), flow.toList())
40+
assertEquals(listOf(1, 4), flow.toList()) // two elements in the middle got conflated
3941
}
4042

4143
@Test

0 commit comments

Comments
 (0)