File tree 2 files changed +68
-5
lines changed
reactive/kotlinx-coroutines-reactive
2 files changed +68
-5
lines changed Original file line number Diff line number Diff line change @@ -198,12 +198,20 @@ private suspend fun <T> Publisher<T>.awaitOne(
198
198
/* * cancelling the new subscription due to rule 2.5, though the publisher would either have to
199
199
* subscribe more than once, which would break 2.12, or leak this [Subscriber]. */
200
200
if (subscription != null ) {
201
- sub.cancel()
201
+ withSubscriptionLock {
202
+ sub.cancel()
203
+ }
202
204
return
203
205
}
204
206
subscription = sub
205
- cont.invokeOnCancellation { sub.cancel() }
206
- sub.request(if (mode == Mode .FIRST || mode == Mode .FIRST_OR_DEFAULT ) 1 else Long .MAX_VALUE )
207
+ cont.invokeOnCancellation {
208
+ withSubscriptionLock {
209
+ sub.cancel()
210
+ }
211
+ }
212
+ withSubscriptionLock {
213
+ sub.request(if (mode == Mode .FIRST || mode == Mode .FIRST_OR_DEFAULT ) 1 else Long .MAX_VALUE )
214
+ }
207
215
}
208
216
209
217
override fun onNext (t : T ) {
@@ -228,12 +236,16 @@ private suspend fun <T> Publisher<T>.awaitOne(
228
236
return
229
237
}
230
238
seenValue = true
231
- sub.cancel()
239
+ withSubscriptionLock {
240
+ sub.cancel()
241
+ }
232
242
cont.resume(t)
233
243
}
234
244
Mode .LAST , Mode .SINGLE , Mode .SINGLE_OR_DEFAULT -> {
235
245
if ((mode == Mode .SINGLE || mode == Mode .SINGLE_OR_DEFAULT ) && seenValue) {
236
- sub.cancel()
246
+ withSubscriptionLock {
247
+ sub.cancel()
248
+ }
237
249
/* the check for `cont.isActive` is needed in case `sub.cancel() above calls `onComplete` or
238
250
`onError` on its own. */
239
251
if (cont.isActive) {
@@ -289,6 +301,14 @@ private suspend fun <T> Publisher<T>.awaitOne(
289
301
inTerminalState = true
290
302
return true
291
303
}
304
+
305
+ /* *
306
+ * Enforce rule 2.7: [Subscription.request] and [Subscription.cancel] must be executed serially
307
+ */
308
+ @Synchronized
309
+ private fun withSubscriptionLock (block : () -> Unit ) {
310
+ block()
311
+ }
292
312
})
293
313
}
294
314
Original file line number Diff line number Diff line change
1
+ /*
2
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3
+ */
4
+
5
+ package kotlinx.coroutines.reactive
6
+
7
+ import kotlinx.coroutines.*
8
+ import org.junit.*
9
+ import org.reactivestreams.*
10
+ import java.util.concurrent.locks.*
11
+
12
+ /* *
13
+ * This test checks implementation of rule 2.7 for await methods - serial execution of subscription methods
14
+ */
15
+ class AwaitCancellationStressTest : TestBase () {
16
+ private val iterations = 10_000 * stressTestMultiplier
17
+
18
+ @Test
19
+ fun testAwaitCancellationOrder () = runTest {
20
+ repeat(iterations) {
21
+ val job = launch(Dispatchers .Default ) {
22
+ testPublisher().awaitFirst()
23
+ }
24
+ job.cancelAndJoin()
25
+ }
26
+ }
27
+
28
+ private fun testPublisher () = Publisher <Int > { s ->
29
+ val lock = ReentrantLock ()
30
+ s.onSubscribe(object : Subscription {
31
+ override fun request (n : Long ) {
32
+ check(lock.tryLock())
33
+ s.onNext(42 )
34
+ lock.unlock()
35
+ }
36
+
37
+ override fun cancel () {
38
+ check(lock.tryLock())
39
+ lock.unlock()
40
+ }
41
+ })
42
+ }
43
+ }
You can’t perform that action at this time.
0 commit comments