
Commit 8c3ee43

Coroutines debugger should keep weak references to running coroutines
It should not prevent garbage collection of coroutines that were otherwise lost, which includes the following practically useful cases:

* Synchronous coroutines (iterator/sequence builders).
* Lazy coroutines that were never started.
* Abandoned coroutines that suspend forever in GlobalScope without strong references to them.

Two kinds of tests cover this functionality:

* A test via FieldWalker ensures that the debugger implementation does not keep a strong reference. This test runs fast and provides good diagnostics if anything goes wrong, but it is fragile, as future changes to the debugger may introduce static references to running coroutines elsewhere.
* A stress test that ensures that no OOM happens when a lot of such lost coroutines are run. It is longer-running and more robust to code changes, but fragile in a different sense: it may accidentally start passing in the future if the tests are given a lot of memory.

Fixes #2117
1 parent 75dede3 commit 8c3ee43

14 files changed: +579 −40 lines changed
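For illustration only (this sketch is not part of the commit; the actual coverage is the FieldWalker-based test and the stress test mentioned above): a minimal Kotlin sketch of the kinds of lost coroutines in question, assuming the DebugProbes API from the kotlinx-coroutines-debug module. With the debugger holding only weak references to running coroutines, a loop like this should not exhaust the heap.

import kotlinx.coroutines.*
import kotlinx.coroutines.debug.*

fun main() {
    DebugProbes.install() // the debugger starts tracking coroutine creation/suspension
    repeat(100_000) {
        // Lazy coroutine that is never started; nothing references it after this line.
        GlobalScope.launch(start = CoroutineStart.LAZY) { println("never runs") }
        // Abandoned coroutine that suspends forever; the launched Job is not stored anywhere.
        GlobalScope.launch { suspendCancellableCoroutine<Unit> { /* never resumed */ } }
        // Synchronous (sequence) coroutine that is advanced once and then dropped.
        sequence { yield(1); yield(2) }.iterator().next()
    }
    // If the debugger held strong references to these coroutines, this loop would
    // eventually run out of memory on a constrained heap.
}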
@@ -0,0 +1,280 @@
/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.debug.internal

import kotlinx.atomicfu.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.internal.*
import java.lang.ref.*
import java.util.concurrent.atomic.*

// This is a very limited implementation, not suitable as a generic map replacement.
// It has lock-free get and put with synchronized rehash for simplicity (and better CPU usage on contention).
@OptIn(ExperimentalStdlibApi::class)
@Suppress("UNCHECKED_CAST")
internal class ConcurrentWeakMap<K : Any, V : Any> : AbstractMutableMap<K, V>() {
    private val _size = atomic(0)
    private val core = atomic(Core(MIN_CAPACITY))
    private val queue = ReferenceQueue<K>()

    override val size: Int
        get() = _size.value

    private fun decrementSize() { _size.decrementAndGet() }

    /**
     * Proactively checks if any already-garbage-collected memory can be released.
     * Calling it periodically is not strictly required, as it is also checked on every put and rehash,
     * but calling it periodically helps in reducing memory utilization.
     */
    fun cleanWeakRefs() {
        core.value.cleanWeakRefs()
    }

    override fun get(key: K): V? = core.value.getImpl(key)

    override fun put(key: K, value: V): V? {
        var oldValue = core.value.putImpl(key, value)
        if (oldValue === REHASH) oldValue = putSynchronized(key, value)
        if (oldValue == null) _size.incrementAndGet()
        return oldValue as V?
    }

    @Synchronized
    private fun putSynchronized(key: K, value: V): V? {
        // Note: a concurrent put leaves a chance that we fail to put even after rehash; we retry until successful
        var curCore = core.value
        while (true) {
            val oldValue = curCore.putImpl(key, value)
            if (oldValue !== REHASH) return oldValue as V?
            curCore = curCore.rehash()
            core.value = curCore
        }
    }

    override fun remove(key: K): V? {
        val oldValue = core.value.putImpl(key, null)
        assert(oldValue !== REHASH) // cannot happen when removing
        if (oldValue != null) _size.decrementAndGet()
        return oldValue as V?
    }

    override val keys: MutableSet<K>
        get() = KeyValueSet { k, _ -> k }

    override val entries: MutableSet<MutableMap.MutableEntry<K, V>>
        get() = KeyValueSet { k, v -> Entry(k, v) }

    // We don't care much about clear's efficiency
    override fun clear() {
        for (k in keys) remove(k)
    }

    @Suppress("UNCHECKED_CAST")
    private inner class Core(private val allocated: Int) {
        private val shift = allocated.countLeadingZeroBits() + 1
        private val threshold = 2 * allocated / 3 // max fill factor at 66% to ensure speedy lookups
        private val load = atomic(0) // counts how many slots are occupied in this core
        private val keys = AtomicReferenceArray<HashedWeakRef<K>?>(allocated)
        private val values = AtomicReferenceArray<Any?>(allocated)

        private fun index(hash: Int) = (hash * MAGIC) ushr shift

        // get is always lock-free, unwraps the value that was marked by concurrent rehash
        fun getImpl(key: K): V? {
            var index = index(key.hashCode())
            while (true) {
                val w = keys[index] ?: return null // not found
                val k = w.get()
                if (k == key) {
                    val value = values[index]
                    return (if (value is Marked) value.ref else value) as V?
                }
                if (k == null) removeCleanedAt(index) // weak ref was here, but collected
                if (index == 0) index = allocated
                index--
            }
        }

        private fun removeCleanedAt(index: Int) {
            while (true) {
                val oldValue = values[index] ?: return // return when already removed
                if (oldValue is Marked) return // cannot remove marked (rehash is working on it, will not copy)
                if (values.compareAndSet(index, oldValue, null)) { // removed
                    decrementSize()
                    return
                }
            }
        }

        // returns REHASH when rehash is needed (the value was not put)
        fun putImpl(key: K, value: V?, weakKey0: HashedWeakRef<K>? = null): Any? {
            cleanWeakRefs()
            var index = index(key.hashCode())
            var loadIncremented = false
            var weakKey: HashedWeakRef<K>? = weakKey0
            while (true) {
                val w = keys[index]
                if (w == null) { // slot empty => not found => try reserving slot
                    if (value == null) return null // removing missing value, nothing to do here
                    if (!loadIncremented) {
                        // We must increment load before we even try to occupy a slot to avoid overfill during concurrent put
                        if (load.incrementAndGet() >= threshold) return REHASH
                        loadIncremented = true
                    }
                    if (weakKey == null) weakKey = HashedWeakRef(key, queue)
                    if (keys.compareAndSet(index, null, weakKey)) break // slot reserved !!!
                    continue // retry at this slot on CAS failure (somebody already reserved this slot)
                }
                val k = w.get()
                if (k == key) { // found already reserved slot at index
                    if (loadIncremented) load.decrementAndGet() // undo increment, because found a slot
                    break
                }
                if (k == null) removeCleanedAt(index) // weak ref was here, but collected
                if (index == 0) index = allocated
                index--
            }
            // update value
            var oldValue: Any?
            while (true) {
                oldValue = values[index]
                if (oldValue is Marked) return REHASH // rehash started, cannot work here
                if (values.compareAndSet(index, oldValue, value)) break
            }
            return oldValue as V?
        }

        // only one thread can rehash, but puts/gets may run concurrently with it
        fun rehash(): Core {
            // use size to approximate new required capacity to have at least 25-50% fill factor,
            // may fail due to concurrent modification, will retry
            retry@ while (true) {
                val newCapacity = size.coerceAtLeast(MIN_CAPACITY / 4).takeHighestOneBit() * 4
                val newCore = Core(newCapacity)
                for (index in 0 until allocated) {
                    // load the key
                    val w = keys[index]
                    val k = w?.get()
                    if (w != null && k == null) removeCleanedAt(index) // weak ref was here, but collected
                    // mark value so that it cannot be changed while we rehash to new core
                    var value: Any?
                    while (true) {
                        value = values[index]
                        if (value is Marked) { // already marked -- good
                            value = value.ref
                            break
                        }
                        // try mark
                        if (values.compareAndSet(index, value, value.mark())) break
                    }
                    if (k != null && value != null) {
                        val oldValue = newCore.putImpl(k, value as V, w)
                        if (oldValue === REHASH) continue@retry // retry if we underestimated capacity
                        assert(oldValue == null)
                    }
                }
                return newCore // rehashed everything successfully
            }
        }

        fun cleanWeakRefs() {
            while (true) {
                cleanRef(queue.poll() as HashedWeakRef<K>? ?: return)
            }
        }

        private fun cleanRef(weakRef: HashedWeakRef<K>) {
            var index = index(weakRef.hash)
            while (true) {
                val w = keys[index] ?: return // return when slots are over
                if (w === weakRef) { // found
                    removeCleanedAt(index)
                    return
                }
                if (index == 0) index = allocated
                index--
            }
        }

        fun <E> keyValueIterator(factory: (K, V) -> E): MutableIterator<E> {
            cleanWeakRefs()
            return KeyValueIterator(factory)
        }

        private inner class KeyValueIterator<E>(private val factory: (K, V) -> E) : MutableIterator<E> {
            private var index = -1
            private lateinit var key: K
            private lateinit var value: V

            init { findNext() }

            private fun findNext() {
                while (++index < allocated) {
                    key = keys[index]?.get() ?: continue
                    var value = values[index]
                    if (value is Marked) value = value.ref
                    if (value != null) {
                        this.value = value as V
                        return
                    }
                }
            }

            override fun hasNext(): Boolean = index < allocated

            override fun next(): E {
                if (index >= allocated) throw NoSuchElementException()
                return factory(key, value).also { findNext() }
            }

            override fun remove() = noImpl()
        }
    }

    private class Entry<K, V>(override val key: K, override val value: V) : MutableMap.MutableEntry<K, V> {
        override fun setValue(newValue: V): V = noImpl()
    }

    private inner class KeyValueSet<E>(
        private val factory: (K, V) -> E
    ) : AbstractMutableSet<E>() {
        override val size: Int get() = this@ConcurrentWeakMap.size
        override fun add(element: E): Boolean = noImpl()
        override fun iterator(): MutableIterator<E> = core.value.keyValueIterator(factory)
    }

    private fun noImpl(): Nothing {
        throw UnsupportedOperationException("not implemented")
    }
}

private const val MAGIC = 2654435769L.toInt() // golden ratio
private const val MIN_CAPACITY = 16
private val REHASH = Symbol("REHASH")
private val MARKED_NULL = Marked(null)
private val MARKED_TRUE = Marked(true) // when the map is used as a set, "true" is used as the value; optimize its mark allocation

/**
 * Weak reference that stores the original hash code so that we can use reference queue to promptly clean them up
 * from the hashtable even in the absence of ongoing modifications.
 */
private class HashedWeakRef<T>(ref: T, queue: ReferenceQueue<T>) : WeakReference<T>(ref, queue) {
    @JvmField
    val hash = ref.hashCode()
}

/**
 * Marked values cannot be modified. The marking is performed when rehash has started to ensure that concurrent
 * modifications (that are lock-free) cannot perform any changes and are forced to synchronize with ongoing rehash.
 */
private class Marked(@JvmField val ref: Any?)

private fun Any?.mark(): Marked = when (this) {
    null -> MARKED_NULL
    true -> MARKED_TRUE
    else -> Marked(this)
}
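A rough illustration of the intended weak-key behavior follows; ConcurrentWeakMap is internal, so this is a conceptual sketch rather than supported API usage. Entries whose keys have been garbage-collected are released either by an explicit cleanWeakRefs() call or lazily on a later put/rehash.

fun weakKeyDemo() {
    val map = ConcurrentWeakMap<Any, String>()
    var key: Any? = Any()
    map[key!!] = "debug info"
    check(map[key!!] == "debug info")

    key = null          // drop the only strong reference to the key
    System.gc()         // hint: collect the now-unreachable key (timing is not guaranteed)
    map.cleanWeakRefs() // drain the reference queue and release the slot
    // The entry eventually disappears; the map never keeps its keys alive.
}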

kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfo.kt

+19 −3
@@ -4,6 +4,7 @@
 
 package kotlinx.coroutines.debug.internal
 
+import java.lang.ref.*
 import kotlin.coroutines.*
 import kotlin.coroutines.jvm.internal.*
 
@@ -12,10 +13,18 @@ internal const val RUNNING = "RUNNING"
 internal const val SUSPENDED = "SUSPENDED"
 
 internal class DebugCoroutineInfo(
-    public val context: CoroutineContext,
+    context: CoroutineContext?,
     public val creationStackBottom: CoroutineStackFrame?,
     @JvmField internal val sequenceNumber: Long
 ) {
+    /**
+     * We cannot keep a strong reference to the context, because with the [Job] in the context it will indirectly
+     * keep a reference to the last frame of an abandoned coroutine which the debugger should not be preventing
+     * garbage-collection of. The reference to the context will not disappear as long as the coroutine itself is not lost.
+     */
+    private val _context = WeakReference(context)
+    public val context: CoroutineContext?
+        get() = _context.get()
 
     public val creationStackTrace: List<StackTraceElement> get() = creationStackTrace()
 
@@ -28,8 +37,15 @@ internal class DebugCoroutineInfo(
 
     @JvmField
     internal var lastObservedThread: Thread? = null
-    @JvmField
-    internal var lastObservedFrame: CoroutineStackFrame? = null
+
+    /**
+     * We cannot keep a strong reference to the last observed frame of the coroutine, because this will
+     * prevent garbage-collection of a coroutine that was lost.
+     */
+    private var _lastObservedFrame: WeakReference<CoroutineStackFrame>? = null
+    internal var lastObservedFrame: CoroutineStackFrame?
+        get() = _lastObservedFrame?.get()
+        set(value) { _lastObservedFrame = value?.let { WeakReference(it) } }
 
     public fun copy(): DebugCoroutineInfo = DebugCoroutineInfo(
         context,
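As a stand-alone sketch of the pattern above (the WeakSlot name is made up for illustration): a property backed by a WeakReference returns null once its referent has been collected, which is exactly how the reworked context and lastObservedFrame behave for a lost coroutine.

import java.lang.ref.WeakReference

// Hypothetical helper mirroring the backing-field pattern used in DebugCoroutineInfo.
class WeakSlot<T : Any> {
    private var ref: WeakReference<T>? = null
    var value: T?
        get() = ref?.get()
        set(v) { ref = v?.let { WeakReference(it) } }
}

fun main() {
    val slot = WeakSlot<Any>()
    slot.value = Any()  // stored weakly; nothing else references this object
    System.gc()         // after the referent is collected...
    println(slot.value) // ...this is expected to print null (GC timing permitting)
}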