-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathFlowAsPublisher.kt
103 lines (87 loc) · 3.19 KB
/
FlowAsPublisher.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.reactive.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.reactivestreams.*
import java.util.concurrent.atomic.*
import kotlin.coroutines.*
/**
 * Wraps this [Flow] into a Reactive Streams [Publisher].
 *
 * Every subscriber receives its own subscription over the receiver flow;
 * the flow is collected on behalf of that subscriber once it requests elements.
 *
 * @return a [Publisher] backed by this flow
 */
@JvmName("from")
@ExperimentalCoroutinesApi
public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> {
    return FlowAsPublisher(this)
}
/**
 * Adapter that transforms [Flow] into TCK-compliant [Publisher].
 * [Subscription.cancel] invocation cancels the collection of the original flow.
 */
@Suppress("PublisherImplementation")
private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> {

    /**
     * Creates a new [FlowSubscription] for [subscriber] and passes it to [Subscriber.onSubscribe].
     * Collection of the flow does not begin here; it is started by the first positive
     * [Subscription.request].
     *
     * @throws NullPointerException if [subscriber] is `null`
     */
    override fun subscribe(subscriber: Subscriber<in T>?) {
        if (subscriber == null) throw NullPointerException()
        subscriber.onSubscribe(FlowSubscription(flow, subscriber))
    }

    /**
     * Subscription that collects [flow] in a lazily started coroutine and forwards each value to
     * [subscriber], suspending the collector whenever the outstanding demand drops to zero.
     */
    private class FlowSubscription<T>(val flow: Flow<T>, val subscriber: Subscriber<in T>) : Subscription {
        // Set by cancel(); lets request() bail out early once the subscription is cancelled.
        @Volatile
        private var canceled: Boolean = false

        // Outstanding demand: incremented by request(), decremented once per delivered element.
        private val requested = AtomicLong(0L)

        // Continuation of the collector while it is parked waiting for demand, if any.
        private val producer: AtomicReference<CancellableContinuation<Unit>?> = AtomicReference()

        // This is actually optimizable
        // Lazily started so that nothing is collected before the first request().
        private val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.LAZY) {
            try {
                consumeFlow()
            } catch (e: Throwable) {
                // Report only real failures; if the job is no longer active the subscriber
                // has cancelled and must not receive a terminal signal for it.
                if (isActive) {
                    subscriber.onError(e)
                }
                return@launch
            }
            // Kept outside of the try block on purpose: an exception thrown by onComplete()
            // itself must not be turned into an onError() after the terminal signal.
            subscriber.onComplete()
        }

        /**
         * Collects [flow], delivering one element to [subscriber] per unit of demand and
         * suspending while [requested] is zero.
         */
        private suspend fun consumeFlow() {
            flow.collect { value ->
                // A cancelled subscription stops here without any terminal signal.
                coroutineContext.ensureActive()
                if (requested.get() == 0L) {
                    suspendCancellableCoroutine<Unit> {
                        producer.set(it)
                        // Demand may have arrived between the check above and publishing the
                        // continuation; re-check so that the wake-up is not lost.
                        if (requested.get() != 0L) it.resumeSafely()
                    }
                }
                requested.decrementAndGet()
                subscriber.onNext(value)
            }
        }

        /** Marks the subscription as cancelled and cancels the collecting coroutine. */
        override fun cancel() {
            canceled = true
            job.cancel()
        }

        /**
         * Adds [n] to the outstanding demand (saturating at [Long.MAX_VALUE]), starts the
         * collector on the first call and wakes it up if it is parked waiting for demand.
         *
         * Non-positive [n] and calls made after [cancel] are ignored.
         */
        override fun request(n: Long) {
            // NOTE(review): Reactive Streams rule 3.9 expects onError(IllegalArgumentException)
            // for non-positive demand; it is silently ignored here. Signalling it would have to
            // be serialized with onNext() calls made by the collector -- confirm before changing.
            if (n <= 0) {
                return
            }
            if (canceled) return
            job.start()
            var snapshot: Long
            var newValue: Long
            do {
                snapshot = requested.get()
                newValue = snapshot + n
                // Overflow of the sum wraps to a non-positive value; clamp to "unbounded".
                if (newValue <= 0L) newValue = Long.MAX_VALUE
            } while (!requested.compareAndSet(snapshot, newValue))

            // Take ownership of the parked continuation (if any) so it is resumed from one place.
            val prev = producer.get()
            if (prev == null || !producer.compareAndSet(prev, null)) return
            prev.resumeSafely()
        }

        /**
         * Resumes the continuation only when [CancellableContinuation.tryResume] hands back a
         * token, so a second resume attempt on the same continuation is a no-op instead of a failure.
         */
        private fun CancellableContinuation<Unit>.resumeSafely() {
            val token = tryResume(Unit)
            if (token != null) {
                completeResume(token)
            }
        }
    }
}