5
5
package kotlinx.coroutines.rx2
6
6
7
7
import io.reactivex.*
8
- import io.reactivex.disposables.Disposable
9
- import kotlinx.coroutines.CancellableContinuation
10
- import kotlinx.coroutines.CancellationException
11
- import kotlinx.coroutines.Job
12
- import kotlinx.coroutines.suspendCancellableCoroutine
8
+ import io.reactivex.disposables.*
9
+ import kotlinx.coroutines.*
13
10
import kotlin.coroutines.*
14
11
15
12
// ------------------------ CompletableSource ------------------------
@@ -43,12 +40,12 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine
43
40
*/
44
41
@Suppress(" UNCHECKED_CAST" )
45
42
public suspend fun <T > MaybeSource<T>.awaitSingleOrNull (): T ? = suspendCancellableCoroutine { cont ->
46
- subscribe(object : MaybeObserver <T > {
43
+ subscribe(object : MaybeObserver <T & Any > {
47
44
override fun onSubscribe (d : Disposable ) { cont.disposeOnCancellation(d) }
48
45
override fun onComplete () { cont.resume(null ) }
49
- override fun onSuccess (t : T ) { cont.resume(t) }
46
+ override fun onSuccess (t : T & Any ) { cont.resume(t) }
50
47
override fun onError (error : Throwable ) { cont.resumeWithException(error) }
51
- })
48
+ } as MaybeObserver < T > )
52
49
}
53
50
54
51
/* *
@@ -118,11 +115,11 @@ public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingl
118
115
* function immediately disposes of its subscription and resumes with [CancellationException].
119
116
*/
120
117
public suspend fun <T > SingleSource<T>.await (): T = suspendCancellableCoroutine { cont ->
121
- subscribe(object : SingleObserver <T > {
118
+ subscribe(object : SingleObserver <T & Any > {
122
119
override fun onSubscribe (d : Disposable ) { cont.disposeOnCancellation(d) }
123
- override fun onSuccess (t : T ) { cont.resume(t) }
120
+ override fun onSuccess (t : T & Any ) { cont.resume(t) }
124
121
override fun onError (error : Throwable ) { cont.resumeWithException(error) }
125
- })
122
+ } as SingleObserver < T > )
126
123
}
127
124
128
125
// ------------------------ ObservableSource ------------------------
@@ -215,7 +212,7 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
215
212
mode : Mode ,
216
213
default : T ? = null
217
214
): T = suspendCancellableCoroutine { cont ->
218
- subscribe(object : Observer <T > {
215
+ subscribe(object : Observer <T & Any > {
219
216
private lateinit var subscription: Disposable
220
217
private var value: T ? = null
221
218
private var seenValue = false
@@ -225,7 +222,7 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
225
222
cont.invokeOnCancellation { sub.dispose() }
226
223
}
227
224
228
- override fun onNext (t : T ) {
225
+ override fun onNext (t : T & Any ) {
229
226
when (mode) {
230
227
Mode .FIRST , Mode .FIRST_OR_DEFAULT -> {
231
228
if (! seenValue) {
@@ -266,6 +263,6 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
266
263
override fun onError (e : Throwable ) {
267
264
cont.resumeWithException(e)
268
265
}
269
- })
266
+ } as Observer < T > )
270
267
}
271
268
0 commit comments