@@ -43,12 +43,12 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine
43
43
*/
44
44
@Suppress(" UNCHECKED_CAST" )
45
45
public suspend fun <T > MaybeSource<T>.awaitSingleOrNull (): T ? = suspendCancellableCoroutine { cont ->
46
- subscribe(object : MaybeObserver <T > {
46
+ subscribe(object : MaybeObserver <T & Any > {
47
47
override fun onSubscribe (d : Disposable ) { cont.disposeOnCancellation(d) }
48
48
override fun onComplete () { cont.resume(null ) }
49
- override fun onSuccess (t : T ) { cont.resume(t) }
49
+ override fun onSuccess (t : T & Any ) { cont.resume(t) }
50
50
override fun onError (error : Throwable ) { cont.resumeWithException(error) }
51
- })
51
+ } as MaybeObserver < T > )
52
52
}
53
53
54
54
/* *
@@ -120,11 +120,12 @@ public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingl
120
120
* function immediately disposes of its subscription and resumes with [CancellationException].
121
121
*/
122
122
public suspend fun <T > SingleSource<T>.await (): T = suspendCancellableCoroutine { cont ->
123
- subscribe(object : SingleObserver <T > {
123
+ @Suppress(" UNCHECKED_CAST" )
124
+ subscribe(object : SingleObserver <T & Any > {
124
125
override fun onSubscribe (d : Disposable ) { cont.disposeOnCancellation(d) }
125
- override fun onSuccess (t : T ) { cont.resume(t) }
126
+ override fun onSuccess (t : T & Any ) { cont.resume(t) }
126
127
override fun onError (error : Throwable ) { cont.resumeWithException(error) }
127
- })
128
+ } as SingleObserver < T > )
128
129
}
129
130
130
131
// ------------------------ ObservableSource ------------------------
@@ -217,7 +218,8 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
217
218
mode : Mode ,
218
219
default : T ? = null
219
220
): T = suspendCancellableCoroutine { cont ->
220
- subscribe(object : Observer <T > {
221
+ @Suppress(" UNCHECKED_CAST" )
222
+ subscribe(object : Observer <T & Any > {
221
223
private lateinit var subscription: Disposable
222
224
private var value: T ? = null
223
225
private var seenValue = false
@@ -227,7 +229,7 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
227
229
cont.invokeOnCancellation { sub.dispose() }
228
230
}
229
231
230
- override fun onNext (t : T ) {
232
+ override fun onNext (t : T & Any ) {
231
233
when (mode) {
232
234
Mode .FIRST , Mode .FIRST_OR_DEFAULT -> {
233
235
if (! seenValue) {
@@ -249,7 +251,6 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
249
251
}
250
252
}
251
253
252
- @Suppress(" UNCHECKED_CAST" )
253
254
override fun onComplete () {
254
255
if (seenValue) {
255
256
if (cont.isActive) cont.resume(value as T )
@@ -268,6 +269,6 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
268
269
override fun onError (e : Throwable ) {
269
270
cont.resumeWithException(e)
270
271
}
271
- })
272
+ } as Observer < T > )
272
273
}
273
274
0 commit comments