• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 @file:Suppress("FunctionName")
6 
7 package kotlinx.coroutines.channels
8 
9 import kotlinx.coroutines.*
10 import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
11 import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
12 import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
13 import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
14 import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
15 import kotlinx.coroutines.internal.*
16 import kotlinx.coroutines.selects.*
17 import kotlin.contracts.*
18 import kotlin.internal.*
19 import kotlin.jvm.*
20 
21 /**
22  * Sender's interface to [Channel].
23  */
24 public interface SendChannel<in E> {
25     /**
26      * Returns `true` if this channel was closed by an invocation of [close]. This means that
27      * calling [send] will result in an exception.
28      *
29      * **Note: This is an experimental api.** This property may change its semantics and/or name in the future.
30      */
31     @ExperimentalCoroutinesApi
32     public val isClosedForSend: Boolean
33 
34     /**
35      * Sends the specified [element] to this channel, suspending the caller while the buffer of this channel is full
36      * or if it does not exist, or throws an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details).
37      *
38      * [Closing][close] a channel _after_ this function has suspended does not cause this suspended [send] invocation
39      * to abort, because closing a channel is conceptually like sending a special "close token" over this channel.
40      * All elements sent over the channel are delivered in first-in first-out order. The sent element
41      * will be delivered to receivers before the close token.
42      *
43      * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
44      * function is suspended, this function immediately resumes with a [CancellationException].
45      * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
46      * suspended, it will not resume successfully. The `send` call can send the element to the channel,
47      * but then throw [CancellationException], thus an exception should not be treated as a failure to deliver the element.
48      * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
49      *
50      * Note that this function does not check for cancellation when it is not suspended.
51      * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
52      *
53      * This function can be used in [select] invocations with the [onSend] clause.
54      * Use [trySend] to try sending to this channel without waiting.
55      */
sendnull56     public suspend fun send(element: E)
57 
58     /**
59      * Clause for the [select] expression of the [send] suspending function that selects when the element that is specified
60      * as the parameter is sent to the channel. When the clause is selected, the reference to this channel
61      * is passed into the corresponding block.
62      *
63      * The [select] invocation fails with an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details).
64      */
65     public val onSend: SelectClause2<E, SendChannel<E>>
66 
67     /**
68      * Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions,
69      * and returns the successful result. Otherwise, returns failed or closed result.
70      * This is synchronous variant of [send], which backs off in situations when `send` suspends or throws.
71      *
72      * When `trySend` call returns a non-successful result, it guarantees that the element was not delivered to the consumer, and
73      * it does not call `onUndeliveredElement` that was installed for this channel.
74      * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
75      */
76     public fun trySend(element: E): ChannelResult<Unit>
77 
78     /**
79      * Closes this channel.
80      * This is an idempotent operation &mdash; subsequent invocations of this function have no effect and return `false`.
81      * Conceptually, it sends a special "close token" over this channel.
82      *
83      * Immediately after invocation of this function,
84      * [isClosedForSend] starts returning `true`. However, [isClosedForReceive][ReceiveChannel.isClosedForReceive]
85      * on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
86      * are received.
87      *
88      * A channel that was closed without a [cause] throws a [ClosedSendChannelException] on attempts to [send]
89      * and [ClosedReceiveChannelException] on attempts to [receive][ReceiveChannel.receive].
90      * A channel that was closed with non-null [cause] is called a _failed_ channel. Attempts to send or
91      * receive on a failed channel throw the specified [cause] exception.
92      */
93     public fun close(cause: Throwable? = null): Boolean
94 
95     /**
96      * Registers a [handler] which is synchronously invoked once the channel is [closed][close]
97      * or the receiving side of this channel is [cancelled][ReceiveChannel.cancel].
98      * Only one handler can be attached to a channel during its lifetime.
99      * The `handler` is invoked when [isClosedForSend] starts to return `true`.
100      * If the channel is closed already, the handler is invoked immediately.
101      *
102      * The meaning of `cause` that is passed to the handler:
103      * * `null` if the channel was closed or cancelled without the corresponding argument
104      * * the cause of `close` or `cancel` otherwise.
105      *
106      * Example of usage (exception handling is omitted):
107      *
108      * ```
109      * val events = Channel(UNLIMITED)
110      * callbackBasedApi.registerCallback { event ->
111      *   events.trySend(event)
112      * }
113      *
114      * val uiUpdater = launch(Dispatchers.Main, parent = UILifecycle) {
115      *    events.consume {}
116      *    events.cancel()
117      * }
118      *
119      * events.invokeOnClose { callbackBasedApi.stop() }
120      * ```
121      *
122      * **Note: This is an experimental api.** This function may change its semantics, parameters or return type in the future.
123      *
124      * @throws UnsupportedOperationException if the underlying channel doesn't support [invokeOnClose].
125      * Implementation note: currently, [invokeOnClose] is unsupported only by Rx-like integrations
126      *
127      * @throws IllegalStateException if another handler was already registered
128      */
129     @ExperimentalCoroutinesApi
130     public fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
131 
132     /**
133      * **Deprecated** offer method.
134      *
135      * This method was deprecated in the favour of [trySend].
136      * It has proven itself as the most error-prone method in Channel API:
137      *
138      * * `Boolean` return type creates the false sense of security, implying that `false`
139      *    is returned instead of throwing an exception.
140      * * It was used mostly from non-suspending APIs where CancellationException triggered
141      *   internal failures in the application (the most common source of bugs).
142      * * Due to signature and explicit `if (ch.offer(...))` checks it was easy to
143      *   oversee such error during code review.
144      * * Its name was not aligned with the rest of the API and tried to mimic Java's queue instead.
145      *
146      * **NB** Automatic migration provides best-effort for the user experience, but requires removal
147      * or adjusting of the code that relied on the exception handling.
148      * The complete replacement has a more verbose form:
149      * ```
150      * channel.trySend(element)
151      *     .onClosed { throw it ?: ClosedSendChannelException("Channel was closed normally") }
152      *     .isSuccess
153      * ```
154      *
155      * See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context.
156      *
157      * @suppress **Deprecated**.
158      */
159     @Deprecated(
160         level = DeprecationLevel.ERROR,
161         message = "Deprecated in the favour of 'trySend' method",
162         replaceWith = ReplaceWith("trySend(element).isSuccess")
163     ) // Warning since 1.5.0, error since 1.6.0
164     public fun offer(element: E): Boolean {
165         val result = trySend(element)
166         if (result.isSuccess) return true
167         throw recoverStackTrace(result.exceptionOrNull() ?: return false)
168     }
169 }
170 
171 /**
172  * Receiver's interface to [Channel].
173  */
174 public interface ReceiveChannel<out E> {
175     /**
176      * Returns `true` if this channel was closed by invocation of [close][SendChannel.close] on the [SendChannel]
177      * side and all previously sent items were already received. This means that calling [receive]
178      * will result in a [ClosedReceiveChannelException]. If the channel was closed because of an exception, it
179      * is considered closed, too, but is called a _failed_ channel. All suspending attempts to receive
180      * an element from a failed channel throw the original [close][SendChannel.close] cause exception.
181      *
182      * **Note: This is an experimental api.** This property may change its semantics and/or name in the future.
183      */
184     @ExperimentalCoroutinesApi
185     public val isClosedForReceive: Boolean
186 
187     /**
188      * Returns `true` if the channel is empty (contains no elements), which means that an attempt to [receive] will suspend.
189      * This function returns `false` if the channel [is closed for `receive`][isClosedForReceive].
190      */
191     @ExperimentalCoroutinesApi
192     public val isEmpty: Boolean
193 
194     /**
195      * Retrieves and removes an element from this channel if it's not empty, or suspends the caller while the channel is empty,
196      * or throws a [ClosedReceiveChannelException] if the channel [is closed for `receive`][isClosedForReceive].
197      * If the channel was closed because of an exception, it is called a _failed_ channel and this function
198      * will throw the original [close][SendChannel.close] cause exception.
199      *
200      * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
201      * function is suspended, this function immediately resumes with a [CancellationException].
202      * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
203      * suspended, it will not resume successfully. The `receive` call can retrieve the element from the channel,
204      * but then throw [CancellationException], thus failing to deliver the element.
205      * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
206      *
207      * Note that this function does not check for cancellation when it is not suspended.
208      * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
209      *
210      * This function can be used in [select] invocations with the [onReceive] clause.
211      * Use [tryReceive] to try receiving from this channel without waiting.
212      */
receivenull213     public suspend fun receive(): E
214 
215     /**
216      * Clause for the [select] expression of the [receive] suspending function that selects with the element
217      * received from the channel.
218      * The [select] invocation fails with an exception if the channel
219      * [is closed for `receive`][isClosedForReceive] (see [close][SendChannel.close] for details).
220      */
221     public val onReceive: SelectClause1<E>
222 
223     /**
224      * Retrieves and removes an element from this channel if it's not empty, or suspends the caller while this channel is empty.
225      * This method returns [ChannelResult] with the value of an element successfully retrieved from the channel
226      * or the close cause if the channel was closed. Closed cause may be `null` if the channel was closed normally.
227      * The result cannot be [failed][ChannelResult.isFailure] without being [closed][ChannelResult.isClosed].
228      *
229      * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
230      * function is suspended, this function immediately resumes with a [CancellationException].
231      * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
232      * suspended, it will not resume successfully. The `receiveCatching` call can retrieve the element from the channel,
233      * but then throw [CancellationException], thus failing to deliver the element.
234      * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
235      *
236      * Note that this function does not check for cancellation when it is not suspended.
237      * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
238      *
239      * This function can be used in [select] invocations with the [onReceiveCatching] clause.
240      * Use [tryReceive] to try receiving from this channel without waiting.
241      */
242     public suspend fun receiveCatching(): ChannelResult<E>
243 
244     /**
245      * Clause for the [select] expression of the [onReceiveCatching] suspending function that selects with the [ChannelResult] with a value
246      * that is received from the channel or with a close cause if the channel
247      * [is closed for `receive`][isClosedForReceive].
248      */
249     public val onReceiveCatching: SelectClause1<ChannelResult<E>>
250 
251     /**
252      * Retrieves and removes an element from this channel if it's not empty, returning a [successful][ChannelResult.success]
253      * result, returns [failed][ChannelResult.failed] result if the channel is empty, and [closed][ChannelResult.closed]
254      * result if the channel is closed.
255      */
256     public fun tryReceive(): ChannelResult<E>
257 
258     /**
259      * Returns a new iterator to receive elements from this channel using a `for` loop.
260      * Iteration completes normally when the channel [is closed for `receive`][isClosedForReceive] without a cause and
261      * throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
262      */
263     public operator fun iterator(): ChannelIterator<E>
264 
265     /**
266      * Cancels reception of remaining elements from this channel with an optional [cause].
267      * This function closes the channel and removes all buffered sent elements from it.
268      *
269      * A cause can be used to specify an error message or to provide other details on
270      * the cancellation reason for debugging purposes.
271      * If the cause is not specified, then an instance of [CancellationException] with a
272      * default message is created to [close][SendChannel.close] the channel.
273      *
274      * Immediately after invocation of this function [isClosedForReceive] and
275      * [isClosedForSend][SendChannel.isClosedForSend]
276      * on the side of [SendChannel] start returning `true`. Any attempt to send to or receive from this channel
277      * will lead to a [CancellationException].
278      */
279     public fun cancel(cause: CancellationException? = null)
280 
281     /**
282      * @suppress This method implements old version of JVM ABI. Use [cancel].
283      */
284     @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
285     public fun cancel(): Unit = cancel(null)
286 
287     /**
288      * @suppress This method has bad semantics when cause is not a [CancellationException]. Use [cancel].
289      */
290     @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
291     public fun cancel(cause: Throwable? = null): Boolean
292 
293     /**
294      * **Deprecated** poll method.
295      *
296      * This method was deprecated in the favour of [tryReceive].
297      * It has proven itself as error-prone method in Channel API:
298      *
299      * * Nullable return type creates the false sense of security, implying that `null`
300      *    is returned instead of throwing an exception.
301      * * It was used mostly from non-suspending APIs where CancellationException triggered
302      *   internal failures in the application (the most common source of bugs).
303      * * Its name was not aligned with the rest of the API and tried to mimic Java's queue instead.
304      *
305      * See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context.
306      *
307      * ### Replacement note
308      *
309      * The replacement `tryReceive().getOrNull()` is a default that ignores all close exceptions and
310      * proceeds with `null`, while `poll` throws an exception if the channel was closed with an exception.
311      * Replacement with the very same 'poll' semantics is `tryReceive().onClosed { if (it != null) throw it }.getOrNull()`
312      *
313      * @suppress **Deprecated**.
314      */
315     @Deprecated(
316         level = DeprecationLevel.ERROR,
317         message = "Deprecated in the favour of 'tryReceive'. " +
318             "Please note that the provided replacement does not rethrow channel's close cause as 'poll' did, " +
319             "for the precise replacement please refer to the 'poll' documentation",
320         replaceWith = ReplaceWith("tryReceive().getOrNull()")
321     ) // Warning since 1.5.0, error since 1.6.0
322     public fun poll(): E? {
323         val result = tryReceive()
324         if (result.isSuccess) return result.getOrThrow()
325         throw recoverStackTrace(result.exceptionOrNull() ?: return null)
326     }
327 
328     /**
329      * This function was deprecated since 1.3.0 and is no longer recommended to use
330      * or to implement in subclasses.
331      *
332      * It had the following pitfalls:
333      * - Didn't allow to distinguish 'null' as "closed channel" from "null as a value"
334      * - Was throwing if the channel has failed even though its signature may suggest it returns 'null'
335      * - It didn't really belong to core channel API and can be exposed as an extension instead.
336      *
337      * ### Replacement note
338      *
339      * The replacement `receiveCatching().getOrNull()` is a safe default that ignores all close exceptions and
340      * proceeds with `null`, while `receiveOrNull` throws an exception if the channel was closed with an exception.
341      * Replacement with the very same `receiveOrNull` semantics is `receiveCatching().onClosed { if (it != null) throw it }.getOrNull()`.
342      *
343      * @suppress **Deprecated**
344      */
345     @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
346     @LowPriorityInOverloadResolution
347     @Deprecated(
348         message = "Deprecated in favor of 'receiveCatching'. " +
349             "Please note that the provided replacement does not rethrow channel's close cause as 'receiveOrNull' did, " +
350             "for the detailed replacement please refer to the 'receiveOrNull' documentation",
351         level = DeprecationLevel.ERROR,
352         replaceWith = ReplaceWith("receiveCatching().getOrNull()")
353     ) // Warning since 1.3.0, error in 1.5.0, will be hidden in 1.6.0
receiveOrNullnull354     public suspend fun receiveOrNull(): E? = receiveCatching().getOrNull()
355 
356     /**
357      * This function was deprecated since 1.3.0 and is no longer recommended to use
358      * or to implement in subclasses.
359      * See [receiveOrNull] documentation.
360      *
361      * @suppress **Deprecated**: in favor of onReceiveCatching extension.
362      */
363     @Deprecated(
364         message = "Deprecated in favor of onReceiveCatching extension",
365         level = DeprecationLevel.ERROR,
366         replaceWith = ReplaceWith("onReceiveCatching")
367     ) // Warning since 1.3.0, error in 1.5.0, will be hidden or removed in 1.7.0
368     public val onReceiveOrNull: SelectClause1<E?>
369         get() {
370             return object : SelectClause1<E?> {
371                 @InternalCoroutinesApi
372                 override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E?) -> R) {
373                     onReceiveCatching.registerSelectClause1(select) {
374                         it.exceptionOrNull()?.let { throw it }
375                         block(it.getOrNull())
376                     }
377                 }
378             }
379         }
380 }
381 
382 /**
383  * A discriminated union of channel operation result.
384  * It encapsulates the successful or failed result of a channel operation or a failed operation to a closed channel with
385  * an optional cause.
386  *
387  * The successful result represents a successful operation with a value of type [T], for example,
388  * the result of [Channel.receiveCatching] operation or a successfully sent element as a result of [Channel.trySend].
389  *
390  * The failed result represents a failed operation attempt to a channel, but it doesn't necessary indicate that the channel is failed.
391  * E.g. when the channel is full, [Channel.trySend] returns failed result, but the channel itself is not in the failed state.
392  *
393  * The closed result represents an operation attempt to a closed channel and also implies that the operation has failed.
394  * It is guaranteed that if the result is _closed_, then the target channel is either [closed for send][Channel.isClosedForSend]
395  * or is [closed for receive][Channel.isClosedForReceive] depending on whether the failed operation was sending or receiving.
396  */
397 @JvmInline
398 public value class ChannelResult<out T>
399 @PublishedApi internal constructor(@PublishedApi internal val holder: Any?) {
400     /**
401      * Returns `true` if this instance represents a successful
402      * operation outcome.
403      *
404      * In this case [isFailure] and [isClosed] return `false`.
405      */
406     public val isSuccess: Boolean get() = holder !is Failed
407 
408     /**
409      * Returns `true` if this instance represents unsuccessful operation.
410      *
411      * In this case [isSuccess] returns false, but it does not imply
412      * that the channel is failed or closed.
413      *
414      * Example of a failed operation without an exception and channel being closed
415      * is [Channel.trySend] attempt to a channel that is full.
416      */
417     public val isFailure: Boolean get() = holder is Failed
418 
419     /**
420      * Returns `true` if this instance represents unsuccessful operation
421      * to a closed or cancelled channel.
422      *
423      * In this case [isSuccess] returns `false`, [isFailure] returns `true`, but it does not imply
424      * that [exceptionOrNull] returns non-null value.
425      *
426      * It can happen if the channel was [closed][Channel.close] normally without an exception.
427      */
428     public val isClosed: Boolean get() = holder is Closed
429 
430     /**
431      * Returns the encapsulated value if this instance represents success or `null` if it represents failed result.
432      */
433     @Suppress("UNCHECKED_CAST")
getOrNullnull434     public fun getOrNull(): T? = if (holder !is Failed) holder as T else null
435 
436     /**
437      *  Returns the encapsulated value if this instance represents success or throws an exception if it is closed or failed.
438      */
439     public fun getOrThrow(): T {
440         @Suppress("UNCHECKED_CAST")
441         if (holder !is Failed) return holder as T
442         if (holder is Closed && holder.cause != null) throw holder.cause
443         error("Trying to call 'getOrThrow' on a failed channel result: $holder")
444     }
445 
446     /**
447      * Returns the encapsulated exception if this instance represents failure or `null` if it is success
448      * or unsuccessful operation to closed channel.
449      */
exceptionOrNullnull450     public fun exceptionOrNull(): Throwable? = (holder as? Closed)?.cause
451 
452     internal open class Failed {
453         override fun toString(): String = "Failed"
454     }
455 
456     internal class Closed(@JvmField val cause: Throwable?): Failed() {
equalsnull457         override fun equals(other: Any?): Boolean = other is Closed && cause == other.cause
458         override fun hashCode(): Int = cause.hashCode()
459         override fun toString(): String = "Closed($cause)"
460     }
461 
462     @Suppress("NOTHING_TO_INLINE")
463     @InternalCoroutinesApi
464     public companion object {
465         private val failed = Failed()
466 
467         @InternalCoroutinesApi
468         public fun <E> success(value: E): ChannelResult<E> =
469             ChannelResult(value)
470 
471         @InternalCoroutinesApi
472         public fun <E> failure(): ChannelResult<E> =
473             ChannelResult(failed)
474 
475         @InternalCoroutinesApi
476         public fun <E> closed(cause: Throwable?): ChannelResult<E> =
477             ChannelResult(Closed(cause))
478     }
479 
toStringnull480     public override fun toString(): String =
481         when (holder) {
482             is Closed -> holder.toString()
483             else -> "Value($holder)"
484         }
485 }
486 
487 /**
488  * Returns the encapsulated value if this instance represents [success][ChannelResult.isSuccess] or the
489  * result of [onFailure] function for the encapsulated [Throwable] exception if it is failed or closed
490  * result.
491  */
492 @OptIn(ExperimentalContracts::class)
getOrElsenull493 public inline fun <T> ChannelResult<T>.getOrElse(onFailure: (exception: Throwable?) -> T): T {
494     contract {
495         callsInPlace(onFailure, InvocationKind.AT_MOST_ONCE)
496     }
497     @Suppress("UNCHECKED_CAST")
498     return if (holder is ChannelResult.Failed) onFailure(exceptionOrNull()) else holder as T
499 }
500 
501 /**
502  * Performs the given [action] on the encapsulated value if this instance represents [success][ChannelResult.isSuccess].
503  * Returns the original `ChannelResult` unchanged.
504  */
505 @OptIn(ExperimentalContracts::class)
onSuccessnull506 public inline fun <T> ChannelResult<T>.onSuccess(action: (value: T) -> Unit): ChannelResult<T> {
507     contract {
508         callsInPlace(action, InvocationKind.AT_MOST_ONCE)
509     }
510     @Suppress("UNCHECKED_CAST")
511     if (holder !is ChannelResult.Failed) action(holder as T)
512     return this
513 }
514 
515 /**
516  * Performs the given [action] on the encapsulated [Throwable] exception if this instance represents [failure][ChannelResult.isFailure].
517  * The result of [ChannelResult.exceptionOrNull] is passed to the [action] parameter.
518  *
519  * Returns the original `ChannelResult` unchanged.
520  */
521 @OptIn(ExperimentalContracts::class)
onFailurenull522 public inline fun <T> ChannelResult<T>.onFailure(action: (exception: Throwable?) -> Unit): ChannelResult<T> {
523     contract {
524         callsInPlace(action, InvocationKind.AT_MOST_ONCE)
525     }
526     @Suppress("UNCHECKED_CAST")
527     if (holder is ChannelResult.Failed) action(exceptionOrNull())
528     return this
529 }
530 
531 /**
532  * Performs the given [action] on the encapsulated [Throwable] exception if this instance represents [failure][ChannelResult.isFailure]
533  * due to channel being [closed][Channel.close].
534  * The result of [ChannelResult.exceptionOrNull] is passed to the [action] parameter.
535  * It is guaranteed that if action is invoked, then the channel is either [closed for send][Channel.isClosedForSend]
536  * or is [closed for receive][Channel.isClosedForReceive] depending on the failed operation.
537  *
538  * Returns the original `ChannelResult` unchanged.
539  */
540 @OptIn(ExperimentalContracts::class)
onClosednull541 public inline fun <T> ChannelResult<T>.onClosed(action: (exception: Throwable?) -> Unit): ChannelResult<T> {
542     contract {
543         callsInPlace(action, InvocationKind.AT_MOST_ONCE)
544     }
545     @Suppress("UNCHECKED_CAST")
546     if (holder is ChannelResult.Closed) action(exceptionOrNull())
547     return this
548 }
549 
550 /**
551  * Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used
552  * from concurrent coroutines.
553  */
554 public interface ChannelIterator<out E> {
555     /**
556      * Returns `true` if the channel has more elements, suspending the caller while this channel is empty,
557      * or returns `false` if the channel [is closed for `receive`][ReceiveChannel.isClosedForReceive] without a cause.
558      * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
559      *
560      * This function retrieves and removes an element from this channel for the subsequent invocation
561      * of [next].
562      *
563      * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
564      * function is suspended, this function immediately resumes with a [CancellationException].
565      * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
566      * suspended, it will not resume successfully. The `hasNext` call can retrieve the element from the channel,
567      * but then throw [CancellationException], thus failing to deliver the element.
568      * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
569      *
570      * Note that this function does not check for cancellation when it is not suspended.
571      * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
572      */
hasNextnull573     public suspend operator fun hasNext(): Boolean
574 
575     @Deprecated(message = "Since 1.3.0, binary compatibility with versions <= 1.2.x", level = DeprecationLevel.HIDDEN)
576     @Suppress("INAPPLICABLE_JVM_NAME")
577     @JvmName("next")
578     public suspend fun next0(): E {
579         /*
580          * Before 1.3.0 the "next()" could have been used without invoking "hasNext" first and there were code samples
581          * demonstrating this behavior, so we preserve this logic for full binary backwards compatibility with previously
582          * compiled code.
583          */
584         if (!hasNext()) throw ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
585         return next()
586     }
587 
588     /**
589      * Retrieves the element removed from the channel by a preceding call to [hasNext], or
590      * throws an [IllegalStateException] if [hasNext] was not invoked.
591      * This method should only be used in pair with [hasNext]:
592      * ```
593      * while (iterator.hasNext()) {
594      *     val element = iterator.next()
595      *     // ... handle element ...
596      * }
597      * ```
598      *
599      * This method throws a [ClosedReceiveChannelException] if the channel [is closed for `receive`][ReceiveChannel.isClosedForReceive] without a cause.
600      * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
601      */
nextnull602     public operator fun next(): E
603 }
604 
605 /**
606  * Channel is a non-blocking primitive for communication between a sender (via [SendChannel]) and a receiver (via [ReceiveChannel]).
607  * Conceptually, a channel is similar to Java's [BlockingQueue][java.util.concurrent.BlockingQueue],
608  * but it has suspending operations instead of blocking ones and can be [closed][SendChannel.close].
609  *
610  * ### Creating channels
611  *
612  * The `Channel(capacity)` factory function is used to create channels of different kinds depending on
613  * the value of the `capacity` integer:
614  *
615  * * When `capacity` is 0 &mdash; it creates a _rendezvous_ channel.
616  *   This channel does not have any buffer at all. An element is transferred from the sender
617  *   to the receiver only when [send] and [receive] invocations meet in time (rendezvous), so [send] suspends
618  *   until another coroutine invokes [receive], and [receive] suspends until another coroutine invokes [send].
619  *
620  * * When `capacity` is [Channel.UNLIMITED] &mdash; it creates a channel with effectively unlimited buffer.
621  *   This channel has a linked-list buffer of unlimited capacity (limited only by available memory).
622  *   [Sending][send] to this channel never suspends, and [trySend] always succeeds.
623  *
624  * * When `capacity` is [Channel.CONFLATED] &mdash; it creates a _conflated_ channel
625  *   This channel buffers at most one element and conflates all subsequent `send` and `trySend` invocations,
626  *   so that the receiver always gets the last element sent.
627  *   Back-to-back sent elements are conflated &mdash; only the last sent element is received,
628  *   while previously sent elements **are lost**.
629  *   [Sending][send] to this channel never suspends, and [trySend] always succeeds.
630  *
631  * * When `capacity` is positive but less than [UNLIMITED] &mdash; it creates an array-based channel with the specified capacity.
632  *   This channel has an array buffer of a fixed `capacity`.
633  *   [Sending][send] suspends only when the buffer is full, and [receiving][receive] suspends only when the buffer is empty.
634  *
635  * Buffered channels can be configured with an additional [`onBufferOverflow`][BufferOverflow] parameter. It controls the behaviour
636  * of the channel's [send][Channel.send] function on buffer overflow:
637  *
638  * * [SUSPEND][BufferOverflow.SUSPEND] &mdash; the default, suspend `send` on buffer overflow until there is
639  *   free space in the buffer.
640  * * [DROP_OLDEST][BufferOverflow.DROP_OLDEST] &mdash; do not suspend the `send`, add the latest value to the buffer,
641  *   drop the oldest one from the buffer.
642  *   A channel with `capacity = 1` and `onBufferOverflow = DROP_OLDEST` is a _conflated_ channel.
643  * * [DROP_LATEST][BufferOverflow.DROP_LATEST] &mdash; do not suspend the `send`, drop the value that is being sent,
644  *   keep the buffer contents intact.
645  *
646  * A non-default `onBufferOverflow` implicitly creates a channel with at least one buffered element and
647  * is ignored for a channel with unlimited buffer. It cannot be specified for `capacity = CONFLATED`, which
648  * is a shortcut by itself.
649  *
650  * ### Prompt cancellation guarantee
651  *
652  * All suspending functions with channels provide **prompt cancellation guarantee**.
653  * If the job was cancelled while send or receive function was suspended, it will not resume successfully,
654  * but throws a [CancellationException].
655  * With a single-threaded [dispatcher][CoroutineDispatcher] like [Dispatchers.Main] this gives a
656  * guarantee that if a piece code running in this thread cancels a [Job], then a coroutine running this job cannot
657  * resume successfully and continue to run, ensuring a prompt response to its cancellation.
658  *
659  * > **Prompt cancellation guarantee** for channel operations was added since `kotlinx.coroutines` version `1.4.0`
660  * > and had replaced a channel-specific atomic-cancellation that was not consistent with other suspending functions.
661  * > The low-level mechanics of prompt cancellation are explained in [suspendCancellableCoroutine] function.
662  *
663  * ### Undelivered elements
664  *
665  * As a result of a prompt cancellation guarantee, when a closeable resource
666  * (like open file or a handle to another native resource) is transferred via channel from one coroutine to another
667  * it can fail to be delivered and will be lost if either send or receive operations are cancelled in transit.
668  *
669  * A `Channel()` constructor function has an `onUndeliveredElement` optional parameter.
670  * When `onUndeliveredElement` parameter is set, the corresponding function is called once for each element
671  * that was sent to the channel with the call to the [send][SendChannel.send] function but failed to be delivered,
672  * which can happen in the following cases:
673  *
674  * * When [send][SendChannel.send] operation throws an exception because it was cancelled before it had a chance to actually
675  *   send the element or because the channel was [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel].
676  * * When [receive][ReceiveChannel.receive], [receiveOrNull][ReceiveChannel.receiveOrNull], or [hasNext][ChannelIterator.hasNext]
677  *   operation throws an exception when it had retrieved the element from the
678  *   channel but was cancelled before the code following the receive call resumed.
679  * * The channel was [cancelled][ReceiveChannel.cancel], in which case `onUndeliveredElement` is called on every
680  *   remaining element in the channel's buffer.
681  *
682  * Note, that `onUndeliveredElement` function is called synchronously in an arbitrary context. It should be fast, non-blocking,
683  * and should not throw exceptions. Any exception thrown by `onUndeliveredElement` is wrapped into an internal runtime
684  * exception which is either rethrown from the caller method or handed off to the exception handler in the current context
685  * (see [CoroutineExceptionHandler]) when one is available.
686  *
687  * A typical usage for `onUndeliveredElement` is to close a resource that is being transferred via the channel. The
688  * following code pattern guarantees that opened resources are closed even if producer, consumer, and/or channel
689  * are cancelled. Resources are never lost.
690  *
691  * ```
692  * // Create the channel with onUndeliveredElement block that closes a resource
693  * val channel = Channel<Resource>(capacity) { resource -> resource.close() }
694  *
695  * // Producer code
696  * val resourceToSend = openResource()
697  * channel.send(resourceToSend)
698  *
699  * // Consumer code
700  * val resourceReceived = channel.receive()
701  * try {
702  *     // work with received resource
703  * } finally {
704  *     resourceReceived.close()
705  * }
706  * ```
707  *
708  * > Note, that if you do any kind of work in between `openResource()` and `channel.send(...)`, then you should
709  * > ensure that resource gets closed in case this additional code fails.
710  */
711 public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
712     /**
713      * Constants for the channel factory function `Channel()`.
714      */
715     public companion object Factory {
716         /**
717          * Requests a channel with an unlimited capacity buffer in the `Channel(...)` factory function.
718          */
719         public const val UNLIMITED: Int = Int.MAX_VALUE
720 
721         /**
722          * Requests a rendezvous channel in the `Channel(...)` factory function &mdash; a channel that does not have a buffer.
723          */
724         public const val RENDEZVOUS: Int = 0
725 
726         /**
727          * Requests a conflated channel in the `Channel(...)` factory function. This is a shortcut to creating
728          * a channel with [`onBufferOverflow = DROP_OLDEST`][BufferOverflow.DROP_OLDEST].
729          */
730         public const val CONFLATED: Int = -1
731 
732         /**
733          * Requests a buffered channel with the default buffer capacity in the `Channel(...)` factory function.
734          * The default capacity for a channel that [suspends][BufferOverflow.SUSPEND] on overflow
735          * is 64 and can be overridden by setting [DEFAULT_BUFFER_PROPERTY_NAME] on JVM.
736          * For non-suspending channels, a buffer of capacity 1 is used.
737          */
738         public const val BUFFERED: Int = -2
739 
740         // only for internal use, cannot be used with Channel(...)
741         internal const val OPTIONAL_CHANNEL = -3
742 
743         /**
744          * Name of the property that defines the default channel capacity when
745          * [BUFFERED] is used as parameter in `Channel(...)` factory function.
746          */
747         public const val DEFAULT_BUFFER_PROPERTY_NAME: String = "kotlinx.coroutines.channels.defaultBuffer"
748 
749         internal val CHANNEL_DEFAULT_CAPACITY = systemProp(DEFAULT_BUFFER_PROPERTY_NAME,
750             64, 1, UNLIMITED - 1
751         )
752     }
753 }
754 
755 /**
756  * Creates a channel with the specified buffer capacity (or without a buffer by default).
757  * See [Channel] interface documentation for details.
758  *
759  * @param capacity either a positive channel capacity or one of the constants defined in [Channel.Factory].
760  * @param onBufferOverflow configures an action on buffer overflow (optional, defaults to
761  *   a [suspending][BufferOverflow.SUSPEND] attempt to [send][Channel.send] a value,
762  *   supported only when `capacity >= 0` or `capacity == Channel.BUFFERED`,
763  *   implicitly creates a channel with at least one buffered element).
764  * @param onUndeliveredElement an optional function that is called when element was sent but was not delivered to the consumer.
765  *   See "Undelivered elements" section in [Channel] documentation.
766  * @throws IllegalArgumentException when [capacity] < -2
767  */
Channelnull768 public fun <E> Channel(
769     capacity: Int = RENDEZVOUS,
770     onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
771     onUndeliveredElement: ((E) -> Unit)? = null
772 ): Channel<E> =
773     when (capacity) {
774         RENDEZVOUS -> {
775             if (onBufferOverflow == BufferOverflow.SUSPEND)
776                 RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channel
777             else
778                 ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
779         }
780         CONFLATED -> {
781             require(onBufferOverflow == BufferOverflow.SUSPEND) {
782                 "CONFLATED capacity cannot be used with non-default onBufferOverflow"
783             }
784             ConflatedChannel(onUndeliveredElement)
785         }
786         UNLIMITED -> LinkedListChannel(onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows
787         BUFFERED -> ArrayChannel( // uses default capacity with SUSPEND
788             if (onBufferOverflow == BufferOverflow.SUSPEND) CHANNEL_DEFAULT_CAPACITY else 1,
789             onBufferOverflow, onUndeliveredElement
790         )
791         else -> {
792             if (capacity == 1 && onBufferOverflow == BufferOverflow.DROP_OLDEST)
793                 ConflatedChannel(onUndeliveredElement) // conflated implementation is more efficient but appears to work in the same way
794             else
795                 ArrayChannel(capacity, onBufferOverflow, onUndeliveredElement)
796         }
797     }
798 
799 @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.0, binary compatibility with earlier versions")
Channelnull800 public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> = Channel(capacity)
801 
802 /**
803  * Indicates an attempt to [send][SendChannel.send] to a [isClosedForSend][SendChannel.isClosedForSend] channel
804  * that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause
805  * exception on send attempts.
806  *
807  * This exception is a subclass of [IllegalStateException], because, conceptually, it is the sender's responsibility
808  * to close the channel and not try to send anything thereafter. Attempts to
809  * send to a closed channel indicate a logical error in the sender's code.
810  */
811 public class ClosedSendChannelException(message: String?) : IllegalStateException(message)
812 
813 /**
814  * Indicates an attempt to [receive][ReceiveChannel.receive] from a [isClosedForReceive][ReceiveChannel.isClosedForReceive]
815  * channel that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause
816  * exception on receive attempts.
817  *
818  * This exception is a subclass of [NoSuchElementException] to be consistent with plain collections.
819  */
820 public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message)
821