File tree 10 files changed +38
-21
lines changed
kotlinx-coroutines-reactive
10 files changed +38
-21
lines changed Original file line number Diff line number Diff line change @@ -57,7 +57,6 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio
57
57
| [ SendChannel] [ kotlinx.coroutines.channels.SendChannel ] | [ send] [ kotlinx.coroutines.channels.SendChannel.send ] | [ onSend] [ kotlinx.coroutines.channels.SendChannel.onSend ] | [ trySend] [ kotlinx.coroutines.channels.SendChannel.trySend ]
58
58
| [ ReceiveChannel] [ kotlinx.coroutines.channels.ReceiveChannel ] | [ receive] [ kotlinx.coroutines.channels.ReceiveChannel.receive ] | [ onReceive] [ kotlinx.coroutines.channels.ReceiveChannel.onReceive ] | [ tryReceive] [ kotlinx.coroutines.channels.ReceiveChannel.tryReceive ]
59
59
| [ ReceiveChannel] [ kotlinx.coroutines.channels.ReceiveChannel ] | [ receiveCatching] [ kotlinx.coroutines.channels.receiveCatching ] | [ onReceiveCatching] [ kotlinx.coroutines.channels.onReceiveCatching ] | [ tryReceive] [ kotlinx.coroutines.channels.ReceiveChannel.tryReceive ]
60
- | [ Mutex] [ kotlinx.coroutines.sync.Mutex ] | [ lock] [ kotlinx.coroutines.sync.Mutex.lock ] | [ onLock] [ kotlinx.coroutines.sync.Mutex.onLock ] | [ tryLock] [ kotlinx.coroutines.sync.Mutex.tryLock ]
61
60
| none | [ delay] [ kotlinx.coroutines.delay ] | [ onTimeout] [ kotlinx.coroutines.selects.SelectBuilder.onTimeout ] | none
62
61
63
62
# Package kotlinx.coroutines
@@ -121,8 +120,6 @@ Obsolete and deprecated module to test coroutines. Replaced with `kotlinx-corout
121
120
122
121
[ kotlinx.coroutines.sync.Mutex ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html
123
122
[ kotlinx.coroutines.sync.Mutex.lock ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html
124
- [ kotlinx.coroutines.sync.Mutex.onLock ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/on-lock.html
125
- [ kotlinx.coroutines.sync.Mutex.tryLock ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/try-lock.html
126
123
127
124
<!-- - INDEX kotlinx.coroutines.channels -->
128
125
Original file line number Diff line number Diff line change @@ -186,7 +186,6 @@ public interface SelectInstance<in R> {
186
186
* | [SendChannel] | [send][SendChannel.send] | [onSend][SendChannel.onSend]
187
187
* | [ReceiveChannel] | [receive][ReceiveChannel.receive] | [onReceive][ReceiveChannel.onReceive]
188
188
* | [ReceiveChannel] | [receiveCatching][ReceiveChannel.receiveCatching] | [onReceiveCatching][ReceiveChannel.onReceiveCatching]
189
- * | [Mutex] | [lock][Mutex.lock] | [onLock][Mutex.onLock]
190
189
* | none | [delay] | [onTimeout][SelectBuilder.onTimeout]
191
190
*
192
191
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
Original file line number Diff line number Diff line change @@ -52,8 +52,7 @@ public interface Mutex {
52
52
* Note that this function does not check for cancellation when it is not suspended.
53
53
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
54
54
*
55
- * This function can be used in [select] invocation with [onLock] clause.
56
- * Use [tryLock] to try acquire lock without waiting.
55
+ * Use [tryLock] to try acquiring a lock without waiting.
57
56
*
58
57
* This function is fair; suspended callers are resumed in first-in-first-out order.
59
58
*
@@ -63,10 +62,10 @@ public interface Mutex {
63
62
public suspend fun lock (owner : Any? = null)
64
63
65
64
/* *
66
- * Clause for [select] expression of [lock] suspending function that selects when the mutex is locked.
67
- * Additional parameter for the clause in the `owner` (see [lock]) and when the clause is selected
68
- * the reference to this mutex is passed into the corresponding block.
65
+ * Deprecated for removal without built-in replacement.
69
66
*/
67
+ @Deprecated(level = DeprecationLevel .WARNING , message = " Mutex.onLock deprecated without replacement. " +
68
+ " For additional details please refer to #2794" )
70
69
public val onLock: SelectClause2 <Any ?, Mutex >
71
70
72
71
/* *
Original file line number Diff line number Diff line change @@ -110,7 +110,6 @@ public class PublisherCoroutine<in T>(
110
110
block(this )
111
111
}
112
112
113
- // TODO discuss it
114
113
launch(start = CoroutineStart .UNDISPATCHED ) {
115
114
mutex.lock()
116
115
// Already selected -- bail out
Original file line number Diff line number Diff line change @@ -296,12 +296,10 @@ class PublishTest : TestBase() {
296
296
expect(2 )
297
297
// Collector is ready
298
298
send(1 )
299
- expect(3 )
300
299
try {
301
300
send(2 )
302
301
expectUnreached()
303
302
} catch (e: CancellationException ) {
304
- expect(7 )
305
303
// publisher cancellation is async
306
304
latch.countDown()
307
305
throw e
@@ -312,15 +310,14 @@ class PublishTest : TestBase() {
312
310
val collectorLatch = Mutex (true )
313
311
val job = launch {
314
312
published.asFlow().buffer(0 ).collect {
315
- expect(4 )
316
313
collectorLatch.unlock()
317
- hang { expect(6 ) }
314
+ hang { expect(4 ) }
318
315
}
319
316
}
320
317
collectorLatch.lock()
321
- expect(5 )
318
+ expect(3 )
322
319
job.cancelAndJoin()
323
320
latch.await()
324
- finish(8 )
321
+ finish(5 )
325
322
}
326
323
}
Original file line number Diff line number Diff line change @@ -33,7 +33,7 @@ class PublisherMultiTest : TestBase() {
33
33
@Test
34
34
fun testConcurrentStressOnSend () = runBlocking {
35
35
val n = 10_000 * stressTestMultiplier
36
- val observable = publish {
36
+ val observable = publish< Int > {
37
37
// concurrent emitters (many coroutines)
38
38
val jobs = List (n) {
39
39
// launch
Original file line number Diff line number Diff line change @@ -10,6 +10,7 @@ import kotlinx.atomicfu.*
10
10
import kotlinx.coroutines.*
11
11
import kotlinx.coroutines.channels.*
12
12
import kotlinx.coroutines.internal.*
13
+ import kotlinx.coroutines.intrinsics.*
13
14
import kotlinx.coroutines.selects.*
14
15
import kotlinx.coroutines.sync.*
15
16
import kotlin.coroutines.*
@@ -95,10 +96,22 @@ private class RxObservableCoroutine<T : Any>(
95
96
element : T ,
96
97
block : suspend (SendChannel <T >) -> R
97
98
) {
98
- mutex.onLock.registerSelectClause2(select, null ) {
99
+ val clause = suspend {
99
100
doLockedNext(element)?.let { throw it }
100
101
block(this )
101
102
}
103
+
104
+ // This is the default replacement proposed in onLock replacement
105
+ launch(start = CoroutineStart .UNDISPATCHED ) {
106
+ mutex.lock()
107
+ // Already selected -- bail out
108
+ if (! select.trySelect()) {
109
+ mutex.unlock()
110
+ return @launch
111
+ }
112
+
113
+ clause.startCoroutineCancellable(select.completion)
114
+ }
102
115
}
103
116
104
117
// assert: mutex.isLocked()
Original file line number Diff line number Diff line change @@ -51,7 +51,7 @@ class ObservableMultiTest : TestBase() {
51
51
@Test
52
52
fun testConcurrentStressOnSend () {
53
53
val n = 10_000 * stressTestMultiplier
54
- val observable = rxObservable {
54
+ val observable = rxObservable< Int > {
55
55
newCoroutineContext(coroutineContext)
56
56
// concurrent emitters (many coroutines)
57
57
val jobs = List (n) {
Original file line number Diff line number Diff line change @@ -13,6 +13,7 @@ import kotlinx.coroutines.selects.*
13
13
import kotlinx.coroutines.sync.*
14
14
import kotlin.coroutines.*
15
15
import kotlinx.coroutines.internal.*
16
+ import kotlinx.coroutines.intrinsics.*
16
17
17
18
/* *
18
19
* Creates cold [observable][Observable] that will run a given [block] in a coroutine.
@@ -95,10 +96,22 @@ private class RxObservableCoroutine<T : Any>(
95
96
element : T ,
96
97
block : suspend (SendChannel <T >) -> R
97
98
) {
98
- mutex.onLock.registerSelectClause2(select, null ) {
99
+ val clause = suspend {
99
100
doLockedNext(element)?.let { throw it }
100
101
block(this )
101
102
}
103
+
104
+ // This is the default replacement proposed in onLock replacement
105
+ launch(start = CoroutineStart .UNDISPATCHED ) {
106
+ mutex.lock()
107
+ // Already selected -- bail out
108
+ if (! select.trySelect()) {
109
+ mutex.unlock()
110
+ return @launch
111
+ }
112
+
113
+ clause.startCoroutineCancellable(select.completion)
114
+ }
102
115
}
103
116
104
117
// assert: mutex.isLocked()
Original file line number Diff line number Diff line change @@ -51,7 +51,7 @@ class ObservableMultiTest : TestBase() {
51
51
@Test
52
52
fun testConcurrentStressOnSend () {
53
53
val n = 10_000 * stressTestMultiplier
54
- val observable = rxObservable {
54
+ val observable = rxObservable< Int > {
55
55
newCoroutineContext(coroutineContext)
56
56
// concurrent emitters (many coroutines)
57
57
val jobs = List (n) {
You can’t perform that action at this time.
0 commit comments