Skip to content

Commit 0126dba

Browse files
mvicsokolovaelizarov
authored andcommitted
ConflatedChannel with lock to protect the one-element buffer, double-linked list used for suspending receivers only.
1 parent 90a9faf commit 0126dba

File tree

4 files changed

+106
-74
lines changed

4 files changed

+106
-74
lines changed

kotlinx-coroutines-core/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ task jdk16Test(type: Test, dependsOn: [compileTestKotlinJvm, checkJdk16]) {
107107
testClassesDirs = files { jvmTest.testClassesDirs }
108108
executable = "$System.env.JDK_16/bin/java"
109109
exclude '**/*LFStressTest.*' // lock-freedom tests use LockFreedomTestEnvironment which needs JDK8
110-
exclude '**/*LCStressTest.*' // lic-check tests use LinChecker which needs JDK8
110+
exclude '**/*LCStressTest.*' // lin-check tests use LinChecker which needs JDK8
111111
exclude '**/exceptions/**' // exceptions tests check suppressed exception which needs JDK8
112112
exclude '**/ExceptionsGuideTest.*'
113113
}
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
1-
/*
2-
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3-
*/
4-
51
package kotlinx.coroutines.channels
62

7-
import kotlinx.coroutines.selects.*
3+
import kotlinx.coroutines.*
84
import kotlinx.coroutines.internal.*
5+
import kotlinx.coroutines.selects.*
6+
import kotlin.native.concurrent.SharedImmutable
97

108
/**
119
* Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations,
@@ -15,80 +13,124 @@ import kotlinx.coroutines.internal.*
1513
* Sender to this channel never suspends and [offer] always returns `true`.
1614
*
1715
* This channel is created by `Channel(Channel.CONFLATED)` factory function invocation.
18-
*
19-
* This implementation is fully lock-free.
2016
*/
2117
internal open class ConflatedChannel<E> : AbstractChannel<E>() {
22-
protected final override val isBufferAlwaysEmpty: Boolean get() = true
23-
protected final override val isBufferEmpty: Boolean get() = true
18+
protected final override val isBufferAlwaysEmpty: Boolean get() = false
19+
protected final override val isBufferEmpty: Boolean get() = value === EMPTY
2420
protected final override val isBufferAlwaysFull: Boolean get() = false
2521
protected final override val isBufferFull: Boolean get() = false
2622

27-
override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
28-
@Suppress("UNCHECKED_CAST")
29-
(closed.prevNode as? SendBuffered<E>)?.let { lastBuffered ->
30-
conflatePreviousSendBuffered(lastBuffered)
31-
}
32-
}
23+
override val isEmpty: Boolean get() = lock.withLock { isEmptyImpl }
3324

34-
/**
35-
* Queues conflated element, returns null on success or
36-
* returns node reference if it was already closed or is waiting for receive.
37-
*/
38-
private fun sendConflated(element: E): ReceiveOrClosed<*>? {
39-
val node = SendBuffered(element)
40-
queue.addLastIfPrev(node) { prev ->
41-
if (prev is ReceiveOrClosed<*>) return@sendConflated prev
42-
true
43-
}
44-
conflatePreviousSendBuffered(node)
45-
return null
25+
private val lock = ReentrantLock()
26+
27+
private var value: Any? = EMPTY
28+
29+
private companion object {
30+
@SharedImmutable
31+
private val EMPTY = Symbol("EMPTY")
4632
}
4733

48-
private fun conflatePreviousSendBuffered(node: SendBuffered<E>) {
49-
// Conflate all previous SendBuffered, helping other sends to conflate
50-
var prev = node.prevNode
51-
while (prev is SendBuffered<*>) {
52-
if (!prev.remove()) {
53-
prev.helpRemove()
34+
// result is `OFFER_SUCCESS | Closed`
35+
protected override fun offerInternal(element: E): Any {
36+
var receive: ReceiveOrClosed<E>? = null
37+
lock.withLock {
38+
closedForSend?.let { return it }
39+
// if there is no element written in buffer
40+
if (value === EMPTY) {
41+
// check for receivers that were waiting on the empty buffer
42+
loop@ while(true) {
43+
receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
44+
if (receive is Closed) {
45+
return receive!!
46+
}
47+
val token = receive!!.tryResumeReceive(element, null)
48+
if (token != null) {
49+
assert { token === RESUME_TOKEN }
50+
return@withLock
51+
}
52+
}
5453
}
55-
prev = prev.prevNode
54+
value = element
55+
return OFFER_SUCCESS
5656
}
57+
// breaks here if offer meets receiver
58+
receive!!.completeResumeReceive(element)
59+
return receive!!.offerResult
5760
}
5861

59-
// result is always `OFFER_SUCCESS | Closed`
60-
protected override fun offerInternal(element: E): Any {
61-
while (true) {
62-
val result = super.offerInternal(element)
63-
when {
64-
result === OFFER_SUCCESS -> return OFFER_SUCCESS
65-
result === OFFER_FAILED -> { // try to buffer
66-
when (val sendResult = sendConflated(element)) {
67-
null -> return OFFER_SUCCESS
68-
is Closed<*> -> return sendResult
62+
// result is `ALREADY_SELECTED | OFFER_SUCCESS | Closed`
63+
protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
64+
var receive: ReceiveOrClosed<E>? = null
65+
lock.withLock {
66+
closedForSend?.let { return it }
67+
if (value === EMPTY) {
68+
loop@ while(true) {
69+
val offerOp = describeTryOffer(element)
70+
val failure = select.performAtomicTrySelect(offerOp)
71+
when {
72+
failure == null -> { // offered successfully
73+
receive = offerOp.result
74+
return@withLock
75+
}
76+
failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
77+
failure === RETRY_ATOMIC -> {} // retry
78+
failure === ALREADY_SELECTED || failure is Closed<*> -> return failure
79+
else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
6980
}
70-
// otherwise there was receiver in queue, retry super.offerInternal
7181
}
72-
result is Closed<*> -> return result
73-
else -> error("Invalid offerInternal result $result")
7482
}
83+
// try to select sending this element to buffer
84+
if (!select.trySelect()) {
85+
return ALREADY_SELECTED
86+
}
87+
value = element
88+
return OFFER_SUCCESS
7589
}
90+
// breaks here if offer meets receiver
91+
receive!!.completeResumeReceive(element)
92+
return receive!!.offerResult
7693
}
7794

78-
// result is always `ALREADY_SELECTED | OFFER_SUCCESS | Closed`.
79-
protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
80-
while (true) {
81-
val result = if (hasReceiveOrClosed)
82-
super.offerSelectInternal(element, select) else
83-
(select.performAtomicTrySelect(describeSendConflated(element)) ?: OFFER_SUCCESS)
84-
when {
85-
result === ALREADY_SELECTED -> return ALREADY_SELECTED
86-
result === OFFER_SUCCESS -> return OFFER_SUCCESS
87-
result === OFFER_FAILED -> {} // retry
88-
result === RETRY_ATOMIC -> {} // retry
89-
result is Closed<*> -> return result
90-
else -> error("Invalid result $result")
95+
// result is `E | POLL_FAILED | Closed`
96+
protected override fun pollInternal(): Any? {
97+
var result: Any? = null
98+
lock.withLock {
99+
if (value === EMPTY) return closedForSend ?: POLL_FAILED
100+
result = value
101+
value = EMPTY
102+
}
103+
return result
104+
}
105+
106+
// result is `E | POLL_FAILED | Closed`
107+
protected override fun pollSelectInternal(select: SelectInstance<*>): Any? {
108+
var result: Any? = null
109+
lock.withLock {
110+
if (value === EMPTY) return closedForSend ?: POLL_FAILED
111+
if (!select.trySelect())
112+
return ALREADY_SELECTED
113+
result = value
114+
value = EMPTY
115+
}
116+
return result
117+
}
118+
119+
protected override fun onCancelIdempotent(wasClosed: Boolean) {
120+
if (wasClosed) {
121+
lock.withLock {
122+
value = EMPTY
91123
}
92124
}
125+
super.onCancelIdempotent(wasClosed)
93126
}
94-
}
127+
128+
override fun enqueueReceiveInternal(receive: Receive<E>): Boolean = lock.withLock {
129+
super.enqueueReceiveInternal(receive)
130+
}
131+
132+
// ------ debug ------
133+
134+
override val bufferDebugString: String
135+
get() = "(value=$value)"
136+
}

kotlinx-coroutines-core/jvm/test/channels/ChannelLFStressTest.kt

-8
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,6 @@ class ChannelLFStressTest : TestBase() {
4141
checkAllReceived()
4242
}
4343

44-
@Test
45-
fun testConflatedLockFreedom() {
46-
// This test does not really verify that all sent elements were received
47-
// and checks only LF property
48-
channel = Channel(Channel.CONFLATED)
49-
performLockFreedomTest()
50-
}
51-
5244
private fun performLockFreedomTest() {
5345
env.onCompletion {
5446
// We must cancel the channel to abort both senders & receivers

kotlinx-coroutines-core/jvm/test/linearizability/ChannelsLCStressTest.kt

+2-4
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import org.jetbrains.kotlinx.lincheck.paramgen.*
1717
import org.jetbrains.kotlinx.lincheck.verifier.*
1818
import org.junit.*
1919

20-
2120
class RendezvousChannelLCStressTest : ChannelLCStressTestBase(
2221
c = Channel(RENDEZVOUS),
2322
sequentialSpecification = SequentialRendezvousChannel::class.java
@@ -48,7 +47,6 @@ class ConflatedChannelLCStressTest : ChannelLCStressTestBase(
4847
)
4948
class SequentialConflatedChannel : SequentialIntChannelBase(CONFLATED)
5049

51-
5250
@Param.Params(
5351
Param(name = "value", gen = IntGen::class, conf = "1:5"),
5452
Param(name = "closeToken", gen = IntGen::class, conf = "1:3")
@@ -105,10 +103,10 @@ abstract class ChannelLCStressTestBase(private val c: Channel<Int>, private val
105103
// @Operation
106104
fun cancel(@Param(name = "closeToken") token: Int) = c.cancel(NumberedCancellationException(token))
107105

108-
// @Operation
106+
// @Operation
109107
fun isClosedForReceive() = c.isClosedForReceive
110108

111-
// @Operation
109+
// @Operation
112110
fun isClosedForSend() = c.isClosedForSend
113111

114112
// TODO: this operation should be (and can be!) linearizable, but is not

0 commit comments

Comments
 (0)