Skip to content

Commit 948c7e3

Browse files
committed
Non-linearizable implementation of PublisherCoroutine.onSend that isn't using Mutex.onLock
1 parent 9411419 commit 948c7e3

File tree

2 files changed

+18
-2
lines changed

2 files changed

+18
-2
lines changed

reactive/kotlinx-coroutines-reactive/src/Publish.kt

+14-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package kotlinx.coroutines.reactive
66
import kotlinx.atomicfu.*
77
import kotlinx.coroutines.*
88
import kotlinx.coroutines.channels.*
9+
import kotlinx.coroutines.intrinsics.*
910
import kotlinx.coroutines.selects.*
1011
import kotlinx.coroutines.sync.*
1112
import org.reactivestreams.*
@@ -104,10 +105,22 @@ public class PublisherCoroutine<in T>(
104105
// registerSelectSend
105106
@Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
106107
override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
107-
mutex.onLock.registerSelectClause2(select, null) {
108+
val clause = suspend {
108109
doLockedNext(element)?.let { throw it }
109110
block(this)
110111
}
112+
113+
// TODO discuss it
114+
launch(start = CoroutineStart.UNDISPATCHED) {
115+
mutex.lock()
116+
// Already selected -- bail out
117+
if (!select.trySelect()) {
118+
mutex.unlock()
119+
return@launch
120+
}
121+
122+
clause.startCoroutineCancellable(select.completion)
123+
}
111124
}
112125

113126
/*

reactive/kotlinx-coroutines-reactive/test/PublishTest.kt

+4-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import kotlinx.coroutines.*
88
import kotlinx.coroutines.CancellationException
99
import kotlinx.coroutines.channels.*
1010
import kotlinx.coroutines.flow.*
11+
import kotlinx.coroutines.sync.*
1112
import org.junit.Test
1213
import org.reactivestreams.*
1314
import java.util.concurrent.*
@@ -308,13 +309,15 @@ class PublishTest : TestBase() {
308309
}
309310

310311
expect(1)
312+
val collectorLatch = Mutex(true)
311313
val job = launch {
312314
published.asFlow().buffer(0).collect {
313315
expect(4)
316+
collectorLatch.unlock()
314317
hang { expect(6) }
315318
}
316319
}
317-
yield()
320+
collectorLatch.lock()
318321
expect(5)
319322
job.cancelAndJoin()
320323
latch.await()

0 commit comments

Comments
 (0)