Skip to content

Commit 2e87880

Browse files
committed
Flow onEmpty (#1904)
* Introduce Flow.onEmpty operator Fixes #1890
1 parent c1f8834 commit 2e87880

File tree

5 files changed

+156
-5
lines changed

5 files changed

+156
-5
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+1
Original file line numberDiff line numberDiff line change
@@ -942,6 +942,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
942942
public static final synthetic fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
943943
public static final fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
944944
public static final fun onEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
945+
public static final fun onEmpty (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
945946
public static final fun onErrorCollect (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
946947
public static synthetic fun onErrorCollect$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
947948
public static final fun onErrorResume (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;

kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt

+35-4
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@ internal inline fun <T, R> Flow<T>.unsafeTransform(
5656
}
5757

5858
/**
59-
* Invokes the given [action] when the this flow starts to be collected.
59+
* Invokes the given [action] when this flow starts to be collected.
6060
*
61-
* The receiver of the [action] is [FlowCollector] and thus `onStart` can emit additional elements.
61+
* The receiver of the [action] is [FlowCollector], so `onStart` can emit additional elements.
6262
* For example:
6363
*
6464
* ```
@@ -67,7 +67,7 @@ internal inline fun <T, R> Flow<T>.unsafeTransform(
6767
* .collect { println(it) } // prints Begin, a, b, c
6868
* ```
6969
*/
70-
@ExperimentalCoroutinesApi // tentatively stable in 1.3.0
70+
@ExperimentalCoroutinesApi
7171
public fun <T> Flow<T>.onStart(
7272
action: suspend FlowCollector<T>.() -> Unit
7373
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke start action
@@ -129,7 +129,7 @@ public fun <T> Flow<T>.onStart(
129129
* .collect { println(it) } // prints a, b, c, Done
130130
* ```
131131
*/
132-
@ExperimentalCoroutinesApi // tentatively stable in 1.3.0
132+
@ExperimentalCoroutinesApi
133133
public fun <T> Flow<T>.onCompletion(
134134
action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
135135
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke completion action
@@ -155,6 +155,37 @@ public fun <T> Flow<T>.onCompletion(
155155
exception?.let { throw it }
156156
}
157157

158+
/**
159+
* Invokes the given [action] when this flow completes without emitting any elements.
160+
* The receiver of the [action] is [FlowCollector], so `onEmpty` can emit additional elements.
161+
* For example:
162+
*
163+
* ```
164+
* emptyFlow<Int>().onEmpty {
165+
* emit(1)
166+
* emit(2)
167+
* }.collect { println(it) } // prints 1, 2
168+
* ```
169+
*/
170+
@ExperimentalCoroutinesApi
171+
public fun <T> Flow<T>.onEmpty(
172+
action: suspend FlowCollector<T>.() -> Unit
173+
): Flow<T> = unsafeFlow {
174+
var isEmpty = true
175+
collect {
176+
isEmpty = false
177+
emit(it)
178+
}
179+
if (isEmpty) {
180+
val collector = SafeCollector(this, coroutineContext)
181+
try {
182+
collector.action()
183+
} finally {
184+
collector.releaseIntercepted()
185+
}
186+
}
187+
}
188+
158189
private class ThrowingCollector(private val e: Throwable) : FlowCollector<Any?> {
159190
override suspend fun emit(value: Any?) {
160191
throw e

kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt

+19
Original file line numberDiff line numberDiff line change
@@ -259,4 +259,23 @@ class OnCompletionTest : TestBase() {
259259
assertEquals(42, value)
260260
finish(2)
261261
}
262+
263+
@Test
264+
fun testTransparencyViolation() = runTest {
265+
val flow = emptyFlow<Int>().onCompletion {
266+
expect(2)
267+
coroutineScope {
268+
launch {
269+
try {
270+
emit(1)
271+
} catch (e: IllegalStateException) {
272+
expect(3)
273+
}
274+
}
275+
}
276+
}
277+
expect(1)
278+
assertNull(flow.singleOrNull())
279+
finish(4)
280+
}
262281
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow.operators
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
9+
import kotlin.test.*
10+
11+
class OnEmptyTest : TestBase() {
12+
13+
@Test
14+
fun testOnEmptyInvoked() = runTest {
15+
val flow = emptyFlow<Int>().onEmpty { emit(1) }
16+
assertEquals(1, flow.single())
17+
}
18+
19+
@Test
20+
fun testOnEmptyNotInvoked() = runTest {
21+
val flow = flowOf(1).onEmpty { emit(2) }
22+
assertEquals(1, flow.single())
23+
}
24+
25+
@Test
26+
fun testOnEmptyNotInvokedOnError() = runTest {
27+
val flow = flow<Int> {
28+
throw TestException()
29+
}.onEmpty { expectUnreached() }
30+
assertFailsWith<TestException>(flow)
31+
}
32+
33+
@Test
34+
fun testOnEmptyNotInvokedOnCancellation() = runTest {
35+
val flow = flow<Int> {
36+
expect(2)
37+
hang { expect(4) }
38+
}.onEmpty { expectUnreached() }
39+
40+
expect(1)
41+
val job = flow.onEach { expectUnreached() }.launchIn(this)
42+
yield()
43+
expect(3)
44+
job.cancelAndJoin()
45+
finish(5)
46+
}
47+
48+
@Test
49+
fun testOnEmptyCancellation() = runTest {
50+
val flow = emptyFlow<Int>().onEmpty {
51+
expect(2)
52+
hang { expect(4) }
53+
emit(1)
54+
}
55+
expect(1)
56+
val job = flow.onEach { expectUnreached() }.launchIn(this)
57+
yield()
58+
expect(3)
59+
job.cancelAndJoin()
60+
finish(5)
61+
}
62+
63+
@Test
64+
fun testTransparencyViolation() = runTest {
65+
val flow = emptyFlow<Int>().onEmpty {
66+
expect(2)
67+
coroutineScope {
68+
launch {
69+
try {
70+
emit(1)
71+
} catch (e: IllegalStateException) {
72+
expect(3)
73+
}
74+
}
75+
}
76+
}
77+
expect(1)
78+
assertNull(flow.singleOrNull())
79+
finish(4)
80+
}
81+
}

kotlinx-coroutines-core/common/test/flow/operators/OnStartTest.kt

+20-1
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,23 @@ class OnStartTest : TestBase() {
1414
.onStart { emit("Begin") }
1515
assertEquals(listOf("Begin", "a", "b", "c"), flow.toList())
1616
}
17-
}
17+
18+
@Test
19+
fun testTransparencyViolation() = runTest {
20+
val flow = emptyFlow<Int>().onStart {
21+
expect(2)
22+
coroutineScope {
23+
launch {
24+
try {
25+
emit(1)
26+
} catch (e: IllegalStateException) {
27+
expect(3)
28+
}
29+
}
30+
}
31+
}
32+
expect(1)
33+
assertNull(flow.singleOrNull())
34+
finish(4)
35+
}
36+
}

0 commit comments

Comments
 (0)