Skip to content

Commit ed0839d

Browse files
elizarovqwwdfsad
authored andcommitted
Cancellation in consumeEach should dispose Rx Observable
Fixed bugs in MaybeSource/ObservableSource.consumeEach implementation so that observable is disposed on cancellation. Also optimized implementation of bridge function to avoid extra dispose calls if possible (this is permissible by specification, though) Fixes #1008
1 parent d1a05f3 commit ed0839d

File tree

8 files changed

+228
-30
lines changed

8 files changed

+228
-30
lines changed

reactive/kotlinx-coroutines-reactive/src/Channel.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,8 @@ public fun <T> Publisher<T>.openSubscription(request: Int = 0): ReceiveChannel<T
3030
/**
3131
* Subscribes to this [Publisher] and performs the specified action for each received element.
3232
*/
33-
public suspend inline fun <T> Publisher<T>.consumeEach(action: (T) -> Unit) {
33+
public suspend inline fun <T> Publisher<T>.consumeEach(action: (T) -> Unit) =
3434
openSubscription().consumeEach(action)
35-
}
3635

3736
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
3837
private class SubscriptionChannel<T>(
@@ -75,6 +74,7 @@ private class SubscriptionChannel<T>(
7574
@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
7675
override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
7776
subscription?.cancel()
77+
subscription = null // optimization -- no need to cancel it again
7878
}
7979

8080
// Subscriber overrides

reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt

-18
Original file line numberDiff line numberDiff line change
@@ -74,24 +74,6 @@ class IntegrationTest(
7474
assertThat(cnt, IsEqual(1))
7575
}
7676

77-
@Test
78-
fun testFailingConsumer() = runTest {
79-
val pub = publish {
80-
repeat(3) {
81-
expect(it + 1) // expect(1), expect(2) *should* be invoked
82-
send(it)
83-
}
84-
}
85-
86-
try {
87-
pub.consumeEach {
88-
throw TestException()
89-
}
90-
} catch (e: TestException) {
91-
finish(3)
92-
}
93-
}
94-
9577
@Test
9678
fun testNumbers() = runBlocking<Unit> {
9779
val n = 100 * stressTestMultiplier

reactive/kotlinx-coroutines-reactive/test/PublishTest.kt

+17
Original file line numberDiff line numberDiff line change
@@ -252,4 +252,21 @@ class PublishTest : TestBase() {
252252
latch.await()
253253
finish(8)
254254
}
255+
256+
@Test
257+
fun testFailingConsumer() = runTest {
258+
val pub = publish {
259+
repeat(3) {
260+
expect(it + 1) // expect(1), expect(2) *should* be invoked
261+
send(it)
262+
}
263+
}
264+
try {
265+
pub.consumeEach {
266+
throw TestException()
267+
}
268+
} catch (e: TestException) {
269+
finish(3)
270+
}
271+
}
255272
}

reactive/kotlinx-coroutines-reactor/test/FluxTest.kt

+58
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@
55
package kotlinx.coroutines.reactor
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.reactive.*
89
import org.hamcrest.core.*
910
import org.junit.*
11+
import org.junit.Test
12+
import kotlin.test.*
1013

1114
class FluxTest : TestBase() {
1215
@Test
@@ -80,4 +83,59 @@ class FluxTest : TestBase() {
8083
{ assert(it is RuntimeException) }
8184
)
8285
}
86+
87+
@Test
88+
fun testNotifyOnceOnCancellation() = runTest {
89+
expect(1)
90+
val observable =
91+
flux {
92+
expect(5)
93+
send("OK")
94+
try {
95+
delay(Long.MAX_VALUE)
96+
} catch (e: CancellationException) {
97+
expect(11)
98+
}
99+
}
100+
.doOnNext {
101+
expect(6)
102+
assertEquals("OK", it)
103+
}
104+
.doOnCancel {
105+
expect(10) // notified once!
106+
}
107+
expect(2)
108+
val job = launch(start = CoroutineStart.UNDISPATCHED) {
109+
expect(3)
110+
observable.consumeEach {
111+
expect(8)
112+
assertEquals("OK", it)
113+
}
114+
}
115+
expect(4)
116+
yield() // to observable code
117+
expect(7)
118+
yield() // to consuming coroutines
119+
expect(9)
120+
job.cancel()
121+
job.join()
122+
finish(12)
123+
}
124+
125+
@Test
126+
fun testFailingConsumer() = runTest {
127+
val pub = flux {
128+
repeat(3) {
129+
expect(it + 1) // expect(1), expect(2) *should* be invoked
130+
send(it)
131+
}
132+
}
133+
try {
134+
pub.consumeEach {
135+
throw TestException()
136+
}
137+
} catch (e: TestException) {
138+
finish(3)
139+
}
140+
}
83141
}

reactive/kotlinx-coroutines-rx2/src/RxChannel.kt

+5-10
Original file line numberDiff line numberDiff line change
@@ -43,20 +43,14 @@ public fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
4343
/**
4444
* Subscribes to this [MaybeSource] and performs the specified action for each received element.
4545
*/
46-
public suspend inline fun <T> MaybeSource<T>.consumeEach(action: (T) -> Unit) {
47-
val channel = openSubscription()
48-
for (x in channel) action(x)
49-
channel.cancel()
50-
}
46+
public suspend inline fun <T> MaybeSource<T>.consumeEach(action: (T) -> Unit) =
47+
openSubscription().consumeEach(action)
5148

5249
/**
5350
* Subscribes to this [ObservableSource] and performs the specified action for each received element.
5451
*/
55-
public suspend inline fun <T> ObservableSource<T>.consumeEach(action: (T) -> Unit) {
56-
val channel = openSubscription()
57-
for (x in channel) action(x)
58-
channel.cancel()
59-
}
52+
public suspend inline fun <T> ObservableSource<T>.consumeEach(action: (T) -> Unit) =
53+
openSubscription().consumeEach(action)
6054

6155
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
6256
private class SubscriptionChannel<T> :
@@ -68,6 +62,7 @@ private class SubscriptionChannel<T> :
6862
@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
6963
override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
7064
subscription?.dispose()
65+
subscription = null // optimization -- no need to dispose it again
7166
}
7267

7368
// Observer overrider

reactive/kotlinx-coroutines-rx2/test/FlowableTest.kt

+57
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import kotlinx.coroutines.*
88
import kotlinx.coroutines.reactive.*
99
import org.hamcrest.core.*
1010
import org.junit.*
11+
import org.junit.Test
12+
import kotlin.test.*
1113

1214
class FlowableTest : TestBase() {
1315
@Test
@@ -81,4 +83,59 @@ class FlowableTest : TestBase() {
8183
{ assert(it is RuntimeException) }
8284
)
8385
}
86+
87+
@Test
88+
fun testNotifyOnceOnCancellation() = runTest {
89+
expect(1)
90+
val observable =
91+
rxFlowable {
92+
expect(5)
93+
send("OK")
94+
try {
95+
delay(Long.MAX_VALUE)
96+
} catch (e: CancellationException) {
97+
expect(11)
98+
}
99+
}
100+
.doOnNext {
101+
expect(6)
102+
assertEquals("OK", it)
103+
}
104+
.doOnCancel {
105+
expect(10) // notified once!
106+
}
107+
expect(2)
108+
val job = launch(start = CoroutineStart.UNDISPATCHED) {
109+
expect(3)
110+
observable.consumeEach{
111+
expect(8)
112+
assertEquals("OK", it)
113+
}
114+
}
115+
expect(4)
116+
yield() // to observable code
117+
expect(7)
118+
yield() // to consuming coroutines
119+
expect(9)
120+
job.cancel()
121+
job.join()
122+
finish(12)
123+
}
124+
125+
@Test
126+
fun testFailingConsumer() = runTest {
127+
val pub = rxFlowable {
128+
repeat(3) {
129+
expect(it + 1) // expect(1), expect(2) *should* be invoked
130+
send(it)
131+
}
132+
}
133+
try {
134+
pub.consumeEach {
135+
throw TestException()
136+
}
137+
} catch (e: TestException) {
138+
finish(3)
139+
}
140+
}
84141
}

reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt

+27
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import org.hamcrest.core.*
1212
import org.junit.*
1313
import org.junit.Assert.*
1414
import java.util.concurrent.*
15+
import java.util.concurrent.CancellationException
1516

1617
class MaybeTest : TestBase() {
1718
@Before
@@ -211,4 +212,30 @@ class MaybeTest : TestBase() {
211212
{ assert(it is RuntimeException) }
212213
)
213214
}
215+
216+
@Test
217+
fun testCancelledConsumer() = runTest {
218+
expect(1)
219+
val maybe = rxMaybe<Int> {
220+
expect(4)
221+
try {
222+
delay(Long.MAX_VALUE)
223+
} catch (e: CancellationException) {
224+
expect(6)
225+
}
226+
42
227+
}
228+
expect(2)
229+
val timeout = withTimeoutOrNull(100) {
230+
expect(3)
231+
maybe.consumeEach {
232+
expectUnreached()
233+
}
234+
expectUnreached()
235+
}
236+
assertNull(timeout)
237+
expect(5)
238+
yield() // must cancel code inside maybe!!!
239+
finish(7)
240+
}
214241
}

reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt

+62
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ package kotlinx.coroutines.rx2
77
import kotlinx.coroutines.*
88
import org.hamcrest.core.*
99
import org.junit.*
10+
import org.junit.Test
11+
import kotlin.test.*
1012

1113
class ObservableTest : TestBase() {
1214
@Test
@@ -80,4 +82,64 @@ class ObservableTest : TestBase() {
8082
{ assert(it is RuntimeException) }
8183
)
8284
}
85+
86+
@Test
87+
fun testNotifyOnceOnCancellation() = runTest {
88+
expect(1)
89+
val observable =
90+
rxObservable {
91+
expect(5)
92+
send("OK")
93+
try {
94+
delay(Long.MAX_VALUE)
95+
} catch (e: CancellationException) {
96+
expect(11)
97+
}
98+
}
99+
.doOnNext {
100+
expect(6)
101+
assertEquals("OK", it)
102+
}
103+
.doOnDispose {
104+
expect(10) // notified once!
105+
}
106+
expect(2)
107+
val job = launch(start = CoroutineStart.UNDISPATCHED) {
108+
expect(3)
109+
observable.consumeEach{
110+
expect(8)
111+
assertEquals("OK", it)
112+
}
113+
}
114+
expect(4)
115+
yield() // to observable code
116+
expect(7)
117+
yield() // to consuming coroutines
118+
expect(9)
119+
job.cancel()
120+
job.join()
121+
finish(12)
122+
}
123+
124+
@Test
125+
fun testFailingConsumer() = runTest {
126+
expect(1)
127+
val pub = rxObservable {
128+
expect(2)
129+
send("OK")
130+
try {
131+
delay(Long.MAX_VALUE)
132+
} catch (e: CancellationException) {
133+
finish(5)
134+
}
135+
}
136+
try {
137+
pub.consumeEach {
138+
expect(3)
139+
throw TestException()
140+
}
141+
} catch (e: TestException) {
142+
expect(4)
143+
}
144+
}
83145
}

0 commit comments

Comments
 (0)