1
+ /*
2
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3
+ */
4
+
5
+ package kotlinx.coroutines.channels
6
+
7
+ import kotlinx.atomicfu.*
8
+ import kotlinx.coroutines.*
9
+ import java.util.concurrent.atomic.AtomicLong
10
+ import java.util.concurrent.atomic.AtomicLongArray
11
+ import kotlin.math.*
12
+ import kotlin.test.*
13
+
14
+ /* *
15
+ * Tests lock-freedom of send and receive operations on rendezvous channel.
16
+ * There is a single channel with two sender and two receiver threads.
17
+ * When one sender or receiver gets suspended at most one other operation is allowed to cease having progress
18
+ * (`allowSuspendedThreads = 1`).
19
+ *
20
+ * **Note**: In the current implementation buffered channels are not lock-free, so this test would fail
21
+ * * if channel is created with a buffer.
22
+ */
23
+ class RendezvousChannelStressLFTest : TestBase () {
24
+ private val nSeconds = 5 * stressTestMultiplier
25
+ private val env = LockFreedomTestEnvironment (" RendezvousChannelStressLFTest" , allowSuspendedThreads = 1 )
26
+ private val channel = Channel <Long >()
27
+
28
+ private val sendIndex = AtomicLong ()
29
+ private val receiveCount = AtomicLong ()
30
+ private val duplicateCount = AtomicLong ()
31
+
32
+ private val nCheckedSize = 10_000_000
33
+ private val nChecked = (nCheckedSize * Long .SIZE_BITS ).toLong()
34
+ private val receivedBits = AtomicLongArray (nCheckedSize) // bit set of received values
35
+
36
+ @Test
37
+ fun testLockFreedom () {
38
+ env.onCompletion { channel.close() }
39
+ repeat(2 ) { env.testThread { sender() } }
40
+ repeat(2 ) { env.testThread { receiver() } }
41
+ env.performTest(nSeconds) {
42
+ println (" Sent: $sendIndex , Received: $receiveCount , dups: $duplicateCount " )
43
+ }
44
+ // ensure no duplicates
45
+ assertEquals(0L , duplicateCount.get())
46
+ // ensure that all sent were received
47
+ for (i in 0 until min(sendIndex.get(), nChecked)) {
48
+ assertTrue(isReceived(i))
49
+ }
50
+ }
51
+
52
+ private suspend fun sender () {
53
+ val value = sendIndex.getAndIncrement()
54
+ try {
55
+ channel.send(value)
56
+ } catch (e: ClosedSendChannelException ) {
57
+ check(env.isCompleted) // expected when test was completed
58
+ markReceived(value) // fake received (actually failed to send)
59
+ }
60
+ }
61
+
62
+ private suspend fun receiver () {
63
+ val value = try {
64
+ channel.receive()
65
+ } catch (e: ClosedReceiveChannelException ) {
66
+ check(env.isCompleted) // expected when test was completed
67
+ return
68
+ }
69
+ receiveCount.incrementAndGet()
70
+ markReceived(value)
71
+ }
72
+
73
+ private fun markReceived (value : Long ) {
74
+ if (value >= nChecked) return // too big
75
+ val index = (value / Long .SIZE_BITS ).toInt()
76
+ val mask = 1L shl (value % Long .SIZE_BITS ).toInt()
77
+ while (true ) {
78
+ val bits = receivedBits.get(index)
79
+ if (bits and mask != 0L ) {
80
+ duplicateCount.incrementAndGet()
81
+ break
82
+ }
83
+ if (receivedBits.compareAndSet(index, bits, bits or mask)) break
84
+ }
85
+ }
86
+
87
+ private fun isReceived (value : Long ): Boolean {
88
+ val index = (value / Long .SIZE_BITS ).toInt()
89
+ val mask = 1L shl (value % Long .SIZE_BITS ).toInt()
90
+ val bits = receivedBits.get(index)
91
+ return bits or mask != 0L
92
+ }
93
+ }
0 commit comments