From 83e14e27fdc053fe0c14525fbe9c354f9182fbc0 Mon Sep 17 00:00:00 2001 From: Maksim Zuev Date: Thu, 1 Aug 2024 16:46:07 +0200 Subject: [PATCH 1/3] Fix flaky test, ensure job suspension where expected by the test --- .../jvm/test/ThreadContextElementTest.kt | 27 ++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/test/ThreadContextElementTest.kt b/kotlinx-coroutines-core/jvm/test/ThreadContextElementTest.kt index 3b106c440d..50319142dc 100644 --- a/kotlinx-coroutines-core/jvm/test/ThreadContextElementTest.kt +++ b/kotlinx-coroutines-core/jvm/test/ThreadContextElementTest.kt @@ -1,10 +1,13 @@ package kotlinx.coroutines +import kotlinx.coroutines.flow.* import kotlinx.coroutines.testing.* import org.junit.Test +import java.util.* +import java.util.concurrent.* +import kotlin.collections.ArrayList import kotlin.coroutines.* import kotlin.test.* -import kotlinx.coroutines.flow.* class ThreadContextElementTest : TestBase() { @@ -169,15 +172,32 @@ class ThreadContextElementTest : TestBase() { } } + /** + * For stability of the test, it is important to make sure that + * the parent job actually suspends when calling + * `withContext(dispatcher2 + CoroutineName("dispatched"))`. + * + * Here this requirement is fulfilled by forcing execution on a single thread. + * However, dispatching is performed with two non-equal dispatchers to simulate multithreaded behavior. + * + * Suspend of the parent coroutine [kotlinx.coroutines.DispatchedCoroutine.trySuspend] is out of the control of the test, + * while being executed concurrently with resume of the child coroutine [kotlinx.coroutines.DispatchedCoroutine.tryResume]. + */ @Test fun testWithContextJobAccess() = runTest { + val executor = Executors.newSingleThreadExecutor() + // Emulate non-equal dispatchers + val executor1 = object : ExecutorService by executor {} + val executor2 = object : ExecutorService by executor {} + val dispatcher1 = executor1.asCoroutineDispatcher() + val dispatcher2 = executor2.asCoroutineDispatcher() val captor = JobCaptor() val manuallyCaptured = ArrayList() - runBlocking(captor) { + runBlocking(captor + dispatcher1) { manuallyCaptured += coroutineContext.job withContext(CoroutineName("undispatched")) { manuallyCaptured += coroutineContext.job - withContext(Dispatchers.IO) { + withContext(dispatcher2 + CoroutineName("dispatched")) { manuallyCaptured += coroutineContext.job } // Context restored, captured again @@ -188,6 +208,7 @@ class ThreadContextElementTest : TestBase() { } assertEquals(manuallyCaptured, captor.capturees) + executor.shutdownNow() } @Test From 5f8e33ce8140fa5f28ff6734cdfbae753d060c69 Mon Sep 17 00:00:00 2001 From: Maksim Zuev Date: Thu, 1 Aug 2024 16:54:38 +0200 Subject: [PATCH 2/3] Add clarifying comment --- kotlinx-coroutines-core/jvm/test/ThreadContextElementTest.kt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/kotlinx-coroutines-core/jvm/test/ThreadContextElementTest.kt b/kotlinx-coroutines-core/jvm/test/ThreadContextElementTest.kt index 50319142dc..57af32411f 100644 --- a/kotlinx-coroutines-core/jvm/test/ThreadContextElementTest.kt +++ b/kotlinx-coroutines-core/jvm/test/ThreadContextElementTest.kt @@ -197,6 +197,10 @@ class ThreadContextElementTest : TestBase() { manuallyCaptured += coroutineContext.job withContext(CoroutineName("undispatched")) { manuallyCaptured += coroutineContext.job + // Without forcing of single backing thread the code inside `withContext` + // may already complete at the moment when the parent coroutine decides + // whether it needs to suspend or not. + // If the parent coroutine does not need to suspend, no context capture will be called. withContext(dispatcher2 + CoroutineName("dispatched")) { manuallyCaptured += coroutineContext.job } From ddf372046b2e8fcee547364c260e42f492dec186 Mon Sep 17 00:00:00 2001 From: Maksim Zuev Date: Mon, 5 Aug 2024 11:35:05 +0200 Subject: [PATCH 3/3] Collect restore points to clarify the test failure reason --- .../jvm/test/ThreadContextElementTest.kt | 48 +++++++++++++------ 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/test/ThreadContextElementTest.kt b/kotlinx-coroutines-core/jvm/test/ThreadContextElementTest.kt index 57af32411f..54e88677e1 100644 --- a/kotlinx-coroutines-core/jvm/test/ThreadContextElementTest.kt +++ b/kotlinx-coroutines-core/jvm/test/ThreadContextElementTest.kt @@ -3,9 +3,9 @@ package kotlinx.coroutines import kotlinx.coroutines.flow.* import kotlinx.coroutines.testing.* import org.junit.Test -import java.util.* -import java.util.concurrent.* -import kotlin.collections.ArrayList +import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors import kotlin.coroutines.* import kotlin.test.* @@ -158,17 +158,18 @@ class ThreadContextElementTest : TestBase() { } } - class JobCaptor(val capturees: ArrayList = ArrayList()) : ThreadContextElement { + class JobCaptor(val capturees: MutableList = CopyOnWriteArrayList()) : ThreadContextElement { companion object Key : CoroutineContext.Key override val key: CoroutineContext.Key<*> get() = Key override fun updateThreadContext(context: CoroutineContext) { - capturees.add(context.job) + capturees.add("Update: ${context.job}") } override fun restoreThreadContext(context: CoroutineContext, oldState: Unit) { + capturees.add("Restore: ${context.job}") } } @@ -178,7 +179,7 @@ class ThreadContextElementTest : TestBase() { * `withContext(dispatcher2 + CoroutineName("dispatched"))`. * * Here this requirement is fulfilled by forcing execution on a single thread. - * However, dispatching is performed with two non-equal dispatchers to simulate multithreaded behavior. + * However, dispatching is performed with two non-equal dispatchers to force dispatching. * * Suspend of the parent coroutine [kotlinx.coroutines.DispatchedCoroutine.trySuspend] is out of the control of the test, * while being executed concurrently with resume of the child coroutine [kotlinx.coroutines.DispatchedCoroutine.tryResume]. @@ -192,26 +193,45 @@ class ThreadContextElementTest : TestBase() { val dispatcher1 = executor1.asCoroutineDispatcher() val dispatcher2 = executor2.asCoroutineDispatcher() val captor = JobCaptor() - val manuallyCaptured = ArrayList() + val manuallyCaptured = mutableListOf() + + fun registerUpdate(job: Job?) = manuallyCaptured.add("Update: $job") + fun registerRestore(job: Job?) = manuallyCaptured.add("Restore: $job") + + var rootJob: Job? = null runBlocking(captor + dispatcher1) { - manuallyCaptured += coroutineContext.job + rootJob = coroutineContext.job + registerUpdate(rootJob) + var undispatchedJob: Job? = null withContext(CoroutineName("undispatched")) { - manuallyCaptured += coroutineContext.job + undispatchedJob = coroutineContext.job + registerUpdate(undispatchedJob) + // These 2 restores and the corresponding next 2 updates happen only if the following `withContext` + // call actually suspends. + registerRestore(undispatchedJob) + registerRestore(rootJob) // Without forcing of single backing thread the code inside `withContext` // may already complete at the moment when the parent coroutine decides // whether it needs to suspend or not. - // If the parent coroutine does not need to suspend, no context capture will be called. + var dispatchedJob: Job? = null withContext(dispatcher2 + CoroutineName("dispatched")) { - manuallyCaptured += coroutineContext.job + dispatchedJob = coroutineContext.job + registerUpdate(dispatchedJob) } + registerRestore(dispatchedJob) // Context restored, captured again - manuallyCaptured += coroutineContext.job + registerUpdate(undispatchedJob) } + registerRestore(undispatchedJob) // Context restored, captured again - manuallyCaptured += coroutineContext.job + registerUpdate(rootJob) } + registerRestore(rootJob) - assertEquals(manuallyCaptured, captor.capturees) + // Restores may be called concurrently to the update calls in other threads, so their order is not checked. + val expected = manuallyCaptured.filter { it.startsWith("Update: ") }.joinToString(separator = "\n") + val actual = captor.capturees.filter { it.startsWith("Update: ") }.joinToString(separator = "\n") + assertEquals(expected, actual) executor.shutdownNow() }