Skip to content

Commit 69c15b3

Browse files
committed
Make ConflatedChannel linearizable
* Rename onClosed to onClosedIdempotent * Use onClosedIdempotent with helpClose on concurrent close * Get rid of afterClose and replace it with onClosedIdempotent * Help interleaving operations with conflation
1 parent 16f0140 commit 69c15b3

File tree

5 files changed

+24
-27
lines changed

5 files changed

+24
-27
lines changed

common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt

+18-17
Original file line numberDiff line numberDiff line change
@@ -119,12 +119,18 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
119119
return null
120120
}
121121

122-
/**
123-
* @suppress **This is unstable API and it is subject to change.**
124-
*/
125122
protected fun conflatePreviousSendBuffered(node: LockFreeLinkedListNode) {
126-
val prev = node.prevNode
127-
(prev as? SendBuffered<*>)?.remove()
123+
/*
124+
* Conflate all previous SendBuffered,
125+
* helping other sends to coflate
126+
*/
127+
var prev = node.prevNode
128+
while (prev is SendBuffered<*>) {
129+
if (!prev.remove()) {
130+
prev.helpRemove()
131+
}
132+
prev = prev.prevNode
133+
}
128134
}
129135

130136
/**
@@ -249,15 +255,15 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
249255
*/
250256
val closeAdded = queue.addLastIfPrev(closed, { it !is Closed<*> })
251257
if (!closeAdded) {
252-
helpClose(queue.prevNode as Closed<*>)
258+
val actualClosed = queue.prevNode as Closed<*>
259+
helpClose(actualClosed)
260+
onClosedIdempotent(actualClosed)
253261
return false
254262
}
255263

256264
helpClose(closed)
257265
invokeOnCloseHandler(cause)
258-
// TODO We can get rid of afterClose
259-
onClosed(closed)
260-
afterClose(cause)
266+
onClosedIdempotent(closed)
261267
return true
262268
}
263269

@@ -322,15 +328,10 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
322328
}
323329

324330
/**
325-
* Invoked when [Closed] element was just added.
326-
* @suppress **This is unstable API and it is subject to change.**
327-
*/
328-
protected open fun onClosed(closed: Closed<E>) {}
329-
330-
/**
331-
* Invoked after successful [close].
331+
* Invoked when channel is closed as the last action of [close] invocation.
332+
* This method should be idempotent and can be called multiple times.
332333
*/
333-
protected open fun afterClose(cause: Throwable?) {}
334+
protected open fun onClosedIdempotent(closed: LockFreeLinkedListNode) {}
334335

335336
/**
336337
* Retrieves first receiving waiter from the queue or returns closed token.

common/kotlinx-coroutines-core-common/src/channels/ConflatedChannel.kt

+2-6
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
package kotlinx.coroutines.channels
66

77
import kotlinx.coroutines.selects.*
8-
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.internal.*
99

1010
/**
1111
* Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations,
@@ -24,11 +24,7 @@ internal open class ConflatedChannel<E> : AbstractChannel<E>() {
2424
protected final override val isBufferAlwaysFull: Boolean get() = false
2525
protected final override val isBufferFull: Boolean get() = false
2626

27-
/**
28-
* This implementation conflates last sent item when channel is closed.
29-
* @suppress **This is unstable API and it is subject to change.**
30-
*/
31-
override fun onClosed(closed: Closed<E>) {
27+
override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
3228
conflatePreviousSendBuffered(closed)
3329
}
3430

core/kotlinx-coroutines-core/test/linearizability/ChannelLinearizabilityTest.kt

-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ class ChannelLinearizabilityTest : TestBase() {
2525
private val lt = LinTesting()
2626
private var channel: Channel<Int> = Channel(capacity)
2727

28-
2928
@Operation(runOnce = true)
3029
fun send1(@Param(name = "value") value: Int) = lt.run("send1") { channel.send(value) }
3130

reactive/kotlinx-coroutines-reactive/src/Channel.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package kotlinx.coroutines.reactive
77
import kotlinx.atomicfu.*
88
import kotlinx.coroutines.*
99
import kotlinx.coroutines.channels.*
10+
import kotlinx.coroutines.internal.*
1011
import org.reactivestreams.*
1112

1213
/**
@@ -74,7 +75,7 @@ private class SubscriptionChannel<T>(
7475
}
7576

7677
@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
77-
override fun afterClose(cause: Throwable?) {
78+
override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
7879
subscription?.cancel()
7980
}
8081

reactive/kotlinx-coroutines-rx2/src/RxChannel.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import io.reactivex.*
88
import io.reactivex.disposables.*
99
import kotlinx.coroutines.channels.*
1010
import kotlinx.coroutines.*
11+
import kotlinx.coroutines.internal.*
1112

1213
/**
1314
* Subscribes to this [MaybeSource] and returns a channel to receive elements emitted by it.
@@ -64,9 +65,8 @@ private class SubscriptionChannel<T> :
6465
@Volatile
6566
var subscription: Disposable? = null
6667

67-
// AbstractChannel overrides
6868
@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
69-
override fun afterClose(cause: Throwable?) {
69+
override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
7070
subscription?.dispose()
7171
}
7272

0 commit comments

Comments
 (0)