Skip to content

Commit dd8860f

Browse files
committed
Promote rx2 extensions to stable, increase deprecation level for obsolete reactive primitives
1 parent 5410890 commit dd8860f

File tree

15 files changed

+39
-44
lines changed

15 files changed

+39
-44
lines changed

reactive/kotlinx-coroutines-reactive/src/Channel.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,14 @@ import org.reactivestreams.*
2020
* @param request how many items to request from publisher in advance (optional, one by default).
2121
*/
2222
@ObsoleteCoroutinesApi
23-
@Suppress("CONFLICTING_OVERLOADS")
2423
public fun <T> Publisher<T>.openSubscription(request: Int = 1): ReceiveChannel<T> {
2524
val channel = SubscriptionChannel<T>(request)
2625
subscribe(channel)
2726
return channel
2827
}
2928

3029
// Will be promoted to error in 1.3.0, removed in 1.4.0
31-
@Deprecated(message = "Use collect instead", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.collect(action)"))
30+
@Deprecated(message = "Use collect instead", level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("this.collect(action)"))
3231
public suspend inline fun <T> Publisher<T>.consumeEach(action: (T) -> Unit) =
3332
openSubscription().consumeEach(action)
3433

reactive/kotlinx-coroutines-reactor/src/Flux.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public fun <T> flux(
5050

5151
@Deprecated(
5252
message = "CoroutineScope.flux is deprecated in favour of top-level flux",
53-
level = DeprecationLevel.WARNING,
53+
level = DeprecationLevel.ERROR,
5454
replaceWith = ReplaceWith("flux(context, block)")
5555
) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring
5656
@LowPriorityInOverloadResolution

reactive/kotlinx-coroutines-reactor/src/Mono.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public fun <T> mono(
3939

4040
@Deprecated(
4141
message = "CoroutineScope.mono is deprecated in favour of top-level mono",
42-
level = DeprecationLevel.WARNING,
42+
level = DeprecationLevel.ERROR,
4343
replaceWith = ReplaceWith("mono(context, block)")
4444
) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
4545
@LowPriorityInOverloadResolution

reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ class FluxMultiTest : TestBase() {
7373
val mono = mono {
7474
var result = ""
7575
try {
76-
flux.consumeEach { result += it }
76+
flux.collect { result += it }
7777
} catch(e: IOException) {
7878
result += e.message
7979
}

reactive/kotlinx-coroutines-rx2/src/RxChannel.kt

+2-6
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import kotlinx.coroutines.internal.*
1919
* See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
2020
*/
2121
@ObsoleteCoroutinesApi
22-
@Suppress("CONFLICTING_OVERLOADS")
2322
public fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
2423
val channel = SubscriptionChannel<T>()
2524
subscribe(channel)
@@ -34,36 +33,33 @@ public fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
3433
* See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
3534
*/
3635
@ObsoleteCoroutinesApi
37-
@Suppress("CONFLICTING_OVERLOADS")
3836
public fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
3937
val channel = SubscriptionChannel<T>()
4038
subscribe(channel)
4139
return channel
4240
}
4341

4442
// Will be promoted to error in 1.3.0, removed in 1.4.0
45-
@Deprecated(message = "Use collect instead", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.collect(action)"))
43+
@Deprecated(message = "Use collect instead", level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("this.collect(action)"))
4644
public suspend inline fun <T> MaybeSource<T>.consumeEach(action: (T) -> Unit) =
4745
openSubscription().consumeEach(action)
4846

4947
// Will be promoted to error in 1.3.0, removed in 1.4.0
50-
@Deprecated(message = "Use collect instead", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.collect(action)"))
48+
@Deprecated(message = "Use collect instead", level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("this.collect(action)"))
5149
public suspend inline fun <T> ObservableSource<T>.consumeEach(action: (T) -> Unit) =
5250
openSubscription().consumeEach(action)
5351

5452
/**
5553
* Subscribes to this [MaybeSource] and performs the specified action for each received element.
5654
* Cancels subscription if any exception happens during collect.
5755
*/
58-
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
5956
public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit) =
6057
openSubscription().consumeEach(action)
6158

6259
/**
6360
* Subscribes to this [ObservableSource] and performs the specified action for each received element.
6461
* Cancels subscription if any exception happens during collect.
6562
*/
66-
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
6763
public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit) =
6864
openSubscription().consumeEach(action)
6965

reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public fun rxCompletable(
3636

3737
@Deprecated(
3838
message = "CoroutineScope.rxCompletable is deprecated in favour of top-level rxCompletable",
39-
level = DeprecationLevel.WARNING,
39+
level = DeprecationLevel.ERROR,
4040
replaceWith = ReplaceWith("rxCompletable(context, block)")
4141
) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
4242
@LowPriorityInOverloadResolution

reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public fun <T: Any> rxFlowable(
4545

4646
@Deprecated(
4747
message = "CoroutineScope.rxFlowable is deprecated in favour of top-level rxFlowable",
48-
level = DeprecationLevel.WARNING,
48+
level = DeprecationLevel.ERROR,
4949
replaceWith = ReplaceWith("rxFlowable(context, block)")
5050
) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
5151
@LowPriorityInOverloadResolution

reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public fun <T> rxMaybe(
3737

3838
@Deprecated(
3939
message = "CoroutineScope.rxMaybe is deprecated in favour of top-level rxMaybe",
40-
level = DeprecationLevel.WARNING,
40+
level = DeprecationLevel.ERROR,
4141
replaceWith = ReplaceWith("rxMaybe(context, block)")
4242
) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
4343
@LowPriorityInOverloadResolution

reactive/kotlinx-coroutines-rx2/src/RxObservable.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public fun <T : Any> rxObservable(
4646

4747
@Deprecated(
4848
message = "CoroutineScope.rxObservable is deprecated in favour of top-level rxObservable",
49-
level = DeprecationLevel.WARNING,
49+
level = DeprecationLevel.ERROR,
5050
replaceWith = ReplaceWith("rxObservable(context, block)")
5151
) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
5252
@LowPriorityInOverloadResolution

reactive/kotlinx-coroutines-rx2/src/RxSingle.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public fun <T : Any> rxSingle(
3636

3737
@Deprecated(
3838
message = "CoroutineScope.rxSingle is deprecated in favour of top-level rxSingle",
39-
level = DeprecationLevel.WARNING,
39+
level = DeprecationLevel.ERROR,
4040
replaceWith = ReplaceWith("rxSingle(context, block)")
4141
) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
4242
@LowPriorityInOverloadResolution

reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ class CompletableTest : TestBase() {
150150

151151
@Test
152152
fun testFatalExceptionInSubscribe() = runTest {
153-
GlobalScope.rxCompletable(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) {
153+
rxCompletable(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) {
154154
expect(1)
155155
42
156156
}.subscribe({ throw LinkageError() })
@@ -159,7 +159,7 @@ class CompletableTest : TestBase() {
159159

160160
@Test
161161
fun testFatalExceptionInSingle() = runTest {
162-
GlobalScope.rxCompletable(Dispatchers.Unconfined) {
162+
rxCompletable(Dispatchers.Unconfined) {
163163
throw LinkageError()
164164
}.subscribe({ expectUnreached() }, { expect(1); assertTrue(it is LinkageError) })
165165
finish(2)

reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ class MaybeTest : TestBase() {
280280

281281
@Test
282282
fun testFatalExceptionInSubscribe() = runTest {
283-
GlobalScope.rxMaybe(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) {
283+
rxMaybe(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) {
284284
expect(1)
285285
42
286286
}.subscribe({ throw LinkageError() })
@@ -289,7 +289,7 @@ class MaybeTest : TestBase() {
289289

290290
@Test
291291
fun testFatalExceptionInSingle() = runTest {
292-
GlobalScope.rxMaybe(Dispatchers.Unconfined) {
292+
rxMaybe(Dispatchers.Unconfined) {
293293
throw LinkageError()
294294
}.subscribe({ expectUnreached() }, { expect(1); assertTrue(it is LinkageError) })
295295
finish(2)

reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt

+6-6
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class ObservableMultiTest : TestBase() {
1818
@Test
1919
fun testNumbers() {
2020
val n = 100 * stressTestMultiplier
21-
val observable = GlobalScope.rxObservable {
21+
val observable = rxObservable {
2222
repeat(n) { send(it) }
2323
}
2424
checkSingleValue(observable.toList()) { list ->
@@ -30,7 +30,7 @@ class ObservableMultiTest : TestBase() {
3030
@Test
3131
fun testConcurrentStress() {
3232
val n = 10_000 * stressTestMultiplier
33-
val observable = GlobalScope.rxObservable {
33+
val observable = rxObservable {
3434
newCoroutineContext(coroutineContext)
3535
// concurrent emitters (many coroutines)
3636
val jobs = List(n) {
@@ -51,7 +51,7 @@ class ObservableMultiTest : TestBase() {
5151
@Test
5252
fun testIteratorResendUnconfined() {
5353
val n = 10_000 * stressTestMultiplier
54-
val observable = GlobalScope.rxObservable(Dispatchers.Unconfined) {
54+
val observable = rxObservable(Dispatchers.Unconfined) {
5555
Observable.range(0, n).collect { send(it) }
5656
}
5757
checkSingleValue(observable.toList()) { list ->
@@ -62,7 +62,7 @@ class ObservableMultiTest : TestBase() {
6262
@Test
6363
fun testIteratorResendPool() {
6464
val n = 10_000 * stressTestMultiplier
65-
val observable = GlobalScope.rxObservable {
65+
val observable = rxObservable {
6666
Observable.range(0, n).collect { send(it) }
6767
}
6868
checkSingleValue(observable.toList()) { list ->
@@ -72,14 +72,14 @@ class ObservableMultiTest : TestBase() {
7272

7373
@Test
7474
fun testSendAndCrash() {
75-
val observable = GlobalScope.rxObservable {
75+
val observable = rxObservable {
7676
send("O")
7777
throw IOException("K")
7878
}
7979
val single = rxSingle {
8080
var result = ""
8181
try {
82-
observable.consumeEach { result += it }
82+
observable.collect { result += it }
8383
} catch(e: IOException) {
8484
result += e.message
8585
}

reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt

+16-16
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import java.util.concurrent.*
1313
class ObservableSingleTest {
1414
@Test
1515
fun testSingleNoWait() {
16-
val observable = GlobalScope.rxObservable {
16+
val observable = rxObservable {
1717
send("OK")
1818
}
1919

@@ -29,7 +29,7 @@ class ObservableSingleTest {
2929

3030
@Test
3131
fun testSingleEmitAndAwait() {
32-
val observable = GlobalScope.rxObservable {
32+
val observable = rxObservable {
3333
send(Observable.just("O").awaitSingle() + "K")
3434
}
3535

@@ -40,7 +40,7 @@ class ObservableSingleTest {
4040

4141
@Test
4242
fun testSingleWithDelay() {
43-
val observable = GlobalScope.rxObservable {
43+
val observable = rxObservable {
4444
send(Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K")
4545
}
4646

@@ -51,7 +51,7 @@ class ObservableSingleTest {
5151

5252
@Test
5353
fun testSingleException() {
54-
val observable = GlobalScope.rxObservable {
54+
val observable = rxObservable {
5555
send(Observable.just("O", "K").awaitSingle() + "K")
5656
}
5757

@@ -62,7 +62,7 @@ class ObservableSingleTest {
6262

6363
@Test
6464
fun testAwaitFirst() {
65-
val observable = GlobalScope.rxObservable {
65+
val observable = rxObservable {
6666
send(Observable.just("O", "#").awaitFirst() + "K")
6767
}
6868

@@ -73,7 +73,7 @@ class ObservableSingleTest {
7373

7474
@Test
7575
fun testAwaitFirstOrDefault() {
76-
val observable = GlobalScope.rxObservable {
76+
val observable = rxObservable {
7777
send(Observable.empty<String>().awaitFirstOrDefault("O") + "K")
7878
}
7979

@@ -84,7 +84,7 @@ class ObservableSingleTest {
8484

8585
@Test
8686
fun testAwaitFirstOrDefaultWithValues() {
87-
val observable = GlobalScope.rxObservable {
87+
val observable = rxObservable {
8888
send(Observable.just("O", "#").awaitFirstOrDefault("!") + "K")
8989
}
9090

@@ -95,7 +95,7 @@ class ObservableSingleTest {
9595

9696
@Test
9797
fun testAwaitFirstOrNull() {
98-
val observable = GlobalScope.rxObservable<String> {
98+
val observable = rxObservable<String> {
9999
send(Observable.empty<String>().awaitFirstOrNull() ?: "OK")
100100
}
101101

@@ -106,7 +106,7 @@ class ObservableSingleTest {
106106

107107
@Test
108108
fun testAwaitFirstOrNullWithValues() {
109-
val observable = GlobalScope.rxObservable {
109+
val observable = rxObservable {
110110
send((Observable.just("O", "#").awaitFirstOrNull() ?: "!") + "K")
111111
}
112112

@@ -117,7 +117,7 @@ class ObservableSingleTest {
117117

118118
@Test
119119
fun testAwaitFirstOrElse() {
120-
val observable = GlobalScope.rxObservable {
120+
val observable = rxObservable {
121121
send(Observable.empty<String>().awaitFirstOrElse { "O" } + "K")
122122
}
123123

@@ -128,7 +128,7 @@ class ObservableSingleTest {
128128

129129
@Test
130130
fun testAwaitFirstOrElseWithValues() {
131-
val observable = GlobalScope.rxObservable {
131+
val observable = rxObservable {
132132
send(Observable.just("O", "#").awaitFirstOrElse { "!" } + "K")
133133
}
134134

@@ -139,7 +139,7 @@ class ObservableSingleTest {
139139

140140
@Test
141141
fun testAwaitLast() {
142-
val observable = GlobalScope.rxObservable {
142+
val observable = rxObservable {
143143
send(Observable.just("#", "O").awaitLast() + "K")
144144
}
145145

@@ -150,7 +150,7 @@ class ObservableSingleTest {
150150

151151
@Test
152152
fun testExceptionFromObservable() {
153-
val observable = GlobalScope.rxObservable {
153+
val observable = rxObservable {
154154
try {
155155
send(Observable.error<String>(RuntimeException("O")).awaitFirst())
156156
} catch (e: RuntimeException) {
@@ -165,7 +165,7 @@ class ObservableSingleTest {
165165

166166
@Test
167167
fun testExceptionFromCoroutine() {
168-
val observable = GlobalScope.rxObservable<String> {
168+
val observable = rxObservable<String> {
169169
error(Observable.just("O").awaitSingle() + "K")
170170
}
171171

@@ -177,7 +177,7 @@ class ObservableSingleTest {
177177

178178
@Test
179179
fun testObservableIteration() {
180-
val observable = GlobalScope.rxObservable {
180+
val observable = rxObservable {
181181
var result = ""
182182
Observable.just("O", "K").collect { result += it }
183183
send(result)
@@ -190,7 +190,7 @@ class ObservableSingleTest {
190190

191191
@Test
192192
fun testObservableIterationFailure() {
193-
val observable = GlobalScope.rxObservable {
193+
val observable = rxObservable {
194194
try {
195195
Observable.error<String>(RuntimeException("OK")).collect { fail("Should not be here") }
196196
send("Fail")

reactive/kotlinx-coroutines-rx2/test/SingleTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ class SingleTest : TestBase() {
201201

202202
@Test
203203
fun testFatalExceptionInSubscribe() = runTest {
204-
GlobalScope.rxSingle(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e -> assertTrue(e is LinkageError); expect(2) }) {
204+
rxSingle(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e -> assertTrue(e is LinkageError); expect(2) }) {
205205
expect(1)
206206
42
207207
}.subscribe(Consumer {
@@ -212,7 +212,7 @@ class SingleTest : TestBase() {
212212

213213
@Test
214214
fun testFatalExceptionInSingle() = runTest {
215-
GlobalScope.rxSingle(Dispatchers.Unconfined) {
215+
rxSingle(Dispatchers.Unconfined) {
216216
throw LinkageError()
217217
}.subscribe({ _, e -> assertTrue(e is LinkageError); expect(1) })
218218

0 commit comments

Comments
 (0)