Skip to content

Commit 82c4006

Browse files
committed
Update code to prepare for nullness annotations in rxjava3 and Guava.
(In addition to keeping the code compiling in the future, this change should make some runtime nullness errors impossible.) rxjava3 nullness annotations don't yet trigger Kotlin compile errors, but that will be changing in Kotlin 1.7: https://github.com/JetBrains/kotlin/blob/05822c59b516b6d252bd6d27e9032e660e15b625/core/compiler.common.jvm/src/org/jetbrains/kotlin/load/java/JavaNullabilityAnnotationSettings.kt#L42-L46 We can preview the behavior by passing [email protected]:strict: https://kotlinlang.org/docs/java-interop.html#nullability-annotations We additionally set -Xtype-enhancement-improvements-strict-mode so that the Kotlin compiler looks at type-use annotations on type arguments, type parameters, etc.: https://kotlinlang.org/docs/java-interop.html#annotating-type-arguments-and-type-parameters Usually, the required update is to restrict a type parameter to non-nullable types, since most rxjava types do not support null type arguments. In a few cases, the update is to change an unnecessarily nullable type to be non-nullable. Finally, I removed a `value == null` check in `RxMaybeCoroutine.onCompleted` that the compiler now identifies as unnecessary. (I can keep it if you'd prefer.) Guava's current nullness annotations likewise don't yet trigger Kotlin compile errors. However, Guava did recently remove the misleading `@Nullable` annotation from the parameter of `FutureCallback.onSuccess`: Before: https://github.com/google/guava/blob/v28.0/guava/src/com/google/common/util/concurrent/FutureCallback.java#L34 After: https://github.com/google/guava/blob/v31.0.1/guava/src/com/google/common/util/concurrent/FutureCallback.java#L35 That means that a `FutureCallback<T>` can now implement `onSuccess(T)` instead of `onSuccess(T?)`. And with a future change to Guava, it will _have to_. When I updated this project from Guava 28.0 to Guava 31.0.1, I found that that pulled in a newer version of the Checker Framework nullness annotations. That version was build with a newer version of Gradle, so it generates Gradle module metadata: https://docs.gradle.org/current/userguide/publishing_gradle_module_metadata.html That metadata declares that the Checker Framework annotations require Java 1.8. Even the old version had in fact required Java 1.8 -- and so had even the old version of Guava -- just without having that encoded in its metadata. So I fixed this by setting targetCompatibility and sourceCompatibility to 1.8 for kotlinx-coroutines-guava, too. I should warn you that I have very little understanding of coroutines and of this library. I'm here because we're seeing compile errors inside Google as we work to improve how we handle Kotlin-Java interoperability, and these changes looked like they might be the right fixes. Sorry for any mistakes.
1 parent 5e870c1 commit 82c4006

File tree

7 files changed

+38
-31
lines changed

7 files changed

+38
-31
lines changed

build.gradle

+4
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,10 @@ configure(subprojects.findAll { !sourceless.contains(it.name) }) {
168168
kotlinOptions.freeCompilerArgs += "-XXLanguage:-ProhibitUsingNullableTypeParameterAgainstNotNullAnnotated"
169169
// Remove null assertions to get smaller bytecode on Android
170170
kotlinOptions.freeCompilerArgs += ["-Xno-param-assertions", "-Xno-receiver-assertions", "-Xno-call-assertions"]
171+
// Recognize rxjava3 nullness annotations even before that becomes the default (which will happen in 1.7): https://kotlinlang.org/docs/java-interop.html#nullability-annotations
172+
kotlinOptions.freeCompilerArgs += "[email protected]:strict"
173+
// Recognize nullness annotations on type arguments, etc.: https://kotlinlang.org/docs/java-interop.html#annotating-type-arguments-and-type-parameters
174+
kotlinOptions.freeCompilerArgs += "-Xtype-enhancement-improvements-strict-mode"
171175
}
172176
}
173177

integration/kotlinx-coroutines-guava/build.gradle.kts

+6-1
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,17 @@
22
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

5-
val guavaVersion = "28.0-jre"
5+
val guavaVersion = "31.0.1-jre"
66

77
dependencies {
88
compile("com.google.guava:guava:$guavaVersion")
99
}
1010

11+
java {
12+
targetCompatibility = JavaVersion.VERSION_1_8
13+
sourceCompatibility = JavaVersion.VERSION_1_8
14+
}
15+
1116
externalDocumentationLink(
1217
url = "https://google.github.io/guava/releases/$guavaVersion/api/docs/"
1318
)

integration/kotlinx-coroutines-guava/src/ListenableFuture.kt

+4-6
Original file line numberDiff line numberDiff line change
@@ -133,10 +133,8 @@ public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> {
133133
// Finally, if this isn't done yet, attach a Listener that will complete the Deferred.
134134
val deferred = CompletableDeferred<T>()
135135
Futures.addCallback(this, object : FutureCallback<T> {
136-
override fun onSuccess(result: T?) {
137-
// Here we work with flexible types, so we unchecked cast to trick the type system
138-
@Suppress("UNCHECKED_CAST")
139-
runCatching { deferred.complete(result as T) }
136+
override fun onSuccess(result: T) {
137+
runCatching { deferred.complete(result) }
140138
.onFailure { handleCoroutineException(EmptyCoroutineContext, it) }
141139
}
142140

@@ -348,7 +346,7 @@ private class JobListenableFuture<T>(private val jobToCancel: Job): ListenableFu
348346
*
349347
* To preserve Coroutine's [CancellationException], this future points to either `T` or [Cancelled].
350348
*/
351-
private val auxFuture = SettableFuture.create<Any>()
349+
private val auxFuture = SettableFuture.create<Any?>()
352350

353351
/**
354352
* `true` if [auxFuture.get][ListenableFuture.get] throws [ExecutionException].
@@ -433,7 +431,7 @@ private class JobListenableFuture<T>(private val jobToCancel: Job): ListenableFu
433431
}
434432

435433
/** See [get()]. */
436-
private fun getInternal(result: Any): T = if (result is Cancelled) {
434+
private fun getInternal(result: Any?): T = if (result is Cancelled) {
437435
throw CancellationException().initCause(result.exception)
438436
} else {
439437
// We know that `auxFuture` can contain either `T` or `Cancelled`.

reactive/kotlinx-coroutines-rx3/src/RxAwait.kt

+12-12
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine
4242
* function immediately resumes with [CancellationException] and disposes of its subscription.
4343
*/
4444
@Suppress("UNCHECKED_CAST")
45-
public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
45+
public suspend fun <T : Any> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
4646
subscribe(object : MaybeObserver<T> {
4747
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
4848
override fun onComplete() { cont.resume(null) }
@@ -61,7 +61,7 @@ public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellab
6161
*
6262
* @throws NoSuchElementException if no elements were produced by this [MaybeSource].
6363
*/
64-
public suspend fun <T> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException()
64+
public suspend fun <T : Any> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException()
6565

6666
/**
6767
* Awaits for completion of the maybe without blocking a thread.
@@ -84,7 +84,7 @@ public suspend fun <T> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?:
8484
level = DeprecationLevel.WARNING,
8585
replaceWith = ReplaceWith("this.awaitSingleOrNull()")
8686
) // Warning since 1.5, error in 1.6, hidden in 1.7
87-
public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull()
87+
public suspend fun <T : Any> MaybeSource<T>.await(): T? = awaitSingleOrNull()
8888

8989
/**
9090
* Awaits for completion of the maybe without blocking a thread.
@@ -107,7 +107,7 @@ public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull()
107107
level = DeprecationLevel.WARNING,
108108
replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
109109
) // Warning since 1.5, error in 1.6, hidden in 1.7
110-
public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default
110+
public suspend fun <T : Any> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default
111111

112112
// ------------------------ SingleSource ------------------------
113113

@@ -119,7 +119,7 @@ public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingl
119119
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
120120
* function immediately disposes of its subscription and resumes with [CancellationException].
121121
*/
122-
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
122+
public suspend fun <T : Any> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
123123
subscribe(object : SingleObserver<T> {
124124
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
125125
override fun onSuccess(t: T) { cont.resume(t) }
@@ -139,7 +139,7 @@ public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine
139139
*
140140
* @throws NoSuchElementException if the observable does not emit any value
141141
*/
142-
public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
142+
public suspend fun <T : Any> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
143143

144144
/**
145145
* Awaits the first value from the given [Observable], or returns the [default] value if none is emitted, without
@@ -150,7 +150,7 @@ public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST
150150
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
151151
* function immediately disposes of its subscription and resumes with [CancellationException].
152152
*/
153-
public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
153+
public suspend fun <T : Any> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
154154

155155
/**
156156
* Awaits the first value from the given [Observable], or returns `null` if none is emitted, without blocking the
@@ -161,7 +161,7 @@ public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T =
161161
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
162162
* function immediately disposes of its subscription and resumes with [CancellationException].
163163
*/
164-
public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
164+
public suspend fun <T : Any> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
165165

166166
/**
167167
* Awaits the first value from the given [Observable], or calls [defaultValue] to get a value if none is emitted,
@@ -172,7 +172,7 @@ public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mod
172172
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
173173
* function immediately disposes of its subscription and resumes with [CancellationException].
174174
*/
175-
public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T =
175+
public suspend fun <T : Any> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T =
176176
awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
177177

178178
/**
@@ -185,7 +185,7 @@ public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () ->
185185
*
186186
* @throws NoSuchElementException if the observable does not emit any value
187187
*/
188-
public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST)
188+
public suspend fun <T : Any> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST)
189189

190190
/**
191191
* Awaits the single value from the given observable without blocking the thread and returns the resulting value, or,
@@ -198,7 +198,7 @@ public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST)
198198
* @throws NoSuchElementException if the observable does not emit any value
199199
* @throws IllegalArgumentException if the observable emits more than one value
200200
*/
201-
public suspend fun <T> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
201+
public suspend fun <T : Any> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
202202

203203
// ------------------------ private ------------------------
204204

@@ -213,7 +213,7 @@ private enum class Mode(val s: String) {
213213
override fun toString(): String = s
214214
}
215215

216-
private suspend fun <T> ObservableSource<T>.awaitOne(
216+
private suspend fun <T : Any> ObservableSource<T>.awaitOne(
217217
mode: Mode,
218218
default: T? = null
219219
): T = suspendCancellableCoroutine { cont ->

reactive/kotlinx-coroutines-rx3/src/RxChannel.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import kotlinx.coroutines.flow.*
1919
* [MaybeSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first.
2020
*/
2121
@PublishedApi
22-
internal fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
22+
internal fun <T : Any> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
2323
val channel = SubscriptionChannel<T>()
2424
subscribe(channel)
2525
return channel
@@ -33,7 +33,7 @@ internal fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
3333
* [ObservableSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first.
3434
*/
3535
@PublishedApi
36-
internal fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
36+
internal fun <T : Any> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
3737
val channel = SubscriptionChannel<T>()
3838
subscribe(channel)
3939
return channel
@@ -45,7 +45,7 @@ internal fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
4545
* If [action] throws an exception at some point or if the [MaybeSource] raises an error, the exception is rethrown from
4646
* [collect].
4747
*/
48-
public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit): Unit =
48+
public suspend inline fun <T : Any> MaybeSource<T>.collect(action: (T) -> Unit): Unit =
4949
openSubscription().consumeEach(action)
5050

5151
/**
@@ -54,11 +54,11 @@ public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit): Unit
5454
* If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
5555
* [collect]. Also, if the [ObservableSource] signals an error, that error is rethrown from [collect].
5656
*/
57-
public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit): Unit =
57+
public suspend inline fun <T : Any> ObservableSource<T>.collect(action: (T) -> Unit): Unit =
5858
openSubscription().consumeEach(action)
5959

6060
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
61-
private class SubscriptionChannel<T> :
61+
private class SubscriptionChannel<T : Any> :
6262
LinkedListChannel<T>(null), Observer<T>, MaybeObserver<T>
6363
{
6464
private val _subscription = atomic<Disposable?>(null)

reactive/kotlinx-coroutines-rx3/src/RxConvert.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public fun Job.asCompletable(context: CoroutineContext): Completable = rxComplet
4242
*
4343
* @param context -- the coroutine context from which the resulting maybe is going to be signalled
4444
*/
45-
public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMaybe(context) {
45+
public fun <T : Any> Deferred<T>.asMaybe(context: CoroutineContext): Maybe<T> = rxMaybe(context) {
4646
this@asMaybe.await()
4747
}
4848

reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt

+6-6
Original file line numberDiff line numberDiff line change
@@ -17,33 +17,33 @@ import kotlin.coroutines.*
1717
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
1818
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
1919
*/
20-
public fun <T> rxMaybe(
20+
public fun <T : Any> rxMaybe(
2121
context: CoroutineContext = EmptyCoroutineContext,
22-
block: suspend CoroutineScope.() -> T?
22+
block: suspend CoroutineScope.() -> T
2323
): Maybe<T> {
2424
require(context[Job] === null) { "Maybe context cannot contain job in it." +
2525
"Its lifecycle should be managed via Disposable handle. Had $context" }
2626
return rxMaybeInternal(GlobalScope, context, block)
2727
}
2828

29-
private fun <T> rxMaybeInternal(
29+
private fun <T : Any> rxMaybeInternal(
3030
scope: CoroutineScope, // support for legacy rxMaybe in scope
3131
context: CoroutineContext,
32-
block: suspend CoroutineScope.() -> T?
32+
block: suspend CoroutineScope.() -> T
3333
): Maybe<T> = Maybe.create { subscriber ->
3434
val newContext = scope.newCoroutineContext(context)
3535
val coroutine = RxMaybeCoroutine(newContext, subscriber)
3636
subscriber.setCancellable(RxCancellable(coroutine))
3737
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
3838
}
3939

40-
private class RxMaybeCoroutine<T>(
40+
private class RxMaybeCoroutine<T : Any>(
4141
parentContext: CoroutineContext,
4242
private val subscriber: MaybeEmitter<T>
4343
) : AbstractCoroutine<T>(parentContext, false, true) {
4444
override fun onCompleted(value: T) {
4545
try {
46-
if (value == null) subscriber.onComplete() else subscriber.onSuccess(value)
46+
subscriber.onSuccess(value)
4747
} catch (e: Throwable) {
4848
handleUndeliverableException(e, context)
4949
}

0 commit comments

Comments
 (0)