Skip to content

Commit a2d62d9

Browse files
committed
Add some tests to check error handling in reactive streams
TODO: if they are accepted, maybe clone to the jdk9 module
1 parent ff12a4a commit a2d62d9

File tree

2 files changed

+188
-0
lines changed

2 files changed

+188
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.reactive
6+
7+
import kotlinx.coroutines.*
8+
import org.junit.*
9+
import org.reactivestreams.*
10+
11+
class AwaitTest: TestBase() {
12+
13+
/** Tests that calls to [awaitFirst] (and, thus, to the rest of these functions) throw [CancellationException] and
14+
* unsubscribe from the publisher when their [Job] is cancelled. */
15+
@Test
16+
fun testAwaitCancellation() = runTest {
17+
expect(1)
18+
val publisher = Publisher<Int> { s ->
19+
s.onSubscribe(object: Subscription {
20+
override fun request(n: Long) {
21+
expect(3)
22+
}
23+
24+
override fun cancel() {
25+
expect(5)
26+
}
27+
})
28+
}
29+
val job = launch(start = CoroutineStart.UNDISPATCHED) {
30+
try {
31+
expect(2)
32+
publisher.awaitFirst()
33+
} catch (e: CancellationException) {
34+
expect(6)
35+
throw e
36+
}
37+
}
38+
expect(4)
39+
job.cancelAndJoin()
40+
finish(7)
41+
}
42+
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.reactive
6+
7+
import kotlinx.coroutines.*
8+
import org.junit.Test
9+
import org.reactivestreams.*
10+
import java.lang.IllegalArgumentException
11+
import kotlin.test.*
12+
13+
class PublisherCollectTest: TestBase() {
14+
15+
/** Tests the simple scenario where the publisher outputs a bounded stream of values to collect. */
16+
@Test
17+
fun testCollect() = runTest {
18+
val x = 100
19+
val xSum = x * (x + 1) / 2
20+
val publisher = Publisher<Int> { subscriber ->
21+
var requested = 0L
22+
var lastOutput = 0
23+
subscriber.onSubscribe(object: Subscription {
24+
25+
override fun request(n: Long) {
26+
requested += n
27+
if (n <= 0) {
28+
subscriber.onError(IllegalArgumentException())
29+
return
30+
}
31+
while (lastOutput < x && lastOutput < requested) {
32+
lastOutput += 1
33+
subscriber.onNext(lastOutput)
34+
}
35+
if (lastOutput == x)
36+
subscriber.onComplete()
37+
}
38+
39+
override fun cancel() {
40+
/** According to rule 3.5 of the
41+
* [reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#3.5),
42+
* this method can be called by the subscriber at any point, so it's not an error if it's called
43+
* in this scenario. */
44+
}
45+
46+
})
47+
}
48+
var sum = 0
49+
publisher.collect {
50+
sum += it
51+
}
52+
assertEquals(xSum, sum)
53+
}
54+
55+
/** Tests the behavior of [collect] when the publisher raises an error. */
56+
@Test
57+
fun testCollectThrowingPublisher() = runTest {
58+
val errorString = "Too many elements requested"
59+
val x = 100
60+
val xSum = x * (x + 1) / 2
61+
val publisher = Publisher<Int> { subscriber ->
62+
var requested = 0L
63+
var lastOutput = 0
64+
subscriber.onSubscribe(object: Subscription {
65+
66+
override fun request(n: Long) {
67+
requested += n
68+
if (n <= 0) {
69+
subscriber.onError(IllegalArgumentException())
70+
return
71+
}
72+
while (lastOutput < x && lastOutput < requested) {
73+
lastOutput += 1
74+
subscriber.onNext(lastOutput)
75+
}
76+
if (lastOutput == x)
77+
subscriber.onError(IllegalArgumentException(errorString))
78+
}
79+
80+
override fun cancel() {
81+
/** See the comment for the corresponding part of [testCollect]. */
82+
}
83+
84+
})
85+
}
86+
var sum = 0
87+
try {
88+
publisher.collect {
89+
sum += it
90+
}
91+
} catch (e: IllegalArgumentException) {
92+
assertEquals(errorString, e.message)
93+
}
94+
assertEquals(xSum, sum)
95+
}
96+
97+
/** Tests the behavior of [collect] when the action throws. */
98+
@Test
99+
fun testCollectThrowingAction() = runTest {
100+
val errorString = "Too many elements produced"
101+
val x = 100
102+
val xSum = x * (x + 1) / 2
103+
val publisher = Publisher<Int> { subscriber ->
104+
var requested = 0L
105+
var lastOutput = 0
106+
subscriber.onSubscribe(object: Subscription {
107+
108+
override fun request(n: Long) {
109+
requested += n
110+
if (n <= 0) {
111+
subscriber.onError(IllegalArgumentException())
112+
return
113+
}
114+
while (lastOutput < x && lastOutput < requested) {
115+
lastOutput += 1
116+
subscriber.onNext(lastOutput)
117+
}
118+
}
119+
120+
override fun cancel() {
121+
assertEquals(x, lastOutput)
122+
expect(x + 2)
123+
}
124+
125+
})
126+
}
127+
var sum = 0
128+
try {
129+
expect(1)
130+
var i = 1
131+
publisher.collect {
132+
sum += it
133+
i += 1
134+
expect(i)
135+
if (sum >= xSum) {
136+
throw IllegalArgumentException(errorString)
137+
}
138+
}
139+
} catch (e: IllegalArgumentException) {
140+
expect(x + 3)
141+
assertEquals(errorString, e.message)
142+
}
143+
finish(x + 4)
144+
}
145+
}

0 commit comments

Comments
 (0)