Skip to content

Leverage definitely non-nullable types #2954

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ configure(subprojects.findAll { !sourceless.contains(it.name) }) {
tasks.withType(org.jetbrains.kotlin.gradle.tasks.AbstractKotlinCompile).all {
kotlinOptions.freeCompilerArgs += OptInPresetKt.optInAnnotations.collect { "-Xopt-in=" + it }
kotlinOptions.freeCompilerArgs += "-progressive"
// Disable KT-36770 for RxJava2 integration
kotlinOptions.freeCompilerArgs += "-XXLanguage:-ProhibitUsingNullableTypeParameterAgainstNotNullAnnotated"
// Internal implementation details of our Rx integration
kotlinOptions.freeCompilerArgs += "-XXLanguage:+DefinitelyNonNullableTypes"
// Remove null assertions to get smaller bytecode on Android
kotlinOptions.freeCompilerArgs += ["-Xno-param-assertions", "-Xno-receiver-assertions", "-Xno-call-assertions"]
}
Expand Down
27 changes: 13 additions & 14 deletions reactive/kotlinx-coroutines-rx2/src/RxAwait.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@
package kotlinx.coroutines.rx2

import io.reactivex.*
import io.reactivex.disposables.Disposable
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Job
import kotlinx.coroutines.suspendCancellableCoroutine
import io.reactivex.disposables.*
import kotlinx.coroutines.*
import kotlin.coroutines.*

// ------------------------ CompletableSource ------------------------
Expand Down Expand Up @@ -43,12 +40,12 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine
*/
@Suppress("UNCHECKED_CAST")
public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
subscribe(object : MaybeObserver<T> {
subscribe(object : MaybeObserver<T & Any> {
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onComplete() { cont.resume(null) }
override fun onSuccess(t: T) { cont.resume(t) }
override fun onSuccess(t: T & Any) { cont.resume(t) }
override fun onError(error: Throwable) { cont.resumeWithException(error) }
})
} as MaybeObserver<T>)
}

/**
Expand Down Expand Up @@ -118,11 +115,12 @@ public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingl
* function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
subscribe(object : SingleObserver<T> {
@Suppress("UNCHECKED_CAST")
subscribe(object : SingleObserver<T & Any> {
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onSuccess(t: T) { cont.resume(t) }
override fun onSuccess(t: T & Any) { cont.resume(t) }
override fun onError(error: Throwable) { cont.resumeWithException(error) }
})
} as SingleObserver<T>)
}

// ------------------------ ObservableSource ------------------------
Expand Down Expand Up @@ -215,7 +213,8 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
mode: Mode,
default: T? = null
): T = suspendCancellableCoroutine { cont ->
subscribe(object : Observer<T> {
@Suppress("UNCHECKED_CAST")
subscribe(object : Observer<T & Any> {
private lateinit var subscription: Disposable
private var value: T? = null
private var seenValue = false
Expand All @@ -225,7 +224,7 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
cont.invokeOnCancellation { sub.dispose() }
}

override fun onNext(t: T) {
override fun onNext(t: T & Any) {
when (mode) {
Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
if (!seenValue) {
Expand Down Expand Up @@ -266,6 +265,6 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
override fun onError(e: Throwable) {
cont.resumeWithException(e)
}
})
} as Observer<T>)
}

14 changes: 7 additions & 7 deletions reactive/kotlinx-coroutines-rx2/src/RxChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import kotlinx.coroutines.reactive.*
@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.ERROR) // Will be hidden in 1.5
public fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>()
subscribe(channel)
subscribe(channel as MaybeObserver<T>)
return channel
}

Expand All @@ -38,7 +38,7 @@ public fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.ERROR) // Will be hidden in 1.5
public fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>()
subscribe(channel)
subscribe(channel as Observer<T>)
return channel
}

Expand All @@ -63,20 +63,20 @@ public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit):
@PublishedApi
internal fun <T> MaybeSource<T>.toChannel(): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>()
subscribe(channel)
subscribe(channel as MaybeObserver<T>)
return channel
}

@PublishedApi
internal fun <T> ObservableSource<T>.toChannel(): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>()
subscribe(channel)
subscribe(channel as Observer<T>)
return channel
}

@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
private class SubscriptionChannel<T> :
LinkedListChannel<T>(null), Observer<T>, MaybeObserver<T>
LinkedListChannel<T>(null), Observer<T & Any>, MaybeObserver<T & Any>
{
private val _subscription = atomic<Disposable?>(null)

Expand All @@ -90,12 +90,12 @@ private class SubscriptionChannel<T> :
_subscription.value = sub
}

override fun onSuccess(t: T) {
override fun onSuccess(t: T & Any) {
trySend(t)
close(cause = null)
}

override fun onNext(t: T) {
override fun onNext(t: T & Any) {
trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
}

Expand Down
21 changes: 11 additions & 10 deletions reactive/kotlinx-coroutines-rx3/src/RxAwait.kt
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine
*/
@Suppress("UNCHECKED_CAST")
public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
subscribe(object : MaybeObserver<T> {
subscribe(object : MaybeObserver<T & Any> {
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onComplete() { cont.resume(null) }
override fun onSuccess(t: T) { cont.resume(t) }
override fun onSuccess(t: T & Any) { cont.resume(t) }
override fun onError(error: Throwable) { cont.resumeWithException(error) }
})
} as MaybeObserver<T>)
}

/**
Expand Down Expand Up @@ -120,11 +120,12 @@ public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingl
* function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
subscribe(object : SingleObserver<T> {
@Suppress("UNCHECKED_CAST")
subscribe(object : SingleObserver<T & Any> {
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onSuccess(t: T) { cont.resume(t) }
override fun onSuccess(t: T & Any) { cont.resume(t) }
override fun onError(error: Throwable) { cont.resumeWithException(error) }
})
} as SingleObserver<T>)
}

// ------------------------ ObservableSource ------------------------
Expand Down Expand Up @@ -217,7 +218,8 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
mode: Mode,
default: T? = null
): T = suspendCancellableCoroutine { cont ->
subscribe(object : Observer<T> {
@Suppress("UNCHECKED_CAST")
subscribe(object : Observer<T & Any> {
private lateinit var subscription: Disposable
private var value: T? = null
private var seenValue = false
Expand All @@ -227,7 +229,7 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
cont.invokeOnCancellation { sub.dispose() }
}

override fun onNext(t: T) {
override fun onNext(t: T & Any) {
when (mode) {
Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
if (!seenValue) {
Expand All @@ -249,7 +251,6 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
}
}

@Suppress("UNCHECKED_CAST")
override fun onComplete() {
if (seenValue) {
if (cont.isActive) cont.resume(value as T)
Expand All @@ -268,6 +269,6 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
override fun onError(e: Throwable) {
cont.resumeWithException(e)
}
})
} as Observer<T>)
}

12 changes: 7 additions & 5 deletions reactive/kotlinx-coroutines-rx3/src/RxChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import kotlinx.coroutines.flow.*
@PublishedApi
internal fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>()
subscribe(channel)
@Suppress("UNCHECKED_CAST")
subscribe(channel as MaybeObserver<T>)
return channel
}

Expand All @@ -35,7 +36,8 @@ internal fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
@PublishedApi
internal fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>()
subscribe(channel)
@Suppress("UNCHECKED_CAST")
subscribe(channel as Observer<T>)
return channel
}

Expand All @@ -59,7 +61,7 @@ public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit):

@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
private class SubscriptionChannel<T> :
LinkedListChannel<T>(null), Observer<T>, MaybeObserver<T>
LinkedListChannel<T>(null), Observer<T & Any>, MaybeObserver<T & Any>
{
private val _subscription = atomic<Disposable?>(null)

Expand All @@ -73,12 +75,12 @@ private class SubscriptionChannel<T> :
_subscription.value = sub
}

override fun onSuccess(t: T) {
override fun onSuccess(t: T & Any) {
trySend(t)
close(cause = null)
}

override fun onNext(t: T) {
override fun onNext(t: T & Any) {
trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
}

Expand Down