Skip to content

Commit f5cc366

Browse files
Add worker stress test that fails with #626.
1 parent 6021294 commit f5cc366

File tree

1 file changed

+80
-0
lines changed

1 file changed

+80
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package com.squareup.workflow
2+
3+
import com.squareup.workflow.WorkflowAction.Companion.noAction
4+
import kotlinx.coroutines.ExperimentalCoroutinesApi
5+
import kotlinx.coroutines.cancelChildren
6+
import kotlinx.coroutines.channels.Channel
7+
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
8+
import kotlinx.coroutines.delay
9+
import kotlinx.coroutines.flow.collect
10+
import kotlinx.coroutines.flow.flowOf
11+
import kotlinx.coroutines.flow.onCompletion
12+
import kotlinx.coroutines.flow.reduce
13+
import kotlinx.coroutines.flow.take
14+
import kotlinx.coroutines.runBlocking
15+
import kotlin.test.Ignore
16+
import kotlin.test.Test
17+
import kotlin.test.assertEquals
18+
19+
class WorkerStressTest {
20+
21+
@Ignore("https://github.com/square/workflow/issues/626")
22+
@UseExperimental(ExperimentalCoroutinesApi::class)
23+
@Test fun `multiple subscriptions to single channel when closed`() {
24+
val channel = Channel<Unit>()
25+
val workers = List(100) { channel.asWorker() }
26+
val finishedWorkers = List(100) {
27+
channel.asWorker()
28+
.transform { it.onCompletion { emit(Unit) } }
29+
}
30+
val action = WorkflowAction<Nothing, Unit> { Unit }
31+
val workflow = Workflow.stateless<Unit, Unit, Unit> {
32+
// Run lots of workers that will all see the same close event.
33+
workers.forEachIndexed { i, worker ->
34+
runningWorker(worker, key = i.toString()) { noAction() }
35+
}
36+
finishedWorkers.forEachIndexed { i, worker ->
37+
runningWorker(worker, key = "finished $i") { action }
38+
}
39+
}
40+
41+
runBlocking {
42+
val outputs = launchWorkflowIn(this, workflow, flowOf(Unit)) { it.outputs }
43+
44+
// This should just work, and the test will finish, but this is broken by
45+
// https://github.com/Kotlin/kotlinx.coroutines/issues/1584 and will crash instead if
46+
// receiveOrClosed is used.
47+
channel.close()
48+
delay(1)
49+
50+
outputs.take(100)
51+
.collect()
52+
53+
// Cancel the runtime so the test can finish.
54+
coroutineContext.cancelChildren()
55+
}
56+
}
57+
58+
@UseExperimental(ExperimentalCoroutinesApi::class)
59+
@Test fun `multiple subscriptions to single channel when emits`() {
60+
val channel = ConflatedBroadcastChannel(Unit)
61+
val workers = List(100) { channel.asWorker() }
62+
val action = WorkflowAction<Nothing, Int> { 1 }
63+
val workflow = Workflow.stateless<Unit, Int, Unit> {
64+
// Run lots of workers that will all see the same conflated channel value.
65+
workers.forEachIndexed { i, worker ->
66+
runningWorker(worker, key = i.toString()) { action }
67+
}
68+
}
69+
70+
runBlocking {
71+
val outputs = launchWorkflowIn(this, workflow, flowOf(Unit)) { it.outputs }
72+
val sum = outputs.take(100)
73+
.reduce { sum, value -> sum + value }
74+
assertEquals(100, sum)
75+
76+
// Cancel the runtime so the test can finish.
77+
coroutineContext.cancelChildren()
78+
}
79+
}
80+
}

0 commit comments

Comments
 (0)