From ac0fd0e942fd631146e87132c1283b3f96b8f1d5 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Fri, 5 Aug 2022 16:19:01 +0200 Subject: [PATCH 1/5] Introduce non-nullable types in reactive integrations where appropriate * For RxJava2, use them in internal implementations where appropriate * For RxJava3, introduce & Any bound to generic argument in our extensions to avoid errors in Kotlin 1.8.0 due to non-nullability rx3 annotations being part of generics upper bound. This change went through commitee, and all the "unsound" declarations such as "RxSignature" were properly highlighted as warning that will become an error. --- build.gradle | 3 +- .../kotlinx-coroutines-rx2/src/RxAwait.kt | 58 ++++++++++++++----- .../kotlinx-coroutines-rx2/src/RxChannel.kt | 16 ++--- .../kotlinx-coroutines-rx3/src/RxAwait.kt | 44 +++++++------- 4 files changed, 77 insertions(+), 44 deletions(-) diff --git a/build.gradle b/build.gradle index 47f18bd067..882d38e39e 100644 --- a/build.gradle +++ b/build.gradle @@ -176,8 +176,7 @@ configure(subprojects.findAll { !sourceless.contains(it.name) }) { tasks.withType(AbstractKotlinCompile).all { kotlinOptions.freeCompilerArgs += OptInPreset.optInAnnotations.collect { "-Xopt-in=" + it } kotlinOptions.freeCompilerArgs += "-progressive" - // Disable KT-36770 for RxJava2 integration - kotlinOptions.freeCompilerArgs += "-XXLanguage:-ProhibitUsingNullableTypeParameterAgainstNotNullAnnotated" + kotlinOptions.freeCompilerArgs += "-XXLanguage:+ProhibitUsingNullableTypeParameterAgainstNotNullAnnotated" // Remove null assertions to get smaller bytecode on Android kotlinOptions.freeCompilerArgs += ["-Xno-param-assertions", "-Xno-receiver-assertions", "-Xno-call-assertions"] } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt b/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt index da9809c9f8..1091849f3c 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt @@ -24,9 +24,17 @@ import kotlin.coroutines.* */ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont -> subscribe(object : CompletableObserver { - override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } - override fun onComplete() { cont.resume(Unit) } - override fun onError(e: Throwable) { cont.resumeWithException(e) } + override fun onSubscribe(d: Disposable) { + cont.disposeOnCancellation(d) + } + + override fun onComplete() { + cont.resume(Unit) + } + + override fun onError(e: Throwable) { + cont.resumeWithException(e) + } }) } @@ -43,12 +51,23 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine */ @Suppress("UNCHECKED_CAST") public suspend fun MaybeSource.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont -> - subscribe(object : MaybeObserver { - override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } - override fun onComplete() { cont.resume(null) } - override fun onSuccess(t: T) { cont.resume(t) } - override fun onError(error: Throwable) { cont.resumeWithException(error) } - }) + subscribe(object : MaybeObserver { + override fun onSubscribe(d: Disposable) { + cont.disposeOnCancellation(d) + } + + override fun onComplete() { + cont.resume(null) + } + + override fun onSuccess(t: T & Any) { + cont.resume(t) + } + + override fun onError(error: Throwable) { + cont.resumeWithException(error) + } + } as MaybeObserver) } /** @@ -117,12 +136,21 @@ public suspend fun MaybeSource.awaitOrDefault(default: T): T = awaitSingl * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this * function immediately disposes of its subscription and resumes with [CancellationException]. */ +@Suppress("UNCHECKED_CAST") public suspend fun SingleSource.await(): T = suspendCancellableCoroutine { cont -> - subscribe(object : SingleObserver { - override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } - override fun onSuccess(t: T) { cont.resume(t) } - override fun onError(error: Throwable) { cont.resumeWithException(error) } - }) + subscribe(object : SingleObserver { + override fun onSubscribe(d: Disposable) { + cont.disposeOnCancellation(d) + } + + override fun onSuccess(t: T & Any) { + cont.resume(t) + } + + override fun onError(error: Throwable) { + cont.resumeWithException(error) + } + } as SingleObserver) } // ------------------------ ObservableSource ------------------------ @@ -225,7 +253,7 @@ private suspend fun ObservableSource.awaitOne( cont.invokeOnCancellation { sub.dispose() } } - override fun onNext(t: T) { + override fun onNext(t: T & Any) { when (mode) { Mode.FIRST, Mode.FIRST_OR_DEFAULT -> { if (!seenValue) { diff --git a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt index fc09bf9ee3..3ca9f89297 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt @@ -31,22 +31,24 @@ public suspend inline fun ObservableSource.collect(action: (T) -> Unit): toChannel().consumeEach(action) @PublishedApi +@Suppress("UNCHECKED_CAST") internal fun MaybeSource.toChannel(): ReceiveChannel { val channel = SubscriptionChannel() - subscribe(channel) + (this as MaybeSource).subscribe(channel) return channel } @PublishedApi +@Suppress("UNCHECKED_CAST") internal fun ObservableSource.toChannel(): ReceiveChannel { val channel = SubscriptionChannel() - subscribe(channel) + (this as ObservableSource).subscribe(channel) return channel } @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") private class SubscriptionChannel : - LinkedListChannel(null), Observer, MaybeObserver + LinkedListChannel(null), Observer, MaybeObserver { private val _subscription = atomic(null) @@ -60,12 +62,12 @@ private class SubscriptionChannel : _subscription.value = sub } - override fun onSuccess(t: T) { + override fun onSuccess(t: T & Any) { trySend(t) close(cause = null) } - override fun onNext(t: T) { + override fun onNext(t: T & Any) { trySend(t) // Safe to ignore return value here, expectedly racing with cancellation } @@ -80,7 +82,7 @@ private class SubscriptionChannel : /** @suppress */ @Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.HIDDEN) // ERROR in 1.4.0, HIDDEN in 1.6.0 -public fun ObservableSource.openSubscription(): ReceiveChannel { +public fun ObservableSource.openSubscription(): ReceiveChannel { val channel = SubscriptionChannel() subscribe(channel) return channel @@ -88,7 +90,7 @@ public fun ObservableSource.openSubscription(): ReceiveChannel { /** @suppress */ @Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.HIDDEN) // ERROR in 1.4.0, HIDDEN in 1.6.0 -public fun MaybeSource.openSubscription(): ReceiveChannel { +public fun MaybeSource.openSubscription(): ReceiveChannel { val channel = SubscriptionChannel() subscribe(channel) return channel diff --git a/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt b/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt index 754dd79484..84f3546bd4 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt @@ -41,9 +41,8 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this * function immediately resumes with [CancellationException] and disposes of its subscription. */ -@Suppress("UNCHECKED_CAST") -public suspend fun MaybeSource.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont -> - subscribe(object : MaybeObserver { +public suspend fun MaybeSource.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont -> + subscribe(object : MaybeObserver { override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } override fun onComplete() { cont.resume(null) } override fun onSuccess(t: T) { cont.resume(t) } @@ -61,7 +60,7 @@ public suspend fun MaybeSource.awaitSingleOrNull(): T? = suspendCancellab * * @throws NoSuchElementException if no elements were produced by this [MaybeSource]. */ -public suspend fun MaybeSource.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException() +public suspend fun MaybeSource.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException() /** * Awaits for completion of the maybe without blocking a thread. @@ -84,7 +83,7 @@ public suspend fun MaybeSource.awaitSingle(): T = awaitSingleOrNull() ?: level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("this.awaitSingleOrNull()") ) // Warning since 1.5, error in 1.6, hidden in 1.7 -public suspend fun MaybeSource.await(): T? = awaitSingleOrNull() +public suspend fun MaybeSource.await(): T? = awaitSingleOrNull() /** * Awaits for completion of the maybe without blocking a thread. @@ -107,7 +106,7 @@ public suspend fun MaybeSource.await(): T? = awaitSingleOrNull() level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default") ) // Warning since 1.5, error in 1.6, hidden in 1.7 -public suspend fun MaybeSource.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default +public suspend fun MaybeSource.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default // ------------------------ SingleSource ------------------------ @@ -119,10 +118,10 @@ public suspend fun MaybeSource.awaitOrDefault(default: T): T = awaitSingl * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this * function immediately disposes of its subscription and resumes with [CancellationException]. */ -public suspend fun SingleSource.await(): T = suspendCancellableCoroutine { cont -> - subscribe(object : SingleObserver { +public suspend fun SingleSource.await(): T = suspendCancellableCoroutine { cont -> + subscribe(object : SingleObserver { override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } - override fun onSuccess(t: T) { cont.resume(t) } + override fun onSuccess(t: T & Any) { cont.resume(t) } override fun onError(error: Throwable) { cont.resumeWithException(error) } }) } @@ -139,7 +138,8 @@ public suspend fun SingleSource.await(): T = suspendCancellableCoroutine * * @throws NoSuchElementException if the observable does not emit any value */ -public suspend fun ObservableSource.awaitFirst(): T = awaitOne(Mode.FIRST) +@Suppress("UNCHECKED_CAST") +public suspend fun ObservableSource.awaitFirst(): T = awaitOne(Mode.FIRST) as T /** * Awaits the first value from the given [Observable], or returns the [default] value if none is emitted, without @@ -150,7 +150,9 @@ public suspend fun ObservableSource.awaitFirst(): T = awaitOne(Mode.FIRST * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this * function immediately disposes of its subscription and resumes with [CancellationException]. */ -public suspend fun ObservableSource.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default) +@Suppress("UNCHECKED_CAST") +public suspend fun ObservableSource.awaitFirstOrDefault(default: T): T = + awaitOne(Mode.FIRST_OR_DEFAULT, default) as T /** * Awaits the first value from the given [Observable], or returns `null` if none is emitted, without blocking the @@ -161,7 +163,7 @@ public suspend fun ObservableSource.awaitFirstOrDefault(default: T): T = * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this * function immediately disposes of its subscription and resumes with [CancellationException]. */ -public suspend fun ObservableSource.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT) +public suspend fun ObservableSource.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT) /** * Awaits the first value from the given [Observable], or calls [defaultValue] to get a value if none is emitted, @@ -172,7 +174,7 @@ public suspend fun ObservableSource.awaitFirstOrNull(): T? = awaitOne(Mod * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this * function immediately disposes of its subscription and resumes with [CancellationException]. */ -public suspend fun ObservableSource.awaitFirstOrElse(defaultValue: () -> T): T = +public suspend fun ObservableSource.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue() /** @@ -185,7 +187,8 @@ public suspend fun ObservableSource.awaitFirstOrElse(defaultValue: () -> * * @throws NoSuchElementException if the observable does not emit any value */ -public suspend fun ObservableSource.awaitLast(): T = awaitOne(Mode.LAST) +@Suppress("UNCHECKED_CAST") +public suspend fun ObservableSource.awaitLast(): T = awaitOne(Mode.LAST) as T /** * Awaits the single value from the given observable without blocking the thread and returns the resulting value, or, @@ -198,14 +201,15 @@ public suspend fun ObservableSource.awaitLast(): T = awaitOne(Mode.LAST) * @throws NoSuchElementException if the observable does not emit any value * @throws IllegalArgumentException if the observable emits more than one value */ -public suspend fun ObservableSource.awaitSingle(): T = awaitOne(Mode.SINGLE) +@Suppress("UNCHECKED_CAST") +public suspend fun ObservableSource.awaitSingle(): T = awaitOne(Mode.SINGLE) as T // ------------------------ private ------------------------ internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) = invokeOnCancellation { d.dispose() } -private enum class Mode(val s: String) { +private enum class Mode(@JvmField val s: String) { FIRST("awaitFirst"), FIRST_OR_DEFAULT("awaitFirstOrDefault"), LAST("awaitLast"), @@ -213,11 +217,11 @@ private enum class Mode(val s: String) { override fun toString(): String = s } -private suspend fun ObservableSource.awaitOne( +private suspend fun ObservableSource.awaitOne( mode: Mode, default: T? = null -): T = suspendCancellableCoroutine { cont -> - subscribe(object : Observer { +): T? = suspendCancellableCoroutine { cont -> + subscribe(object : Observer { private lateinit var subscription: Disposable private var value: T? = null private var seenValue = false @@ -227,7 +231,7 @@ private suspend fun ObservableSource.awaitOne( cont.invokeOnCancellation { sub.dispose() } } - override fun onNext(t: T) { + override fun onNext(t: T & Any) { when (mode) { Mode.FIRST, Mode.FIRST_OR_DEFAULT -> { if (!seenValue) { From 34c08dd017e94c380a000ea0d26af2418a0ea2f5 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 20 Sep 2022 12:57:00 +0200 Subject: [PATCH 2/5] ~update nullability, fix some warnings --- reactive/kotlinx-coroutines-rx2/src/RxAwait.kt | 10 ++++------ reactive/kotlinx-coroutines-rx2/src/RxChannel.kt | 2 +- reactive/kotlinx-coroutines-rx2/src/RxObservable.kt | 4 ++-- reactive/kotlinx-coroutines-rx3/build.gradle | 2 +- reactive/kotlinx-coroutines-rx3/src/RxChannel.kt | 11 +++++------ reactive/kotlinx-coroutines-rx3/src/RxConvert.kt | 2 +- reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt | 6 +++--- reactive/kotlinx-coroutines-rx3/src/RxObservable.kt | 4 ++-- reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt | 1 - 9 files changed, 19 insertions(+), 23 deletions(-) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt b/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt index 1091849f3c..a891b3f5c2 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt @@ -49,9 +49,8 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this * function immediately resumes with [CancellationException] and disposes of its subscription. */ -@Suppress("UNCHECKED_CAST") public suspend fun MaybeSource.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont -> - subscribe(object : MaybeObserver { + subscribe(object : MaybeObserver { override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } @@ -67,7 +66,7 @@ public suspend fun MaybeSource.awaitSingleOrNull(): T? = suspendCancellab override fun onError(error: Throwable) { cont.resumeWithException(error) } - } as MaybeObserver) + }) } /** @@ -136,9 +135,8 @@ public suspend fun MaybeSource.awaitOrDefault(default: T): T = awaitSingl * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this * function immediately disposes of its subscription and resumes with [CancellationException]. */ -@Suppress("UNCHECKED_CAST") public suspend fun SingleSource.await(): T = suspendCancellableCoroutine { cont -> - subscribe(object : SingleObserver { + subscribe(object : SingleObserver { override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } @@ -150,7 +148,7 @@ public suspend fun SingleSource.await(): T = suspendCancellableCoroutine override fun onError(error: Throwable) { cont.resumeWithException(error) } - } as SingleObserver) + }) } // ------------------------ ObservableSource ------------------------ diff --git a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt index 3ca9f89297..73577b2c32 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt @@ -48,7 +48,7 @@ internal fun ObservableSource.toChannel(): ReceiveChannel { @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") private class SubscriptionChannel : - LinkedListChannel(null), Observer, MaybeObserver + LinkedListChannel(null), Observer, MaybeObserver { private val _subscription = atomic(null) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index 57727fbb81..ad8fac71d3 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -77,7 +77,7 @@ private class RxObservableCoroutine( processResFunc = RxObservableCoroutine<*>::processResultSelectSend as ProcessResultFunction ) - @Suppress("UNCHECKED_CAST", "UNUSED_PARAMETER") + @Suppress("UNUSED_PARAMETER") private fun registerSelectForSend(select: SelectInstance<*>, element: Any?) { // Try to acquire the mutex and complete in the registration phase. if (mutex.tryLock()) { @@ -113,7 +113,7 @@ private class RxObservableCoroutine( } } - public override suspend fun send(element: T) { + override suspend fun send(element: T) { mutex.lock() doLockedNext(element)?.let { throw it } } diff --git a/reactive/kotlinx-coroutines-rx3/build.gradle b/reactive/kotlinx-coroutines-rx3/build.gradle index 15ef66da18..7676b6e23b 100644 --- a/reactive/kotlinx-coroutines-rx3/build.gradle +++ b/reactive/kotlinx-coroutines-rx3/build.gradle @@ -23,7 +23,7 @@ compileKotlin { tasks.withType(DokkaTaskPartial.class) { dokkaSourceSets.configureEach { externalDocumentationLink { - url.set(new URL('http://reactivex.io/RxJava/3.x/javadoc/')) + url.set(new URL('https://reactivex.io/RxJava/3.x/javadoc/')) packageListUrl.set(projectDir.toPath().resolve("package.list").toUri().toURL()) } } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt index 21238d2491..824d8ae6d4 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt @@ -19,7 +19,7 @@ import kotlinx.coroutines.flow.* * [MaybeSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first. */ @PublishedApi -internal fun MaybeSource.openSubscription(): ReceiveChannel { +internal fun MaybeSource.openSubscription(): ReceiveChannel { val channel = SubscriptionChannel() subscribe(channel) return channel @@ -33,7 +33,7 @@ internal fun MaybeSource.openSubscription(): ReceiveChannel { * [ObservableSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first. */ @PublishedApi -internal fun ObservableSource.openSubscription(): ReceiveChannel { +internal fun ObservableSource.openSubscription(): ReceiveChannel { val channel = SubscriptionChannel() subscribe(channel) return channel @@ -45,7 +45,7 @@ internal fun ObservableSource.openSubscription(): ReceiveChannel { * If [action] throws an exception at some point or if the [MaybeSource] raises an error, the exception is rethrown from * [collect]. */ -public suspend inline fun MaybeSource.collect(action: (T) -> Unit): Unit = +public suspend inline fun MaybeSource.collect(action: (T) -> Unit): Unit = openSubscription().consumeEach(action) /** @@ -54,12 +54,11 @@ public suspend inline fun MaybeSource.collect(action: (T) -> Unit): Unit * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from * [collect]. Also, if the [ObservableSource] signals an error, that error is rethrown from [collect]. */ -public suspend inline fun ObservableSource.collect(action: (T) -> Unit): Unit = - openSubscription().consumeEach(action) +public suspend inline fun ObservableSource.collect(action: (T) -> Unit): Unit = openSubscription().consumeEach(action) @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") private class SubscriptionChannel : - LinkedListChannel(null), Observer, MaybeObserver + LinkedListChannel(null), Observer, MaybeObserver { private val _subscription = atomic(null) diff --git a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt index b4693a55e7..57d2dfb370 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt @@ -42,7 +42,7 @@ public fun Job.asCompletable(context: CoroutineContext): Completable = rxComplet * * @param context -- the coroutine context from which the resulting maybe is going to be signalled */ -public fun Deferred.asMaybe(context: CoroutineContext): Maybe = rxMaybe(context) { +public fun Deferred.asMaybe(context: CoroutineContext): Maybe = rxMaybe(context) { this@asMaybe.await() } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt index 12d0197bf2..2e50481732 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt @@ -20,7 +20,7 @@ import kotlin.coroutines.* public fun rxMaybe( context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T? -): Maybe { +): Maybe { require(context[Job] === null) { "Maybe context cannot contain job in it." + "Its lifecycle should be managed via Disposable handle. Had $context" } return rxMaybeInternal(GlobalScope, context, block) @@ -30,7 +30,7 @@ private fun rxMaybeInternal( scope: CoroutineScope, // support for legacy rxMaybe in scope context: CoroutineContext, block: suspend CoroutineScope.() -> T? -): Maybe = Maybe.create { subscriber -> +): Maybe = Maybe.create { subscriber -> val newContext = scope.newCoroutineContext(context) val coroutine = RxMaybeCoroutine(newContext, subscriber) subscriber.setCancellable(RxCancellable(coroutine)) @@ -39,7 +39,7 @@ private fun rxMaybeInternal( private class RxMaybeCoroutine( parentContext: CoroutineContext, - private val subscriber: MaybeEmitter + private val subscriber: MaybeEmitter ) : AbstractCoroutine(parentContext, false, true) { override fun onCompleted(value: T) { try { diff --git a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt index 5db7585fca..8ea761c979 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt @@ -77,7 +77,7 @@ private class RxObservableCoroutine( processResFunc = RxObservableCoroutine<*>::processResultSelectSend as ProcessResultFunction ) - @Suppress("UNCHECKED_CAST", "UNUSED_PARAMETER") + @Suppress("UNUSED_PARAMETER") private fun registerSelectForSend(select: SelectInstance<*>, element: Any?) { // Try to acquire the mutex and complete in the registration phase. if (mutex.tryLock()) { @@ -113,7 +113,7 @@ private class RxObservableCoroutine( } } - public override suspend fun send(element: T) { + override suspend fun send(element: T) { mutex.lock() doLockedNext(element)?.let { throw it } } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt b/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt index abaf02450a..e7f93868b1 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt @@ -9,7 +9,6 @@ import io.reactivex.rxjava3.disposables.* import io.reactivex.rxjava3.plugins.* import kotlinx.atomicfu.* import kotlinx.coroutines.* -import kotlinx.coroutines.CancellationException import kotlinx.coroutines.channels.* import java.util.concurrent.* import kotlin.coroutines.* From 508ce3ac667e39bbb34e92ef58bae9ef7cd35690 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 20 Sep 2022 15:49:05 +0200 Subject: [PATCH 3/5] ~ --- reactive/kotlinx-coroutines-rx2/src/RxChannel.kt | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt index 73577b2c32..8db22799c0 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt @@ -31,18 +31,16 @@ public suspend inline fun ObservableSource.collect(action: (T) -> Unit): toChannel().consumeEach(action) @PublishedApi -@Suppress("UNCHECKED_CAST") internal fun MaybeSource.toChannel(): ReceiveChannel { val channel = SubscriptionChannel() - (this as MaybeSource).subscribe(channel) + subscribe(channel) return channel } @PublishedApi -@Suppress("UNCHECKED_CAST") internal fun ObservableSource.toChannel(): ReceiveChannel { val channel = SubscriptionChannel() - (this as ObservableSource).subscribe(channel) + subscribe(channel) return channel } From 6b6fbd2b157a39761a55d262062566d5abc3bf68 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 20 Sep 2022 15:54:24 +0200 Subject: [PATCH 4/5] ~ --- build.gradle | 1 - 1 file changed, 1 deletion(-) diff --git a/build.gradle b/build.gradle index 882d38e39e..744bf5cf7d 100644 --- a/build.gradle +++ b/build.gradle @@ -176,7 +176,6 @@ configure(subprojects.findAll { !sourceless.contains(it.name) }) { tasks.withType(AbstractKotlinCompile).all { kotlinOptions.freeCompilerArgs += OptInPreset.optInAnnotations.collect { "-Xopt-in=" + it } kotlinOptions.freeCompilerArgs += "-progressive" - kotlinOptions.freeCompilerArgs += "-XXLanguage:+ProhibitUsingNullableTypeParameterAgainstNotNullAnnotated" // Remove null assertions to get smaller bytecode on Android kotlinOptions.freeCompilerArgs += ["-Xno-param-assertions", "-Xno-receiver-assertions", "-Xno-call-assertions"] } From fa472f93200c3283b559566f6f024d5f3936b1dc Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 20 Sep 2022 18:24:37 +0200 Subject: [PATCH 5/5] ~make changes compatible with Kotlin 1.8.0 --- reactive/kotlinx-coroutines-rx3/src/RxAwait.kt | 2 +- reactive/kotlinx-coroutines-rx3/src/RxChannel.kt | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt b/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt index 84f3546bd4..1c77bbe4f9 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt @@ -45,7 +45,7 @@ public suspend fun MaybeSource.awaitSingleOrNull(): T? = suspendCan subscribe(object : MaybeObserver { override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } override fun onComplete() { cont.resume(null) } - override fun onSuccess(t: T) { cont.resume(t) } + override fun onSuccess(t: T & Any) { cont.resume(t) } override fun onError(error: Throwable) { cont.resumeWithException(error) } }) } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt index 824d8ae6d4..5d7624a17a 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt @@ -72,12 +72,12 @@ private class SubscriptionChannel : _subscription.value = sub } - override fun onSuccess(t: T) { + override fun onSuccess(t: T & Any) { trySend(t) close(cause = null) } - override fun onNext(t: T) { + override fun onNext(t: T & Any) { trySend(t) // Safe to ignore return value here, expectedly racing with cancellation }