Skip to content

Commit 0cc349d

Browse files
committed
Remove deprecated methods from the Java Flow integration module
The `kotlinx-coroutines-jdk9` module, being a copy of `kotlinx-coroutines-reactive`, has its share of legacy APIs defined. As this is a new module, there is no reason to preserve them. The changes include: * `Migration.kt`, being a file completely dedicated to warnings about old API usage, has been removed. * `IntegrationTest.kt` changed slightly so that it no longer uses the subscription channel API, which is deprecated. * `Channel.kt` is now just an implementation detail. It does not expose any public methods. * In particular, `Publisher<T>.collect` has been moved to `ReactiveFlow.kt` and is no longer inline, as it would expose `openSubscription`, which is deprecated. * `PublisherSubscriptionSelectTest`, which tests use of `select` with the subscription channel API, is not included anymore. * `Convert.kt` also has been removed altogether, having no non-deprecated methods.
1 parent 99f0b2b commit 0cc349d

File tree

7 files changed

+12
-158
lines changed

7 files changed

+12
-158
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-jdk9.txt

+1-18
Original file line numberDiff line numberDiff line change
@@ -7,28 +7,10 @@ public final class kotlinx/coroutines/jdk9/AwaitKt {
77
public static final fun awaitSingle (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
88
}
99

10-
public final class kotlinx/coroutines/jdk9/ChannelKt {
11-
public static final fun collect (Ljava/util/concurrent/Flow$Publisher;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
12-
public static final fun consumeEach (Ljava/util/concurrent/Flow$Publisher;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
13-
public static final fun openSubscription (Ljava/util/concurrent/Flow$Publisher;I)Lkotlinx/coroutines/channels/ReceiveChannel;
14-
public static synthetic fun openSubscription$default (Ljava/util/concurrent/Flow$Publisher;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
15-
}
16-
1710
public abstract interface class kotlinx/coroutines/jdk9/ContextInjector {
1811
public abstract fun injectCoroutineContext (Ljava/util/concurrent/Flow$Publisher;Lkotlin/coroutines/CoroutineContext;)Ljava/util/concurrent/Flow$Publisher;
1912
}
2013

21-
public final class kotlinx/coroutines/jdk9/ConvertKt {
22-
public static final fun asPublisher (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Ljava/util/concurrent/Flow$Publisher;
23-
public static synthetic fun asPublisher$default (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Ljava/util/concurrent/Flow$Publisher;
24-
}
25-
26-
public final class kotlinx/coroutines/jdk9/FlowKt {
27-
public static final fun asFlow (Ljava/util/concurrent/Flow$Publisher;)Lkotlinx/coroutines/flow/Flow;
28-
public static final fun asFlow (Ljava/util/concurrent/Flow$Publisher;I)Lkotlinx/coroutines/flow/Flow;
29-
public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Ljava/util/concurrent/Flow$Publisher;
30-
}
31-
3214
public final class kotlinx/coroutines/jdk9/FlowSubscription : kotlinx/coroutines/AbstractCoroutine, java/util/concurrent/Flow$Subscription {
3315
public final field flow Lkotlinx/coroutines/flow/Flow;
3416
public final field subscriber Ljava/util/concurrent/Flow$Subscriber;
@@ -65,5 +47,6 @@ public final class kotlinx/coroutines/jdk9/PublisherCoroutine : kotlinx/coroutin
6547
public final class kotlinx/coroutines/jdk9/ReactiveFlowKt {
6648
public static final fun asFlow (Ljava/util/concurrent/Flow$Publisher;)Lkotlinx/coroutines/flow/Flow;
6749
public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Ljava/util/concurrent/Flow$Publisher;
50+
public static final fun collect (Ljava/util/concurrent/Flow$Publisher;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
6851
}
6952

reactive/kotlinx-coroutines-jdk9/src/Channel.kt

+1-16
Original file line numberDiff line numberDiff line change
@@ -24,27 +24,12 @@ import java.util.concurrent.Flow.*
2424
* }
2525
* ```
2626
*/
27-
@Deprecated(
28-
message = "Transforming publisher to channel is deprecated, use asFlow() instead",
29-
level = DeprecationLevel.WARNING) // Will be error in 1.4
30-
public fun <T> Publisher<T>.openSubscription(request: Int = 1): ReceiveChannel<T> {
27+
internal fun <T> Publisher<T>.openSubscription(request: Int = 1): ReceiveChannel<T> {
3128
val channel = SubscriptionChannel<T>(request)
3229
subscribe(channel)
3330
return channel
3431
}
3532

36-
// Will be promoted to error in 1.3.0, removed in 1.4.0
37-
@Deprecated(message = "Use collect instead", level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("this.collect(action)"))
38-
public suspend inline fun <T> Publisher<T>.consumeEach(action: (T) -> Unit) =
39-
openSubscription().consumeEach(action)
40-
41-
/**
42-
* Subscribes to this [Publisher] and performs the specified action for each received element.
43-
* Cancels subscription if any exception happens during collect.
44-
*/
45-
public suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit) =
46-
openSubscription().consumeEach(action)
47-
4833
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER", "SubscriberImplementation")
4934
private class SubscriptionChannel<T>(
5035
private val request: Int

reactive/kotlinx-coroutines-jdk9/src/Convert.kt

-24
This file was deleted.

reactive/kotlinx-coroutines-jdk9/src/Migration.kt

-36
This file was deleted.

reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt

+7
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,13 @@ public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
3737
*/
3838
public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> = FlowAsPublisher(this)
3939

40+
/**
41+
* Subscribes to this [Publisher] and performs the specified action for each received element.
42+
* Cancels subscription if any exception happens during collect.
43+
*/
44+
public suspend fun <T> Publisher<T>.collect(action: (T) -> Unit) =
45+
openSubscription().consumeEach(action)
46+
4047
private class PublisherAsFlow<T : Any>(
4148
private val publisher: Publisher<T>,
4249
context: CoroutineContext = EmptyCoroutineContext,

reactive/kotlinx-coroutines-jdk9/test/IntegrationTest.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package kotlinx.coroutines.jdk9
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.flowOn
89
import org.hamcrest.MatcherAssert.*
910
import org.hamcrest.core.*
1011
import org.junit.*
@@ -90,9 +91,8 @@ class IntegrationTest(
9091
assertThat(pub.awaitFirstOrElse { 0 }, IsEqual(1))
9192
assertIAE { pub.awaitSingle() }
9293
checkNumbers(n, pub)
93-
val channel = pub.openSubscription()
94-
checkNumbers(n, channel.asPublisher(ctx(coroutineContext)))
95-
channel.cancel()
94+
val flow = pub.asFlow()
95+
checkNumbers(n, flow.flowOn(ctx(coroutineContext)).asPublisher())
9696
}
9797

9898
@Test

reactive/kotlinx-coroutines-jdk9/test/PublisherSubscriptionSelectTest.kt

-61
This file was deleted.

0 commit comments

Comments
 (0)