diff --git a/reactive/kotlinx-coroutines-reactive/src/Channel.kt b/reactive/kotlinx-coroutines-reactive/src/Channel.kt index 6cf11b7aa7..df2131b630 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Channel.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Channel.kt @@ -20,7 +20,6 @@ import org.reactivestreams.* * @param request how many items to request from publisher in advance (optional, one by default). */ @ObsoleteCoroutinesApi -@Suppress("CONFLICTING_OVERLOADS") public fun Publisher.openSubscription(request: Int = 1): ReceiveChannel { val channel = SubscriptionChannel(request) subscribe(channel) @@ -28,7 +27,7 @@ public fun Publisher.openSubscription(request: Int = 1): ReceiveChannel Publisher.consumeEach(action: (T) -> Unit) = openSubscription().consumeEach(action) @@ -36,7 +35,6 @@ public suspend inline fun Publisher.consumeEach(action: (T) -> Unit) = * Subscribes to this [Publisher] and performs the specified action for each received element. * Cancels subscription if any exception happens during collect. */ -@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0 public suspend inline fun Publisher.collect(action: (T) -> Unit) = openSubscription().consumeEach(action) diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index a7d53876d6..0e3a7b8c11 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -48,7 +48,7 @@ public fun publish( @Deprecated( message = "CoroutineScope.publish is deprecated in favour of top-level publish", - level = DeprecationLevel.WARNING, + level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("publish(context, block)") ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring @LowPriorityInOverloadResolution diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt index 559068f578..1a3edc842a 100644 --- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt @@ -23,14 +23,12 @@ import kotlin.coroutines.* * If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flights elements * are discarded. */ -@ExperimentalCoroutinesApi public fun Publisher.asFlow(): Flow = PublisherAsFlow(this, 1) /** * Transforms the given flow to a spec-compliant [Publisher]. */ -@ExperimentalCoroutinesApi public fun Flow.asPublisher(): Publisher = FlowAsPublisher(this) private class PublisherAsFlow( diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt index 18b84ac117..389428df11 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt @@ -50,7 +50,7 @@ public fun flux( @Deprecated( message = "CoroutineScope.flux is deprecated in favour of top-level flux", - level = DeprecationLevel.WARNING, + level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("flux(context, block)") ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring @LowPriorityInOverloadResolution diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt index b218f6d0c5..76f0418ea6 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt @@ -39,7 +39,7 @@ public fun mono( @Deprecated( message = "CoroutineScope.mono is deprecated in favour of top-level mono", - level = DeprecationLevel.WARNING, + level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("mono(context, block)") ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 @LowPriorityInOverloadResolution diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt index 6b031ed453..c852c7667f 100644 --- a/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt +++ b/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt @@ -4,7 +4,6 @@ package kotlinx.coroutines.reactor -import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.reactive.FlowSubscription @@ -15,7 +14,6 @@ import reactor.core.publisher.Flux * Converts the given flow to a cold flux. * The original flow is cancelled when the flux subscriber is disposed. */ -@ExperimentalCoroutinesApi public fun Flow.asFlux(): Flux = FlowAsFlux(this) private class FlowAsFlux(private val flow: Flow) : Flux() { diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt index ae23d3c23d..7203120dae 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt @@ -73,7 +73,7 @@ class FluxMultiTest : TestBase() { val mono = mono { var result = "" try { - flux.consumeEach { result += it } + flux.collect { result += it } } catch(e: IOException) { result += e.message } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt index b038e539fb..dc852b9024 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt @@ -19,7 +19,6 @@ import kotlinx.coroutines.internal.* * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @ObsoleteCoroutinesApi -@Suppress("CONFLICTING_OVERLOADS") public fun MaybeSource.openSubscription(): ReceiveChannel { val channel = SubscriptionChannel() subscribe(channel) @@ -34,7 +33,6 @@ public fun MaybeSource.openSubscription(): ReceiveChannel { * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @ObsoleteCoroutinesApi -@Suppress("CONFLICTING_OVERLOADS") public fun ObservableSource.openSubscription(): ReceiveChannel { val channel = SubscriptionChannel() subscribe(channel) @@ -42,12 +40,12 @@ public fun ObservableSource.openSubscription(): ReceiveChannel { } // Will be promoted to error in 1.3.0, removed in 1.4.0 -@Deprecated(message = "Use collect instead", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.collect(action)")) +@Deprecated(message = "Use collect instead", level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("this.collect(action)")) public suspend inline fun MaybeSource.consumeEach(action: (T) -> Unit) = openSubscription().consumeEach(action) // Will be promoted to error in 1.3.0, removed in 1.4.0 -@Deprecated(message = "Use collect instead", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.collect(action)")) +@Deprecated(message = "Use collect instead", level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("this.collect(action)")) public suspend inline fun ObservableSource.consumeEach(action: (T) -> Unit) = openSubscription().consumeEach(action) @@ -55,7 +53,6 @@ public suspend inline fun ObservableSource.consumeEach(action: (T) -> Uni * Subscribes to this [MaybeSource] and performs the specified action for each received element. * Cancels subscription if any exception happens during collect. */ -@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0 public suspend inline fun MaybeSource.collect(action: (T) -> Unit) = openSubscription().consumeEach(action) @@ -63,7 +60,6 @@ public suspend inline fun MaybeSource.collect(action: (T) -> Unit) = * Subscribes to this [ObservableSource] and performs the specified action for each received element. * Cancels subscription if any exception happens during collect. */ -@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0 public suspend inline fun ObservableSource.collect(action: (T) -> Unit) = openSubscription().consumeEach(action) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt index 0da06776a5..c59b4bd6b5 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt @@ -36,7 +36,7 @@ public fun rxCompletable( @Deprecated( message = "CoroutineScope.rxCompletable is deprecated in favour of top-level rxCompletable", - level = DeprecationLevel.WARNING, + level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("rxCompletable(context, block)") ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 @LowPriorityInOverloadResolution diff --git a/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt b/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt index beee40eedb..30a1ed7e92 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt @@ -45,7 +45,7 @@ public fun rxFlowable( @Deprecated( message = "CoroutineScope.rxFlowable is deprecated in favour of top-level rxFlowable", - level = DeprecationLevel.WARNING, + level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("rxFlowable(context, block)") ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 @LowPriorityInOverloadResolution diff --git a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt index fbc366c6df..9f176e938c 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt @@ -37,7 +37,7 @@ public fun rxMaybe( @Deprecated( message = "CoroutineScope.rxMaybe is deprecated in favour of top-level rxMaybe", - level = DeprecationLevel.WARNING, + level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("rxMaybe(context, block)") ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 @LowPriorityInOverloadResolution diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index 3d0ccd824d..6ccf0f0bae 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -46,7 +46,7 @@ public fun rxObservable( @Deprecated( message = "CoroutineScope.rxObservable is deprecated in favour of top-level rxObservable", - level = DeprecationLevel.WARNING, + level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("rxObservable(context, block)") ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 @LowPriorityInOverloadResolution diff --git a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt index b6cebf097c..f3573ee6eb 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt @@ -36,7 +36,7 @@ public fun rxSingle( @Deprecated( message = "CoroutineScope.rxSingle is deprecated in favour of top-level rxSingle", - level = DeprecationLevel.WARNING, + level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("rxSingle(context, block)") ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 @LowPriorityInOverloadResolution diff --git a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt index a7caea4793..9a12bafb1d 100644 --- a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt @@ -150,7 +150,7 @@ class CompletableTest : TestBase() { @Test fun testFatalExceptionInSubscribe() = runTest { - GlobalScope.rxCompletable(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) { + rxCompletable(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) { expect(1) 42 }.subscribe({ throw LinkageError() }) @@ -159,7 +159,7 @@ class CompletableTest : TestBase() { @Test fun testFatalExceptionInSingle() = runTest { - GlobalScope.rxCompletable(Dispatchers.Unconfined) { + rxCompletable(Dispatchers.Unconfined) { throw LinkageError() }.subscribe({ expectUnreached() }, { expect(1); assertTrue(it is LinkageError) }) finish(2) diff --git a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt index dcd66638e5..326c83e45c 100644 --- a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt @@ -280,7 +280,7 @@ class MaybeTest : TestBase() { @Test fun testFatalExceptionInSubscribe() = runTest { - GlobalScope.rxMaybe(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) { + rxMaybe(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) { expect(1) 42 }.subscribe({ throw LinkageError() }) @@ -289,7 +289,7 @@ class MaybeTest : TestBase() { @Test fun testFatalExceptionInSingle() = runTest { - GlobalScope.rxMaybe(Dispatchers.Unconfined) { + rxMaybe(Dispatchers.Unconfined) { throw LinkageError() }.subscribe({ expectUnreached() }, { expect(1); assertTrue(it is LinkageError) }) finish(2) diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt index 75f79de5ca..6971918723 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt @@ -18,7 +18,7 @@ class ObservableMultiTest : TestBase() { @Test fun testNumbers() { val n = 100 * stressTestMultiplier - val observable = GlobalScope.rxObservable { + val observable = rxObservable { repeat(n) { send(it) } } checkSingleValue(observable.toList()) { list -> @@ -30,7 +30,7 @@ class ObservableMultiTest : TestBase() { @Test fun testConcurrentStress() { val n = 10_000 * stressTestMultiplier - val observable = GlobalScope.rxObservable { + val observable = rxObservable { newCoroutineContext(coroutineContext) // concurrent emitters (many coroutines) val jobs = List(n) { @@ -51,7 +51,7 @@ class ObservableMultiTest : TestBase() { @Test fun testIteratorResendUnconfined() { val n = 10_000 * stressTestMultiplier - val observable = GlobalScope.rxObservable(Dispatchers.Unconfined) { + val observable = rxObservable(Dispatchers.Unconfined) { Observable.range(0, n).collect { send(it) } } checkSingleValue(observable.toList()) { list -> @@ -62,7 +62,7 @@ class ObservableMultiTest : TestBase() { @Test fun testIteratorResendPool() { val n = 10_000 * stressTestMultiplier - val observable = GlobalScope.rxObservable { + val observable = rxObservable { Observable.range(0, n).collect { send(it) } } checkSingleValue(observable.toList()) { list -> @@ -72,14 +72,14 @@ class ObservableMultiTest : TestBase() { @Test fun testSendAndCrash() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send("O") throw IOException("K") } val single = rxSingle { var result = "" try { - observable.consumeEach { result += it } + observable.collect { result += it } } catch(e: IOException) { result += e.message } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt index 6b5d7451a4..2fa1d9b0df 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt @@ -13,7 +13,7 @@ import java.util.concurrent.* class ObservableSingleTest { @Test fun testSingleNoWait() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send("OK") } @@ -29,7 +29,7 @@ class ObservableSingleTest { @Test fun testSingleEmitAndAwait() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send(Observable.just("O").awaitSingle() + "K") } @@ -40,7 +40,7 @@ class ObservableSingleTest { @Test fun testSingleWithDelay() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send(Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K") } @@ -51,7 +51,7 @@ class ObservableSingleTest { @Test fun testSingleException() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send(Observable.just("O", "K").awaitSingle() + "K") } @@ -62,7 +62,7 @@ class ObservableSingleTest { @Test fun testAwaitFirst() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send(Observable.just("O", "#").awaitFirst() + "K") } @@ -73,7 +73,7 @@ class ObservableSingleTest { @Test fun testAwaitFirstOrDefault() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send(Observable.empty().awaitFirstOrDefault("O") + "K") } @@ -84,7 +84,7 @@ class ObservableSingleTest { @Test fun testAwaitFirstOrDefaultWithValues() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send(Observable.just("O", "#").awaitFirstOrDefault("!") + "K") } @@ -95,7 +95,7 @@ class ObservableSingleTest { @Test fun testAwaitFirstOrNull() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send(Observable.empty().awaitFirstOrNull() ?: "OK") } @@ -106,7 +106,7 @@ class ObservableSingleTest { @Test fun testAwaitFirstOrNullWithValues() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send((Observable.just("O", "#").awaitFirstOrNull() ?: "!") + "K") } @@ -117,7 +117,7 @@ class ObservableSingleTest { @Test fun testAwaitFirstOrElse() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send(Observable.empty().awaitFirstOrElse { "O" } + "K") } @@ -128,7 +128,7 @@ class ObservableSingleTest { @Test fun testAwaitFirstOrElseWithValues() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send(Observable.just("O", "#").awaitFirstOrElse { "!" } + "K") } @@ -139,7 +139,7 @@ class ObservableSingleTest { @Test fun testAwaitLast() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send(Observable.just("#", "O").awaitLast() + "K") } @@ -150,7 +150,7 @@ class ObservableSingleTest { @Test fun testExceptionFromObservable() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { try { send(Observable.error(RuntimeException("O")).awaitFirst()) } catch (e: RuntimeException) { @@ -165,7 +165,7 @@ class ObservableSingleTest { @Test fun testExceptionFromCoroutine() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { error(Observable.just("O").awaitSingle() + "K") } @@ -177,7 +177,7 @@ class ObservableSingleTest { @Test fun testObservableIteration() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { var result = "" Observable.just("O", "K").collect { result += it } send(result) @@ -190,7 +190,7 @@ class ObservableSingleTest { @Test fun testObservableIterationFailure() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { try { Observable.error(RuntimeException("OK")).collect { fail("Should not be here") } send("Fail") diff --git a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt index fce772347b..9251149375 100644 --- a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt @@ -201,7 +201,7 @@ class SingleTest : TestBase() { @Test fun testFatalExceptionInSubscribe() = runTest { - GlobalScope.rxSingle(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e -> assertTrue(e is LinkageError); expect(2) }) { + rxSingle(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e -> assertTrue(e is LinkageError); expect(2) }) { expect(1) 42 }.subscribe(Consumer { @@ -212,7 +212,7 @@ class SingleTest : TestBase() { @Test fun testFatalExceptionInSingle() = runTest { - GlobalScope.rxSingle(Dispatchers.Unconfined) { + rxSingle(Dispatchers.Unconfined) { throw LinkageError() }.subscribe({ _, e -> assertTrue(e is LinkageError); expect(1) })