Skip to content

Commit 7590b05

Browse files
elizarovqwwdfsad
authored andcommitted
Dispose/cancel reactive subscription exactly once using atomic
1 parent ed0839d commit 7590b05

File tree

2 files changed

+9
-12
lines changed

2 files changed

+9
-12
lines changed

reactive/kotlinx-coroutines-reactive/src/Channel.kt

+4-6
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@ private class SubscriptionChannel<T>(
4141
require(request >= 0) { "Invalid request size: $request" }
4242
}
4343

44-
@Volatile
45-
private var subscription: Subscription? = null
44+
private val _subscription = atomic<Subscription?>(null)
4645

4746
// requested from subscription minus number of received minus number of enqueued receivers,
4847
// can be negative if we have receivers, but no subscription yet
@@ -52,7 +51,7 @@ private class SubscriptionChannel<T>(
5251
@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
5352
override fun onReceiveEnqueued() {
5453
_requested.loop { wasRequested ->
55-
val subscription = this.subscription
54+
val subscription = _subscription.value
5655
val needRequested = wasRequested - 1
5756
if (subscription != null && needRequested < 0) { // need to request more from subscription
5857
// try to fixup by making request
@@ -73,13 +72,12 @@ private class SubscriptionChannel<T>(
7372

7473
@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
7574
override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
76-
subscription?.cancel()
77-
subscription = null // optimization -- no need to cancel it again
75+
_subscription.getAndSet(null)?.cancel() // cancel exactly once
7876
}
7977

8078
// Subscriber overrides
8179
override fun onSubscribe(s: Subscription) {
82-
subscription = s
80+
_subscription.value = s
8381
while (true) { // lock-free loop on _requested
8482
if (isClosedForSend) {
8583
s.cancel()

reactive/kotlinx-coroutines-rx2/src/RxChannel.kt

+5-6
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ package kotlinx.coroutines.rx2
66

77
import io.reactivex.*
88
import io.reactivex.disposables.*
9-
import kotlinx.coroutines.channels.*
9+
import kotlinx.atomicfu.*
1010
import kotlinx.coroutines.*
11+
import kotlinx.coroutines.channels.*
1112
import kotlinx.coroutines.internal.*
1213

1314
/**
@@ -56,18 +57,16 @@ public suspend inline fun <T> ObservableSource<T>.consumeEach(action: (T) -> Uni
5657
private class SubscriptionChannel<T> :
5758
LinkedListChannel<T>(), Observer<T>, MaybeObserver<T>
5859
{
59-
@Volatile
60-
var subscription: Disposable? = null
60+
private val _subscription = atomic<Disposable?>(null)
6161

6262
@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
6363
override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
64-
subscription?.dispose()
65-
subscription = null // optimization -- no need to dispose it again
64+
_subscription.getAndSet(null)?.dispose() // dispose exactly once
6665
}
6766

6867
// Observer overrider
6968
override fun onSubscribe(sub: Disposable) {
70-
subscription = sub
69+
_subscription.value = sub
7170
}
7271

7372
override fun onSuccess(t: T) {

0 commit comments

Comments
 (0)