Skip to content

Commit 416fc5e

Browse files
committed
Leverage definitely non-nullable types
1 parent 09d1975 commit 416fc5e

File tree

4 files changed

+31
-22
lines changed

4 files changed

+31
-22
lines changed

build.gradle

-1
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,6 @@ configure(subprojects.findAll { !sourceless.contains(it.name) }) {
170170
kotlinOptions.freeCompilerArgs += OptInPresetKt.optInAnnotations.collect { "-Xopt-in=" + it }
171171
kotlinOptions.freeCompilerArgs += "-progressive"
172172
// Disable KT-36770 for RxJava2 integration
173-
kotlinOptions.freeCompilerArgs += "-XXLanguage:-ProhibitUsingNullableTypeParameterAgainstNotNullAnnotated"
174173
// Remove null assertions to get smaller bytecode on Android
175174
kotlinOptions.freeCompilerArgs += ["-Xno-param-assertions", "-Xno-receiver-assertions", "-Xno-call-assertions"]
176175
}

reactive/kotlinx-coroutines-rx2/build.gradle

+13
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,19 @@ tasks.withType(DokkaTaskPartial.class) {
1818
}
1919
}
2020
}
21+
tasks {
22+
compileKotlin {
23+
kotlinOptions {
24+
languageVersion = "1.7"
25+
}
26+
}
27+
28+
compileTestKotlin {
29+
kotlinOptions {
30+
languageVersion = "1.7"
31+
}
32+
}
33+
}
2134

2235
task testNG(type: Test) {
2336
useTestNG()

reactive/kotlinx-coroutines-rx2/src/RxAwait.kt

+11-14
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,8 @@
55
package kotlinx.coroutines.rx2
66

77
import io.reactivex.*
8-
import io.reactivex.disposables.Disposable
9-
import kotlinx.coroutines.CancellableContinuation
10-
import kotlinx.coroutines.CancellationException
11-
import kotlinx.coroutines.Job
12-
import kotlinx.coroutines.suspendCancellableCoroutine
8+
import io.reactivex.disposables.*
9+
import kotlinx.coroutines.*
1310
import kotlin.coroutines.*
1411

1512
// ------------------------ CompletableSource ------------------------
@@ -43,12 +40,12 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine
4340
*/
4441
@Suppress("UNCHECKED_CAST")
4542
public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
46-
subscribe(object : MaybeObserver<T> {
43+
subscribe(object : MaybeObserver<T & Any> {
4744
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
4845
override fun onComplete() { cont.resume(null) }
49-
override fun onSuccess(t: T) { cont.resume(t) }
46+
override fun onSuccess(t: T & Any) { cont.resume(t) }
5047
override fun onError(error: Throwable) { cont.resumeWithException(error) }
51-
})
48+
} as MaybeObserver<T>)
5249
}
5350

5451
/**
@@ -118,11 +115,11 @@ public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingl
118115
* function immediately disposes of its subscription and resumes with [CancellationException].
119116
*/
120117
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
121-
subscribe(object : SingleObserver<T> {
118+
subscribe(object : SingleObserver<T & Any> {
122119
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
123-
override fun onSuccess(t: T) { cont.resume(t) }
120+
override fun onSuccess(t: T & Any) { cont.resume(t) }
124121
override fun onError(error: Throwable) { cont.resumeWithException(error) }
125-
})
122+
} as SingleObserver<T>)
126123
}
127124

128125
// ------------------------ ObservableSource ------------------------
@@ -215,7 +212,7 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
215212
mode: Mode,
216213
default: T? = null
217214
): T = suspendCancellableCoroutine { cont ->
218-
subscribe(object : Observer<T> {
215+
subscribe(object : Observer<T & Any> {
219216
private lateinit var subscription: Disposable
220217
private var value: T? = null
221218
private var seenValue = false
@@ -225,7 +222,7 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
225222
cont.invokeOnCancellation { sub.dispose() }
226223
}
227224

228-
override fun onNext(t: T) {
225+
override fun onNext(t: T & Any) {
229226
when (mode) {
230227
Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
231228
if (!seenValue) {
@@ -266,6 +263,6 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
266263
override fun onError(e: Throwable) {
267264
cont.resumeWithException(e)
268265
}
269-
})
266+
} as Observer<T>)
270267
}
271268

reactive/kotlinx-coroutines-rx2/src/RxChannel.kt

+7-7
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import kotlinx.coroutines.reactive.*
2323
@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.ERROR) // Will be hidden in 1.5
2424
public fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
2525
val channel = SubscriptionChannel<T>()
26-
subscribe(channel)
26+
subscribe(channel as MaybeObserver<T>)
2727
return channel
2828
}
2929

@@ -38,7 +38,7 @@ public fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
3838
@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.ERROR) // Will be hidden in 1.5
3939
public fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
4040
val channel = SubscriptionChannel<T>()
41-
subscribe(channel)
41+
subscribe(channel as Observer<T>)
4242
return channel
4343
}
4444

@@ -63,20 +63,20 @@ public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit):
6363
@PublishedApi
6464
internal fun <T> MaybeSource<T>.toChannel(): ReceiveChannel<T> {
6565
val channel = SubscriptionChannel<T>()
66-
subscribe(channel)
66+
subscribe(channel as MaybeObserver<T>)
6767
return channel
6868
}
6969

7070
@PublishedApi
7171
internal fun <T> ObservableSource<T>.toChannel(): ReceiveChannel<T> {
7272
val channel = SubscriptionChannel<T>()
73-
subscribe(channel)
73+
subscribe(channel as Observer<T>)
7474
return channel
7575
}
7676

7777
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
7878
private class SubscriptionChannel<T> :
79-
LinkedListChannel<T>(null), Observer<T>, MaybeObserver<T>
79+
LinkedListChannel<T>(null), Observer<T & Any>, MaybeObserver<T & Any>
8080
{
8181
private val _subscription = atomic<Disposable?>(null)
8282

@@ -90,12 +90,12 @@ private class SubscriptionChannel<T> :
9090
_subscription.value = sub
9191
}
9292

93-
override fun onSuccess(t: T) {
93+
override fun onSuccess(t: T & Any) {
9494
trySend(t)
9595
close(cause = null)
9696
}
9797

98-
override fun onNext(t: T) {
98+
override fun onNext(t: T & Any) {
9999
trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
100100
}
101101

0 commit comments

Comments
 (0)