• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 @file:Suppress("FunctionName")
6 
7 package kotlinx.coroutines.channels
8 
9 import kotlinx.coroutines.*
10 import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
11 import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
12 import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
13 import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
14 import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
15 import kotlinx.coroutines.internal.*
16 import kotlinx.coroutines.selects.*
17 import kotlin.internal.*
18 import kotlin.jvm.*
19 
20 /**
21  * Sender's interface to [Channel].
22  */
23 public interface SendChannel<in E> {
24     /**
25      * Returns `true` if this channel was closed by an invocation of [close]. This means that
26      * calling [send] or [offer] will result in an exception.
27      *
28      * **Note: This is an experimental api.** This property may change its semantics and/or name in the future.
29      */
30     @ExperimentalCoroutinesApi
31     public val isClosedForSend: Boolean
32 
33     /**
34      * Returns `true` if the channel is full (out of capacity), which means that an attempt to [send] will suspend.
35      * This function returns `false`  if the channel [is closed for `send`][isClosedForSend].
36      *
37      * @suppress **Will be removed in next releases, no replacement.**
38      */
39     @ExperimentalCoroutinesApi
40     @Deprecated(level = DeprecationLevel.ERROR, message = "Will be removed in next releases without replacement")
41     public val isFull: Boolean
42 
43     /**
44      * Sends the specified [element] to this channel, suspending the caller while the buffer of this channel is full
45      * or if it does not exist, or throws an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details).
46      *
47      * [Closing][close] a channel _after_ this function has suspended does not cause this suspended [send] invocation
48      * to abort, because closing a channel is conceptually like sending a special "close token" over this channel.
49      * All elements sent over the channel are delivered in first-in first-out order. The sent element
50      * will be delivered to receivers before the close token.
51      *
52      * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
53      * function is suspended, this function immediately resumes with a [CancellationException].
54      * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
55      * suspended, it will not resume successfully. The `send` call can send the element to the channel,
56      * but then throw [CancellationException], thus an exception should not be treated as a failure to deliver the element.
57      * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
58      *
59      * Note that this function does not check for cancellation when it is not suspended.
60      * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
61      *
62      * This function can be used in [select] invocations with the [onSend] clause.
63      * Use [offer] to try sending to this channel without waiting.
64      */
sendnull65     public suspend fun send(element: E)
66 
67     /**
68      * Clause for the [select] expression of the [send] suspending function that selects when the element that is specified
69      * as the parameter is sent to the channel. When the clause is selected, the reference to this channel
70      * is passed into the corresponding block.
71      *
72      * The [select] invocation fails with an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details).
73      */
74     public val onSend: SelectClause2<E, SendChannel<E>>
75 
76     /**
77      * Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions,
78      * and returns `true`. Otherwise, just returns `false`. This is a synchronous variant of [send] which backs off
79      * in situations when `send` suspends.
80      *
81      * Throws an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details).
82      *
83      * When `offer` call returns `false` it guarantees that the element was not delivered to the consumer and it
84      * it does not call `onUndeliveredElement` that was installed for this channel. If the channel was closed,
85      * then it calls `onUndeliveredElement` before throwing an exception.
86      * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
87      */
88     public fun offer(element: E): Boolean
89 
90     /**
91      * Closes this channel.
92      * This is an idempotent operation &mdash; subsequent invocations of this function have no effect and return `false`.
93      * Conceptually, its sends a special "close token" over this channel.
94      *
95      * Immediately after invocation of this function,
96      * [isClosedForSend] starts returning `true`. However, [isClosedForReceive][ReceiveChannel.isClosedForReceive]
97      * on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
98      * are received.
99      *
100      * A channel that was closed without a [cause] throws a [ClosedSendChannelException] on attempts to [send] or [offer]
101      * and [ClosedReceiveChannelException] on attempts to [receive][ReceiveChannel.receive].
102      * A channel that was closed with non-null [cause] is called a _failed_ channel. Attempts to send or
103      * receive on a failed channel throw the specified [cause] exception.
104      */
105     public fun close(cause: Throwable? = null): Boolean
106 
107     /**
108      * Registers a [handler] which is synchronously invoked once the channel is [closed][close]
109      * or the receiving side of this channel is [cancelled][ReceiveChannel.cancel].
110      * Only one handler can be attached to a channel during its lifetime.
111      * The `handler` is invoked when [isClosedForSend] starts to return `true`.
112      * If the channel is closed already, the handler is invoked immediately.
113      *
114      * The meaning of `cause` that is passed to the handler:
115      * * `null` if the channel was closed or cancelled without the corresponding argument
116      * * the cause of `close` or `cancel` otherwise.
117      *
118      * Example of usage (exception handling is omitted):
119      * ```
120      * val events = Channel(UNLIMITED)
121      * callbackBasedApi.registerCallback { event ->
122      *   events.offer(event)
123      * }
124      *
125      * val uiUpdater = launch(Dispatchers.Main, parent = UILifecycle) {
126      *    events.consume {}
127      *    events.cancel()
128      * }
129      *
130      * events.invokeOnClose { callbackBasedApi.stop() }
131      *
132      * ```
133      *
134      * **Note: This is an experimental api.** This function may change its semantics, parameters or return type in the future.
135      *
136      * @throws UnsupportedOperationException if the underlying channel doesn't support [invokeOnClose].
137      * Implementation note: currently, [invokeOnClose] is unsupported only by Rx-like integrations
138      *
139      * @throws IllegalStateException if another handler was already registered
140      */
141     @ExperimentalCoroutinesApi
142     public fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
143 }
144 
145 /**
146  * Receiver's interface to [Channel].
147  */
148 public interface ReceiveChannel<out E> {
149     /**
150      * Returns `true` if this channel was closed by invocation of [close][SendChannel.close] on the [SendChannel]
151      * side and all previously sent items were already received. This means that calling [receive]
152      * will result in a [ClosedReceiveChannelException]. If the channel was closed because of an exception, it
153      * is considered closed, too, but is called a _failed_ channel. All suspending attempts to receive
154      * an element from a failed channel throw the original [close][SendChannel.close] cause exception.
155      *
156      * **Note: This is an experimental api.** This property may change its semantics and/or name in the future.
157      */
158     @ExperimentalCoroutinesApi
159     public val isClosedForReceive: Boolean
160 
161     /**
162      * Returns `true` if the channel is empty (contains no elements), which means that an attempt to [receive] will suspend.
163      * This function returns `false` if the channel [is closed for `receive`][isClosedForReceive].
164      */
165     @ExperimentalCoroutinesApi
166     public val isEmpty: Boolean
167 
168     /**
169      * Retrieves and removes an element from this channel if it's not empty, or suspends the caller while the channel is empty,
170      * or throws a [ClosedReceiveChannelException] if the channel [is closed for `receive`][isClosedForReceive].
171      * If the channel was closed because of an exception, it is called a _failed_ channel and this function
172      * will throw the original [close][SendChannel.close] cause exception.
173      *
174      * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
175      * function is suspended, this function immediately resumes with a [CancellationException].
176      * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
177      * suspended, it will not resume successfully. The `receive` call can retrieve the element from the channel,
178      * but then throw [CancellationException], thus failing to deliver the element.
179      * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
180      *
181      * Note that this function does not check for cancellation when it is not suspended.
182      * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
183      *
184      * This function can be used in [select] invocations with the [onReceive] clause.
185      * Use [poll] to try receiving from this channel without waiting.
186      */
187     public suspend fun receive(): E
188 
189     /**
190      * Clause for the [select] expression of the [receive] suspending function that selects with the element
191      * received from the channel.
192      * The [select] invocation fails with an exception if the channel
193      * [is closed for `receive`][isClosedForReceive] (see [close][SendChannel.close] for details).
194      */
195     public val onReceive: SelectClause1<E>
196 
197     /**
198      * Retrieves and removes an element from this channel if it's not empty, or suspends the caller while the channel is empty,
199      * or returns `null` if the channel is [closed for `receive`][isClosedForReceive] without cause,
200      * or throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
201      *
202      * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
203      * function is suspended, this function immediately resumes with a [CancellationException].
204      * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
205      * suspended, it will not resume successfully.  The `receiveOrNull` call can retrieve the element from the channel,
206      * but then throw [CancellationException], thus failing to deliver the element.
207      * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
208      *
209      * Note that this function does not check for cancellation when it is not suspended.
210      * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
211      *
212      * This function can be used in [select] invocations with the [onReceiveOrNull] clause.
213      * Use [poll] to try receiving from this channel without waiting.
214      *
215      * @suppress **Deprecated**: in favor of receiveOrClosed and receiveOrNull extension.
216      */
217     @ObsoleteCoroutinesApi
218     @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
219     @LowPriorityInOverloadResolution
220     @Deprecated(
221         message = "Deprecated in favor of receiveOrClosed and receiveOrNull extension",
222         level = DeprecationLevel.WARNING,
223         replaceWith = ReplaceWith("receiveOrNull", "kotlinx.coroutines.channels.receiveOrNull")
224     )
225     public suspend fun receiveOrNull(): E?
226 
227     /**
228      * Clause for the [select] expression of the [receiveOrNull] suspending function that selects with the element
229      * received from the channel or `null` if the channel is
230      * [closed for `receive`][isClosedForReceive] without a cause. The [select] invocation fails with
231      * the original [close][SendChannel.close] cause exception if the channel has _failed_.
232      *
233      * @suppress **Deprecated**: in favor of onReceiveOrClosed and onReceiveOrNull extension.
234      */
235     @ObsoleteCoroutinesApi
236     @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
237     @LowPriorityInOverloadResolution
238     @Deprecated(
239         message = "Deprecated in favor of onReceiveOrClosed and onReceiveOrNull extension",
240         level = DeprecationLevel.WARNING,
241         replaceWith = ReplaceWith("onReceiveOrNull", "kotlinx.coroutines.channels.onReceiveOrNull")
242     )
243     public val onReceiveOrNull: SelectClause1<E?>
244 
245     /**
246      * Retrieves and removes an element from this channel if it's not empty, or suspends the caller while this channel is empty.
247      * This method returns [ValueOrClosed] with the value of an element successfully retrieved from the channel
248      * or the close cause if the channel was closed.
249      *
250      * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
251      * function is suspended, this function immediately resumes with a [CancellationException].
252      * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
253      * suspended, it will not resume successfully. The `receiveOrClosed` call can retrieve the element from the channel,
254      * but then throw [CancellationException], thus failing to deliver the element.
255      * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
256      *
257      * Note that this function does not check for cancellation when it is not suspended.
258      * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
259      *
260      * This function can be used in [select] invocations with the [onReceiveOrClosed] clause.
261      * Use [poll] to try receiving from this channel without waiting.
262      *
263      * @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and
264      *            [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed.
265      */
266     @InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed
267     public suspend fun receiveOrClosed(): ValueOrClosed<E>
268 
269     /**
270      * Clause for the [select] expression of the [receiveOrClosed] suspending function that selects with the [ValueOrClosed] with a value
271      * that is received from the channel or with a close cause if the channel
272      * [is closed for `receive`][isClosedForReceive].
273      *
274      * @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and
275      *            [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed.
276      */
277     @InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed
278     public val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
279 
280     /**
281      * Retrieves and removes an element from this channel if its not empty, or returns `null` if the channel is empty
282      * or is [is closed for `receive`][isClosedForReceive] without a cause.
283      * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
284      */
285     public fun poll(): E?
286 
287     /**
288      * Returns a new iterator to receive elements from this channel using a `for` loop.
289      * Iteration completes normally when the channel [is closed for `receive`][isClosedForReceive] without a cause and
290      * throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
291      */
292     public operator fun iterator(): ChannelIterator<E>
293 
294     /**
295      * Cancels reception of remaining elements from this channel with an optional [cause].
296      * This function closes the channel and removes all buffered sent elements from it.
297      *
298      * A cause can be used to specify an error message or to provide other details on
299      * the cancellation reason for debugging purposes.
300      * If the cause is not specified, then an instance of [CancellationException] with a
301      * default message is created to [close][SendChannel.close] the channel.
302      *
303      * Immediately after invocation of this function [isClosedForReceive] and
304      * [isClosedForSend][SendChannel.isClosedForSend]
305      * on the side of [SendChannel] start returning `true`. Any attempt to send to or receive from this channel
306      * will lead to a [CancellationException].
307      */
308     public fun cancel(cause: CancellationException? = null)
309 
310     /**
311      * @suppress This method implements old version of JVM ABI. Use [cancel].
312      */
313     @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
314     public fun cancel(): Unit = cancel(null)
315 
316     /**
317      * @suppress This method has bad semantics when cause is not a [CancellationException]. Use [cancel].
318      */
319     @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
320     public fun cancel(cause: Throwable? = null): Boolean
321 }
322 
323 /**
324  * A discriminated union of [ReceiveChannel.receiveOrClosed] result
325  * that encapsulates either an element of type [T] successfully received from the channel or a close cause.
326  *
327  * :todo: Do not make it public before resolving todos in the code of this class.
328  *
329  * @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and
330  *            [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed.
331  */
332 @Suppress("NON_PUBLIC_PRIMARY_CONSTRUCTOR_OF_INLINE_CLASS", "EXPERIMENTAL_FEATURE_WARNING")
333 @InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed
334 public inline class ValueOrClosed<out T>
335 internal constructor(private val holder: Any?) {
336     /**
337      * Returns `true` if this instance represents a received element.
338      * In this case [isClosed] returns `false`.
339      * todo: it is commented for now, because it is not used
340      */
341     //public val isValue: Boolean get() = holder !is Closed
342 
343     /**
344      * Returns `true` if this instance represents a close cause.
345      * In this case [isValue] returns `false`.
346      */
347     public val isClosed: Boolean get() = holder is Closed
348 
349     /**
350      * Returns the received value if this instance represents a received value, or throws an [IllegalStateException] otherwise.
351      *
352      * :todo: Decide, if it is needed, how it shall be named with relation to [valueOrThrow]:
353      *
354      * So we have the following methods on `ValueOrClosed`: `value`, `valueOrNull`, `valueOrThrow`.
355      * On the other hand, the channel has the following `receive` variants:
356      *  * `receive` which corresponds to `receiveOrClosed().valueOrThrow`... huh?
357      *  * `receiveOrNull` which corresponds to `receiveOrClosed().valueOrNull`
358      *  * `receiveOrClosed`
359      * For the sake of simplicity consider dropping this version of `value` and rename [valueOrThrow] to simply `value`.
360      */
361     @Suppress("UNCHECKED_CAST")
362     public val value: T
363         get() = if (holder is Closed) error(DEFAULT_CLOSE_MESSAGE) else holder as T
364 
365     /**
366      * Returns the received value if this element represents a received value, or `null` otherwise.
367      * :todo: Decide if it shall be made into extension that is available only for non-null T.
368      * Note: it might become inconsistent with kotlin.Result
369      */
370     @Suppress("UNCHECKED_CAST")
371     public val valueOrNull: T?
372         get() = if (holder is Closed) null else holder as T
373 
374     /**
375      * :todo: Decide, if it is needed, how it shall be named with relation to [value].
376      * Note that `valueOrThrow` rethrows the cause adding no meaningful information about the call site,
377      * so if one is sure that `ValueOrClosed` always holds a value, this very property should be used.
378      * Otherwise, it could be very hard to locate the source of the exception.
379      * todo: it is commented for now, because it is not used
380      */
381     //@Suppress("UNCHECKED_CAST")
382     //public val valueOrThrow: T
383     //    get() = if (holder is Closed) throw holder.exception else holder as T
384 
385     /**
386      * Returns the close cause of the channel if this instance represents a close cause, or throws
387      * an [IllegalStateException] otherwise.
388      */
389     @Suppress("UNCHECKED_CAST")
390     public val closeCause: Throwable? get() =
391         if (holder is Closed) holder.cause else error("Channel was not closed")
392 
393     /**
394      * @suppress
395      */
toStringnull396     public override fun toString(): String =
397         when (holder) {
398             is Closed -> holder.toString()
399             else -> "Value($holder)"
400     }
401 
402     internal class Closed(@JvmField val cause: Throwable?) {
403         // todo: it is commented for now, because it is not used
404         //val exception: Throwable get() = cause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
equalsnull405         override fun equals(other: Any?): Boolean = other is Closed && cause == other.cause
406         override fun hashCode(): Int = cause.hashCode()
407         override fun toString(): String = "Closed($cause)"
408     }
409 
410     /**
411      * todo: consider making value/closed constructors public in the future.
412      */
413     internal companion object {
414         @Suppress("NOTHING_TO_INLINE")
415         internal inline fun <E> value(value: E): ValueOrClosed<E> =
416             ValueOrClosed(value)
417 
418         @Suppress("NOTHING_TO_INLINE")
419         internal inline fun <E> closed(cause: Throwable?): ValueOrClosed<E> =
420             ValueOrClosed(Closed(cause))
421     }
422 }
423 
424 /**
425  * Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used
426  * from concurrent coroutines.
427  */
428 public interface ChannelIterator<out E> {
429     /**
430      * Returns `true` if the channel has more elements, suspending the caller while this channel is empty,
431      * or returns `false` if the channel [is closed for `receive`][ReceiveChannel.isClosedForReceive] without a cause.
432      * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
433      *
434      * This function retrieves and removes an element from this channel for the subsequent invocation
435      * of [next].
436      *
437      * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
438      * function is suspended, this function immediately resumes with a [CancellationException].
439      * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
440      * suspended, it will not resume successfully. The `hasNext` call can retrieve the element from the channel,
441      * but then throw [CancellationException], thus failing to deliver the element.
442      * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
443      *
444      * Note that this function does not check for cancellation when it is not suspended.
445      * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
446      */
hasNextnull447     public suspend operator fun hasNext(): Boolean
448 
449     @Deprecated(message = "Since 1.3.0, binary compatibility with versions <= 1.2.x", level = DeprecationLevel.HIDDEN)
450     @Suppress("INAPPLICABLE_JVM_NAME")
451     @JvmName("next")
452     public suspend fun next0(): E {
453         /*
454          * Before 1.3.0 the "next()" could have been used without invoking "hasNext" first and there were code samples
455          * demonstrating this behavior, so we preserve this logic for full binary backwards compatibility with previously
456          * compiled code.
457          */
458         if (!hasNext()) throw ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
459         return next()
460     }
461 
462     /**
463      * Retrieves the element removed from the channel by a preceding call to [hasNext], or
464      * throws an [IllegalStateException] if [hasNext] was not invoked.
465      * This method should only be used in pair with [hasNext]:
466      * ```
467      * while (iterator.hasNext()) {
468      *     val element = iterator.next()
469      *     // ... handle element ...
470      * }
471      * ```
472      *
473      * This method throws a [ClosedReceiveChannelException] if the channel [is closed for `receive`][ReceiveChannel.isClosedForReceive] without a cause.
474      * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
475      */
nextnull476     public operator fun next(): E
477 }
478 
479 /**
480  * Channel is a non-blocking primitive for communication between a sender (via [SendChannel]) and a receiver (via [ReceiveChannel]).
481  * Conceptually, a channel is similar to Java's [BlockingQueue][java.util.concurrent.BlockingQueue],
482  * but it has suspending operations instead of blocking ones and can be [closed][SendChannel.close].
483  *
484  * ### Creating channels
485  *
486  * The `Channel(capacity)` factory function is used to create channels of different kinds depending on
487  * the value of the `capacity` integer:
488  *
489  * * When `capacity` is 0 &mdash; it creates a _rendezvous_ channel.
490  *   This channel does not have any buffer at all. An element is transferred from the sender
491  *   to the receiver only when [send] and [receive] invocations meet in time (rendezvous), so [send] suspends
492  *   until another coroutine invokes [receive], and [receive] suspends until another coroutine invokes [send].
493  *
494  * * When `capacity` is [Channel.UNLIMITED] &mdash; it creates a channel with effectively unlimited buffer.
495  *   This channel has a linked-list buffer of unlimited capacity (limited only by available memory).
496  *   [Sending][send] to this channel never suspends, and [offer] always returns `true`.
497  *
498  * * When `capacity` is [Channel.CONFLATED] &mdash; it creates a _conflated_ channel
499  *   This channel buffers at most one element and conflates all subsequent `send` and `offer` invocations,
500  *   so that the receiver always gets the last element sent.
501  *   Back-to-send sent elements are conflated &mdash; only the last sent element is received,
502  *   while previously sent elements **are lost**.
503  *   [Sending][send] to this channel never suspends, and [offer] always returns `true`.
504  *
505  * * When `capacity` is positive but less than [UNLIMITED] &mdash; it creates an array-based channel with the specified capacity.
506  *   This channel has an array buffer of a fixed `capacity`.
507  *   [Sending][send] suspends only when the buffer is full, and [receiving][receive] suspends only when the buffer is empty.
508  *
509  * Buffered channels can be configured with an additional [`onBufferOverflow`][BufferOverflow] parameter. It controls the behaviour
510  * of the channel's [send][Channel.send] function on buffer overflow:
511  *
512  * * [SUSPEND][BufferOverflow.SUSPEND] &mdash; the default, suspend `send` on buffer overflow until there is
513  *   free space in the buffer.
514  * * [DROP_OLDEST][BufferOverflow.DROP_OLDEST] &mdash; do not suspend the `send`, add the latest value to the buffer,
515  *   drop the oldest one from the buffer.
516  *   A channel with `capacity = 1` and `onBufferOverflow = DROP_OLDEST` is a _conflated_ channel.
517  * * [DROP_LATEST][BufferOverflow.DROP_LATEST] &mdash; do not suspend the `send`, drop the value that is being sent,
518  *   keep the buffer contents intact.
519  *
520  * A non-default `onBufferOverflow` implicitly creates a channel with at least one buffered element and
521  * is ignored for a channel with unlimited buffer. It cannot be specified for `capacity = CONFLATED`, which
522  * is a shortcut by itself.
523  *
524  * ### Prompt cancellation guarantee
525  *
526  * All suspending functions with channels provide **prompt cancellation guarantee**.
527  * If the job was cancelled while send or receive function was suspended, it will not resume successfully,
528  * but throws a [CancellationException].
529  * With a single-threaded [dispatcher][CoroutineDispatcher] like [Dispatchers.Main] this gives a
530  * guarantee that if a piece code running in this thread cancels a [Job], then a coroutine running this job cannot
531  * resume successfully and continue to run, ensuring a prompt response to its cancellation.
532  *
533  * > **Prompt cancellation guarantee** for channel operations was added since `kotlinx.coroutines` version `1.4.0`
534  * > and had replaced a channel-specific atomic-cancellation that was not consistent with other suspending functions.
535  * > The low-level mechanics of prompt cancellation are explained in [suspendCancellableCoroutine] function.
536  *
537  * ### Undelivered elements
538  *
539  * As a result of a prompt cancellation guarantee, when a closeable resource
540  * (like open file or a handle to another native resource) is transferred via channel from one coroutine to another
541  * it can fail to be delivered and will be lost if either send or receive operations are cancelled in transit.
542  *
543  * A `Channel()` constructor function has an `onUndeliveredElement` optional parameter.
544  * When `onUndeliveredElement` parameter is set, the corresponding function is called once for each element
545  * that was sent to the channel with the call to the [send][SendChannel.send] function but failed to be delivered,
546  * which can happen in the following cases:
547  *
548  * * When [send][SendChannel.send] operation throws an exception because it was cancelled before it had a chance to actually
549  *   send the element or because the channel was [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel].
550  * * When [offer][SendChannel.offer] operation throws an exception when
551  *   the channel was [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel].
552  * * When [receive][ReceiveChannel.receive], [receiveOrNull][ReceiveChannel.receiveOrNull], or [hasNext][ChannelIterator.hasNext]
553  *   operation throws an exception when it had retrieved the element from the
554  *   channel but was cancelled before the code following the receive call resumed.
555  * * The channel was [cancelled][ReceiveChannel.cancel], in which case `onUndeliveredElement` is called on every
556  *   remaining element in the channel's buffer.
557  *
558  * Note, that `onUndeliveredElement` function is called synchronously in an arbitrary context. It should be fast, non-blocking,
559  * and should not throw exceptions. Any exception thrown by `onUndeliveredElement` is wrapped into an internal runtime
560  * exception which is either rethrown from the caller method or handed off to the exception handler in the current context
561  * (see [CoroutineExceptionHandler]) when one is available.
562  *
563  * A typical usage for `onDeliveredElement` is to close a resource that is being transferred via the channel. The
564  * following code pattern guarantees that opened resources are closed even if producer, consumer, and/or channel
565  * are cancelled. Resources are never lost.
566  *
567  * ```
568  * // Create the channel with onUndeliveredElement block that closes a resource
569  * val channel = Channel<Resource>(capacity) { resource -> resource.close() }
570  *
571  * // Producer code
572  * val resourceToSend = openResource()
573  * channel.send(resourceToSend)
574  *
575  * // Consumer code
576  * val resourceReceived = channel.receive()
577  * try {
578  *     // work with received resource
579  * } finally {
580  *     resourceReceived.close()
581  * }
582  * ```
583  *
584  * > Note, that if you do any kind of work in between `openResource()` and `channel.send(...)`, then you should
585  * > ensure that resource gets closed in case this additional code fails.
586  */
587 public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
588     /**
589      * Constants for the channel factory function `Channel()`.
590      */
591     public companion object Factory {
592         /**
593          * Requests a channel with an unlimited capacity buffer in the `Channel(...)` factory function.
594          */
595         public const val UNLIMITED: Int = Int.MAX_VALUE
596 
597         /**
598          * Requests a rendezvous channel in the `Channel(...)` factory function &mdash; a channel that does not have a buffer.
599          */
600         public const val RENDEZVOUS: Int = 0
601 
602         /**
603          * Requests a conflated channel in the `Channel(...)` factory function. This is a shortcut to creating
604          * a channel with [`onBufferOverflow = DROP_OLDEST`][BufferOverflow.DROP_OLDEST].
605          */
606         public const val CONFLATED: Int = -1
607 
608         /**
609          * Requests a buffered channel with the default buffer capacity in the `Channel(...)` factory function.
610          * The default capacity for a channel that [suspends][BufferOverflow.SUSPEND] on overflow
611          * is 64 and can be overridden by setting [DEFAULT_BUFFER_PROPERTY_NAME] on JVM.
612          * For non-suspending channels, a buffer of capacity 1 is used.
613          */
614         public const val BUFFERED: Int = -2
615 
616         // only for internal use, cannot be used with Channel(...)
617         internal const val OPTIONAL_CHANNEL = -3
618 
619         /**
620          * Name of the property that defines the default channel capacity when
621          * [BUFFERED] is used as parameter in `Channel(...)` factory function.
622          */
623         public const val DEFAULT_BUFFER_PROPERTY_NAME: String = "kotlinx.coroutines.channels.defaultBuffer"
624 
625         internal val CHANNEL_DEFAULT_CAPACITY = systemProp(DEFAULT_BUFFER_PROPERTY_NAME,
626             64, 1, UNLIMITED - 1
627         )
628     }
629 }
630 
631 /**
632  * Creates a channel with the specified buffer capacity (or without a buffer by default).
633  * See [Channel] interface documentation for details.
634  *
635  * @param capacity either a positive channel capacity or one of the constants defined in [Channel.Factory].
636  * @param onBufferOverflow configures an action on buffer overflow (optional, defaults to
637  *   a [suspending][BufferOverflow.SUSPEND] attempt to [send][Channel.send] a value,
638  *   supported only when `capacity >= 0` or `capacity == Channel.BUFFERED`,
639  *   implicitly creates a channel with at least one buffered element).
640  * @param onUndeliveredElement an optional function that is called when element was sent but was not delivered to the consumer.
641  *   See "Undelivered elements" section in [Channel] documentation.
642  * @throws IllegalArgumentException when [capacity] < -2
643  */
Channelnull644 public fun <E> Channel(
645     capacity: Int = RENDEZVOUS,
646     onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
647     onUndeliveredElement: ((E) -> Unit)? = null
648 ): Channel<E> =
649     when (capacity) {
650         RENDEZVOUS -> {
651             if (onBufferOverflow == BufferOverflow.SUSPEND)
652                 RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channel
653             else
654                 ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
655         }
656         CONFLATED -> {
657             require(onBufferOverflow == BufferOverflow.SUSPEND) {
658                 "CONFLATED capacity cannot be used with non-default onBufferOverflow"
659             }
660             ConflatedChannel(onUndeliveredElement)
661         }
662         UNLIMITED -> LinkedListChannel(onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows
663         BUFFERED -> ArrayChannel( // uses default capacity with SUSPEND
664             if (onBufferOverflow == BufferOverflow.SUSPEND) CHANNEL_DEFAULT_CAPACITY else 1,
665             onBufferOverflow, onUndeliveredElement
666         )
667         else -> {
668             if (capacity == 1 && onBufferOverflow == BufferOverflow.DROP_OLDEST)
669                 ConflatedChannel(onUndeliveredElement) // conflated implementation is more efficient but appears to work in the same way
670             else
671                 ArrayChannel(capacity, onBufferOverflow, onUndeliveredElement)
672         }
673     }
674 
675 @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.0, binary compatibility with earlier versions")
Channelnull676 public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> = Channel(capacity)
677 
678 /**
679  * Indicates an attempt to [send][SendChannel.send] to a [isClosedForSend][SendChannel.isClosedForSend] channel
680  * that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause
681  * exception on send attempts.
682  *
683  * This exception is a subclass of [IllegalStateException], because, conceptually, it is the sender's responsibility
684  * to close the channel and not try to send anything thereafter. Attempts to
685  * send to a closed channel indicate a logical error in the sender's code.
686  */
687 public class ClosedSendChannelException(message: String?) : IllegalStateException(message)
688 
689 /**
690  * Indicates an attempt to [receive][ReceiveChannel.receive] from a [isClosedForReceive][ReceiveChannel.isClosedForReceive]
691  * channel that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause
692  * exception on receive attempts.
693  *
694  * This exception is a subclass of [NoSuchElementException] to be consistent with plain collections.
695  */
696 public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message)
697