Skip to content

Mutex.onLock deprecation #2850

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Oct 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions benchmarks/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ tasks.named<Jar>("jmhJar") {
}

dependencies {
compile("org.openjdk.jmh:jmh-core:1.26")
compile("io.projectreactor:reactor-core:${version("reactor")}")
compile("io.reactivex.rxjava2:rxjava:2.1.9")
compile("com.github.akarnokd:rxjava2-extensions:0.20.8")
implementation("org.openjdk.jmh:jmh-core:1.26")
implementation("io.projectreactor:reactor-core:${version("reactor")}")
implementation("io.reactivex.rxjava2:rxjava:2.1.9")
implementation("com.github.akarnokd:rxjava2-extensions:0.20.8")

compile("com.typesafe.akka:akka-actor_2.12:2.5.0")
compile(project(":kotlinx-coroutines-core"))
implementation("com.typesafe.akka:akka-actor_2.12:2.5.0")
implementation(project(":kotlinx-coroutines-core"))
implementation(project(":kotlinx-coroutines-reactive"))

// add jmh dependency on main
"jmhImplementation"(sourceSets.main.get().runtimeClasspath)
Expand Down
3 changes: 0 additions & 3 deletions kotlinx-coroutines-core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio
| [SendChannel][kotlinx.coroutines.channels.SendChannel] | [send][kotlinx.coroutines.channels.SendChannel.send] | [onSend][kotlinx.coroutines.channels.SendChannel.onSend] | [trySend][kotlinx.coroutines.channels.SendChannel.trySend]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receive][kotlinx.coroutines.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.channels.ReceiveChannel.onReceive] | [tryReceive][kotlinx.coroutines.channels.ReceiveChannel.tryReceive]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveCatching][kotlinx.coroutines.channels.receiveCatching] | [onReceiveCatching][kotlinx.coroutines.channels.onReceiveCatching] | [tryReceive][kotlinx.coroutines.channels.ReceiveChannel.tryReceive]
| [Mutex][kotlinx.coroutines.sync.Mutex] | [lock][kotlinx.coroutines.sync.Mutex.lock] | [onLock][kotlinx.coroutines.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.sync.Mutex.tryLock]
| none | [delay][kotlinx.coroutines.delay] | [onTimeout][kotlinx.coroutines.selects.SelectBuilder.onTimeout] | none

# Package kotlinx.coroutines
Expand Down Expand Up @@ -121,8 +120,6 @@ Obsolete and deprecated module to test coroutines. Replaced with `kotlinx-corout

[kotlinx.coroutines.sync.Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html
[kotlinx.coroutines.sync.Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html
[kotlinx.coroutines.sync.Mutex.onLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/on-lock.html
[kotlinx.coroutines.sync.Mutex.tryLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/try-lock.html

<!--- INDEX kotlinx.coroutines.channels -->

Expand Down
2 changes: 0 additions & 2 deletions kotlinx-coroutines-core/common/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio
| [SendChannel][kotlinx.coroutines.channels.SendChannel] | [send][kotlinx.coroutines.channels.SendChannel.send] | [onSend][kotlinx.coroutines.channels.SendChannel.onSend] | [trySend][kotlinx.coroutines.channels.SendChannel.trySend]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receive][kotlinx.coroutines.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.channels.ReceiveChannel.onReceive] | [tryReceive][kotlinx.coroutines.channels.ReceiveChannel.tryReceive]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveCatching][kotlinx.coroutines.channels.ReceiveChannel.receiveCatching] | [onReceiveCatching][kotlinx.coroutines.channels.ReceiveChannel.onReceiveCatching] | [tryReceive][kotlinx.coroutines.channels.ReceiveChannel.tryReceive]
| [Mutex][kotlinx.coroutines.sync.Mutex] | [lock][kotlinx.coroutines.sync.Mutex.lock] | [onLock][kotlinx.coroutines.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.sync.Mutex.tryLock]
| none | [delay] | [onTimeout][kotlinx.coroutines.selects.SelectBuilder.onTimeout] | none

This module provides debugging facilities for coroutines (run JVM with `-ea` or `-Dkotlinx.coroutines.debug` options)
Expand Down Expand Up @@ -131,7 +130,6 @@ Low-level primitives for finer-grained control of coroutines.

[kotlinx.coroutines.sync.Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html
[kotlinx.coroutines.sync.Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html
[kotlinx.coroutines.sync.Mutex.onLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/on-lock.html
[kotlinx.coroutines.sync.Mutex.tryLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/try-lock.html

<!--- INDEX kotlinx.coroutines.channels -->
Expand Down
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/common/src/channels/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public interface SendChannel<in E> {
*/
public val onSend: SelectClause2<E, SendChannel<E>>


/**
* Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions,
* and returns the successful result. Otherwise, returns failed or closed result.
Expand Down
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/common/src/selects/Select.kt
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ public interface SelectInstance<in R> {
* | [SendChannel] | [send][SendChannel.send] | [onSend][SendChannel.onSend]
* | [ReceiveChannel] | [receive][ReceiveChannel.receive] | [onReceive][ReceiveChannel.onReceive]
* | [ReceiveChannel] | [receiveCatching][ReceiveChannel.receiveCatching] | [onReceiveCatching][ReceiveChannel.onReceiveCatching]
* | [Mutex] | [lock][Mutex.lock] | [onLock][Mutex.onLock]
* | none | [delay] | [onTimeout][SelectBuilder.onTimeout]
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
Expand Down
9 changes: 4 additions & 5 deletions kotlinx-coroutines-core/common/src/sync/Mutex.kt
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ public interface Mutex {
* Note that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
* This function can be used in [select] invocation with [onLock] clause.
* Use [tryLock] to try acquire lock without waiting.
* Use [tryLock] to try acquiring a lock without waiting.
*
* This function is fair; suspended callers are resumed in first-in-first-out order.
*
Expand All @@ -63,10 +62,10 @@ public interface Mutex {
public suspend fun lock(owner: Any? = null)

/**
* Clause for [select] expression of [lock] suspending function that selects when the mutex is locked.
* Additional parameter for the clause in the `owner` (see [lock]) and when the clause is selected
* the reference to this mutex is passed into the corresponding block.
* Deprecated for removal without built-in replacement.
*/
@Deprecated(level = DeprecationLevel.WARNING, message = "Mutex.onLock deprecated without replacement. " +
"For additional details please refer to #2794")
public val onLock: SelectClause2<Any?, Mutex>

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package kotlinx.coroutines.channels
import kotlinx.coroutines.*
import org.junit.*

class RandevouzChannelStressTest : TestBase() {
class RendezvousChannelStressTest : TestBase() {

@Test
fun testStress() = runTest {
Expand Down
14 changes: 13 additions & 1 deletion reactive/kotlinx-coroutines-reactive/src/Publish.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package kotlinx.coroutines.reactive
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import org.reactivestreams.*
Expand Down Expand Up @@ -104,10 +105,21 @@ public class PublisherCoroutine<in T>(
// registerSelectSend
@Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
mutex.onLock.registerSelectClause2(select, null) {
val clause = suspend {
doLockedNext(element)?.let { throw it }
block(this)
}

launch(start = CoroutineStart.UNDISPATCHED) {
mutex.lock()
// Already selected -- bail out
if (!select.trySelect()) {
mutex.unlock()
return@launch
}

clause.startCoroutineCancellable(select.completion)
}
}

/*
Expand Down
38 changes: 37 additions & 1 deletion reactive/kotlinx-coroutines-reactive/test/PublishTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@
package kotlinx.coroutines.reactive

import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.*
import org.junit.Test
import org.reactivestreams.*
import java.util.concurrent.*
import kotlin.test.*

class PublishTest : TestBase() {
Expand Down Expand Up @@ -284,4 +288,36 @@ class PublishTest : TestBase() {
}
assertEquals("OK", publisher.awaitFirstOrNull())
}
}

@Test
fun testOnSendCancelled() = runTest {
val latch = CountDownLatch(1)
val published = publish(Dispatchers.Default) {
expect(2)
// Collector is ready
send(1)
try {
send(2)
expectUnreached()
} catch (e: CancellationException) {
// publisher cancellation is async
latch.countDown()
throw e
}
}

expect(1)
val collectorLatch = Mutex(true)
val job = launch {
published.asFlow().buffer(0).collect {
collectorLatch.unlock()
hang { expect(4) }
}
}
collectorLatch.lock()
expect(3)
job.cancelAndJoin()
latch.await()
finish(5)
}
}
25 changes: 24 additions & 1 deletion reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package kotlinx.coroutines.reactive

import kotlinx.coroutines.*
import kotlinx.coroutines.selects.*
import org.junit.Test
import kotlin.test.*

Expand All @@ -16,7 +17,7 @@ class PublisherMultiTest : TestBase() {
// concurrent emitters (many coroutines)
val jobs = List(n) {
// launch
launch {
launch(Dispatchers.Default) {
send(it)
}
}
Expand All @@ -28,4 +29,26 @@ class PublisherMultiTest : TestBase() {
}
assertEquals(n, resultSet.size)
}

@Test
fun testConcurrentStressOnSend() = runBlocking {
val n = 10_000 * stressTestMultiplier
val observable = publish<Int> {
// concurrent emitters (many coroutines)
val jobs = List(n) {
// launch
launch(Dispatchers.Default) {
select<Unit> {
onSend(it) {}
}
}
}
jobs.forEach { it.join() }
}
val resultSet = mutableSetOf<Int>()
observable.collect {
assertTrue(resultSet.add(it))
}
assertEquals(n, resultSet.size)
}
}
15 changes: 14 additions & 1 deletion reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import kotlin.coroutines.*
Expand Down Expand Up @@ -95,10 +96,22 @@ private class RxObservableCoroutine<T : Any>(
element: T,
block: suspend (SendChannel<T>) -> R
) {
mutex.onLock.registerSelectClause2(select, null) {
val clause = suspend {
doLockedNext(element)?.let { throw it }
block(this)
}

// This is the default replacement proposed in onLock replacement
launch(start = CoroutineStart.UNDISPATCHED) {
mutex.lock()
// Already selected -- bail out
if (!select.trySelect()) {
mutex.unlock()
return@launch
}

clause.startCoroutineCancellable(select.completion)
}
}

// assert: mutex.isLocked()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import kotlin.coroutines.*
class ObservableCompletionStressTest : TestBase() {
private val N_REPEATS = 10_000 * stressTestMultiplier

private fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = rxObservable(context) {
private fun range(context: CoroutineContext, start: Int, count: Int) = rxObservable(context) {
for (x in start until start + count) send(x)
}

Expand All @@ -33,4 +33,4 @@ class ObservableCompletionStressTest : TestBase() {
}
}
}
}
}
26 changes: 25 additions & 1 deletion reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package kotlinx.coroutines.rx2

import io.reactivex.*
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.*
import org.junit.Test
import java.io.*
import kotlin.test.*
Expand Down Expand Up @@ -47,6 +48,29 @@ class ObservableMultiTest : TestBase() {
}
}

@Test
fun testConcurrentStressOnSend() {
val n = 10_000 * stressTestMultiplier
val observable = rxObservable<Int> {
newCoroutineContext(coroutineContext)
// concurrent emitters (many coroutines)
val jobs = List(n) {
// launch
launch(Dispatchers.Default) {
val i = it
select<Unit> {
onSend(i) {}
}
}
}
jobs.forEach { it.join() }
}
checkSingleValue(observable.toList()) { list ->
assertEquals(n, list.size)
assertEquals((0 until n).toList(), list.sorted())
}
}

@Test
fun testIteratorResendUnconfined() {
val n = 10_000 * stressTestMultiplier
Expand Down Expand Up @@ -88,4 +112,4 @@ class ObservableMultiTest : TestBase() {
assertEquals("OK", it)
}
}
}
}
15 changes: 14 additions & 1 deletion reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import kotlin.coroutines.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*

/**
* Creates cold [observable][Observable] that will run a given [block] in a coroutine.
Expand Down Expand Up @@ -95,10 +96,22 @@ private class RxObservableCoroutine<T : Any>(
element: T,
block: suspend (SendChannel<T>) -> R
) {
mutex.onLock.registerSelectClause2(select, null) {
val clause = suspend {
doLockedNext(element)?.let { throw it }
block(this)
}

// This is the default replacement proposed in onLock replacement
launch(start = CoroutineStart.UNDISPATCHED) {
mutex.lock()
// Already selected -- bail out
if (!select.trySelect()) {
mutex.unlock()
return@launch
}

clause.startCoroutineCancellable(select.completion)
}
}

// assert: mutex.isLocked()
Expand Down
Loading