Skip to content

Commit b9b999c

Browse files
committed
Make unsafeFlow less explicit, return preview status to AbstractFlow
1 parent dbf365c commit b9b999c

File tree

6 files changed

+29
-27
lines changed

6 files changed

+29
-27
lines changed

kotlinx-coroutines-core/common/src/flow/Builders.kt

+12-24
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import kotlinx.coroutines.*
1111
import kotlinx.coroutines.channels.*
1212
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
1313
import kotlinx.coroutines.flow.internal.*
14+
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
1415
import kotlin.coroutines.*
1516
import kotlin.jvm.*
1617

@@ -53,24 +54,11 @@ private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit
5354
}
5455
}
5556

56-
/**
57-
* An analogue of the [flow] builder that does not check the context of execution of the resulting flow.
58-
* Used in our own operators where we trust the context of invocations.
59-
*/
60-
@PublishedApi
61-
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
62-
return object : Flow<T> {
63-
override suspend fun collect(collector: FlowCollector<T>) {
64-
collector.block()
65-
}
66-
}
67-
}
68-
6957
/**
7058
* Creates a flow that produces a single value from the given functional type.
7159
*/
7260
@FlowPreview
73-
public fun <T> (() -> T).asFlow(): Flow<T> = unsafeFlow {
61+
public fun <T> (() -> T).asFlow(): Flow<T> = flow {
7462
emit(invoke())
7563
}
7664

@@ -83,14 +71,14 @@ public fun <T> (() -> T).asFlow(): Flow<T> = unsafeFlow {
8371
* ```
8472
*/
8573
@FlowPreview
86-
public fun <T> (suspend () -> T).asFlow(): Flow<T> = unsafeFlow {
74+
public fun <T> (suspend () -> T).asFlow(): Flow<T> = flow {
8775
emit(invoke())
8876
}
8977

9078
/**
9179
* Creates a flow that produces values from the given iterable.
9280
*/
93-
public fun <T> Iterable<T>.asFlow(): Flow<T> = unsafeFlow {
81+
public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
9482
forEach { value ->
9583
emit(value)
9684
}
@@ -99,7 +87,7 @@ public fun <T> Iterable<T>.asFlow(): Flow<T> = unsafeFlow {
9987
/**
10088
* Creates a flow that produces values from the given iterable.
10189
*/
102-
public fun <T> Iterator<T>.asFlow(): Flow<T> = unsafeFlow {
90+
public fun <T> Iterator<T>.asFlow(): Flow<T> = flow {
10391
forEach { value ->
10492
emit(value)
10593
}
@@ -108,7 +96,7 @@ public fun <T> Iterator<T>.asFlow(): Flow<T> = unsafeFlow {
10896
/**
10997
* Creates a flow that produces values from the given sequence.
11098
*/
111-
public fun <T> Sequence<T>.asFlow(): Flow<T> = unsafeFlow {
99+
public fun <T> Sequence<T>.asFlow(): Flow<T> = flow {
112100
forEach { value ->
113101
emit(value)
114102
}
@@ -117,7 +105,7 @@ public fun <T> Sequence<T>.asFlow(): Flow<T> = unsafeFlow {
117105
/**
118106
* Creates a flow that produces values from the given array of elements.
119107
*/
120-
public fun <T> flowOf(vararg elements: T): Flow<T> = unsafeFlow {
108+
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
121109
for (element in elements) {
122110
emit(element)
123111
}
@@ -126,7 +114,7 @@ public fun <T> flowOf(vararg elements: T): Flow<T> = unsafeFlow {
126114
/**
127115
* Creates flow that produces a given [value].
128116
*/
129-
public fun <T> flowOf(value: T): Flow<T> = unsafeFlow {
117+
public fun <T> flowOf(value: T): Flow<T> = flow {
130118
/*
131119
* Implementation note: this is just an "optimized" overload of flowOf(vararg)
132120
* which significantly reduce the footprint of widespread single-value flows.
@@ -146,7 +134,7 @@ private object EmptyFlow : Flow<Nothing> {
146134
/**
147135
* Creates a flow that produces values from the given array.
148136
*/
149-
public fun <T> Array<T>.asFlow(): Flow<T> = unsafeFlow {
137+
public fun <T> Array<T>.asFlow(): Flow<T> = flow {
150138
forEach { value ->
151139
emit(value)
152140
}
@@ -155,7 +143,7 @@ public fun <T> Array<T>.asFlow(): Flow<T> = unsafeFlow {
155143
/**
156144
* Creates flow that produces values from the given array.
157145
*/
158-
public fun IntArray.asFlow(): Flow<Int> = unsafeFlow {
146+
public fun IntArray.asFlow(): Flow<Int> = flow {
159147
forEach { value ->
160148
emit(value)
161149
}
@@ -164,7 +152,7 @@ public fun IntArray.asFlow(): Flow<Int> = unsafeFlow {
164152
/**
165153
* Creates flow that produces values from the given array.
166154
*/
167-
public fun LongArray.asFlow(): Flow<Long> = unsafeFlow {
155+
public fun LongArray.asFlow(): Flow<Long> = flow {
168156
forEach { value ->
169157
emit(value)
170158
}
@@ -173,7 +161,7 @@ public fun LongArray.asFlow(): Flow<Long> = unsafeFlow {
173161
/**
174162
* Creates flow that produces values from the given range.
175163
*/
176-
public fun IntRange.asFlow(): Flow<Int> = unsafeFlow {
164+
public fun IntRange.asFlow(): Flow<Int> = flow {
177165
forEach { value ->
178166
emit(value)
179167
}

kotlinx-coroutines-core/common/src/flow/Flow.kt

+1
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ public interface Flow<out T> {
194194
* }
195195
* ```
196196
*/
197+
@FlowPreview
197198
public abstract class AbstractFlow<T> : Flow<T> {
198199

199200
@InternalCoroutinesApi

kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import kotlinx.coroutines.internal.*
1111
import kotlinx.coroutines.intrinsics.*
1212
import kotlin.coroutines.*
1313
import kotlin.coroutines.intrinsics.*
14-
import kotlinx.coroutines.flow.unsafeFlow as flow
14+
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
1515

1616
/**
1717
* Creates a [CoroutineScope] and calls the specified suspend block with this scope.

kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt

+13-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import kotlinx.coroutines.flow.*
99
import kotlinx.coroutines.internal.*
1010
import kotlin.coroutines.*
1111

12-
@PublishedApi
1312
internal class SafeCollector<T>(
1413
private val collector: FlowCollector<T>,
1514
private val collectContext: CoroutineContext
@@ -99,3 +98,16 @@ internal class SafeCollector<T>(
9998
return parent.transitiveCoroutineParent(collectJob)
10099
}
101100
}
101+
102+
/**
103+
* An analogue of the [flow] builder that does not check the context of execution of the resulting flow.
104+
* Used in our own operators where we trust the context of invocations.
105+
*/
106+
@PublishedApi
107+
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
108+
return object : Flow<T> {
109+
override suspend fun collect(collector: FlowCollector<T>) {
110+
collector.block()
111+
}
112+
}
113+
}

kotlinx-coroutines-core/common/src/flow/operators/Merge.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import kotlinx.coroutines.internal.*
1616
import kotlinx.coroutines.sync.*
1717
import kotlin.coroutines.*
1818
import kotlin.jvm.*
19-
import kotlinx.coroutines.flow.unsafeFlow as flow
19+
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
2020

2121
/**
2222
* Name of the property that defines the value of [DEFAULT_CONCURRENCY].

kotlinx-coroutines-core/jvm/test/flow/FlatMapStressTest.kt

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package kotlinx.coroutines.flow
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.internal.*
89
import kotlinx.coroutines.scheduling.*
910
import org.junit.Assume.*
1011
import org.junit.Test

0 commit comments

Comments
 (0)