Skip to content

Commit d196082

Browse files
committed
~zip leftovers
1 parent e4c859d commit d196082

File tree

2 files changed

+37
-37
lines changed

2 files changed

+37
-37
lines changed

kotlinx-coroutines-core/common/src/flow/internal/Combine.kt

+3-4
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,10 @@ internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: sus
7676
* Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
7777
*/
7878
val collectJob = Job()
79-
val scopeJob = currentCoroutineContext()[Job]!!
79+
val scopeJob = currentCoroutineContext()[Job]!! // TODO replace with extension when #2245 is here
8080
(second as SendChannel<*>).invokeOnClose {
8181
// Optimization to avoid AFE allocation when the other flow is done
82-
if (!collectJob.isActive) collectJob.cancel(AbortFlowException(this@unsafeFlow))
82+
if (collectJob.isActive) collectJob.cancel(AbortFlowException(this@unsafeFlow))
8383
}
8484

8585
val newContext = coroutineContext + scopeJob
@@ -99,11 +99,10 @@ internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: sus
9999
*/
100100
withContextUndispatched( coroutineContext + collectJob) {
101101
flow.collect { value ->
102-
val otherValue = second.receiveOrNull() ?: return@collect
103102
withContextUndispatched(newContext, cnt) {
103+
val otherValue = second.receiveOrNull() ?: throw AbortFlowException(this@unsafeFlow)
104104
emit(transform(getNull().unbox(value), getNull().unbox(otherValue)))
105105
}
106-
ensureActive()
107106
}
108107
}
109108
} catch (e: AbortFlowException) {

kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt

+34-33
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,6 @@ import kotlin.test.*
1313
*/
1414
class ZipTest : TestBase() {
1515

16-
internal fun <T1, T2, R> Flow<T1>.zip(flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> {
17-
return zipImpl(this, flow2, transform)
18-
}
19-
2016
@Test
2117
fun testZip() = runTest {
2218
val f1 = flowOf("a", "b", "c")
@@ -28,7 +24,7 @@ class ZipTest : TestBase() {
2824
fun testUnevenZip() = runTest {
2925
val f1 = flowOf("a", "b", "c", "d", "e")
3026
val f2 = flowOf(1, 2, 3)
31-
assertEquals(listOf("a1", "b2", "c3"), f1.zip(f2, { i, j -> i + j }).toList())
27+
assertEquals(listOf("a1", "b2", "c3"), f1.zip(f2) { i, j -> i + j }.toList())
3228
assertEquals(listOf("a1", "b2", "c3"), f2.zip(f1) { i, j -> j + i }.toList())
3329
}
3430

@@ -83,6 +79,24 @@ class ZipTest : TestBase() {
8379
finish(1)
8480
}
8581

82+
@Test
83+
fun testCancelWhenFlowIsDone2() = runTest {
84+
val f1 = flow<String> {
85+
emit("1")
86+
emit("2")
87+
try {
88+
emit("3")
89+
expectUnreached()
90+
} finally {
91+
expect(1)
92+
}
93+
}
94+
95+
val f2 = flowOf("a", "b")
96+
assertEquals(listOf("1a", "2b"), f1.zip(f2) { s1, s2 -> s1 + s2 }.toList())
97+
finish(2)
98+
}
99+
86100
@Test
87101
fun testCancelWhenFlowIsDoneReversed() = runTest {
88102
val f1 = flow<String> {
@@ -93,7 +107,7 @@ class ZipTest : TestBase() {
93107
}
94108
}
95109

96-
val f2 =flow<String> {
110+
val f2 = flow<String> {
97111
emit("a")
98112
emit("b")
99113
yield()
@@ -135,19 +149,19 @@ class ZipTest : TestBase() {
135149
finish(6)
136150
}
137151

138-
// @Test
152+
@Test
139153
fun testErrorInDownstreamCancelsUpstream() = runTest {
140154
val f1 = flow {
141155
emit("a")
142156
hang {
143-
expect(2)
157+
expect(3)
144158
}
145159
}.flowOn(NamedDispatchers("first"))
146160

147161
val f2 = flow {
148162
emit(1)
149163
hang {
150-
expect(3)
164+
expect(2)
151165
}
152166
}.flowOn(NamedDispatchers("second"))
153167

@@ -227,31 +241,18 @@ class ZipTest : TestBase() {
227241
finish(6)
228242
}
229243

230-
private fun numbers(limit: Long = Long.MAX_VALUE) = flow {
231-
for (i in 2L..limit) emit(i)
232-
}
233-
234244
@Test
235-
fun zip() = runTest {
236-
val numbers = numbers(1000)
237-
val first = numbers
238-
.filter { it % 2L != 0L }
239-
.map { it * it }
240-
val second = numbers
241-
.filter { it % 2L == 0L }
242-
.map { it * it }
243-
first.zip(second) { v1, v2 -> v1 + v2 }.filter { it % 3 == 0L }.count()
244-
}
245+
fun testCancellationOfCollector() = runTest {
246+
val f1 = flow {
247+
emit("1")
248+
awaitCancellation()
249+
}
245250

246-
@Test
247-
fun zip2() = runTest {
248-
val numbers = numbers(10000)
249-
val first = numbers
250-
.filter { it % 2L != 0L }
251-
.map { it * it }
252-
val second = numbers
253-
.filter { it % 2L == 0L }
254-
.map { it * it }
255-
first.zip(second) { v1, v2 -> v1 + v2 }.filter { it % 3 == 0L }.count()
251+
val f2 = flow {
252+
emit("2")
253+
yield()
254+
}
255+
256+
f1.zip(f2) { a, b -> a + b }.collect { }
256257
}
257258
}

0 commit comments

Comments
 (0)