Skip to content

Commit ba5b343

Browse files
committed
Introduce Flow.onEmpty operator
Fixes #1890
1 parent 6e66695 commit ba5b343

File tree

3 files changed

+115
-0
lines changed

3 files changed

+115
-0
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+1
Original file line numberDiff line numberDiff line change
@@ -942,6 +942,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
942942
public static final synthetic fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
943943
public static final fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
944944
public static final fun onEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
945+
public static final fun onEmpty (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
945946
public static final fun onErrorCollect (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
946947
public static synthetic fun onErrorCollect$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
947948
public static final fun onErrorResume (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;

kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt

+33
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,39 @@ public fun <T> Flow<T>.onCompletion(
155155
exception?.let { throw it }
156156
}
157157

158+
/**
159+
* Invokes the given [action] when this flow completes without emitting any elements.
160+
* The receiver of the [action] is [FlowCollector], so `onEmpty` can emit additional elements.
161+
*
162+
* For example:
163+
*
164+
* ```
165+
* emptyFlow<Int>().onEmpty {
166+
* emit(1)
167+
* emit(2)
168+
* }
169+
* .collect { println(it) } // prints 1, 2
170+
* ```
171+
*/
172+
@ExperimentalCoroutinesApi
173+
public fun <T> Flow<T>.onEmpty(
174+
action: suspend FlowCollector<T>.() -> Unit
175+
): Flow<T> = unsafeFlow {
176+
var isEmpty = true
177+
collect {
178+
isEmpty = false
179+
emit(it)
180+
}
181+
if (isEmpty) {
182+
val collector = SafeCollector(this, coroutineContext)
183+
try {
184+
collector.action()
185+
} finally {
186+
collector.releaseIntercepted()
187+
}
188+
}
189+
}
190+
158191
private class ThrowingCollector(private val e: Throwable) : FlowCollector<Any?> {
159192
override suspend fun emit(value: Any?) {
160193
throw e
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow.operators
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
9+
import kotlin.test.*
10+
11+
class OnEmptyTest : TestBase() {
12+
13+
@Test
14+
fun testOnEmptyInvoked() = runTest {
15+
val flow = emptyFlow<Int>().onEmpty { emit(1) }
16+
assertEquals(1, flow.single())
17+
}
18+
19+
@Test
20+
fun testOnEmptyNotInvoked() = runTest {
21+
val flow = flowOf(1).onEmpty { emit(2) }
22+
assertEquals(1, flow.single())
23+
}
24+
25+
@Test
26+
fun testOnEmptyNotInvokedOnError() = runTest {
27+
val flow = flow<Int> {
28+
throw TestException()
29+
}.onEmpty { expectUnreached() }
30+
assertFailsWith<TestException>(flow)
31+
}
32+
33+
@Test
34+
fun testOnEmptyNotInvokedOnCancellation() = runTest {
35+
val flow = flow<Int> {
36+
expect(2)
37+
hang { expect(4) }
38+
}.onEmpty { expectUnreached() }
39+
40+
expect(1)
41+
val job = flow.onEach { expectUnreached() }.launchIn(this)
42+
yield()
43+
expect(3)
44+
job.cancelAndJoin()
45+
finish(5)
46+
}
47+
48+
@Test
49+
fun testOnEmptyCancellation() = runTest {
50+
val flow = emptyFlow<Int>().onEmpty {
51+
expect(2)
52+
hang { expect(4) }
53+
emit(1)
54+
}
55+
expect(1)
56+
val job = flow.onEach { expectUnreached() }.launchIn(this)
57+
yield()
58+
expect(3)
59+
job.cancelAndJoin()
60+
finish(5)
61+
}
62+
63+
@Test
64+
fun testOnEmptyTransparencyViolation() = runTest {
65+
val flow = emptyFlow<Int>().onEmpty {
66+
expect(2)
67+
coroutineScope {
68+
launch {
69+
try {
70+
emit(1)
71+
} catch (e: IllegalStateException) {
72+
expect(3)
73+
}
74+
}
75+
}
76+
}
77+
expect(1)
78+
assertNull(flow.singleOrNull())
79+
finish(4)
80+
}
81+
}

0 commit comments

Comments
 (0)