• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 @file:Suppress("FunctionName")
2 
3 package kotlinx.coroutines.channels
4 
5 import kotlinx.coroutines.*
6 import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
7 import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
8 import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
9 import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
10 import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
11 import kotlinx.coroutines.internal.*
12 import kotlinx.coroutines.selects.*
13 import kotlin.contracts.*
14 import kotlin.internal.*
15 import kotlin.jvm.*
16 
17 /**
18  * Sender's interface to a [Channel].
19  *
20  * Combined, [SendChannel] and [ReceiveChannel] define the complete [Channel] interface.
21  *
22  * It is not expected that this interface will be implemented directly.
23  * Instead, the existing [Channel] implementations can be used or delegated to.
24  */
25 public interface SendChannel<in E> {
26     /**
27      * Returns `true` if this channel was closed by an invocation of [close] or its receiving side was [cancelled][ReceiveChannel.cancel].
28      * This means that calling [send] will result in an exception.
29      *
30      * Note that if this property returns `false`, it does not guarantee that a subsequent call to [send] will succeed,
31      * as the channel can be concurrently closed right after the check.
32      * For such scenarios, [trySend] is the more robust solution: it attempts to send the element and returns
33      * a result that says whether the channel was closed, and if not, whether sending a value was successful.
34      *
35      * ```
36      * // DANGER! THIS CHECK IS NOT RELIABLE!
37      * if (!channel.isClosedForSend) {
38      *     channel.send(element) // can still fail!
39      * } else {
40      *     println("Can not send: the channel is closed")
41      * }
42      * // DO THIS INSTEAD:
43      * channel.trySend(element).onClosed {
44      *     println("Can not send: the channel is closed")
45      * }
46      * ```
47      *
48      * The primary intended usage of this property is skipping some portions of code that should not be executed if the
49      * channel is already known to be closed.
50      * For example:
51      *
52      * ```
53      * if (channel.isClosedForSend) {
54      *    // fast path
55      *    return
56      * } else {
57      *    // slow path: actually computing the value
58      *    val nextElement = run {
59      *        // some heavy computation
60      *    }
61      *    channel.send(nextElement) // can fail anyway,
62      *    // but at least we tried to avoid the computation
63      * }
64      * ```
65      *
66      * However, in many cases, even that can be achieved more idiomatically by cancelling the coroutine producing the
67      * elements to send.
68      * See [produce] for a way to launch a coroutine that produces elements and cancels itself when the channel is
69      * closed.
70      *
71      * [isClosedForSend] can also be used for assertions and diagnostics to verify the expected state of the channel.
72      *
73      * @see SendChannel.trySend
74      * @see SendChannel.close
75      * @see ReceiveChannel.cancel
76      */
77     @DelicateCoroutinesApi
78     public val isClosedForSend: Boolean
79 
80     /**
81      * Sends the specified [element] to this channel.
82      *
83      * This function suspends if it does not manage to pass the element to the channel's buffer
84      * (or directly the receiving side if there's no buffer),
85      * and it can be cancelled with or without having successfully passed the element.
86      * See the "Suspending and cancellation" section below for details.
87      * If the channel is [closed][close], an exception is thrown (see below).
88      *
89      * ```
90      * val channel = Channel<Int>()
91      * launch {
92      *     check(channel.receive() == 5)
93      * }
94      * channel.send(5) // suspends until 5 is received
95      * ```
96      *
97      * ## Suspending and cancellation
98      *
99      * If the [BufferOverflow] strategy of this channel is [BufferOverflow.SUSPEND],
100      * this function may suspend.
101      * The exact scenarios differ depending on the channel's capacity:
102      * - If the channel is [rendezvous][RENDEZVOUS],
103      *   the sender will be suspended until the receiver calls [ReceiveChannel.receive].
104      * - If the channel is [unlimited][UNLIMITED] or [conflated][CONFLATED],
105      *   the sender will never be suspended even with the [BufferOverflow.SUSPEND] strategy.
106      * - If the channel is buffered (either [BUFFERED] or uses a non-default buffer capacity),
107      *   the sender will be suspended until the buffer has free space.
108      *
109      * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this
110      * suspending function is waiting, this function immediately resumes with [CancellationException].
111      * There is a **prompt cancellation guarantee**: even if [send] managed to send the element, but was cancelled
112      * while suspended, [CancellationException] will be thrown. See [suspendCancellableCoroutine] for low-level details.
113      *
114      * Because of the prompt cancellation guarantee, an exception does not always mean a failure to deliver the element.
115      * See the "Undelivered elements" section in the [Channel] documentation
116      * for details on handling undelivered elements.
117      *
118      * Note that this function does not check for cancellation when it is not suspended.
119      * Use [ensureActive] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed:
120      *
121      * ```
122      * // because of UNLIMITED, sending to this channel never suspends
123      * val channel = Channel<Int>(Channel.UNLIMITED)
124      * val job = launch {
125      *     while (isActive) {
126      *         channel.send(42)
127      *     }
128      *     // the loop exits when the job is cancelled
129      * }
130      * ```
131      *
132      * This isn't needed if other cancellable functions are called inside the loop, like [delay].
133      *
134      * ## Sending to a closed channel
135      *
136      * If a channel was [closed][close] before [send] was called and no cause was specified,
137      * an [ClosedSendChannelException] will be thrown from [send].
138      * If a channel was [closed][close] with a cause before [send] was called,
139      * then [send] will rethrow the same (in the `===` sense) exception that was passed to [close].
140      *
141      * In both cases, it is guaranteed that the element was not delivered to the consumer,
142      * and the `onUndeliveredElement` callback will be called.
143      * See the "Undelivered elements" section in the [Channel] documentation
144      * for details on handling undelivered elements.
145      *
146      * [Closing][close] a channel _after_ this function suspends does not cause this suspended [send] invocation
147      * to abort: although subsequent invocations of [send] fail, the existing ones will continue to completion,
148      * unless the sending coroutine is cancelled.
149      *
150      * ## Related
151      *
152      * This function can be used in [select] invocations with the [onSend] clause.
153      * Use [trySend] to try sending to this channel without waiting and throwing.
154      */
sendnull155     public suspend fun send(element: E)
156 
157     /**
158      * Clause for the [select] expression of the [send] suspending function that selects when the element that is
159      * specified as the parameter is sent to the channel.
160      * When the clause is selected, the reference to this channel is passed into the corresponding block.
161      *
162      * The [select] invocation fails with an exception if the channel [is closed for `send`][isClosedForSend] before
163      * the [select] suspends (see the "Sending to a closed channel" section of [send]).
164      *
165      * Example:
166      * ```
167      * val sendChannels = List(4) { index ->
168      *     Channel<Int>(onUndeliveredElement = {
169      *         println("Undelivered element $it for $index")
170      *     }).also { channel ->
171      *         // launch a consumer for this channel
172      *         launch {
173      *             withTimeout(1.seconds) {
174      *                 println("Consumer $index receives: ${channel.receive()}")
175      *             }
176      *         }
177      *     }
178      * }
179      * val element = 42
180      * select {
181      *     for (channel in sendChannels) {
182      *         channel.onSend(element) {
183      *             println("Sent to channel $it")
184      *         }
185      *     }
186      * }
187      * ```
188      * Here, we start a [select] expression that waits for exactly one of the four [onSend] invocations
189      * to successfully send the element to the receiver,
190      * and the other three will instead invoke the `onUndeliveredElement` callback.
191      * See the "Undelivered elements" section in the [Channel] documentation
192      * for details on handling undelivered elements.
193      *
194      * Like [send], [onSend] obeys the rules of prompt cancellation:
195      * [select] may finish with a [CancellationException] even if the element was successfully sent.
196      */
197     public val onSend: SelectClause2<E, SendChannel<E>>
198 
199     /**
200      * Attempts to add the specified [element] to this channel without waiting.
201      *
202      * [trySend] never suspends and never throws exceptions.
203      * Instead, it returns a [ChannelResult] that encapsulates the result of the operation.
204      * This makes it different from [send], which can suspend and throw exceptions.
205      *
206      * If this channel is currently full and cannot receive new elements at the time or is [closed][close],
207      * this function returns a result that indicates [a failure][ChannelResult.isFailure].
208      * In this case, it is guaranteed that the element was not delivered to the consumer and the
209      * `onUndeliveredElement` callback, if one is provided during the [Channel]'s construction, does *not* get called.
210      *
211      * [trySend] can be used as a non-`suspend` alternative to [send] in cases where it's known beforehand
212      * that the channel's buffer can not overflow.
213      * ```
214      * class Coordinates(val x: Int, val y: Int)
215      * // A channel for a single subscriber that stores the latest mouse position update.
216      * // If more than one subscriber is expected, consider using a `StateFlow` instead.
217      * val mousePositionUpdates = Channel<Coordinates>(Channel.CONFLATED)
218      * // Notifies the subscriber about the new mouse position.
219      * // If the subscriber is slow, the intermediate updates are dropped.
220      * fun moveMouse(coordinates: Coordinates) {
221      *     val result = mousePositionUpdates.trySend(coordinates)
222      *     if (result.isClosed) {
223      *         error("Mouse position is no longer being processed")
224      *     }
225      * }
226      * ```
227      */
228     public fun trySend(element: E): ChannelResult<Unit>
229 
230     /**
231      * Closes this channel so that subsequent attempts to [send] to it fail.
232      *
233      * Returns `true` if the channel was not closed previously and the call to this function closed it.
234      * If the channel was already closed, this function does nothing and returns `false`.
235      *
236      * The existing elements in the channel remain there, and likewise,
237      * the calls to [send] an [onSend] that have suspended before [close] was called will not be affected.
238      * Only the subsequent calls to [send], [trySend], or [onSend] will fail.
239      * [isClosedForSend] will start returning `true` immediately after this function is called.
240      *
241      * Once all the existing elements are received, the channel will be considered closed for `receive` as well.
242      * This means that [receive][ReceiveChannel.receive] will also start throwing exceptions.
243      * At that point, [isClosedForReceive][ReceiveChannel.isClosedForReceive] will start returning `true`.
244      *
245      * If the [cause] is non-null, it will be thrown from all the subsequent attempts to [send] to this channel,
246      * as well as from all the attempts to [receive][ReceiveChannel.receive] from the channel after no elements remain.
247      *
248      * If the [cause] is null, the channel is considered to have completed normally.
249      * All subsequent calls to [send] will throw a [ClosedSendChannelException],
250      * whereas calling [receive][ReceiveChannel.receive] will throw a [ClosedReceiveChannelException]
251      * after there are no more elements.
252      *
253      * ```
254      * val channel = Channel<Int>()
255      * channel.send(1)
256      * channel.close()
257      * try {
258      *     channel.send(2)
259      *     error("The channel is closed, so this line is never reached")
260      * } catch (e: ClosedSendChannelException) {
261      *     // expected
262      * }
263      * ```
264      */
265     public fun close(cause: Throwable? = null): Boolean
266 
267     /**
268      * Registers a [handler] that is synchronously invoked once the channel is [closed][close]
269      * or the receiving side of this channel is [cancelled][ReceiveChannel.cancel].
270      * Only one handler can be attached to a channel during its lifetime.
271      * The `handler` is invoked when [isClosedForSend] starts to return `true`.
272      * If the channel is closed already, the handler is invoked immediately.
273      *
274      * The meaning of `cause` that is passed to the handler:
275      * - `null` if the channel was [closed][close] normally with `cause = null`.
276      * - Instance of [CancellationException] if the channel was [cancelled][ReceiveChannel.cancel] normally
277      *   without the corresponding argument.
278      * - The cause of `close` or `cancel` otherwise.
279      *
280      * ### Execution context and exception safety
281      *
282      * The [handler] is executed as part of the closing or cancelling operation,
283      * and only after the channel reaches its final state.
284      * This means that if the handler throws an exception or hangs,
285      * the channel will still be successfully closed or cancelled.
286      * Unhandled exceptions from [handler] are propagated to the closing or cancelling operation's caller.
287      *
288      * Example of usage:
289      * ```
290      * val events = Channel<Event>(Channel.UNLIMITED)
291      * callbackBasedApi.registerCallback { event ->
292      *     events.trySend(event)
293      *         .onClosed { /* channel is already closed, but the callback hasn't stopped yet */ }
294      * }
295      *
296      * val uiUpdater = uiScope.launch(Dispatchers.Main) {
297      *     events.consume { /* handle events */ }
298      * }
299      * // Stop the callback after the channel is closed or cancelled
300      * events.invokeOnClose { callbackBasedApi.stop() }
301      * ```
302      *
303      * **Stability note.** This function constitutes a stable API surface, with the only exception being
304      * that an [IllegalStateException] is thrown when multiple handlers are registered.
305      * This restriction could be lifted in the future.
306      *
307      * @throws UnsupportedOperationException if the underlying channel does not support [invokeOnClose].
308      * Implementation note: currently, [invokeOnClose] is unsupported only by Rx-like integrations.
309      *
310      * @throws IllegalStateException if another handler was already registered
311      */
312     public fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
313 
314     /**
315      * **Deprecated** offer method.
316      *
317      * This method was deprecated in the favour of [trySend].
318      * It has proven itself as the most error-prone method in Channel API:
319      *
320      * - `Boolean` return type creates the false sense of security, implying that `false`
321      *   is returned instead of throwing an exception.
322      * - It was used mostly from non-suspending APIs where CancellationException triggered
323      *   internal failures in the application (the most common source of bugs).
324      * - Due to signature and explicit `if (ch.offer(...))` checks it was easy to
325      *   oversee such error during code review.
326      * - Its name was not aligned with the rest of the API and tried to mimic Java's queue instead.
327      *
328      * **NB** Automatic migration provides best-effort for the user experience, but requires removal
329      * or adjusting of the code that relied on the exception handling.
330      * The complete replacement has a more verbose form:
331      * ```
332      * channel.trySend(element)
333      *     .onClosed { throw it ?: ClosedSendChannelException("Channel was closed normally") }
334      *     .isSuccess
335      * ```
336      *
337      * See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context.
338      *
339      * @suppress **Deprecated**.
340      */
341     @Deprecated(
342         level = DeprecationLevel.ERROR,
343         message = "Deprecated in the favour of 'trySend' method",
344         replaceWith = ReplaceWith("trySend(element).isSuccess")
345     ) // Warning since 1.5.0, error since 1.6.0, not hidden until 1.8+ because API is quite widespread
346     public fun offer(element: E): Boolean {
347         val result = trySend(element)
348         if (result.isSuccess) return true
349         throw recoverStackTrace(result.exceptionOrNull() ?: return false)
350     }
351 }
352 
353 /**
354  * Receiver's interface to a [Channel].
355  *
356  * Combined, [SendChannel] and [ReceiveChannel] define the complete [Channel] interface.
357  */
358 public interface ReceiveChannel<out E> {
359     /**
360      * Returns `true` if the sending side of this channel was [closed][SendChannel.close]
361      * and all previously sent items were already received (which also happens for [cancelled][cancel] channels).
362      *
363      * Note that if this property returns `false`,
364      * it does not guarantee that a subsequent call to [receive] will succeed,
365      * as the channel can be concurrently cancelled or closed right after the check.
366      * For such scenarios, [receiveCatching] is the more robust solution:
367      * if the channel is closed, instead of throwing an exception, [receiveCatching] returns a result that allows
368      * querying it.
369      *
370      * ```
371      * // DANGER! THIS CHECK IS NOT RELIABLE!
372      * if (!channel.isClosedForReceive) {
373      *     channel.receive() // can still fail!
374      * } else {
375      *     println("Can not receive: the channel is closed")
376      *     null
377      * }
378      * // DO THIS INSTEAD:
379      * channel.receiveCatching().onClosed {
380      *     println("Can not receive: the channel is closed")
381      * }.getOrNull()
382      * ```
383      *
384      * The primary intended usage of this property is for assertions and diagnostics to verify the expected state of
385      * the channel.
386      * Using it in production code is discouraged.
387      *
388      * @see ReceiveChannel.receiveCatching
389      * @see ReceiveChannel.cancel
390      * @see SendChannel.close
391      */
392     @DelicateCoroutinesApi
393     public val isClosedForReceive: Boolean
394 
395     /**
396      * Returns `true` if the channel contains no elements and isn't [closed for `receive`][isClosedForReceive].
397      *
398      * If [isEmpty] returns `true`, it means that calling [receive] at exactly the same moment would suspend.
399      * However, calling [receive] immediately after checking [isEmpty] may or may not suspend, as new elements
400      * could have been added or removed or the channel could have been closed for `receive` between the two invocations.
401      * Consider using [tryReceive] in cases when suspensions are undesirable:
402      *
403      * ```
404      * // DANGER! THIS CHECK IS NOT RELIABLE!
405      * while (!channel.isEmpty) {
406      *     // can still suspend if other `receive` happens in parallel!
407      *     val element = channel.receive()
408      *     println(element)
409      * }
410      * // DO THIS INSTEAD:
411      * while (true) {
412      *     val element = channel.tryReceive().getOrNull() ?: break
413      *     println(element)
414      * }
415      * ```
416      */
417     @ExperimentalCoroutinesApi
418     public val isEmpty: Boolean
419 
420     /**
421      * Retrieves an element, removing it from the channel.
422      *
423      * This function suspends if the channel is empty, waiting until an element is available.
424      * If the channel is [closed for `receive`][isClosedForReceive], an exception is thrown (see below).
425      * ```
426      * val channel = Channel<Int>()
427      * launch {
428      *     val element = channel.receive() // suspends until 5 is available
429      *     check(element == 5)
430      * }
431      * channel.send(5)
432      * ```
433      *
434      * ## Suspending and cancellation
435      *
436      * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this
437      * suspending function is waiting, this function immediately resumes with [CancellationException].
438      * There is a **prompt cancellation guarantee**: even if [receive] managed to retrieve the element from the channel,
439      * but was cancelled while suspended, [CancellationException] will be thrown, and, if
440      * the channel has an `onUndeliveredElement` callback installed, the retrieved element will be passed to it.
441      * See the "Undelivered elements" section in the [Channel] documentation
442      * for details on handling undelivered elements.
443      * See [suspendCancellableCoroutine] for the low-level details of prompt cancellation.
444      *
445      * Note that this function does not check for cancellation when it manages to immediately receive an element without
446      * suspending.
447      * Use [ensureActive] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed:
448      *
449      * ```
450      * val channel = Channel<Int>()
451      * launch { // a very fast producer
452      *     while (true) {
453      *         channel.send(42)
454      *     }
455      * }
456      * val consumer = launch { // a slow consumer
457      *     while (isActive) {
458      *         val element = channel.receive()
459      *         // some slow computation involving `element`
460      *     }
461      * }
462      * delay(100.milliseconds)
463      * consumer.cancelAndJoin()
464      * ```
465      *
466      * ## Receiving from a closed channel
467      *
468      * - Attempting to [receive] from a [closed][SendChannel.close] channel while there are still some elements
469      *   will successfully retrieve an element from the channel.
470      * - When a channel is [closed][SendChannel.close] and there are no elements remaining,
471      *   the channel becomes [closed for `receive`][isClosedForReceive].
472      *   After that,
473      *   [receive] will rethrow the same (in the `===` sense) exception that was passed to [SendChannel.close],
474      *   or [ClosedReceiveChannelException] if none was given.
475      *
476      * ## Related
477      *
478      * This function can be used in [select] invocations with the [onReceive] clause.
479      * Use [tryReceive] to try receiving from this channel without waiting and throwing.
480      * Use [receiveCatching] to receive from this channel without throwing.
481      */
receivenull482     public suspend fun receive(): E
483 
484     /**
485      * Clause for the [select] expression of the [receive] suspending function that selects with the element
486      * received from the channel.
487      *
488      * The [select] invocation fails with an exception if the channel [is closed for `receive`][isClosedForReceive]
489      * at any point, even if other [select] clauses could still work.
490      *
491      * Example:
492      * ```
493      * class ScreenSize(val width: Int, val height: Int)
494      * class MouseClick(val x: Int, val y: Int)
495      * val screenResizes = Channel<ScreenSize>(Channel.CONFLATED)
496      * val mouseClicks = Channel<MouseClick>(Channel.CONFLATED)
497      *
498      * launch(Dispatchers.Main) {
499      *     while (true) {
500      *         select {
501      *             screenResizes.onReceive { newSize ->
502      *                 // update the UI to the new screen size
503      *             }
504      *             mouseClicks.onReceive { click ->
505      *                 // react to a mouse click
506      *             }
507      *         }
508      *     }
509      * }
510      * ```
511      *
512      * Like [receive], [onReceive] obeys the rules of prompt cancellation:
513      * [select] may finish with a [CancellationException] even if an element was successfully retrieved,
514      * in which case the `onUndeliveredElement` callback will be called.
515      */
516     public val onReceive: SelectClause1<E>
517 
518     /**
519      * Retrieves an element, removing it from the channel.
520      *
521      * A difference from [receive] is that this function encapsulates a failure in its return value instead of throwing
522      * an exception.
523      * However, it will still throw [CancellationException] if the coroutine calling [receiveCatching] is cancelled.
524      *
525      * It is guaranteed that the only way this function can return a [failed][ChannelResult.isFailure] result is when
526      * the channel is [closed for `receive`][isClosedForReceive], so [ChannelResult.isClosed] is also true.
527      *
528      * This function suspends if the channel is empty, waiting until an element is available or the channel becomes
529      * closed.
530      * ```
531      * val channel = Channel<Int>()
532      * launch {
533      *     while (true) {
534      *         val result = channel.receiveCatching() // suspends
535      *         when (val element = result.getOrNull()) {
536      *             null -> break // the channel is closed
537      *             else -> check(element == 5)
538      *         }
539      *     }
540      * }
541      * channel.send(5)
542      * ```
543      *
544      * ## Suspending and cancellation
545      *
546      * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this
547      * suspending function is waiting, this function immediately resumes with [CancellationException].
548      * There is a **prompt cancellation guarantee**: even if [receiveCatching] managed to retrieve the element from the
549      * channel, but was cancelled while suspended, [CancellationException] will be thrown, and, if
550      * the channel has an `onUndeliveredElement` callback installed, the retrieved element will be passed to it.
551      * See the "Undelivered elements" section in the [Channel] documentation
552      * for details on handling undelivered elements.
553      * See [suspendCancellableCoroutine] for the low-level details of prompt cancellation.
554      *
555      * Note that this function does not check for cancellation when it manages to immediately receive an element without
556      * suspending.
557      * Use [ensureActive] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed:
558      *
559      * ```
560      * val channel = Channel<Int>()
561      * launch { // a very fast producer
562      *     while (true) {
563      *         channel.send(42)
564      *     }
565      * }
566      * val consumer = launch { // a slow consumer
567      *     while (isActive) {
568      *         val element = channel.receiveCatching().getOrNull() ?: break
569      *         // some slow computation involving `element`
570      *     }
571      * }
572      * delay(100.milliseconds)
573      * consumer.cancelAndJoin()
574      * ```
575      *
576      * ## Receiving from a closed channel
577      *
578      * - Attempting to [receiveCatching] from a [closed][SendChannel.close] channel while there are still some elements
579      *   will successfully retrieve an element from the channel.
580      * - When a channel is [closed][SendChannel.close] and there are no elements remaining,
581      *   the channel becomes [closed for `receive`][isClosedForReceive].
582      *   After that, [receiveCatching] will return a result with [ChannelResult.isClosed] set.
583      *   [ChannelResult.exceptionOrNull] will be the exact (in the `===` sense) exception
584      *   that was passed to [SendChannel.close],
585      *   or `null` if none was given.
586      *
587      * ## Related
588      *
589      * This function can be used in [select] invocations with the [onReceiveCatching] clause.
590      * Use [tryReceive] to try receiving from this channel without waiting and throwing.
591      * Use [receive] to receive from this channel and throw exceptions on error.
592      */
593     public suspend fun receiveCatching(): ChannelResult<E>
594 
595     /**
596      * Clause for the [select] expression of the [receiveCatching] suspending function that selects
597      * with a [ChannelResult] when an element is retrieved or the channel gets closed.
598      *
599      * Like [receiveCatching], [onReceiveCatching] obeys the rules of prompt cancellation:
600      * [select] may finish with a [CancellationException] even if an element was successfully retrieved,
601      * in which case the `onUndeliveredElement` callback will be called.
602      */
603     // TODO: think of an example of when this could be useful
604     public val onReceiveCatching: SelectClause1<ChannelResult<E>>
605 
606     /**
607      * Attempts to retrieve an element without waiting, removing it from the channel.
608      *
609      * - When the channel is non-empty, a [successful][ChannelResult.isSuccess] result is returned,
610      *   and [ChannelResult.getOrNull] returns the retrieved element.
611      * - When the channel is empty, a [failed][ChannelResult.isFailure] result is returned.
612      * - When the channel is already [closed for `receive`][isClosedForReceive],
613      *   returns the ["channel is closed"][ChannelResult.isClosed] result.
614      *   If the channel was [closed][SendChannel.close] with a cause (for example, [cancelled][cancel]),
615      *   [ChannelResult.exceptionOrNull] contains the cause.
616      *
617      * This function is useful when implementing on-demand allocation of resources to be stored in the channel:
618      *
619      * ```
620      * val resourcePool = Channel<Resource>(maxResources)
621      *
622      * suspend fun withResource(block: (Resource) -> Unit) {
623      *     val result = resourcePool.tryReceive()
624      *     val resource = result.getOrNull()
625      *         ?: tryCreateNewResource() // try to create a new resource
626      *         ?: resourcePool.receive() // could not create: actually wait for the resource
627      *     try {
628      *         block(resource)
629      *     } finally {
630      *         resourcePool.trySend(resource)
631      *     }
632      * }
633      * ```
634      */
635     public fun tryReceive(): ChannelResult<E>
636 
637     /**
638      * Returns a new iterator to receive elements from this channel using a `for` loop.
639      * Iteration completes normally when the channel [is closed for `receive`][isClosedForReceive] without a cause and
640      * throws the exception passed to [close][SendChannel.close] if there was one.
641      *
642      * Instances of [ChannelIterator] are not thread-safe and shall not be used from concurrent coroutines.
643      *
644      * Example:
645      *
646      * ```
647      * val channel = produce<Int> {
648      *     repeat(1000) {
649      *         send(it)
650      *     }
651      * }
652      * for (v in channel) {
653      *     println(v)
654      * }
655      * ```
656      *
657      * Note that if an early return happens from the `for` loop, the channel does not get cancelled.
658      * To forbid sending new elements after the iteration is completed, use [consumeEach] or
659      * call [cancel] manually.
660      */
661     public operator fun iterator(): ChannelIterator<E>
662 
663     /**
664      * [Closes][SendChannel.close] the channel for new elements and removes all existing ones.
665      *
666      * A [cause] can be used to specify an error message or to provide other details on
667      * the cancellation reason for debugging purposes.
668      * If the cause is not specified, then an instance of [CancellationException] with a
669      * default message is created to [close][SendChannel.close] the channel.
670      *
671      * If the channel was already [closed][SendChannel.close],
672      * [cancel] only has the effect of removing all elements from the channel.
673      *
674      * Immediately after the invocation of this function,
675      * [isClosedForReceive] and, on the [SendChannel] side, [isClosedForSend][SendChannel.isClosedForSend]
676      * start returning `true`.
677      * Any attempt to send to or receive from this channel will lead to a [CancellationException].
678      * This also applies to the existing senders and receivers that are suspended at the time of the call:
679      * they will be resumed with a [CancellationException] immediately after [cancel] is called.
680      *
681      * If the channel has an `onUndeliveredElement` callback installed, this function will invoke it for each of the
682      * elements still in the channel, since these elements will be inaccessible otherwise.
683      * If the callback is not installed, these elements will simply be removed from the channel for garbage collection.
684      *
685      * ```
686      * val channel = Channel<Int>()
687      * channel.send(1)
688      * channel.send(2)
689      * channel.cancel()
690      * channel.trySend(3) // returns ChannelResult.isClosed
691      * for (element in channel) { println(element) } // prints nothing
692      * ```
693      *
694      * [consume] and [consumeEach] are convenient shorthands for cancelling the channel after the single consumer
695      * has finished processing.
696      */
697     public fun cancel(cause: CancellationException? = null)
698 
699     /**
700      * @suppress This method implements old version of JVM ABI. Use [cancel].
701      */
702     @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
703     public fun cancel(): Unit = cancel(null)
704 
705     /**
706      * @suppress This method has bad semantics when cause is not a [CancellationException]. Use [cancel].
707      */
708     @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
709     public fun cancel(cause: Throwable? = null): Boolean
710 
711     /**
712      * **Deprecated** poll method.
713      *
714      * This method was deprecated in the favour of [tryReceive].
715      * It has proven itself as error-prone method in Channel API:
716      *
717      * - Nullable return type creates the false sense of security, implying that `null`
718      *   is returned instead of throwing an exception.
719      * - It was used mostly from non-suspending APIs where CancellationException triggered
720      *   internal failures in the application (the most common source of bugs).
721      * - Its name was not aligned with the rest of the API and tried to mimic Java's queue instead.
722      *
723      * See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context.
724      *
725      * ### Replacement note
726      *
727      * The replacement `tryReceive().getOrNull()` is a default that ignores all close exceptions and
728      * proceeds with `null`, while `poll` throws an exception if the channel was closed with an exception.
729      * Replacement with the very same 'poll' semantics is `tryReceive().onClosed { if (it != null) throw it }.getOrNull()`
730      *
731      * @suppress **Deprecated**.
732      */
733     @Deprecated(
734         level = DeprecationLevel.ERROR,
735         message = "Deprecated in the favour of 'tryReceive'. " +
736             "Please note that the provided replacement does not rethrow channel's close cause as 'poll' did, " +
737             "for the precise replacement please refer to the 'poll' documentation",
738         replaceWith = ReplaceWith("tryReceive().getOrNull()")
739     ) // Warning since 1.5.0, error since 1.6.0, not hidden until 1.8+ because API is quite widespread
740     public fun poll(): E? {
741         val result = tryReceive()
742         if (result.isSuccess) return result.getOrThrow()
743         throw recoverStackTrace(result.exceptionOrNull() ?: return null)
744     }
745 
746     /**
747      * This function was deprecated since 1.3.0 and is no longer recommended to use
748      * or to implement in subclasses.
749      *
750      * It had the following pitfalls:
751      * - Didn't allow to distinguish 'null' as "closed channel" from "null as a value"
752      * - Was throwing if the channel has failed even though its signature may suggest it returns 'null'
753      * - It didn't really belong to core channel API and can be exposed as an extension instead.
754      *
755      * ### Replacement note
756      *
757      * The replacement `receiveCatching().getOrNull()` is a safe default that ignores all close exceptions and
758      * proceeds with `null`, while `receiveOrNull` throws an exception if the channel was closed with an exception.
759      * Replacement with the very same `receiveOrNull` semantics is `receiveCatching().onClosed { if (it != null) throw it }.getOrNull()`.
760      *
761      * @suppress **Deprecated**
762      */
763     @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
764     @LowPriorityInOverloadResolution
765     @Deprecated(
766         message = "Deprecated in favor of 'receiveCatching'. " +
767             "Please note that the provided replacement does not rethrow channel's close cause as 'receiveOrNull' did, " +
768             "for the detailed replacement please refer to the 'receiveOrNull' documentation",
769         level = DeprecationLevel.ERROR,
770         replaceWith = ReplaceWith("receiveCatching().getOrNull()")
771     ) // Warning since 1.3.0, error in 1.5.0, cannot be hidden due to deprecated extensions
receiveOrNullnull772     public suspend fun receiveOrNull(): E? = receiveCatching().getOrNull()
773 
774     /**
775      * This function was deprecated since 1.3.0 and is no longer recommended to use
776      * or to implement in subclasses.
777      * See [receiveOrNull] documentation.
778      *
779      * @suppress **Deprecated**: in favor of onReceiveCatching extension.
780      */
781     @Suppress("DEPRECATION_ERROR")
782     @Deprecated(
783         message = "Deprecated in favor of onReceiveCatching extension",
784         level = DeprecationLevel.ERROR,
785         replaceWith = ReplaceWith("onReceiveCatching")
786     ) // Warning since 1.3.0, error in 1.5.0, will be hidden or removed in 1.7.0
787     public val onReceiveOrNull: SelectClause1<E?> get() = (this as BufferedChannel<E>).onReceiveOrNull
788 }
789 
790 /**
791  * A discriminated union representing a channel operation result.
792  * It encapsulates the knowledge of whether the operation succeeded, failed with an option to retry,
793  * or failed because the channel was closed.
794  *
795  * If the operation was [successful][isSuccess], [T] is the result of the operation:
796  * for example, for [ReceiveChannel.receiveCatching] and [ReceiveChannel.tryReceive],
797  * it is the element received from the channel, and for [Channel.trySend], it is [Unit],
798  * as the channel does not receive anything in return for sending a channel.
799  * This value can be retrieved with [getOrNull] or [getOrThrow].
800  *
801  * If the operation [failed][isFailure], it does not necessarily mean that the channel itself is closed.
802  * For example, [ReceiveChannel.receiveCatching] and [ReceiveChannel.tryReceive] can fail because the channel is empty,
803  * and [Channel.trySend] can fail because the channel is full.
804  *
805  * If the operation [failed][isFailure] because the channel was closed for that operation, [isClosed] returns `true`.
806  * The opposite is also true: if [isClosed] returns `true`, then the channel is closed for that operation
807  * ([ReceiveChannel.isClosedForReceive] or [SendChannel.isClosedForSend]).
808  * In this case, retrying the operation is meaningless: once closed, the channel will remain closed.
809  * The [exceptionOrNull] function returns the reason the channel was closed, if any was given.
810  *
811  * Manually obtaining a [ChannelResult] instance is not supported.
812  * See the documentation for [ChannelResult]-returning functions for usage examples.
813  */
814 @JvmInline
815 public value class ChannelResult<out T>
816 @PublishedApi internal constructor(@PublishedApi internal val holder: Any?) {
817     /**
818      * Whether the operation succeeded.
819      *
820      * If this returns `true`, the operation was successful.
821      * In this case, [getOrNull] and [getOrThrow] can be used to retrieve the value.
822      *
823      * If this returns `false`, the operation failed.
824      * [isClosed] can be used to determine whether the operation failed because the channel was closed
825      * (and therefore retrying the operation is meaningless).
826      *
827      * ```
828      * val result = channel.tryReceive()
829      * if (result.isSuccess) {
830      *    println("Successfully received the value ${result.getOrThrow()}")
831      * } else {
832      *    println("Failed to receive the value.")
833      *    if (result.isClosed) {
834      *        println("The channel is closed.")
835      *        if (result.exceptionOrNull() != null) {
836      *            println("The reason: ${result.exceptionOrNull()}")
837      *        }
838      *    }
839      * }
840      * ```
841      *
842      * [isFailure] is a shorthand for `!isSuccess`.
843      * [getOrNull] can simplify [isSuccess] followed by [getOrThrow] into just one check if [T] is known
844      * to be non-nullable.
845      */
846     public val isSuccess: Boolean get() = holder !is Failed
847 
848     /**
849      * Whether the operation failed.
850      *
851      * A shorthand for `!isSuccess`. See [isSuccess] for more details.
852      */
853     public val isFailure: Boolean get() = holder is Failed
854 
855     /**
856      * Whether the operation failed because the channel was closed.
857      *
858      * If this returns `true`, the channel was closed for the operation that returned this result.
859      * In this case, retrying the operation is meaningless: once closed, the channel will remain closed.
860      * [isSuccess] will return `false`.
861      * [exceptionOrNull] can be used to determine the reason the channel was [closed][SendChannel.close]
862      * if one was given.
863      *
864      * If this returns `false`, subsequent attempts to perform the same operation may succeed.
865      *
866      * ```
867      * val result = channel.trySend(42)
868      * if (result.isClosed) {
869      *     println("The channel is closed.")
870      *     if (result.exceptionOrNull() != null) {
871      *         println("The reason: ${result.exceptionOrNull()}")
872      *     }
873      * }
874      */
875     public val isClosed: Boolean get() = holder is Closed
876 
877     /**
878      * Returns the encapsulated [T] if the operation succeeded, or `null` if it failed.
879      *
880      * For non-nullable [T], the following code can be used to handle the result:
881      * ```
882      * val result = channel.tryReceive()
883      * val value = result.getOrNull()
884      * if (value == null) {
885      *     if (result.isClosed) {
886      *         println("The channel is closed.")
887      *         if (result.exceptionOrNull() != null) {
888      *             println("The reason: ${result.exceptionOrNull()}")
889      *         }
890      *     }
891      *     return
892      * }
893      * println("Successfully received the value $value")
894      * ```
895      *
896      * If [T] is nullable, [getOrThrow] together with [isSuccess] is a more reliable way to handle the result.
897      */
898     @Suppress("UNCHECKED_CAST")
899     public fun getOrNull(): T? = if (holder !is Failed) holder as T else null
900 
901     /**
902      *  Returns the encapsulated [T] if the operation succeeded, or throws the encapsulated exception if it failed.
903      *
904      *  Example:
905      *  ```
906      *  val result = channel.tryReceive()
907      *  if (result.isSuccess) {
908      *      println("Successfully received the value ${result.getOrThrow()}")
909      *  }
910      *  ```
911      *
912      *  @throws IllegalStateException if the operation failed, but the channel was not closed with a cause.
913      */
914     public fun getOrThrow(): T {
915         @Suppress("UNCHECKED_CAST")
916         if (holder !is Failed) return holder as T
917         if (holder is Closed) {
918             check(holder.cause != null) { "Trying to call 'getOrThrow' on a channel closed without a cause" }
919             throw holder.cause
920         }
921         error("Trying to call 'getOrThrow' on a failed result of a non-closed channel")
922     }
923 
924     /**
925      * Returns the exception with which the channel was closed, or `null` if the channel was not closed or was closed
926      * without a cause.
927      *
928      * [exceptionOrNull] can only return a non-`null` value if [isClosed] is `true`,
929      * but even if [isClosed] is `true`,
930      * [exceptionOrNull] can still return `null` if the channel was closed without a cause.
931      *
932      * ```
933      * val result = channel.tryReceive()
934      * if (result.isClosed) {
935      *     // Now we know not to retry the operation later.
936      *     // Check if the channel was closed with a cause and rethrow the exception:
937      *     result.exceptionOrNull()?.let { throw it }
938      *     // Otherwise, the channel was closed without a cause.
939      * }
940      * ```
941      */
942     public fun exceptionOrNull(): Throwable? = (holder as? Closed)?.cause
943 
944     internal open class Failed {
945         override fun toString(): String = "Failed"
946     }
947 
948     internal class Closed(@JvmField val cause: Throwable?): Failed() {
949         override fun equals(other: Any?): Boolean = other is Closed && cause == other.cause
950         override fun hashCode(): Int = cause.hashCode()
951         override fun toString(): String = "Closed($cause)"
952     }
953 
954     /**
955      * @suppress **This is internal API and it is subject to change.**
956      */
957     @InternalCoroutinesApi
958     public companion object {
959         private val failed = Failed()
960 
961         @InternalCoroutinesApi
962         public fun <E> success(value: E): ChannelResult<E> =
963             ChannelResult(value)
964 
965         @InternalCoroutinesApi
966         public fun <E> failure(): ChannelResult<E> =
967             ChannelResult(failed)
968 
969         @InternalCoroutinesApi
970         public fun <E> closed(cause: Throwable?): ChannelResult<E> =
971             ChannelResult(Closed(cause))
972     }
973 
974     public override fun toString(): String =
975         when (holder) {
976             is Closed -> holder.toString()
977             else -> "Value($holder)"
978         }
979 }
980 
981 /**
982  * Returns the encapsulated value if the operation [succeeded][ChannelResult.isSuccess], or the
983  * result of [onFailure] function for [ChannelResult.exceptionOrNull] otherwise.
984  *
985  * A shorthand for `if (isSuccess) getOrNull() else onFailure(exceptionOrNull())`.
986  *
987  * @see ChannelResult.getOrNull
988  * @see ChannelResult.exceptionOrNull
989  */
990 @OptIn(ExperimentalContracts::class)
getOrElsenull991 public inline fun <T> ChannelResult<T>.getOrElse(onFailure: (exception: Throwable?) -> T): T {
992     contract {
993         callsInPlace(onFailure, InvocationKind.AT_MOST_ONCE)
994     }
995     @Suppress("UNCHECKED_CAST")
996     return if (holder is ChannelResult.Failed) onFailure(exceptionOrNull()) else holder as T
997 }
998 
999 /**
1000  * Performs the given [action] on the encapsulated value if the operation [succeeded][ChannelResult.isSuccess].
1001  * Returns the original `ChannelResult` unchanged.
1002  *
1003  * A shorthand for `this.also { if (isSuccess) action(getOrThrow()) }`.
1004  */
1005 @OptIn(ExperimentalContracts::class)
onSuccessnull1006 public inline fun <T> ChannelResult<T>.onSuccess(action: (value: T) -> Unit): ChannelResult<T> {
1007     contract {
1008         callsInPlace(action, InvocationKind.AT_MOST_ONCE)
1009     }
1010     @Suppress("UNCHECKED_CAST")
1011     if (holder !is ChannelResult.Failed) action(holder as T)
1012     return this
1013 }
1014 
1015 /**
1016  * Performs the given [action] if the operation [failed][ChannelResult.isFailure].
1017  * The result of [ChannelResult.exceptionOrNull] is passed to the [action] parameter.
1018  *
1019  * Returns the original `ChannelResult` unchanged.
1020  *
1021  * A shorthand for `this.also { if (isFailure) action(exceptionOrNull()) }`.
1022  */
1023 @OptIn(ExperimentalContracts::class)
onFailurenull1024 public inline fun <T> ChannelResult<T>.onFailure(action: (exception: Throwable?) -> Unit): ChannelResult<T> {
1025     contract {
1026         callsInPlace(action, InvocationKind.AT_MOST_ONCE)
1027     }
1028     if (holder is ChannelResult.Failed) action(exceptionOrNull())
1029     return this
1030 }
1031 
1032 /**
1033  * Performs the given [action] if the operation failed because the channel was [closed][ChannelResult.isClosed] for
1034  * that operation.
1035  * The result of [ChannelResult.exceptionOrNull] is passed to the [action] parameter.
1036  *
1037  * It is guaranteed that if action is invoked, then the channel is either [closed for send][Channel.isClosedForSend]
1038  * or is [closed for receive][Channel.isClosedForReceive] depending on the failed operation.
1039  *
1040  * Returns the original `ChannelResult` unchanged.
1041  *
1042  * A shorthand for `this.also { if (isClosed) action(exceptionOrNull()) }`.
1043  */
1044 @OptIn(ExperimentalContracts::class)
onClosednull1045 public inline fun <T> ChannelResult<T>.onClosed(action: (exception: Throwable?) -> Unit): ChannelResult<T> {
1046     contract {
1047         callsInPlace(action, InvocationKind.AT_MOST_ONCE)
1048     }
1049     if (holder is ChannelResult.Closed) action(exceptionOrNull())
1050     return this
1051 }
1052 
1053 /**
1054  * Iterator for a [ReceiveChannel].
1055  * Instances of this interface are *not thread-safe* and shall not be used from concurrent coroutines.
1056  */
1057 public interface ChannelIterator<out E> {
1058     /**
1059      * Prepare an element for retrieval by the invocation of [next].
1060      *
1061      * - If the element that was retrieved by an earlier [hasNext] call was not yet consumed by [next], returns `true`.
1062      * - If the channel has an element available, returns `true` and removes it from the channel.
1063      *   This element will be returned by the subsequent invocation of [next].
1064      * - If the channel is [closed for receiving][ReceiveChannel.isClosedForReceive] without a cause, returns `false`.
1065      * - If the channel is closed with a cause, throws the original [close][SendChannel.close] cause exception.
1066      * - If the channel is not closed but does not contain an element,
1067      *   suspends until either an element is sent to the channel or the channel gets closed.
1068      *
1069      * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this
1070      * suspending function is waiting, this function immediately resumes with [CancellationException].
1071      * There is a **prompt cancellation guarantee**: even if [hasNext] retrieves the element from the channel during
1072      * its operation, but was cancelled while suspended, [CancellationException] will be thrown.
1073      * See [suspendCancellableCoroutine] for low-level details.
1074      *
1075      * Because of the prompt cancellation guarantee, some values retrieved from the channel can become lost.
1076      * See the "Undelivered elements" section in the [Channel] documentation
1077      * for details on handling undelivered elements.
1078      *
1079      * Note that this function does not check for cancellation when it is not suspended, that is,
1080      * if the next element is immediately available.
1081      * Use [ensureActive] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
1082      */
hasNextnull1083     public suspend operator fun hasNext(): Boolean
1084 
1085     @Deprecated(message = "Since 1.3.0, binary compatibility with versions <= 1.2.x", level = DeprecationLevel.HIDDEN)
1086     @Suppress("INAPPLICABLE_JVM_NAME")
1087     @JvmName("next")
1088     public suspend fun next0(): E {
1089         /*
1090          * Before 1.3.0 the "next()" could have been used without invoking "hasNext" first and there were code samples
1091          * demonstrating this behavior, so we preserve this logic for full binary backwards compatibility with previously
1092          * compiled code.
1093          */
1094         if (!hasNext()) throw ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
1095         return next()
1096     }
1097 
1098     /**
1099      * Retrieves the element removed from the channel by the preceding call to [hasNext], or
1100      * throws an [IllegalStateException] if [hasNext] was not invoked.
1101      *
1102      * This method can only be used together with [hasNext]:
1103      * ```
1104      * while (iterator.hasNext()) {
1105      *     val element = iterator.next()
1106      *     // ... handle the element ...
1107      * }
1108      * ```
1109      *
1110      * A more idiomatic way to iterate over a channel is to use a `for` loop:
1111      * ```
1112      * for (element in channel) {
1113      *    // ... handle the element ...
1114      * }
1115      * ```
1116      *
1117      * This method never throws if [hasNext] returned `true`.
1118      * If [hasNext] threw the cause with which the channel was closed, this method will rethrow the same exception.
1119      * If [hasNext] returned `false` because the channel was closed without a cause, this method throws
1120      * a [ClosedReceiveChannelException].
1121      */
nextnull1122     public operator fun next(): E
1123 }
1124 
1125 /**
1126  * Channel is a non-blocking primitive for communication between a sender (via [SendChannel]) and a receiver (via [ReceiveChannel]).
1127  * Conceptually, a channel is similar to `java.util.concurrent.BlockingQueue`,
1128  * but it has suspending operations instead of blocking ones and can be [closed][SendChannel.close].
1129  *
1130  * ### Channel capacity
1131  *
1132  * Most ways to create a [Channel] (in particular, the `Channel()` factory function) allow specifying a capacity,
1133  * which determines how elements are buffered in the channel.
1134  * There are several predefined constants for the capacity that have special behavior:
1135  *
1136  * - [Channel.RENDEZVOUS] (or 0) creates a _rendezvous_ channel, which does not have a buffer at all.
1137  *   Instead, the sender and the receiver must rendezvous (meet):
1138  *   [SendChannel.send] suspends until another coroutine invokes [ReceiveChannel.receive], and vice versa.
1139  * - [Channel.CONFLATED] creates a buffer for a single element and automatically changes the
1140  *   [buffer overflow strategy][BufferOverflow] to [BufferOverflow.DROP_OLDEST].
1141  * - [Channel.UNLIMITED] creates a channel with an unlimited buffer, which never suspends the sender.
1142  * - [Channel.BUFFERED] creates a channel with a buffer whose size depends on
1143  *   the [buffer overflow strategy][BufferOverflow].
1144  *
1145  * See each constant's documentation for more details.
1146  *
1147  * If the capacity is positive but less than [Channel.UNLIMITED], the channel has a buffer with the specified capacity.
1148  * It is safe to construct a channel with a large buffer, as memory is only allocated gradually as elements are added.
1149  *
1150  * Constructing a channel with a negative capacity not equal to a predefined constant is not allowed
1151  * and throws an [IllegalArgumentException].
1152  *
1153  * ### Buffer overflow
1154  *
1155  * Some ways to create a [Channel] also expose a [BufferOverflow] parameter (by convention, `onBufferOverflow`),
1156  * which does not affect the receiver but determines the behavior of the sender when the buffer is full.
1157  * The options include [suspending][BufferOverflow.SUSPEND] until there is space in the buffer,
1158  * [dropping the oldest element][BufferOverflow.DROP_OLDEST] to make room for the new one, or
1159  * [dropping the element to be sent][BufferOverflow.DROP_LATEST]. See the [BufferOverflow] documentation.
1160  *
1161  * By convention, the default value for [BufferOverflow] whenever it can not be configured is [BufferOverflow.SUSPEND].
1162  *
1163  * See the [Channel.RENDEZVOUS], [Channel.CONFLATED], and [Channel.UNLIMITED] documentation for a description of how
1164  * they interact with the [BufferOverflow] parameter.
1165  *
1166  * ### Prompt cancellation guarantee
1167  *
1168  * All suspending functions with channels provide **prompt cancellation guarantee**.
1169  * If the job was cancelled while send or receive function was suspended, it will not resume successfully, even if it
1170  * already changed the channel's state, but throws a [CancellationException].
1171  * With a single-threaded [dispatcher][CoroutineDispatcher] like [Dispatchers.Main], this gives a
1172  * guarantee that the coroutine promptly reacts to the cancellation of its [Job] and does not resume its execution.
1173  *
1174  * > **Prompt cancellation guarantee** for channel operations was added in `kotlinx.coroutines` version `1.4.0`
1175  * > and has replaced the channel-specific atomic cancellation that was not consistent with other suspending functions.
1176  * > The low-level mechanics of prompt cancellation are explained in the [suspendCancellableCoroutine] documentation.
1177  *
1178  * ### Undelivered elements
1179  *
1180  * As a result of the prompt cancellation guarantee, when a closeable resource
1181  * (like an open file or a handle to another native resource) is transferred via a channel,
1182  * it can be successfully extracted from the channel,
1183  * but still be lost if the receiving operation is cancelled in parallel.
1184  *
1185  * The `Channel()` factory function has the optional parameter `onUndeliveredElement`.
1186  * When that parameter is set, the corresponding function is called once for each element
1187  * that was sent to the channel with the call to the [send][SendChannel.send] function but failed to be delivered,
1188  * which can happen in the following cases:
1189  *
1190  * - When an element is dropped due to the limited buffer capacity.
1191  *   This can happen when the overflow strategy is [BufferOverflow.DROP_LATEST] or [BufferOverflow.DROP_OLDEST].
1192  * - When the sending operations like [send][SendChannel.send] or [onSend][SendChannel.onSend]
1193  *   throw an exception because it was cancelled
1194  *   before it had a chance to actually send the element
1195  *   or because the channel was [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel].
1196  * - When the receiving operations like [receive][ReceiveChannel.receive],
1197  *   [onReceive][ReceiveChannel.onReceive], or [hasNext][ChannelIterator.hasNext]
1198  *   throw an exception after retrieving the element from the channel
1199  *   because of being cancelled before the code following them had a chance to resume.
1200  * - When the channel was [cancelled][ReceiveChannel.cancel], in which case `onUndeliveredElement` is called on every
1201  *   remaining element in the channel's buffer.
1202  *
1203  * Note that `onUndeliveredElement` is called synchronously in an arbitrary context.
1204  * It should be fast, non-blocking, and should not throw exceptions.
1205  * Any exception thrown by `onUndeliveredElement` is wrapped into an internal runtime exception
1206  * which is either rethrown from the caller method or handed off to the exception handler in the current context
1207  * (see [CoroutineExceptionHandler]) when one is available.
1208  *
1209  * A typical usage for `onUndeliveredElement` is to close a resource that is being transferred via the channel. The
1210  * following code pattern guarantees that opened resources are closed even if the producer, the consumer,
1211  * and/or the channel are cancelled. Resources are never lost.
1212  *
1213  * ```
1214  * // Create a channel with an onUndeliveredElement block that closes a resource
1215  * val channel = Channel<Resource>(onUndeliveredElement = { resource -> resource.close() })
1216  *
1217  * // Producer code
1218  * val resourceToSend = openResource()
1219  * channel.send(resourceToSend)
1220  *
1221  * // Consumer code
1222  * val resourceReceived = channel.receive()
1223  * try {
1224  *     // work with received resource
1225  * } finally {
1226  *     resourceReceived.close()
1227  * }
1228  * ```
1229  *
1230  * > Note that if any work happens between `openResource()` and `channel.send(...)`,
1231  * > it is your responsibility to ensure that resource gets closed in case this additional code fails.
1232  */
1233 public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
1234     /**
1235      * Constants for the channel factory function `Channel()`.
1236      */
1237     public companion object Factory {
1238         /**
1239          * An unlimited buffer capacity.
1240          *
1241          * `Channel(UNLIMITED)` creates a channel with an unlimited buffer, which never suspends the sender.
1242          * The total amount of elements that can be sent to the channel is limited only by the available memory.
1243          *
1244          * If [BufferOverflow] is specified for the channel, it is completely ignored,
1245          * as the channel never suspends the sender.
1246          *
1247          * ```
1248          * val channel = Channel<Int>(Channel.UNLIMITED)
1249          * repeat(1000) {
1250          *    channel.trySend(it)
1251          * }
1252          * repeat(1000) {
1253          *    check(channel.tryReceive().getOrNull() == it)
1254          * }
1255          * ```
1256          */
1257         public const val UNLIMITED: Int = Int.MAX_VALUE
1258 
1259         /**
1260          * The zero buffer capacity.
1261          *
1262          * For the default [BufferOverflow] value of [BufferOverflow.SUSPEND],
1263          * `Channel(RENDEZVOUS)` creates a channel without a buffer.
1264          * An element is transferred from the sender to the receiver only when [send] and [receive] invocations meet
1265          * in time (that is, they _rendezvous_),
1266          * so [send] suspends until another coroutine invokes [receive],
1267          * and [receive] suspends until another coroutine invokes [send].
1268          *
1269          * ```
1270          * val channel = Channel<Int>(Channel.RENDEZVOUS)
1271          * check(channel.trySend(5).isFailure) // sending fails: no receiver is waiting
1272          * launch(start = CoroutineStart.UNDISPATCHED) {
1273          *     val element = channel.receive() // suspends
1274          *     check(element == 3)
1275          * }
1276          * check(channel.trySend(3).isSuccess) // sending succeeds: receiver is waiting
1277          * ```
1278          *
1279          * If a different [BufferOverflow] is specified,
1280          * `Channel(RENDEZVOUS)` creates a channel with a buffer of size 1:
1281          *
1282          * ```
1283          * val channel = Channel<Int>(0, onBufferOverflow = BufferOverflow.DROP_OLDEST)
1284          * // None of the calls suspend, since the buffer overflow strategy is not SUSPEND
1285          * channel.send(1)
1286          * channel.send(2)
1287          * channel.send(3)
1288          * check(channel.receive() == 3)
1289          * ```
1290          */
1291         public const val RENDEZVOUS: Int = 0
1292 
1293         /**
1294          * A single-element buffer with conflating behavior.
1295          *
1296          * Specifying [CONFLATED] as the capacity in the `Channel(...)` factory function is equivalent to
1297          * creating a channel with a buffer of size 1 and a [BufferOverflow] strategy of [BufferOverflow.DROP_OLDEST]:
1298          * `Channel(1, onBufferOverflow = BufferOverflow.DROP_OLDEST)`.
1299          * Such a channel buffers at most one element and conflates all subsequent `send` and `trySend` invocations
1300          * so that the receiver always gets the last element sent, **losing** the previously sent elements:
1301          * see the "Undelivered elements" section in the [Channel] documentation.
1302          * [Sending][send] to this channel never suspends, and [trySend] always succeeds.
1303          *
1304          * ```
1305          * val channel = Channel<Int>(Channel.CONFLATED)
1306          * channel.send(1)
1307          * channel.send(2)
1308          * channel.send(3)
1309          * check(channel.receive() == 3)
1310          * ```
1311          *
1312          * Specifying a [BufferOverflow] other than [BufferOverflow.SUSPEND] is not allowed with [CONFLATED], and
1313          * an [IllegalArgumentException] is thrown if such a combination is used.
1314          * For creating a conflated channel that instead keeps the existing element in the channel and throws out
1315          * the new one, use `Channel(1, onBufferOverflow = BufferOverflow.DROP_LATEST)`.
1316          */
1317         public const val CONFLATED: Int = -1
1318 
1319         /**
1320          * A channel capacity marker that is substituted by the default buffer capacity.
1321          *
1322          * When passed as a parameter to the `Channel(...)` factory function, the default buffer capacity is used.
1323          * For [BufferOverflow.SUSPEND] (the default buffer overflow strategy), the default capacity is 64,
1324          * but on the JVM it can be overridden by setting the [DEFAULT_BUFFER_PROPERTY_NAME] system property.
1325          * The overridden value is used for all channels created with a default buffer capacity,
1326          * including those created in third-party libraries.
1327          *
1328          * ```
1329          * val channel = Channel<Int>(Channel.BUFFERED)
1330          * repeat(100) {
1331          *     channel.trySend(it)
1332          * }
1333          * channel.close()
1334          * // The check can fail if the default buffer capacity is changed
1335          * check(channel.toList() == (0..<64).toList())
1336          * ```
1337          *
1338          * If a different [BufferOverflow] is specified, `Channel(BUFFERED)` creates a channel with a buffer of size 1:
1339          *
1340          * ```
1341          * val channel = Channel<Int>(Channel.BUFFERED, onBufferOverflow = BufferOverflow.DROP_OLDEST)
1342          * channel.send(1)
1343          * channel.send(2)
1344          * channel.send(3)
1345          * channel.close()
1346          * check(channel.toList() == listOf(3))
1347          * ```
1348          */
1349         public const val BUFFERED: Int = -2
1350 
1351         // only for internal use, cannot be used with Channel(...)
1352         internal const val OPTIONAL_CHANNEL = -3
1353 
1354         /**
1355          * Name of the JVM system property for the default channel capacity (64 by default).
1356          *
1357          * See [BUFFERED] for details on how this property is used.
1358          *
1359          * Setting this property affects the default channel capacity for channel constructors,
1360          * channel-backed coroutines and flow operators that imply channel usage,
1361          * including ones defined in 3rd-party libraries.
1362          *
1363          * Usage of this property is highly discouraged and is intended to be used as a last-ditch effort
1364          * as an immediate measure for hot fixes and duct-taping.
1365          */
1366         @DelicateCoroutinesApi
1367         public const val DEFAULT_BUFFER_PROPERTY_NAME: String = "kotlinx.coroutines.channels.defaultBuffer"
1368 
1369         internal val CHANNEL_DEFAULT_CAPACITY = systemProp(DEFAULT_BUFFER_PROPERTY_NAME,
1370             64, 1, UNLIMITED - 1
1371         )
1372     }
1373 }
1374 
1375 /**
1376  * Creates a channel. See the [Channel] interface documentation for details.
1377  *
1378  * This function is the most flexible way to create a channel.
1379  * It allows specifying the channel's capacity, buffer overflow strategy, and an optional function to call
1380  * to handle undelivered elements.
1381  *
1382  * ```
1383  * val allocatedResources = HashSet<Int>()
1384  * // An autocloseable resource that must be closed when it is no longer needed
1385  * class Resource(val id: Int): AutoCloseable {
1386  *     init {
1387  *         allocatedResources.add(id)
1388  *     }
1389  *     override fun close() {
1390  *         allocatedResources.remove(id)
1391  *     }
1392  * }
1393  * // A channel with a 15-element buffer that drops the oldest element on buffer overflow
1394  * // and closes the elements that were not delivered to the consumer
1395  * val channel = Channel<Resource>(
1396  *     capacity = 15,
1397  *     onBufferOverflow = BufferOverflow.DROP_OLDEST,
1398  *     onUndeliveredElement = { element -> element.close() }
1399  * )
1400  * // A sender's view of the channel
1401  * val sendChannel: SendChannel<Resource> = channel
1402  * repeat(100) {
1403  *     sendChannel.send(Resource(it))
1404  * }
1405  * sendChannel.close()
1406  * // A receiver's view of the channel
1407  * val receiveChannel: ReceiveChannel<Resource> = channel
1408  * val receivedResources = receiveChannel.toList()
1409  * // Check that the last 15 sent resources were received
1410  * check(receivedResources.map { it.id } == (85 until 100).toList())
1411  * // Close the resources that were successfully received
1412  * receivedResources.forEach { it.close() }
1413  * // The dropped resources were closed by the channel itself
1414  * check(allocatedResources.isEmpty())
1415  * ```
1416  *
1417  * For a full explanation of every parameter and their interaction, see the [Channel] interface documentation.
1418  *
1419  * @param capacity either a positive channel capacity or one of the constants defined in [Channel.Factory].
1420  *   See the "Channel capacity" section in the [Channel] documentation.
1421  * @param onBufferOverflow configures an action on buffer overflow.
1422  *   See the "Buffer overflow" section in the [Channel] documentation.
1423  * @param onUndeliveredElement a function that is called when element was sent but was not delivered to the consumer.
1424  *   See the "Undelivered elements" section in the [Channel] documentation.
1425  * @throws IllegalArgumentException when [capacity] < -2
1426  */
Channelnull1427 public fun <E> Channel(
1428     capacity: Int = RENDEZVOUS,
1429     onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
1430     onUndeliveredElement: ((E) -> Unit)? = null
1431 ): Channel<E> =
1432     when (capacity) {
1433         RENDEZVOUS -> {
1434             if (onBufferOverflow == BufferOverflow.SUSPEND)
1435                 BufferedChannel(RENDEZVOUS, onUndeliveredElement) // an efficient implementation of rendezvous channel
1436             else
1437                 ConflatedBufferedChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
1438         }
1439         CONFLATED -> {
1440             require(onBufferOverflow == BufferOverflow.SUSPEND) {
1441                 "CONFLATED capacity cannot be used with non-default onBufferOverflow"
1442             }
1443             ConflatedBufferedChannel(1, BufferOverflow.DROP_OLDEST, onUndeliveredElement)
1444         }
1445         UNLIMITED -> BufferedChannel(UNLIMITED, onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows
1446         BUFFERED -> { // uses default capacity with SUSPEND
1447             if (onBufferOverflow == BufferOverflow.SUSPEND) BufferedChannel(CHANNEL_DEFAULT_CAPACITY, onUndeliveredElement)
1448             else ConflatedBufferedChannel(1, onBufferOverflow, onUndeliveredElement)
1449         }
1450         else -> {
1451             if (onBufferOverflow === BufferOverflow.SUSPEND) BufferedChannel(capacity, onUndeliveredElement)
1452             else ConflatedBufferedChannel(capacity, onBufferOverflow, onUndeliveredElement)
1453         }
1454     }
1455 
1456 @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.0, binary compatibility with earlier versions")
Channelnull1457 public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> = Channel(capacity)
1458 
1459 /**
1460  * Indicates an attempt to [send][SendChannel.send] to a [closed-for-sending][SendChannel.isClosedForSend] channel
1461  * that was [closed][SendChannel.close] without a cause.
1462  *
1463  * If a cause was provided, that cause is thrown from [send][SendChannel.send] instead of this exception.
1464  * In particular, if the channel was closed because it was [cancelled][ReceiveChannel.cancel],
1465  * this exception will never be thrown: either the `cause` of the cancellation is thrown,
1466  * or a new [CancellationException] gets constructed to be thrown from [SendChannel.send].
1467  *
1468  * This exception is a subclass of [IllegalStateException], because the sender should not attempt to send to a closed
1469  * channel after it itself has [closed][SendChannel.close] it, and indicates an error on the part of the programmer.
1470  * Usually, this exception can be avoided altogether by restructuring the code.
1471  */
1472 public class ClosedSendChannelException(message: String?) : IllegalStateException(message)
1473 
1474 /**
1475  * Indicates an attempt to [receive][ReceiveChannel.receive] from a
1476  * [closed-for-receiving][ReceiveChannel.isClosedForReceive] channel
1477  * that was [closed][SendChannel.close] without a cause.
1478  *
1479  * If a clause was provided, that clause is thrown from [receive][ReceiveChannel.receive] instead of this exception.
1480  * In particular, if the channel was closed because it was [cancelled][ReceiveChannel.cancel],
1481  * this exception will never be thrown: either the `cause` of the cancellation is thrown,
1482  * or a new [CancellationException] gets constructed to be thrown from [ReceiveChannel.receive].
1483  *
1484  * This exception is a subclass of [NoSuchElementException] to be consistent with plain collections.
1485  */
1486 public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message)
1487