Skip to content

Commit 1b248c1

Browse files
Nikita Kovalqwwdfsad
Nikita Koval
authored andcommitted
Review fixes
1 parent fd2b359 commit 1b248c1

File tree

4 files changed

+97
-86
lines changed

4 files changed

+97
-86
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+7-5
Original file line numberDiff line numberDiff line change
@@ -1199,18 +1199,17 @@ public abstract interface class kotlinx/coroutines/selects/SelectBuilder {
11991199
public abstract fun invoke (Lkotlinx/coroutines/selects/SelectClause1;Lkotlin/jvm/functions/Function2;)V
12001200
public abstract fun invoke (Lkotlinx/coroutines/selects/SelectClause2;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V
12011201
public abstract fun invoke (Lkotlinx/coroutines/selects/SelectClause2;Lkotlin/jvm/functions/Function2;)V
1202-
public abstract synthetic fun onTimeout (JLkotlin/jvm/functions/Function1;)V
1202+
public abstract fun onTimeout (JLkotlin/jvm/functions/Function1;)V
12031203
}
12041204

12051205
public final class kotlinx/coroutines/selects/SelectBuilder$DefaultImpls {
12061206
public static fun invoke (Lkotlinx/coroutines/selects/SelectBuilder;Lkotlinx/coroutines/selects/SelectClause2;Lkotlin/jvm/functions/Function2;)V
1207-
public static synthetic fun onTimeout (Lkotlinx/coroutines/selects/SelectBuilder;JLkotlin/jvm/functions/Function1;)V
1207+
public static fun onTimeout (Lkotlinx/coroutines/selects/SelectBuilder;JLkotlin/jvm/functions/Function1;)V
12081208
}
12091209

12101210
public final class kotlinx/coroutines/selects/SelectBuilderImpl : kotlinx/coroutines/selects/SelectImplementation {
12111211
public fun <init> (Lkotlin/coroutines/Continuation;)V
12121212
public final fun getResult ()Ljava/lang/Object;
1213-
public final fun getUCont ()Lkotlin/coroutines/Continuation;
12141213
public final fun handleBuilderException (Ljava/lang/Throwable;)V
12151214
}
12161215

@@ -1241,7 +1240,7 @@ public class kotlinx/coroutines/selects/SelectImplementation : kotlinx/coroutine
12411240
public fun invoke (Lkotlinx/coroutines/selects/SelectClause1;Lkotlin/jvm/functions/Function2;)V
12421241
public fun invoke (Lkotlinx/coroutines/selects/SelectClause2;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V
12431242
public fun invoke (Lkotlinx/coroutines/selects/SelectClause2;Lkotlin/jvm/functions/Function2;)V
1244-
public synthetic fun onTimeout (JLkotlin/jvm/functions/Function1;)V
1243+
public fun onTimeout (JLkotlin/jvm/functions/Function1;)V
12451244
protected final fun register (Lkotlinx/coroutines/selects/SelectImplementation$ClauseData;Z)V
12461245
public static synthetic fun register$default (Lkotlinx/coroutines/selects/SelectImplementation;Lkotlinx/coroutines/selects/SelectImplementation$ClauseData;ZILjava/lang/Object;)V
12471246
public fun selectInRegistrationPhase (Ljava/lang/Object;)V
@@ -1272,13 +1271,16 @@ public final class kotlinx/coroutines/selects/SelectKt {
12721271
public static final fun select (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
12731272
}
12741273

1274+
public final class kotlinx/coroutines/selects/SelectOldKt {
1275+
public static final fun selectOld (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
1276+
}
1277+
12751278
public final class kotlinx/coroutines/selects/SelectUnbiasedKt {
12761279
public static final fun selectUnbiased (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
12771280
}
12781281

12791282
public final class kotlinx/coroutines/selects/UnbiasedSelectBuilderImpl : kotlinx/coroutines/selects/UnbiasedSelectImplementation {
12801283
public fun <init> (Lkotlin/coroutines/Continuation;)V
1281-
public final fun getUCont ()Lkotlin/coroutines/Continuation;
12821284
public final fun handleBuilderException (Ljava/lang/Throwable;)V
12831285
public final fun initSelectResult ()Ljava/lang/Object;
12841286
}

reactive/kotlinx-coroutines-reactive/src/Publish.kt

+30-27
Original file line numberDiff line numberDiff line change
@@ -82,37 +82,40 @@ public class PublisherCoroutine<in T>(
8282
throw UnsupportedOperationException("PublisherCoroutine doesn't support invokeOnClose")
8383

8484
// Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked
85-
private val mutex: Mutex = PublisherCoroutineMutex()
85+
private val mutex: Mutex = Mutex(locked = true)
8686

87-
/**
88-
* To send an element, [mutex] should be locked first, after which [doLockedNext]
89-
* is called -- it releases the mutex at the end. To support the `select` operation
90-
* and implement [onSend], we use a modified version of [Mutex.onLock], which
91-
* invokes [doLockedNext] at the end and returns this [PublisherCoroutine] as result
92-
* (see [PublisherCoroutineMutex.onLockProcessResult]).
93-
*
94-
* We use this dirty hack as we need to wait for a lock, and, therefore, the clause
95-
* object should be a [Mutex] instance. Thus, the [doLockedNext] call must be a part
96-
* of [PublisherCoroutineMutex.onLockProcessResult] as well. A possible alternative
97-
* would be to inherit [PublisherCoroutine] from [Mutex], but it already extends [AbstractCoroutine].
98-
*/
99-
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "CANNOT_OVERRIDE_INVISIBLE_MEMBER")
100-
private inner class PublisherCoroutineMutex : MutexImpl(locked = true) {
101-
override fun onLockRegFunction(select: SelectInstance<*>, element: Any?) {
102-
super.onLockRegFunction(select, owner = null)
103-
}
87+
@Suppress("UNCHECKED_CAST", "INVISIBLE_MEMBER")
88+
override val onSend: SelectClause2<T, SendChannel<T>> get() = SelectClause2Impl(
89+
clauseObject = this,
90+
regFunc = PublisherCoroutine<*>::registerSelectForSend as RegistrationFunction,
91+
processResFunc = PublisherCoroutine<*>::processResultSelectSend as ProcessResultFunction
92+
)
10493

105-
@Suppress("UNCHECKED_CAST", "RedundantNullableReturnType")
106-
override fun onLockProcessResult(element: Any?, result: Any?): Any? {
107-
super.onLockProcessResult(owner = null, result)
108-
doLockedNext(element as T)?.let { throw it }
109-
return this@PublisherCoroutine
94+
@Suppress("UNCHECKED_CAST", "UNUSED_PARAMETER")
95+
private fun registerSelectForSend(select: SelectInstance<*>, element: Any?) {
96+
// Try to acquire the mutex and complete in the registration phase.
97+
if (mutex.tryLock()) {
98+
select.selectInRegistrationPhase(Unit)
99+
return
100+
}
101+
// Start a new coroutine that waits for the mutex, invoking `trySelect(..)` after that.
102+
// Please note that at the point of the `trySelect(..)` invocation the corresponding
103+
// `select` can still be in the registration phase, making this `trySelect(..)` bound to fail.
104+
// In this case, the `onSend` clause will be re-registered, which alongside with the mutex
105+
// manipulation makes the resulting solution obstruction-free.
106+
launch {
107+
mutex.lock()
108+
if (!select.trySelect(this@PublisherCoroutine, Unit)) {
109+
mutex.unlock()
110+
}
110111
}
111112
}
112113

113-
@Suppress("UNCHECKED_CAST")
114-
override val onSend: SelectClause2<T, SendChannel<T>>
115-
get() = mutex.onLock as SelectClause2<T, SendChannel<T>>
114+
@Suppress("RedundantNullableReturnType", "UNUSED_PARAMETER", "UNCHECKED_CAST")
115+
private fun processResultSelectSend(element: Any?, selectResult: Any?): Any? {
116+
doLockedNext(element as T)?.let { throw it }
117+
return this@PublisherCoroutine
118+
}
116119

117120
override fun trySend(element: T): ChannelResult<Unit> =
118121
if (!mutex.tryLock()) {
@@ -221,7 +224,7 @@ public class PublisherCoroutine<in T>(
221224
* We have to recheck `isCompleted` after `unlock` anyway.
222225
*/
223226
mutex.unlock()
224-
// check isCompleted and and try to regain lock to signal completion
227+
// check isCompleted and try to regain lock to signal completion
225228
if (isCompleted && mutex.tryLock()) {
226229
doLockedSignalCompleted(completionCause, completionCauseHandled)
227230
}

reactive/kotlinx-coroutines-rx2/src/RxObservable.kt

+30-27
Original file line numberDiff line numberDiff line change
@@ -68,37 +68,40 @@ private class RxObservableCoroutine<T : Any>(
6868
throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose")
6969

7070
// Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked
71-
private val mutex: Mutex = RxObservableCoroutineMutex()
72-
73-
/**
74-
* To send an element, [mutex] should be locked first, after which [doLockedNext]
75-
* is called -- it releases the mutex at the end. To support the `select` operation
76-
* and implement [onSend], we use a modified version of [Mutex.onLock], which
77-
* invokes [doLockedNext] at the end and returns this [RxObservableCoroutine] as result
78-
* (see [RxObservableCoroutineMutex.onLockProcessResult]).
79-
*
80-
* We use this dirty hack as we need to wait for a lock, and, therefore, the clause
81-
* object should be a [Mutex] instance. Thus, the [doLockedNext] call must be a part
82-
* of [RxObservableCoroutineMutex.onLockProcessResult] as well. A possible alternative
83-
* would be to inherit [RxObservableCoroutine] from [Mutex], but it already extends [AbstractCoroutine].
84-
*/
85-
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "CANNOT_OVERRIDE_INVISIBLE_MEMBER")
86-
private inner class RxObservableCoroutineMutex : MutexImpl(locked = false) {
87-
override fun onLockRegFunction(select: SelectInstance<*>, element: Any?) {
88-
super.onLockRegFunction(select, owner = null)
71+
private val mutex: Mutex = Mutex()
72+
73+
@Suppress("UNCHECKED_CAST", "INVISIBLE_MEMBER")
74+
override val onSend: SelectClause2<T, SendChannel<T>> get() = SelectClause2Impl(
75+
clauseObject = this,
76+
regFunc = RxObservableCoroutine<*>::registerSelectForSend as RegistrationFunction,
77+
processResFunc = RxObservableCoroutine<*>::processResultSelectSend as ProcessResultFunction
78+
)
79+
80+
@Suppress("UNCHECKED_CAST", "UNUSED_PARAMETER")
81+
private fun registerSelectForSend(select: SelectInstance<*>, element: Any?) {
82+
// Try to acquire the mutex and complete in the registration phase.
83+
if (mutex.tryLock()) {
84+
select.selectInRegistrationPhase(Unit)
85+
return
8986
}
90-
91-
@Suppress("UNCHECKED_CAST", "RedundantNullableReturnType")
92-
override fun onLockProcessResult(element: Any?, result: Any?): Any? {
93-
super.onLockProcessResult(owner = null, result)
94-
doLockedNext(element as T)?.let { throw it }
95-
return this@RxObservableCoroutine
87+
// Start a new coroutine that waits for the mutex, invoking `trySelect(..)` after that.
88+
// Please note that at the point of the `trySelect(..)` invocation the corresponding
89+
// `select` can still be in the registration phase, making this `trySelect(..)` bound to fail.
90+
// In this case, the `onSend` clause will be re-registered, which alongside with the mutex
91+
// manipulation makes the resulting solution obstruction-free.
92+
launch {
93+
mutex.lock()
94+
if (!select.trySelect(this@RxObservableCoroutine, Unit)) {
95+
mutex.unlock()
96+
}
9697
}
9798
}
9899

99-
@Suppress("UNCHECKED_CAST")
100-
override val onSend: SelectClause2<T, SendChannel<T>>
101-
get() = mutex.onLock as SelectClause2<T, SendChannel<T>>
100+
@Suppress("RedundantNullableReturnType", "UNUSED_PARAMETER", "UNCHECKED_CAST")
101+
private fun processResultSelectSend(element: Any?, selectResult: Any?): Any? {
102+
doLockedNext(element as T)?.let { throw it }
103+
return this@RxObservableCoroutine
104+
}
102105

103106
override fun trySend(element: T): ChannelResult<Unit> =
104107
if (!mutex.tryLock()) {

reactive/kotlinx-coroutines-rx3/src/RxObservable.kt

+30-27
Original file line numberDiff line numberDiff line change
@@ -68,37 +68,40 @@ private class RxObservableCoroutine<T : Any>(
6868
throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose")
6969

7070
// Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked
71-
private val mutex: Mutex = RxObservableCoroutineMutex()
72-
73-
/**
74-
* To send an element, [mutex] should be locked first, after which [doLockedNext]
75-
* is called -- it releases the mutex at the end. To support the `select` operation
76-
* and implement [onSend], we use a modified version of [Mutex.onLock], which
77-
* invokes [doLockedNext] at the end and returns this [RxObservableCoroutine] as result
78-
* (see [RxObservableCoroutineMutex.onLockProcessResult]).
79-
*
80-
* We use this dirty hack as we need to wait for a lock, and, therefore, the clause
81-
* object should be a [Mutex] instance. Thus, the [doLockedNext] call must be a part
82-
* of [RxObservableCoroutineMutex.onLockProcessResult] as well. A possible alternative
83-
* would be to inherit [RxObservableCoroutine] from [Mutex], but it already extends [AbstractCoroutine].
84-
*/
85-
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "CANNOT_OVERRIDE_INVISIBLE_MEMBER")
86-
private inner class RxObservableCoroutineMutex : MutexImpl(locked = false) {
87-
override fun onLockRegFunction(select: SelectInstance<*>, element: Any?) {
88-
super.onLockRegFunction(select, owner = null)
71+
private val mutex: Mutex = Mutex()
72+
73+
@Suppress("UNCHECKED_CAST", "INVISIBLE_MEMBER")
74+
override val onSend: SelectClause2<T, SendChannel<T>> get() = SelectClause2Impl(
75+
clauseObject = this,
76+
regFunc = RxObservableCoroutine<*>::registerSelectForSend as RegistrationFunction,
77+
processResFunc = RxObservableCoroutine<*>::processResultSelectSend as ProcessResultFunction
78+
)
79+
80+
@Suppress("UNCHECKED_CAST", "UNUSED_PARAMETER")
81+
private fun registerSelectForSend(select: SelectInstance<*>, element: Any?) {
82+
// Try to acquire the mutex and complete in the registration phase.
83+
if (mutex.tryLock()) {
84+
select.selectInRegistrationPhase(Unit)
85+
return
8986
}
90-
91-
@Suppress("UNCHECKED_CAST", "RedundantNullableReturnType")
92-
override fun onLockProcessResult(element: Any?, result: Any?): Any? {
93-
super.onLockProcessResult(owner = null, result)
94-
doLockedNext(element as T)?.let { throw it }
95-
return this@RxObservableCoroutine
87+
// Start a new coroutine that waits for the mutex, invoking `trySelect(..)` after that.
88+
// Please note that at the point of the `trySelect(..)` invocation the corresponding
89+
// `select` can still be in the registration phase, making this `trySelect(..)` bound to fail.
90+
// In this case, the `onSend` clause will be re-registered, which alongside with the mutex
91+
// manipulation makes the resulting solution obstruction-free.
92+
launch {
93+
mutex.lock()
94+
if (!select.trySelect(this@RxObservableCoroutine, Unit)) {
95+
mutex.unlock()
96+
}
9697
}
9798
}
9899

99-
@Suppress("UNCHECKED_CAST")
100-
override val onSend: SelectClause2<T, SendChannel<T>>
101-
get() = mutex.onLock as SelectClause2<T, SendChannel<T>>
100+
@Suppress("RedundantNullableReturnType", "UNUSED_PARAMETER", "UNCHECKED_CAST")
101+
private fun processResultSelectSend(element: Any?, selectResult: Any?): Any? {
102+
doLockedNext(element as T)?.let { throw it }
103+
return this@RxObservableCoroutine
104+
}
102105

103106
override fun trySend(element: T): ChannelResult<Unit> =
104107
if (!mutex.tryLock()) {

0 commit comments

Comments
 (0)