-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathChannels.kt
85 lines (77 loc) · 3.23 KB
/
Channels.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:JvmMultifileClass
@file:JvmName("FlowKt")
package kotlinx.coroutines.flow
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.unsafeFlow as flow
/**
* Represents the given receive channel as a hot flow and [consumes][ReceiveChannel.consume] the channel
* on the first collection from this flow. The resulting flow can be collected just once and throws
* [IllegalStateException] when trying to collect it more than once.
*
* ### Cancellation semantics
* 1) Flow consumer is cancelled when the original channel is cancelled.
* 2) Flow consumer completes normally when the original channel completes (~is closed) normally.
* 3) If the flow consumer fails with an exception, channel is cancelled.
*
*/
@FlowPreview
public fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T> = object : Flow<T> {
val collected = atomic(false)
override suspend fun collect(collector: FlowCollector<T>) {
check(!collected.getAndSet(true)) { "ReceiveChannel.consumeAsFlow can be collected just once" }
[email protected](collector)
}
}
/**
* Represents the given broadcast channel as a hot flow.
* Every flow collector will trigger a new broadcast channel subscription.
*
* ### Cancellation semantics
* 1) Flow consumer is cancelled when the original channel is cancelled.
* 2) Flow consumer completes normally when the original channel completes (~is closed) normally.
* 3) If the flow consumer fails with an exception, subscription is cancelled.
*/
@FlowPreview
public fun <T> BroadcastChannel<T>.asFlow(): Flow<T> = flow {
openSubscription().consumeEachTo(this)
}
/**
* Creates a [broadcast] coroutine that collects the given flow.
*
* This transformation is **stateful**, it launches a [broadcast] coroutine
* that collects the given flow and thus resulting channel should be properly closed or cancelled.
*
* A channel with [default][Channel.Factory.BUFFERED] buffer size is created.
* Use [buffer] operator on the flow before calling `produce` to specify a value other than
* default and to control what happens when data is produced faster than it is consumed,
* that is to control backpressure behavior.
*/
@FlowPreview
public fun <T> Flow<T>.broadcastIn(
scope: CoroutineScope,
start: CoroutineStart = CoroutineStart.LAZY
): BroadcastChannel<T> =
asChannelFlow().broadcastImpl(scope, start)
/**
* Creates a [produce] coroutine that collects the given flow.
*
* This transformation is **stateful**, it launches a [produce] coroutine
* that collects the given flow and thus resulting channel should be properly closed or cancelled.
*
* A channel with [default][Channel.Factory.BUFFERED] buffer size is created.
* Use [buffer] operator on the flow before calling `produce` to specify a value other than
* default and to control what happens when data is produced faster than it is consumed,
* that is to control backpressure behavior.
*/
@FlowPreview
public fun <T> Flow<T>.produceIn(
scope: CoroutineScope
): ReceiveChannel<T> =
asChannelFlow().produceImpl(scope)