Skip to content

Commit 14fa4ed

Browse files
author
Nikita Koval
committed
Make elimination similar to Kotlin#4
1 parent 150df3d commit 14fa4ed

File tree

1 file changed

+44
-51
lines changed

1 file changed

+44
-51
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/koval/RendezvousChannelKoval.kt

+44-51
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
1717
class RendezvousChannelKoval<E>(
1818
private val segmentSize: Int = 64,
1919
private val spinThreshold: Int = 300,
20-
private val elemSpinThreshold: Int = 15
20+
private val elemSpinThreshold: Int = 20
2121
): ChannelKoval<E> {
2222
// Waiting queue node
2323
private class Node(segmentSize: Int, @JvmField val id: Int) {
@@ -105,7 +105,6 @@ class RendezvousChannelKoval<E>(
105105
// Main function in this chanel, which implements both `#send` and `#receive` operations.
106106
// Note that `#offer` and `#poll` functions are just simplified versions of this one.
107107
private suspend fun <T> sendOrReceiveSuspend(element: Any) = suspendAtomicCancellableCoroutine<T>(holdCancellability = true) sc@ { curCont ->
108-
var localCounterUpdate = 0
109108
try_again@ while (true) { // CAS loop
110109
// Read the tail and its enqueue index at first, then the head and its indexes.
111110
// It is important to read tail and its index at first. If algorithm
@@ -129,16 +128,16 @@ class RendezvousChannelKoval<E>(
129128
}
130129
// Queue is empty, try to add a new node with the current continuation.
131130
if (addNewNode(head, curCont, element)) {
132-
if (localCounterUpdate > 0) { incElimSenderArraySize(1); incElimReceiverArraySize(1) }
131+
incElimArraySizes(1)
133132
return@sc
134-
} else { localCounterUpdate++ }
133+
}
135134
} else {
136135
// The `head` node is not full, therefore the waiting queue
137136
// is empty. Try to add the current continuation to the queue.
138137
if (storeContinuation(head, headEnqIdx, curCont, element)) {
139-
if (localCounterUpdate > 0) { incElimSenderArraySize(1); incElimReceiverArraySize(1) }
138+
incElimArraySizes(1)
140139
return@sc
141-
} else { localCounterUpdate++ }
140+
}
142141
}
143142
} else {
144143
// The waiting queue is not empty and it is guaranteed that `headDeqIdx < headEnqIdx`.
@@ -156,8 +155,8 @@ class RendezvousChannelKoval<E>(
156155
var firstElement = readElement(head, headDeqIdx)
157156
if (firstElement == TAKEN_ELEMENT) {
158157
// Try to move the deque index in the `head` node
159-
localCounterUpdate++
160158
deqIdxUpdater.compareAndSet(head, headDeqIdx, headDeqIdx + 1)
159+
incElimArraySizes(1)
161160
continue@try_again
162161
}
163162
// The `firstElement` is either sender or receiver. Check if a rendezvous is possible
@@ -179,7 +178,7 @@ class RendezvousChannelKoval<E>(
179178
// Resume the current continuation
180179
val result = (if (element == RECEIVER_ELEMENT) firstElement else Unit) as T
181180
curCont.resume(result)
182-
if (localCounterUpdate > 0) { incElimSenderArraySize(1); incElimReceiverArraySize(1) }
181+
incElimArraySizes(1)
183182
return@sc
184183
}
185184
// Re-read the required pointers
@@ -193,14 +192,12 @@ class RendezvousChannelKoval<E>(
193192
}
194193
// Check that `(head.id, headDeqIdx) < (headIdLimit, headDeqIdxLimit)`
195194
// and re-start the whole operation if needed
196-
if (head.id > headIdLimit || (head.id == headIdLimit && headDeqIdx >= headDeqIdxLimit)) {
197-
localCounterUpdate++
198-
continue@try_again
199-
}
195+
if (head.id > headIdLimit || (head.id == headIdLimit && headDeqIdx >= headDeqIdxLimit)) continue@try_again
200196
// Re-read the first element
201197
firstElement = readElement(head, headDeqIdx)
202198
if (firstElement == TAKEN_ELEMENT) {
203199
deqIdxUpdater.compareAndSet(head, headDeqIdx, headDeqIdx + 1)
200+
incElimArraySizes(1)
204201
continue@read_state
205202
}
206203
break@read_state
@@ -212,12 +209,12 @@ class RendezvousChannelKoval<E>(
212209
// if the tail is full, otherwise try to store it at the `tailEnqIdx` index.
213210
if (tailEnqIdx == segmentSize) {
214211
if (addNewNode(tail, curCont, element)) {
215-
if (localCounterUpdate > 0) { incElimSenderArraySize(1); incElimReceiverArraySize(1) }
212+
incElimArraySizes(1)
216213
return@sc
217214
}
218215
} else {
219216
if (storeContinuation(tail, tailEnqIdx, curCont, element)) {
220-
if (localCounterUpdate > 0) { incElimSenderArraySize(1); incElimReceiverArraySize(1) }
217+
incElimArraySizes(1)
221218
return@sc
222219
}
223220
}
@@ -227,10 +224,7 @@ class RendezvousChannelKoval<E>(
227224
tailEnqIdx = tail._enqIdx
228225
head = _head
229226
headDeqIdx = head._deqIdx
230-
if (head.id > headIdLimit || (head.id == headIdLimit && headDeqIdx >= headDeqIdxLimit)) {
231-
localCounterUpdate++
232-
continue@try_again
233-
}
227+
if (head.id > headIdLimit || (head.id == headIdLimit && headDeqIdx >= headDeqIdxLimit)) continue@try_again
234228
}
235229
}
236230
}
@@ -240,8 +234,8 @@ class RendezvousChannelKoval<E>(
240234
@JvmField var _elimSenderArraySize = 0
241235
@JvmField var _elimReceiverArraySize = 0
242236

243-
@JvmField var _elimsTotal = 0
244-
@JvmField var _elimsSucc = 0
237+
// @JvmField var _elimsTotal = 0
238+
// @JvmField var _elimsSucc = 0
245239

246240
private val _elimSenderArray = AtomicReferenceArray<Any>(ELIM_MAX_ARR_SIZE)
247241
private val _elimReceiverArray = AtomicReferenceArray<Any>(ELIM_MAX_ARR_SIZE)
@@ -252,6 +246,11 @@ class RendezvousChannelKoval<E>(
252246
return value
253247
}
254248

249+
private fun incElimArraySizes(value: Int) {
250+
incElimSenderArraySize(value)
251+
incElimReceiverArraySize(value)
252+
}
253+
255254
private fun incElimSenderArraySize(value: Int) {
256255
val newVal = limitElimCounterValue(_elimSenderArraySize + value)
257256
_elimSenderArraySize = newVal
@@ -277,28 +276,27 @@ class RendezvousChannelKoval<E>(
277276
private fun tryEliminateSender(element: Any): Unit? {
278277
val elimReceiverArraySize = _elimReceiverArraySize
279278
if (elimReceiverArraySize > 0) {
280-
_elimsTotal++
279+
// _elimsTotal++
281280
val position = ThreadLocalRandom.current().nextInt(elimReceiverArraySize)
282281
attempt@ for (i in max(0, position - 1) .. min(position + 1, elimReceiverArraySize - 1)) {
283282
val x = _elimReceiverArray[i]
284283
when (x) {
285-
null -> { continue@attempt }
284+
null -> { decElimReceiverArraySize(1); continue@attempt }
286285
ELIM_RECEIVER_ELEMENT -> {
287286
if (_elimReceiverArray.compareAndSet(i, x, Done(element))) {
288-
_elimsSucc++
287+
// _elimsSucc++
289288
return Unit
290289
} else incElimReceiverArraySize(1)
291290
}
292291
else -> incElimReceiverArraySize(1)
293292
}
294293
}
295294
// Elimination was unsuccessful :(
296-
decElimReceiverArraySize(1)
297295
}
298296

299297
val elimSenderArraySize = _elimSenderArraySize
300298
if (elimSenderArraySize > 0) {
301-
_elimsTotal++
299+
// _elimsTotal++
302300
val position = ThreadLocalRandom.current().nextInt(elimSenderArraySize)
303301
attempt@ for (i in max(0, position - 1) .. min(position + 1, elimSenderArraySize - 1)) {
304302
val x = _elimSenderArray[i]
@@ -311,23 +309,23 @@ class RendezvousChannelKoval<E>(
311309
val probablyDone = _elimSenderArray[i]
312310
if (probablyDone == ELIM_SENDER_DONE) {
313311
_elimSenderArray[i] = null
314-
_elimsSucc++
312+
incElimSenderArraySize(1)
313+
// _elimsSucc++
315314
return Unit
316315
}
317316
}
317+
decElimSenderArraySize(1)
318318
if (!_elimSenderArray.compareAndSet(i, box, null)) {
319319
// _elimSenderArray[i] == ELIM_SENDER_DONE
320-
incElimSenderArraySize(1)
321320
_elimSenderArray[i] = null
322-
_elimsSucc++
321+
// _elimsSucc++
323322
return Unit
324323
}
325324
} else incElimSenderArraySize(1)
326325
}
327326
else -> incElimSenderArraySize(1)
328327
}
329328
}
330-
decElimSenderArraySize(1)
331329
}
332330

333331
return null
@@ -336,28 +334,27 @@ class RendezvousChannelKoval<E>(
336334
private fun tryEliminateReceiver(): Any? {
337335
val elimSenderArraySize = _elimSenderArraySize
338336
if (elimSenderArraySize > 0) {
339-
_elimsTotal++
337+
// _elimsTotal++
340338
val position = ThreadLocalRandom.current().nextInt(elimSenderArraySize)
341339
attempt@ for (i in max(0, position - 1) .. min(position + 1, elimSenderArraySize - 1)) {
342340
val x = _elimSenderArray[i]
343341
when (x) {
344-
null -> continue@attempt
342+
null -> { decElimSenderArraySize(1); continue@attempt }
345343
is ElementBox -> {
346344
if (_elimSenderArray.compareAndSet(i, x, ELIM_SENDER_DONE)) {
347-
_elimsSucc++
345+
// _elimsSucc++
348346
return x.value
349347
} else incElimSenderArraySize(1)
350348
}
351349
else -> incElimSenderArraySize(1)
352350
}
353351
}
354352
// Elimination was unsuccessful :(
355-
decElimSenderArraySize(1)
356353
}
357354

358355
val elimReceiverArraySize = _elimReceiverArraySize
359356
if (elimReceiverArraySize > 0) {
360-
_elimsTotal++
357+
// _elimsTotal++
361358
val position = ThreadLocalRandom.current().nextInt(elimReceiverArraySize)
362359
attempt@ for (i in max(0, position - 1) .. min(position + 1, elimReceiverArraySize - 1)) {
363360
val x = _elimReceiverArray[i]
@@ -369,15 +366,16 @@ class RendezvousChannelKoval<E>(
369366
if (probablyDone is Done) {
370367
val res = probablyDone.value
371368
_elimReceiverArray[i] = null
372-
_elimsSucc++
369+
incElimReceiverArraySize(1)
370+
// _elimsSucc++
373371
return res
374372
}
375373
}
374+
decElimReceiverArraySize(1)
376375
if (!_elimReceiverArray.compareAndSet(i, ELIM_RECEIVER_ELEMENT, null)) {
377376
val done = _elimReceiverArray[i] as Done
378377
_elimReceiverArray[i] = null
379-
incElimReceiverArraySize(1)
380-
_elimsSucc++
378+
// _elimsSucc++
381379
return done.value
382380
}
383381
} else incElimReceiverArraySize(1)
@@ -386,7 +384,6 @@ class RendezvousChannelKoval<E>(
386384
}
387385
}
388386
// Elimination was unsuccessful :(
389-
decElimSenderArraySize(1)
390387
}
391388

392389
return null
@@ -403,7 +400,6 @@ class RendezvousChannelKoval<E>(
403400

404401
// This method is based on `#sendOrReceiveSuspend`. Returns `null` if fails.
405402
private fun <T> sendOrReceiveNonSuspend(element: Any): T? {
406-
var localCounterUpdate = 0
407403
try_again@ while (true) { // CAS loop
408404
// Read the tail and its enqueue index at first, then the head and its indexes.
409405
val tail = _tail
@@ -414,19 +410,19 @@ class RendezvousChannelKoval<E>(
414410
// If the waiting queue is empty, `headDeqIdx == headEnqIdx`.
415411
// This can also happen if the `head` node is full (`headDeqIdx == segmentSize`).
416412
if (headDeqIdx == headEnqIdx) {
413+
val res = tryEliminate(head, headEnqIdx, element)
414+
if (res != null) return res as T
417415
if (headDeqIdx == segmentSize) {
418416
// The `head` node is full. Try to move `_head`
419417
// pointer forward and start the operation again.
420418
if (adjustHead(head)) continue@try_again
421419
// Queue is empty, try to do elimination
422420
// and return `null` if it fails.
423-
if (localCounterUpdate > 0) { incElimSenderArraySize(1); incElimReceiverArraySize(1) }
424-
return tryEliminate(head, headEnqIdx, element) as T
421+
return null
425422
} else {
426423
// Queue is empty, try to do elimination
427424
// and return `null` if it fails.
428-
if (localCounterUpdate > 0) { incElimSenderArraySize(1); incElimReceiverArraySize(1) }
429-
return tryEliminate(head, headEnqIdx, element) as T
425+
return null
430426
}
431427
} else {
432428
// The waiting queue is not empty and it is guaranteed that `headDeqIdx < headEnqIdx`.
@@ -436,7 +432,7 @@ class RendezvousChannelKoval<E>(
436432
if (firstElement == TAKEN_ELEMENT) {
437433
// Try to move the deque index in the `head` node
438434
deqIdxUpdater.compareAndSet(head, headDeqIdx, headDeqIdx + 1)
439-
localCounterUpdate++
435+
incElimArraySizes(1)
440436
continue@try_again
441437
}
442438
val makeRendezvous = if (element == RECEIVER_ELEMENT) firstElement != RECEIVER_ELEMENT else firstElement == RECEIVER_ELEMENT
@@ -446,7 +442,7 @@ class RendezvousChannelKoval<E>(
446442
while (true) {
447443
if (tryResumeContinuation(head, headDeqIdx, element)) {
448444
// The rendezvous is happened, congratulations!
449-
if (localCounterUpdate > 0) { incElimSenderArraySize(1); incElimReceiverArraySize(1) }
445+
incElimArraySizes(1)
450446
return (if (element == RECEIVER_ELEMENT) firstElement else Unit) as T
451447
}
452448
// Re-read the required pointers
@@ -455,26 +451,23 @@ class RendezvousChannelKoval<E>(
455451
head = _head
456452
headDeqIdx = head._deqIdx
457453
if (headDeqIdx == segmentSize) {
458-
if (!adjustHead(head)) { localCounterUpdate++; continue@try_again }
454+
if (!adjustHead(head)) continue@try_again
459455
continue@read_state
460456
}
461457
// Check that `(head.id, headDeqIdx) < (headIdLimit, headDeqIdxLimit)`
462458
// and re-start the whole operation if needed
463-
if (head.id > headIdLimit || (head.id == headIdLimit && headDeqIdx >= headDeqIdxLimit)) {
464-
localCounterUpdate++
465-
continue@try_again
466-
}
459+
if (head.id > headIdLimit || (head.id == headIdLimit && headDeqIdx >= headDeqIdxLimit)) continue@try_again
467460
// Re-read the first element
468461
firstElement = readElement(head, headDeqIdx)
469462
if (firstElement == TAKEN_ELEMENT) {
463+
incElimArraySizes(1)
470464
deqIdxUpdater.compareAndSet(head, headDeqIdx, headDeqIdx + 1)
471465
continue@read_state
472466
}
473467
break@read_state
474468
}
475469
}
476470
} else {
477-
if (localCounterUpdate > 0) { incElimSenderArraySize(1); incElimReceiverArraySize(1) }
478471
return null
479472
}
480473
}

0 commit comments

Comments
 (0)