Skip to content

Commit b6651f1

Browse files
committed
Improve await* in RxJava
1 parent d170614 commit b6651f1

File tree

10 files changed

+409
-124
lines changed

10 files changed

+409
-124
lines changed

reactive/kotlinx-coroutines-rx2/src/RxAwait.kt

+84-46
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@ import kotlin.coroutines.*
1515
// ------------------------ CompletableSource ------------------------
1616

1717
/**
18-
* Awaits for completion of this completable without blocking a thread.
19-
* Returns `Unit` or throws the corresponding exception if this completable had produced error.
18+
* Awaits for completion of this completable without blocking the thread.
19+
* Returns `Unit`, or throws the corresponding exception if this completable produces an error.
2020
*
2121
* This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this
22-
* suspending function is suspended, this function immediately resumes with [CancellationException].
22+
* suspending function is suspended, this function immediately resumes with [CancellationException] and disposes of its
23+
* subscription.
2324
*/
2425
public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->
2526
subscribe(object : CompletableObserver {
@@ -31,6 +32,37 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine
3132

3233
// ------------------------ MaybeSource ------------------------
3334

35+
/**
36+
* Awaits for completion of the [MaybeSource] without blocking the thread.
37+
* Returns the resulting value, or `null` if no value is produced, or throws the corresponding exception if this
38+
* [MaybeSource] produces an error.
39+
*
40+
* This suspending function is cancellable.
41+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this
42+
* function immediately resumes with [CancellationException] and disposes of its subscription.
43+
*/
44+
@Suppress("UNCHECKED_CAST")
45+
public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
46+
subscribe(object : MaybeObserver<T> {
47+
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
48+
override fun onComplete() { cont.resume(null) }
49+
override fun onSuccess(t: T) { cont.resume(t) }
50+
override fun onError(error: Throwable) { cont.resumeWithException(error) }
51+
})
52+
}
53+
54+
/**
55+
* Awaits for completion of the [MaybeSource] without blocking the thread.
56+
* Returns the resulting value, or throws if either no value is produced or this [MaybeSource] produces an error.
57+
*
58+
* This suspending function is cancellable.
59+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this
60+
* function immediately resumes with [CancellationException] and disposes of its subscription.
61+
*
62+
* @throws NoSuchElementException if no elements were produced by this [MaybeSource].
63+
*/
64+
public suspend fun <T> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException()
65+
3466
/**
3567
* Awaits for completion of the maybe without blocking a thread.
3668
* Returns the resulting value, null if no value was produced or throws the corresponding exception if this
@@ -40,8 +72,12 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine
4072
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
4173
* immediately resumes with [CancellationException].
4274
*/
43-
@Suppress("UNCHECKED_CAST")
44-
public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).awaitOrDefault(null)
75+
@Deprecated(
76+
message = "Deprecated in favor of awaitSingleOrNull()",
77+
level = DeprecationLevel.WARNING,
78+
replaceWith = ReplaceWith("this.awaitSingleOrNull()")
79+
)
80+
public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull()
4581

4682
/**
4783
* Awaits for completion of the maybe without blocking a thread.
@@ -52,24 +88,22 @@ public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).aw
5288
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
5389
* immediately resumes with [CancellationException].
5490
*/
55-
public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = suspendCancellableCoroutine { cont ->
56-
subscribe(object : MaybeObserver<T> {
57-
override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
58-
override fun onComplete() { cont.resume(default) }
59-
override fun onSuccess(t: T) { cont.resume(t) }
60-
override fun onError(error: Throwable) { cont.resumeWithException(error) }
61-
})
62-
}
91+
@Deprecated(
92+
message = "Deprecated in favor of awaitSingleOrNull()",
93+
level = DeprecationLevel.WARNING,
94+
replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
95+
)
96+
public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default
6397

6498
// ------------------------ SingleSource ------------------------
6599

66100
/**
67-
* Awaits for completion of the single value without blocking a thread.
68-
* Returns the resulting value or throws the corresponding exception if this single had produced error.
101+
* Awaits for completion of the single value response without blocking the thread.
102+
* Returns the resulting value, or throws the corresponding exception if this response produces an error.
69103
*
70104
* This suspending function is cancellable.
71-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
72-
* immediately resumes with [CancellationException].
105+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
106+
* function immediately disposes of its subscription and resumes with [CancellationException].
73107
*/
74108
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
75109
subscribe(object : SingleObserver<T> {
@@ -82,69 +116,73 @@ public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine
82116
// ------------------------ ObservableSource ------------------------
83117

84118
/**
85-
* Awaits for the first value from the given observable without blocking a thread.
86-
* Returns the resulting value or throws the corresponding exception if this observable had produced error.
119+
* Awaits the first value from the given [Observable] without blocking the thread and returns the resulting value, or,
120+
* if the observable has produced an error, throws the corresponding exception.
87121
*
88122
* This suspending function is cancellable.
89-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
90-
* immediately resumes with [CancellationException].
123+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
124+
* function immediately disposes of its subscription and resumes with [CancellationException].
91125
*
92-
* @throws NoSuchElementException if observable does not emit any value
126+
* @throws NoSuchElementException if the observable does not emit any value
93127
*/
94128
public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
95129

96130
/**
97-
* Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
98-
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
131+
* Awaits the first value from the given [Observable], or returns the [default] value if none is emitted, without
132+
* blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the
133+
* corresponding exception.
99134
*
100135
* This suspending function is cancellable.
101-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
102-
* immediately resumes with [CancellationException].
136+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
137+
* function immediately disposes of its subscription and resumes with [CancellationException].
103138
*/
104139
public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
105140

106141
/**
107-
* Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
108-
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
142+
* Awaits the first value from the given [Observable], or returns `null` if none is emitted, without blocking the
143+
* thread, and returns the resulting value, or, if this observable has produced an error, throws the corresponding
144+
* exception.
109145
*
110146
* This suspending function is cancellable.
111-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
112-
* immediately resumes with [CancellationException].
147+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
148+
* function immediately disposes of its subscription and resumes with [CancellationException].
113149
*/
114150
public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
115151

116152
/**
117-
* Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
118-
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
153+
* Awaits the first value from the given [Observable], or calls [defaultValue] to get a value if none is emitted,
154+
* without blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws
155+
* the corresponding exception.
119156
*
120157
* This suspending function is cancellable.
121-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
122-
* immediately resumes with [CancellationException].
158+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
159+
* function immediately disposes of its subscription and resumes with [CancellationException].
123160
*/
124-
public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
161+
public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T =
162+
awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
125163

126164
/**
127-
* Awaits for the last value from the given observable without blocking a thread.
128-
* Returns the resulting value or throws the corresponding exception if this observable had produced error.
165+
* Awaits the last value from the given [Observable] without blocking the thread and
166+
* returns the resulting value, or, if this observable has produced an error, throws the corresponding exception.
129167
*
130168
* This suspending function is cancellable.
131-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
132-
* immediately resumes with [CancellationException].
169+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
170+
* function immediately disposes of its subscription and resumes with [CancellationException].
133171
*
134-
* @throws NoSuchElementException if observable does not emit any value
172+
* @throws NoSuchElementException if the observable does not emit any value
135173
*/
136174
public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST)
137175

138176
/**
139-
* Awaits for the single value from the given observable without blocking a thread.
140-
* Returns the resulting value or throws the corresponding exception if this observable had produced error.
177+
* Awaits the single value from the given observable without blocking the thread and returns the resulting value, or,
178+
* if this observable has produced an error, throws the corresponding exception.
141179
*
142180
* This suspending function is cancellable.
143-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
144-
* immediately resumes with [CancellationException].
181+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
182+
* function immediately disposes of its subscription and resumes with [CancellationException].
145183
*
146-
* @throws NoSuchElementException if observable does not emit any value
147-
* @throws IllegalArgumentException if observable emits more than one value
184+
* @throws NoSuchElementException if the observable does not emit any value
185+
* @throws IllegalArgumentException if the observable emits more than one value
148186
*/
149187
public suspend fun <T> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
150188

reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt

+27-3
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,31 @@ class CompletableTest : TestBase() {
9898
}
9999
}
100100

101+
/** Tests that calls to [await] throw [CancellationException] and dispose of the subscription when their [Job] is
102+
* cancelled. */
103+
@Test
104+
fun testAwaitCancellation() = runTest {
105+
expect(1)
106+
val completable = CompletableSource { s ->
107+
s.onSubscribe(object: Disposable {
108+
override fun dispose() { expect(4) }
109+
override fun isDisposed(): Boolean { expectUnreached(); return false }
110+
})
111+
}
112+
val job = launch(start = CoroutineStart.UNDISPATCHED) {
113+
try {
114+
expect(2)
115+
completable.await()
116+
} catch (e: CancellationException) {
117+
expect(5)
118+
throw e
119+
}
120+
}
121+
expect(3)
122+
job.cancelAndJoin()
123+
finish(6)
124+
}
125+
101126
@Test
102127
fun testSuppressedException() = runTest {
103128
val completable = rxCompletable(currentDispatcher()) {
@@ -119,7 +144,7 @@ class CompletableTest : TestBase() {
119144
}
120145

121146
@Test
122-
fun testUnhandledException() = runTest() {
147+
fun testUnhandledException() = runTest {
123148
expect(1)
124149
var disposable: Disposable? = null
125150
val handler = { e: Throwable ->
@@ -165,8 +190,7 @@ class CompletableTest : TestBase() {
165190
withExceptionHandler(handler) {
166191
rxCompletable(Dispatchers.Unconfined) {
167192
expect(1)
168-
42
169-
}.subscribe({ throw LinkageError() })
193+
}.subscribe { throw LinkageError() }
170194
finish(3)
171195
}
172196
}

reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt

+37-11
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,12 @@ package kotlinx.coroutines.rx2
77
import io.reactivex.*
88
import io.reactivex.disposables.*
99
import io.reactivex.exceptions.*
10-
import io.reactivex.functions.*
1110
import io.reactivex.internal.functions.Functions.*
1211
import kotlinx.coroutines.*
1312
import org.junit.*
1413
import org.junit.Test
1514
import java.util.concurrent.*
16-
import java.util.concurrent.CancellationException
15+
import java.util.concurrent.CancellationException as jCancellationException
1716
import kotlin.test.*
1817

1918
class MaybeTest : TestBase() {
@@ -47,7 +46,7 @@ class MaybeTest : TestBase() {
4746
null
4847
}
4948
expect(2)
50-
maybe.subscribe (emptyConsumer(), ON_ERROR_MISSING, Action {
49+
maybe.subscribe (emptyConsumer(), ON_ERROR_MISSING, {
5150
expect(5)
5251
})
5352
expect(3)
@@ -112,18 +111,45 @@ class MaybeTest : TestBase() {
112111

113112
@Test
114113
fun testMaybeAwait() = runBlocking {
115-
assertEquals("OK", Maybe.just("O").await() + "K")
114+
assertEquals("OK", Maybe.just("O").awaitSingleOrNull() + "K")
115+
assertEquals("OK", Maybe.just("O").awaitSingle() + "K")
116116
}
117117

118118
@Test
119-
fun testMaybeAwaitForNull() = runBlocking {
120-
assertNull(Maybe.empty<String>().await())
119+
fun testMaybeAwaitForNull(): Unit = runBlocking {
120+
assertNull(Maybe.empty<String>().awaitSingleOrNull())
121+
assertFailsWith<NoSuchElementException> { Maybe.empty<String>().awaitSingle() }
122+
}
123+
124+
/** Tests that calls to [awaitSingleOrNull] throw [CancellationException] and dispose of the subscription when their
125+
* [Job] is cancelled. */
126+
@Test
127+
fun testMaybeAwaitCancellation() = runTest {
128+
expect(1)
129+
val maybe = MaybeSource<Int> { s ->
130+
s.onSubscribe(object: Disposable {
131+
override fun dispose() { expect(4) }
132+
override fun isDisposed(): Boolean { expectUnreached(); return false }
133+
})
134+
}
135+
val job = launch(start = CoroutineStart.UNDISPATCHED) {
136+
try {
137+
expect(2)
138+
maybe.awaitSingleOrNull()
139+
} catch (e: CancellationException) {
140+
expect(5)
141+
throw e
142+
}
143+
}
144+
expect(3)
145+
job.cancelAndJoin()
146+
finish(6)
121147
}
122148

123149
@Test
124150
fun testMaybeEmitAndAwait() {
125151
val maybe = rxMaybe {
126-
Maybe.just("O").await() + "K"
152+
Maybe.just("O").awaitSingleOrNull() + "K"
127153
}
128154

129155
checkMaybeValue(maybe) {
@@ -205,11 +231,11 @@ class MaybeTest : TestBase() {
205231
@Test
206232
fun testCancelledConsumer() = runTest {
207233
expect(1)
208-
val maybe = rxMaybe<Int>(currentDispatcher()) {
234+
val maybe = rxMaybe(currentDispatcher()) {
209235
expect(4)
210236
try {
211237
delay(Long.MAX_VALUE)
212-
} catch (e: CancellationException) {
238+
} catch (e: jCancellationException) {
213239
expect(6)
214240
}
215241
42
@@ -241,7 +267,7 @@ class MaybeTest : TestBase() {
241267
}
242268
}
243269
try {
244-
maybe.await()
270+
maybe.awaitSingleOrNull()
245271
expectUnreached()
246272
} catch (e: TestException) {
247273
assertTrue(e.suppressed[0] is TestException2)
@@ -301,7 +327,7 @@ class MaybeTest : TestBase() {
301327
rxMaybe(Dispatchers.Unconfined) {
302328
expect(1)
303329
42
304-
}.subscribe({ throw LinkageError() })
330+
}.subscribe { throw LinkageError() }
305331
finish(3)
306332
}
307333
}

0 commit comments

Comments
 (0)