-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathPublisherAsFlow.kt
140 lines (120 loc) · 4.95 KB
/
PublisherAsFlow.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:JvmMultifileClass
@file:JvmName("FlowKt")
package kotlinx.coroutines.reactive
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.internal.*
import org.reactivestreams.*
import java.util.*
import kotlin.coroutines.*
/**
* Transforms the given reactive [Publisher] into [Flow].
* Use [buffer] operator on the resulting flow to specify the size of the backpressure.
* More precisely, to it specifies the value of the subscription's [request][Subscription.request].
* `1` is used by default.
*
* If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flights elements
* are discarded.
*/
@ExperimentalCoroutinesApi
public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
PublisherAsFlow(this, 1)
@FlowPreview
@Deprecated(
message = "batchSize parameter is deprecated, use .buffer() instead to control the backpressure",
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith("asFlow().buffer(batchSize)", imports = ["kotlinx.coroutines.flow.*"])
)
public fun <T : Any> Publisher<T>.asFlow(batchSize: Int): Flow<T> = asFlow().buffer(batchSize)
private class PublisherAsFlow<T : Any>(
private val publisher: Publisher<T>,
capacity: Int
) : ChannelFlow<T>(EmptyCoroutineContext, capacity) {
override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
PublisherAsFlow(publisher, capacity)
override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
// use another channel for conflation (cannot do openSubscription)
if (capacity < 0) return super.produceImpl(scope)
// Open subscription channel directly
val channel = publisher
.injectCoroutineContext(scope.coroutineContext)
.openSubscription(capacity)
val handle = scope.coroutineContext[Job]?.invokeOnCompletion(onCancelling = true) { cause ->
channel.cancel(cause?.let {
it as? CancellationException ?: CancellationException("Job was cancelled", it)
})
}
if (handle != null && handle !== NonDisposableHandle) {
(channel as SendChannel<*>).invokeOnClose {
handle.dispose()
}
}
return channel
}
private val requestSize: Long
get() = when (capacity) {
Channel.CONFLATED -> Long.MAX_VALUE // request all and conflate incoming
Channel.RENDEZVOUS -> 1L // need to request at least one anyway
Channel.UNLIMITED -> Long.MAX_VALUE // reactive streams way to say "give all" must be Long.MAX_VALUE
else -> capacity.toLong().also { check(it >= 1) }
}
override suspend fun collect(collector: FlowCollector<T>) {
val subscriber = ReactiveSubscriber<T>(capacity, requestSize)
publisher.injectCoroutineContext(coroutineContext).subscribe(subscriber)
try {
var consumed = 0L
while (true) {
val value = subscriber.takeNextOrNull() ?: break
collector.emit(value)
if (++consumed == requestSize) {
consumed = 0L
subscriber.makeRequest()
}
}
} finally {
subscriber.cancel()
}
}
// The second channel here is used only for broadcast
override suspend fun collectTo(scope: ProducerScope<T>) =
collect(SendingCollector(scope.channel))
}
@Suppress("SubscriberImplementation")
private class ReactiveSubscriber<T : Any>(
capacity: Int,
private val requestSize: Long
) : Subscriber<T> {
private lateinit var subscription: Subscription
private val channel = Channel<T>(capacity)
suspend fun takeNextOrNull(): T? = channel.receiveOrNull()
override fun onNext(value: T) {
// Controlled by requestSize
require(channel.offer(value)) { "Element $value was not added to channel because it was full, $channel" }
}
override fun onComplete() {
channel.close()
}
override fun onError(t: Throwable?) {
channel.close(t)
}
override fun onSubscribe(s: Subscription) {
subscription = s
makeRequest()
}
fun makeRequest() {
subscription.request(requestSize)
}
fun cancel() {
subscription.cancel()
}
}
// ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only.
// If `kotlinx-coroutines-reactor` module is not included, the list is empty.
private val contextInjectors: List<ContextInjector> =
ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).toList()
private fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: CoroutineContext) =
contextInjectors.fold(this) { pub, contextInjector -> contextInjector.injectCoroutineContext(pub, coroutineContext) }