@@ -241,7 +241,7 @@ internal open class BufferedChannel<E>(
241
241
/* *
242
242
* Abstract send implementation.
243
243
*/
244
- protected inline fun <R > sendImpl (
244
+ private inline fun <R > sendImpl (
245
245
/* The element to be sent. */
246
246
element : E ,
247
247
/* The waiter to be stored in case of suspension,
@@ -350,6 +350,43 @@ internal open class BufferedChannel<E>(
350
350
}
351
351
}
352
352
353
+ protected fun trySendDropLatest (element : E , isSendOp : Boolean ): ChannelResult <Unit > {
354
+ // Try to send the element without suspension.
355
+ val result = trySend(element)
356
+ // Complete on success or if this channel is closed.
357
+ if (result.isSuccess || result.isClosed) return result
358
+ // This channel is full. Drop the sending element.
359
+ // Call the `onUndeliveredElement` lambda ONLY for 'send()' invocations,
360
+ // for 'trySend()' it is responsibility of the caller
361
+ if (isSendOp) {
362
+ onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
363
+ throw it
364
+ }
365
+ }
366
+ return success(Unit )
367
+ }
368
+
369
+ protected fun trySendDropOldest (element : E ): ChannelResult <Unit > =
370
+ sendImpl( // <-- this is an inline function
371
+ element = element,
372
+ // Put the element into the logical buffer even
373
+ // if this channel is already full, the `onSuspend`
374
+ // callback below extract the first (oldest) element.
375
+ waiter = BUFFERED ,
376
+ // Finish successfully when a rendezvous has happened
377
+ // or the element has been buffered.
378
+ onRendezvousOrBuffered = { return success(Unit ) },
379
+ // In case the algorithm decided to suspend, the element
380
+ // was added to the buffer. However, as the buffer is now
381
+ // overflowed, the first (oldest) element has to be extracted.
382
+ onSuspend = { segm, i ->
383
+ dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(segm.id * SEGMENT_SIZE + i)
384
+ return success(Unit )
385
+ },
386
+ // If the channel is closed, return the corresponding result.
387
+ onClosed = { return closed(sendException) }
388
+ )
389
+
353
390
private inline fun sendImplOnNoWaiter (
354
391
/* The working cell is specified by
355
392
the segment and the index in it. */
0 commit comments