Skip to content

Commit 187cb25

Browse files
committed
Update to newer Kotlin, update RxJava3 module
1 parent 416fc5e commit 187cb25

File tree

5 files changed

+22
-29
lines changed

5 files changed

+22
-29
lines changed

build.gradle

+2-1
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,8 @@ configure(subprojects.findAll { !sourceless.contains(it.name) }) {
169169
tasks.withType(org.jetbrains.kotlin.gradle.tasks.AbstractKotlinCompile).all {
170170
kotlinOptions.freeCompilerArgs += OptInPresetKt.optInAnnotations.collect { "-Xopt-in=" + it }
171171
kotlinOptions.freeCompilerArgs += "-progressive"
172-
// Disable KT-36770 for RxJava2 integration
172+
// Internal implementation details of our Rx integration
173+
kotlinOptions.freeCompilerArgs += "-XXLanguage:+DefinitelyNonNullableTypes"
173174
// Remove null assertions to get smaller bytecode on Android
174175
kotlinOptions.freeCompilerArgs += ["-Xno-param-assertions", "-Xno-receiver-assertions", "-Xno-call-assertions"]
175176
}

reactive/kotlinx-coroutines-rx2/build.gradle

-13
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,6 @@ tasks.withType(DokkaTaskPartial.class) {
1818
}
1919
}
2020
}
21-
tasks {
22-
compileKotlin {
23-
kotlinOptions {
24-
languageVersion = "1.7"
25-
}
26-
}
27-
28-
compileTestKotlin {
29-
kotlinOptions {
30-
languageVersion = "1.7"
31-
}
32-
}
33-
}
3421

3522
task testNG(type: Test) {
3623
useTestNG()

reactive/kotlinx-coroutines-rx2/src/RxAwait.kt

+2
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingl
115115
* function immediately disposes of its subscription and resumes with [CancellationException].
116116
*/
117117
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
118+
@Suppress("UNCHECKED_CAST")
118119
subscribe(object : SingleObserver<T & Any> {
119120
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
120121
override fun onSuccess(t: T & Any) { cont.resume(t) }
@@ -212,6 +213,7 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
212213
mode: Mode,
213214
default: T? = null
214215
): T = suspendCancellableCoroutine { cont ->
216+
@Suppress("UNCHECKED_CAST")
215217
subscribe(object : Observer<T & Any> {
216218
private lateinit var subscription: Disposable
217219
private var value: T? = null

reactive/kotlinx-coroutines-rx3/src/RxAwait.kt

+11-10
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,12 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine
4343
*/
4444
@Suppress("UNCHECKED_CAST")
4545
public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
46-
subscribe(object : MaybeObserver<T> {
46+
subscribe(object : MaybeObserver<T & Any> {
4747
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
4848
override fun onComplete() { cont.resume(null) }
49-
override fun onSuccess(t: T) { cont.resume(t) }
49+
override fun onSuccess(t: T & Any) { cont.resume(t) }
5050
override fun onError(error: Throwable) { cont.resumeWithException(error) }
51-
})
51+
} as MaybeObserver<T>)
5252
}
5353

5454
/**
@@ -120,11 +120,12 @@ public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingl
120120
* function immediately disposes of its subscription and resumes with [CancellationException].
121121
*/
122122
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
123-
subscribe(object : SingleObserver<T> {
123+
@Suppress("UNCHECKED_CAST")
124+
subscribe(object : SingleObserver<T & Any> {
124125
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
125-
override fun onSuccess(t: T) { cont.resume(t) }
126+
override fun onSuccess(t: T & Any) { cont.resume(t) }
126127
override fun onError(error: Throwable) { cont.resumeWithException(error) }
127-
})
128+
} as SingleObserver<T>)
128129
}
129130

130131
// ------------------------ ObservableSource ------------------------
@@ -217,7 +218,8 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
217218
mode: Mode,
218219
default: T? = null
219220
): T = suspendCancellableCoroutine { cont ->
220-
subscribe(object : Observer<T> {
221+
@Suppress("UNCHECKED_CAST")
222+
subscribe(object : Observer<T & Any> {
221223
private lateinit var subscription: Disposable
222224
private var value: T? = null
223225
private var seenValue = false
@@ -227,7 +229,7 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
227229
cont.invokeOnCancellation { sub.dispose() }
228230
}
229231

230-
override fun onNext(t: T) {
232+
override fun onNext(t: T & Any) {
231233
when (mode) {
232234
Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
233235
if (!seenValue) {
@@ -249,7 +251,6 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
249251
}
250252
}
251253

252-
@Suppress("UNCHECKED_CAST")
253254
override fun onComplete() {
254255
if (seenValue) {
255256
if (cont.isActive) cont.resume(value as T)
@@ -268,6 +269,6 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
268269
override fun onError(e: Throwable) {
269270
cont.resumeWithException(e)
270271
}
271-
})
272+
} as Observer<T>)
272273
}
273274

reactive/kotlinx-coroutines-rx3/src/RxChannel.kt

+7-5
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ import kotlinx.coroutines.flow.*
2121
@PublishedApi
2222
internal fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
2323
val channel = SubscriptionChannel<T>()
24-
subscribe(channel)
24+
@Suppress("UNCHECKED_CAST")
25+
subscribe(channel as MaybeObserver<T>)
2526
return channel
2627
}
2728

@@ -35,7 +36,8 @@ internal fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
3536
@PublishedApi
3637
internal fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
3738
val channel = SubscriptionChannel<T>()
38-
subscribe(channel)
39+
@Suppress("UNCHECKED_CAST")
40+
subscribe(channel as Observer<T>)
3941
return channel
4042
}
4143

@@ -59,7 +61,7 @@ public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit):
5961

6062
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
6163
private class SubscriptionChannel<T> :
62-
LinkedListChannel<T>(null), Observer<T>, MaybeObserver<T>
64+
LinkedListChannel<T>(null), Observer<T & Any>, MaybeObserver<T & Any>
6365
{
6466
private val _subscription = atomic<Disposable?>(null)
6567

@@ -73,12 +75,12 @@ private class SubscriptionChannel<T> :
7375
_subscription.value = sub
7476
}
7577

76-
override fun onSuccess(t: T) {
78+
override fun onSuccess(t: T & Any) {
7779
trySend(t)
7880
close(cause = null)
7981
}
8082

81-
override fun onNext(t: T) {
83+
override fun onNext(t: T & Any) {
8284
trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
8385
}
8486

0 commit comments

Comments
 (0)