@@ -9,7 +9,6 @@ import org.reactivestreams.Publisher
9
9
import org.reactivestreams.Subscriber
10
10
import org.reactivestreams.Subscription
11
11
import java.lang.IllegalStateException
12
- import java.util.*
13
12
import kotlin.NoSuchElementException
14
13
import kotlin.coroutines.*
15
14
@@ -26,8 +25,8 @@ import kotlin.coroutines.*
26
25
public suspend fun <T > Publisher<T>.awaitFirst (): T = awaitOne(Mode .FIRST )
27
26
28
27
/* *
29
- * Awaits the first value from the given observable , or returns the [default] value if none is emitted, without blocking
30
- * the thread, and returns the resulting value, or, if this observable has produced an error, throws the corresponding
28
+ * Awaits the first value from the given publisher , or returns the [default] value if none is emitted, without blocking
29
+ * the thread, and returns the resulting value, or, if this publisher has produced an error, throws the corresponding
31
30
* exception.
32
31
*
33
32
* This suspending function is cancellable.
@@ -37,8 +36,8 @@ public suspend fun <T> Publisher<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
37
36
public suspend fun <T > Publisher<T>.awaitFirstOrDefault (default : T ): T = awaitOne(Mode .FIRST_OR_DEFAULT , default)
38
37
39
38
/* *
40
- * Awaits the first value from the given observable , or returns `null` if none is emitted, without blocking the thread,
41
- * and returns the resulting value, or, if this observable has produced an error, throws the corresponding exception.
39
+ * Awaits the first value from the given publisher , or returns `null` if none is emitted, without blocking the thread,
40
+ * and returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
42
41
*
43
42
* This suspending function is cancellable.
44
43
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
@@ -47,8 +46,8 @@ public suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T = awaitOn
47
46
public suspend fun <T > Publisher<T>.awaitFirstOrNull (): T ? = awaitOne(Mode .FIRST_OR_DEFAULT )
48
47
49
48
/* *
50
- * Awaits the first value from the given observable , or calls [defaultValue] to get a value if none is emitted, without
51
- * blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the
49
+ * Awaits the first value from the given publisher , or calls [defaultValue] to get a value if none is emitted, without
50
+ * blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the
52
51
* corresponding exception.
53
52
*
54
53
* This suspending function is cancellable.
@@ -83,8 +82,8 @@ public suspend fun <T> Publisher<T>.awaitLast(): T = awaitOne(Mode.LAST)
83
82
public suspend fun <T > Publisher<T>.awaitSingle (): T = awaitOne(Mode .SINGLE )
84
83
85
84
/* *
86
- * Awaits the single value from the given observable , or returns the [default] value if none is emitted, without
87
- * blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the
85
+ * Awaits the single value from the given publisher , or returns the [default] value if none is emitted, without
86
+ * blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the
88
87
* corresponding exception.
89
88
*
90
89
* This suspending function is cancellable.
@@ -94,26 +93,34 @@ public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
94
93
* @throws NoSuchElementException if the publisher does not emit any value
95
94
* @throws IllegalArgumentException if the publisher emits more than one value
96
95
*/
96
+ @Deprecated(
97
+ message = " Deprecated without a replacement due to its name incorrectly conveying the behavior" ,
98
+ level = DeprecationLevel .WARNING
99
+ )
97
100
public suspend fun <T > Publisher<T>.awaitSingleOrDefault (default : T ): T = awaitOne(Mode .SINGLE_OR_DEFAULT , default)
98
101
99
102
/* *
100
- * Awaits the single value from the given observable without blocking the thread and returns the resulting value, or, if
101
- * this observable has produced an error, throws the corresponding exception. If more than one value or none were
103
+ * Awaits the single value from the given publisher without blocking the thread and returns the resulting value, or, if
104
+ * this publisher has produced an error, throws the corresponding exception. If more than one value or none were
102
105
* produced by the publisher, `null` is returned.
103
106
*
104
107
* This suspending function is cancellable.
105
108
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
106
109
* function immediately cancels its [Subscription] and resumes with [CancellationException].
110
+ *
111
+ * @throws IllegalArgumentException if the publisher emits more than one value
107
112
*/
108
- public suspend fun <T > Publisher<T>.awaitSingleOrNull (): T ? = try {
109
- awaitOne(Mode .SINGLE_OR_DEFAULT )
110
- } catch (e: TooManyElementsException ) {
111
- null
112
- }
113
+ @Deprecated(
114
+ message = " Deprecated without a replacement due to its name incorrectly conveying the behavior. " +
115
+ " There is a specialized version for Reactor's Mono, please use that where applicable." ,
116
+ level = DeprecationLevel .WARNING ,
117
+ replaceWith = ReplaceWith (" this.awaitSingleOrNull()" , " kotlinx.coroutines.reactor" )
118
+ )
119
+ public suspend fun <T > Publisher<T>.awaitSingleOrNull (): T ? = awaitOne(Mode .SINGLE_OR_DEFAULT )
113
120
114
121
/* *
115
- * Awaits the single value from the given observable , or calls [defaultValue] to get a value if none is emitted, without
116
- * blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the
122
+ * Awaits the single value from the given publisher , or calls [defaultValue] to get a value if none is emitted, without
123
+ * blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the
117
124
* corresponding exception.
118
125
*
119
126
* This suspending function is cancellable.
@@ -122,13 +129,15 @@ public suspend fun <T> Publisher<T>.awaitSingleOrNull(): T? = try {
122
129
*
123
130
* @throws IllegalArgumentException if the publisher emits more than one value
124
131
*/
132
+ @Deprecated(
133
+ message = " Deprecated without a replacement due to its name incorrectly conveying the behavior" ,
134
+ level = DeprecationLevel .WARNING
135
+ )
125
136
public suspend fun <T > Publisher<T>.awaitSingleOrElse (defaultValue : () -> T ): T =
126
137
awaitOne(Mode .SINGLE_OR_DEFAULT ) ? : defaultValue()
127
138
128
139
// ------------------------ private ------------------------
129
140
130
- private class TooManyElementsException (message : String ): IllegalArgumentException(message)
131
-
132
141
private enum class Mode (val s : String ) {
133
142
FIRST (" awaitFirst" ),
134
143
FIRST_OR_DEFAULT (" awaitFirstOrDefault" ),
@@ -195,7 +204,7 @@ private suspend fun <T> Publisher<T>.awaitOne(
195
204
/* the check for `cont.isActive` is needed in case `sub.cancel() above calls `onComplete` or
196
205
`onError` on its own. */
197
206
if (cont.isActive) {
198
- cont.resumeWithException(TooManyElementsException (" More than one onNext value for $mode " ))
207
+ cont.resumeWithException(IllegalArgumentException (" More than one onNext value for $mode " ))
199
208
}
200
209
} else {
201
210
value = t
0 commit comments