diff --git a/kotlinx-coroutines-core/jvm/src/Builders.kt b/kotlinx-coroutines-core/jvm/src/Builders.kt index 3fed0ad9cb..8f72e28606 100644 --- a/kotlinx-coroutines-core/jvm/src/Builders.kt +++ b/kotlinx-coroutines-core/jvm/src/Builders.kt @@ -91,12 +91,11 @@ private class BlockingCoroutine( eventLoop?.incrementUseCount() try { while (true) { - @Suppress("DEPRECATION") - if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) } val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE // note: process next even may loose unpark flag, so check if completed before parking if (isCompleted) break parkNanos(this, parkNanos) + if (Thread.interrupted()) cancelCoroutine(InterruptedException()) } } finally { // paranoia eventLoop?.decrementUseCount() diff --git a/kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt b/kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt index 965b8fc0be..3c60407bb9 100644 --- a/kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt +++ b/kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt @@ -120,54 +120,6 @@ class ExecutorsTest : TestBase() { check(executorService.isShutdown) } - @Test - fun testEarlyExecutorShutdown() { - runTestExceptionInDispatch(6, { it is RejectedExecutionException }) { - expect(1) - val dispatcher = newSingleThreadContext("Ctx") - launch(dispatcher) { - withContext(Dispatchers.Default) { - expect(2) - delay(100) - expect(4) - } - } - - delay(50) - expect(3) - - dispatcher.close() - } - } - - @Test - fun testExceptionInDispatch() { - runTestExceptionInDispatch(5, { it is TestException }) { - val dispatcher = object : CoroutineDispatcher() { - private var closed = false - override fun dispatch(context: CoroutineContext, block: Runnable) { - if (closed) throw TestException() - Dispatchers.Default.dispatch(context, block) - } - - fun close() { - closed = true - } - } - launch(dispatcher) { - withContext(Dispatchers.Default) { - expect(1) - delay(100) - expect(3) - } - } - - delay(50) - expect(2) - dispatcher.close() - } - } - @Test fun testExceptionInIsDispatchNeeded() { val dispatcher = object : CoroutineDispatcher() { @@ -194,31 +146,4 @@ class ExecutorsTest : TestBase() { finish(4) } } - - private fun runTestExceptionInDispatch( - totalSteps: Int, - isExpectedException: (Throwable) -> Boolean, - block: suspend CoroutineScope.() -> Unit, - ) { - var mainThread: Thread? = null - val exceptionHandler = CoroutineExceptionHandler { _, e -> - if (isExpectedException(e)) { - expect(totalSteps - 1) - mainThread!!.run { - interrupt() - unpark(this) - } - } else { - expectUnreached() - } - } - try { - runBlocking(exceptionHandler) { - block() - mainThread = Thread.currentThread() - } - } catch (_: InterruptedException) { - finish(totalSteps) - } - } } diff --git a/kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt b/kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt index cc2291e6c1..dcb908cc3a 100644 --- a/kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt +++ b/kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt @@ -1,7 +1,11 @@ package kotlinx.coroutines import kotlinx.coroutines.testing.* -import org.junit.* +import java.util.concurrent.CountDownLatch +import java.util.concurrent.atomic.AtomicReference +import kotlin.concurrent.thread +import kotlin.test.* +import kotlin.time.Duration class RunBlockingJvmTest : TestBase() { @Test @@ -12,5 +16,177 @@ class RunBlockingJvmTest : TestBase() { } rb.hashCode() // unused } -} + /** Tests that the [runBlocking] coroutine runs to completion even it was interrupted. */ + @Test + fun testFinishingWhenInterrupted() { + startInSeparateThreadAndInterrupt { mayInterrupt -> + expect(1) + try { + runBlocking { + try { + mayInterrupt() + expect(2) + delay(Duration.INFINITE) + } finally { + withContext(NonCancellable) { + expect(3) + repeat(10) { yield() } + expect(4) + } + } + } + } catch (_: InterruptedException) { + expect(5) + } + } + finish(6) + } + + /** Tests that [runBlocking] will exit if it gets interrupted. */ + @Test + fun testCancellingWhenInterrupted() { + startInSeparateThreadAndInterrupt { mayInterrupt -> + expect(1) + try { + runBlocking { + try { + mayInterrupt() + expect(2) + delay(Duration.INFINITE) + } catch (_: CancellationException) { + expect(3) + } + } + } catch (_: InterruptedException) { + expect(4) + } + } + finish(5) + } + + /** Tests that [runBlocking] does not check for interruptions before the first attempt to suspend, + * as no blocking actually happens. */ + @Test + fun testInitialPortionRunningDespiteInterruptions() { + Thread.currentThread().interrupt() + runBlocking { + expect(1) + try { + Thread.sleep(Long.MAX_VALUE) + } catch (_: InterruptedException) { + expect(2) + } + } + assertFalse(Thread.interrupted()) + finish(3) + } + + /** + * Tests that [runBlockingNonInterruptible] is going to run its job to completion even if it gets interrupted + * or if thread switches occur. + */ + @Test + fun testNonInterruptibleRunBlocking() { + startInSeparateThreadAndInterrupt { mayInterrupt -> + val v = runBlockingNonInterruptible { + mayInterrupt() + repeat(10) { + expect(it + 1) + delay(1) + } + 42 + } + assertTrue(Thread.interrupted()) + assertEquals(42, v) + expect(11) + } + finish(12) + } + + /** + * Tests that [runBlockingNonInterruptible] is going to run its job to completion even if it gets interrupted + * or if thread switches occur, and then will rethrow the exception thrown by the job. + */ + @Test + fun testNonInterruptibleRunBlockingFailure() { + val exception = AssertionError() + startInSeparateThreadAndInterrupt { mayInterrupt -> + val exception2 = assertFailsWith { + runBlockingNonInterruptible { + mayInterrupt() + repeat(10) { + expect(it + 1) + // even thread switches should not be a problem + withContext(Dispatchers.IO) { + delay(1) + } + } + throw exception + } + } + assertTrue(Thread.interrupted()) + assertSame(exception, exception2) + expect(11) + } + finish(12) + } + + + /** + * Tests that [runBlockingNonInterruptible] is going to run its job to completion even if it gets interrupted + * or if thread switches occur. + */ + @Test + fun testNonInterruptibleRunBlockingPropagatingInterruptions() { + val exception = AssertionError() + startInSeparateThreadAndInterrupt { mayInterrupt -> + runBlockingNonInterruptible { + mayInterrupt() + try { + Thread.sleep(Long.MAX_VALUE) + } catch (_: InterruptedException) { + expect(1) + } + } + expect(2) + assertFalse(Thread.interrupted()) + } + finish(3) + } + + /** + * Tests that starting [runBlockingNonInterruptible] in an interrupted thread does not affect the result. + */ + @Test + fun testNonInterruptibleRunBlockingStartingInterrupted() { + Thread.currentThread().interrupt() + val v = runBlockingNonInterruptible { 42 } + assertEquals(42, v) + assertTrue(Thread.interrupted()) + } + + private fun startInSeparateThreadAndInterrupt(action: (mayInterrupt: () -> Unit) -> Unit) { + val latch = CountDownLatch(1) + val thread = thread { + action { latch.countDown() } + } + latch.await() + thread.interrupt() + thread.join() + } + + private fun runBlockingNonInterruptible(action: suspend () -> T): T { + val result = AtomicReference>() + try { + runBlocking { + withContext(NonCancellable) { + result.set(runCatching { action() }) + } + } + } catch (_: InterruptedException) { + Thread.currentThread().interrupt() // restore the interrupted flag + } + return result.get().getOrThrow() + } +}