Skip to content

Commit 1d8b6be

Browse files
Stop using onReceiveOrClosed because it is extremely broken.
I think this fixes #626. Likely caused by Kotlin/kotlinx.coroutines#1584.
1 parent f5cc366 commit 1d8b6be

File tree

4 files changed

+105
-11
lines changed

4 files changed

+105
-11
lines changed

kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/Workers.kt

+37-1
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
2121
import kotlinx.coroutines.FlowPreview
2222
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
2323
import kotlinx.coroutines.channels.ReceiveChannel
24+
import kotlinx.coroutines.flow.Flow
2425
import kotlinx.coroutines.flow.buffer
2526
import kotlinx.coroutines.flow.catch
27+
import kotlinx.coroutines.flow.collect
28+
import kotlinx.coroutines.flow.flow
2629
import kotlinx.coroutines.flow.produceIn
2730

2831
/**
@@ -31,8 +34,9 @@ import kotlinx.coroutines.flow.produceIn
3134
* will emit everything from the worker. The channel will be closed when the flow completes.
3235
*/
3336
@UseExperimental(FlowPreview::class, ExperimentalCoroutinesApi::class)
34-
internal fun <T> CoroutineScope.launchWorker(worker: Worker<T>): ReceiveChannel<T> =
37+
internal fun <T> CoroutineScope.launchWorker(worker: Worker<T>): ReceiveChannel<ValueOrDone<T>> =
3538
worker.run()
39+
.transformToValueOrDone()
3640
.catch { e ->
3741
// Workers that failed (as opposed to just cancelled) should have their failure reason
3842
// re-thrown from the workflow runtime. If we don't unwrap the cause here, they'll just
@@ -42,3 +46,35 @@ internal fun <T> CoroutineScope.launchWorker(worker: Worker<T>): ReceiveChannel<
4246
}
4347
.buffer(RENDEZVOUS)
4448
.produceIn(this)
49+
50+
/**
51+
* Pretend we can use ReceiveChannel.onReceiveOrClosed.
52+
*
53+
* See https://github.com/Kotlin/kotlinx.coroutines/issues/1584 and
54+
* https://github.com/square/workflow/issues/626.
55+
*/
56+
internal class ValueOrDone<out T> private constructor(private val _value: Any?) {
57+
58+
val isDone: Boolean get() = this === Done
59+
60+
@Suppress("UNCHECKED_CAST")
61+
val value: T
62+
get() {
63+
check(!isDone)
64+
return _value as T
65+
}
66+
67+
companion object {
68+
private val Done = ValueOrDone<Nothing>(null)
69+
70+
fun <T> value(value: T): ValueOrDone<T> = ValueOrDone(value)
71+
fun done(): ValueOrDone<Nothing> = Done
72+
}
73+
}
74+
75+
private fun <T> Flow<T>.transformToValueOrDone(): Flow<ValueOrDone<T>> = flow {
76+
collect {
77+
emit(ValueOrDone.value(it))
78+
}
79+
emit(ValueOrDone.done())
80+
}

kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/WorkflowNode.kt

+6-6
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ internal class WorkflowNode<PropsT, StateT, OutputT : Any, RenderingT>(
6060
* continues to listen to the worker after it finishes.
6161
*/
6262
private class WorkerSession(
63-
val channel: ReceiveChannel<*>,
63+
val channel: ReceiveChannel<ValueOrDone<*>>,
6464
var tombstone: Boolean = false
6565
)
6666

@@ -143,8 +143,8 @@ internal class WorkflowNode<PropsT, StateT, OutputT : Any, RenderingT>(
143143
.filter { (_, session) -> !session.tombstone }
144144
.forEach { (case, session) ->
145145
with(selector) {
146-
session.channel.onReceiveOrClosed { valueOrClosed ->
147-
if (valueOrClosed.isClosed) {
146+
session.channel.onReceive { valueOrDone ->
147+
if (valueOrDone.isDone) {
148148
// Set the tombstone flag so we don't continue to listen to the subscription.
149149
session.tombstone = true
150150
// Nothing to do on close other than update the session, so don't emit any output.
@@ -156,10 +156,10 @@ internal class WorkflowNode<PropsT, StateT, OutputT : Any, RenderingT>(
156156
)
157157
)
158158
)
159-
return@onReceiveOrClosed OutputEnvelope(null, debugInfo)
159+
return@onReceive OutputEnvelope(null, debugInfo)
160160
} else {
161-
val update = case.acceptUpdate(valueOrClosed.value)
162-
acceptUpdate(update, Kind.Updated(Source.Worker(case.key, valueOrClosed.value!!)))
161+
val update = case.acceptUpdate(valueOrDone.value)
162+
acceptUpdate(update, Kind.Updated(Source.Worker(case.key, valueOrDone.value!!)))
163163
}
164164
}
165165
}

kotlin/workflow-runtime/src/test/java/com/squareup/workflow/internal/WorkersTest.kt

+62-2
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,13 @@ import kotlinx.coroutines.CoroutineStart.UNDISPATCHED
2222
import kotlinx.coroutines.channels.Channel
2323
import kotlinx.coroutines.launch
2424
import kotlinx.coroutines.runBlocking
25+
import kotlinx.coroutines.supervisorScope
2526
import kotlinx.coroutines.yield
2627
import java.util.concurrent.atomic.AtomicInteger
2728
import kotlin.test.Test
2829
import kotlin.test.assertEquals
30+
import kotlin.test.assertFailsWith
31+
import kotlin.test.assertTrue
2932

3033
class WorkersTest {
3134

@@ -50,16 +53,73 @@ class WorkersTest {
5053
yield()
5154
assertEquals(1, counter.getAndIncrement())
5255

53-
assertEquals("a", workerOutputs.poll())
56+
assertEquals("a", workerOutputs.poll()!!.value)
5457
yield()
5558
assertEquals(3, counter.getAndIncrement())
5659

57-
assertEquals("b", workerOutputs.poll())
60+
assertEquals("b", workerOutputs.poll()!!.value)
5861
yield()
5962
assertEquals(6, counter.getAndIncrement())
6063

6164
// Cancel the worker so we can exit this loop.
6265
workerOutputs.cancel()
6366
}
6467
}
68+
69+
@Test fun `emits done when complete immediately`() {
70+
val channel = Channel<String>(capacity = 1)
71+
72+
runBlocking {
73+
val workerOutputs = launchWorker(channel.asWorker())
74+
assertTrue(workerOutputs.isEmpty)
75+
76+
channel.close()
77+
assertTrue(workerOutputs.receive().isDone)
78+
}
79+
}
80+
81+
@Test fun `emits done when complete after sending`() {
82+
val channel = Channel<String>(capacity = 1)
83+
84+
runBlocking {
85+
val workerOutputs = launchWorker(channel.asWorker())
86+
assertTrue(workerOutputs.isEmpty)
87+
88+
channel.send("foo")
89+
assertEquals("foo", workerOutputs.receive().value)
90+
91+
channel.close()
92+
assertTrue(workerOutputs.receive().isDone)
93+
}
94+
}
95+
96+
@Test fun `does not emit done when failed`() {
97+
val channel = Channel<String>(capacity = 1)
98+
99+
runBlocking {
100+
// Needed so that cancelling the channel doesn't cancel our job, which means receive will
101+
// throw the JobCancellationException instead of the actual channel failure.
102+
supervisorScope {
103+
val workerOutputs = launchWorker(channel.asWorker())
104+
assertTrue(workerOutputs.isEmpty)
105+
106+
channel.close(ExpectedException())
107+
assertFailsWith<ExpectedException> { workerOutputs.receive() }
108+
}
109+
}
110+
}
111+
112+
@Test fun `completes after emitting done`() {
113+
val channel = Channel<String>(capacity = 1)
114+
115+
runBlocking {
116+
val workerOutputs = launchWorker(channel.asWorker())
117+
channel.close()
118+
assertTrue(workerOutputs.receive().isDone)
119+
120+
assertTrue(channel.isClosedForReceive)
121+
}
122+
}
123+
124+
private class ExpectedException : RuntimeException()
65125
}

kotlin/workflow-testing/src/test/java/com/squareup/workflow/WorkerStressTest.kt

-2
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,11 @@ import kotlinx.coroutines.flow.onCompletion
1212
import kotlinx.coroutines.flow.reduce
1313
import kotlinx.coroutines.flow.take
1414
import kotlinx.coroutines.runBlocking
15-
import kotlin.test.Ignore
1615
import kotlin.test.Test
1716
import kotlin.test.assertEquals
1817

1918
class WorkerStressTest {
2019

21-
@Ignore("https://github.com/square/workflow/issues/626")
2220
@UseExperimental(ExperimentalCoroutinesApi::class)
2321
@Test fun `multiple subscriptions to single channel when closed`() {
2422
val channel = Channel<Unit>()

0 commit comments

Comments
 (0)