Skip to content

Commit 10c6ac0

Browse files
committed
Actually test 'onLock' and the corresponding concurrency and cancellation of Reactive's onSend
* Also, cleanup the related code a bit
1 parent e4bee74 commit 10c6ac0

File tree

7 files changed

+114
-8
lines changed

7 files changed

+114
-8
lines changed

kotlinx-coroutines-core/common/src/channels/Channel.kt

-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ public interface SendChannel<in E> {
6464
*/
6565
public val onSend: SelectClause2<E, SendChannel<E>>
6666

67-
6867
/**
6968
* Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions,
7069
* and returns the successful result. Otherwise, returns failed or closed result.

kotlinx-coroutines-core/jvm/test/channels/RandevouzChannelStressTest.kt renamed to kotlinx-coroutines-core/jvm/test/channels/RendezvousChannelStressTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ package kotlinx.coroutines.channels
77
import kotlinx.coroutines.*
88
import org.junit.*
99

10-
class RandevouzChannelStressTest : TestBase() {
10+
class RendezvousChannelStressTest : TestBase() {
1111

1212
@Test
1313
fun testStress() = runTest {

reactive/kotlinx-coroutines-reactive/test/PublishTest.kt

+37-1
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@
55
package kotlinx.coroutines.reactive
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.CancellationException
89
import kotlinx.coroutines.channels.*
10+
import kotlinx.coroutines.flow.*
911
import org.junit.Test
1012
import org.reactivestreams.*
13+
import java.util.concurrent.*
1114
import kotlin.test.*
1215

1316
class PublishTest : TestBase() {
@@ -284,4 +287,37 @@ class PublishTest : TestBase() {
284287
}
285288
assertEquals("OK", publisher.awaitFirstOrNull())
286289
}
287-
}
290+
291+
@Test
292+
fun testOnSendCancelled() = runTest {
293+
val latch = CountDownLatch(1)
294+
val published = publish(Dispatchers.Default) {
295+
expect(2)
296+
// Collector is ready
297+
send(1)
298+
expect(3)
299+
try {
300+
send(2)
301+
expectUnreached()
302+
} catch (e: CancellationException) {
303+
expect(7)
304+
// publisher cancellation is async
305+
latch.countDown()
306+
throw e
307+
}
308+
}
309+
310+
expect(1)
311+
val job = launch {
312+
published.asFlow().buffer(0).collect {
313+
expect(4)
314+
hang { expect(6) }
315+
}
316+
}
317+
yield()
318+
expect(5)
319+
job.cancelAndJoin()
320+
latch.await()
321+
finish(8)
322+
}
323+
}

reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt

+24-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package kotlinx.coroutines.reactive
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.selects.*
89
import org.junit.Test
910
import kotlin.test.*
1011

@@ -16,7 +17,7 @@ class PublisherMultiTest : TestBase() {
1617
// concurrent emitters (many coroutines)
1718
val jobs = List(n) {
1819
// launch
19-
launch {
20+
launch(Dispatchers.Default) {
2021
send(it)
2122
}
2223
}
@@ -28,4 +29,26 @@ class PublisherMultiTest : TestBase() {
2829
}
2930
assertEquals(n, resultSet.size)
3031
}
32+
33+
@Test
34+
fun testConcurrentStressOnSend() = runBlocking {
35+
val n = 10_000 * stressTestMultiplier
36+
val observable = publish {
37+
// concurrent emitters (many coroutines)
38+
val jobs = List(n) {
39+
// launch
40+
launch(Dispatchers.Default) {
41+
select<Unit> {
42+
onSend(it) {}
43+
}
44+
}
45+
}
46+
jobs.forEach { it.join() }
47+
}
48+
val resultSet = mutableSetOf<Int>()
49+
observable.collect {
50+
assertTrue(resultSet.add(it))
51+
}
52+
assertEquals(n, resultSet.size)
53+
}
3154
}

reactive/kotlinx-coroutines-rx2/test/ObservableCompletionStressTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import kotlin.coroutines.*
1212
class ObservableCompletionStressTest : TestBase() {
1313
private val N_REPEATS = 10_000 * stressTestMultiplier
1414

15-
private fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = rxObservable(context) {
15+
private fun range(context: CoroutineContext, start: Int, count: Int) = rxObservable(context) {
1616
for (x in start until start + count) send(x)
1717
}
1818

@@ -33,4 +33,4 @@ class ObservableCompletionStressTest : TestBase() {
3333
}
3434
}
3535
}
36-
}
36+
}

reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt

+25-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package kotlinx.coroutines.rx2
66

77
import io.reactivex.*
88
import kotlinx.coroutines.*
9+
import kotlinx.coroutines.selects.*
910
import org.junit.Test
1011
import java.io.*
1112
import kotlin.test.*
@@ -47,6 +48,29 @@ class ObservableMultiTest : TestBase() {
4748
}
4849
}
4950

51+
@Test
52+
fun testConcurrentStressOnSend() {
53+
val n = 10_000 * stressTestMultiplier
54+
val observable = rxObservable {
55+
newCoroutineContext(coroutineContext)
56+
// concurrent emitters (many coroutines)
57+
val jobs = List(n) {
58+
// launch
59+
launch(Dispatchers.Default) {
60+
val i = it
61+
select<Unit> {
62+
onSend(i) {}
63+
}
64+
}
65+
}
66+
jobs.forEach { it.join() }
67+
}
68+
checkSingleValue(observable.toList()) { list ->
69+
assertEquals(n, list.size)
70+
assertEquals((0 until n).toList(), list.sorted())
71+
}
72+
}
73+
5074
@Test
5175
fun testIteratorResendUnconfined() {
5276
val n = 10_000 * stressTestMultiplier
@@ -88,4 +112,4 @@ class ObservableMultiTest : TestBase() {
88112
assertEquals("OK", it)
89113
}
90114
}
91-
}
115+
}

reactive/kotlinx-coroutines-rx3/test/ObservableMultiTest.kt

+25-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package kotlinx.coroutines.rx3
66

77
import io.reactivex.rxjava3.core.*
88
import kotlinx.coroutines.*
9+
import kotlinx.coroutines.selects.*
910
import org.junit.Test
1011
import java.io.*
1112
import kotlin.test.*
@@ -34,7 +35,7 @@ class ObservableMultiTest : TestBase() {
3435
// concurrent emitters (many coroutines)
3536
val jobs = List(n) {
3637
// launch
37-
launch {
38+
launch(Dispatchers.Default) {
3839
val i = it
3940
send(i)
4041
}
@@ -47,6 +48,29 @@ class ObservableMultiTest : TestBase() {
4748
}
4849
}
4950

51+
@Test
52+
fun testConcurrentStressOnSend() {
53+
val n = 10_000 * stressTestMultiplier
54+
val observable = rxObservable {
55+
newCoroutineContext(coroutineContext)
56+
// concurrent emitters (many coroutines)
57+
val jobs = List(n) {
58+
// launch
59+
launch(Dispatchers.Default) {
60+
val i = it
61+
select<Unit> {
62+
onSend(i) {}
63+
}
64+
}
65+
}
66+
jobs.forEach { it.join() }
67+
}
68+
checkSingleValue(observable.toList()) { list ->
69+
assertEquals(n, list.size)
70+
assertEquals((0 until n).toList(), list.sorted())
71+
}
72+
}
73+
5074
@Test
5175
fun testIteratorResendUnconfined() {
5276
val n = 10_000 * stressTestMultiplier

0 commit comments

Comments
 (0)