Skip to content

Commit c847570

Browse files
committed
### Reproducing scenario
For the following code:
```
suspend fun foo() = yield()
```
the following bytecode is produced:
```
fun foo(...) {
    uCont.intercepted().dispatchUsingDispatcher() // 1
    probeCoroutineSuspended() // 2
    return COROUTINE_SUSPENDED // Unroll the stack
}
```
And it is possible to observe the next 'probeCoroutineResumed' **prior** to receiving the matching 'probeCoroutineSuspended'.

### Steps taken
To address this, a dedicated 'unmatchedResume' field is introduced to the coroutine state, which keeps track of consecutive 'probeCoroutineResumed' without a matching 'probeCoroutineSuspended' and attributes late-arriving probes to it.

Unfortunately, it introduces a much more unlikely race when **two** 'probeCoroutineSuspended' are reordered, in which case we misattribute 'lastObservedFrame', but it is still better than misattributing the actual coroutine state.

@Volatile and @Synchronized are also introduced to DebugCoroutineInfoImpl, as its fields were previously subject to data races as well.

Fixes #3193
1 parent bfc6c0b commit c847570

File tree

3 files changed

+144
-16
lines changed

3 files changed

+144
-16
lines changed

kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfoImpl.kt

+83-13
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,101 @@ internal class DebugCoroutineInfoImpl(
4040
* Can be CREATED, RUNNING, SUSPENDED.
4141
*/
4242
public val state: String get() = _state
43+
44+
@Volatile
4345
private var _state: String = CREATED
4446

47+
/*
48+
* How many consecutive unmatched 'updateState(RUNNING)' calls this object has received.
49+
* It can be `> 1` in two cases:
50+
*
51+
* * The coroutine is finishing and its state is being unrolled in BaseContinuationImpl, see comment to DebugProbesImpl#callerInfoCache
52+
* Such resumes are not expected to be matched and are ignored.
53+
* * We encountered suspend-resume race explained above, and we do wait for a match.
54+
*/
55+
private var unmatchedResume = 0
56+
57+
/**
58+
* Here we orchestrate overlapping state updates that are coming asynchronously.
59+
* In a nutshell, `probeCoroutineSuspended` can arrive **later** than its matching `probeCoroutineResumed`,
60+
* e.g. for the following code:
61+
* ```
62+
* suspend fun foo() = yield()
63+
* ```
64+
*
65+
* we have this sequence:
66+
* ```
67+
* fun foo(...) {
68+
* uCont.intercepted().dispatchUsingDispatcher() // 1
69+
* // Notify the debugger the coroutine is suspended
70+
* probeCoroutineSuspended() // 2
71+
* return COROUTINE_SUSPENDED // Unroll the stack
72+
* }
73+
* ```
74+
* Nothing prevents the coroutine from being dispatched and invoking `probeCoroutineResumed` right between '1' and '2'.
75+
* See also: https://github.com/Kotlin/kotlinx.coroutines/issues/3193
76+
*
77+
* [shouldBeMatched] -- `false` if it is an expected consecutive `probeCoroutineResumed` from BaseContinuationImpl,
78+
* `true` otherwise.
79+
*/
80+
@Synchronized
81+
internal fun updateState(state: String, frame: Continuation<*>, shouldBeMatched: Boolean) {
82+
/**
83+
* We observe consecutive resume that had to be matched, but it wasn't,
84+
* so increment the counter of unmatched resumes.
85+
*/
86+
if (_state == RUNNING && state == RUNNING && shouldBeMatched) {
87+
++unmatchedResume
88+
} else if (unmatchedResume > 0 && state == SUSPENDED) {
89+
/*
90+
* We received late 'suspend' probe for unmatched resume, skip it.
91+
* Here we deliberately allow the very unlikely race;
92+
* Consider the following scenario ('[r:a]' means "probeCoroutineResumed at a()"):
93+
* ```
94+
* [r:a] a() -> b() [s:b] [r:b] -> (back to a) a() -> c() [s:c]
95+
* ```
96+
* We can, in theory, observe the following probes interleaving:
97+
* ```
98+
* r:a
99+
* r:b // Unmatched resume
100+
* s:c // Matched suspend, discard
101+
* s:b
102+
* ```
103+
* Thus mis-attributing 'lastObservedFrame' to a previously-observed frame.
104+
* It is possible in theory (though I've failed to reproduce it), yet
105+
* is more preferred than indefinitely mismatched state (-> mismatched real/enhanced stacktrace)
106+
*/
107+
--unmatchedResume
108+
return
109+
}
110+
111+
// Skip a duplicating transition to SUSPENDED once a frame has been observed; every other transition is propagated, see KT-29997
112+
if (_state == state && state == SUSPENDED && lastObservedFrame != null) return
113+
114+
_state = state
115+
lastObservedFrame = frame as? CoroutineStackFrame
116+
lastObservedThread = if (state == RUNNING) {
117+
Thread.currentThread()
118+
} else {
119+
null
120+
}
121+
}
122+
45123
@JvmField
124+
@Volatile
46125
internal var lastObservedThread: Thread? = null
47126

48127
/**
49128
* We cannot keep a strong reference to the last observed frame of the coroutine, because this will
50129
* prevent garbage-collection of a coroutine that was lost.
51130
*/
131+
@Volatile
52132
private var _lastObservedFrame: WeakReference<CoroutineStackFrame>? = null
53133
internal var lastObservedFrame: CoroutineStackFrame?
54134
get() = _lastObservedFrame?.get()
55-
set(value) { _lastObservedFrame = value?.let { WeakReference(it) } }
135+
set(value) {
136+
_lastObservedFrame = value?.let { WeakReference(it) }
137+
}
56138

57139
/**
58140
* Last observed stacktrace of the coroutine captured on its suspension or resumption point.
@@ -84,17 +166,5 @@ internal class DebugCoroutineInfoImpl(
84166
}
85167
}
86168

87-
internal fun updateState(state: String, frame: Continuation<*>) {
88-
// Propagate only duplicating transitions to running for KT-29997
89-
if (_state == state && state == SUSPENDED && lastObservedFrame != null) return
90-
_state = state
91-
lastObservedFrame = frame as? CoroutineStackFrame
92-
lastObservedThread = if (state == RUNNING) {
93-
Thread.currentThread()
94-
} else {
95-
null
96-
}
97-
}
98-
99169
override fun toString(): String = "DebugCoroutineInfo(state=$state,context=$context)"
100170
}

kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt

+5-3
Original file line numberDiff line numberDiff line change
@@ -446,16 +446,18 @@ internal object DebugProbesImpl {
446446
// Lookup coroutine info in cache or by traversing stack frame
447447
val info: DebugCoroutineInfoImpl
448448
val cached = callerInfoCache.remove(frame)
449+
val shouldBeMatchedWithProbeSuspended: Boolean
449450
if (cached != null) {
450451
info = cached
452+
shouldBeMatchedWithProbeSuspended = false
451453
} else {
452454
info = frame.owner()?.info ?: return
455+
shouldBeMatchedWithProbeSuspended = true
453456
// Guard against improper implementations of CoroutineStackFrame and bugs in the compiler
454457
val realCaller = info.lastObservedFrame?.realCaller()
455458
if (realCaller != null) callerInfoCache.remove(realCaller)
456459
}
457-
458-
info.updateState(state, frame as Continuation<*>)
460+
info.updateState(state, frame as Continuation<*>, shouldBeMatchedWithProbeSuspended)
459461
// Do not cache it for proxy-classes such as ScopeCoroutines
460462
val caller = frame.realCaller() ?: return
461463
callerInfoCache[caller] = info
@@ -468,7 +470,7 @@ internal object DebugProbesImpl {
468470

469471
private fun updateState(owner: CoroutineOwner<*>, frame: Continuation<*>, state: String) = coroutineStateLock.read {
470472
if (!isInstalled) return
471-
owner.info.updateState(state, frame)
473+
owner.info.updateState(state, frame, true)
472474
}
473475

474476
private fun Continuation<*>.owner(): CoroutineOwner<*>? = (this as? CoroutineStackFrame)?.owner()

kotlinx-coroutines-debug/test/DebugProbesTest.kt

+56
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package kotlinx.coroutines.debug
66
import kotlinx.coroutines.*
77
import org.junit.Test
88
import java.util.concurrent.*
9+
import java.util.concurrent.atomic.AtomicBoolean
910
import kotlin.test.*
1011

1112
class DebugProbesTest : DebugTestBase() {
@@ -100,4 +101,59 @@ class DebugProbesTest : DebugTestBase() {
100101
verifyStackTrace(e, traces)
101102
}
102103
}
104+
105+
@Test
106+
fun testMultipleConsecutiveProbeResumed() = runTest {
107+
val job = launch {
108+
expect(1)
109+
foo()
110+
expect(4)
111+
delay(Long.MAX_VALUE)
112+
expectUnreached()
113+
}
114+
yield()
115+
yield()
116+
expect(5)
117+
val infos = DebugProbes.dumpCoroutinesInfo()
118+
assertEquals(2, infos.size)
119+
assertEquals(setOf(State.RUNNING, State.SUSPENDED), infos.map { it.state }.toSet())
120+
job.cancel()
121+
finish(6)
122+
}
123+
124+
@Test
125+
fun testMultipleConsecutiveProbeResumedAndLaterRunning() = runTest {
126+
val reachedActiveStage = AtomicBoolean(false)
127+
val job = launch(Dispatchers.Default) {
128+
expect(1)
129+
foo()
130+
expect(4)
131+
yield()
132+
reachedActiveStage.set(true)
133+
while (isActive) {
134+
// Spin until test is done
135+
}
136+
}
137+
while (!reachedActiveStage.get()) {
138+
delay(10)
139+
}
140+
expect(5)
141+
val infos = DebugProbes.dumpCoroutinesInfo()
142+
assertEquals(2, infos.size)
143+
assertEquals(setOf(State.RUNNING, State.RUNNING), infos.map { it.state }.toSet())
144+
job.cancel()
145+
finish(6)
146+
}
147+
148+
private suspend fun foo() {
149+
bar()
150+
// Kill TCO
151+
expect(3)
152+
}
153+
154+
155+
private suspend fun bar() {
156+
yield()
157+
expect(2)
158+
}
103159
}

0 commit comments

Comments
 (0)