diff --git a/build.gradle b/build.gradle index 26b598ff5d..fc1426e6bb 100644 --- a/build.gradle +++ b/build.gradle @@ -169,8 +169,8 @@ configure(subprojects.findAll { !sourceless.contains(it.name) }) { tasks.withType(org.jetbrains.kotlin.gradle.tasks.AbstractKotlinCompile).all { kotlinOptions.freeCompilerArgs += OptInPresetKt.optInAnnotations.collect { "-Xopt-in=" + it } kotlinOptions.freeCompilerArgs += "-progressive" - // Disable KT-36770 for RxJava2 integration - kotlinOptions.freeCompilerArgs += "-XXLanguage:-ProhibitUsingNullableTypeParameterAgainstNotNullAnnotated" + // Internal implementation details of our Rx integration + kotlinOptions.freeCompilerArgs += "-XXLanguage:+DefinitelyNonNullableTypes" // Remove null assertions to get smaller bytecode on Android kotlinOptions.freeCompilerArgs += ["-Xno-param-assertions", "-Xno-receiver-assertions", "-Xno-call-assertions"] } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt b/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt index 0e0b47ebe8..e8d3db7763 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt @@ -5,11 +5,8 @@ package kotlinx.coroutines.rx2 import io.reactivex.* -import io.reactivex.disposables.Disposable -import kotlinx.coroutines.CancellableContinuation -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.Job -import kotlinx.coroutines.suspendCancellableCoroutine +import io.reactivex.disposables.* +import kotlinx.coroutines.* import kotlin.coroutines.* // ------------------------ CompletableSource ------------------------ @@ -43,12 +40,12 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine */ @Suppress("UNCHECKED_CAST") public suspend fun MaybeSource.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont -> - subscribe(object : MaybeObserver { + subscribe(object : MaybeObserver { override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } override fun onComplete() { cont.resume(null) } - override fun onSuccess(t: T) { cont.resume(t) } + override fun onSuccess(t: T & Any) { cont.resume(t) } override fun onError(error: Throwable) { cont.resumeWithException(error) } - }) + } as MaybeObserver) } /** @@ -118,11 +115,12 @@ public suspend fun MaybeSource.awaitOrDefault(default: T): T = awaitSingl * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun SingleSource.await(): T = suspendCancellableCoroutine { cont -> - subscribe(object : SingleObserver { + @Suppress("UNCHECKED_CAST") + subscribe(object : SingleObserver { override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } - override fun onSuccess(t: T) { cont.resume(t) } + override fun onSuccess(t: T & Any) { cont.resume(t) } override fun onError(error: Throwable) { cont.resumeWithException(error) } - }) + } as SingleObserver) } // ------------------------ ObservableSource ------------------------ @@ -215,7 +213,8 @@ private suspend fun ObservableSource.awaitOne( mode: Mode, default: T? = null ): T = suspendCancellableCoroutine { cont -> - subscribe(object : Observer { + @Suppress("UNCHECKED_CAST") + subscribe(object : Observer { private lateinit var subscription: Disposable private var value: T? = null private var seenValue = false @@ -225,7 +224,7 @@ private suspend fun ObservableSource.awaitOne( cont.invokeOnCancellation { sub.dispose() } } - override fun onNext(t: T) { + override fun onNext(t: T & Any) { when (mode) { Mode.FIRST, Mode.FIRST_OR_DEFAULT -> { if (!seenValue) { @@ -266,6 +265,6 @@ private suspend fun ObservableSource.awaitOne( override fun onError(e: Throwable) { cont.resumeWithException(e) } - }) + } as Observer) } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt index bb093b0793..5a30162ded 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt @@ -23,7 +23,7 @@ import kotlinx.coroutines.reactive.* @Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.ERROR) // Will be hidden in 1.5 public fun MaybeSource.openSubscription(): ReceiveChannel { val channel = SubscriptionChannel() - subscribe(channel) + subscribe(channel as MaybeObserver) return channel } @@ -38,7 +38,7 @@ public fun MaybeSource.openSubscription(): ReceiveChannel { @Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.ERROR) // Will be hidden in 1.5 public fun ObservableSource.openSubscription(): ReceiveChannel { val channel = SubscriptionChannel() - subscribe(channel) + subscribe(channel as Observer) return channel } @@ -63,20 +63,20 @@ public suspend inline fun ObservableSource.collect(action: (T) -> Unit): @PublishedApi internal fun MaybeSource.toChannel(): ReceiveChannel { val channel = SubscriptionChannel() - subscribe(channel) + subscribe(channel as MaybeObserver) return channel } @PublishedApi internal fun ObservableSource.toChannel(): ReceiveChannel { val channel = SubscriptionChannel() - subscribe(channel) + subscribe(channel as Observer) return channel } @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") private class SubscriptionChannel : - LinkedListChannel(null), Observer, MaybeObserver + LinkedListChannel(null), Observer, MaybeObserver { private val _subscription = atomic(null) @@ -90,12 +90,12 @@ private class SubscriptionChannel : _subscription.value = sub } - override fun onSuccess(t: T) { + override fun onSuccess(t: T & Any) { trySend(t) close(cause = null) } - override fun onNext(t: T) { + override fun onNext(t: T & Any) { trySend(t) // Safe to ignore return value here, expectedly racing with cancellation } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt b/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt index 2a14cf7c6c..dee9a1aa49 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt @@ -43,12 +43,12 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine */ @Suppress("UNCHECKED_CAST") public suspend fun MaybeSource.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont -> - subscribe(object : MaybeObserver { + subscribe(object : MaybeObserver { override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } override fun onComplete() { cont.resume(null) } - override fun onSuccess(t: T) { cont.resume(t) } + override fun onSuccess(t: T & Any) { cont.resume(t) } override fun onError(error: Throwable) { cont.resumeWithException(error) } - }) + } as MaybeObserver) } /** @@ -120,11 +120,12 @@ public suspend fun MaybeSource.awaitOrDefault(default: T): T = awaitSingl * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun SingleSource.await(): T = suspendCancellableCoroutine { cont -> - subscribe(object : SingleObserver { + @Suppress("UNCHECKED_CAST") + subscribe(object : SingleObserver { override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } - override fun onSuccess(t: T) { cont.resume(t) } + override fun onSuccess(t: T & Any) { cont.resume(t) } override fun onError(error: Throwable) { cont.resumeWithException(error) } - }) + } as SingleObserver) } // ------------------------ ObservableSource ------------------------ @@ -217,7 +218,8 @@ private suspend fun ObservableSource.awaitOne( mode: Mode, default: T? = null ): T = suspendCancellableCoroutine { cont -> - subscribe(object : Observer { + @Suppress("UNCHECKED_CAST") + subscribe(object : Observer { private lateinit var subscription: Disposable private var value: T? = null private var seenValue = false @@ -227,7 +229,7 @@ private suspend fun ObservableSource.awaitOne( cont.invokeOnCancellation { sub.dispose() } } - override fun onNext(t: T) { + override fun onNext(t: T & Any) { when (mode) { Mode.FIRST, Mode.FIRST_OR_DEFAULT -> { if (!seenValue) { @@ -249,7 +251,6 @@ private suspend fun ObservableSource.awaitOne( } } - @Suppress("UNCHECKED_CAST") override fun onComplete() { if (seenValue) { if (cont.isActive) cont.resume(value as T) @@ -268,6 +269,6 @@ private suspend fun ObservableSource.awaitOne( override fun onError(e: Throwable) { cont.resumeWithException(e) } - }) + } as Observer) } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt index 21238d2491..9ab943f8bc 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt @@ -21,7 +21,8 @@ import kotlinx.coroutines.flow.* @PublishedApi internal fun MaybeSource.openSubscription(): ReceiveChannel { val channel = SubscriptionChannel() - subscribe(channel) + @Suppress("UNCHECKED_CAST") + subscribe(channel as MaybeObserver) return channel } @@ -35,7 +36,8 @@ internal fun MaybeSource.openSubscription(): ReceiveChannel { @PublishedApi internal fun ObservableSource.openSubscription(): ReceiveChannel { val channel = SubscriptionChannel() - subscribe(channel) + @Suppress("UNCHECKED_CAST") + subscribe(channel as Observer) return channel } @@ -59,7 +61,7 @@ public suspend inline fun ObservableSource.collect(action: (T) -> Unit): @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") private class SubscriptionChannel : - LinkedListChannel(null), Observer, MaybeObserver + LinkedListChannel(null), Observer, MaybeObserver { private val _subscription = atomic(null) @@ -73,12 +75,12 @@ private class SubscriptionChannel : _subscription.value = sub } - override fun onSuccess(t: T) { + override fun onSuccess(t: T & Any) { trySend(t) close(cause = null) } - override fun onNext(t: T) { + override fun onNext(t: T & Any) { trySend(t) // Safe to ignore return value here, expectedly racing with cancellation }