From 4c69d1c3f476c618778fa9ea87502ef6f2f88997 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 18 Dec 2024 17:09:23 +0100 Subject: [PATCH 1/3] Properly cleanup thread locals for non-CoroutineDispatcher-intercepted continuations There was one codepath not covered by the undispatched thread-local cleanup procedure: when a custom ContinuationInterceptor is used and the scoped coroutine (i.e. withContext) is completed in-place without suspensions. Fixed with the introduction of the corresponding machinery for ScopeCoroutine Fixes #4296 --- .../common/src/AbstractCoroutine.kt | 10 ++ .../common/src/internal/Scopes.kt | 7 ++ .../common/src/intrinsics/Undispatched.kt | 1 + .../jvm/src/CoroutineContext.kt | 18 +++- .../jvm/test/ThreadLocalsLeaksTest.kt | 95 +++++++++++++++++++ 5 files changed, 126 insertions(+), 5 deletions(-) create mode 100644 kotlinx-coroutines-core/jvm/test/ThreadLocalsLeaksTest.kt diff --git a/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt b/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt index d2f79c1381..2d6273acc9 100644 --- a/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt +++ b/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines import kotlinx.coroutines.CoroutineStart.* import kotlinx.coroutines.intrinsics.* import kotlin.coroutines.* +import kotlinx.coroutines.internal.ScopeCoroutine /** * Abstract base class for implementation of coroutines in coroutine builders. @@ -100,6 +101,15 @@ public abstract class AbstractCoroutine( afterResume(state) } + /** + * Invoked when the corresponding `AbstractCoroutine` was **conceptually** resumed, but not mechanically. + * Currently, this function only invokes `resume` on the underlying continuation for [ScopeCoroutine] + * or does nothing otherwise. + * + * Examples of resumes: + * - `afterCompletion` calls when the corresponding `Job` changed its state (i.e. 
got cancelled) + * - [AbstractCoroutine.resumeWith] was invoked + */ protected open fun afterResume(state: Any?): Unit = afterCompletion(state) internal final override fun handleOnCompletionException(exception: Throwable) { diff --git a/kotlinx-coroutines-core/common/src/internal/Scopes.kt b/kotlinx-coroutines-core/common/src/internal/Scopes.kt index 7e561c83dc..9b830bd5c9 100644 --- a/kotlinx-coroutines-core/common/src/internal/Scopes.kt +++ b/kotlinx-coroutines-core/common/src/internal/Scopes.kt @@ -23,6 +23,13 @@ internal open class ScopeCoroutine( uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont)) } + /** + * Invoked when a scoped coroutine was completed in an undispatched manner directly + * at the place of its start because it never suspended. + */ + open fun afterCompletionUndispatched() { + } + override fun afterResume(state: Any?) { // Resume direct because scope is already in the correct context uCont.resumeWith(recoverResult(state, uCont)) diff --git a/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt b/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt index 254182b387..511199701a 100644 --- a/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt +++ b/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt @@ -79,6 +79,7 @@ private inline fun ScopeCoroutine.undispatchedResult( if (result === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED // (1) val state = makeCompletingOnce(result) if (state === COMPLETING_WAITING_CHILDREN) return COROUTINE_SUSPENDED // (2) + afterCompletionUndispatched() return if (state is CompletedExceptionally) { // (3) when { shouldThrow(state.cause) -> throw recoverStackTrace(state.cause, uCont) diff --git a/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt b/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt index ae8275f86f..3182beff92 100644 --- a/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt +++ b/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt @@ -253,18 
+253,26 @@ internal actual class UndispatchedCoroutineactual constructor ( } } + override fun afterCompletionUndispatched() { + clearThreadLocal() + } + override fun afterResume(state: Any?) { + clearThreadLocal() + // resume undispatched -- update context but stay on the same dispatcher + val result = recoverResult(state, uCont) + withContinuationContext(uCont, null) { + uCont.resumeWith(result) + } + } + + private fun clearThreadLocal() { if (threadLocalIsSet) { threadStateToRecover.get()?.let { (ctx, value) -> restoreThreadContext(ctx, value) } threadStateToRecover.remove() } - // resume undispatched -- update context but stay on the same dispatcher - val result = recoverResult(state, uCont) - withContinuationContext(uCont, null) { - uCont.resumeWith(result) - } } } diff --git a/kotlinx-coroutines-core/jvm/test/ThreadLocalsLeaksTest.kt b/kotlinx-coroutines-core/jvm/test/ThreadLocalsLeaksTest.kt new file mode 100644 index 0000000000..f3eccae5f9 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/ThreadLocalsLeaksTest.kt @@ -0,0 +1,95 @@ +package kotlinx.coroutines + +import kotlinx.coroutines.testing.TestBase +import java.lang.ref.WeakReference +import kotlin.coroutines.AbstractCoroutineContextElement +import kotlin.coroutines.Continuation +import kotlin.coroutines.ContinuationInterceptor +import kotlin.coroutines.CoroutineContext +import kotlin.test.Test +import kotlin.test.assertNull + +/* + * This is an adapted verion of test from #4296. + * + * qwwdfsad: the test relies on System.gc() actually collecting the garbage. + * If these tests flake on CI, first check that JDK/GC setup is not an issue. 
+ */ +class ThreadLocalCustomContinuationInterceptorTest : TestBase() { + + private class CustomContinuationInterceptor(private val delegate: ContinuationInterceptor) : + AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { + + override fun interceptContinuation(continuation: Continuation): Continuation { + return delegate.interceptContinuation(continuation) + } + } + + private class CustomNeverEqualContinuationInterceptor(private val delegate: ContinuationInterceptor) : + AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { + + override fun interceptContinuation(continuation: Continuation): Continuation { + return delegate.interceptContinuation(continuation) + } + + override fun equals(other: Any?) = false + } + + @Test + fun testDefaultDispatcherNoSuspension() = ensureCoroutineContextGCed(Dispatchers.Default, suspend = false) + + @Test + fun testDefaultDispatcher() = ensureCoroutineContextGCed(Dispatchers.Default, suspend = true) + + + @Test + fun testNonCoroutineDispatcher() = ensureCoroutineContextGCed( + CustomContinuationInterceptor(Dispatchers.Default), + suspend = true + ) + + @Test + fun testNonCoroutineDispatcherSuspension() = ensureCoroutineContextGCed( + CustomContinuationInterceptor(Dispatchers.Default), + suspend = false + ) + + // Note asymmetric equals codepath never goes through the undispatched withContext, thus the separate test case + + @Test + fun testNonCoroutineDispatcherAsymmetricEquals() = + ensureCoroutineContextGCed( + CustomNeverEqualContinuationInterceptor(Dispatchers.Default), + suspend = true + ) + + @Test + fun testNonCoroutineDispatcherAsymmetricEqualsSuspension() = + ensureCoroutineContextGCed( + CustomNeverEqualContinuationInterceptor(Dispatchers.Default), + suspend = false + ) + + + private fun ensureCoroutineContextGCed(coroutineContext: CoroutineContext, suspend: Boolean) { + runTest { + lateinit var ref: WeakReference + val job = GlobalScope.launch(coroutineContext) 
{ + val coroutineName = CoroutineName("Yo") + ref = WeakReference(coroutineName) + withContext(coroutineName) { + if (suspend) { + delay(1) + } + } + } + job.join() + + // Twice is enough to ensure + System.gc() + System.gc() + assertNull(ref.get()) + } + } + +} From c7c277957f4b82ce094bef07dce14af668826137 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 18 Dec 2024 20:19:17 +0100 Subject: [PATCH 2/3] Make tests both more robust and more depressing to look at --- .../jvm/src/CoroutineContext.kt | 3 ++- .../jvm/test/ThreadLocalsLeaksTest.kt | 24 ++++++++++--------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt b/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt index 3182beff92..7628d6ac85 100644 --- a/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt +++ b/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt @@ -185,7 +185,8 @@ internal actual class UndispatchedCoroutineactual constructor ( * `withContext` for the sake of logging, MDC, tracing etc., meaning that there exists thousands of * undispatched coroutines. * Each access to Java's [ThreadLocal] leaves a footprint in the corresponding Thread's `ThreadLocalMap` - * that is cleared automatically as soon as the associated thread-local (-> UndispatchedCoroutine) is garbage collected. + * that is cleared automatically as soon as the associated thread-local (-> UndispatchedCoroutine) is garbage collected + * when either the corresponding thread is GC'ed or it cleans up its stale entries on other TL accesses. * When such coroutines are promoted to old generation, `ThreadLocalMap`s become bloated and an arbitrary accesses to thread locals * start to consume significant amount of CPU because these maps are open-addressed and cleaned up incrementally on each access. * (You can read more about this effect as "GC nepotism"). 
diff --git a/kotlinx-coroutines-core/jvm/test/ThreadLocalsLeaksTest.kt b/kotlinx-coroutines-core/jvm/test/ThreadLocalsLeaksTest.kt index f3eccae5f9..0ead9dea66 100644 --- a/kotlinx-coroutines-core/jvm/test/ThreadLocalsLeaksTest.kt +++ b/kotlinx-coroutines-core/jvm/test/ThreadLocalsLeaksTest.kt @@ -35,20 +35,20 @@ class ThreadLocalCustomContinuationInterceptorTest : TestBase() { override fun equals(other: Any?) = false } - @Test + @Test(timeout = 20_000L) fun testDefaultDispatcherNoSuspension() = ensureCoroutineContextGCed(Dispatchers.Default, suspend = false) - @Test + @Test(timeout = 20_000L) fun testDefaultDispatcher() = ensureCoroutineContextGCed(Dispatchers.Default, suspend = true) - @Test + @Test(timeout = 20_000L) fun testNonCoroutineDispatcher() = ensureCoroutineContextGCed( CustomContinuationInterceptor(Dispatchers.Default), suspend = true ) - @Test + @Test(timeout = 20_000L) fun testNonCoroutineDispatcherSuspension() = ensureCoroutineContextGCed( CustomContinuationInterceptor(Dispatchers.Default), suspend = false @@ -56,14 +56,14 @@ class ThreadLocalCustomContinuationInterceptorTest : TestBase() { // Note asymmetric equals codepath never goes through the undispatched withContext, thus the separate test case - @Test + @Test(timeout = 20_000L) fun testNonCoroutineDispatcherAsymmetricEquals() = ensureCoroutineContextGCed( CustomNeverEqualContinuationInterceptor(Dispatchers.Default), suspend = true ) - @Test + @Test(timeout = 20_000L) fun testNonCoroutineDispatcherAsymmetricEqualsSuspension() = ensureCoroutineContextGCed( CustomNeverEqualContinuationInterceptor(Dispatchers.Default), @@ -71,6 +71,9 @@ class ThreadLocalCustomContinuationInterceptorTest : TestBase() { ) + @Volatile + private var letThatSinkIn: Any = "What is my purpose? 
To frag the garbage collctor" + private fun ensureCoroutineContextGCed(coroutineContext: CoroutineContext, suspend: Boolean) { runTest { lateinit var ref: WeakReference @@ -85,11 +88,10 @@ class ThreadLocalCustomContinuationInterceptorTest : TestBase() { } job.join() - // Twice is enough to ensure - System.gc() - System.gc() - assertNull(ref.get()) + while (ref.get() != null) { + System.gc() + letThatSinkIn = LongArray(1024 * 1024) + } } } - } From 1f4f4870f0cc3400e8f31ddccbb03fce11db87e9 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 19 Dec 2024 11:26:52 +0100 Subject: [PATCH 3/3] Make tests a little bit more reassuring --- .../jvm/test/ThreadLocalsLeaksTest.kt | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/test/ThreadLocalsLeaksTest.kt b/kotlinx-coroutines-core/jvm/test/ThreadLocalsLeaksTest.kt index 0ead9dea66..19d42fe01c 100644 --- a/kotlinx-coroutines-core/jvm/test/ThreadLocalsLeaksTest.kt +++ b/kotlinx-coroutines-core/jvm/test/ThreadLocalsLeaksTest.kt @@ -7,7 +7,6 @@ import kotlin.coroutines.Continuation import kotlin.coroutines.ContinuationInterceptor import kotlin.coroutines.CoroutineContext import kotlin.test.Test -import kotlin.test.assertNull /* * This is an adapted verion of test from #4296. @@ -75,6 +74,13 @@ class ThreadLocalCustomContinuationInterceptorTest : TestBase() { private var letThatSinkIn: Any = "What is my purpose? 
To frag the garbage collctor" private fun ensureCoroutineContextGCed(coroutineContext: CoroutineContext, suspend: Boolean) { + fun forceGcUntilRefIsCleaned(ref: WeakReference) { + while (ref.get() != null) { + System.gc() + letThatSinkIn = LongArray(1024 * 1024) + } + } + runTest { lateinit var ref: WeakReference val job = GlobalScope.launch(coroutineContext) { @@ -88,10 +94,7 @@ class ThreadLocalCustomContinuationInterceptorTest : TestBase() { } job.join() - while (ref.get() != null) { - System.gc() - letThatSinkIn = LongArray(1024 * 1024) - } + forceGcUntilRefIsCleaned(ref) } } }