File tree 3 files changed +51
-2
lines changed
common/src/flow/operators
3 files changed +51
-2
lines changed Original file line number Diff line number Diff line change @@ -158,7 +158,12 @@ public fun <T> Flow<T>.onCompletion(
158
158
throw e
159
159
}
160
160
// Normal completion
161
- SafeCollector (this , currentCoroutineContext()).invokeSafely(action, null )
161
+ val sc = SafeCollector (this , currentCoroutineContext())
162
+ try {
163
+ sc.action(null )
164
+ } finally {
165
+ sc.releaseIntercepted()
166
+ }
162
167
}
163
168
164
169
/* *
Original file line number Diff line number Diff line change @@ -55,7 +55,6 @@ internal actual class SafeCollector<T> actual constructor(
55
55
*/
56
56
override suspend fun emit (value : T ) {
57
57
return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
58
- // Update information about caller for stackwalking
59
58
try {
60
59
emit(uCont, value)
61
60
} catch (e: Throwable ) {
Original file line number Diff line number Diff line change
1
+ /*
2
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3
+ */
4
+
5
+ package kotlinx.coroutines.flow
6
+
7
+ import kotlinx.coroutines.*
8
+ import org.junit.Test
9
+ import kotlin.coroutines.*
10
+ import kotlin.test.*
11
+
12
+ class OnCompletionInterceptedReleaseTest : TestBase () {
13
+ @Test
14
+ fun testLeak () = runTest {
15
+ expect(1 )
16
+ var cont: Continuation <Unit >? = null
17
+ val interceptor = CountingInterceptor ()
18
+ val job = launch(interceptor, start = CoroutineStart .UNDISPATCHED ) {
19
+ emptyFlow<Int >()
20
+ .onCompletion { emit(1 ) }
21
+ .collect { value ->
22
+ expect(2 )
23
+ assertEquals(1 , value)
24
+ suspendCoroutine { cont = it }
25
+ }
26
+ }
27
+ cont!! .resume(Unit )
28
+ assertTrue(job.isCompleted)
29
+ assertEquals(interceptor.intercepted, interceptor.released)
30
+ finish(3 )
31
+ }
32
+
33
+ class CountingInterceptor : AbstractCoroutineContextElement (ContinuationInterceptor ), ContinuationInterceptor {
34
+ var intercepted = 0
35
+ var released = 0
36
+ override fun <T > interceptContinuation (continuation : Continuation <T >): Continuation <T > {
37
+ intercepted++
38
+ return Continuation (continuation.context) { continuation.resumeWith(it) }
39
+ }
40
+
41
+ override fun releaseInterceptedContinuation (continuation : Continuation <* >) {
42
+ released++
43
+ }
44
+ }
45
+ }
You can’t perform that action at this time.
0 commit comments