Skip to content

Commit f661918

Browse files
qwwdfsadpablobaxter
authored andcommitted
Properly detect non-released reusable continuations in non-reusable o… (Kotlin#2772)
* Properly detect non-released reusable continuations in non-reusable ones and await for reusability to have a consistent state * Ensure that the caller to DispatchedContinuation.isReusable is reusable itself * Using the previous invariant, simplify DispatchedContinuation.isReusable to a single null-check * It also restores the invariant that `cc.isReusable() == cc.resumeMode.isReusableMode` Fixes Kotlin#2736 Fixes Kotlin#2768
1 parent b6ff228 commit f661918

File tree

3 files changed

+52
-22
lines changed

3 files changed

+52
-22
lines changed

kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt

+3-4
Original file line numberDiff line numberDiff line change
@@ -107,15 +107,15 @@ internal open class CancellableContinuationImpl<in T>(
107107
}
108108
}
109109

110-
private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this)
110+
private fun isReusable(): Boolean = resumeMode.isReusableMode && (delegate as DispatchedContinuation<*>).isReusable()
111111

112112
/**
113113
* Resets cancellability state in order to [suspendCancellableCoroutineReusable] to work.
114114
* Invariant: used only by [suspendCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state.
115115
*/
116116
@JvmName("resetStateReusable") // Prettier stack traces
117117
internal fun resetStateReusable(): Boolean {
118-
assert { resumeMode == MODE_CANCELLABLE_REUSABLE } // invalid mode for CancellableContinuationImpl
118+
assert { resumeMode == MODE_CANCELLABLE_REUSABLE }
119119
assert { parentHandle !== NonDisposableHandle }
120120
val state = _state.value
121121
assert { state !is NotCompleted }
@@ -164,8 +164,7 @@ internal open class CancellableContinuationImpl<in T>(
164164
* Attempt to postpone cancellation for reusable cancellable continuation
165165
*/
166166
private fun cancelLater(cause: Throwable): Boolean {
167-
if (!resumeMode.isReusableMode) return false
168-
// Ensure that we are postponing cancellation to the right instance
167+
// Ensure that we are postponing cancellation to the right reusable instance
169168
if (!isReusable()) return false
170169
val dispatched = delegate as DispatchedContinuation<*>
171170
return dispatched.postponeCancellation(cause)

kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt

+8-18
Original file line numberDiff line numberDiff line change
@@ -58,40 +58,30 @@ internal class DispatchedContinuation<in T>(
5858
*/
5959
private val _reusableCancellableContinuation = atomic<Any?>(null)
6060

61-
public val reusableCancellableContinuation: CancellableContinuationImpl<*>?
61+
private val reusableCancellableContinuation: CancellableContinuationImpl<*>?
6262
get() = _reusableCancellableContinuation.value as? CancellableContinuationImpl<*>
6363

64-
public fun isReusable(requester: CancellableContinuationImpl<*>): Boolean {
64+
fun isReusable(): Boolean {
6565
/*
66+
Invariant: caller.resumeMode.isReusableMode
6667
* Reusability control:
6768
* `null` -> no reusability at all, `false`
68-
* If current state is not CCI, then we are within `suspendCancellableCoroutineReusable`, true
69-
* Else, if result is CCI === requester, then it's our reusable continuation
70-
* Identity check my fail for the following pattern:
71-
* ```
72-
* loop:
73-
* suspendCancellableCoroutineReusable { } // Reusable, outer coroutine stores the child handle
74-
* suspendCancellableCoroutine { } // **Not reusable**, handle should be disposed after {}, otherwise
75-
* it will leak because it won't be freed by `releaseInterceptedContinuation`
76-
* ```
69+
* anything else -> reusable.
7770
*/
78-
val value = _reusableCancellableContinuation.value ?: return false
79-
if (value is CancellableContinuationImpl<*>) return value === requester
80-
return true
71+
return _reusableCancellableContinuation.value != null
8172
}
8273

83-
8474
/**
8575
* Awaits until previous call to `suspendCancellableCoroutineReusable` will
8676
* stop mutating cached instance
8777
*/
88-
public fun awaitReusability() {
89-
_reusableCancellableContinuation.loop { it ->
78+
fun awaitReusability() {
79+
_reusableCancellableContinuation.loop {
9080
if (it !== REUSABLE_CLAIMED) return
9181
}
9282
}
9383

94-
public fun release() {
84+
fun release() {
9585
/*
9686
* Called from `releaseInterceptedContinuation`, can be concurrent with
9787
* the code in `getResult` right after `trySuspend` returned `true`, so we have
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlinx.coroutines.flow.*
8+
import org.junit.*
9+
10+
class ReusableContinuationStressTest : TestBase() {
11+
12+
private val iterations = 1000 * stressTestMultiplierSqrt
13+
14+
@Test // Originally reported by @denis-bezrukov in #2736
15+
fun testDebounceWithStateFlow() = runBlocking<Unit> {
16+
withContext(Dispatchers.Default) {
17+
repeat(iterations) {
18+
launch { // <- load the dispatcher and OS scheduler
19+
runStressTestOnce(1, 1)
20+
}
21+
}
22+
}
23+
}
24+
25+
private suspend fun runStressTestOnce(delay: Int, debounce: Int) = coroutineScope {
26+
val stateFlow = MutableStateFlow(0)
27+
val emitter = launch {
28+
repeat(1000) { i ->
29+
stateFlow.emit(i)
30+
delay(delay.toLong())
31+
}
32+
}
33+
var last = 0
34+
stateFlow.debounce(debounce.toLong()).take(100).collect { i ->
35+
if (i - last > 100) {
36+
last = i
37+
}
38+
}
39+
emitter.cancel()
40+
}
41+
}

0 commit comments

Comments
 (0)