Skip to content

Commit 7bd983f

Browse files
qwwdfsadelizarov
authored andcommitted
Fix linearizability in AbstractChannel#sendSuspend, add infrastructure to inject custom execution into linearizability checker
1 parent df80002 commit 7bd983f

File tree

4 files changed

+104
-0
lines changed

4 files changed

+104
-0
lines changed

common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt

+2
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
192192
return@sc
193193
}
194194
is Closed<*> -> {
195+
helpClose(enqueueResult)
195196
cont.resumeWithException(enqueueResult.sendException)
196197
return@sc
197198
}
@@ -205,6 +206,7 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
205206
}
206207
offerResult === OFFER_FAILED -> continue@loop
207208
offerResult is Closed<*> -> {
209+
helpClose(offerResult)
208210
cont.resumeWithException(offerResult.sendException)
209211
return@sc
210212
}

core/kotlinx-coroutines-core/test/channels/ChannelIsClosedLinearizabilityTest.kt

+11
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,17 @@ class ChannelIsClosedLinearizabilityTest : TestBase() {
5353
.addThread(1, 3)
5454
.addThread(1, 3)
5555
.verifier(LinVerifier::class.java)
56+
.injectExecution { actors, methods ->
57+
actors[0].add(actorMethod(methods, "receive1"))
58+
actors[0].add(actorMethod(methods, "receive2"))
59+
actors[0].add(actorMethod(methods, "close1"))
60+
61+
actors[1].add(actorMethod(methods, "send2"))
62+
actors[1].add(actorMethod(methods, "send1"))
63+
64+
actors[2].add(actorMethod(methods, "isClosedForSend"))
65+
}
66+
5667
LinChecker.check(ChannelIsClosedLinearizabilityTest::class.java, options)
5768
}
5869
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.experimental.channels
6+
7+
import com.devexperts.dxlab.lincheck.*
8+
import com.devexperts.dxlab.lincheck.execution.*
9+
import java.util.*
10+
11+
typealias ExecutionBuilder = (List<MutableList<Actor>>, List<ActorGenerator>) -> Unit
12+
13+
/**
14+
* Example of usage:
15+
*
16+
* ```
17+
* StressOptions().injectExecution { actors, methods ->
18+
* actors[0].add(actorMethod(methods, "receive1"))
19+
* actors[0].add(actorMethod(methods, "receive2"))
20+
*
21+
* actors[1].add(actorMethod(methods, "send2"))
22+
* actors[1].add(actorMethod(methods, "send1"))
23+
* }
24+
*
25+
* ```
26+
*
27+
* Will produce
28+
* ```
29+
* Actors per thread:
30+
* [receive1(), receive2()]
31+
* [send2(), send1()]
32+
* ```
33+
* at the first iteration.
34+
*
35+
* DSL will be improved when this method will be used frequently
36+
*/
37+
fun Options<*, *>.injectExecution(behaviourBuilder: ExecutionBuilder): Options<*, *> {
38+
injectedBehaviour.add(behaviourBuilder)
39+
executionGenerator(FixedBehaviourInjectorExecutionGenerator::class.java)
40+
return this
41+
}
42+
43+
fun actorMethod(generators: List<ActorGenerator>, name: String): Actor =
44+
generators.find { it.generate().method.name.contains(name) }?.generate() ?: error("Actor method $name is not found in ${generators.map { it.generate().method.name }}")
45+
46+
private val injectedBehaviour: Queue<ExecutionBuilder> = ArrayDeque<ExecutionBuilder>()
47+
48+
class FixedBehaviourInjectorExecutionGenerator(testConfiguration: CTestConfiguration, testStructure: CTestStructure)
49+
: ExecutionGenerator(testConfiguration, testStructure) {
50+
51+
private val randomGenerator = RandomExecutionGenerator(testConfiguration, testStructure)
52+
53+
override fun nextExecution(): List<List<Actor>> {
54+
val injector = injectedBehaviour.poll()
55+
if (injector != null) {
56+
val parallelGroup = ArrayList(testStructure.actorGenerators)
57+
val actorsPerThread = ArrayList<MutableList<Actor>>()
58+
for (i in testConfiguration.threadConfigurations.indices) {
59+
actorsPerThread.add(ArrayList())
60+
}
61+
62+
injector.invoke(actorsPerThread, parallelGroup)
63+
return actorsPerThread
64+
}
65+
66+
return randomGenerator.nextExecution()
67+
}
68+
}
69+
70+
// Ad-hoc fixed execution injection for lin-checker
71+
class FixedBehaviourExecutionGenerator(testConfiguration: CTestConfiguration, testStructure: CTestStructure)
72+
: ExecutionGenerator(testConfiguration, testStructure) {
73+
74+
override fun nextExecution(): List<List<Actor>> {
75+
val parallelGroup = ArrayList(testStructure.actorGenerators)
76+
val actorsPerThread = ArrayList<MutableList<Actor>>()
77+
for (i in testConfiguration.threadConfigurations.indices) {
78+
actorsPerThread.add(ArrayList())
79+
}
80+
81+
82+
actorsPerThread[0].add(actorMethod(parallelGroup, "receive1"))
83+
actorsPerThread[0].add(actorMethod(parallelGroup, "receive2"))
84+
actorsPerThread[0].add(actorMethod(parallelGroup, "close1"))
85+
86+
actorsPerThread[1].add(actorMethod(parallelGroup, "send2"))
87+
actorsPerThread[1].add(actorMethod(parallelGroup, "send1"))
88+
89+
return actorsPerThread
90+
}
91+
}

0 commit comments

Comments
 (0)