@@ -15,7 +15,6 @@ import reactor.core.publisher.*
15
15
import reactor.util.context.*
16
16
import java.time.*
17
17
import java.time.Duration.*
18
- import java.util.concurrent.*
19
18
import java.util.function.*
20
19
import kotlin.test.*
21
20
@@ -326,20 +325,57 @@ class MonoTest : TestBase() {
326
325
.doOnSuccess { expectUnreached() }
327
326
.doOnError { expectUnreached() }
328
327
.doOnCancel { expect(4 ) }
329
- /* * There are no guarantees about the execution context in which the cancellation handler will run, but we have
330
- * to somehow make sure that [Hooks.resetOnOperatorError] occurs after that, as otherwise, the test will pass
331
- * successfully even for an incorrect implementation. This contraption seems to ensure that the cancellation
332
- * handler does complete before [finish] is called. */
333
- newSingleThreadContext(" testDownstreamCancellationDoesNotThrow" ).use { pool ->
334
- val job = launch(pool, start = CoroutineStart .UNDISPATCHED ) {
335
- expect(1 )
336
- mono.awaitFirstOrNull()
328
+ expect(1 )
329
+ mono.awaitCancelAndJoin()
330
+ finish(5 )
331
+ Hooks .resetOnOperatorError(" testDownstreamCancellationDoesNotThrow" )
332
+ }
333
+
334
+ /* * Test that, when [Mono] is cancelled by the downstream and throws during handling the cancellation, the resulting
335
+ * error is propagated to [Hooks.onOperatorError]. */
336
+ @Test
337
+ fun testRethrowingDownstreamCancellation () = runTest {
338
+ /* * Attach a hook that handles exceptions from publishers that are known to be disposed of. We expect it
339
+ * to be fired in this case. */
340
+ Hooks .onOperatorError(" testDownstreamCancellationDoesNotThrow" ) { t, a ->
341
+ expect(5 )
342
+ t
343
+ }
344
+ /* * A Mono that doesn't emit a value and instead waits indefinitely, and, when cancelled, throws. */
345
+ val mono = mono {
346
+ expect(3 );
347
+ try {
348
+ delay(Long .MAX_VALUE )
349
+ } catch (e: CancellationException ) {
350
+ throw TestException ()
337
351
}
352
+ }
353
+ .doOnSubscribe { expect(2 ) }
354
+ .doOnNext { expectUnreached() }
355
+ .doOnSuccess { expectUnreached() }
356
+ .doOnError { expectUnreached() }
357
+ .doOnCancel { expect(4 ) }
358
+ expect(1 )
359
+ mono.awaitCancelAndJoin()
360
+ finish(6 ) /* * if this line fails, see the comment for [awaitCancelAndJoin] */
361
+ Hooks .resetOnOperatorError(" testDownstreamCancellationDoesNotThrow" )
362
+ }
363
+
364
+ /* * Run the given [Mono], cancel it, wait for the cancellation handler to finish, and *return only then*.
365
+ *
366
+ * There are no guarantees about the execution context in which the cancellation handler will run, but we have
367
+ * to wait for it to finish to check its behavior. The contraption below seems to ensure that everything works out.
368
+ * If it stops giving that guarantee, then [testRethrowingDownstreamCancellation] should fail more or less
369
+ * consistently because the hook won't have enough time to fire before a call to [finish].
370
+ */
371
+ private suspend fun <T > Mono<T>.awaitCancelAndJoin () = coroutineScope {
372
+ val job = async(start = CoroutineStart .UNDISPATCHED ) {
373
+ awaitFirstOrNull()
374
+ }
375
+ newSingleThreadContext(" monoCancellationCleanup" ).use { pool ->
338
376
launch(pool) {
339
377
job.cancelAndJoin()
340
378
}
341
379
}.join()
342
- finish(5 )
343
- Hooks .resetOnOperatorError(" testDownstreamCancellationDoesNotThrow" )
344
380
}
345
381
}
0 commit comments