Skip to content

Commit cff16a3

Browse files
committed
Change default request size in Publisher.openSubscription to one
This makes Publisher.collect consistent with Publisher.asFlow and ensures smooth integration with reactive backpressure operators "out of the box". Fixes #1267
1 parent ec88813 commit cff16a3

File tree

5 files changed

+87
-6
lines changed

5 files changed

+87
-6
lines changed

reactive/coroutines-guide-reactive.md

+4-3
Original file line numberDiff line numberDiff line change
@@ -292,19 +292,20 @@ OnSubscribe
292292
1
293293
2
294294
3
295-
4
296295
OnComplete
297296
Finally
297+
4
298298
5
299299
```
300300

301301
<!--- TEST -->
302302

303-
Notice how "OnComplete" and "Finally" are printed before the last element "5". It happens because our `main` function in this
303+
Notice how "OnComplete" and "Finally" are printed before the lasts element "4" and "5".
304+
It happens because our `main` function in this
304305
example is a coroutine that we start with the [runBlocking] coroutine builder.
305306
Our main coroutine receives on the flowable using the `source.collect { ... }` expression.
306307
The main coroutine is _suspended_ while it waits for the source to emit an item.
307-
When the last item is emitted by `Flowable.range(1, 5)` it
308+
When the last items are emitted by `Flowable.range(1, 5)` it
308309
_resumes_ the main coroutine, which gets dispatched onto the main thread to print this
309310
last element at a later point in time, while the source completes and prints "Finally".
310311

reactive/kotlinx-coroutines-reactive/src/Channel.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ import org.reactivestreams.*
1717
* **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1818
* See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1919
*
20-
* @param request how many items to request from publisher in advance (optional, on-demand request by default).
20+
* @param request how many items to request from publisher in advance (optional, one by default).
2121
*/
2222
@ObsoleteCoroutinesApi
2323
@Suppress("CONFLICTING_OVERLOADS")
24-
public fun <T> Publisher<T>.openSubscription(request: Int = 0): ReceiveChannel<T> {
24+
public fun <T> Publisher<T>.openSubscription(request: Int = 1): ReceiveChannel<T> {
2525
val channel = SubscriptionChannel<T>(request)
2626
subscribe(channel)
2727
return channel
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.reactor
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
9+
import kotlinx.coroutines.reactive.*
10+
import kotlinx.coroutines.reactive.flow.*
11+
import org.junit.Test
12+
import reactor.core.publisher.*
13+
import kotlin.test.*
14+
15+
class BackpressureTest : TestBase() {
16+
@Test
17+
fun testBackpressureDropDirect() = runTest {
18+
expect(1)
19+
Flux.fromArray(arrayOf(1))
20+
.onBackpressureDrop()
21+
.collect {
22+
assertEquals(1, it)
23+
expect(2)
24+
}
25+
finish(3)
26+
}
27+
28+
@Test
29+
fun testBackpressureDropFlow() = runTest {
30+
expect(1)
31+
Flux.fromArray(arrayOf(1))
32+
.onBackpressureDrop()
33+
.asFlow()
34+
.collect {
35+
assertEquals(1, it)
36+
expect(2)
37+
}
38+
finish(3)
39+
}
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.rx2
6+
7+
import io.reactivex.*
8+
import kotlinx.coroutines.*
9+
import kotlinx.coroutines.flow.*
10+
import kotlinx.coroutines.reactive.*
11+
import kotlinx.coroutines.reactive.flow.*
12+
import org.junit.Test
13+
import kotlin.test.*
14+
15+
class BackpressureTest : TestBase() {
16+
@Test
17+
fun testBackpressureDropDirect() = runTest {
18+
expect(1)
19+
Flowable.fromArray(1)
20+
.onBackpressureDrop()
21+
.collect {
22+
assertEquals(1, it)
23+
expect(2)
24+
}
25+
finish(3)
26+
}
27+
28+
@Test
29+
fun testBackpressureDropFlow() = runTest {
30+
expect(1)
31+
Flowable.fromArray(1)
32+
.onBackpressureDrop()
33+
.asFlow()
34+
.collect {
35+
assertEquals(1, it)
36+
expect(2)
37+
}
38+
finish(3)
39+
}
40+
}

reactive/kotlinx-coroutines-rx2/test/guide/test/GuideReactiveTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ class GuideReactiveTest : ReactiveTestBase() {
5252
"1",
5353
"2",
5454
"3",
55-
"4",
5655
"OnComplete",
5756
"Finally",
57+
"4",
5858
"5"
5959
)
6060
}

0 commit comments

Comments
 (0)