-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathBufferedChannel.kt
3086 lines (2910 loc) · 147 KB
/
BufferedChannel.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
@file:Suppress("PrivatePropertyName")
package kotlinx.coroutines.channels
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ChannelResult.Companion.closed
import kotlinx.coroutines.channels.ChannelResult.Companion.failure
import kotlinx.coroutines.channels.ChannelResult.Companion.success
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.selects.TrySelectDetailedResult.*
import kotlin.coroutines.*
import kotlin.js.*
import kotlin.jvm.*
import kotlin.math.*
import kotlin.reflect.*
/**
* The buffered channel implementation, which also serves as a rendezvous channel when the capacity is zero.
* The high-level structure bases on a conceptually infinite array for storing elements and waiting requests,
* separate counters of [send] and [receive] invocations that were ever performed, and an additional counter
* that indicates the end of the logical buffer by counting the number of array cells it ever contained.
* The key idea is that both [send] and [receive] start by incrementing their counters, assigning the array cell
* referenced by the counter. In case of rendezvous channels, the operation either suspends and stores its continuation
* in the cell or makes a rendezvous with the opposite request. Each cell can be processed by exactly one [send] and
* one [receive]. As for buffered channels, [send]-s can also add elements without suspension if the logical buffer
* contains the cell, while the [receive] operation updates the end of the buffer when its synchronization finishes.
*
* Please see the ["Fast and Scalable Channels in Kotlin Coroutines"](https://arxiv.org/abs/2211.04986)
* paper by Nikita Koval, Roman Elizarov, and Dan Alistarh for the detailed algorithm description.
*/
internal open class BufferedChannel<E>(
/**
* Channel capacity; `Channel.RENDEZVOUS` for rendezvous channel
* and `Channel.UNLIMITED` for unlimited capacity.
*/
private val capacity: Int,
@JvmField
internal val onUndeliveredElement: OnUndeliveredElement<E>? = null
) : Channel<E> {
init {
require(capacity >= 0) { "Invalid channel capacity: $capacity, should be >=0" }
// This implementation has second `init`.
}
// Maintenance note: use `Buffered1ChannelLincheckTest` to check hypotheses.
/*
The counters indicate the total numbers of send, receive, and buffer expansion calls
ever performed. The counters are incremented in the beginning of the corresponding
operation; thus, acquiring a unique (for the operation type) cell to process.
The segments reference to the last working one for each operation type.
Notably, the counter for send is combined with the channel closing status
for synchronization simplicity and performance reasons.
The logical end of the buffer is initialized with the channel capacity.
If the channel is rendezvous or unlimited, the counter equals `BUFFER_END_RENDEZVOUS`
or `BUFFER_END_RENDEZVOUS`, respectively, and never updates. The `bufferEndSegment`
point to a special `NULL_SEGMENT` in this case.
*/
private val sendersAndCloseStatus = atomic(0L)
private val receivers = atomic(0L)
private val bufferEnd = atomic(initialBufferEnd(capacity))
internal val sendersCounter: Long get() = sendersAndCloseStatus.value.sendersCounter
internal val receiversCounter: Long get() = receivers.value
private val bufferEndCounter: Long get() = bufferEnd.value
/*
Additionally to the counters above, we need an extra one that
tracks the number of cells processed by `expandBuffer()`.
When a receiver aborts, the corresponding cell might be
physically removed from the data structure to avoid memory
leaks, while it still can be unprocessed by `expandBuffer()`.
In this case, `expandBuffer()` cannot know whether the
removed cell contained sender or receiver and, therefore,
cannot proceed. To solve the race, we ensure that cells
correspond to cancelled receivers cannot be physically
removed until the cell is processed.
This additional counter enables the synchronization,
*/
private val completedExpandBuffersAndPauseFlag = atomic(bufferEndCounter)
private val isRendezvousOrUnlimited
get() = bufferEndCounter.let { it == BUFFER_END_RENDEZVOUS || it == BUFFER_END_UNLIMITED }
private val sendSegment: AtomicRef<ChannelSegment<E>>
private val receiveSegment: AtomicRef<ChannelSegment<E>>
private val bufferEndSegment: AtomicRef<ChannelSegment<E>>
/*
These values are used in [ChannelSegment.isLeftmostOrProcessed].
They help to detect when the `prev` reference of the segment should be cleaned.
*/
internal val sendSegmentId: Long get() = sendSegment.value.id
internal val receiveSegmentId: Long get() = receiveSegment.value.id
init {
@Suppress("LeakingThis")
val firstSegment = ChannelSegment(id = 0, prev = null, channel = this)
sendSegment = atomic(firstSegment)
receiveSegment = atomic(firstSegment)
// If this channel is rendezvous or has unlimited capacity, the algorithm never
// invokes the buffer expansion procedure, and the corresponding segment reference
// points to a special `NULL_SEGMENT` one and never updates.
@Suppress("UNCHECKED_CAST")
bufferEndSegment = atomic(if (isRendezvousOrUnlimited) (NULL_SEGMENT as ChannelSegment<E>) else firstSegment)
}
// #########################
// ## The send operations ##
// #########################
override suspend fun send(element: E): Unit =
sendImpl( // <-- this is an inline function
element = element,
// Do not create a continuation until it is required;
// it is created later via [onNoWaiterSuspend], if needed.
waiter = null,
// Finish immediately if a rendezvous happens
// or the element has been buffered.
onRendezvousOrBuffered = {},
// As no waiter is provided, suspension is impossible.
onSuspend = { _, _ -> assert { false } },
// According to the `send(e)` contract, we need to call
// `onUndeliveredElement(..)` handler and throw an exception
// if the channel is already closed.
onClosed = { onClosedSend(element) },
// When `send(e)` decides to suspend, the corresponding
// `onNoWaiterSuspend` function that creates a continuation
// is called. The tail-call optimization is applied here.
onNoWaiterSuspend = { segm, i, elem, s -> sendOnNoWaiterSuspend(segm, i, elem, s) }
)
// NB: return type could've been Nothing, but it breaks TCO
private suspend fun onClosedSend(element: E): Unit = suspendCancellableCoroutine { continuation ->
onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
// If it crashes, add send exception as suppressed for better diagnostics
it.addSuppressed(sendException)
continuation.resumeWithStackTrace(it)
return@suspendCancellableCoroutine
}
continuation.resumeWithStackTrace(sendException)
}
private suspend fun sendOnNoWaiterSuspend(
/* The working cell is specified by
the segment and the index in it. */
segment: ChannelSegment<E>,
index: Int,
/* The element to be inserted. */
element: E,
/* The global index of the cell. */
s: Long
) = suspendCancellableCoroutineReusable sc@{ cont ->
sendImplOnNoWaiter( // <-- this is an inline function
segment = segment, index = index, element = element, s = s,
// Store the created continuation as a waiter.
waiter = cont,
// If a rendezvous happens or the element has been buffered,
// resume the continuation and finish. In case of prompt
// cancellation, it is guaranteed that the element
// has been already buffered or passed to receiver.
onRendezvousOrBuffered = { cont.resume(Unit) },
// If the channel is closed, call `onUndeliveredElement(..)` and complete the
// continuation with the corresponding exception.
onClosed = { onClosedSendOnNoWaiterSuspend(element, cont) },
)
}
private fun Waiter.prepareSenderForSuspension(
/* The working cell is specified by
the segment and the index in it. */
segment: ChannelSegment<E>,
index: Int
) {
// To distinguish cancelled senders and receivers,
// senders equip the index value with an additional marker,
// adding `SEGMENT_SIZE` to the value.
invokeOnCancellation(segment, index + SEGMENT_SIZE)
}
private fun onClosedSendOnNoWaiterSuspend(element: E, cont: CancellableContinuation<Unit>) {
onUndeliveredElement?.callUndeliveredElement(element, cont.context)
cont.resumeWithException(recoverStackTrace(sendException, cont))
}
override fun trySend(element: E): ChannelResult<Unit> {
// Do not try to send the element if the plain `send(e)` operation would suspend.
if (shouldSendSuspend(sendersAndCloseStatus.value)) return failure()
// This channel either has waiting receivers or is closed.
// Let's try to send the element!
// The logic is similar to the plain `send(e)` operation, with
// the only difference that we install `INTERRUPTED_SEND` in case
// the operation decides to suspend.
return sendImpl( // <-- this is an inline function
element = element,
// Store an already interrupted sender in case of suspension.
waiter = INTERRUPTED_SEND,
// Finish successfully when a rendezvous happens
// or the element has been buffered.
onRendezvousOrBuffered = { success(Unit) },
// On suspension, the `INTERRUPTED_SEND` token has been installed,
// and this `trySend(e)` must fail. According to the contract,
// we do not need to call the [onUndeliveredElement] handler.
onSuspend = { segm, _ ->
segm.onSlotCleaned()
failure()
},
// If the channel is closed, return the corresponding result.
onClosed = { closed(sendException) }
)
}
/**
* This is a special `send(e)` implementation that returns `true` if the element
* has been successfully sent, and `false` if the channel is closed.
*
* In case of coroutine cancellation, the element may be undelivered --
* the [onUndeliveredElement] feature is unsupported in this implementation.
*
*/
internal open suspend fun sendBroadcast(element: E): Boolean = suspendCancellableCoroutine { cont ->
check(onUndeliveredElement == null) {
"the `onUndeliveredElement` feature is unsupported for `sendBroadcast(e)`"
}
sendImpl(
element = element,
waiter = SendBroadcast(cont),
onRendezvousOrBuffered = { cont.resume(true) },
onSuspend = { _, _ -> },
onClosed = { cont.resume(false) }
)
}
/**
* Specifies waiting [sendBroadcast] operation.
*/
private class SendBroadcast(
val cont: CancellableContinuation<Boolean>
) : Waiter by cont as CancellableContinuationImpl<Boolean>
/**
* Abstract send implementation.
*/
private inline fun <R> sendImpl(
/* The element to be sent. */
element: E,
/* The waiter to be stored in case of suspension,
or `null` if the waiter is not created yet.
In the latter case, when the algorithm decides
to suspend, [onNoWaiterSuspend] is called. */
waiter: Any?,
/* This lambda is invoked when the element has been
buffered or a rendezvous with a receiver happens. */
onRendezvousOrBuffered: () -> R,
/* This lambda is called when the operation suspends in the
cell specified by the segment and the index in it. */
onSuspend: (segm: ChannelSegment<E>, i: Int) -> R,
/* This lambda is called when the channel
is observed in the closed state. */
onClosed: () -> R,
/* This lambda is called when the operation decides
to suspend, but the waiter is not provided (equals `null`).
It should create a waiter and delegate to `sendImplOnNoWaiter`. */
onNoWaiterSuspend: (
segm: ChannelSegment<E>,
i: Int,
element: E,
s: Long
) -> R = { _, _, _, _ -> error("unexpected") }
): R {
// Read the segment reference before the counter increment;
// it is crucial to be able to find the required segment later.
var segment = sendSegment.value
while (true) {
// Atomically increment the `senders` counter and obtain the
// value right before the increment along with the close status.
val sendersAndCloseStatusCur = sendersAndCloseStatus.getAndIncrement()
val s = sendersAndCloseStatusCur.sendersCounter
// Is this channel already closed? Keep the information.
val closed = sendersAndCloseStatusCur.isClosedForSend0
// Count the required segment id and the cell index in it.
val id = s / SEGMENT_SIZE
val i = (s % SEGMENT_SIZE).toInt()
// Try to find the required segment if the initially obtained
// one (in the beginning of this function) has lower id.
if (segment.id != id) {
// Find the required segment.
segment = findSegmentSend(id, segment) ?:
// The required segment has not been found.
// Finish immediately if this channel is closed,
// restarting the operation otherwise.
// In the latter case, the required segment was full
// of interrupted waiters and, therefore, removed
// physically to avoid memory leaks.
if (closed) {
return onClosed()
} else {
continue
}
}
// Update the cell according to the algorithm. Importantly, when
// the channel is already closed, storing a waiter is illegal, so
// the algorithm stores the `INTERRUPTED_SEND` token in this case.
when (updateCellSend(segment, i, element, s, waiter, closed)) {
RESULT_RENDEZVOUS, RESULT_BUFFERED -> {
// The element has been buffered or a rendezvous with a receiver has happened.
return onRendezvousOrBuffered()
}
RESULT_SUSPEND -> {
// The operation has decided to suspend and installed the
// specified waiter. If the channel was already closed,
// and the `INTERRUPTED_SEND` token has been installed as a waiter,
// this request finishes with the `onClosed()` action.
if (closed) {
segment.onSlotCleaned()
return onClosed()
}
(waiter as? Waiter)?.prepareSenderForSuspension(segment, i)
return onSuspend(segment, i)
}
RESULT_CLOSED -> {
// This channel is closed.
return onClosed()
}
RESULT_FAILED -> {
// Either the cell stores an interrupted receiver,
// or it was poisoned by a concurrent receiver.
continue
}
RESULT_SUSPEND_NO_WAITER -> {
// The operation has decided to suspend,
// but no waiter has been provided.
return onNoWaiterSuspend(segment, i, element, s)
}
}
}
}
// Note: this function is temporarily moved from ConflatedBufferedChannel to BufferedChannel class, because of this issue: KT-65554.
// For now, an inline function, which invokes atomic operations, may only be called within a parent class.
protected fun trySendDropOldest(element: E): ChannelResult<Unit> =
sendImpl( // <-- this is an inline function
element = element,
// Put the element into the logical buffer even
// if this channel is already full, the `onSuspend`
// callback below extract the first (oldest) element.
waiter = BUFFERED,
// Finish successfully when a rendezvous has happened
// or the element has been buffered.
onRendezvousOrBuffered = { return success(Unit) },
// In case the algorithm decided to suspend, the element
// was added to the buffer. However, as the buffer is now
// overflowed, the first (oldest) element has to be extracted.
onSuspend = { segm, i ->
dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(segm.id * SEGMENT_SIZE + i)
return success(Unit)
},
// If the channel is closed, return the corresponding result.
onClosed = { return closed(sendException) }
)
private inline fun sendImplOnNoWaiter(
/* The working cell is specified by
the segment and the index in it. */
segment: ChannelSegment<E>,
index: Int,
/* The element to be sent. */
element: E,
/* The global index of the cell. */
s: Long,
/* The waiter to be stored in case of suspension. */
waiter: Waiter,
/* This lambda is invoked when the element has been
buffered or a rendezvous with a receiver happens.*/
onRendezvousOrBuffered: () -> Unit,
/* This lambda is called when the channel
is observed in the closed state. */
onClosed: () -> Unit,
) {
// Update the cell again, now with the non-null waiter,
// restarting the operation from the beginning on failure.
// Check the `sendImpl(..)` function for the comments.
when (updateCellSend(segment, index, element, s, waiter, false)) {
RESULT_RENDEZVOUS, RESULT_BUFFERED -> {
onRendezvousOrBuffered()
}
RESULT_SUSPEND -> {
waiter.prepareSenderForSuspension(segment, index)
}
RESULT_CLOSED -> {
onClosed()
}
RESULT_FAILED -> {
sendImpl(
element = element,
waiter = waiter,
onRendezvousOrBuffered = onRendezvousOrBuffered,
onSuspend = { _, _ -> },
onClosed = onClosed,
)
}
else -> error("unexpected")
}
}
private fun updateCellSend(
/* The working cell is specified by
the segment and the index in it. */
segment: ChannelSegment<E>,
index: Int,
/* The element to be sent. */
element: E,
/* The global index of the cell. */
s: Long,
/* The waiter to be stored in case of suspension. */
waiter: Any?,
closed: Boolean
): Int {
// This is a fast-path of `updateCellSendSlow(..)`.
//
// First, the algorithm stores the element,
// performing the synchronization after that.
// This way, receivers safely retrieve the
// element, following the safe publication pattern.
segment.storeElement(index, element)
if (closed) return updateCellSendSlow(segment, index, element, s, waiter, closed)
// Read the current cell state.
val state = segment.getState(index)
when {
// The cell is empty.
state === null -> {
// If the element should be buffered, or a rendezvous should happen
// while the receiver is still coming, try to buffer the element.
// Otherwise, try to store the specified waiter in the cell.
if (bufferOrRendezvousSend(s)) {
// Move the cell state to `BUFFERED`.
if (segment.casState(index, null, BUFFERED)) {
// The element has been successfully buffered, finish.
return RESULT_BUFFERED
}
} else {
// This `send(e)` operation should suspend.
// However, in case the channel has already
// been observed closed, `INTERRUPTED_SEND`
// is installed instead.
if (waiter == null) {
// The waiter is not specified; return the corresponding result.
return RESULT_SUSPEND_NO_WAITER
} else {
// Try to install the waiter.
if (segment.casState(index, null, waiter)) return RESULT_SUSPEND
}
}
}
// A waiting receiver is stored in the cell.
state is Waiter -> {
// As the element will be passed directly to the waiter,
// the algorithm cleans the element slot in the cell.
segment.cleanElement(index)
// Try to make a rendezvous with the suspended receiver.
return if (state.tryResumeReceiver(element)) {
// Rendezvous! Move the cell state to `DONE_RCV` and finish.
segment.setState(index, DONE_RCV)
onReceiveDequeued()
RESULT_RENDEZVOUS
} else {
// The resumption has failed. Update the cell state correspondingly
// and clean the element field. It is also possible for a concurrent
// cancellation handler to update the cell state; we can safely
// ignore these updates.
if (segment.getAndSetState(index, INTERRUPTED_RCV) !== INTERRUPTED_RCV) {
segment.onCancelledRequest(index, true)
}
RESULT_FAILED
}
}
}
return updateCellSendSlow(segment, index, element, s, waiter, closed)
}
/**
* Updates the working cell of an abstract send operation.
*/
private fun updateCellSendSlow(
/* The working cell is specified by
the segment and the index in it. */
segment: ChannelSegment<E>,
index: Int,
/* The element to be sent. */
element: E,
/* The global index of the cell. */
s: Long,
/* The waiter to be stored in case of suspension. */
waiter: Any?,
closed: Boolean
): Int {
// Then, the cell state should be updated according to
// its state machine; see the paper mentioned in the very
// beginning for the cell life-cycle and the algorithm details.
while (true) {
// Read the current cell state.
val state = segment.getState(index)
when {
// The cell is empty.
state === null -> {
// If the element should be buffered, or a rendezvous should happen
// while the receiver is still coming, try to buffer the element.
// Otherwise, try to store the specified waiter in the cell.
if (bufferOrRendezvousSend(s) && !closed) {
// Move the cell state to `BUFFERED`.
if (segment.casState(index, null, BUFFERED)) {
// The element has been successfully buffered, finish.
return RESULT_BUFFERED
}
} else {
// This `send(e)` operation should suspend.
// However, in case the channel has already
// been observed closed, `INTERRUPTED_SEND`
// is installed instead.
when {
// The channel is closed
closed -> if (segment.casState(index, null, INTERRUPTED_SEND)) {
segment.onCancelledRequest(index, false)
return RESULT_CLOSED
}
// The waiter is not specified; return the corresponding result.
waiter == null -> return RESULT_SUSPEND_NO_WAITER
// Try to install the waiter.
else -> if (segment.casState(index, null, waiter)) return RESULT_SUSPEND
}
}
}
// This cell is in the logical buffer.
state === IN_BUFFER -> {
// Try to buffer the element.
if (segment.casState(index, state, BUFFERED)) {
// The element has been successfully buffered, finish.
return RESULT_BUFFERED
}
}
// The cell stores a cancelled receiver.
state === INTERRUPTED_RCV -> {
// Clean the element slot to avoid memory leaks and finish.
segment.cleanElement(index)
return RESULT_FAILED
}
// The cell is poisoned by a concurrent receive.
state === POISONED -> {
// Clean the element slot to avoid memory leaks and finish.
segment.cleanElement(index)
return RESULT_FAILED
}
// The channel is already closed.
state === CHANNEL_CLOSED -> {
// Clean the element slot to avoid memory leaks,
// ensure that the closing/cancellation procedure
// has been completed, and finish.
segment.cleanElement(index)
completeCloseOrCancel()
return RESULT_CLOSED
}
// A waiting receiver is stored in the cell.
else -> {
assert { state is Waiter || state is WaiterEB }
// As the element will be passed directly to the waiter,
// the algorithm cleans the element slot in the cell.
segment.cleanElement(index)
// Unwrap the waiting receiver from `WaiterEB` if needed.
// As a receiver is stored in the cell, the buffer expansion
// procedure would finish, so senders simply ignore the "EB" marker.
val receiver = if (state is WaiterEB) state.waiter else state
// Try to make a rendezvous with the suspended receiver.
return if (receiver.tryResumeReceiver(element)) {
// Rendezvous! Move the cell state to `DONE_RCV` and finish.
segment.setState(index, DONE_RCV)
onReceiveDequeued()
RESULT_RENDEZVOUS
} else {
// The resumption has failed. Update the cell state correspondingly
// and clean the element field. It is also possible for a concurrent
// `expandBuffer()` or the cancellation handler to update the cell state;
// we can safely ignore these updates as senders do not help `expandBuffer()`.
if (segment.getAndSetState(index, INTERRUPTED_RCV) !== INTERRUPTED_RCV) {
segment.onCancelledRequest(index, true)
}
RESULT_FAILED
}
}
}
}
}
/**
* Checks whether a [send] invocation is bound to suspend if it is called
* with the specified [sendersAndCloseStatus], [receivers], and [bufferEnd]
* values. When this channel is already closed, the function returns `false`.
*
* Specifically, [send] suspends if the channel is not unlimited,
* the number of receivers is greater than then index of the working cell of the
* potential [send] invocation, and the buffer does not cover this cell
* in case of buffered channel.
* When the channel is already closed, [send] does not suspend.
*/
@JsName("shouldSendSuspend0")
private fun shouldSendSuspend(curSendersAndCloseStatus: Long): Boolean {
// Does not suspend if the channel is already closed.
if (curSendersAndCloseStatus.isClosedForSend0) return false
// Does not suspend if a rendezvous may happen or the buffer is not full.
return !bufferOrRendezvousSend(curSendersAndCloseStatus.sendersCounter)
}
/**
* Returns `true` when the specified [send] should place
* its element to the working cell without suspension.
*/
private fun bufferOrRendezvousSend(curSenders: Long): Boolean =
curSenders < bufferEndCounter || curSenders < receiversCounter + capacity
/**
* Checks whether a [send] invocation is bound to suspend if it is called
* with the current counter and close status values. See [shouldSendSuspend] for details.
*
* Note that this implementation is _false positive_ in case of rendezvous channels,
* so it can return `false` when a [send] invocation is bound to suspend. Specifically,
* the counter of `receive()` operations may indicate that there is a waiting receiver,
* while it has already been cancelled, so the potential rendezvous is bound to fail.
*/
internal open fun shouldSendSuspend(): Boolean = shouldSendSuspend(sendersAndCloseStatus.value)
/**
* Tries to resume this receiver with the specified [element] as a result.
* Returns `true` on success and `false` otherwise.
*/
@Suppress("UNCHECKED_CAST")
private fun Any.tryResumeReceiver(element: E): Boolean = when(this) {
is SelectInstance<*> -> { // `onReceiveXXX` select clause
trySelect(this@BufferedChannel, element)
}
is ReceiveCatching<*> -> {
this as ReceiveCatching<E>
cont.tryResume0(success(element), onUndeliveredElement?.bindCancellationFunResult())
}
is BufferedChannel<*>.BufferedChannelIterator -> {
this as BufferedChannel<E>.BufferedChannelIterator
tryResumeHasNext(element)
}
is CancellableContinuation<*> -> { // `receive()`
this as CancellableContinuation<E>
tryResume0(element, onUndeliveredElement?.bindCancellationFun())
}
else -> error("Unexpected receiver type: $this")
}
// ##########################
// # The receive operations #
// ##########################
/**
* This function is invoked when a receiver is added as a waiter in this channel.
*/
protected open fun onReceiveEnqueued() {}
/**
* This function is invoked when a waiting receiver is no longer stored in this channel;
* independently on whether it is caused by rendezvous, cancellation, or channel closing.
*/
protected open fun onReceiveDequeued() {}
override suspend fun receive(): E =
receiveImpl( // <-- this is an inline function
// Do not create a continuation until it is required;
// it is created later via [onNoWaiterSuspend], if needed.
waiter = null,
// Return the received element on successful retrieval from
// the buffer or rendezvous with a suspended sender.
// Also, inform `BufferedChannel` extensions that
// synchronization of this receive operation is completed.
onElementRetrieved = { element ->
return element
},
// As no waiter is provided, suspension is impossible.
onSuspend = { _, _, _ -> error("unexpected") },
// Throw an exception if the channel is already closed.
onClosed = { throw recoverStackTrace(receiveException) },
// If `receive()` decides to suspend, the corresponding
// `suspend` function that creates a continuation is called.
// The tail-call optimization is applied here.
onNoWaiterSuspend = { segm, i, r -> receiveOnNoWaiterSuspend(segm, i, r) }
)
private suspend fun receiveOnNoWaiterSuspend(
/* The working cell is specified by
the segment and the index in it. */
segment: ChannelSegment<E>,
index: Int,
/* The global index of the cell. */
r: Long
) = suspendCancellableCoroutineReusable { cont ->
receiveImplOnNoWaiter( // <-- this is an inline function
segment = segment, index = index, r = r,
// Store the created continuation as a waiter.
waiter = cont,
// In case of successful element retrieval, resume
// the continuation with the element and inform the
// `BufferedChannel` extensions that the synchronization
// is completed. Importantly, the receiver coroutine
// may be cancelled after it is successfully resumed but
// not dispatched yet. In case `onUndeliveredElement` is
// specified, we need to invoke it in the latter case.
onElementRetrieved = { element ->
val onCancellation = onUndeliveredElement?.bindCancellationFun()
cont.resume(element, onCancellation)
},
onClosed = { onClosedReceiveOnNoWaiterSuspend(cont) },
)
}
private fun Waiter.prepareReceiverForSuspension(segment: ChannelSegment<E>, index: Int) {
onReceiveEnqueued()
invokeOnCancellation(segment, index)
}
private fun onClosedReceiveOnNoWaiterSuspend(cont: CancellableContinuation<E>) {
cont.resumeWithException(receiveException)
}
/*
The implementation is exactly the same as of `receive()`,
with the only difference that this function returns a `ChannelResult`
instance and does not throw exception explicitly in case the channel
is already closed for receiving. Please refer the plain `receive()`
implementation for the comments.
*/
override suspend fun receiveCatching(): ChannelResult<E> =
receiveImpl( // <-- this is an inline function
waiter = null,
onElementRetrieved = { element ->
success(element)
},
onSuspend = { _, _, _ -> error("unexpected") },
onClosed = { closed(closeCause) },
onNoWaiterSuspend = { segm, i, r -> receiveCatchingOnNoWaiterSuspend(segm, i, r) }
)
private suspend fun receiveCatchingOnNoWaiterSuspend(
segment: ChannelSegment<E>,
index: Int,
r: Long
) = suspendCancellableCoroutineReusable { cont ->
val waiter = ReceiveCatching(cont as CancellableContinuationImpl<ChannelResult<E>>)
receiveImplOnNoWaiter(
segment, index, r,
waiter = waiter,
onElementRetrieved = { element ->
cont.resume(success(element), onUndeliveredElement?.bindCancellationFunResult())
},
onClosed = { onClosedReceiveCatchingOnNoWaiterSuspend(cont) }
)
}
private fun onClosedReceiveCatchingOnNoWaiterSuspend(cont: CancellableContinuation<ChannelResult<E>>) {
cont.resume(closed(closeCause))
}
override fun tryReceive(): ChannelResult<E> {
// Read the `receivers` counter first.
val r = receivers.value
val sendersAndCloseStatusCur = sendersAndCloseStatus.value
// Is this channel closed for receive?
if (sendersAndCloseStatusCur.isClosedForReceive0) {
return closed(closeCause)
}
// Do not try to receive an element if the plain `receive()` operation would suspend.
val s = sendersAndCloseStatusCur.sendersCounter
if (r >= s) return failure()
// Let's try to retrieve an element!
// The logic is similar to the plain `receive()` operation, with
// the only difference that we store `INTERRUPTED_RCV` in case
// the operation decides to suspend. This way, we can leverage
// the unconditional `Fetch-and-Add` instruction.
// One may consider storing `INTERRUPTED_RCV` instead of an actual waiter
// on suspension (a.k.a. "no elements to retrieve") as a short-cut of
// "suspending and cancelling immediately".
return receiveImpl( // <-- this is an inline function
// Store an already interrupted receiver in case of suspension.
waiter = INTERRUPTED_RCV,
// Finish when an element is successfully retrieved.
onElementRetrieved = { element -> success(element) },
// On suspension, the `INTERRUPTED_RCV` token has been
// installed, and this `tryReceive()` must fail.
onSuspend = { segm, _, globalIndex ->
// Emulate "cancelled" receive, thus invoking 'waitExpandBufferCompletion' manually,
// because effectively there were no cancellation
waitExpandBufferCompletion(globalIndex)
segm.onSlotCleaned()
failure()
},
// If the channel is closed, return the corresponding result.
onClosed = { closed(closeCause) }
)
}
/**
* Extracts the first element from this channel until the cell with the specified
* index is moved to the logical buffer. This is a key procedure for the _conflated_
* channel implementation, see [ConflatedBufferedChannel] with the [BufferOverflow.DROP_OLDEST]
* strategy on buffer overflowing.
*/
protected fun dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(globalCellIndex: Long) {
assert { isConflatedDropOldest }
// Read the segment reference before the counter increment;
// it is crucial to be able to find the required segment later.
var segment = receiveSegment.value
while (true) {
// Read the receivers counter to check whether the specified cell is already in the buffer
// or should be moved to the buffer in a short time, due to the already started `receive()`.
val r = this.receivers.value
if (globalCellIndex < max(r + capacity, bufferEndCounter)) return
// The cell is outside the buffer. Try to extract the first element
// if the `receivers` counter has not been changed.
if (!this.receivers.compareAndSet(r, r + 1)) continue
// Count the required segment id and the cell index in it.
val id = r / SEGMENT_SIZE
val i = (r % SEGMENT_SIZE).toInt()
// Try to find the required segment if the initially obtained
// segment (in the beginning of this function) has lower id.
if (segment.id != id) {
// Find the required segment, restarting the operation if it has not been found.
segment = findSegmentReceive(id, segment) ?:
// The required segment has not been found. It is possible that the channel is already
// closed for receiving, so the linked list of segments is closed as well.
// In the latter case, the operation will finish eventually after incrementing
// the `receivers` counter sufficient times. Note that it is impossible to check
// whether this channel is closed for receiving (we do this in `receive`),
// as it may call this function when helping to complete closing the channel.
continue
}
// Update the cell according to the cell life-cycle.
val updCellResult = updateCellReceive(segment, i, r, null)
when {
updCellResult === FAILED -> {
// The cell is poisoned; restart from the beginning.
}
else -> { // element
// A buffered element was retrieved from the cell.
@Suppress("UNCHECKED_CAST")
onUndeliveredElement?.callUndeliveredElementCatchingException(updCellResult as E)?.let { throw it }
}
}
}
}
/**
* Abstract receive implementation.
*/
private inline fun <R> receiveImpl(
/* The waiter to be stored in case of suspension,
or `null` if the waiter is not created yet.
In the latter case, if the algorithm decides
to suspend, [onNoWaiterSuspend] is called. */
waiter: Any?,
/* This lambda is invoked when an element has been
successfully retrieved, either from the buffer or
by making a rendezvous with a suspended sender. */
onElementRetrieved: (element: E) -> R,
/* This lambda is called when the operation suspends in the cell
specified by the segment and its global and in-segment indices. */
onSuspend: (segm: ChannelSegment<E>, i: Int, r: Long) -> R,
/* This lambda is called when the channel is observed
in the closed state and no waiting sender is found,
which means that it is closed for receiving. */
onClosed: () -> R,
/* This lambda is called when the operation decides
to suspend, but the waiter is not provided (equals `null`).
It should create a waiter and delegate to `sendImplOnNoWaiter`. */
onNoWaiterSuspend: (
segm: ChannelSegment<E>,
i: Int,
r: Long
) -> R = { _, _, _ -> error("unexpected") }
): R {
// Read the segment reference before the counter increment;
// it is crucial to be able to find the required segment later.
var segment = receiveSegment.value
while (true) {
// Similar to the `send(e)` operation, `receive()` first checks
// whether the channel is already closed for receiving.
if (isClosedForReceive) return onClosed()
// Atomically increments the `receivers` counter
// and obtain the value right before the increment.
val r = this.receivers.getAndIncrement()
// Count the required segment id and the cell index in it.
val id = r / SEGMENT_SIZE
val i = (r % SEGMENT_SIZE).toInt()
// Try to find the required segment if the initially obtained
// segment (in the beginning of this function) has lower id.
if (segment.id != id) {
// Find the required segment, restarting the operation if it has not been found.
segment = findSegmentReceive(id, segment) ?:
// The required segment is not found. It is possible that the channel is already
// closed for receiving, so the linked list of segments is closed as well.
// In the latter case, the operation fails with the corresponding check at the beginning.
continue
}
// Update the cell according to the cell life-cycle.
val updCellResult = updateCellReceive(segment, i, r, waiter)
return when {
updCellResult === SUSPEND -> {
// The operation has decided to suspend and
// stored the specified waiter in the cell.
(waiter as? Waiter)?.prepareReceiverForSuspension(segment, i)
onSuspend(segment, i, r)
}
updCellResult === FAILED -> {
// The operation has tried to make a rendezvous
// but failed: either the opposite request has
// already been cancelled or the cell is poisoned.
// Restart from the beginning in this case.
continue
}
updCellResult === SUSPEND_NO_WAITER -> {
// The operation has decided to suspend,
// but no waiter has been provided.
onNoWaiterSuspend(segment, i, r)
}
else -> { // element
// Either a buffered element was retrieved from the cell
// or a rendezvous with a waiting sender has happened.
@Suppress("UNCHECKED_CAST")
onElementRetrieved(updCellResult as E)
}
}
}
}
private inline fun receiveImplOnNoWaiter(
/* The working cell is specified by
the segment and the index in it. */
segment: ChannelSegment<E>,
index: Int,
/* The global index of the cell. */
r: Long,
/* The waiter to be stored in case of suspension. */
waiter: Waiter,
/* This lambda is invoked when an element has been
successfully retrieved, either from the buffer or
by making a rendezvous with a suspended sender. */
onElementRetrieved: (element: E) -> Unit,
/* This lambda is called when the channel is observed
in the closed state and no waiting senders is found,
which means that it is closed for receiving. */
onClosed: () -> Unit
) {
// Update the cell with the non-null waiter,
// restarting from the beginning on failure.
// Check the `receiveImpl(..)` function for the comments.
val updCellResult = updateCellReceive(segment, index, r, waiter)
when {
updCellResult === SUSPEND -> {
waiter.prepareReceiverForSuspension(segment, index)
}
updCellResult === FAILED -> {
receiveImpl(
waiter = waiter,
onElementRetrieved = onElementRetrieved,
onSuspend = { _, _, _ -> },
onClosed = onClosed
)
}
else -> {
@Suppress("UNCHECKED_CAST")
onElementRetrieved(updCellResult as E)
}
}
}
private fun updateCellReceive(
/* The working cell is specified by
the segment and the index in it. */
segment: ChannelSegment<E>,
index: Int,
/* The global index of the cell. */
r: Long,
/* The waiter to be stored in case of suspension. */
waiter: Any?,
): Any? {
// This is a fast-path of `updateCellReceiveSlow(..)`.
//
// Read the current cell state.
val state = segment.getState(index)
when {
// The cell is empty.
state === null -> {
// If a rendezvous must happen, the operation does not wait
// until the cell stores a buffered element or a suspended
// sender, poisoning the cell and restarting instead.