Skip to content

Commit bb3f0a1

Browse files
committed
Fix suspend-resume race in DebugProbes
### Reproducing scenario For the following code: ``` suspend fun foo() = yield() ``` the following bytecode is produced: ``` fun foo(...) { uCont.intercepted().dispatchUsingDispatcher() // 1 probeCoroutineSuspended() // 2 return COROUTINE_SUSPENDED // Unroll the stack } ``` And it is possible to observe the next 'probeCoroutineSuspended' **prior** to receiving 'probeCoroutineSuspended'. ### Steps taken To address this, a dedicated 'unmatchedResumes' field is introduced to the coroutine state, which keeps track of consecutive 'probeCoroutineResumed' without matching 'probeCoroutineSuspended' and attributes lately arrived probes to it. Unfortunately, it introduces a much more unlikely race when **two** 'probeCoroutineSuspended' are reordered, then we misattribute 'lastObservedFrame', but it is still better than misattributing the actual coroutine state. @volatile and @synchronized are also introduced to DebugCoroutineInfoImpl as previously they have been subject to data races as well Fixes #3193
1 parent 33220fc commit bb3f0a1

File tree

3 files changed

+141
-16
lines changed

3 files changed

+141
-16
lines changed

kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfoImpl.kt

+80-13
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,86 @@ internal class DebugCoroutineInfoImpl(
4141
* Can be CREATED, RUNNING, SUSPENDED.
4242
*/
4343
public val state: String get() = _state
44+
4445
@Volatile
4546
private var _state: String = CREATED
4647

48+
/*
49+
* How many consecutive unmatched 'updateState(RESUMED)' this object has received.
50+
* It can be `> 1` in two cases:
51+
*
52+
* * The coroutine is finishing and its state is being unrolled in BaseContinuationImpl, see comment to DebugProbesImpl#callerInfoCache
53+
* Such resumes are not expected to be matched and are ignored.
54+
* * We encountered suspend-resume race explained above, and we do wait for a match.
55+
*/
56+
private var unmatchedResume = 0
57+
58+
/**
59+
* Here we orchestrate overlapping state updates that are coming asynchronously.
60+
* In a nutshell, `probeCoroutineSuspended` can arrive **later** than its matching `probeCoroutineResumed`,
61+
* e.g. for the following code:
62+
* ```
63+
* suspend fun foo() = yield()
64+
* ```
65+
*
66+
* we have this sequence:
67+
* ```
68+
* fun foo(...) {
69+
* uCont.intercepted().dispatchUsingDispatcher() // 1
70+
* // Notify the debugger the coroutine is suspended
71+
* probeCoroutineSuspended() // 2
72+
* return COROUTINE_SUSPENDED // Unroll the stack
73+
* }
74+
* ```
75+
* Nothing prevents coroutine to be dispatched and invoke `probeCoroutineResumed` right between '1' and '2'.
76+
* See also: https://github.com/Kotlin/kotlinx.coroutines/issues/3193
77+
*
78+
* [shouldBeMatched] -- `false` if it is an expected consecutive `probeCoroutineResumed` from BaseContinuationImpl,
79+
* `true` otherwise.
80+
*/
81+
@Synchronized
82+
internal fun updateState(state: String, frame: Continuation<*>, shouldBeMatched: Boolean) {
83+
/**
84+
* We observe consecutive resume that had to be matched, but it wasn't,
85+
* increment
86+
*/
87+
if (_state == RUNNING && state == RUNNING && shouldBeMatched) {
88+
++unmatchedResume
89+
} else if (unmatchedResume > 0 && state == SUSPENDED) {
90+
/*
91+
* We received late 'suspend' probe for unmatched resume, skip it.
92+
* Here we deliberately allow the very unlikely race;
93+
* Consider the following scenario ('[r:a]' means "probeCoroutineResumed at a()"):
94+
* ```
95+
* [r:a] a() -> b() [s:b] [r:b] -> (back to a) a() -> c() [s:c]
96+
* ```
97+
* We can, in theory, observe the following probes interleaving:
98+
* ```
99+
* r:a
100+
* r:b // Unmatched resume
101+
* s:c // Matched suspend, discard
102+
* s:b
103+
* ```
104+
* Thus mis-attributing 'lastObservedFrame' to a previously-observed.
105+
* It is possible in theory (though I've failed to reproduce it), yet
106+
* is more preferred than indefinitely mismatched state (-> mismatched real/enhanced stacktrace)
107+
*/
108+
--unmatchedResume
109+
return
110+
}
111+
112+
// Propagate only non-duplicating transitions to running, see KT-29997
113+
if (_state == state && state == SUSPENDED && lastObservedFrame != null) return
114+
115+
_state = state
116+
lastObservedFrame = frame as? CoroutineStackFrame
117+
lastObservedThread = if (state == RUNNING) {
118+
Thread.currentThread()
119+
} else {
120+
null
121+
}
122+
}
123+
47124
@JvmField
48125
@Volatile
49126
internal var lastObservedThread: Thread? = null
@@ -56,7 +133,9 @@ internal class DebugCoroutineInfoImpl(
56133
private var _lastObservedFrame: WeakReference<CoroutineStackFrame>? = null
57134
internal var lastObservedFrame: CoroutineStackFrame?
58135
get() = _lastObservedFrame?.get()
59-
set(value) { _lastObservedFrame = value?.let { WeakReference(it) } }
136+
set(value) {
137+
_lastObservedFrame = value?.let { WeakReference(it) }
138+
}
60139

61140
/**
62141
* Last observed stacktrace of the coroutine captured on its suspension or resumption point.
@@ -88,17 +167,5 @@ internal class DebugCoroutineInfoImpl(
88167
}
89168
}
90169

91-
internal fun updateState(state: String, frame: Continuation<*>) {
92-
// Propagate only duplicating transitions to running for KT-29997
93-
if (_state == state && state == SUSPENDED && lastObservedFrame != null) return
94-
_state = state
95-
lastObservedFrame = frame as? CoroutineStackFrame
96-
lastObservedThread = if (state == RUNNING) {
97-
Thread.currentThread()
98-
} else {
99-
null
100-
}
101-
}
102-
103170
override fun toString(): String = "DebugCoroutineInfo(state=$state,context=$context)"
104171
}

kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt

+5-3
Original file line numberDiff line numberDiff line change
@@ -436,16 +436,18 @@ internal object DebugProbesImpl {
436436
// Lookup coroutine info in cache or by traversing stack frame
437437
val info: DebugCoroutineInfoImpl
438438
val cached = callerInfoCache.remove(frame)
439+
val shouldBeMatchedWithProbeSuspended: Boolean
439440
if (cached != null) {
440441
info = cached
442+
shouldBeMatchedWithProbeSuspended = false
441443
} else {
442444
info = frame.owner()?.info ?: return
445+
shouldBeMatchedWithProbeSuspended = true
443446
// Guard against improper implementations of CoroutineStackFrame and bugs in the compiler
444447
val realCaller = info.lastObservedFrame?.realCaller()
445448
if (realCaller != null) callerInfoCache.remove(realCaller)
446449
}
447-
448-
info.updateState(state, frame as Continuation<*>)
450+
info.updateState(state, frame as Continuation<*>, shouldBeMatchedWithProbeSuspended)
449451
// Do not cache it for proxy-classes such as ScopeCoroutines
450452
val caller = frame.realCaller() ?: return
451453
callerInfoCache[caller] = info
@@ -458,7 +460,7 @@ internal object DebugProbesImpl {
458460

459461
private fun updateState(owner: CoroutineOwner<*>, frame: Continuation<*>, state: String) {
460462
if (!isInstalled) return
461-
owner.info.updateState(state, frame)
463+
owner.info.updateState(state, frame, true)
462464
}
463465

464466
private fun Continuation<*>.owner(): CoroutineOwner<*>? = (this as? CoroutineStackFrame)?.owner()

kotlinx-coroutines-debug/test/DebugProbesTest.kt

+56
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package kotlinx.coroutines.debug
66
import kotlinx.coroutines.*
77
import org.junit.Test
88
import java.util.concurrent.*
9+
import java.util.concurrent.atomic.AtomicBoolean
910
import kotlin.test.*
1011

1112
class DebugProbesTest : DebugTestBase() {
@@ -100,4 +101,59 @@ class DebugProbesTest : DebugTestBase() {
100101
verifyStackTrace(e, traces)
101102
}
102103
}
104+
105+
@Test
106+
fun testMultipleConsecutiveProbeResumed() = runTest {
107+
val job = launch {
108+
expect(1)
109+
foo()
110+
expect(4)
111+
delay(Long.MAX_VALUE)
112+
expectUnreached()
113+
}
114+
yield()
115+
yield()
116+
expect(5)
117+
val infos = DebugProbes.dumpCoroutinesInfo()
118+
assertEquals(2, infos.size)
119+
assertEquals(setOf(State.RUNNING, State.SUSPENDED), infos.map { it.state }.toSet())
120+
job.cancel()
121+
finish(6)
122+
}
123+
124+
@Test
125+
fun testMultipleConsecutiveProbeResumedAndLaterRunning() = runTest {
126+
val reachedActiveStage = AtomicBoolean(false)
127+
val job = launch(Dispatchers.Default) {
128+
expect(1)
129+
foo()
130+
expect(4)
131+
yield()
132+
reachedActiveStage.set(true)
133+
while (isActive) {
134+
// Spin until test is done
135+
}
136+
}
137+
while (!reachedActiveStage.get()) {
138+
delay(10)
139+
}
140+
expect(5)
141+
val infos = DebugProbes.dumpCoroutinesInfo()
142+
assertEquals(2, infos.size)
143+
assertEquals(setOf(State.RUNNING, State.RUNNING), infos.map { it.state }.toSet())
144+
job.cancel()
145+
finish(6)
146+
}
147+
148+
private suspend fun foo() {
149+
bar()
150+
// Kill TCO
151+
expect(3)
152+
}
153+
154+
155+
private suspend fun bar() {
156+
yield()
157+
expect(2)
158+
}
103159
}

0 commit comments

Comments
 (0)