Skip to content

Commit c1e85a9

Browse files
Stop using onReceiveOrClosed because it is extremely broken.
I think this fixes #626. Likely caused by Kotlin/kotlinx.coroutines#1584.
1 parent 6021294 commit c1e85a9

File tree

2 files changed

+43
-7
lines changed

2 files changed

+43
-7
lines changed

kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/Workers.kt

+37-1
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
2121
import kotlinx.coroutines.FlowPreview
2222
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
2323
import kotlinx.coroutines.channels.ReceiveChannel
24+
import kotlinx.coroutines.flow.Flow
2425
import kotlinx.coroutines.flow.buffer
2526
import kotlinx.coroutines.flow.catch
27+
import kotlinx.coroutines.flow.collect
28+
import kotlinx.coroutines.flow.flow
2629
import kotlinx.coroutines.flow.produceIn
2730

2831
/**
@@ -31,8 +34,9 @@ import kotlinx.coroutines.flow.produceIn
3134
* will emit everything from the worker. The channel will be closed when the flow completes.
3235
*/
3336
@UseExperimental(FlowPreview::class, ExperimentalCoroutinesApi::class)
34-
internal fun <T> CoroutineScope.launchWorker(worker: Worker<T>): ReceiveChannel<T> =
37+
internal fun <T> CoroutineScope.launchWorker(worker: Worker<T>): ReceiveChannel<ValueOrDone<T>> =
3538
worker.run()
39+
.transformToValueOrDone()
3640
.catch { e ->
3741
// Workers that failed (as opposed to just cancelled) should have their failure reason
3842
// re-thrown from the workflow runtime. If we don't unwrap the cause here, they'll just
@@ -42,3 +46,35 @@ internal fun <T> CoroutineScope.launchWorker(worker: Worker<T>): ReceiveChannel<
4246
}
4347
.buffer(RENDEZVOUS)
4448
.produceIn(this)
49+
50+
/**
51+
* Pretend we can use ReceiveChannel.onReceiveOrClosed.
52+
*
53+
* See https://github.com/Kotlin/kotlinx.coroutines/issues/1584 and
54+
* https://github.com/square/workflow/issues/626.
55+
*/
56+
internal class ValueOrDone<out T> private constructor(private val _value: Any?) {
57+
58+
val isDone: Boolean get() = this === Done
59+
60+
@Suppress("UNCHECKED_CAST")
61+
val value: T
62+
get() {
63+
check(!isDone)
64+
return _value as T
65+
}
66+
67+
companion object {
68+
private val Done = ValueOrDone<Nothing>(null)
69+
70+
fun <T> value(value: T): ValueOrDone<T> = ValueOrDone(value)
71+
fun done(): ValueOrDone<Nothing> = Done
72+
}
73+
}
74+
75+
private fun <T> Flow<T>.transformToValueOrDone(): Flow<ValueOrDone<T>> = flow {
76+
collect {
77+
emit(ValueOrDone.value(it))
78+
}
79+
emit(ValueOrDone.done())
80+
}

kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/WorkflowNode.kt

+6-6
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ internal class WorkflowNode<PropsT, StateT, OutputT : Any, RenderingT>(
6060
* continues to listen to the worker after it finishes.
6161
*/
6262
private class WorkerSession(
63-
val channel: ReceiveChannel<*>,
63+
val channel: ReceiveChannel<ValueOrDone<*>>,
6464
var tombstone: Boolean = false
6565
)
6666

@@ -143,8 +143,8 @@ internal class WorkflowNode<PropsT, StateT, OutputT : Any, RenderingT>(
143143
.filter { (_, session) -> !session.tombstone }
144144
.forEach { (case, session) ->
145145
with(selector) {
146-
session.channel.onReceiveOrClosed { valueOrClosed ->
147-
if (valueOrClosed.isClosed) {
146+
session.channel.onReceive { valueOrDone ->
147+
if (valueOrDone.isDone) {
148148
// Set the tombstone flag so we don't continue to listen to the subscription.
149149
session.tombstone = true
150150
// Nothing to do on close other than update the session, so don't emit any output.
@@ -156,10 +156,10 @@ internal class WorkflowNode<PropsT, StateT, OutputT : Any, RenderingT>(
156156
)
157157
)
158158
)
159-
return@onReceiveOrClosed OutputEnvelope(null, debugInfo)
159+
return@onReceive OutputEnvelope(null, debugInfo)
160160
} else {
161-
val update = case.acceptUpdate(valueOrClosed.value)
162-
acceptUpdate(update, Kind.Updated(Source.Worker(case.key, valueOrClosed.value!!)))
161+
val update = case.acceptUpdate(valueOrDone.value)
162+
acceptUpdate(update, Kind.Updated(Source.Worker(case.key, valueOrDone.value!!)))
163163
}
164164
}
165165
}

0 commit comments

Comments
 (0)