Skip to content

Commit 9b05908

Browse files
elizarovqwwdfsad
authored andcommitted
Consolidate NullSurrogate with Symbol and rename it to NULL
* JvmStatic on unbox did not make sense anyway * unbox is useful for other symbols
1 parent 15ee8a3 commit 9b05908

File tree

9 files changed

+38
-41
lines changed

9 files changed

+38
-41
lines changed

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+2-3
Original file line numberDiff line numberDiff line change
@@ -978,8 +978,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
978978

979979
@Suppress("UNCHECKED_CAST")
980980
override fun completeResumeReceive(token: Any) {
981-
val value: E = (if (token === NULL_VALUE) null else token) as E
982-
block.startCoroutine(value, select.completion)
981+
block.startCoroutine(NULL_VALUE.unbox<E>(token), select.completion)
983982
}
984983

985984
override fun resumeReceiveClosed(closed: Closed<*>) {
@@ -1035,7 +1034,7 @@ internal val SELECT_STARTED: Any = Symbol("SELECT_STARTED")
10351034

10361035
@JvmField
10371036
@SharedImmutable
1038-
internal val NULL_VALUE: Any = Symbol("NULL_VALUE")
1037+
internal val NULL_VALUE: Symbol = Symbol("NULL_VALUE")
10391038

10401039
@JvmField
10411040
@SharedImmutable

kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt

+4-11
Original file line numberDiff line numberDiff line change
@@ -88,17 +88,10 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
8888
* The most recently sent element to this channel or `null` when this class is constructed without
8989
* initial value and no value was sent yet or if it was [closed][close].
9090
*/
91-
@Suppress("UNCHECKED_CAST")
92-
public val valueOrNull: E? get() {
93-
val state = _state.value
94-
when (state) {
95-
is Closed -> return null
96-
is State<*> -> {
97-
if (state.value === UNDEFINED) return null
98-
return state.value as E
99-
}
100-
else -> error("Invalid state $state")
101-
}
91+
public val valueOrNull: E? get() = when (val state = _state.value) {
92+
is Closed -> null
93+
is State<*> -> UNDEFINED.unbox(state.value)
94+
else -> error("Invalid state $state")
10295
}
10396

10497
public override val isClosedForSend: Boolean get() = _state.value is Closed

kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt

+8-6
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44

55
package kotlinx.coroutines.flow.internal
66

7+
import kotlinx.coroutines.internal.*
78
import kotlin.jvm.*
89

9-
internal object NullSurrogate {
10-
11-
@JvmStatic
12-
@Suppress("UNCHECKED_CAST")
13-
internal fun <T> unbox(value: Any?): T = if (value === NullSurrogate) null as T else value as T
14-
}
10+
/**
11+
* This value is used a a surrogate `null` value when needed.
12+
* It should never leak to the outside world.
13+
*/
14+
@JvmField
15+
@SharedImmutable
16+
internal val NULL = Symbol("NULL")

kotlinx-coroutines-core/common/src/flow/operators/Delay.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
6565
val values = Channel<Any?>(Channel.CONFLATED) // Actually Any, KT-30796
6666
// Channel is not closed deliberately as there is no close with value
6767
val collector = async {
68-
collect { value -> values.send(value ?: NullSurrogate) }
68+
collect { value -> values.send(value ?: NULL) }
6969
}
7070

7171
var isDone = false
@@ -79,13 +79,13 @@ public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
7979
lastValue?.let { value -> // set timeout when lastValue != null
8080
onTimeout(timeoutMillis) {
8181
lastValue = null // Consume the value
82-
emit(NullSurrogate.unbox(value))
82+
emit(NULL.unbox(value))
8383
}
8484
}
8585

8686
// Close with value 'idiom'
8787
collector.onAwait {
88-
if (lastValue != null) emit(NullSurrogate.unbox(lastValue))
88+
if (lastValue != null) emit(NULL.unbox(lastValue))
8989
isDone = true
9090
}
9191
}
@@ -115,7 +115,7 @@ public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
115115
return flow {
116116
coroutineScope {
117117
val values = produce<Any?>(capacity = Channel.CONFLATED) { // Actually Any, KT-30796
118-
collect { value -> send(value ?: NullSurrogate) }
118+
collect { value -> send(value ?: NULL) }
119119
}
120120

121121
var isDone = false
@@ -136,7 +136,7 @@ public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
136136
ticker.onReceive {
137137
val value = lastValue ?: return@onReceive
138138
lastValue = null // Consume the value
139-
emit(NullSurrogate.unbox(value))
139+
emit(NULL.unbox(value))
140140
}
141141
}
142142
}

kotlinx-coroutines-core/common/src/flow/operators/Distinct.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public fun <T> Flow<T>.distinctUntilChanged(): Flow<T> = distinctUntilChangedBy
2525
@FlowPreview
2626
public fun <T, K> Flow<T>.distinctUntilChangedBy(keySelector: (T) -> K): Flow<T> =
2727
flow {
28-
var previousKey: Any? = NullSurrogate
28+
var previousKey: Any? = NULL
2929
collect { value ->
3030
val key = keySelector(value)
3131
if (previousKey != key) {

kotlinx-coroutines-core/common/src/flow/operators/Merge.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ private class SerializingFlatMapCollector<T>(
138138

139139
public suspend fun emit(value: T) {
140140
if (!inProgressLock.tryAcquire()) {
141-
channel.send(value ?: NullSurrogate)
141+
channel.send(value ?: NULL)
142142
if (inProgressLock.tryAcquire()) {
143143
helpEmit()
144144
}
@@ -154,7 +154,7 @@ private class SerializingFlatMapCollector<T>(
154154
while (true) {
155155
var element = channel.poll()
156156
while (element != null) { // TODO receive or closed (#330)
157-
downstream.emit(NullSurrogate.unbox(element))
157+
downstream.emit(NULL.unbox(element))
158158
element = channel.poll()
159159
}
160160

kotlinx-coroutines-core/common/src/flow/operators/Zip.kt

+7-7
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,14 @@ public fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspen
6161
onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value ->
6262
firstValue = value
6363
if (secondValue !== null) {
64-
emit(transform(NullSurrogate.unbox(firstValue), NullSurrogate.unbox(secondValue)))
64+
emit(transform(NULL.unbox(firstValue), NULL.unbox(secondValue)))
6565
}
6666
}
6767

6868
onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value ->
6969
secondValue = value
7070
if (firstValue !== null) {
71-
emit(transform(NullSurrogate.unbox(firstValue), NullSurrogate.unbox(secondValue)))
71+
emit(transform(NULL.unbox(firstValue), NULL.unbox(secondValue)))
7272
}
7373
}
7474
}
@@ -163,7 +163,7 @@ internal fun <T, R> Flow<T>.combineLatest(vararg others: Flow<T>, arrayFactory:
163163
if (latestValues.all { it !== null }) {
164164
val arguments = arrayFactory()
165165
for (index in 0 until size) {
166-
arguments[index] = NullSurrogate.unbox(latestValues[index])
166+
arguments[index] = NULL.unbox(latestValues[index])
167167
}
168168
emit(transform(arguments as Array<T>))
169169
}
@@ -191,7 +191,7 @@ private inline fun SelectBuilder<Unit>.onReceive(
191191
private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
192192
val channel = channel as ChannelCoroutine<Any>
193193
flow.collect { value ->
194-
channel.sendFair(value ?: NullSurrogate)
194+
channel.sendFair(value ?: NULL)
195195
}
196196
}
197197

@@ -235,8 +235,8 @@ public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2)
235235
if (!otherIterator.hasNext()) {
236236
return@consumeEach
237237
}
238-
val secondValue = NullSurrogate.unbox<T2>(otherIterator.next())
239-
emit(transform(NullSurrogate.unbox(value), NullSurrogate.unbox(secondValue)))
238+
val secondValue = NULL.unbox<T2>(otherIterator.next())
239+
emit(transform(NULL.unbox(value), NULL.unbox(secondValue)))
240240
}
241241
} catch (e: AbortFlowException) {
242242
// complete
@@ -249,6 +249,6 @@ public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2)
249249
// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
250250
private fun CoroutineScope.asChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
251251
flow.collect { value ->
252-
channel.send(value ?: NullSurrogate)
252+
channel.send(value ?: NULL)
253253
}
254254
}

kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt

+6-6
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,18 @@ import kotlin.jvm.*
1818
*/
1919
@FlowPreview
2020
public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S {
21-
var accumulator: Any? = NullSurrogate
21+
var accumulator: Any? = NULL
2222

2323
collect { value ->
24-
accumulator = if (accumulator !== NullSurrogate) {
24+
accumulator = if (accumulator !== NULL) {
2525
@Suppress("UNCHECKED_CAST")
2626
operation(accumulator as S, value)
2727
} else {
2828
value
2929
}
3030
}
3131

32-
if (accumulator === NullSurrogate) throw UnsupportedOperationException("Empty flow can't be reduced")
32+
if (accumulator === NULL) throw UnsupportedOperationException("Empty flow can't be reduced")
3333
@Suppress("UNCHECKED_CAST")
3434
return accumulator as S
3535
}
@@ -56,13 +56,13 @@ public suspend inline fun <T, R> Flow<T>.fold(
5656
*/
5757
@FlowPreview
5858
public suspend fun <T> Flow<T>.single(): T {
59-
var result: Any? = NullSurrogate
59+
var result: Any? = NULL
6060
collect { value ->
61-
if (result !== NullSurrogate) error("Expected only one element")
61+
if (result !== NULL) error("Expected only one element")
6262
result = value
6363
}
6464

65-
if (result === NullSurrogate) throw NoSuchElementException("Expected at least one element")
65+
if (result === NULL) throw NoSuchElementException("Expected at least one element")
6666
@Suppress("UNCHECKED_CAST")
6767
return result as T
6868
}

kotlinx-coroutines-core/common/src/internal/Symbol.kt

+3
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,7 @@ package kotlinx.coroutines.internal
1111
*/
1212
internal class Symbol(val symbol: String) {
1313
override fun toString(): String = symbol
14+
15+
@Suppress("UNCHECKED_CAST", "NOTHING_TO_INLINE")
16+
inline fun <T> unbox(value: Any?): T = if (value === this) null as T else value as T
1417
}

0 commit comments

Comments
 (0)