-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathChannel.kt
392 lines (363 loc) · 19.5 KB
/
Channel.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("FunctionName")
package kotlinx.coroutines.channels
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.selects.*
import kotlin.jvm.*
/**
* Sender's interface to [Channel].
*
* It declares the producing-side operations: the suspending [send] with its [onSend] select clause,
* the non-suspending [offer], [close] with the [invokeOnClose] hook, and the [isClosedForSend] / [isFull]
* state properties.
*/
public interface SendChannel<in E> {
/**
* Returns `true` if this channel was closed by invocation of [close], so that
* [send] and [offer] attempts throw an exception.
*
* **Note: This is an experimental api.** This property may change its semantics and/or name in the future.
*/
@ExperimentalCoroutinesApi
public val isClosedForSend: Boolean
/**
* Returns `true` if the channel is full (out of capacity) and the [send] attempt will suspend.
* This property returns `false` for a channel that [isClosedForSend].
*
* **Note: This is an experimental api.** This property may change its semantics and/or name in the future.
*/
@ExperimentalCoroutinesApi
public val isFull: Boolean
/**
* Adds [element] to this channel, suspending the caller while this channel [isFull],
* or throws an exception if the channel [isClosedForSend] (see [close] for details).
*
* Note that closing a channel _after_ this function has suspended does not cause this suspended send invocation
* to abort, because closing a channel is conceptually like sending a special "close token" over this channel.
* All elements that are sent over the channel are delivered in first-in first-out order. The element that
* is being sent will get delivered to receivers before the close token.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
* function is suspended, this function immediately resumes with [CancellationException].
*
* *Cancellation of suspended send is atomic* -- when this function
* throws [CancellationException] it means that the [element] was not sent to this channel.
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
* continue to execute even after it was cancelled from the same thread in the case when this send operation
* was already resumed and the continuation was posted for execution to the thread's queue.
*
* Note that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
* This function can be used in [select] invocation with [onSend] clause.
* Use [offer] to try sending to this channel without waiting.
*/
public suspend fun send(element: E)
/**
* Clause for [select] expression of [send] suspending function that selects when the element that is specified
* as a parameter is sent to the channel. When the clause is selected, the reference to this channel
* is passed into the corresponding block.
*
* The [select] invocation fails with an exception if the channel [isClosedForSend] (see [close] for details).
*/
public val onSend: SelectClause2<E, SendChannel<E>>
/**
* Adds [element] to this channel if it is possible to do so immediately without violating capacity restrictions
* and returns `true`. Otherwise, it returns `false` immediately
* or throws an exception if the channel [isClosedForSend] (see [close] for details).
*/
public fun offer(element: E): Boolean
/**
* Closes this channel.
* This is an idempotent operation -- repeated invocations of this function have no effect and return `false`.
* Conceptually, it sends a special "close token" over this channel.
*
* Immediately after invocation of this function
* [isClosedForSend] starts returning `true`. However, [isClosedForReceive][ReceiveChannel.isClosedForReceive]
* on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
* are received.
*
* A channel that was closed without a [cause] throws [ClosedSendChannelException] on attempts to send
* and [ClosedReceiveChannelException] on attempts to receive.
* A channel that was closed with non-null [cause] is called a _failed_ channel. Attempts to send or
* receive on a failed channel throw the specified [cause] exception.
*/
public fun close(cause: Throwable? = null): Boolean
/**
* Registers a handler which is synchronously invoked once the channel is [closed][close]
* or the receiving side of this channel is [cancelled][ReceiveChannel.cancel].
* Only one handler can be attached to the channel during the channel's lifetime.
* The handler is invoked when [isClosedForSend] starts to return `true`.
* If the channel is already closed, the handler is invoked immediately.
*
* The meaning of `cause` that is passed to the handler:
* * `null` if the channel was closed or cancelled without the corresponding argument
* * the close or cancel cause otherwise.
*
* Example of usage (exception handling is omitted):
* ```
* val events = Channel(UNLIMITED)
* callbackBasedApi.registerCallback { event ->
* events.offer(event)
* }
*
* val uiUpdater = launch(UI, parent = UILifecycle) {
* events.consume {}
* events.cancel()
* }
*
* events.invokeOnClose { callbackBasedApi.stop() }
*
* ```
*
* **Note: This is an experimental api.** This function may change its semantics, parameters or return type in the future.
*
* @throws UnsupportedOperationException if the underlying channel doesn't support [invokeOnClose].
* Implementation note: currently, [invokeOnClose] is unsupported only by Rx-like integrations
*
* @throws IllegalStateException if another handler was already registered
*/
@ExperimentalCoroutinesApi
public fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
}
/**
* Receiver's interface to [Channel].
*
* It declares the consuming-side operations: the suspending [receive] and [receiveOrNull] with their
* [onReceive] / [onReceiveOrNull] select clauses, the non-suspending [poll], [iterator] for `for` loops,
* [cancel], and the [isClosedForReceive] / [isEmpty] state properties.
*/
public interface ReceiveChannel<out E> {
/**
* Returns `true` if this channel was closed by invocation of [close][SendChannel.close] on the [SendChannel]
* side and all previously sent items were already received, so that the [receive] attempt
* throws [ClosedReceiveChannelException]. If the channel was closed because of an exception, it
* is considered closed, too, but it is called a _failed_ channel. All suspending attempts to receive
* an element from a failed channel throw the original [close][SendChannel.close] cause exception.
*
* **Note: This is an experimental api.** This property may change its semantics and/or name in the future.
*/
@ExperimentalCoroutinesApi
public val isClosedForReceive: Boolean
/**
* Returns `true` if the channel is empty (contains no elements) and the [receive] attempt will suspend.
* This property returns `false` for a channel that [isClosedForReceive].
*
* **Note: This is an experimental api.** This property may change its semantics and/or name in the future.
*/
@ExperimentalCoroutinesApi
public val isEmpty: Boolean
/**
* Retrieves and removes the element from this channel, suspending the caller while this channel [isEmpty],
* or throws [ClosedReceiveChannelException] if the channel [isClosedForReceive].
* If the channel was closed because of an exception, it is called a _failed_ channel and this function
* throws the original [close][SendChannel.close] cause exception.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
* function is suspended, this function immediately resumes with [CancellationException].
*
* *Cancellation of suspended receive is atomic* -- when this function
* throws [CancellationException] it means that the element was not retrieved from this channel.
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
* continue to execute even after it was cancelled from the same thread in the case when this receive operation
* was already resumed and the continuation was posted for execution to the thread's queue.
*
* Note that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
* This function can be used in [select] invocation with [onReceive] clause.
* Use [poll] to try receiving from this channel without waiting.
*/
public suspend fun receive(): E
/**
* Clause for [select] expression of [receive] suspending function that selects with the element that
* is received from the channel.
* The [select] invocation fails with an exception if the channel
* [isClosedForReceive] (see [close][SendChannel.close] for details).
*/
public val onReceive: SelectClause1<E>
/**
* Retrieves and removes the element from this channel, suspending the caller while this channel [isEmpty],
* or returns `null` if the channel is [closed][isClosedForReceive] without cause,
* or throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
* function is suspended, this function immediately resumes with [CancellationException].
*
* *Cancellation of suspended receive is atomic* -- when this function
* throws [CancellationException] it means that the element was not retrieved from this channel.
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
* continue to execute even after it was cancelled from the same thread in the case when this receive operation
* was already resumed and the continuation was posted for execution to the thread's queue.
*
* Note that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
* This function can be used in [select] invocation with [onReceiveOrNull] clause.
* Use [poll] to try receiving from this channel without waiting.
*/
public suspend fun receiveOrNull(): E?
/**
* Clause for [select] expression of [receiveOrNull] suspending function that selects with the element that
* is received from the channel or selects with `null` if the channel
* [isClosedForReceive] without cause. The [select] invocation fails with
* the original [close][SendChannel.close] cause exception if the channel has _failed_.
*
* **Note: This is an experimental api.** This property may be replaced with a better one in the future.
*/
@ExperimentalCoroutinesApi
public val onReceiveOrNull: SelectClause1<E?>
/**
* Retrieves and removes the element from this channel, or returns `null` if this channel [isEmpty]
* or if it is [closed][isClosedForReceive] without cause.
* It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*/
public fun poll(): E?
/**
* Returns a new iterator to receive elements from this channel using a `for` loop.
* Iteration completes normally when the channel [isClosedForReceive] without cause and
* throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*/
public operator fun iterator(): ChannelIterator<E>
/**
* Cancels reception of remaining elements from this channel. This function closes the channel
* and removes all buffered sent elements from it.
*
* Immediately after invocation of this function [isClosedForReceive] and
* [isClosedForSend][SendChannel.isClosedForSend]
* on the side of [SendChannel] start returning `true`, so all attempts to send to this channel
* afterwards will throw [ClosedSendChannelException], while attempts to receive will throw
* [ClosedReceiveChannelException].
*/
public fun cancel(): Unit
/**
* Hidden overload that, per its deprecation message, is left here for binary compatibility only.
* It is exposed on the JVM under the name `cancel` and delegates to `cancel(null)`.
*
* @suppress
*/
@Suppress("INAPPLICABLE_JVM_NAME")
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Left here for binary compatibility")
@JvmName("cancel")
public fun cancel0(): Boolean = cancel(null)
/**
* Deprecated overload of [cancel] that accepts an optional [cause].
* Per its deprecation message, use `cancel()` without a cause instead.
*
* @suppress
*/
@ObsoleteCoroutinesApi
@Deprecated(level = DeprecationLevel.WARNING, message = "Use cancel without cause", replaceWith = ReplaceWith("cancel()"))
public fun cancel(cause: Throwable? = null): Boolean
}
/**
* Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used
* from concurrent coroutines.
*
* Both [hasNext] and [next] are suspending operator functions.
*/
public interface ChannelIterator<out E> {
/**
* Returns `true` if the channel has more elements, suspending the caller while this channel
* [isEmpty][ReceiveChannel.isEmpty], or returns `false` if the channel
* [isClosedForReceive][ReceiveChannel.isClosedForReceive] without cause.
* It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*
* This function retrieves and removes the element from this channel for the subsequent invocation
* of [next].
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
* function is suspended, this function immediately resumes with [CancellationException].
*
* *Cancellation of suspended receive is atomic* -- when this function
* throws [CancellationException] it means that the element was not retrieved from this channel.
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
* continue to execute even after it was cancelled from the same thread in the case when this receive operation
* was already resumed and the continuation was posted for execution to the thread's queue.
*
* Note that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*/
public suspend operator fun hasNext(): Boolean
/**
* Retrieves and removes the element from this channel, suspending the caller while this channel
* [isEmpty][ReceiveChannel.isEmpty], or throws [ClosedReceiveChannelException] if the channel
* [isClosedForReceive][ReceiveChannel.isClosedForReceive] without cause.
* It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
* function is suspended, this function immediately resumes with [CancellationException].
*
* *Cancellation of suspended receive is atomic* -- when this function
* throws [CancellationException] it means that the element was not retrieved from this channel.
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
* continue to execute even after it was cancelled from the same thread in the case when this receive operation
* was already resumed and the continuation was posted for execution to the thread's queue.
*
* Note that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*/
public suspend operator fun next(): E
}
/**
* Channel is a non-blocking primitive for communication between a sender using [SendChannel] and a receiver
* using [ReceiveChannel].
* Conceptually, a channel is similar to [BlockingQueue][java.util.concurrent.BlockingQueue],
* but it has suspending operations instead of blocking ones and it can be closed.
*
* The `Channel(capacity)` factory function is used to create channels of different kinds depending on
* the value of the `capacity` integer:
*
* * When `capacity` is 0 -- it creates `RendezvousChannel`.
* This channel does not have any buffer at all. An element is transferred from sender
* to receiver only when [send] and [receive] invocations meet in time (rendezvous), so [send] suspends
* until another coroutine invokes [receive] and [receive] suspends until another coroutine invokes [send].
*
* * When `capacity` is [Channel.UNLIMITED] -- it creates `LinkedListChannel`.
* This is a channel with a linked-list buffer of unlimited capacity (limited only by available memory).
* Sender to this channel never suspends and [offer] always returns `true`.
*
* * When `capacity` is [Channel.CONFLATED] -- it creates `ConflatedChannel`.
* This channel buffers at most one element and conflates all subsequent `send` and `offer` invocations,
* so that the receiver always gets the most recently sent element.
* Elements sent back-to-back are _conflated_ -- only the most recently sent element is received,
* while previously sent elements **are lost**.
* Sender to this channel never suspends and [offer] always returns `true`.
*
* * When `capacity` is positive, but less than [UNLIMITED] -- it creates an array-based channel with the given capacity.
* This channel has an array buffer of a fixed `capacity`.
* Sender suspends only when the buffer is full and receiver suspends only when the buffer is empty.
*/
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
/**
* Constants for channel factory function `Channel()`.
*/
public companion object Factory {
/**
* Requests a channel with an unlimited capacity buffer in the `Channel(...)` factory function.
* The value is [Int.MAX_VALUE].
*/
public const val UNLIMITED = Int.MAX_VALUE
/**
* Requests a rendezvous channel in the `Channel(...)` factory function -- the `RendezvousChannel` gets created.
* The value is `0`.
*/
public const val RENDEZVOUS = 0
/**
* Requests a conflated channel in the `Channel(...)` factory function -- the `ConflatedChannel` gets created.
* The value is `-1`.
*/
public const val CONFLATED = -1
}
}
/**
 * Factory function that builds a [Channel] whose kind is selected by [capacity]
 * (a rendezvous channel without a buffer when the argument is omitted).
 * See the [Channel] interface documentation for the semantics of each kind.
 *
 * The three special values [RENDEZVOUS], [UNLIMITED] and [CONFLATED] are checked first, in that order;
 * any other value is handed to `ArrayChannel` as its buffer capacity.
 *
 * @throws IllegalArgumentException when [capacity] < -1
 */
public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> {
    // Special marker values map to dedicated implementations.
    if (capacity == RENDEZVOUS) return RendezvousChannel()
    if (capacity == UNLIMITED) return LinkedListChannel()
    if (capacity == CONFLATED) return ConflatedChannel()
    // Everything else is treated as a fixed buffer size.
    return ArrayChannel(capacity)
}
/**
* Indicates an attempt to [send][SendChannel.send] on an [isClosedForSend][SendChannel.isClosedForSend] channel
* that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause
* exception on send attempts.
*
* This exception is a subclass of [CancellationException].
*
* @param message the detail message that is passed to the [CancellationException] constructor; may be `null`.
*/
public class ClosedSendChannelException(message: String?) : CancellationException(message)
/**
* Indicates an attempt to [receive][ReceiveChannel.receive] on an
* [isClosedForReceive][ReceiveChannel.isClosedForReceive] channel that was closed without a cause.
* A _failed_ channel rethrows the original [close][SendChannel.close] cause exception on receive attempts.
*
* This exception is a subclass of [NoSuchElementException] to be consistent with plain collections.
*
* @param message the detail message that is passed to the [NoSuchElementException] constructor; may be `null`.
*/
public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message)