Skip to content

Commit dc78a3b

Browse files
committed
Update the docs for await* in -reactive and -jdk9
1 parent a1c28e2 commit dc78a3b

File tree

4 files changed

+171
-75
lines changed

4 files changed

+171
-75
lines changed

reactive/kotlinx-coroutines-jdk9/src/Await.kt

+38-34
Original file line numberDiff line numberDiff line change
@@ -4,78 +4,82 @@
44

55
package kotlinx.coroutines.jdk9
66

7+
import kotlinx.coroutines.Job
78
import java.util.concurrent.*
89
import org.reactivestreams.FlowAdapters
910
import kotlinx.coroutines.reactive.*
1011

1112
/**
12-
* Awaits for the first value from the given publisher without blocking a thread and
13-
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
13+
* Awaits the first value from the given publisher without blocking the thread and returns the resulting value, or, if
14+
* the publisher has produced an error, throws the corresponding exception.
1415
*
1516
* This suspending function is cancellable.
16-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
17-
* immediately resumes with [CancellationException].
17+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
18+
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
1819
*
19-
* @throws NoSuchElementException if publisher does not emit any value
20+
* @throws NoSuchElementException if the publisher does not emit any value
2021
*/
21-
public suspend fun <T> Flow.Publisher<T>.awaitFirst(): T = FlowAdapters.toPublisher(this).awaitFirst()
22+
public suspend fun <T> Flow.Publisher<T>.awaitFirst(): T =
23+
FlowAdapters.toPublisher(this).awaitFirst()
2224

2325
/**
24-
* Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
25-
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
26+
* Awaits the first value from the given observable, or returns the [default] value if none is emitted, without blocking
27+
* the thread, and returns the resulting value, or, if this observable has produced an error, throws the corresponding
28+
* exception.
2629
*
2730
* This suspending function is cancellable.
28-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
29-
* immediately resumes with [CancellationException].
31+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
32+
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
3033
*/
3134
public suspend fun <T> Flow.Publisher<T>.awaitFirstOrDefault(default: T): T =
32-
FlowAdapters.toPublisher(this).awaitFirstOrDefault(default)
35+
FlowAdapters.toPublisher(this).awaitFirstOrDefault(default)
3336

3437
/**
35-
* Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
36-
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
38+
* Awaits the first value from the given observable, or returns `null` if none is emitted, without blocking the thread,
39+
* and returns the resulting value, or, if this observable has produced an error, throws the corresponding exception.
3740
*
3841
* This suspending function is cancellable.
39-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
40-
* immediately resumes with [CancellationException].
42+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
43+
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
4144
*/
4245
public suspend fun <T> Flow.Publisher<T>.awaitFirstOrNull(): T? =
43-
FlowAdapters.toPublisher(this).awaitFirstOrNull()
46+
FlowAdapters.toPublisher(this).awaitFirstOrNull()
4447

4548
/**
46-
* Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
47-
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
49+
* Awaits the first value from the given observable, or calls [defaultValue] to get a value if none is emitted, without
50+
* blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the
51+
* corresponding exception.
4852
*
4953
* This suspending function is cancellable.
50-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
51-
* immediately resumes with [CancellationException].
54+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
55+
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
5256
*/
5357
public suspend fun <T> Flow.Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T =
54-
FlowAdapters.toPublisher(this).awaitFirstOrElse(defaultValue)
58+
FlowAdapters.toPublisher(this).awaitFirstOrElse(defaultValue)
5559

5660
/**
57-
* Awaits for the last value from the given publisher without blocking a thread and
58-
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
61+
* Awaits the last value from the given publisher without blocking the thread and
62+
* returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
5963
*
6064
* This suspending function is cancellable.
61-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
62-
* immediately resumes with [CancellationException].
65+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
66+
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
6367
*
64-
* @throws NoSuchElementException if publisher does not emit any value
68+
* @throws NoSuchElementException if the publisher does not emit any value
6569
*/
6670
public suspend fun <T> Flow.Publisher<T>.awaitLast(): T =
67-
FlowAdapters.toPublisher(this).awaitLast()
71+
FlowAdapters.toPublisher(this).awaitLast()
6872

6973
/**
70-
* Awaits for the single value from the given publisher without blocking a thread and
71-
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
74+
* Awaits the single value from the given publisher without blocking the thread and returns the resulting value, or,
75+
* if this publisher has produced an error, throws the corresponding exception.
7276
*
7377
* This suspending function is cancellable.
74-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
75-
* immediately resumes with [CancellationException].
78+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
79+
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
7680
*
77-
* @throws NoSuchElementException if publisher does not emit any value
78-
* @throws IllegalArgumentException if publisher emits more than one value
81+
* @throws NoSuchElementException if the publisher does not emit any value
82+
* @throws IllegalArgumentException if the publisher emits more than one value
7983
*/
8084
public suspend fun <T> Flow.Publisher<T>.awaitSingle(): T =
81-
FlowAdapters.toPublisher(this).awaitSingle()
85+
FlowAdapters.toPublisher(this).awaitSingle()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.jdk9
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.CancellationException
9+
import org.junit.*
10+
import java.util.concurrent.*
11+
12+
class AwaitTest: TestBase() {
13+
14+
/** Tests that calls to [awaitFirst] (and, thus, to the rest of these functions) throw [CancellationException] and
15+
* unsubscribe from the publisher when their [Job] is cancelled. */
16+
@Test
17+
fun testAwaitCancellation() = runTest {
18+
expect(1)
19+
val publisher = Flow.Publisher<Int> { s ->
20+
s.onSubscribe(object : Flow.Subscription {
21+
override fun request(n: Long) {
22+
expect(3)
23+
}
24+
25+
override fun cancel() {
26+
expect(5)
27+
}
28+
})
29+
}
30+
val job = launch(start = CoroutineStart.UNDISPATCHED) {
31+
try {
32+
expect(2)
33+
publisher.awaitFirst()
34+
} catch (e: CancellationException) {
35+
expect(6)
36+
throw e
37+
}
38+
}
39+
expect(4)
40+
job.cancelAndJoin()
41+
finish(7)
42+
}
43+
44+
}

reactive/kotlinx-coroutines-reactive/src/Await.kt

+46-41
Original file line numberDiff line numberDiff line change
@@ -11,85 +11,89 @@ import org.reactivestreams.Publisher
1111
import org.reactivestreams.Subscriber
1212
import org.reactivestreams.Subscription
1313
import java.util.*
14+
import kotlin.NoSuchElementException
1415
import kotlin.coroutines.*
1516

1617
/**
17-
* Awaits for the first value from the given publisher without blocking a thread and
18-
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
18+
* Awaits the first value from the given publisher without blocking the thread and returns the resulting value, or, if
19+
* the publisher has produced an error, throws the corresponding exception.
1920
*
2021
* This suspending function is cancellable.
21-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
22-
* immediately resumes with [CancellationException].
22+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
23+
* function immediately cancels its [Subscription] and resumes with [CancellationException].
2324
*
24-
* @throws NoSuchElementException if publisher does not emit any value
25+
* @throws NoSuchElementException if the publisher does not emit any value
2526
*/
2627
public suspend fun <T> Publisher<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
2728

2829
/**
29-
* Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
30-
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
30+
* Awaits the first value from the given observable, or returns the [default] value if none is emitted, without blocking
31+
* the thread, and returns the resulting value, or, if this observable has produced an error, throws the corresponding
32+
* exception.
3133
*
3234
* This suspending function is cancellable.
33-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
34-
* immediately resumes with [CancellationException].
35+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
36+
* function immediately cancels its [Subscription] and resumes with [CancellationException].
3537
*/
3638
public suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
3739

3840
/**
39-
* Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
40-
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
41+
* Awaits the first value from the given observable, or returns `null` if none is emitted, without blocking the thread,
42+
* and returns the resulting value, or, if this observable has produced an error, throws the corresponding exception.
4143
*
4244
* This suspending function is cancellable.
43-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
44-
* immediately resumes with [CancellationException].
45+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
46+
* function immediately cancels its [Subscription] and resumes with [CancellationException].
4547
*/
4648
public suspend fun <T> Publisher<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
4749

4850
/**
49-
* Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
50-
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
51+
* Awaits the first value from the given observable, or calls [defaultValue] to get a value if none is emitted, without
52+
* blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the
53+
* corresponding exception.
5154
*
5255
* This suspending function is cancellable.
53-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
54-
* immediately resumes with [CancellationException].
56+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
57+
* function immediately cancels its [Subscription] and resumes with [CancellationException].
5558
*/
5659
public suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
5760

5861
/**
59-
* Awaits for the last value from the given publisher without blocking a thread and
60-
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
62+
* Awaits the last value from the given publisher without blocking the thread and
63+
* returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
6164
*
6265
* This suspending function is cancellable.
63-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
64-
* immediately resumes with [CancellationException].
66+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
67+
* function immediately cancels its [Subscription] and resumes with [CancellationException].
6568
*
66-
* @throws NoSuchElementException if publisher does not emit any value
69+
* @throws NoSuchElementException if the publisher does not emit any value
6770
*/
6871
public suspend fun <T> Publisher<T>.awaitLast(): T = awaitOne(Mode.LAST)
6972

7073
/**
71-
* Awaits for the single value from the given publisher without blocking a thread and
72-
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
74+
* Awaits the single value from the given publisher without blocking the thread and returns the resulting value, or,
75+
* if this publisher has produced an error, throws the corresponding exception.
7376
*
7477
* This suspending function is cancellable.
75-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
76-
* immediately resumes with [CancellationException].
78+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
79+
* function immediately cancels its [Subscription] and resumes with [CancellationException].
7780
*
78-
* @throws NoSuchElementException if publisher does not emit any value
79-
* @throws IllegalArgumentException if publisher emits more than one value
81+
* @throws NoSuchElementException if the publisher does not emit any value
82+
* @throws IllegalArgumentException if the publisher emits more than one value
8083
*/
8184
public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
8285

8386
/**
84-
* Awaits for the single value from the given publisher or the [default] value if none is emitted without blocking a thread and
85-
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
87+
* Awaits the single value from the given observable, or returns the [default] value if none is emitted, without
88+
* blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the
89+
* corresponding exception.
8690
*
8791
* This suspending function is cancellable.
88-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
89-
* immediately resumes with [CancellationException].
92+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
93+
* function immediately cancels its [Subscription] and resumes with [CancellationException].
9094
*
91-
* @throws NoSuchElementException if publisher does not emit any value
92-
* @throws IllegalArgumentException if publisher emits more than one value
95+
* @throws NoSuchElementException if the publisher does not emit any value
96+
* @throws IllegalArgumentException if the publisher emits more than one value
9397
*/
9498
public suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T = awaitOne(Mode.SINGLE_OR_DEFAULT, default)
9599

@@ -109,17 +113,18 @@ public suspend fun <T> Publisher<T>.awaitSingleOrNull(): T? = try {
109113
}
110114

111115
/**
112-
* Awaits for the single value from the given publisher or call [defaultValue] to get a value if none is emitted without blocking a thread and
113-
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
116+
* Awaits the single value from the given observable, or calls [defaultValue] to get a value if none is emitted, without
117+
* blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the
118+
* corresponding exception.
114119
*
115120
* This suspending function is cancellable.
116-
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
117-
* immediately resumes with [CancellationException].
121+
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
122+
* function immediately cancels its [Subscription] and resumes with [CancellationException].
118123
*
119-
* @throws NoSuchElementException if publisher does not emit any value
120-
* @throws IllegalArgumentException if publisher emits more than one value
124+
* @throws IllegalArgumentException if the publisher emits more than one value
121125
*/
122-
public suspend fun <T> Publisher<T>.awaitSingleOrElse(defaultValue: () -> T): T = awaitOne(Mode.SINGLE_OR_DEFAULT) ?: defaultValue()
126+
public suspend fun <T> Publisher<T>.awaitSingleOrElse(defaultValue: () -> T): T =
127+
awaitOne(Mode.SINGLE_OR_DEFAULT) ?: defaultValue()
123128

124129
// ------------------------ private ------------------------
125130

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.reactive
6+
7+
import kotlinx.coroutines.*
8+
import org.junit.*
9+
import org.reactivestreams.*
10+
11+
class AwaitTest: TestBase() {
12+
13+
/** Tests that calls to [awaitFirst] (and, thus, to the rest of these functions) throw [CancellationException] and
14+
* unsubscribe from the publisher when their [Job] is cancelled. */
15+
@Test
16+
fun testAwaitCancellation() = runTest {
17+
expect(1)
18+
val publisher = Publisher<Int> { s ->
19+
s.onSubscribe(object: Subscription {
20+
override fun request(n: Long) {
21+
expect(3)
22+
}
23+
24+
override fun cancel() {
25+
expect(5)
26+
}
27+
})
28+
}
29+
val job = launch(start = CoroutineStart.UNDISPATCHED) {
30+
try {
31+
expect(2)
32+
publisher.awaitFirst()
33+
} catch (e: CancellationException) {
34+
expect(6)
35+
throw e
36+
}
37+
}
38+
expect(4)
39+
job.cancelAndJoin()
40+
finish(7)
41+
}
42+
43+
}

0 commit comments

Comments
 (0)