From 416fc5e24d06b72acb101f1e0502fcd830aeeef6 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 27 Sep 2021 13:34:36 +0300 Subject: [PATCH 1/2] Leverage definitely non-nullable types --- build.gradle | 1 - reactive/kotlinx-coroutines-rx2/build.gradle | 13 ++++++++++ .../kotlinx-coroutines-rx2/src/RxAwait.kt | 25 ++++++++----------- .../kotlinx-coroutines-rx2/src/RxChannel.kt | 14 +++++------ 4 files changed, 31 insertions(+), 22 deletions(-) diff --git a/build.gradle b/build.gradle index 26b598ff5d..93bd4a0b16 100644 --- a/build.gradle +++ b/build.gradle @@ -170,7 +170,6 @@ configure(subprojects.findAll { !sourceless.contains(it.name) }) { kotlinOptions.freeCompilerArgs += OptInPresetKt.optInAnnotations.collect { "-Xopt-in=" + it } kotlinOptions.freeCompilerArgs += "-progressive" // Disable KT-36770 for RxJava2 integration - kotlinOptions.freeCompilerArgs += "-XXLanguage:-ProhibitUsingNullableTypeParameterAgainstNotNullAnnotated" // Remove null assertions to get smaller bytecode on Android kotlinOptions.freeCompilerArgs += ["-Xno-param-assertions", "-Xno-receiver-assertions", "-Xno-call-assertions"] } diff --git a/reactive/kotlinx-coroutines-rx2/build.gradle b/reactive/kotlinx-coroutines-rx2/build.gradle index b6fd93274c..0e5f48051c 100644 --- a/reactive/kotlinx-coroutines-rx2/build.gradle +++ b/reactive/kotlinx-coroutines-rx2/build.gradle @@ -18,6 +18,19 @@ tasks.withType(DokkaTaskPartial.class) { } } } +tasks { + compileKotlin { + kotlinOptions { + languageVersion = "1.7" + } + } + + compileTestKotlin { + kotlinOptions { + languageVersion = "1.7" + } + } +} task testNG(type: Test) { useTestNG() diff --git a/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt b/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt index 0e0b47ebe8..e74f9925d5 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt @@ -5,11 +5,8 @@ package kotlinx.coroutines.rx2 import io.reactivex.* -import io.reactivex.disposables.Disposable -import kotlinx.coroutines.CancellableContinuation -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.Job -import kotlinx.coroutines.suspendCancellableCoroutine +import io.reactivex.disposables.* +import kotlinx.coroutines.* import kotlin.coroutines.* // ------------------------ CompletableSource ------------------------ @@ -43,12 +40,12 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine */ @Suppress("UNCHECKED_CAST") public suspend fun MaybeSource.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont -> - subscribe(object : MaybeObserver { + subscribe(object : MaybeObserver { override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } override fun onComplete() { cont.resume(null) } - override fun onSuccess(t: T) { cont.resume(t) } + override fun onSuccess(t: T & Any) { cont.resume(t) } override fun onError(error: Throwable) { cont.resumeWithException(error) } - }) + } as MaybeObserver) } /** @@ -118,11 +115,11 @@ public suspend fun MaybeSource.awaitOrDefault(default: T): T = awaitSingl * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun SingleSource.await(): T = suspendCancellableCoroutine { cont -> - subscribe(object : SingleObserver { + subscribe(object : SingleObserver { override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } - override fun onSuccess(t: T) { cont.resume(t) } + override fun onSuccess(t: T & Any) { cont.resume(t) } override fun onError(error: Throwable) { cont.resumeWithException(error) } - }) + } as SingleObserver) } // ------------------------ ObservableSource ------------------------ @@ -215,7 +212,7 @@ private suspend fun ObservableSource.awaitOne( mode: Mode, default: T? = null ): T = suspendCancellableCoroutine { cont -> - subscribe(object : Observer { + subscribe(object : Observer { private lateinit var subscription: Disposable private var value: T? = null private var seenValue = false @@ -225,7 +222,7 @@ private suspend fun ObservableSource.awaitOne( cont.invokeOnCancellation { sub.dispose() } } - override fun onNext(t: T) { + override fun onNext(t: T & Any) { when (mode) { Mode.FIRST, Mode.FIRST_OR_DEFAULT -> { if (!seenValue) { @@ -266,6 +263,6 @@ private suspend fun ObservableSource.awaitOne( override fun onError(e: Throwable) { cont.resumeWithException(e) } - }) + } as Observer) } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt index bb093b0793..5a30162ded 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt @@ -23,7 +23,7 @@ import kotlinx.coroutines.reactive.* @Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.ERROR) // Will be hidden in 1.5 public fun MaybeSource.openSubscription(): ReceiveChannel { val channel = SubscriptionChannel() - subscribe(channel) + subscribe(channel as MaybeObserver) return channel } @@ -38,7 +38,7 @@ public fun MaybeSource.openSubscription(): ReceiveChannel { @Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.ERROR) // Will be hidden in 1.5 public fun ObservableSource.openSubscription(): ReceiveChannel { val channel = SubscriptionChannel() - subscribe(channel) + subscribe(channel as Observer) return channel } @@ -63,20 +63,20 @@ public suspend inline fun ObservableSource.collect(action: (T) -> Unit): @PublishedApi internal fun MaybeSource.toChannel(): ReceiveChannel { val channel = SubscriptionChannel() - subscribe(channel) + subscribe(channel as MaybeObserver) return channel } @PublishedApi internal fun ObservableSource.toChannel(): ReceiveChannel { val channel = SubscriptionChannel() - subscribe(channel) + subscribe(channel as Observer) return channel } @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") private class SubscriptionChannel : - LinkedListChannel(null), Observer, MaybeObserver + LinkedListChannel(null), Observer, MaybeObserver { private val _subscription = atomic(null) @@ -90,12 +90,12 @@ private class SubscriptionChannel : _subscription.value = sub } - override fun onSuccess(t: T) { + override fun onSuccess(t: T & Any) { trySend(t) close(cause = null) } - override fun onNext(t: T) { + override fun onNext(t: T & Any) { trySend(t) // Safe to ignore return value here, expectedly racing with cancellation } From 187cb25e1ece72be91e18f3b61375746307bacb3 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 13 Oct 2021 14:12:05 +0300 Subject: [PATCH 2/2] Update to newer Kotlin, update RxJava3 module --- build.gradle | 3 ++- reactive/kotlinx-coroutines-rx2/build.gradle | 13 ------------ .../kotlinx-coroutines-rx2/src/RxAwait.kt | 2 ++ .../kotlinx-coroutines-rx3/src/RxAwait.kt | 21 ++++++++++--------- .../kotlinx-coroutines-rx3/src/RxChannel.kt | 12 ++++++----- 5 files changed, 22 insertions(+), 29 deletions(-) diff --git a/build.gradle b/build.gradle index 93bd4a0b16..fc1426e6bb 100644 --- a/build.gradle +++ b/build.gradle @@ -169,7 +169,8 @@ configure(subprojects.findAll { !sourceless.contains(it.name) }) { tasks.withType(org.jetbrains.kotlin.gradle.tasks.AbstractKotlinCompile).all { kotlinOptions.freeCompilerArgs += OptInPresetKt.optInAnnotations.collect { "-Xopt-in=" + it } kotlinOptions.freeCompilerArgs += "-progressive" - // Disable KT-36770 for RxJava2 integration + // Internal implementation details of our Rx integration + kotlinOptions.freeCompilerArgs += "-XXLanguage:+DefinitelyNonNullableTypes" // Remove null assertions to get smaller bytecode on Android kotlinOptions.freeCompilerArgs += ["-Xno-param-assertions", "-Xno-receiver-assertions", "-Xno-call-assertions"] } diff --git a/reactive/kotlinx-coroutines-rx2/build.gradle b/reactive/kotlinx-coroutines-rx2/build.gradle index 0e5f48051c..b6fd93274c 100644 --- a/reactive/kotlinx-coroutines-rx2/build.gradle +++ b/reactive/kotlinx-coroutines-rx2/build.gradle @@ -18,19 +18,6 @@ tasks.withType(DokkaTaskPartial.class) { } } } -tasks { - compileKotlin { - kotlinOptions { - languageVersion = "1.7" - } - } - - compileTestKotlin { - kotlinOptions { - languageVersion = "1.7" - } - } -} task testNG(type: Test) { useTestNG() diff --git a/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt b/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt index e74f9925d5..e8d3db7763 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt @@ -115,6 +115,7 @@ public suspend fun MaybeSource.awaitOrDefault(default: T): T = awaitSingl * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun SingleSource.await(): T = suspendCancellableCoroutine { cont -> + @Suppress("UNCHECKED_CAST") subscribe(object : SingleObserver { override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } override fun onSuccess(t: T & Any) { cont.resume(t) } @@ -212,6 +213,7 @@ private suspend fun ObservableSource.awaitOne( mode: Mode, default: T? = null ): T = suspendCancellableCoroutine { cont -> + @Suppress("UNCHECKED_CAST") subscribe(object : Observer { private lateinit var subscription: Disposable private var value: T? = null diff --git a/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt b/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt index 2a14cf7c6c..dee9a1aa49 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt @@ -43,12 +43,12 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine */ @Suppress("UNCHECKED_CAST") public suspend fun MaybeSource.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont -> - subscribe(object : MaybeObserver { + subscribe(object : MaybeObserver { override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } override fun onComplete() { cont.resume(null) } - override fun onSuccess(t: T) { cont.resume(t) } + override fun onSuccess(t: T & Any) { cont.resume(t) } override fun onError(error: Throwable) { cont.resumeWithException(error) } - }) + } as MaybeObserver) } /** @@ -120,11 +120,12 @@ public suspend fun MaybeSource.awaitOrDefault(default: T): T = awaitSingl * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun SingleSource.await(): T = suspendCancellableCoroutine { cont -> - subscribe(object : SingleObserver { + @Suppress("UNCHECKED_CAST") + subscribe(object : SingleObserver { override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } - override fun onSuccess(t: T) { cont.resume(t) } + override fun onSuccess(t: T & Any) { cont.resume(t) } override fun onError(error: Throwable) { cont.resumeWithException(error) } - }) + } as SingleObserver) } // ------------------------ ObservableSource ------------------------ @@ -217,7 +218,8 @@ private suspend fun ObservableSource.awaitOne( mode: Mode, default: T? = null ): T = suspendCancellableCoroutine { cont -> - subscribe(object : Observer { + @Suppress("UNCHECKED_CAST") + subscribe(object : Observer { private lateinit var subscription: Disposable private var value: T? = null private var seenValue = false @@ -227,7 +229,7 @@ private suspend fun ObservableSource.awaitOne( cont.invokeOnCancellation { sub.dispose() } } - override fun onNext(t: T) { + override fun onNext(t: T & Any) { when (mode) { Mode.FIRST, Mode.FIRST_OR_DEFAULT -> { if (!seenValue) { @@ -249,7 +251,6 @@ private suspend fun ObservableSource.awaitOne( } } - @Suppress("UNCHECKED_CAST") override fun onComplete() { if (seenValue) { if (cont.isActive) cont.resume(value as T) @@ -268,6 +269,6 @@ private suspend fun ObservableSource.awaitOne( override fun onError(e: Throwable) { cont.resumeWithException(e) } - }) + } as Observer) } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt index 21238d2491..9ab943f8bc 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt @@ -21,7 +21,8 @@ import kotlinx.coroutines.flow.* @PublishedApi internal fun MaybeSource.openSubscription(): ReceiveChannel { val channel = SubscriptionChannel() - subscribe(channel) + @Suppress("UNCHECKED_CAST") + subscribe(channel as MaybeObserver) return channel } @@ -35,7 +36,8 @@ internal fun MaybeSource.openSubscription(): ReceiveChannel { @PublishedApi internal fun ObservableSource.openSubscription(): ReceiveChannel { val channel = SubscriptionChannel() - subscribe(channel) + @Suppress("UNCHECKED_CAST") + subscribe(channel as Observer) return channel } @@ -59,7 +61,7 @@ public suspend inline fun ObservableSource.collect(action: (T) -> Unit): @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") private class SubscriptionChannel : - LinkedListChannel(null), Observer, MaybeObserver + LinkedListChannel(null), Observer, MaybeObserver { private val _subscription = atomic(null) @@ -73,12 +75,12 @@ private class SubscriptionChannel : _subscription.value = sub } - override fun onSuccess(t: T) { + override fun onSuccess(t: T & Any) { trySend(t) close(cause = null) } - override fun onNext(t: T) { + override fun onNext(t: T & Any) { trySend(t) // Safe to ignore return value here, expectedly racing with cancellation }