-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Introduce non-nullable types in reactive integrations where appropriate #3393
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
ac0fd0e
34c08dd
508ce3a
6b6fbd2
fa472f9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,9 +41,8 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine | |
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this | ||
* function immediately resumes with [CancellationException] and disposes of its subscription. | ||
*/ | ||
@Suppress("UNCHECKED_CAST") | ||
public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont -> | ||
subscribe(object : MaybeObserver<T> { | ||
public suspend fun <T> MaybeSource<T & Any>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont -> | ||
subscribe(object : MaybeObserver<T & Any> { | ||
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } | ||
override fun onComplete() { cont.resume(null) } | ||
override fun onSuccess(t: T) { cont.resume(t) } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This bound seems to be properly propagated from the declaration site There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since the object is implementing Or you know what, actually, even if we were implementing plain I think that's why, with the current parameter type of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for pointing it out. T & Any is required since Kotlin 1.8.0, it was an omission on the language site |
||
|
@@ -61,7 +60,7 @@ public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellab | |
* | ||
* @throws NoSuchElementException if no elements were produced by this [MaybeSource]. | ||
*/ | ||
public suspend fun <T> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException() | ||
public suspend fun <T> MaybeSource<T & Any>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException() | ||
|
||
/** | ||
* Awaits for completion of the maybe without blocking a thread. | ||
|
@@ -84,7 +83,7 @@ public suspend fun <T> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?: | |
level = DeprecationLevel.ERROR, | ||
replaceWith = ReplaceWith("this.awaitSingleOrNull()") | ||
) // Warning since 1.5, error in 1.6, hidden in 1.7 | ||
public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull() | ||
public suspend fun <T> MaybeSource<T & Any>.await(): T? = awaitSingleOrNull() | ||
|
||
/** | ||
* Awaits for completion of the maybe without blocking a thread. | ||
|
@@ -107,7 +106,7 @@ public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull() | |
level = DeprecationLevel.ERROR, | ||
replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default") | ||
) // Warning since 1.5, error in 1.6, hidden in 1.7 | ||
public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default | ||
public suspend fun <T> MaybeSource<T & Any>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default | ||
|
||
// ------------------------ SingleSource ------------------------ | ||
|
||
|
@@ -119,10 +118,10 @@ public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingl | |
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this | ||
* function immediately disposes of its subscription and resumes with [CancellationException]. | ||
*/ | ||
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont -> | ||
subscribe(object : SingleObserver<T> { | ||
public suspend fun <T> SingleSource<T & Any>.await(): T = suspendCancellableCoroutine { cont -> | ||
subscribe(object : SingleObserver<T & Any> { | ||
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } | ||
override fun onSuccess(t: T) { cont.resume(t) } | ||
override fun onSuccess(t: T & Any) { cont.resume(t) } | ||
override fun onError(error: Throwable) { cont.resumeWithException(error) } | ||
}) | ||
} | ||
|
@@ -139,7 +138,8 @@ public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine | |
* | ||
* @throws NoSuchElementException if the observable does not emit any value | ||
*/ | ||
public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST) | ||
@Suppress("UNCHECKED_CAST") | ||
public suspend fun <T> ObservableSource<T & Any>.awaitFirst(): T = awaitOne(Mode.FIRST) as T | ||
|
||
/** | ||
* Awaits the first value from the given [Observable], or returns the [default] value if none is emitted, without | ||
|
@@ -150,7 +150,9 @@ public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST | |
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this | ||
* function immediately disposes of its subscription and resumes with [CancellationException]. | ||
*/ | ||
public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default) | ||
@Suppress("UNCHECKED_CAST") | ||
public suspend fun <T> ObservableSource<T & Any>.awaitFirstOrDefault(default: T): T = | ||
awaitOne(Mode.FIRST_OR_DEFAULT, default) as T | ||
|
||
/** | ||
* Awaits the first value from the given [Observable], or returns `null` if none is emitted, without blocking the | ||
|
@@ -161,7 +163,7 @@ public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = | |
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this | ||
* function immediately disposes of its subscription and resumes with [CancellationException]. | ||
*/ | ||
public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT) | ||
public suspend fun <T> ObservableSource<T & Any>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT) | ||
|
||
/** | ||
* Awaits the first value from the given [Observable], or calls [defaultValue] to get a value if none is emitted, | ||
|
@@ -172,7 +174,7 @@ public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mod | |
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this | ||
* function immediately disposes of its subscription and resumes with [CancellationException]. | ||
*/ | ||
public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T = | ||
public suspend fun <T> ObservableSource<T & Any>.awaitFirstOrElse(defaultValue: () -> T): T = | ||
awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue() | ||
|
||
/** | ||
|
@@ -185,7 +187,8 @@ public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> | |
* | ||
* @throws NoSuchElementException if the observable does not emit any value | ||
*/ | ||
public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST) | ||
@Suppress("UNCHECKED_CAST") | ||
public suspend fun <T> ObservableSource<T & Any>.awaitLast(): T = awaitOne(Mode.LAST) as T | ||
|
||
/** | ||
* Awaits the single value from the given observable without blocking the thread and returns the resulting value, or, | ||
|
@@ -198,26 +201,27 @@ public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST) | |
* @throws NoSuchElementException if the observable does not emit any value | ||
* @throws IllegalArgumentException if the observable emits more than one value | ||
*/ | ||
public suspend fun <T> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SINGLE) | ||
@Suppress("UNCHECKED_CAST") | ||
public suspend fun <T> ObservableSource<T & Any>.awaitSingle(): T = awaitOne(Mode.SINGLE) as T | ||
|
||
// ------------------------ private ------------------------ | ||
|
||
internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) = | ||
invokeOnCancellation { d.dispose() } | ||
|
||
private enum class Mode(val s: String) { | ||
private enum class Mode(@JvmField val s: String) { | ||
FIRST("awaitFirst"), | ||
FIRST_OR_DEFAULT("awaitFirstOrDefault"), | ||
LAST("awaitLast"), | ||
SINGLE("awaitSingle"); | ||
override fun toString(): String = s | ||
} | ||
|
||
private suspend fun <T> ObservableSource<T>.awaitOne( | ||
private suspend fun <T> ObservableSource<T & Any>.awaitOne( | ||
mode: Mode, | ||
default: T? = null | ||
): T = suspendCancellableCoroutine { cont -> | ||
subscribe(object : Observer<T> { | ||
): T? = suspendCancellableCoroutine { cont -> | ||
subscribe(object : Observer<T & Any> { | ||
private lateinit var subscription: Disposable | ||
private var value: T? = null | ||
private var seenValue = false | ||
|
@@ -227,7 +231,7 @@ private suspend fun <T> ObservableSource<T>.awaitOne( | |
cont.invokeOnCancellation { sub.dispose() } | ||
} | ||
|
||
override fun onNext(t: T) { | ||
override fun onNext(t: T & Any) { | ||
when (mode) { | ||
Mode.FIRST, Mode.FIRST_OR_DEFAULT -> { | ||
if (!seenValue) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,7 +42,7 @@ public fun Job.asCompletable(context: CoroutineContext): Completable = rxComplet | |
* | ||
* @param context -- the coroutine context from which the resulting maybe is going to be signalled | ||
*/ | ||
public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMaybe(context) { | ||
public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T & Any> = rxMaybe(context) { | ||
[email protected]() | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure exactly what
-progressive
enables, but I notice that I can get a few more errors (noted above) if I enable the flags from #3007. (Thanks for pointing me here from there!)