File tree 1 file changed +4
-4
lines changed
kotlinx-coroutines-core/common/src/flow/operators
1 file changed +4
-4
lines changed Original file line number Diff line number Diff line change @@ -13,6 +13,7 @@ import kotlinx.coroutines.channels.*
13
13
import kotlinx.coroutines.channels.Channel.Factory.OPTIONAL_CHANNEL
14
14
import kotlinx.coroutines.flow.internal.*
15
15
import kotlinx.coroutines.internal.*
16
+ import kotlinx.coroutines.sync.*
16
17
import kotlin.coroutines.*
17
18
import kotlin.jvm.*
18
19
import kotlinx.coroutines.flow.unsafeFlow as flow
@@ -149,16 +150,15 @@ private class ChannelFlowMerge<T>(
149
150
150
151
// The actual merge implementation with concurrency limit
151
152
private suspend fun mergeImpl (scope : CoroutineScope , collector : ConcurrentFlowCollector <T >) {
152
- val semaphore = Channel < Unit > (concurrency)
153
+ val semaphore = Semaphore (concurrency)
153
154
@Suppress(" UNCHECKED_CAST" )
154
155
flow.collect { inner ->
155
- // TODO real semaphore (#94)
156
- semaphore.send(Unit ) // Acquire concurrency permit
156
+ semaphore.acquire() // Acquire concurrency permit
157
157
scope.launch {
158
158
try {
159
159
inner.collect(collector)
160
160
} finally {
161
- semaphore.receive () // Release concurrency permit
161
+ semaphore.release () // Release concurrency permit
162
162
}
163
163
}
164
164
}
You can’t perform that action at this time.
0 commit comments