@@ -241,7 +241,7 @@ internal open class BufferedChannel<E>(
241
241
/* *
242
242
* Abstract send implementation.
243
243
*/
244
- protected inline fun <R > sendImpl (
244
+ private inline fun <R > sendImpl (
245
245
/* The element to be sent. */
246
246
element : E ,
247
247
/* The waiter to be stored in case of suspension,
@@ -350,6 +350,27 @@ internal open class BufferedChannel<E>(
350
350
}
351
351
}
352
352
353
+ protected fun trySendDropOldest (element : E ): ChannelResult <Unit > =
354
+ sendImpl( // <-- this is an inline function
355
+ element = element,
356
+ // Put the element into the logical buffer even
357
+ // if this channel is already full, the `onSuspend`
358
+ // callback below extract the first (oldest) element.
359
+ waiter = BUFFERED ,
360
+ // Finish successfully when a rendezvous has happened
361
+ // or the element has been buffered.
362
+ onRendezvousOrBuffered = { return success(Unit ) },
363
+ // In case the algorithm decided to suspend, the element
364
+ // was added to the buffer. However, as the buffer is now
365
+ // overflowed, the first (oldest) element has to be extracted.
366
+ onSuspend = { segm, i ->
367
+ dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(segm.id * SEGMENT_SIZE + i)
368
+ return success(Unit )
369
+ },
370
+ // If the channel is closed, return the corresponding result.
371
+ onClosed = { return closed(sendException) }
372
+ )
373
+
353
374
private inline fun sendImplOnNoWaiter (
354
375
/* The working cell is specified by
355
376
the segment and the index in it. */
@@ -1587,7 +1608,7 @@ internal open class BufferedChannel<E>(
1587
1608
* It is nulled-out on both completion and cancellation paths that
1588
1609
* could happen concurrently.
1589
1610
*/
1590
- @BenignDataRace
1611
+ // @BenignDataRace
1591
1612
private var continuation: CancellableContinuationImpl <Boolean >? = null
1592
1613
1593
1614
// `hasNext()` is just a special receive operation.
0 commit comments