-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Complete mono { } on cancellation #2606
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Also, we've discussed that such behaviour creates unwanted inconsistency between |
@@ -32,6 +33,22 @@ public fun <T> mono( | |||
return monoInternal(GlobalScope, context, block) | |||
} | |||
|
|||
@Suppress("UNCHECKED_CAST") | |||
internal suspend fun <T> Mono<T>.await(): T? = (this as Mono<T?>).awaitOrDefault(null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These functions seem to be misplaced and should belong to test source-set
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a request for these functions (#1587), so I thought it would make sense to add them here (though this PR is unrelated) and to promote them to public
later, after adding documentation and tests for them.
*/ | ||
if (getCancellationException() !== cause) { | ||
/** Cancellation exceptions that were caused by [dispose], that is, came from downstream, are not errors. */ | ||
if (getCancellationException() !== cause || !disposed) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With such a check, we have a scenario when an exception is swallowed.
E.g. the following test:
@Test
fun testSwallowedException() = runTest {
val started = CountDownLatch(1)
val disposed = CountDownLatch(1)
val m = mono {
started.countDown()
disposed.await()
println("Running")
throw ArithmeticException()
}
val d = m.subscribe()
started.await()
d.dispose()
disposed.countDown()
}
will print Running
and then will completely ignore the arithmetic exception
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PTAL on discussion here #252 and the corresponding commit that fixes the problem
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have read the discussion and am not sure how it is applicable. Could you please clarify what behavior would you expect instead?
What happens here is that onError
gets called after m
is disposed of, and Reactor deals with errors from such objects by passing them to the onOperatorError
hook, which is unset in this test. Setting the hook leads to the exception being reported:
@Test
fun testSwallowedException() = runTest {
Hooks.onOperatorError("testSwallowedException") { t, a ->
expectUnreached()
t
}
val started = CountDownLatch(1)
val disposed = CountDownLatch(1)
val m = mono {
started.countDown()
disposed.await()
println("Running")
throw ArithmeticException()
}
val d = m.subscribe()
started.await()
d.dispose()
disposed.countDown()
}
successfully fails.
Moreover, it looks like using onOperatorError
is the approach we take for undeliverable exceptions in Reactor, so… this exception is already dealt with the way we usually deal with the undeliverable exceptions. What am I missing here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, sorry, my bad, I've misinterpreted the code. Thanks for the clarification!
Fixes #2262