Skip to content

Consistently handle exceptions in reactive streams #2646

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 20, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package kotlinx.coroutines.jdk9
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.junit.Test
import java.util.concurrent.CancellationException
import java.util.concurrent.Flow as JFlow
import kotlin.test.*

Expand All @@ -15,7 +16,7 @@ class FlowAsPublisherTest : TestBase() {
@Test
fun testErrorOnCancellationIsReported() {
expect(1)
flow<Int> {
flow {
try {
emit(2)
} finally {
Expand Down Expand Up @@ -50,13 +51,13 @@ class FlowAsPublisherTest : TestBase() {
@Test
fun testCancellationIsNotReported() {
expect(1)
flow<Int> {
flow {
emit(2)
}.asPublisher().subscribe(object : JFlow.Subscriber<Int> {
private lateinit var subscription: JFlow.Subscription

override fun onComplete() {
expect(3)
expectUnreached()
}

override fun onSubscribe(s: JFlow.Subscription?) {
Expand All @@ -73,6 +74,21 @@ class FlowAsPublisherTest : TestBase() {
expectUnreached()
}
})
finish(3)
}

@Test
fun testFlowWithTimeout() = runTest {
val publisher = flow<Int> {
expect(2)
withTimeout(1) { delay(Long.MAX_VALUE) }
}.asPublisher()
try {
expect(1)
publisher.awaitFirstOrNull()
} catch (e: CancellationException) {
expect(3)
}
finish(4)
}
}
15 changes: 14 additions & 1 deletion reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package kotlinx.coroutines.jdk9

import kotlinx.coroutines.*
import kotlinx.coroutines.exceptions.*
import org.junit.Test
import kotlinx.coroutines.flow.flowOn
import org.junit.runner.*
Expand Down Expand Up @@ -129,4 +130,16 @@ class IntegrationTest(
assertEquals(n, last)
}

}
}

internal suspend inline fun <reified E: Throwable> assertCallsExceptionHandlerWith(
crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E =
CapturingHandler().let { handler ->
withContext(handler) {
operation(handler)
handler.getException().let {
assertTrue(it is E, it.toString())
it
}
}
}
161 changes: 133 additions & 28 deletions reactive/kotlinx-coroutines-jdk9/test/PublishTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
package kotlinx.coroutines.jdk9

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import org.junit.Test
import java.lang.NullPointerException
import java.util.concurrent.*
import java.util.concurrent.CancellationException
import java.util.concurrent.Flow as JFlow
import kotlin.test.*

Expand Down Expand Up @@ -121,44 +125,110 @@ class PublishTest : TestBase() {
finish(7)
}

/** Tests that, as soon as `ProducerScope.close` is called, `isClosedForSend` starts returning `true`. */
@Test
fun testOnNextError() = runTest {
fun testChannelClosing() = runTest {
expect(1)
val publisher = flowPublish(currentDispatcher()) {
val publisher = flowPublish<Int>(Dispatchers.Unconfined) {
expect(3)
close()
assert(isClosedForSend)
expect(4)
try {
send("OK")
} catch(e: Throwable) {
expect(6)
assert(e is TestException)
}
}
expect(2)
try {
expect(2)
publisher.awaitFirstOrNull()
} catch (e: CancellationException) {
expect(5)
}
finish(6)
}

@Test
fun testOnNextError() = runTest {
val latch = CompletableDeferred<Unit>()
publisher.subscribe(object : JFlow.Subscriber<String> {
override fun onComplete() {
expectUnreached()
expect(1)
assertCallsExceptionHandlerWith<TestException> { exceptionHandler ->
val publisher = flowPublish(currentDispatcher() + exceptionHandler) {
expect(4)
try {
send("OK")
} catch (e: Throwable) {
expect(6)
assert(e is TestException)
assert(isClosedForSend)
latch.complete(Unit)
}
}
expect(2)
publisher.subscribe(object : JFlow.Subscriber<String> {
override fun onComplete() {
expectUnreached()
}

override fun onSubscribe(s: JFlow.Subscription) {
expect(3)
s.request(1)
}
override fun onSubscribe(s: JFlow.Subscription) {
expect(3)
s.request(1)
}

override fun onNext(t: String) {
expect(5)
assertEquals("OK", t)
throw TestException()
}
override fun onNext(t: String) {
expect(5)
assertEquals("OK", t)
throw TestException()
}

override fun onError(t: Throwable) {
expect(7)
assert(t is TestException)
latch.complete(Unit)
override fun onError(t: Throwable) {
expectUnreached()
}
})
latch.await()
}
finish(7)
}

/** Tests the behavior when a call to `onNext` fails after the channel is already closed. */
@Test
fun testOnNextErrorAfterCancellation() = runTest {
assertCallsExceptionHandlerWith<TestException> { handler ->
var producerScope: ProducerScope<Int>? = null
CompletableDeferred<Unit>()
expect(1)
var job: Job? = null
val publisher = flowPublish<Int>(handler + Dispatchers.Unconfined) {
producerScope = this
expect(4)
job = launch {
delay(Long.MAX_VALUE)
}
}
})
latch.await()
finish(8)
expect(2)
publisher.subscribe(object: JFlow.Subscriber<Int> {
override fun onSubscribe(s: JFlow.Subscription) {
expect(3)
s.request(Long.MAX_VALUE)
}
override fun onNext(t: Int) {
expect(6)
assertEquals(1, t)
job!!.cancel()
throw TestException()
}
override fun onError(t: Throwable?) {
/* Correct changes to the implementation could lead to us entering or not entering this method, but
it only matters that if we do, it is the "correct" exception that was validly used to cancel the
coroutine that gets passed here and not `TestException`. */
assertTrue(t is CancellationException)
}
override fun onComplete() { expectUnreached() }
})
expect(5)
val result: ChannelResult<Unit> = producerScope!!.trySend(1)
val e = result.exceptionOrNull()!!
assertTrue(e is CancellationException, "The actual error: $e")
assertTrue(producerScope!!.isClosedForSend)
assertTrue(result.isFailure)
}
finish(7)
}

@Test
Expand All @@ -182,4 +252,39 @@ class PublishTest : TestBase() {
fun testIllegalArgumentException() {
assertFailsWith<IllegalArgumentException> { flowPublish<Int>(Job()) { } }
}

/** Tests that `trySend` doesn't throw in `flowPublish`. */
@Test
fun testTrySendNotThrowing() = runTest {
var producerScope: ProducerScope<Int>? = null
expect(1)
val publisher = flowPublish<Int>(Dispatchers.Unconfined) {
producerScope = this
expect(3)
delay(Long.MAX_VALUE)
}
val job = launch(start = CoroutineStart.UNDISPATCHED) {
expect(2)
publisher.awaitFirstOrNull()
expectUnreached()
}
job.cancel()
expect(4)
val result = producerScope!!.trySend(1)
assertTrue(result.isFailure)
finish(5)
}

/** Tests that all methods on `flowPublish` fail without closing the channel when attempting to emit `null`. */
@Test
fun testEmittingNull() = runTest {
val publisher = flowPublish {
assertFailsWith<NullPointerException> { send(null) }
assertFailsWith<NullPointerException> { trySend(null) }
@Suppress("DEPRECATION")
assertFailsWith<NullPointerException> { offer(null) }
send("OK")
}
assertEquals("OK", publisher.awaitFirstOrNull())
}
}
Loading