@@ -10,7 +10,6 @@ import kotlinx.coroutines.selects.*
10
10
import kotlinx.coroutines.sync.*
11
11
import org.reactivestreams.*
12
12
import kotlin.coroutines.*
13
- import kotlin.internal.*
14
13
15
14
/* *
16
15
* Creates cold reactive [Publisher] that runs a given [block] in a coroutine.
@@ -69,34 +68,32 @@ public class PublisherCoroutine<in T>(
69
68
) : AbstractCoroutine<Unit>(parentContext, false , true ), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> {
70
69
override val channel: SendChannel <T > get() = this
71
70
72
- // Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked
71
+ // Mutex is locked when either nRequested == 0, unless `isCompleted`, or while subscriber.onXXX is being invoked
73
72
private val mutex = Mutex (locked = true )
74
73
private val _nRequested = atomic(0L ) // < 0 when closed (CLOSED or SIGNALLED)
75
74
76
75
@Volatile
77
- private var cancelled = false // true when Subscription.cancel() is invoked
76
+ private var cancelled = false // true after Subscription.cancel() is invoked
78
77
79
78
override val isClosedForSend: Boolean get() = isCompleted
80
79
override fun close (cause : Throwable ? ): Boolean = cancelCoroutine(cause)
81
80
override fun invokeOnClose (handler : (Throwable ? ) -> Unit ): Nothing =
82
81
throw UnsupportedOperationException (" PublisherCoroutine doesn't support invokeOnClose" )
83
82
84
- override fun trySend (element : T ): ChannelResult <Unit > {
85
- if (! mutex.tryLock()) return ChannelResult .failure()
86
- doLockedNext(element)
87
- return ChannelResult .success(Unit )
88
- }
83
+ // TODO: will throw if `null` is passed -- is throwing this kind of programmer-induced errors okay?
84
+ override fun trySend (element : T ): ChannelResult <Unit > =
85
+ if (! mutex.tryLock()) {
86
+ ChannelResult .failure()
87
+ } else {
88
+ when (val throwable = doLockedNext(element)) {
89
+ null -> ChannelResult .success(Unit )
90
+ else -> ChannelResult .closed(throwable)
91
+ }
92
+ }
89
93
90
94
public override suspend fun send (element : T ) {
91
- // fast-path -- try send without suspension
92
- if (offer(element)) return
93
- // slow-path does suspend
94
- return sendSuspend(element)
95
- }
96
-
97
- private suspend fun sendSuspend (element : T ) {
98
95
mutex.lock()
99
- doLockedNext(element)
96
+ doLockedNext(element)?. let { throw it }
100
97
}
101
98
102
99
override val onSend: SelectClause2 <T , SendChannel <T >>
@@ -106,13 +103,13 @@ public class PublisherCoroutine<in T>(
106
103
@Suppress(" PARAMETER_NAME_CHANGED_ON_OVERRIDE" )
107
104
override fun <R > registerSelectClause2 (select : SelectInstance <R >, element : T , block : suspend (SendChannel <T >) -> R ) {
108
105
mutex.onLock.registerSelectClause2(select, null ) {
109
- doLockedNext(element)
106
+ doLockedNext(element)?. let { throw it }
110
107
block(this )
111
108
}
112
109
}
113
110
114
111
/*
115
- * This code is not trivial because of the two properties:
112
+ * This code is not trivial because of the following properties:
116
113
* 1. It ensures conformance to the reactive specification that mandates that onXXX invocations should not
117
114
* be concurrent. It uses Mutex to protect all onXXX invocation and ensure conformance even when multiple
118
115
* coroutines are invoking `send` function.
@@ -121,27 +118,60 @@ public class PublisherCoroutine<in T>(
121
118
* globally-scoped coroutine that is invoking `send` outside of this context. Without extra precaution this may
122
119
* lead to `onNext` that is concurrent with `onComplete/onError`, so that is why signalling for
123
120
* `onComplete/onError` is also done under the same mutex.
121
+ * 3. The reactive specification forbids emitting more elements than requested, so `onNext` is forbidden until the
122
+ * subscriber actually requests some elements. This is implemented by the mutex being locked when emitting
123
+ * elements is not permitted (`_nRequested.value == 0`).
124
124
*/
125
125
126
- // assert: mutex.isLocked()
127
- private fun doLockedNext (elem : T ) {
128
- // check if already closed for send, note that isActive becomes false as soon as cancel() is invoked,
129
- // because the job is cancelled, so this check also ensure conformance to the reactive specification's
130
- // requirement that after cancellation requested we don't call onXXX
126
+ /* *
127
+ * Attempts to emit a value to the subscriber and, if back-pressure permits this, unlock the mutex.
128
+ *
129
+ * Requires that the caller has locked the mutex before this invocation.
130
+ *
131
+ * If the channel is closed, returns the corresponding [Throwable]; otherwise, returns `null` to denote success.
132
+ *
133
+ * @throws NullPointerException if the passed element is `null`
134
+ */
135
+ private fun doLockedNext (elem : T ): Throwable ? {
136
+ if (elem == null ) {
137
+ throw NullPointerException (" Can not emit null" )
138
+ }
139
+ /* * This guards against the case when the caller of this function managed to lock the mutex not because some
140
+ * elements were requested--and thus it is permitted to call `onNext`--but because the channel was closed.
141
+ *
142
+ * It may look like there is a race condition here between `isActive` and a concurrent cancellation, but it's
143
+ * okay for a cancellation to happen during `onNext`, as the reactive spec only requires that we *eventually*
144
+ * stop signalling the subscriber. */
131
145
if (! isActive) {
132
146
unlockAndCheckCompleted()
133
- throw getCancellationException()
147
+ return getCancellationException()
134
148
}
135
- // notify subscriber
149
+ // notify the subscriber
136
150
try {
137
151
subscriber.onNext(elem)
138
- } catch (e: Throwable ) {
139
- // If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it
140
- // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery,
141
- // this failure is essentially equivalent to a failure of a child coroutine.
142
- cancelCoroutine(e)
152
+ } catch (cause: Throwable ) {
153
+ /* * The reactive streams spec forbids the subscribers from throwing from [Subscriber.onNext] unless the
154
+ * element is `null`, which we check not to be the case. Therefore, we report this exception to the handler
155
+ * for uncaught exceptions and consider the subscription cancelled, as mandated by
156
+ * https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2.13.
157
+ *
158
+ * Some reactive implementations, like RxJava or Reactor, are known to throw from [Subscriber.onNext] if the
159
+ * execution encounters an exception they consider to be "fatal", like [VirtualMachineError] or
160
+ * [ThreadDeath]. Us using the handler for the undeliverable exceptions to signal "fatal" exceptions is
161
+ * inconsistent with RxJava and Reactor, which attempt to bubble the exception up the call chain as soon as
162
+ * possible. However, we can't do much better here, as simply throwing from all methods indiscriminately
163
+ * would violate the contracts we place on them. */
164
+ cancelled = true
165
+ val causeDelivered = close(cause)
143
166
unlockAndCheckCompleted()
144
- throw e
167
+ return if (causeDelivered) {
168
+ // `cause` is the reason this channel is closed
169
+ cause
170
+ } else {
171
+ // Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception.
172
+ exceptionOnCancelHandler(cause, context)
173
+ getCancellationException()
174
+ }
145
175
}
146
176
// now update nRequested
147
177
while (true ) { // lock-free loop on nRequested
@@ -152,12 +182,13 @@ public class PublisherCoroutine<in T>(
152
182
if (_nRequested .compareAndSet(current, updated)) {
153
183
if (updated == 0L ) {
154
184
// return to keep locked due to back-pressure
155
- return
185
+ return null
156
186
}
157
187
break // unlock if updated > 0
158
188
}
159
189
}
160
190
unlockAndCheckCompleted()
191
+ return null
161
192
}
162
193
163
194
private fun unlockAndCheckCompleted () {
@@ -177,38 +208,31 @@ public class PublisherCoroutine<in T>(
177
208
// assert: mutex.isLocked() & isCompleted
178
209
private fun doLockedSignalCompleted (cause : Throwable ? , handled : Boolean ) {
179
210
try {
180
- if (_nRequested .value >= CLOSED ) {
181
- _nRequested .value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
182
- // Specification requires that after cancellation requested we don't call onXXX
183
- if (cancelled) {
184
- // If the parent had failed to handle our exception, then we must not lose this exception
185
- if (cause != null && ! handled) exceptionOnCancelHandler(cause, context)
186
- return
187
- }
188
-
211
+ if (_nRequested .value == SIGNALLED )
212
+ return
213
+ _nRequested .value = SIGNALLED // we'll signal onError/onCompleted (the final state, so no CAS needed)
214
+ // Specification requires that after the cancellation is requested we eventually stop calling onXXX
215
+ if (cancelled) {
216
+ // If the parent had failed to handle our exception, then we must not lose this exception
217
+ if (cause != null && ! handled) exceptionOnCancelHandler(cause, context)
218
+ return
219
+ }
220
+ if (cause == null ) {
189
221
try {
190
- if (cause != null && cause !is CancellationException ) {
191
- /*
192
- * Reactive frameworks have two types of exceptions: regular and fatal.
193
- * Regular are passed to onError.
194
- * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297).
195
- * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
196
- * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
197
- * thrown by subscriber or upstream).
198
- * To make behaviour consistent and least surprising, we always handle fatal exceptions
199
- * by coroutines machinery, anyway, they should not be present in regular program flow,
200
- * thus our goal here is just to expose it as soon as possible.
201
- */
202
- subscriber.onError(cause)
203
- if (! handled && cause.isFatal()) {
204
- exceptionOnCancelHandler(cause, context)
205
- }
206
- } else {
207
- subscriber.onComplete()
208
- }
222
+ subscriber.onComplete()
209
223
} catch (e: Throwable ) {
210
224
handleCoroutineException(context, e)
211
225
}
226
+ } else {
227
+ try {
228
+ // This can't be the cancellation exception from `cancel`, as then `cancelled` would be `true`.
229
+ subscriber.onError(cause)
230
+ } catch (e: Throwable ) {
231
+ if (e != = cause) {
232
+ cause.addSuppressed(e)
233
+ }
234
+ handleCoroutineException(context, cause)
235
+ }
212
236
}
213
237
} finally {
214
238
mutex.unlock()
@@ -217,20 +241,25 @@ public class PublisherCoroutine<in T>(
217
241
218
242
override fun request (n : Long ) {
219
243
if (n <= 0 ) {
220
- // Specification requires IAE for n <= 0
244
+ // Specification requires to call onError with IAE for n <= 0
221
245
cancelCoroutine(IllegalArgumentException (" non-positive subscription request $n " ))
222
246
return
223
247
}
224
248
while (true ) { // lock-free loop for nRequested
225
249
val cur = _nRequested .value
226
- if (cur < 0 ) return // already closed for send, ignore requests
250
+ if (cur < 0 ) return // already closed for send, ignore requests, as mandated by the reactive streams spec
227
251
var upd = cur + n
228
252
if (upd < 0 || n == Long .MAX_VALUE )
229
253
upd = Long .MAX_VALUE
230
254
if (cur == upd) return // nothing to do
231
255
if (_nRequested .compareAndSet(cur, upd)) {
232
256
// unlock the mutex when we don't have back-pressure anymore
233
257
if (cur == 0L ) {
258
+ /* * In a sense, after a successful CAS, it is this invocation, not the coroutine itself, that owns
259
+ * the lock, given that `upd` is necessarily strictly positive. Thus, no other operation has the
260
+ * right to lower the value on [_nRequested], it can only grow or become [CLOSED]. Therefore, it is
261
+ * impossible for any other operations to assume that they own the lock without actually acquiring
262
+ * it. */
234
263
unlockAndCheckCompleted()
235
264
}
236
265
return
@@ -271,8 +300,6 @@ public class PublisherCoroutine<in T>(
271
300
cancelled = true
272
301
super .cancel(null )
273
302
}
274
-
275
- private fun Throwable.isFatal () = this is VirtualMachineError || this is ThreadDeath || this is LinkageError
276
303
}
277
304
278
305
@Deprecated(
0 commit comments