@@ -146,11 +146,9 @@ private suspend fun <T> Publisher<T>.awaitOne(
146
146
override fun onSubscribe (sub : Subscription ) {
147
147
/* * cancelling the existing subscription due to rule 2.5, though the publisher would either have to
148
148
* subscribe more than once, which would break 2.12, or leak this [Subscriber]. */
149
- subscription?.let {
150
- value = null
151
- seenValue = false
152
- inTerminalState = false
153
- it.cancel()
149
+ if (subscription != null ) {
150
+ sub.cancel()
151
+ return
154
152
}
155
153
subscription = sub
156
154
cont.invokeOnCancellation { sub.cancel() }
@@ -185,6 +183,7 @@ private suspend fun <T> Publisher<T>.awaitOne(
185
183
Mode .LAST , Mode .SINGLE , Mode .SINGLE_OR_DEFAULT -> {
186
184
if ((mode == Mode .SINGLE || mode == Mode .SINGLE_OR_DEFAULT ) && seenValue) {
187
185
sub.cancel()
186
+ // the check for `cont.isActive` is just a slight optimization and doesn't affect correctness
188
187
if (cont.isActive)
189
188
cont.resumeWithException(IllegalArgumentException (" More than one onNext value for $mode " ))
190
189
} else {
@@ -200,14 +199,17 @@ private suspend fun <T> Publisher<T>.awaitOne(
200
199
if (! tryEnterTerminalState(" onComplete" ))
201
200
return
202
201
if (seenValue) {
203
- if (cont.isActive) cont.resume(value as T )
202
+ // the check for `cont.isActive` is just a slight optimization and doesn't affect correctness
203
+ if (mode != Mode .FIRST_OR_DEFAULT && mode != Mode .FIRST && cont.isActive)
204
+ cont.resume(value as T )
204
205
return
205
206
}
206
207
when {
207
208
(mode == Mode .FIRST_OR_DEFAULT || mode == Mode .SINGLE_OR_DEFAULT ) -> {
208
209
cont.resume(default as T )
209
210
}
210
211
cont.isActive -> {
212
+ // the check for `cont.isActive` is just a slight optimization and doesn't affect correctness
211
213
cont.resumeWithException(NoSuchElementException (" No value received via onNext for $mode " ))
212
214
}
213
215
}
0 commit comments