Skip to content

Commit b733f7d

Browse files
committed
Speedup linerizability tests, make conflation part of helpClose (fixes nightly stress failure)
1 parent 69c15b3 commit b733f7d

File tree

5 files changed

+17
-31
lines changed

5 files changed

+17
-31
lines changed

common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -257,13 +257,11 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
257257
if (!closeAdded) {
258258
val actualClosed = queue.prevNode as Closed<*>
259259
helpClose(actualClosed)
260-
onClosedIdempotent(actualClosed)
261260
return false
262261
}
263262

264263
helpClose(closed)
265264
invokeOnCloseHandler(cause)
266-
onClosedIdempotent(closed)
267265
return true
268266
}
269267

@@ -325,6 +323,8 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
325323
previous as Receive<E> // type assertion
326324
previous.resumeReceiveClosed(closed)
327325
}
326+
327+
onClosedIdempotent(closed)
328328
}
329329

330330
/**

common/kotlinx-coroutines-core-common/src/channels/ConflatedChannel.kt

+12-3
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,17 @@ internal open class ConflatedChannel<E> : AbstractChannel<E>() {
3838
val sendResult = sendConflated(element)
3939
when (sendResult) {
4040
null -> return OFFER_SUCCESS
41-
is Closed<*> -> return sendResult
41+
is Closed<*> -> {
42+
conflatePreviousSendBuffered(sendResult)
43+
return sendResult
44+
}
4245
}
4346
// otherwise there was receiver in queue, retry super.offerInternal
4447
}
45-
result is Closed<*> -> return result
48+
result is Closed<*> -> {
49+
conflatePreviousSendBuffered(result)
50+
return result
51+
}
4652
else -> error("Invalid offerInternal result $result")
4753
}
4854
}
@@ -58,7 +64,10 @@ internal open class ConflatedChannel<E> : AbstractChannel<E>() {
5864
result === ALREADY_SELECTED -> return ALREADY_SELECTED
5965
result === OFFER_SUCCESS -> return OFFER_SUCCESS
6066
result === OFFER_FAILED -> {} // retry
61-
result is Closed<*> -> return result
67+
result is Closed<*> -> {
68+
conflatePreviousSendBuffered(result)
69+
return result
70+
}
6271
else -> error("Invalid result $result")
6372
}
6473
}

core/kotlinx-coroutines-core/test/TestBase.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ private val VERBOSE = systemProp("test.verbose", false)
3535
@Suppress("DEPRECATION")
3636
public actual open class TestBase actual constructor() {
3737
/**
38-
* Is `true` when nightly stress test is done.
38+
* Is `true` when running in a nightly stress test mode.
3939
*/
4040
public actual val isStressTest = System.getProperty("stressTest") != null
4141

core/kotlinx-coroutines-core/test/linearizability/ChannelIsClosedLinearizabilityTest.kt

-17
Original file line numberDiff line numberDiff line change
@@ -51,21 +51,4 @@ class ChannelIsClosedLinearizabilityTest : TestBase() {
5151

5252
LinChecker.check(ChannelIsClosedLinearizabilityTest::class.java, options)
5353
}
54-
55-
override fun equals(other: Any?): Boolean {
56-
if (this === other) return true
57-
if (javaClass != other?.javaClass) return false
58-
59-
other as ChannelIsClosedLinearizabilityTest
60-
61-
if (channel != other.channel) return false
62-
63-
return true
64-
}
65-
66-
override fun hashCode(): Int {
67-
return channel.hashCode()
68-
}
69-
70-
7154
}

core/kotlinx-coroutines-core/test/linearizability/ChannelLinearizabilityTest.kt

+2-8
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,12 @@ class ChannelLinearizabilityTest : TestBase() {
3131
@Operation(runOnce = true)
3232
fun send2(@Param(name = "value") value: Int) = lt.run("send2") { channel.send(value) }
3333

34-
@Operation(runOnce = true)
35-
fun send3(@Param(name = "value") value: Int) = lt.run("send3") { channel.send(value) }
36-
3734
@Operation(runOnce = true)
3835
fun receive1() = lt.run("receive1") { channel.receive() }
3936

4037
@Operation(runOnce = true)
4138
fun receive2() = lt.run("receive2") { channel.receive() }
4239

43-
@Operation(runOnce = true)
44-
fun receive3() = lt.run("receive3") { channel.receive() }
45-
4640
@Operation(runOnce = true)
4741
fun close1() = lt.run("close1") { channel.close(IOException("close1")) }
4842

@@ -70,8 +64,8 @@ class ChannelLinearizabilityTest : TestBase() {
7064
private fun runTest(capacity: Int) {
7165
ChannelLinearizabilityTest.capacity = capacity
7266
val options = StressOptions()
73-
.iterations(100 * stressTestMultiplierSqrt)
74-
.invocationsPerIteration(1000 * stressTestMultiplierSqrt)
67+
.iterations(50 * stressTestMultiplierSqrt)
68+
.invocationsPerIteration(500 * stressTestMultiplierSqrt)
7569
.threads(3)
7670
.verifier(LinVerifier::class.java)
7771
LinChecker.check(ChannelLinearizabilityTest::class.java, options)

0 commit comments

Comments
 (0)