@@ -311,6 +311,7 @@ class MonoTest : TestBase() {
311
311
* this is considered normal behavior and exceptions are not propagated. */
312
312
@Test
313
313
fun testDownstreamCancellationDoesNotThrow () = runTest {
314
+ var i = 0
314
315
/* * Attach a hook that handles exceptions from publishers that are known to be disposed of. We don't expect it
315
316
* to be fired in this case, as the reason for the publisher in this test to accept an exception is simply
316
317
* cancellation from the downstream. */
@@ -319,63 +320,66 @@ class MonoTest : TestBase() {
319
320
t
320
321
}
321
322
/* * A Mono that doesn't emit a value and instead waits indefinitely. */
322
- val mono = mono { expect(3 ); delay(Long .MAX_VALUE ) }
323
- .doOnSubscribe { expect(2 ) }
323
+ val mono = mono( Dispatchers . Unconfined ) { expect(5 * i + 3 ); delay(Long .MAX_VALUE ) }
324
+ .doOnSubscribe { expect(5 * i + 2 ) }
324
325
.doOnNext { expectUnreached() }
325
326
.doOnSuccess { expectUnreached() }
326
327
.doOnError { expectUnreached() }
327
- .doOnCancel { expect(4 ) }
328
- expect(1 )
329
- mono.awaitCancelAndJoin()
330
- finish(5 )
328
+ .doOnCancel { expect(5 * i + 4 ) }
329
+ val n = 1000
330
+ repeat(n) {
331
+ i = it
332
+ expect(5 * i + 1 )
333
+ mono.awaitCancelAndJoin()
334
+ expect(5 * i + 5 )
335
+ }
336
+ finish(5 * n + 1 )
331
337
Hooks .resetOnOperatorError(" testDownstreamCancellationDoesNotThrow" )
332
338
}
333
339
334
340
/* * Test that, when [Mono] is cancelled by the downstream and throws during handling the cancellation, the resulting
335
341
* error is propagated to [Hooks.onOperatorError]. */
336
342
@Test
337
343
fun testRethrowingDownstreamCancellation () = runTest {
344
+ var i = 0
338
345
/* * Attach a hook that handles exceptions from publishers that are known to be disposed of. We expect it
339
346
* to be fired in this case. */
340
347
Hooks .onOperatorError(" testDownstreamCancellationDoesNotThrow" ) { t, a ->
341
- expect(5 )
348
+ expect(i * 6 + 5 )
342
349
t
343
350
}
344
351
/* * A Mono that doesn't emit a value and instead waits indefinitely, and, when cancelled, throws. */
345
- val mono = mono {
346
- expect(3 );
352
+ val mono = mono( Dispatchers . Unconfined ) {
353
+ expect(i * 6 + 3 )
347
354
try {
348
355
delay(Long .MAX_VALUE )
349
356
} catch (e: CancellationException ) {
350
357
throw TestException ()
351
358
}
352
359
}
353
- .doOnSubscribe { expect(2 ) }
360
+ .doOnSubscribe { expect(i * 6 + 2 ) }
354
361
.doOnNext { expectUnreached() }
355
362
.doOnSuccess { expectUnreached() }
356
363
.doOnError { expectUnreached() }
357
- .doOnCancel { expect(4 ) }
358
- expect(1 )
359
- mono.awaitCancelAndJoin()
360
- finish(6 ) /* * if this line fails, see the comment for [awaitCancelAndJoin] */
364
+ .doOnCancel { expect(i * 6 + 4 ) }
365
+ val n = 1000
366
+ repeat(n) {
367
+ i = it
368
+ expect(i * 6 + 1 )
369
+ mono.awaitCancelAndJoin()
370
+ expect(i * 6 + 6 )
371
+ }
372
+ finish(n * 6 + 1 )
361
373
Hooks .resetOnOperatorError(" testDownstreamCancellationDoesNotThrow" )
362
374
}
363
375
364
- /* * Run the given [Mono ], cancel it, wait for the cancellation handler to finish, and * return only then* .
376
+ /* * Run the given [Publisher ], cancel it, wait for the cancellation handler to finish, and return only then.
365
377
*
366
- * There are no guarantees about the execution context in which the cancellation handler will run, but we have
367
- * to wait for it to finish to check its behavior. The contraption below seems to ensure that everything works out.
368
- * If it stops giving that guarantee, then [testRethrowingDownstreamCancellation] should fail more or less
369
- * consistently because the hook won't have enough time to fire before a call to [finish].
370
- */
371
- private suspend fun <T > Mono<T>.awaitCancelAndJoin () = coroutineScope {
372
- val job = async(start = CoroutineStart .UNDISPATCHED ) {
378
+ * Will not work in the general case, but here, when the publisher uses [Dispatchers.Unconfined], this seems to
379
+ * ensure that the cancellation handler will have nowhere to execute but serially with the cancellation. */
380
+ private suspend fun <T > Publisher<T>.awaitCancelAndJoin () = coroutineScope {
381
+ async(start = CoroutineStart .UNDISPATCHED ) {
373
382
awaitFirstOrNull()
374
- }
375
- newSingleThreadContext(" monoCancellationCleanup" ).use { pool ->
376
- launch(pool) {
377
- job.cancelAndJoin()
378
- }
379
- }.join()
383
+ }.cancelAndJoin()
380
384
}
381
385
}
0 commit comments