15
15
*/
16
16
package com.squareup.workflow
17
17
18
- import com.squareup.workflow.debugging.WorkflowDebugInfo
19
18
import com.squareup.workflow.internal.RealWorkflowLoop
20
19
import com.squareup.workflow.internal.WorkflowLoop
21
20
import com.squareup.workflow.internal.unwrapCancellationCause
@@ -140,7 +139,12 @@ fun <PropsT, StateT, OutputT : Any, RenderingT, RunnerT> launchWorkflowForTestFr
140
139
beforeStart = beforeStart
141
140
)
142
141
143
- @UseExperimental(ExperimentalCoroutinesApi ::class , FlowPreview ::class )
142
+ @UseExperimental(
143
+ ExperimentalCoroutinesApi ::class ,
144
+ FlowPreview ::class ,
145
+ VeryExperimentalWorkflow ::class
146
+ )
147
+ @Suppress(" LongParameterList" )
144
148
internal fun <PropsT , StateT , OutputT : Any , RenderingT , RunnerT > launchWorkflowImpl (
145
149
scope : CoroutineScope ,
146
150
workflowLoop : WorkflowLoop ,
@@ -152,28 +156,31 @@ internal fun <PropsT, StateT, OutputT : Any, RenderingT, RunnerT> launchWorkflow
152
156
): RunnerT {
153
157
val renderingsAndSnapshots = ConflatedBroadcastChannel <RenderingAndSnapshot <RenderingT >>()
154
158
val outputs = BroadcastChannel <OutputT >(capacity = 1 )
155
- val debugSnapshots = ConflatedBroadcastChannel <WorkflowDebugInfo >()
156
159
val workflowScope = scope + Job (parent = scope.coroutineContext[Job ])
157
160
158
161
// Give the caller a chance to start collecting outputs.
159
- val session = WorkflowSession (
160
- renderingsAndSnapshots.asFlow(),
161
- outputs.asFlow(),
162
- debugSnapshots.asFlow()
163
- )
162
+ val session = WorkflowSession (renderingsAndSnapshots.asFlow(), outputs.asFlow())
164
163
val result = beforeStart(workflowScope, session)
164
+ val visitor = session.diagnosticListener
165
165
166
166
val workflowJob = workflowScope.launch {
167
- // Run the workflow processing loop forever, or until it fails or is cancelled.
168
- workflowLoop.runWorkflowLoop(
169
- workflow,
170
- props,
171
- initialSnapshot = initialSnapshot,
172
- initialState = initialState,
173
- onRendering = renderingsAndSnapshots::send,
174
- onOutput = outputs::send,
175
- onDebugSnapshot = debugSnapshots::send
176
- )
167
+ visitor?.onRuntimeStarted(this )
168
+ try {
169
+ // Run the workflow processing loop forever, or until it fails or is cancelled.
170
+ workflowLoop.runWorkflowLoop(
171
+ workflow,
172
+ props,
173
+ initialSnapshot = initialSnapshot,
174
+ initialState = initialState,
175
+ onRendering = renderingsAndSnapshots::send,
176
+ onOutput = outputs::send,
177
+ diagnosticListener = visitor
178
+ )
179
+ } finally {
180
+ // Only emit the runtime stopped debug event after all child coroutines have completed.
181
+ // coroutineScope does an implicit join on all its children.
182
+ visitor?.onRuntimeStopped()
183
+ }
177
184
}
178
185
179
186
// Ensure we close the channels when we're done, so that they propagate errors.
@@ -183,7 +190,6 @@ internal fun <PropsT, StateT, OutputT : Any, RenderingT, RunnerT> launchWorkflow
183
190
val realCause = cause?.unwrapCancellationCause()
184
191
renderingsAndSnapshots.close(realCause)
185
192
outputs.close(realCause)
186
- debugSnapshots.close(realCause)
187
193
188
194
// If the cancellation came from inside the workflow loop, the outer runtime scope needs to be
189
195
// explicitly cancelled. See https://github.com/square/workflow/issues/464.
0 commit comments