Skip to content

Commit 03701ec

Browse files
committed
Ensure ConsumeAsFlow does not retain reference to the last element of the flow with test
1 parent 8fd7ce8 commit 03701ec

File tree

3 files changed

+115
-14
lines changed

3 files changed

+115
-14
lines changed

kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt

+6-4
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ internal class DispatchedContinuation<in T>(
3131
* Possible states of reusability:
3232
*
3333
* 1) `null`. Cancellable continuation wasn't yet attempted to be reused or
34-
* way used and then invalidated (e.g. because of the cancellation).
34+
* was used and then invalidated (e.g. because of the cancellation).
3535
* 2) [CancellableContinuation]. Continuation to be/that is being reused.
3636
* 3) [REUSABLE_CLAIMED]. CC is currently being reused and its owner executes `suspend` block:
3737
* ```
@@ -91,7 +91,7 @@ internal class DispatchedContinuation<in T>(
9191
return state as CancellableContinuationImpl<T>
9292
}
9393
}
94-
else -> error("Inconsistent state $state")
94+
else -> error("Inconsistent state $state")
9595
}
9696
}
9797
}
@@ -114,13 +114,15 @@ internal class DispatchedContinuation<in T>(
114114
_reusableCancellableContinuation.loop { state ->
115115
// not when(state) to avoid Intrinsics.equals call
116116
when {
117-
state === REUSABLE_CLAIMED -> if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null
117+
state === REUSABLE_CLAIMED -> {
118+
if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null
119+
}
118120
state === null -> return null
119121
state is Throwable -> {
120122
require(_reusableCancellableContinuation.compareAndSet(state, null))
121123
return state
122124
}
123-
else -> return null // Is not reusable
125+
else -> error("Inconsistent state $state")
124126
}
125127
}
126128
}

kotlinx-coroutines-core/jvm/test/FieldWalker.kt

+61-10
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package kotlinx.coroutines
77
import java.lang.reflect.*
88
import java.util.*
99
import java.util.Collections.*
10+
import kotlin.collections.ArrayList
1011

1112
object FieldWalker {
1213

@@ -22,12 +23,6 @@ object FieldWalker {
2223
val element = stack.removeLast()
2324
val type = element.javaClass
2425
type.visit(element, result, stack)
25-
26-
var superclass = type.superclass
27-
while (superclass != Any::class.java && superclass != null) {
28-
superclass.visit(element, result, stack)
29-
superclass = superclass.superclass
30-
}
3126
}
3227
return result
3328
}
@@ -56,9 +51,65 @@ object FieldWalker {
5651
}
5752
}
5853

59-
private fun Class<*>.fields() = declaredFields.filter {
60-
!it.type.isPrimitive
61-
&& !Modifier.isStatic(it.modifiers)
62-
&& !(it.type.isArray && it.type.componentType.isPrimitive)
54+
private fun Class<*>.fields(): List<Field> {
55+
val result = ArrayList<Field>()
56+
var type = this
57+
while (type != Any::class.java) {
58+
val fields = type.declaredFields.filter {
59+
!it.type.isPrimitive
60+
&& !Modifier.isStatic(it.modifiers)
61+
&& !(it.type.isArray && it.type.componentType.isPrimitive)
62+
}
63+
result.addAll(fields)
64+
type = type.superclass
65+
}
66+
67+
return result
68+
}
69+
70+
// Debugging-only
71+
@Suppress("UNUSED")
72+
fun printPath(from: Any, to: Any) {
73+
val pathNodes = ArrayList<String>()
74+
val visited = newSetFromMap<Any>(IdentityHashMap())
75+
visited.add(from)
76+
if (findPath(from, to, visited, pathNodes)) {
77+
pathNodes.reverse()
78+
println(pathNodes.joinToString(" -> ", from.javaClass.simpleName + " -> ", "-> " + to.javaClass.simpleName))
79+
} else {
80+
println("Path from $from to $to not found")
81+
}
82+
}
83+
84+
private fun findPath(from: Any, to: Any, visited: MutableSet<Any>, pathNodes: MutableList<String>): Boolean {
85+
if (from === to) {
86+
return true
87+
}
88+
89+
val type = from.javaClass
90+
if (type.isArray) {
91+
if (type.componentType.isPrimitive) return false
92+
val array = from as Array<Any?>
93+
array.filterNotNull().forEach {
94+
if (findPath(it, to, visited, pathNodes)) {
95+
return true
96+
}
97+
}
98+
return false
99+
}
100+
101+
val fields = type.fields()
102+
fields.forEach {
103+
it.isAccessible = true
104+
val value = it.get(from) ?: return@forEach
105+
if (!visited.add(value)) return@forEach
106+
val found = findPath(value, to, visited, pathNodes)
107+
if (found) {
108+
pathNodes += from.javaClass.simpleName + ":" + it.name
109+
return true
110+
}
111+
}
112+
113+
return false
63114
}
64115
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.channels.*
9+
import org.junit.Test
10+
import kotlin.test.*
11+
12+
class ConsumeAsFlowLeakTest : TestBase() {
13+
14+
private data class Box(val i: Int)
15+
16+
// In companion to avoid references through runTest
17+
companion object {
18+
private val first = Box(4)
19+
private val second = Box(5)
20+
}
21+
22+
// @Test //ignored until KT-33986
23+
fun testReferenceIsNotRetained() = testReferenceNotRetained(true)
24+
25+
@Test
26+
fun testReferenceIsNotRetainedNoSuspension() = testReferenceNotRetained(false)
27+
28+
private fun testReferenceNotRetained(shouldSuspendOnSend: Boolean) = runTest {
29+
val channel = BroadcastChannel<Box>(1)
30+
val job = launch {
31+
expect(2)
32+
channel.asFlow().collect {
33+
expect(it.i)
34+
}
35+
}
36+
37+
expect(1)
38+
yield()
39+
expect(3)
40+
channel.send(first)
41+
if (shouldSuspendOnSend) yield()
42+
channel.send(second)
43+
yield()
44+
assertEquals(0, FieldWalker.walk(channel).count { it === second })
45+
finish(6)
46+
job.cancelAndJoin()
47+
}
48+
}

0 commit comments

Comments
 (0)