Skip to content

Deprecate awaitSingleOr*, specialize some await* functions for Mono and Maybe #2628

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Apr 21, 2021
72 changes: 38 additions & 34 deletions reactive/kotlinx-coroutines-jdk9/src/Await.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,78 +4,82 @@

package kotlinx.coroutines.jdk9

import kotlinx.coroutines.*
import java.util.concurrent.*
import org.reactivestreams.FlowAdapters
import kotlinx.coroutines.reactive.*

/**
* Awaits for the first value from the given publisher without blocking a thread and
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
* Awaits the first value from the given publisher without blocking the thread and returns the resulting value, or, if
* the publisher has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
*
* @throws NoSuchElementException if publisher does not emit any value
* @throws NoSuchElementException if the publisher does not emit any value
*/
public suspend fun <T> Flow.Publisher<T>.awaitFirst(): T = FlowAdapters.toPublisher(this).awaitFirst()
public suspend fun <T> Flow.Publisher<T>.awaitFirst(): T =
FlowAdapters.toPublisher(this).awaitFirst()

/**
* Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
* Awaits the first value from the given publisher, or returns the [default] value if none is emitted, without blocking
* the thread, and returns the resulting value, or, if this publisher has produced an error, throws the corresponding
* exception.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
*/
public suspend fun <T> Flow.Publisher<T>.awaitFirstOrDefault(default: T): T =
FlowAdapters.toPublisher(this).awaitFirstOrDefault(default)
FlowAdapters.toPublisher(this).awaitFirstOrDefault(default)

/**
* Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
* Awaits the first value from the given publisher, or returns `null` if none is emitted, without blocking the thread,
* and returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
*/
public suspend fun <T> Flow.Publisher<T>.awaitFirstOrNull(): T? =
FlowAdapters.toPublisher(this).awaitFirstOrNull()
FlowAdapters.toPublisher(this).awaitFirstOrNull()

/**
* Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
* Awaits the first value from the given publisher, or calls [defaultValue] to get a value if none is emitted, without
* blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the
* corresponding exception.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
*/
public suspend fun <T> Flow.Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T =
FlowAdapters.toPublisher(this).awaitFirstOrElse(defaultValue)
FlowAdapters.toPublisher(this).awaitFirstOrElse(defaultValue)

/**
* Awaits for the last value from the given publisher without blocking a thread and
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
* Awaits the last value from the given publisher without blocking the thread and
* returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
*
* @throws NoSuchElementException if publisher does not emit any value
* @throws NoSuchElementException if the publisher does not emit any value
*/
public suspend fun <T> Flow.Publisher<T>.awaitLast(): T =
FlowAdapters.toPublisher(this).awaitLast()
FlowAdapters.toPublisher(this).awaitLast()

/**
* Awaits for the single value from the given publisher without blocking a thread and
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
* Awaits the single value from the given publisher without blocking the thread and returns the resulting value, or,
* if this publisher has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
*
* @throws NoSuchElementException if publisher does not emit any value
* @throws IllegalArgumentException if publisher emits more than one value
* @throws NoSuchElementException if the publisher does not emit any value
* @throws IllegalArgumentException if the publisher emits more than one value
*/
public suspend fun <T> Flow.Publisher<T>.awaitSingle(): T =
FlowAdapters.toPublisher(this).awaitSingle()
FlowAdapters.toPublisher(this).awaitSingle()
43 changes: 43 additions & 0 deletions reactive/kotlinx-coroutines-jdk9/test/AwaitTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.jdk9

import kotlinx.coroutines.*
import org.junit.*
import java.util.concurrent.Flow as JFlow

class AwaitTest: TestBase() {

/** Tests that calls to [awaitFirst] (and, thus, to the rest of these functions) throw [CancellationException] and
* unsubscribe from the publisher when their [Job] is cancelled. */
@Test
fun testAwaitCancellation() = runTest {
expect(1)
val publisher = JFlow.Publisher<Int> { s ->
s.onSubscribe(object : JFlow.Subscription {
override fun request(n: Long) {
expect(3)
}

override fun cancel() {
expect(5)
}
})
}
val job = launch(start = CoroutineStart.UNDISPATCHED) {
try {
expect(2)
publisher.awaitFirst()
} catch (e: CancellationException) {
expect(6)
throw e
}
}
expect(4)
job.cancelAndJoin()
finish(7)
}

}
Loading