Skip to content

Commit 150df3d

Browse files
author
Nikita Koval
committed
Update elimination a bit
1 parent d12eced commit 150df3d

File tree

1 file changed

+37
-21
lines changed

1 file changed

+37
-21
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/koval/RendezvousChannelKoval.kt

+37-21
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ class RendezvousChannelKoval<E>(
105105
// Main function in this chanel, which implements both `#send` and `#receive` operations.
106106
// Note that `#offer` and `#poll` functions are just simplified versions of this one.
107107
private suspend fun <T> sendOrReceiveSuspend(element: Any) = suspendAtomicCancellableCoroutine<T>(holdCancellability = true) sc@ { curCont ->
108+
var localCounterUpdate = 0
108109
try_again@ while (true) { // CAS loop
109110
// Read the tail and its enqueue index at first, then the head and its indexes.
110111
// It is important to read tail and its index at first. If algorithm
@@ -128,14 +129,16 @@ class RendezvousChannelKoval<E>(
128129
}
129130
// Queue is empty, try to add a new node with the current continuation.
130131
if (addNewNode(head, curCont, element)) {
132+
if (localCounterUpdate > 0) { incElimSenderArraySize(1); incElimReceiverArraySize(1) }
131133
return@sc
132-
} else { incElimReceiverArraySize(1); incElimSenderArraySize(1) }
134+
} else { localCounterUpdate++ }
133135
} else {
134136
// The `head` node is not full, therefore the waiting queue
135137
// is empty. Try to add the current continuation to the queue.
136138
if (storeContinuation(head, headEnqIdx, curCont, element)) {
139+
if (localCounterUpdate > 0) { incElimSenderArraySize(1); incElimReceiverArraySize(1) }
137140
return@sc
138-
} else { incElimReceiverArraySize(1); incElimSenderArraySize(1) }
141+
} else { localCounterUpdate++ }
139142
}
140143
} else {
141144
// The waiting queue is not empty and it is guaranteed that `headDeqIdx < headEnqIdx`.
@@ -153,7 +156,7 @@ class RendezvousChannelKoval<E>(
153156
var firstElement = readElement(head, headDeqIdx)
154157
if (firstElement == TAKEN_ELEMENT) {
155158
// Try to move the deque index in the `head` node
156-
incElimReceiverArraySize(1); incElimSenderArraySize(1)
159+
localCounterUpdate++
157160
deqIdxUpdater.compareAndSet(head, headDeqIdx, headDeqIdx + 1)
158161
continue@try_again
159162
}
@@ -176,6 +179,7 @@ class RendezvousChannelKoval<E>(
176179
// Resume the current continuation
177180
val result = (if (element == RECEIVER_ELEMENT) firstElement else Unit) as T
178181
curCont.resume(result)
182+
if (localCounterUpdate > 0) { incElimSenderArraySize(1); incElimReceiverArraySize(1) }
179183
return@sc
180184
}
181185
// Re-read the required pointers
@@ -190,7 +194,7 @@ class RendezvousChannelKoval<E>(
190194
// Check that `(head.id, headDeqIdx) < (headIdLimit, headDeqIdxLimit)`
191195
// and re-start the whole operation if needed
192196
if (head.id > headIdLimit || (head.id == headIdLimit && headDeqIdx >= headDeqIdxLimit)) {
193-
incElimReceiverArraySize(1); incElimSenderArraySize(1)
197+
localCounterUpdate++
194198
continue@try_again
195199
}
196200
// Re-read the first element
@@ -208,12 +212,12 @@ class RendezvousChannelKoval<E>(
208212
// if the tail is full, otherwise try to store it at the `tailEnqIdx` index.
209213
if (tailEnqIdx == segmentSize) {
210214
if (addNewNode(tail, curCont, element)) {
211-
decOppositeElimArraySize(element, 1)
215+
if (localCounterUpdate > 0) { incElimSenderArraySize(1); incElimReceiverArraySize(1) }
212216
return@sc
213217
}
214218
} else {
215219
if (storeContinuation(tail, tailEnqIdx, curCont, element)) {
216-
decOppositeElimArraySize(element, 1)
220+
if (localCounterUpdate > 0) { incElimSenderArraySize(1); incElimReceiverArraySize(1) }
217221
return@sc
218222
}
219223
}
@@ -224,7 +228,7 @@ class RendezvousChannelKoval<E>(
224228
head = _head
225229
headDeqIdx = head._deqIdx
226230
if (head.id > headIdLimit || (head.id == headIdLimit && headDeqIdx >= headDeqIdxLimit)) {
227-
incElimReceiverArraySize(1); incElimSenderArraySize(1)
231+
localCounterUpdate++
228232
continue@try_again
229233
}
230234
}
@@ -273,18 +277,19 @@ class RendezvousChannelKoval<E>(
273277
private fun tryEliminateSender(element: Any): Unit? {
274278
val elimReceiverArraySize = _elimReceiverArraySize
275279
if (elimReceiverArraySize > 0) {
276-
// _elimsTotal++
280+
_elimsTotal++
277281
val position = ThreadLocalRandom.current().nextInt(elimReceiverArraySize)
278282
attempt@ for (i in max(0, position - 1) .. min(position + 1, elimReceiverArraySize - 1)) {
279283
val x = _elimReceiverArray[i]
280284
when (x) {
281285
null -> { continue@attempt }
282286
ELIM_RECEIVER_ELEMENT -> {
283287
if (_elimReceiverArray.compareAndSet(i, x, Done(element))) {
284-
// _elimsSucc++
288+
_elimsSucc++
285289
return Unit
286290
} else incElimReceiverArraySize(1)
287291
}
292+
else -> incElimReceiverArraySize(1)
288293
}
289294
}
290295
// Elimination was unsuccessful :(
@@ -293,7 +298,7 @@ class RendezvousChannelKoval<E>(
293298

294299
val elimSenderArraySize = _elimSenderArraySize
295300
if (elimSenderArraySize > 0) {
296-
// _elimsTotal++
301+
_elimsTotal++
297302
val position = ThreadLocalRandom.current().nextInt(elimSenderArraySize)
298303
attempt@ for (i in max(0, position - 1) .. min(position + 1, elimSenderArraySize - 1)) {
299304
val x = _elimSenderArray[i]
@@ -306,19 +311,20 @@ class RendezvousChannelKoval<E>(
306311
val probablyDone = _elimSenderArray[i]
307312
if (probablyDone == ELIM_SENDER_DONE) {
308313
_elimSenderArray[i] = null
309-
// _elimsSucc++
314+
_elimsSucc++
310315
return Unit
311316
}
312317
}
313318
if (!_elimSenderArray.compareAndSet(i, box, null)) {
314319
// _elimSenderArray[i] == ELIM_SENDER_DONE
315320
incElimSenderArraySize(1)
316321
_elimSenderArray[i] = null
317-
// _elimsSucc++
322+
_elimsSucc++
318323
return Unit
319324
}
320325
} else incElimSenderArraySize(1)
321326
}
327+
else -> incElimSenderArraySize(1)
322328
}
323329
}
324330
decElimSenderArraySize(1)
@@ -330,18 +336,19 @@ class RendezvousChannelKoval<E>(
330336
private fun tryEliminateReceiver(): Any? {
331337
val elimSenderArraySize = _elimSenderArraySize
332338
if (elimSenderArraySize > 0) {
333-
// _elimsTotal++
339+
_elimsTotal++
334340
val position = ThreadLocalRandom.current().nextInt(elimSenderArraySize)
335341
attempt@ for (i in max(0, position - 1) .. min(position + 1, elimSenderArraySize - 1)) {
336342
val x = _elimSenderArray[i]
337343
when (x) {
338344
null -> continue@attempt
339345
is ElementBox -> {
340346
if (_elimSenderArray.compareAndSet(i, x, ELIM_SENDER_DONE)) {
341-
// _elimsSucc++
347+
_elimsSucc++
342348
return x.value
343349
} else incElimSenderArraySize(1)
344350
}
351+
else -> incElimSenderArraySize(1)
345352
}
346353
}
347354
// Elimination was unsuccessful :(
@@ -350,7 +357,7 @@ class RendezvousChannelKoval<E>(
350357

351358
val elimReceiverArraySize = _elimReceiverArraySize
352359
if (elimReceiverArraySize > 0) {
353-
// _elimsTotal++
360+
_elimsTotal++
354361
val position = ThreadLocalRandom.current().nextInt(elimReceiverArraySize)
355362
attempt@ for (i in max(0, position - 1) .. min(position + 1, elimReceiverArraySize - 1)) {
356363
val x = _elimReceiverArray[i]
@@ -362,19 +369,20 @@ class RendezvousChannelKoval<E>(
362369
if (probablyDone is Done) {
363370
val res = probablyDone.value
364371
_elimReceiverArray[i] = null
365-
// _elimsSucc++
372+
_elimsSucc++
366373
return res
367374
}
368375
}
369376
if (!_elimReceiverArray.compareAndSet(i, ELIM_RECEIVER_ELEMENT, null)) {
370377
val done = _elimReceiverArray[i] as Done
371378
_elimReceiverArray[i] = null
372379
incElimReceiverArraySize(1)
373-
// _elimsSucc++
380+
_elimsSucc++
374381
return done.value
375382
}
376-
}
383+
} else incElimReceiverArraySize(1)
377384
}
385+
else -> incElimReceiverArraySize(1)
378386
}
379387
}
380388
// Elimination was unsuccessful :(
@@ -395,6 +403,7 @@ class RendezvousChannelKoval<E>(
395403

396404
// This method is based on `#sendOrReceiveSuspend`. Returns `null` if fails.
397405
private fun <T> sendOrReceiveNonSuspend(element: Any): T? {
406+
var localCounterUpdate = 0
398407
try_again@ while (true) { // CAS loop
399408
// Read the tail and its enqueue index at first, then the head and its indexes.
400409
val tail = _tail
@@ -411,10 +420,12 @@ class RendezvousChannelKoval<E>(
411420
if (adjustHead(head)) continue@try_again
412421
// Queue is empty, try to do elimination
413422
// and return `null` if it fails.
423+
if (localCounterUpdate > 0) { incElimSenderArraySize(1); incElimReceiverArraySize(1) }
414424
return tryEliminate(head, headEnqIdx, element) as T
415425
} else {
416426
// Queue is empty, try to do elimination
417427
// and return `null` if it fails.
428+
if (localCounterUpdate > 0) { incElimSenderArraySize(1); incElimReceiverArraySize(1) }
418429
return tryEliminate(head, headEnqIdx, element) as T
419430
}
420431
} else {
@@ -425,7 +436,7 @@ class RendezvousChannelKoval<E>(
425436
if (firstElement == TAKEN_ELEMENT) {
426437
// Try to move the deque index in the `head` node
427438
deqIdxUpdater.compareAndSet(head, headDeqIdx, headDeqIdx + 1)
428-
incElimReceiverArraySize(1); incElimSenderArraySize(1)
439+
localCounterUpdate++
429440
continue@try_again
430441
}
431442
val makeRendezvous = if (element == RECEIVER_ELEMENT) firstElement != RECEIVER_ELEMENT else firstElement == RECEIVER_ELEMENT
@@ -435,6 +446,7 @@ class RendezvousChannelKoval<E>(
435446
while (true) {
436447
if (tryResumeContinuation(head, headDeqIdx, element)) {
437448
// The rendezvous is happened, congratulations!
449+
if (localCounterUpdate > 0) { incElimSenderArraySize(1); incElimReceiverArraySize(1) }
438450
return (if (element == RECEIVER_ELEMENT) firstElement else Unit) as T
439451
}
440452
// Re-read the required pointers
@@ -443,12 +455,15 @@ class RendezvousChannelKoval<E>(
443455
head = _head
444456
headDeqIdx = head._deqIdx
445457
if (headDeqIdx == segmentSize) {
446-
if (!adjustHead(head)) { incElimReceiverArraySize(1); incElimSenderArraySize(1); continue@try_again }
458+
if (!adjustHead(head)) { localCounterUpdate++; continue@try_again }
447459
continue@read_state
448460
}
449461
// Check that `(head.id, headDeqIdx) < (headIdLimit, headDeqIdxLimit)`
450462
// and re-start the whole operation if needed
451-
if (head.id > headIdLimit || (head.id == headIdLimit && headDeqIdx >= headDeqIdxLimit)) { incElimReceiverArraySize(1); incElimSenderArraySize(1); continue@try_again }
463+
if (head.id > headIdLimit || (head.id == headIdLimit && headDeqIdx >= headDeqIdxLimit)) {
464+
localCounterUpdate++
465+
continue@try_again
466+
}
452467
// Re-read the first element
453468
firstElement = readElement(head, headDeqIdx)
454469
if (firstElement == TAKEN_ELEMENT) {
@@ -459,6 +474,7 @@ class RendezvousChannelKoval<E>(
459474
}
460475
}
461476
} else {
477+
if (localCounterUpdate > 0) { incElimSenderArraySize(1); incElimReceiverArraySize(1) }
462478
return null
463479
}
464480
}

0 commit comments

Comments
 (0)