• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 @file:Suppress("FunctionName")
6 
7 package kotlinx.coroutines.channels
8 
9 import kotlinx.coroutines.*
10 import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
11 import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
12 import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
13 import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
14 import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
15 import kotlinx.coroutines.internal.*
16 import kotlinx.coroutines.selects.*
17 import kotlin.contracts.*
18 import kotlin.internal.*
19 import kotlin.jvm.*
20 
21 /**
22  * Sender's interface to [Channel].
23  */
24 public interface SendChannel<in E> {
25     /**
26      * Returns `true` if this channel was closed by an invocation of [close] or its receiving side was [cancelled][ReceiveChannel.cancel].
27      * This means that calling [send] will result in an exception.
28      *
29      * Note that if this property returns `false`, it does not guarantee that consecutive call to [send] will succeed, as the
30      * channel can be concurrently closed right after the check. For such scenarios, it is recommended to use [trySend] instead.
31      *
32      * @see SendChannel.trySend
33      * @see SendChannel.close
34      * @see ReceiveChannel.cancel
35      */
36     @DelicateCoroutinesApi
37     public val isClosedForSend: Boolean
38 
39     /**
40      * Sends the specified [element] to this channel, suspending the caller while the buffer of this channel is full
41      * or if it does not exist, or throws an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details).
42      *
43      * [Closing][close] a channel _after_ this function has suspended does not cause this suspended [send] invocation
44      * to abort, because closing a channel is conceptually like sending a special "close token" over this channel.
45      * All elements sent over the channel are delivered in first-in first-out order. The sent element
46      * will be delivered to receivers before the close token.
47      *
48      * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
49      * function is suspended, this function immediately resumes with a [CancellationException].
50      * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
51      * suspended, it will not resume successfully. The `send` call can send the element to the channel,
52      * but then throw [CancellationException], thus an exception should not be treated as a failure to deliver the element.
53      * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
54      *
55      * Note that this function does not check for cancellation when it is not suspended.
56      * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
57      *
58      * This function can be used in [select] invocations with the [onSend] clause.
59      * Use [trySend] to try sending to this channel without waiting.
60      */
sendnull61     public suspend fun send(element: E)
62 
63     /**
64      * Clause for the [select] expression of the [send] suspending function that selects when the element that is specified
65      * as the parameter is sent to the channel. When the clause is selected, the reference to this channel
66      * is passed into the corresponding block.
67      *
68      * The [select] invocation fails with an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details).
69      */
70     public val onSend: SelectClause2<E, SendChannel<E>>
71 
72     /**
73      * Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions,
74      * and returns the successful result. Otherwise, returns failed or closed result.
75      * This is synchronous variant of [send], which backs off in situations when `send` suspends or throws.
76      *
77      * When `trySend` call returns a non-successful result, it guarantees that the element was not delivered to the consumer, and
78      * it does not call `onUndeliveredElement` that was installed for this channel.
79      * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
80      */
81     public fun trySend(element: E): ChannelResult<Unit>
82 
83     /**
84      * Closes this channel.
85      * This is an idempotent operation &mdash; subsequent invocations of this function have no effect and return `false`.
86      * Conceptually, it sends a special "close token" over this channel.
87      *
88      * Immediately after invocation of this function,
89      * [isClosedForSend] starts returning `true`. However, [isClosedForReceive][ReceiveChannel.isClosedForReceive]
90      * on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
91      * are received.
92      *
93      * A channel that was closed without a [cause] throws a [ClosedSendChannelException] on attempts to [send]
94      * and [ClosedReceiveChannelException] on attempts to [receive][ReceiveChannel.receive].
95      * A channel that was closed with non-null [cause] is called a _failed_ channel. Attempts to send or
96      * receive on a failed channel throw the specified [cause] exception.
97      */
98     public fun close(cause: Throwable? = null): Boolean
99 
100     /**
101      * Registers a [handler] which is synchronously invoked once the channel is [closed][close]
102      * or the receiving side of this channel is [cancelled][ReceiveChannel.cancel].
103      * Only one handler can be attached to a channel during its lifetime.
104      * The `handler` is invoked when [isClosedForSend] starts to return `true`.
105      * If the channel is closed already, the handler is invoked immediately.
106      *
107      * The meaning of `cause` that is passed to the handler:
108      * - `null` if the channel was closed normally without the corresponding argument.
109      * - Instance of [CancellationException] if the channel was cancelled normally without the corresponding argument.
110      * - The cause of `close` or `cancel` otherwise.
111      *
112      * ### Execution context and exception safety
113      *
114      * The [handler] is executed as part of the closing or cancelling operation, and only after the channel reaches its final state.
115      * This means that if the handler throws an exception or hangs, the channel will still be successfully closed or cancelled.
116      * Unhandled exceptions from [handler] are propagated to the closing or cancelling operation's caller.
117      *
118      * Example of usage:
119      * ```
120      * val events = Channel<Event>(UNLIMITED)
121      * callbackBasedApi.registerCallback { event ->
122      *   events.trySend(event)
123      *       .onClosed { /* channel is already closed, but the callback hasn't stopped yet */ }
124      * }
125      *
126      * val uiUpdater = uiScope.launch(Dispatchers.Main) {
127      *    events.consume { /* handle events */ }
128      * }
129      * // Stop the callback after the channel is closed or cancelled
130      * events.invokeOnClose { callbackBasedApi.stop() }
131      * ```
132      *
133      * **Stability note.** This function constitutes a stable API surface, with the only exception being
134      * that an [IllegalStateException] is thrown when multiple handlers are registered.
135      * This restriction could be lifted in the future.
136      *
137      * @throws UnsupportedOperationException if the underlying channel does not support [invokeOnClose].
138      * Implementation note: currently, [invokeOnClose] is unsupported only by Rx-like integrations
139      *
140      * @throws IllegalStateException if another handler was already registered
141      */
142     public fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
143 
144     /**
145      * **Deprecated** offer method.
146      *
147      * This method was deprecated in the favour of [trySend].
148      * It has proven itself as the most error-prone method in Channel API:
149      *
150      * * `Boolean` return type creates the false sense of security, implying that `false`
151      *    is returned instead of throwing an exception.
152      * * It was used mostly from non-suspending APIs where CancellationException triggered
153      *   internal failures in the application (the most common source of bugs).
154      * * Due to signature and explicit `if (ch.offer(...))` checks it was easy to
155      *   oversee such error during code review.
156      * * Its name was not aligned with the rest of the API and tried to mimic Java's queue instead.
157      *
158      * **NB** Automatic migration provides best-effort for the user experience, but requires removal
159      * or adjusting of the code that relied on the exception handling.
160      * The complete replacement has a more verbose form:
161      * ```
162      * channel.trySend(element)
163      *     .onClosed { throw it ?: ClosedSendChannelException("Channel was closed normally") }
164      *     .isSuccess
165      * ```
166      *
167      * See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context.
168      *
169      * @suppress **Deprecated**.
170      */
171     @Deprecated(
172         level = DeprecationLevel.ERROR,
173         message = "Deprecated in the favour of 'trySend' method",
174         replaceWith = ReplaceWith("trySend(element).isSuccess")
175     ) // Warning since 1.5.0, error since 1.6.0, not hidden until 1.8+ because API is quite widespread
176     public fun offer(element: E): Boolean {
177         val result = trySend(element)
178         if (result.isSuccess) return true
179         throw recoverStackTrace(result.exceptionOrNull() ?: return false)
180     }
181 }
182 
183 /**
184  * Receiver's interface to [Channel].
185  */
186 public interface ReceiveChannel<out E> {
187     /**
188      * Returns `true` if this channel was closed by invocation of [close][SendChannel.close] on the [SendChannel]
189      * side and all previously sent items were already received, or if the receiving side was [cancelled][ReceiveChannel.cancel].
190      *
191      * This means that calling [receive] will result in a [ClosedReceiveChannelException] or a corresponding cancellation cause.
192      * If the channel was closed because of an exception, it is considered closed, too, but is called a _failed_ channel.
193      * All suspending attempts to receive an element from a failed channel throw the original [close][SendChannel.close] cause exception.
194      *
195      * Note that if this property returns `false`, it does not guarantee that consecutive call to [receive] will succeed, as the
196      * channel can be concurrently closed right after the check. For such scenarios, it is recommended to use [receiveCatching] instead.
197      *
198      * @see ReceiveChannel.receiveCatching
199      * @see ReceiveChannel.cancel
200      * @see SendChannel.close
201      */
202     @DelicateCoroutinesApi
203     public val isClosedForReceive: Boolean
204 
205     /**
206      * Returns `true` if the channel is empty (contains no elements), which means that an attempt to [receive] will suspend.
207      * This function returns `false` if the channel [is closed for `receive`][isClosedForReceive].
208      */
209     @ExperimentalCoroutinesApi
210     public val isEmpty: Boolean
211 
212     /**
213      * Retrieves and removes an element from this channel if it's not empty, or suspends the caller while the channel is empty,
214      * or throws a [ClosedReceiveChannelException] if the channel [is closed for `receive`][isClosedForReceive].
215      * If the channel was closed because of an exception, it is called a _failed_ channel and this function
216      * will throw the original [close][SendChannel.close] cause exception.
217      *
218      * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
219      * function is suspended, this function immediately resumes with a [CancellationException].
220      * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
221      * suspended, it will not resume successfully. The `receive` call can retrieve the element from the channel,
222      * but then throw [CancellationException], thus failing to deliver the element.
223      * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
224      *
225      * Note that this function does not check for cancellation when it is not suspended.
226      * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
227      *
228      * This function can be used in [select] invocations with the [onReceive] clause.
229      * Use [tryReceive] to try receiving from this channel without waiting.
230      */
receivenull231     public suspend fun receive(): E
232 
233     /**
234      * Clause for the [select] expression of the [receive] suspending function that selects with the element
235      * received from the channel.
236      * The [select] invocation fails with an exception if the channel
237      * [is closed for `receive`][isClosedForReceive] (see [close][SendChannel.close] for details).
238      */
239     public val onReceive: SelectClause1<E>
240 
241     /**
242      * Retrieves and removes an element from this channel if it's not empty, or suspends the caller while this channel is empty.
243      * This method returns [ChannelResult] with the value of an element successfully retrieved from the channel
244      * or the close cause if the channel was closed. Closed cause may be `null` if the channel was closed normally.
245      * The result cannot be [failed][ChannelResult.isFailure] without being [closed][ChannelResult.isClosed].
246      *
247      * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
248      * function is suspended, this function immediately resumes with a [CancellationException].
249      * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
250      * suspended, it will not resume successfully. The `receiveCatching` call can retrieve the element from the channel,
251      * but then throw [CancellationException], thus failing to deliver the element.
252      * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
253      *
254      * Note that this function does not check for cancellation when it is not suspended.
255      * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
256      *
257      * This function can be used in [select] invocations with the [onReceiveCatching] clause.
258      * Use [tryReceive] to try receiving from this channel without waiting.
259      */
260     public suspend fun receiveCatching(): ChannelResult<E>
261 
262     /**
263      * Clause for the [select] expression of the [onReceiveCatching] suspending function that selects with the [ChannelResult] with a value
264      * that is received from the channel or with a close cause if the channel
265      * [is closed for `receive`][isClosedForReceive].
266      */
267     public val onReceiveCatching: SelectClause1<ChannelResult<E>>
268 
269     /**
270      * Retrieves and removes an element from this channel if it's not empty, returning a [successful][ChannelResult.success]
271      * result, returns [failed][ChannelResult.failed] result if the channel is empty, and [closed][ChannelResult.closed]
272      * result if the channel is closed.
273      */
274     public fun tryReceive(): ChannelResult<E>
275 
276     /**
277      * Returns a new iterator to receive elements from this channel using a `for` loop.
278      * Iteration completes normally when the channel [is closed for `receive`][isClosedForReceive] without a cause and
279      * throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
280      */
281     public operator fun iterator(): ChannelIterator<E>
282 
283     /**
284      * Cancels reception of remaining elements from this channel with an optional [cause].
285      * This function closes the channel and removes all buffered sent elements from it.
286      *
287      * A cause can be used to specify an error message or to provide other details on
288      * the cancellation reason for debugging purposes.
289      * If the cause is not specified, then an instance of [CancellationException] with a
290      * default message is created to [close][SendChannel.close] the channel.
291      *
292      * Immediately after invocation of this function [isClosedForReceive] and
293      * [isClosedForSend][SendChannel.isClosedForSend]
294      * on the side of [SendChannel] start returning `true`. Any attempt to send to or receive from this channel
295      * will lead to a [CancellationException].
296      */
297     public fun cancel(cause: CancellationException? = null)
298 
299     /**
300      * @suppress This method implements old version of JVM ABI. Use [cancel].
301      */
302     @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
303     public fun cancel(): Unit = cancel(null)
304 
305     /**
306      * @suppress This method has bad semantics when cause is not a [CancellationException]. Use [cancel].
307      */
308     @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
309     public fun cancel(cause: Throwable? = null): Boolean
310 
311     /**
312      * **Deprecated** poll method.
313      *
314      * This method was deprecated in the favour of [tryReceive].
315      * It has proven itself as error-prone method in Channel API:
316      *
317      * * Nullable return type creates the false sense of security, implying that `null`
318      *    is returned instead of throwing an exception.
319      * * It was used mostly from non-suspending APIs where CancellationException triggered
320      *   internal failures in the application (the most common source of bugs).
321      * * Its name was not aligned with the rest of the API and tried to mimic Java's queue instead.
322      *
323      * See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context.
324      *
325      * ### Replacement note
326      *
327      * The replacement `tryReceive().getOrNull()` is a default that ignores all close exceptions and
328      * proceeds with `null`, while `poll` throws an exception if the channel was closed with an exception.
329      * Replacement with the very same 'poll' semantics is `tryReceive().onClosed { if (it != null) throw it }.getOrNull()`
330      *
331      * @suppress **Deprecated**.
332      */
333     @Deprecated(
334         level = DeprecationLevel.ERROR,
335         message = "Deprecated in the favour of 'tryReceive'. " +
336             "Please note that the provided replacement does not rethrow channel's close cause as 'poll' did, " +
337             "for the precise replacement please refer to the 'poll' documentation",
338         replaceWith = ReplaceWith("tryReceive().getOrNull()")
339     ) // Warning since 1.5.0, error since 1.6.0, not hidden until 1.8+ because API is quite widespread
340     public fun poll(): E? {
341         val result = tryReceive()
342         if (result.isSuccess) return result.getOrThrow()
343         throw recoverStackTrace(result.exceptionOrNull() ?: return null)
344     }
345 
346     /**
347      * This function was deprecated since 1.3.0 and is no longer recommended to use
348      * or to implement in subclasses.
349      *
350      * It had the following pitfalls:
351      * - Didn't allow to distinguish 'null' as "closed channel" from "null as a value"
352      * - Was throwing if the channel has failed even though its signature may suggest it returns 'null'
353      * - It didn't really belong to core channel API and can be exposed as an extension instead.
354      *
355      * ### Replacement note
356      *
357      * The replacement `receiveCatching().getOrNull()` is a safe default that ignores all close exceptions and
358      * proceeds with `null`, while `receiveOrNull` throws an exception if the channel was closed with an exception.
359      * Replacement with the very same `receiveOrNull` semantics is `receiveCatching().onClosed { if (it != null) throw it }.getOrNull()`.
360      *
361      * @suppress **Deprecated**
362      */
363     @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
364     @LowPriorityInOverloadResolution
365     @Deprecated(
366         message = "Deprecated in favor of 'receiveCatching'. " +
367             "Please note that the provided replacement does not rethrow channel's close cause as 'receiveOrNull' did, " +
368             "for the detailed replacement please refer to the 'receiveOrNull' documentation",
369         level = DeprecationLevel.ERROR,
370         replaceWith = ReplaceWith("receiveCatching().getOrNull()")
371     ) // Warning since 1.3.0, error in 1.5.0, cannot be hidden due to deprecated extensions
receiveOrNullnull372     public suspend fun receiveOrNull(): E? = receiveCatching().getOrNull()
373 
374     /**
375      * This function was deprecated since 1.3.0 and is no longer recommended to use
376      * or to implement in subclasses.
377      * See [receiveOrNull] documentation.
378      *
379      * @suppress **Deprecated**: in favor of onReceiveCatching extension.
380      */
381     @Suppress("DEPRECATION_ERROR")
382     @Deprecated(
383         message = "Deprecated in favor of onReceiveCatching extension",
384         level = DeprecationLevel.ERROR,
385         replaceWith = ReplaceWith("onReceiveCatching")
386     ) // Warning since 1.3.0, error in 1.5.0, will be hidden or removed in 1.7.0
387     public val onReceiveOrNull: SelectClause1<E?> get() = (this as BufferedChannel<E>).onReceiveOrNull
388 }
389 
390 /**
391  * A discriminated union of channel operation result.
392  * It encapsulates the successful or failed result of a channel operation or a failed operation to a closed channel with
393  * an optional cause.
394  *
395  * The successful result represents a successful operation with a value of type [T], for example,
396  * the result of [Channel.receiveCatching] operation or a successfully sent element as a result of [Channel.trySend].
397  *
398  * The failed result represents a failed operation attempt to a channel, but it doesn't necessarily indicate that the channel is failed.
399  * E.g. when the channel is full, [Channel.trySend] returns failed result, but the channel itself is not in the failed state.
400  *
401  * The closed result represents an operation attempt to a closed channel and also implies that the operation has failed.
402  * It is guaranteed that if the result is _closed_, then the target channel is either [closed for send][Channel.isClosedForSend]
403  * or is [closed for receive][Channel.isClosedForReceive] depending on whether the failed operation was sending or receiving.
404  */
405 @JvmInline
406 public value class ChannelResult<out T>
407 @PublishedApi internal constructor(@PublishedApi internal val holder: Any?) {
408     /**
409      * Returns `true` if this instance represents a successful
410      * operation outcome.
411      *
412      * In this case [isFailure] and [isClosed] return `false`.
413      */
414     public val isSuccess: Boolean get() = holder !is Failed
415 
416     /**
417      * Returns `true` if this instance represents unsuccessful operation.
418      *
419      * In this case [isSuccess] returns false, but it does not imply
420      * that the channel is failed or closed.
421      *
422      * Example of a failed operation without an exception and channel being closed
423      * is [Channel.trySend] attempt to a channel that is full.
424      */
425     public val isFailure: Boolean get() = holder is Failed
426 
427     /**
428      * Returns `true` if this instance represents unsuccessful operation
429      * to a closed or cancelled channel.
430      *
431      * In this case [isSuccess] returns `false`, [isFailure] returns `true`, but it does not imply
432      * that [exceptionOrNull] returns non-null value.
433      *
434      * It can happen if the channel was [closed][Channel.close] normally without an exception.
435      */
436     public val isClosed: Boolean get() = holder is Closed
437 
438     /**
439      * Returns the encapsulated value if this instance represents success or `null` if it represents failed result.
440      */
441     @Suppress("UNCHECKED_CAST")
442     public fun getOrNull(): T? = if (holder !is Failed) holder as T else null
443 
444     /**
445      *  Returns the encapsulated value if this instance represents success or throws an exception if it is closed or failed.
446      */
447     public fun getOrThrow(): T {
448         @Suppress("UNCHECKED_CAST")
449         if (holder !is Failed) return holder as T
450         if (holder is Closed && holder.cause != null) throw holder.cause
451         error("Trying to call 'getOrThrow' on a failed channel result: $holder")
452     }
453 
454     /**
455      * Returns the encapsulated exception if this instance represents failure or `null` if it is success
456      * or unsuccessful operation to closed channel.
457      */
458     public fun exceptionOrNull(): Throwable? = (holder as? Closed)?.cause
459 
460     internal open class Failed {
461         override fun toString(): String = "Failed"
462     }
463 
464     internal class Closed(@JvmField val cause: Throwable?): Failed() {
465         override fun equals(other: Any?): Boolean = other is Closed && cause == other.cause
466         override fun hashCode(): Int = cause.hashCode()
467         override fun toString(): String = "Closed($cause)"
468     }
469 
470     @InternalCoroutinesApi
471     public companion object {
472         private val failed = Failed()
473 
474         @InternalCoroutinesApi
475         public fun <E> success(value: E): ChannelResult<E> =
476             ChannelResult(value)
477 
478         @InternalCoroutinesApi
479         public fun <E> failure(): ChannelResult<E> =
480             ChannelResult(failed)
481 
482         @InternalCoroutinesApi
483         public fun <E> closed(cause: Throwable?): ChannelResult<E> =
484             ChannelResult(Closed(cause))
485     }
486 
487     public override fun toString(): String =
488         when (holder) {
489             is Closed -> holder.toString()
490             else -> "Value($holder)"
491         }
492 }
493 
494 /**
495  * Returns the encapsulated value if this instance represents [success][ChannelResult.isSuccess] or the
496  * result of [onFailure] function for the encapsulated [Throwable] exception if it is failed or closed
497  * result.
498  */
499 @OptIn(ExperimentalContracts::class)
getOrElsenull500 public inline fun <T> ChannelResult<T>.getOrElse(onFailure: (exception: Throwable?) -> T): T {
501     contract {
502         callsInPlace(onFailure, InvocationKind.AT_MOST_ONCE)
503     }
504     @Suppress("UNCHECKED_CAST")
505     return if (holder is ChannelResult.Failed) onFailure(exceptionOrNull()) else holder as T
506 }
507 
508 /**
509  * Performs the given [action] on the encapsulated value if this instance represents [success][ChannelResult.isSuccess].
510  * Returns the original `ChannelResult` unchanged.
511  */
512 @OptIn(ExperimentalContracts::class)
onSuccessnull513 public inline fun <T> ChannelResult<T>.onSuccess(action: (value: T) -> Unit): ChannelResult<T> {
514     contract {
515         callsInPlace(action, InvocationKind.AT_MOST_ONCE)
516     }
517     @Suppress("UNCHECKED_CAST")
518     if (holder !is ChannelResult.Failed) action(holder as T)
519     return this
520 }
521 
522 /**
523  * Performs the given [action] on the encapsulated [Throwable] exception if this instance represents [failure][ChannelResult.isFailure].
524  * The result of [ChannelResult.exceptionOrNull] is passed to the [action] parameter.
525  *
526  * Returns the original `ChannelResult` unchanged.
527  */
528 @OptIn(ExperimentalContracts::class)
onFailurenull529 public inline fun <T> ChannelResult<T>.onFailure(action: (exception: Throwable?) -> Unit): ChannelResult<T> {
530     contract {
531         callsInPlace(action, InvocationKind.AT_MOST_ONCE)
532     }
533     if (holder is ChannelResult.Failed) action(exceptionOrNull())
534     return this
535 }
536 
537 /**
538  * Performs the given [action] on the encapsulated [Throwable] exception if this instance represents [failure][ChannelResult.isFailure]
539  * due to channel being [closed][Channel.close].
540  * The result of [ChannelResult.exceptionOrNull] is passed to the [action] parameter.
541  * It is guaranteed that if action is invoked, then the channel is either [closed for send][Channel.isClosedForSend]
542  * or is [closed for receive][Channel.isClosedForReceive] depending on the failed operation.
543  *
544  * Returns the original `ChannelResult` unchanged.
545  */
546 @OptIn(ExperimentalContracts::class)
onClosednull547 public inline fun <T> ChannelResult<T>.onClosed(action: (exception: Throwable?) -> Unit): ChannelResult<T> {
548     contract {
549         callsInPlace(action, InvocationKind.AT_MOST_ONCE)
550     }
551     if (holder is ChannelResult.Closed) action(exceptionOrNull())
552     return this
553 }
554 
555 /**
556  * Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used
557  * from concurrent coroutines.
558  */
559 public interface ChannelIterator<out E> {
560     /**
561      * Returns `true` if the channel has more elements, suspending the caller while this channel is empty,
562      * or returns `false` if the channel [is closed for `receive`][ReceiveChannel.isClosedForReceive] without a cause.
563      * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
564      *
565      * This function retrieves and removes an element from this channel for the subsequent invocation
566      * of [next].
567      *
568      * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
569      * function is suspended, this function immediately resumes with a [CancellationException].
570      * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
571      * suspended, it will not resume successfully. The `hasNext` call can retrieve the element from the channel,
572      * but then throw [CancellationException], thus failing to deliver the element.
573      * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
574      *
575      * Note that this function does not check for cancellation when it is not suspended.
576      * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
577      */
hasNextnull578     public suspend operator fun hasNext(): Boolean
579 
580     @Deprecated(message = "Since 1.3.0, binary compatibility with versions <= 1.2.x", level = DeprecationLevel.HIDDEN)
581     @Suppress("INAPPLICABLE_JVM_NAME")
582     @JvmName("next")
583     public suspend fun next0(): E {
584         /*
585          * Before 1.3.0 the "next()" could have been used without invoking "hasNext" first and there were code samples
586          * demonstrating this behavior, so we preserve this logic for full binary backwards compatibility with previously
587          * compiled code.
588          */
589         if (!hasNext()) throw ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
590         return next()
591     }
592 
593     /**
594      * Retrieves the element removed from the channel by a preceding call to [hasNext], or
595      * throws an [IllegalStateException] if [hasNext] was not invoked.
596      * This method should only be used in pair with [hasNext]:
597      * ```
598      * while (iterator.hasNext()) {
599      *     val element = iterator.next()
600      *     // ... handle element ...
601      * }
602      * ```
603      *
604      * This method throws a [ClosedReceiveChannelException] if the channel [is closed for `receive`][ReceiveChannel.isClosedForReceive] without a cause.
605      * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
606      */
nextnull607     public operator fun next(): E
608 }
609 
610 /**
611  * Channel is a non-blocking primitive for communication between a sender (via [SendChannel]) and a receiver (via [ReceiveChannel]).
612  * Conceptually, a channel is similar to Java's [BlockingQueue][java.util.concurrent.BlockingQueue],
613  * but it has suspending operations instead of blocking ones and can be [closed][SendChannel.close].
614  *
615  * ### Creating channels
616  *
617  * The `Channel(capacity)` factory function is used to create channels of different kinds depending on
618  * the value of the `capacity` integer:
619  *
620  * * When `capacity` is 0 &mdash; it creates a _rendezvous_ channel.
621  *   This channel does not have any buffer at all. An element is transferred from the sender
622  *   to the receiver only when [send] and [receive] invocations meet in time (rendezvous), so [send] suspends
623  *   until another coroutine invokes [receive], and [receive] suspends until another coroutine invokes [send].
624  *
625  * * When `capacity` is [Channel.UNLIMITED] &mdash; it creates a channel with effectively unlimited buffer.
626  *   This channel has a linked-list buffer of unlimited capacity (limited only by available memory).
627  *   [Sending][send] to this channel never suspends, and [trySend] always succeeds.
628  *
629  * * When `capacity` is [Channel.CONFLATED] &mdash; it creates a _conflated_ channel
630  *   This channel buffers at most one element and conflates all subsequent `send` and `trySend` invocations,
631  *   so that the receiver always gets the last element sent.
632  *   Back-to-back sent elements are conflated &mdash; only the last sent element is received,
633  *   while previously sent elements **are lost**.
634  *   [Sending][send] to this channel never suspends, and [trySend] always succeeds.
635  *
636  * * When `capacity` is positive but less than [UNLIMITED] &mdash; it creates an array-based channel with the specified capacity.
637  *   This channel has an array buffer of a fixed `capacity`.
638  *   [Sending][send] suspends only when the buffer is full, and [receiving][receive] suspends only when the buffer is empty.
639  *
640  * Buffered channels can be configured with an additional [`onBufferOverflow`][BufferOverflow] parameter. It controls the behaviour
641  * of the channel's [send][Channel.send] function on buffer overflow:
642  *
643  * * [SUSPEND][BufferOverflow.SUSPEND] &mdash; the default, suspend `send` on buffer overflow until there is
644  *   free space in the buffer.
645  * * [DROP_OLDEST][BufferOverflow.DROP_OLDEST] &mdash; do not suspend the `send`, add the latest value to the buffer,
646  *   drop the oldest one from the buffer.
647  *   A channel with `capacity = 1` and `onBufferOverflow = DROP_OLDEST` is a _conflated_ channel.
648  * * [DROP_LATEST][BufferOverflow.DROP_LATEST] &mdash; do not suspend the `send`, drop the value that is being sent,
649  *   keep the buffer contents intact.
650  *
651  * A non-default `onBufferOverflow` implicitly creates a channel with at least one buffered element and
652  * is ignored for a channel with unlimited buffer. It cannot be specified for `capacity = CONFLATED`, which
653  * is a shortcut by itself.
654  *
655  * ### Prompt cancellation guarantee
656  *
657  * All suspending functions with channels provide **prompt cancellation guarantee**.
658  * If the job was cancelled while send or receive function was suspended, it will not resume successfully,
659  * but throws a [CancellationException].
660  * With a single-threaded [dispatcher][CoroutineDispatcher] like [Dispatchers.Main] this gives a
661  * guarantee that if a piece code running in this thread cancels a [Job], then a coroutine running this job cannot
662  * resume successfully and continue to run, ensuring a prompt response to its cancellation.
663  *
664  * > **Prompt cancellation guarantee** for channel operations was added since `kotlinx.coroutines` version `1.4.0`
665  * > and had replaced a channel-specific atomic-cancellation that was not consistent with other suspending functions.
666  * > The low-level mechanics of prompt cancellation are explained in [suspendCancellableCoroutine] function.
667  *
668  * ### Undelivered elements
669  *
670  * As a result of a prompt cancellation guarantee, when a closeable resource
671  * (like open file or a handle to another native resource) is transferred via channel from one coroutine to another
672  * it can fail to be delivered and will be lost if either send or receive operations are cancelled in transit.
673  *
674  * A `Channel()` constructor function has an `onUndeliveredElement` optional parameter.
675  * When `onUndeliveredElement` parameter is set, the corresponding function is called once for each element
676  * that was sent to the channel with the call to the [send][SendChannel.send] function but failed to be delivered,
677  * which can happen in the following cases:
678  *
679  * * When [send][SendChannel.send] operation throws an exception because it was cancelled before it had a chance to actually
680  *   send the element or because the channel was [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel].
681  * * When [receive][ReceiveChannel.receive], [receiveOrNull][ReceiveChannel.receiveOrNull], or [hasNext][ChannelIterator.hasNext]
682  *   operation throws an exception when it had retrieved the element from the
683  *   channel but was cancelled before the code following the receive call resumed.
684  * * The channel was [cancelled][ReceiveChannel.cancel], in which case `onUndeliveredElement` is called on every
685  *   remaining element in the channel's buffer.
686  *
687  * Note, that `onUndeliveredElement` function is called synchronously in an arbitrary context. It should be fast, non-blocking,
688  * and should not throw exceptions. Any exception thrown by `onUndeliveredElement` is wrapped into an internal runtime
689  * exception which is either rethrown from the caller method or handed off to the exception handler in the current context
690  * (see [CoroutineExceptionHandler]) when one is available.
691  *
692  * A typical usage for `onUndeliveredElement` is to close a resource that is being transferred via the channel. The
693  * following code pattern guarantees that opened resources are closed even if producer, consumer, and/or channel
694  * are cancelled. Resources are never lost.
695  *
696  * ```
697  * // Create the channel with onUndeliveredElement block that closes a resource
698  * val channel = Channel<Resource>(capacity) { resource -> resource.close() }
699  *
700  * // Producer code
701  * val resourceToSend = openResource()
702  * channel.send(resourceToSend)
703  *
704  * // Consumer code
705  * val resourceReceived = channel.receive()
706  * try {
707  *     // work with received resource
708  * } finally {
709  *     resourceReceived.close()
710  * }
711  * ```
712  *
713  * > Note, that if you do any kind of work in between `openResource()` and `channel.send(...)`, then you should
714  * > ensure that resource gets closed in case this additional code fails.
715  */
716 public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
717     /**
718      * Constants for the channel factory function `Channel()`.
719      */
720     public companion object Factory {
721         /**
722          * Requests a channel with an unlimited capacity buffer in the `Channel(...)` factory function.
723          */
724         public const val UNLIMITED: Int = Int.MAX_VALUE
725 
726         /**
727          * Requests a rendezvous channel in the `Channel(...)` factory function &mdash; a channel that does not have a buffer.
728          */
729         public const val RENDEZVOUS: Int = 0
730 
731         /**
732          * Requests a conflated channel in the `Channel(...)` factory function. This is a shortcut to creating
733          * a channel with [`onBufferOverflow = DROP_OLDEST`][BufferOverflow.DROP_OLDEST].
734          */
735         public const val CONFLATED: Int = -1
736 
737         /**
738          * Requests a buffered channel with the default buffer capacity in the `Channel(...)` factory function.
739          * The default capacity for a channel that [suspends][BufferOverflow.SUSPEND] on overflow
740          * is 64 and can be overridden by setting [DEFAULT_BUFFER_PROPERTY_NAME] on JVM.
741          * For non-suspending channels, a buffer of capacity 1 is used.
742          */
743         public const val BUFFERED: Int = -2
744 
745         // only for internal use, cannot be used with Channel(...)
746         internal const val OPTIONAL_CHANNEL = -3
747 
748         /**
749          * Name of the property that defines the default channel capacity when
750          * [BUFFERED] is used as parameter in `Channel(...)` factory function.
751          */
752         public const val DEFAULT_BUFFER_PROPERTY_NAME: String = "kotlinx.coroutines.channels.defaultBuffer"
753 
754         internal val CHANNEL_DEFAULT_CAPACITY = systemProp(DEFAULT_BUFFER_PROPERTY_NAME,
755             64, 1, UNLIMITED - 1
756         )
757     }
758 }
759 
760 /**
761  * Creates a channel with the specified buffer capacity (or without a buffer by default).
762  * See [Channel] interface documentation for details.
763  *
764  * @param capacity either a positive channel capacity or one of the constants defined in [Channel.Factory].
765  * @param onBufferOverflow configures an action on buffer overflow (optional, defaults to
766  *   a [suspending][BufferOverflow.SUSPEND] attempt to [send][Channel.send] a value,
767  *   supported only when `capacity >= 0` or `capacity == Channel.BUFFERED`,
768  *   implicitly creates a channel with at least one buffered element).
769  * @param onUndeliveredElement an optional function that is called when element was sent but was not delivered to the consumer.
770  *   See "Undelivered elements" section in [Channel] documentation.
771  * @throws IllegalArgumentException when [capacity] < -2
772  */
Channelnull773 public fun <E> Channel(
774     capacity: Int = RENDEZVOUS,
775     onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
776     onUndeliveredElement: ((E) -> Unit)? = null
777 ): Channel<E> =
778     when (capacity) {
779         RENDEZVOUS -> {
780             if (onBufferOverflow == BufferOverflow.SUSPEND)
781                 BufferedChannel(RENDEZVOUS, onUndeliveredElement) // an efficient implementation of rendezvous channel
782             else
783                 ConflatedBufferedChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
784         }
785         CONFLATED -> {
786             require(onBufferOverflow == BufferOverflow.SUSPEND) {
787                 "CONFLATED capacity cannot be used with non-default onBufferOverflow"
788             }
789             ConflatedBufferedChannel(1, BufferOverflow.DROP_OLDEST, onUndeliveredElement)
790         }
791         UNLIMITED -> BufferedChannel(UNLIMITED, onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows
792         BUFFERED -> { // uses default capacity with SUSPEND
793             if (onBufferOverflow == BufferOverflow.SUSPEND) BufferedChannel(CHANNEL_DEFAULT_CAPACITY, onUndeliveredElement)
794             else ConflatedBufferedChannel(1, onBufferOverflow, onUndeliveredElement)
795         }
796         else -> {
797             if (onBufferOverflow === BufferOverflow.SUSPEND) BufferedChannel(capacity, onUndeliveredElement)
798             else ConflatedBufferedChannel(capacity, onBufferOverflow, onUndeliveredElement)
799         }
800     }
801 
802 @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.0, binary compatibility with earlier versions")
Channelnull803 public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> = Channel(capacity)
804 
805 /**
806  * Indicates an attempt to [send][SendChannel.send] to a [isClosedForSend][SendChannel.isClosedForSend] channel
807  * that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause
808  * exception on send attempts.
809  *
810  * This exception is a subclass of [IllegalStateException], because, conceptually, it is the sender's responsibility
811  * to close the channel and not try to send anything thereafter. Attempts to
812  * send to a closed channel indicate a logical error in the sender's code.
813  */
814 public class ClosedSendChannelException(message: String?) : IllegalStateException(message)
815 
816 /**
817  * Indicates an attempt to [receive][ReceiveChannel.receive] from a [isClosedForReceive][ReceiveChannel.isClosedForReceive]
818  * channel that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause
819  * exception on receive attempts.
820  *
821  * This exception is a subclass of [NoSuchElementException] to be consistent with plain collections.
822  */
823 public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message)
824