Skip to content

Commit 5f01641

Browse files
committed
~more small optimizations
* Do not allocate array on each transform call in combine, do it only for combineTransform * Simplify zip operator even further * Get rid of crossinline in combine to fix weird Android crashes Fixes #1743 Fixes #1683
1 parent c2d0707 commit 5f01641

File tree

4 files changed

+106
-58
lines changed

4 files changed

+106
-58
lines changed

kotlinx-coroutines-core/common/src/flow/Migration.kt

+7-7
Original file line numberDiff line numberDiff line change
@@ -367,35 +367,35 @@ public fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspen
367367
message = "Flow analogue of 'combineLatest' is 'combine'",
368368
replaceWith = ReplaceWith("combine(this, other, other2, transform)")
369369
)
370-
public inline fun <T1, T2, T3, R> Flow<T1>.combineLatest(
370+
public fun <T1, T2, T3, R> Flow<T1>.combineLatest(
371371
other: Flow<T2>,
372372
other2: Flow<T3>,
373-
crossinline transform: suspend (T1, T2, T3) -> R
373+
transform: suspend (T1, T2, T3) -> R
374374
) = combine(this, other, other2, transform)
375375

376376
@Deprecated(
377377
level = DeprecationLevel.ERROR,
378378
message = "Flow analogue of 'combineLatest' is 'combine'",
379379
replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)")
380380
)
381-
public inline fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
381+
public fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
382382
other: Flow<T2>,
383383
other2: Flow<T3>,
384384
other3: Flow<T4>,
385-
crossinline transform: suspend (T1, T2, T3, T4) -> R
385+
transform: suspend (T1, T2, T3, T4) -> R
386386
) = combine(this, other, other2, other3, transform)
387387

388388
@Deprecated(
389389
level = DeprecationLevel.ERROR,
390390
message = "Flow analogue of 'combineLatest' is 'combine'",
391391
replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)")
392392
)
393-
public inline fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
393+
public fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
394394
other: Flow<T2>,
395395
other2: Flow<T3>,
396396
other3: Flow<T4>,
397397
other4: Flow<T5>,
398-
crossinline transform: suspend (T1, T2, T3, T4, T5) -> R
398+
transform: suspend (T1, T2, T3, T4, T5) -> R
399399
): Flow<R> = combine(this, other, other2, other3, other4, transform)
400400

401401
/**
@@ -482,4 +482,4 @@ public fun <T> Flow<T>.replay(bufferSize: Int): Flow<T> = noImpl()
482482
message = "Flow analogue of 'cache()' is 'shareIn' with unlimited replay and 'started = SharingStared.Lazily' argument'",
483483
replaceWith = ReplaceWith("this.shareIn(scope, Int.MAX_VALUE, started = SharingStared.Lazily)")
484484
)
485-
public fun <T> Flow<T>.cache(): Flow<T> = noImpl()
485+
public fun <T> Flow<T>.cache(): Flow<T> = noImpl()

kotlinx-coroutines-core/common/src/flow/internal/Combine.kt

+23-20
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,13 @@ private typealias Update = IndexedValue<Any?>
1717
@PublishedApi
1818
internal suspend fun <R, T> FlowCollector<R>.combineInternal(
1919
flows: Array<out Flow<T>>,
20-
arrayFactory: () -> Array<T?>, // Array factory is required to workaround array typing on JVM
20+
arrayFactory: () -> Array<T?>?, // Array factory is required to workaround array typing on JVM
2121
transform: suspend FlowCollector<R>.(Array<T>) -> Unit
2222
): Unit = flowScope { // flow scope so any cancellation within the source flow will cancel the whole scope
2323
val size = flows.size
2424
if (size == 0) return@flowScope // bail-out for empty input
2525
val latestValues = arrayOfNulls<Any?>(size)
2626
latestValues.fill(UNINITIALIZED) // Smaller bytecode & faster that Array(size) { UNINITIALIZED }
27-
val isClosed = BooleanArray(size)
2827
val resultChannel = Channel<Update>(flows.size)
2928
val nonClosed = LocalAtomicInt(size)
3029
var remainingAbsentValues = size
@@ -37,7 +36,6 @@ internal suspend fun <R, T> FlowCollector<R>.combineInternal(
3736
yield() // Emulate fairness, giving each flow chance to emit
3837
}
3938
} finally {
40-
isClosed[i] = true
4139
// Close the channel when there is no more flows
4240
if (nonClosed.decrementAndGet() == 0) {
4341
resultChannel.close()
@@ -72,17 +70,30 @@ internal suspend fun <R, T> FlowCollector<R>.combineInternal(
7270

7371
// Process batch result if there is enough data
7472
if (remainingAbsentValues == 0) {
73+
/*
74+
* If arrayFactory returns null, then we can avoid array copy because
75+
* it's our own safe transformer that immediately deconstructs the array
76+
*/
7577
val results = arrayFactory()
76-
(latestValues as Array<T?>).copyInto(results)
77-
transform(results as Array<T>)
78+
if (results == null) {
79+
transform(latestValues as Array<T>)
80+
} else {
81+
(latestValues as Array<T?>).copyInto(results)
82+
transform(results as Array<T>)
83+
}
7884
}
7985
}
8086
}
8187

8288
internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
8389
unsafeFlow {
8490
coroutineScope {
85-
val second = asChannel(flow2)
91+
val second = produce<Any> {
92+
flow2.collect { value ->
93+
return@collect channel.send(value ?: NULL)
94+
}
95+
}
96+
8697
/*
8798
* This approach only works with rendezvous channel and is required to enforce correctness
8899
* in the following scenario:
@@ -95,14 +106,11 @@ internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: sus
95106
* Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
96107
*/
97108
val collectJob = Job()
98-
val scopeJob = currentCoroutineContext()[Job]!! // TODO replace with extension when #2245 is here
99109
(second as SendChannel<*>).invokeOnClose {
100110
// Optimization to avoid AFE allocation when the other flow is done
101111
if (collectJob.isActive) collectJob.cancel(AbortFlowException(this@unsafeFlow))
102112
}
103113

104-
val newContext = coroutineContext + scopeJob
105-
val cnt = threadContextElements(newContext)
106114
try {
107115
/*
108116
* Non-trivial undispatched (because we are in the right context and there is no structured concurrency)
@@ -116,18 +124,20 @@ internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: sus
116124
* with coroutines scope via a channel, but it's way too expensive, so
117125
* we are using this trick instead.
118126
*/
119-
withContextUndispatched( coroutineContext + collectJob) {
127+
val scopeContext = coroutineContext
128+
val cnt = threadContextElements(scopeContext)
129+
withContextUndispatched(coroutineContext + collectJob) {
120130
flow.collect { value ->
121-
withContextUndispatched(newContext, cnt) {
131+
withContextUndispatched(scopeContext, cnt) {
122132
val otherValue = second.receiveOrNull() ?: throw AbortFlowException(this@unsafeFlow)
123-
emit(transform(NULL.unbox(value), NULL.unbox(otherValue)))
133+
emit(transform(value, NULL.unbox(otherValue)))
124134
}
125135
}
126136
}
127137
} catch (e: AbortFlowException) {
128138
e.checkOwnership(owner = this@unsafeFlow)
129139
} finally {
130-
if (!second.isClosedForReceive) second.cancel(AbortFlowException(this@unsafeFlow))
140+
if (!second.isClosedForReceive) second.cancel()
131141
}
132142
}
133143
}
@@ -144,10 +154,3 @@ private suspend fun withContextUndispatched(
144154
})
145155
}
146156
}
147-
148-
// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
149-
private fun CoroutineScope.asChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
150-
flow.collect { value ->
151-
return@collect channel.send(value ?: NULL)
152-
}
153-
}

kotlinx-coroutines-core/common/src/flow/operators/Zip.kt

+45-20
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
3030
*/
3131
@JvmName("flowCombine")
3232
public fun <T1, T2, R> Flow<T1>.combine(flow: Flow<T2>, transform: suspend (a: T1, b: T2) -> R): Flow<R> = flow {
33-
combineInternal(arrayOf(this@combine, flow), { arrayOfNulls(2) }, { emit(transform(it[0] as T1, it[1] as T2)) })
33+
combineInternal(arrayOf(this@combine, flow), nullArrayFactory(), { emit(transform(it[0] as T1, it[1] as T2)) })
3434
}
3535

3636
/**
@@ -72,7 +72,7 @@ public fun <T1, T2, R> combine(flow: Flow<T1>, flow2: Flow<T2>, transform: suspe
7272
public fun <T1, T2, R> Flow<T1>.combineTransform(
7373
flow: Flow<T2>,
7474
@BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
75-
): Flow<R> = combineTransform(this, flow) { args: Array<*> ->
75+
): Flow<R> = combineTransformUnsafe(this, flow) { args: Array<*> ->
7676
transform(
7777
args[0] as T1,
7878
args[1] as T2
@@ -100,7 +100,7 @@ public fun <T1, T2, R> combineTransform(
100100
flow: Flow<T1>,
101101
flow2: Flow<T2>,
102102
@BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
103-
): Flow<R> = combineTransform(flow, flow2) { args: Array<*> ->
103+
): Flow<R> = combineTransformUnsafe(flow, flow2) { args: Array<*> ->
104104
transform(
105105
args[0] as T1,
106106
args[1] as T2
@@ -111,12 +111,12 @@ public fun <T1, T2, R> combineTransform(
111111
* Returns a [Flow] whose values are generated with [transform] function by combining
112112
* the most recently emitted values by each flow.
113113
*/
114-
public inline fun <T1, T2, T3, R> combine(
114+
public fun <T1, T2, T3, R> combine(
115115
flow: Flow<T1>,
116116
flow2: Flow<T2>,
117117
flow3: Flow<T3>,
118-
@BuilderInference crossinline transform: suspend (T1, T2, T3) -> R
119-
): Flow<R> = combine(flow, flow2, flow3) { args: Array<*> ->
118+
@BuilderInference transform: suspend (T1, T2, T3) -> R
119+
): Flow<R> = combineUnsafe(flow, flow2, flow3) { args: Array<*> ->
120120
transform(
121121
args[0] as T1,
122122
args[1] as T2,
@@ -130,12 +130,12 @@ public inline fun <T1, T2, T3, R> combine(
130130
* The receiver of the [transform] is [FlowCollector] and thus `transform` is a
131131
* generic function that may transform emitted element, skip it or emit it multiple times.
132132
*/
133-
public inline fun <T1, T2, T3, R> combineTransform(
133+
public fun <T1, T2, T3, R> combineTransform(
134134
flow: Flow<T1>,
135135
flow2: Flow<T2>,
136136
flow3: Flow<T3>,
137-
@BuilderInference crossinline transform: suspend FlowCollector<R>.(T1, T2, T3) -> Unit
138-
): Flow<R> = combineTransform(flow, flow2, flow3) { args: Array<*> ->
137+
@BuilderInference transform: suspend FlowCollector<R>.(T1, T2, T3) -> Unit
138+
): Flow<R> = combineTransformUnsafe(flow, flow2, flow3) { args: Array<*> ->
139139
transform(
140140
args[0] as T1,
141141
args[1] as T2,
@@ -147,12 +147,12 @@ public inline fun <T1, T2, T3, R> combineTransform(
147147
* Returns a [Flow] whose values are generated with [transform] function by combining
148148
* the most recently emitted values by each flow.
149149
*/
150-
public inline fun <T1, T2, T3, T4, R> combine(
150+
public fun <T1, T2, T3, T4, R> combine(
151151
flow: Flow<T1>,
152152
flow2: Flow<T2>,
153153
flow3: Flow<T3>,
154154
flow4: Flow<T4>,
155-
crossinline transform: suspend (T1, T2, T3, T4) -> R
155+
transform: suspend (T1, T2, T3, T4) -> R
156156
): Flow<R> = combine(flow, flow2, flow3, flow4) { args: Array<*> ->
157157
transform(
158158
args[0] as T1,
@@ -168,13 +168,13 @@ public inline fun <T1, T2, T3, T4, R> combine(
168168
* The receiver of the [transform] is [FlowCollector] and thus `transform` is a
169169
* generic function that may transform emitted element, skip it or emit it multiple times.
170170
*/
171-
public inline fun <T1, T2, T3, T4, R> combineTransform(
171+
public fun <T1, T2, T3, T4, R> combineTransform(
172172
flow: Flow<T1>,
173173
flow2: Flow<T2>,
174174
flow3: Flow<T3>,
175175
flow4: Flow<T4>,
176-
@BuilderInference crossinline transform: suspend FlowCollector<R>.(T1, T2, T3, T4) -> Unit
177-
): Flow<R> = combineTransform(flow, flow2, flow3, flow4) { args: Array<*> ->
176+
@BuilderInference transform: suspend FlowCollector<R>.(T1, T2, T3, T4) -> Unit
177+
): Flow<R> = combineTransformUnsafe(flow, flow2, flow3, flow4) { args: Array<*> ->
178178
transform(
179179
args[0] as T1,
180180
args[1] as T2,
@@ -187,14 +187,14 @@ public inline fun <T1, T2, T3, T4, R> combineTransform(
187187
* Returns a [Flow] whose values are generated with [transform] function by combining
188188
* the most recently emitted values by each flow.
189189
*/
190-
public inline fun <T1, T2, T3, T4, T5, R> combine(
190+
public fun <T1, T2, T3, T4, T5, R> combine(
191191
flow: Flow<T1>,
192192
flow2: Flow<T2>,
193193
flow3: Flow<T3>,
194194
flow4: Flow<T4>,
195195
flow5: Flow<T5>,
196-
crossinline transform: suspend (T1, T2, T3, T4, T5) -> R
197-
): Flow<R> = combine(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
196+
transform: suspend (T1, T2, T3, T4, T5) -> R
197+
): Flow<R> = combineUnsafe(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
198198
transform(
199199
args[0] as T1,
200200
args[1] as T2,
@@ -210,14 +210,14 @@ public inline fun <T1, T2, T3, T4, T5, R> combine(
210210
* The receiver of the [transform] is [FlowCollector] and thus `transform` is a
211211
* generic function that may transform emitted element, skip it or emit it multiple times.
212212
*/
213-
public inline fun <T1, T2, T3, T4, T5, R> combineTransform(
213+
public fun <T1, T2, T3, T4, T5, R> combineTransform(
214214
flow: Flow<T1>,
215215
flow2: Flow<T2>,
216216
flow3: Flow<T3>,
217217
flow4: Flow<T4>,
218218
flow5: Flow<T5>,
219-
@BuilderInference crossinline transform: suspend FlowCollector<R>.(T1, T2, T3, T4, T5) -> Unit
220-
): Flow<R> = combineTransform(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
219+
@BuilderInference transform: suspend FlowCollector<R>.(T1, T2, T3, T4, T5) -> Unit
220+
): Flow<R> = combineTransformUnsafe(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
221221
transform(
222222
args[0] as T1,
223223
args[1] as T2,
@@ -251,6 +251,31 @@ public inline fun <reified T, R> combineTransform(
251251
combineInternal(flows, { arrayOfNulls(flows.size) }, { transform(it) })
252252
}
253253

254+
/*
255+
* Same as combine, but does not copy array each time, deconstructing existing
256+
* array each time. Used in overloads that accept FunctionN instead of Function<Array<R>>
257+
*/
258+
private inline fun <reified T, R> combineUnsafe(
259+
vararg flows: Flow<T>,
260+
crossinline transform: suspend (Array<T>) -> R
261+
): Flow<R> = flow {
262+
combineInternal(flows, nullArrayFactory(), { emit(transform(it)) })
263+
}
264+
265+
/*
266+
* Same as combineTransform, but does not copy array each time, deconstructing existing
267+
* array each time. Used in overloads that accept FunctionN instead of Function<Array<R>>
268+
*/
269+
private inline fun <reified T, R> combineTransformUnsafe(
270+
vararg flows: Flow<T>,
271+
@BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
272+
): Flow<R> = safeFlow {
273+
combineInternal(flows, nullArrayFactory(), { transform(it) })
274+
}
275+
276+
// Saves bunch of anonymous classes
277+
private fun <T> nullArrayFactory(): () -> Array<T>? = { null }
278+
254279
/**
255280
* Returns a [Flow] whose values are generated with [transform] function by combining
256281
* the most recently emitted values by each flow.

0 commit comments

Comments
 (0)