File tree 15 files changed +166
-28
lines changed
kotlinx-coroutines-reactive
15 files changed +166
-28
lines changed Original file line number Diff line number Diff line change @@ -54,13 +54,14 @@ tasks.named<Jar>("jmhJar") {
54
54
}
55
55
56
56
dependencies {
57
- compile (" org.openjdk.jmh:jmh-core:1.26" )
58
- compile (" io.projectreactor:reactor-core:${version(" reactor" )} " )
59
- compile (" io.reactivex.rxjava2:rxjava:2.1.9" )
60
- compile (" com.github.akarnokd:rxjava2-extensions:0.20.8" )
57
+ implementation (" org.openjdk.jmh:jmh-core:1.26" )
58
+ implementation (" io.projectreactor:reactor-core:${version(" reactor" )} " )
59
+ implementation (" io.reactivex.rxjava2:rxjava:2.1.9" )
60
+ implementation (" com.github.akarnokd:rxjava2-extensions:0.20.8" )
61
61
62
- compile(" com.typesafe.akka:akka-actor_2.12:2.5.0" )
63
- compile(project(" :kotlinx-coroutines-core" ))
62
+ implementation(" com.typesafe.akka:akka-actor_2.12:2.5.0" )
63
+ implementation(project(" :kotlinx-coroutines-core" ))
64
+ implementation(project(" :kotlinx-coroutines-reactive" ))
64
65
65
66
// add jmh dependency on main
66
67
" jmhImplementation" (sourceSets.main.get().runtimeClasspath)
Original file line number Diff line number Diff line change @@ -57,7 +57,6 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio
57
57
| [ SendChannel] [ kotlinx.coroutines.channels.SendChannel ] | [ send] [ kotlinx.coroutines.channels.SendChannel.send ] | [ onSend] [ kotlinx.coroutines.channels.SendChannel.onSend ] | [ trySend] [ kotlinx.coroutines.channels.SendChannel.trySend ]
58
58
| [ ReceiveChannel] [ kotlinx.coroutines.channels.ReceiveChannel ] | [ receive] [ kotlinx.coroutines.channels.ReceiveChannel.receive ] | [ onReceive] [ kotlinx.coroutines.channels.ReceiveChannel.onReceive ] | [ tryReceive] [ kotlinx.coroutines.channels.ReceiveChannel.tryReceive ]
59
59
| [ ReceiveChannel] [ kotlinx.coroutines.channels.ReceiveChannel ] | [ receiveCatching] [ kotlinx.coroutines.channels.receiveCatching ] | [ onReceiveCatching] [ kotlinx.coroutines.channels.onReceiveCatching ] | [ tryReceive] [ kotlinx.coroutines.channels.ReceiveChannel.tryReceive ]
60
- | [ Mutex] [ kotlinx.coroutines.sync.Mutex ] | [ lock] [ kotlinx.coroutines.sync.Mutex.lock ] | [ onLock] [ kotlinx.coroutines.sync.Mutex.onLock ] | [ tryLock] [ kotlinx.coroutines.sync.Mutex.tryLock ]
61
60
| none | [ delay] [ kotlinx.coroutines.delay ] | [ onTimeout] [ kotlinx.coroutines.selects.SelectBuilder.onTimeout ] | none
62
61
63
62
# Package kotlinx.coroutines
@@ -121,8 +120,6 @@ Obsolete and deprecated module to test coroutines. Replaced with `kotlinx-corout
121
120
122
121
[ kotlinx.coroutines.sync.Mutex ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html
123
122
[ kotlinx.coroutines.sync.Mutex.lock ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html
124
- [ kotlinx.coroutines.sync.Mutex.onLock ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/on-lock.html
125
- [ kotlinx.coroutines.sync.Mutex.tryLock ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/try-lock.html
126
123
127
124
<!-- - INDEX kotlinx.coroutines.channels -->
128
125
Original file line number Diff line number Diff line change @@ -60,7 +60,6 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio
60
60
| [ SendChannel] [ kotlinx.coroutines.channels.SendChannel ] | [ send] [ kotlinx.coroutines.channels.SendChannel.send ] | [ onSend] [ kotlinx.coroutines.channels.SendChannel.onSend ] | [ trySend] [ kotlinx.coroutines.channels.SendChannel.trySend ]
61
61
| [ ReceiveChannel] [ kotlinx.coroutines.channels.ReceiveChannel ] | [ receive] [ kotlinx.coroutines.channels.ReceiveChannel.receive ] | [ onReceive] [ kotlinx.coroutines.channels.ReceiveChannel.onReceive ] | [ tryReceive] [ kotlinx.coroutines.channels.ReceiveChannel.tryReceive ]
62
62
| [ ReceiveChannel] [ kotlinx.coroutines.channels.ReceiveChannel ] | [ receiveCatching] [ kotlinx.coroutines.channels.ReceiveChannel.receiveCatching ] | [ onReceiveCatching] [ kotlinx.coroutines.channels.ReceiveChannel.onReceiveCatching ] | [ tryReceive] [ kotlinx.coroutines.channels.ReceiveChannel.tryReceive ]
63
- | [ Mutex] [ kotlinx.coroutines.sync.Mutex ] | [ lock] [ kotlinx.coroutines.sync.Mutex.lock ] | [ onLock] [ kotlinx.coroutines.sync.Mutex.onLock ] | [ tryLock] [ kotlinx.coroutines.sync.Mutex.tryLock ]
64
63
| none | [ delay] | [ onTimeout] [ kotlinx.coroutines.selects.SelectBuilder.onTimeout ] | none
65
64
66
65
This module provides debugging facilities for coroutines (run JVM with ` -ea ` or ` -Dkotlinx.coroutines.debug ` options)
@@ -131,7 +130,6 @@ Low-level primitives for finer-grained control of coroutines.
131
130
132
131
[ kotlinx.coroutines.sync.Mutex ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html
133
132
[ kotlinx.coroutines.sync.Mutex.lock ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html
134
- [ kotlinx.coroutines.sync.Mutex.onLock ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/on-lock.html
135
133
[ kotlinx.coroutines.sync.Mutex.tryLock ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/try-lock.html
136
134
137
135
<!-- - INDEX kotlinx.coroutines.channels -->
Original file line number Diff line number Diff line change @@ -64,7 +64,6 @@ public interface SendChannel<in E> {
64
64
*/
65
65
public val onSend: SelectClause2 <E , SendChannel <E >>
66
66
67
-
68
67
/* *
69
68
* Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions,
70
69
* and returns the successful result. Otherwise, returns failed or closed result.
Original file line number Diff line number Diff line change @@ -186,7 +186,6 @@ public interface SelectInstance<in R> {
186
186
* | [SendChannel] | [send][SendChannel.send] | [onSend][SendChannel.onSend]
187
187
* | [ReceiveChannel] | [receive][ReceiveChannel.receive] | [onReceive][ReceiveChannel.onReceive]
188
188
* | [ReceiveChannel] | [receiveCatching][ReceiveChannel.receiveCatching] | [onReceiveCatching][ReceiveChannel.onReceiveCatching]
189
- * | [Mutex] | [lock][Mutex.lock] | [onLock][Mutex.onLock]
190
189
* | none | [delay] | [onTimeout][SelectBuilder.onTimeout]
191
190
*
192
191
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
Original file line number Diff line number Diff line change @@ -52,8 +52,7 @@ public interface Mutex {
52
52
* Note that this function does not check for cancellation when it is not suspended.
53
53
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
54
54
*
55
- * This function can be used in [select] invocation with [onLock] clause.
56
- * Use [tryLock] to try acquire lock without waiting.
55
+ * Use [tryLock] to try acquiring a lock without waiting.
57
56
*
58
57
* This function is fair; suspended callers are resumed in first-in-first-out order.
59
58
*
@@ -63,10 +62,10 @@ public interface Mutex {
63
62
public suspend fun lock (owner : Any? = null)
64
63
65
64
/* *
66
- * Clause for [select] expression of [lock] suspending function that selects when the mutex is locked.
67
- * Additional parameter for the clause in the `owner` (see [lock]) and when the clause is selected
68
- * the reference to this mutex is passed into the corresponding block.
65
+ * Deprecated for removal without built-in replacement.
69
66
*/
67
+ @Deprecated(level = DeprecationLevel .WARNING , message = " Mutex.onLock deprecated without replacement. " +
68
+ " For additional details please refer to #2794" )
70
69
public val onLock: SelectClause2 <Any ?, Mutex >
71
70
72
71
/* *
Original file line number Diff line number Diff line change @@ -7,7 +7,7 @@ package kotlinx.coroutines.channels
7
7
import kotlinx.coroutines.*
8
8
import org.junit.*
9
9
10
- class RandevouzChannelStressTest : TestBase () {
10
+ class RendezvousChannelStressTest : TestBase () {
11
11
12
12
@Test
13
13
fun testStress () = runTest {
Original file line number Diff line number Diff line change @@ -6,6 +6,7 @@ package kotlinx.coroutines.reactive
6
6
import kotlinx.atomicfu.*
7
7
import kotlinx.coroutines.*
8
8
import kotlinx.coroutines.channels.*
9
+ import kotlinx.coroutines.intrinsics.*
9
10
import kotlinx.coroutines.selects.*
10
11
import kotlinx.coroutines.sync.*
11
12
import org.reactivestreams.*
@@ -104,10 +105,21 @@ public class PublisherCoroutine<in T>(
104
105
// registerSelectSend
105
106
@Suppress(" PARAMETER_NAME_CHANGED_ON_OVERRIDE" )
106
107
override fun <R > registerSelectClause2 (select : SelectInstance <R >, element : T , block : suspend (SendChannel <T >) -> R ) {
107
- mutex.onLock.registerSelectClause2(select, null ) {
108
+ val clause = suspend {
108
109
doLockedNext(element)?.let { throw it }
109
110
block(this )
110
111
}
112
+
113
+ launch(start = CoroutineStart .UNDISPATCHED ) {
114
+ mutex.lock()
115
+ // Already selected -- bail out
116
+ if (! select.trySelect()) {
117
+ mutex.unlock()
118
+ return @launch
119
+ }
120
+
121
+ clause.startCoroutineCancellable(select.completion)
122
+ }
111
123
}
112
124
113
125
/*
Original file line number Diff line number Diff line change 5
5
package kotlinx.coroutines.reactive
6
6
7
7
import kotlinx.coroutines.*
8
+ import kotlinx.coroutines.CancellationException
8
9
import kotlinx.coroutines.channels.*
10
+ import kotlinx.coroutines.flow.*
11
+ import kotlinx.coroutines.sync.*
9
12
import org.junit.Test
10
13
import org.reactivestreams.*
14
+ import java.util.concurrent.*
11
15
import kotlin.test.*
12
16
13
17
class PublishTest : TestBase () {
@@ -284,4 +288,36 @@ class PublishTest : TestBase() {
284
288
}
285
289
assertEquals(" OK" , publisher.awaitFirstOrNull())
286
290
}
287
- }
291
+
292
+ @Test
293
+ fun testOnSendCancelled () = runTest {
294
+ val latch = CountDownLatch (1 )
295
+ val published = publish(Dispatchers .Default ) {
296
+ expect(2 )
297
+ // Collector is ready
298
+ send(1 )
299
+ try {
300
+ send(2 )
301
+ expectUnreached()
302
+ } catch (e: CancellationException ) {
303
+ // publisher cancellation is async
304
+ latch.countDown()
305
+ throw e
306
+ }
307
+ }
308
+
309
+ expect(1 )
310
+ val collectorLatch = Mutex (true )
311
+ val job = launch {
312
+ published.asFlow().buffer(0 ).collect {
313
+ collectorLatch.unlock()
314
+ hang { expect(4 ) }
315
+ }
316
+ }
317
+ collectorLatch.lock()
318
+ expect(3 )
319
+ job.cancelAndJoin()
320
+ latch.await()
321
+ finish(5 )
322
+ }
323
+ }
Original file line number Diff line number Diff line change 5
5
package kotlinx.coroutines.reactive
6
6
7
7
import kotlinx.coroutines.*
8
+ import kotlinx.coroutines.selects.*
8
9
import org.junit.Test
9
10
import kotlin.test.*
10
11
@@ -16,7 +17,7 @@ class PublisherMultiTest : TestBase() {
16
17
// concurrent emitters (many coroutines)
17
18
val jobs = List (n) {
18
19
// launch
19
- launch {
20
+ launch( Dispatchers . Default ) {
20
21
send(it)
21
22
}
22
23
}
@@ -28,4 +29,26 @@ class PublisherMultiTest : TestBase() {
28
29
}
29
30
assertEquals(n, resultSet.size)
30
31
}
32
+
33
+ @Test
34
+ fun testConcurrentStressOnSend () = runBlocking {
35
+ val n = 10_000 * stressTestMultiplier
36
+ val observable = publish<Int > {
37
+ // concurrent emitters (many coroutines)
38
+ val jobs = List (n) {
39
+ // launch
40
+ launch(Dispatchers .Default ) {
41
+ select<Unit > {
42
+ onSend(it) {}
43
+ }
44
+ }
45
+ }
46
+ jobs.forEach { it.join() }
47
+ }
48
+ val resultSet = mutableSetOf<Int >()
49
+ observable.collect {
50
+ assertTrue(resultSet.add(it))
51
+ }
52
+ assertEquals(n, resultSet.size)
53
+ }
31
54
}
Original file line number Diff line number Diff line change @@ -10,6 +10,7 @@ import kotlinx.atomicfu.*
10
10
import kotlinx.coroutines.*
11
11
import kotlinx.coroutines.channels.*
12
12
import kotlinx.coroutines.internal.*
13
+ import kotlinx.coroutines.intrinsics.*
13
14
import kotlinx.coroutines.selects.*
14
15
import kotlinx.coroutines.sync.*
15
16
import kotlin.coroutines.*
@@ -95,10 +96,22 @@ private class RxObservableCoroutine<T : Any>(
95
96
element : T ,
96
97
block : suspend (SendChannel <T >) -> R
97
98
) {
98
- mutex.onLock.registerSelectClause2(select, null ) {
99
+ val clause = suspend {
99
100
doLockedNext(element)?.let { throw it }
100
101
block(this )
101
102
}
103
+
104
+ // This is the default replacement proposed in onLock replacement
105
+ launch(start = CoroutineStart .UNDISPATCHED ) {
106
+ mutex.lock()
107
+ // Already selected -- bail out
108
+ if (! select.trySelect()) {
109
+ mutex.unlock()
110
+ return @launch
111
+ }
112
+
113
+ clause.startCoroutineCancellable(select.completion)
114
+ }
102
115
}
103
116
104
117
// assert: mutex.isLocked()
Original file line number Diff line number Diff line change @@ -12,7 +12,7 @@ import kotlin.coroutines.*
12
12
class ObservableCompletionStressTest : TestBase () {
13
13
private val N_REPEATS = 10_000 * stressTestMultiplier
14
14
15
- private fun CoroutineScope. range (context : CoroutineContext , start : Int , count : Int ) = rxObservable(context) {
15
+ private fun range (context : CoroutineContext , start : Int , count : Int ) = rxObservable(context) {
16
16
for (x in start until start + count) send(x)
17
17
}
18
18
@@ -33,4 +33,4 @@ class ObservableCompletionStressTest : TestBase() {
33
33
}
34
34
}
35
35
}
36
- }
36
+ }
Original file line number Diff line number Diff line change @@ -6,6 +6,7 @@ package kotlinx.coroutines.rx2
6
6
7
7
import io.reactivex.*
8
8
import kotlinx.coroutines.*
9
+ import kotlinx.coroutines.selects.*
9
10
import org.junit.Test
10
11
import java.io.*
11
12
import kotlin.test.*
@@ -47,6 +48,29 @@ class ObservableMultiTest : TestBase() {
47
48
}
48
49
}
49
50
51
+ @Test
52
+ fun testConcurrentStressOnSend () {
53
+ val n = 10_000 * stressTestMultiplier
54
+ val observable = rxObservable<Int > {
55
+ newCoroutineContext(coroutineContext)
56
+ // concurrent emitters (many coroutines)
57
+ val jobs = List (n) {
58
+ // launch
59
+ launch(Dispatchers .Default ) {
60
+ val i = it
61
+ select<Unit > {
62
+ onSend(i) {}
63
+ }
64
+ }
65
+ }
66
+ jobs.forEach { it.join() }
67
+ }
68
+ checkSingleValue(observable.toList()) { list ->
69
+ assertEquals(n, list.size)
70
+ assertEquals((0 until n).toList(), list.sorted())
71
+ }
72
+ }
73
+
50
74
@Test
51
75
fun testIteratorResendUnconfined () {
52
76
val n = 10_000 * stressTestMultiplier
@@ -88,4 +112,4 @@ class ObservableMultiTest : TestBase() {
88
112
assertEquals(" OK" , it)
89
113
}
90
114
}
91
- }
115
+ }
Original file line number Diff line number Diff line change @@ -13,6 +13,7 @@ import kotlinx.coroutines.selects.*
13
13
import kotlinx.coroutines.sync.*
14
14
import kotlin.coroutines.*
15
15
import kotlinx.coroutines.internal.*
16
+ import kotlinx.coroutines.intrinsics.*
16
17
17
18
/* *
18
19
* Creates cold [observable][Observable] that will run a given [block] in a coroutine.
@@ -95,10 +96,22 @@ private class RxObservableCoroutine<T : Any>(
95
96
element : T ,
96
97
block : suspend (SendChannel <T >) -> R
97
98
) {
98
- mutex.onLock.registerSelectClause2(select, null ) {
99
+ val clause = suspend {
99
100
doLockedNext(element)?.let { throw it }
100
101
block(this )
101
102
}
103
+
104
+ // This is the default replacement proposed in onLock replacement
105
+ launch(start = CoroutineStart .UNDISPATCHED ) {
106
+ mutex.lock()
107
+ // Already selected -- bail out
108
+ if (! select.trySelect()) {
109
+ mutex.unlock()
110
+ return @launch
111
+ }
112
+
113
+ clause.startCoroutineCancellable(select.completion)
114
+ }
102
115
}
103
116
104
117
// assert: mutex.isLocked()
You can’t perform that action at this time.
0 commit comments