Skip to content

Commit 69adf9e

Browse files
Stop using onReceiveOrClosed because it is extremely broken.
I think this fixes #626. Likely caused by Kotlin/kotlinx.coroutines#1584.
1 parent 1cdf4a7 commit 69adf9e

File tree

4 files changed

+102
-10
lines changed

4 files changed

+102
-10
lines changed

kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/Workers.kt

+34-1
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,9 @@ internal fun <T> CoroutineScope.launchWorker(
4141
workerDiagnosticId: Long,
4242
workflowDiagnosticId: Long,
4343
diagnosticListener: WorkflowDiagnosticListener?
44-
): ReceiveChannel<T> = worker.run()
44+
): ReceiveChannel<ValueOrDone<T>> = worker.run()
4545
.wireUpDebugger(workerDiagnosticId, workflowDiagnosticId, diagnosticListener)
46+
.transformToValueOrDone()
4647
.catch { e ->
4748
// Workers that failed (as opposed to just cancelled) should have their failure reason
4849
// re-thrown from the workflow runtime. If we don't unwrap the cause here, they'll just
@@ -74,3 +75,35 @@ private fun <T> Flow<T>.wireUpDebugger(
7475
}
7576
}
7677
}
78+
79+
/**
80+
* Pretend we can use ReceiveChannel.onReceiveOrClosed.
81+
*
82+
* See https://github.com/Kotlin/kotlinx.coroutines/issues/1584 and
83+
* https://github.com/square/workflow/issues/626.
84+
*/
85+
internal class ValueOrDone<out T> private constructor(private val _value: Any?) {
86+
87+
val isDone: Boolean get() = this === Done
88+
89+
@Suppress("UNCHECKED_CAST")
90+
val value: T
91+
get() {
92+
check(!isDone)
93+
return _value as T
94+
}
95+
96+
companion object {
97+
private val Done = ValueOrDone<Nothing>(null)
98+
99+
fun <T> value(value: T): ValueOrDone<T> = ValueOrDone(value)
100+
fun done(): ValueOrDone<Nothing> = Done
101+
}
102+
}
103+
104+
private fun <T> Flow<T>.transformToValueOrDone(): Flow<ValueOrDone<T>> = flow {
105+
collect {
106+
emit(ValueOrDone.value(it))
107+
}
108+
emit(ValueOrDone.done())
109+
}

kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/WorkflowNode.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ internal class WorkflowNode<PropsT, StateT, OutputT : Any, RenderingT>(
6464
* continues to listen to the worker after it finishes.
6565
*/
6666
private class WorkerSession(
67-
val channel: ReceiveChannel<*>,
67+
val channel: ReceiveChannel<ValueOrDone<*>>,
6868
var tombstone: Boolean = false
6969
)
7070

@@ -182,14 +182,14 @@ internal class WorkflowNode<PropsT, StateT, OutputT : Any, RenderingT>(
182182
.filter { (_, session) -> !session.tombstone }
183183
.forEach { (case, session) ->
184184
with(selector) {
185-
session.channel.onReceiveOrClosed { valueOrClosed ->
186-
if (valueOrClosed.isClosed) {
185+
session.channel.onReceive { valueOrDone ->
186+
if (valueOrDone.isDone) {
187187
// Set the tombstone flag so we don't continue to listen to the subscription.
188188
session.tombstone = true
189189
// Nothing to do on close other than update the session, so don't emit any output.
190-
return@onReceiveOrClosed null
190+
return@onReceive null
191191
} else {
192-
val update = case.acceptUpdate(valueOrClosed.value)
192+
val update = case.acceptUpdate(valueOrDone.value)
193193
acceptUpdate(update)
194194
}
195195
}

kotlin/workflow-runtime/src/test/java/com/squareup/workflow/internal/WorkersTest.kt

+63-2
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ import kotlinx.coroutines.channels.Channel
2424
import kotlinx.coroutines.flow.emitAll
2525
import kotlinx.coroutines.launch
2626
import kotlinx.coroutines.runBlocking
27+
import kotlinx.coroutines.supervisorScope
2728
import kotlinx.coroutines.yield
2829
import java.util.concurrent.atomic.AtomicInteger
2930
import kotlin.test.Test
3031
import kotlin.test.assertEquals
32+
import kotlin.test.assertFailsWith
3133
import kotlin.test.assertTrue
3234

3335
class WorkersTest {
@@ -58,11 +60,11 @@ class WorkersTest {
5860
yield()
5961
assertEquals(1, counter.getAndIncrement())
6062

61-
assertEquals("a", workerOutputs.poll())
63+
assertEquals("a", workerOutputs.poll()!!.value)
6264
yield()
6365
assertEquals(3, counter.getAndIncrement())
6466

65-
assertEquals("b", workerOutputs.poll())
67+
assertEquals("b", workerOutputs.poll()!!.value)
6668
yield()
6769
assertEquals(6, counter.getAndIncrement())
6870

@@ -94,6 +96,65 @@ class WorkersTest {
9496
yield()
9597

9698
assertEquals("onWorkerStopped(0, 1)", listener.consumeNextEvent())
99+
// Read the last event so the scope can complete.
100+
assertTrue(outputs.receive().isDone)
97101
}
98102
}
103+
104+
@Test fun `emits done when complete immediately`() {
105+
val channel = Channel<String>(capacity = 1)
106+
107+
runBlocking {
108+
val workerOutputs = launchWorker(channel.asWorker(), 0, 0, null)
109+
assertTrue(workerOutputs.isEmpty)
110+
111+
channel.close()
112+
assertTrue(workerOutputs.receive().isDone)
113+
}
114+
}
115+
116+
@Test fun `emits done when complete after sending`() {
117+
val channel = Channel<String>(capacity = 1)
118+
119+
runBlocking {
120+
val workerOutputs = launchWorker(channel.asWorker(), 0, 0, null)
121+
assertTrue(workerOutputs.isEmpty)
122+
123+
channel.send("foo")
124+
assertEquals("foo", workerOutputs.receive().value)
125+
126+
channel.close()
127+
assertTrue(workerOutputs.receive().isDone)
128+
}
129+
}
130+
131+
@Test fun `does not emit done when failed`() {
132+
val channel = Channel<String>(capacity = 1)
133+
134+
runBlocking {
135+
// Needed so that cancelling the channel doesn't cancel our job, which means receive will
136+
// throw the JobCancellationException instead of the actual channel failure.
137+
supervisorScope {
138+
val workerOutputs = launchWorker(channel.asWorker(), 0, 0, null)
139+
assertTrue(workerOutputs.isEmpty)
140+
141+
channel.close(ExpectedException())
142+
assertFailsWith<ExpectedException> { workerOutputs.receive() }
143+
}
144+
}
145+
}
146+
147+
@Test fun `completes after emitting done`() {
148+
val channel = Channel<String>(capacity = 1)
149+
150+
runBlocking {
151+
val workerOutputs = launchWorker(channel.asWorker(), 0, 0, null)
152+
channel.close()
153+
assertTrue(workerOutputs.receive().isDone)
154+
155+
assertTrue(channel.isClosedForReceive)
156+
}
157+
}
158+
159+
private class ExpectedException : RuntimeException()
99160
}

kotlin/workflow-testing/src/test/java/com/squareup/workflow/WorkerStressTest.kt

-2
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,11 @@ import kotlinx.coroutines.flow.onCompletion
1212
import kotlinx.coroutines.flow.reduce
1313
import kotlinx.coroutines.flow.take
1414
import kotlinx.coroutines.runBlocking
15-
import kotlin.test.Ignore
1615
import kotlin.test.Test
1716
import kotlin.test.assertEquals
1817

1918
class WorkerStressTest {
2019

21-
@Ignore("https://github.com/square/workflow/issues/626")
2220
@UseExperimental(ExperimentalCoroutinesApi::class)
2321
@Test fun `multiple subscriptions to single channel when closed`() {
2422
val channel = Channel<Unit>()

0 commit comments

Comments
 (0)