@@ -12,14 +12,17 @@ import kotlinx.coroutines.internal.*
12
12
import kotlin.coroutines.*
13
13
import kotlin.coroutines.intrinsics.*
14
14
15
+ internal fun getNull (): Symbol = NULL // Workaround for JS BE bug
16
+
15
17
@PublishedApi
16
18
internal suspend fun <R , T > FlowCollector<R>.combineInternal (
17
19
flows : Array <out Flow <T >>,
18
20
arrayFactory : () -> Array <T ?>,
19
21
transform : suspend FlowCollector <R >.(Array <T >) -> Unit
20
22
): Unit = flowScope { // flow scope so any cancellation within the source flow will cancel the whole scope
21
23
val size = flows.size
22
- val latestValues = Array <Any ?>(size) { NULL }
24
+ if (size == 0 ) return @flowScope // bail-out for empty input
25
+ val latestValues = Array <Any ?>(size) { getNull() }
23
26
val isClosed = Array (size) { false }
24
27
val resultChannel = Channel <Array <T >>(Channel .CONFLATED )
25
28
val nonClosed = LocalAtomicInt (size)
@@ -31,11 +34,11 @@ internal suspend fun <R, T> FlowCollector<R>.combineInternal(
31
34
flows[i].collect { value ->
32
35
val previous = latestValues[i]
33
36
latestValues[i] = value
34
- if (previous == = NULL ) remainingAbsentValues.decrementAndGet()
37
+ if (previous == = getNull() ) remainingAbsentValues.decrementAndGet()
35
38
if (remainingAbsentValues.value == 0 ) {
36
39
val results = arrayFactory()
37
40
for (index in 0 until size) {
38
- results[index] = NULL .unbox(latestValues[index])
41
+ results[index] = getNull() .unbox(latestValues[index])
39
42
}
40
43
// NB: here actually "stale" array can overwrite a fresh one and break linearizability
41
44
resultChannel.send(results as Array <T >)
@@ -98,7 +101,7 @@ internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: sus
98
101
flow.collect { value ->
99
102
val otherValue = second.receiveOrNull() ? : return @collect
100
103
withContextUndispatched(newContext, cnt) {
101
- emit(transform(NULL .unbox(value), NULL .unbox(otherValue)))
104
+ emit(transform(getNull() .unbox(value), getNull() .unbox(otherValue)))
102
105
}
103
106
ensureActive()
104
107
}
@@ -127,6 +130,6 @@ private suspend fun withContextUndispatched(
127
130
// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
128
131
private fun CoroutineScope.asChannel (flow : Flow <* >): ReceiveChannel <Any > = produce {
129
132
flow.collect { value ->
130
- return @collect channel.send(value ? : NULL )
133
+ return @collect channel.send(value ? : getNull() )
131
134
}
132
135
}
0 commit comments