Skip to content

Commit 066d2f5

Browse files
committed
Ensure ConsumeAsFlow does not retain reference to the last element of the flow with test
1 parent 8fd7ce8 commit 066d2f5

File tree

2 files changed

+54
-5
lines changed

2 files changed

+54
-5
lines changed

kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt

+6-5
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ internal class DispatchedContinuation<in T>(
3131
* Possible states of reusability:
3232
*
3333
* 1) `null`. Cancellable continuation wasn't yet attempted to be reused or
34-
* way used and then invalidated (e.g. because of the cancellation).
34+
* was used and then invalidated (e.g. because of the cancellation).
3535
* 2) [CancellableContinuation]. Continuation to be/that is being reused.
3636
* 3) [REUSABLE_CLAIMED]. CC is currently being reused and its owner executes `suspend` block:
3737
* ```
@@ -91,7 +91,7 @@ internal class DispatchedContinuation<in T>(
9191
return state as CancellableContinuationImpl<T>
9292
}
9393
}
94-
else -> error("Inconsistent state $state")
94+
else -> error("Inconsistent state $state")
9595
}
9696
}
9797
}
@@ -114,13 +114,14 @@ internal class DispatchedContinuation<in T>(
114114
_reusableCancellableContinuation.loop { state ->
115115
// not when(state) to avoid Intrinsics.equals call
116116
when {
117-
state === REUSABLE_CLAIMED -> if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null
118-
state === null -> return null
117+
state === REUSABLE_CLAIMED -> {
118+
if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null
119+
}
119120
state is Throwable -> {
120121
require(_reusableCancellableContinuation.compareAndSet(state, null))
121122
return state
122123
}
123-
else -> return null // Is not reusable
124+
else -> error("Inconsistent state $state")
124125
}
125126
}
126127
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.channels.*
9+
import org.junit.Test
10+
import kotlin.test.*
11+
12+
class ConsumeAsFlowLeakTest : TestBase() {
13+
14+
private data class Box(val i: Int)
15+
16+
// In companion to avoid references through runTest
17+
companion object {
18+
private val first = Box(4)
19+
private val second = Box(5)
20+
}
21+
22+
// @Test //ignored until KT-33986
23+
fun testReferenceIsNotRetained() = testReferenceNotRetained(true)
24+
25+
@Test
26+
fun testReferenceIsNotRetainedNoSuspension() = testReferenceNotRetained(false)
27+
28+
private fun testReferenceNotRetained(shouldSuspendOnSend: Boolean) = runTest {
29+
val channel = BroadcastChannel<Box>(1)
30+
val job = launch {
31+
expect(2)
32+
channel.asFlow().collect {
33+
expect(it.i)
34+
}
35+
}
36+
37+
expect(1)
38+
yield()
39+
expect(3)
40+
channel.send(first)
41+
if (shouldSuspendOnSend) yield()
42+
channel.send(second)
43+
yield()
44+
assertEquals(0, FieldWalker.walk(channel).count { it === second })
45+
finish(6)
46+
job.cancelAndJoin()
47+
}
48+
}

0 commit comments

Comments
 (0)