-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathConflatedChannel.kt
140 lines (125 loc) · 5.08 KB
/
ConflatedChannel.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*
import kotlin.native.concurrent.*
/**
* Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations,
* so that the receiver always gets the most recently sent element.
* Back-to-back sent elements are _conflated_ -- only the most recently sent element is received,
* while previously sent elements **are lost**.
* Sender to this channel never suspends and [offer] always returns `true`.
*
* This channel is created by `Channel(Channel.CONFLATED)` factory function invocation.
*/
internal open class ConflatedChannel<E> : AbstractChannel<E>() {
// Buffer-state flags overridden from the base class. The single slot counts as empty exactly when
// `value` holds the EMPTY marker, and it is never reported as full.
protected final override val isBufferAlwaysEmpty: Boolean get() = false
protected final override val isBufferEmpty: Boolean get() = value === EMPTY
protected final override val isBufferAlwaysFull: Boolean get() = false
protected final override val isBufferFull: Boolean get() = false
// Evaluates `isEmptyImpl` (declared outside this class) while holding `lock`.
override val isEmpty: Boolean get() = lock.withLock { isEmptyImpl }
// Every write to `value` in this class is performed while holding this lock.
private val lock = ReentrantLock()
// The single buffered element, or EMPTY when nothing is buffered.
private var value: Any? = EMPTY
private companion object {
// Marker for "no element buffered"; always compared by identity (`===`).
@SharedImmutable
private val EMPTY = Symbol("EMPTY")
}
// Offers `element` without suspending: it is either handed to a waiting receiver or stored in
// `value`, replacing whatever was stored there before.
// result is `OFFER_SUCCESS | Closed`
protected override fun offerInternal(element: E): Any {
// Set to the receiver that accepted the element; only used after the lock block is left.
var receive: ReceiveOrClosed<E>? = null
lock.withLock {
// A non-null `closedForSend` is returned to the caller as the result.
closedForSend?.let { return it }
// if there is no element written in buffer
if (value === EMPTY) {
// check for receivers that were waiting on the empty buffer
loop@ while(true) {
receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
if (receive is Closed) {
return receive!!
}
// A non-null token means this receiver accepted the element: leave the lock block without
// touching `value`. A null token means it did not, so the loop tries the next receiver.
val token = receive!!.tryResumeReceive(element, null)
if (token != null) {
assert { token === RESUME_TOKEN }
return@withLock
}
}
}
// No receiver took the element: store it, overwriting any previously buffered element.
value = element
return OFFER_SUCCESS
}
// Reached only through `return@withLock` above (every other path returns from the function),
// so the resume is completed after the lock has been released.
// breaks here if offer meets receiver
receive!!.completeResumeReceive(element)
return receive!!.offerResult
}
// Select-aware variant of offerInternal: the hand-off to a receiver goes through
// `select.performAtomicTrySelect`, and storing into the buffer requires `select.trySelect()` to succeed.
// result is `ALREADY_SELECTED | OFFER_SUCCESS | Closed`
protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
// Set to the receiver that accepted the element; only used after the lock block is left.
var receive: ReceiveOrClosed<E>? = null
lock.withLock {
// A non-null `closedForSend` is returned to the caller as the result.
closedForSend?.let { return it }
// The hand-off to a receiver is attempted only while the buffer is empty.
if (value === EMPTY) {
loop@ while(true) {
val offerOp = describeTryOffer(element)
val failure = select.performAtomicTrySelect(offerOp)
when {
failure == null -> { // offered successfully
receive = offerOp.result
return@withLock
}
failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
failure === RETRY_ATOMIC -> {} // retry
failure === ALREADY_SELECTED || failure is Closed<*> -> return failure
else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
}
}
}
// try to select sending this element to buffer
if (!select.trySelect()) {
return ALREADY_SELECTED
}
// The select was won: store the element, overwriting any previously buffered element.
value = element
return OFFER_SUCCESS
}
// Reached only through `return@withLock` above (every other path returns from the function),
// so the resume is completed after the lock has been released.
// breaks here if offer meets receiver
receive!!.completeResumeReceive(element)
return receive!!.offerResult
}
// Takes the buffered element, if any, and resets the buffer to EMPTY, all under `lock`.
// With an empty buffer it returns `closedForSend` when that is non-null, otherwise POLL_FAILED.
// result is `E | POLL_FAILED | Closed`
protected override fun pollInternal(): Any? {
var result: Any? = null
lock.withLock {
if (value === EMPTY) return closedForSend ?: POLL_FAILED
result = value
value = EMPTY
}
return result
}
// Select-aware variant of pollInternal: the buffered element is taken only if `select.trySelect()`
// succeeds; otherwise the buffer is left untouched and ALREADY_SELECTED is returned.
// result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
protected override fun pollSelectInternal(select: SelectInstance<*>): Any? {
var result: Any? = null
lock.withLock {
if (value === EMPTY) return closedForSend ?: POLL_FAILED
if (!select.trySelect())
return ALREADY_SELECTED
result = value
value = EMPTY
}
return result
}
// Drops the buffered element (under `lock`) when `wasClosed` is true, then delegates to the base class.
// NOTE(review): the buffer is cleared only when `wasClosed` is true; confirm that an invocation with
// `wasClosed == false` is not expected to discard a still-buffered element.
protected override fun onCancelIdempotent(wasClosed: Boolean) {
if (wasClosed) {
lock.withLock {
value = EMPTY
}
}
super.onCancelIdempotent(wasClosed)
}
// Runs the base-class enqueue while holding `lock`; presumably so that registering a receiver cannot
// interleave with the offer/poll paths above, which take the same lock -- TODO confirm.
override fun enqueueReceiveInternal(receive: Receive<E>): Boolean = lock.withLock {
super.enqueueReceiveInternal(receive)
}
// ------ debug ------
// Renders the current buffer content for debugging; `value` is read here without taking `lock`.
override val bufferDebugString: String
get() = "(value=$value)"
}