1 @file:Suppress("FunctionName")
2
3 package kotlinx.coroutines.channels
4
5 import kotlinx.coroutines.*
6 import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
7 import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
8 import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
9 import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
10 import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
11 import kotlinx.coroutines.internal.*
12 import kotlinx.coroutines.selects.*
13 import kotlin.contracts.*
14 import kotlin.internal.*
15 import kotlin.jvm.*
16
17 /**
18 * Sender's interface to a [Channel].
19 *
20 * Combined, [SendChannel] and [ReceiveChannel] define the complete [Channel] interface.
21 *
22 * It is not expected that this interface will be implemented directly.
23 * Instead, the existing [Channel] implementations can be used or delegated to.
24 */
25 public interface SendChannel<in E> {
26 /**
27 * Returns `true` if this channel was closed by an invocation of [close] or its receiving side was [cancelled][ReceiveChannel.cancel].
28 * This means that calling [send] will result in an exception.
29 *
30 * Note that if this property returns `false`, it does not guarantee that a subsequent call to [send] will succeed,
31 * as the channel can be concurrently closed right after the check.
32 * For such scenarios, [trySend] is the more robust solution: it attempts to send the element and returns
33 * a result that says whether the channel was closed, and if not, whether sending a value was successful.
34 *
35 * ```
36 * // DANGER! THIS CHECK IS NOT RELIABLE!
37 * if (!channel.isClosedForSend) {
38 * channel.send(element) // can still fail!
39 * } else {
40 * println("Can not send: the channel is closed")
41 * }
42 * // DO THIS INSTEAD:
43 * channel.trySend(element).onClosed {
44 * println("Can not send: the channel is closed")
45 * }
46 * ```
47 *
48 * The primary intended usage of this property is skipping some portions of code that should not be executed if the
49 * channel is already known to be closed.
50 * For example:
51 *
52 * ```
53 * if (channel.isClosedForSend) {
54 * // fast path
55 * return
56 * } else {
57 * // slow path: actually computing the value
58 * val nextElement = run {
59 * // some heavy computation
60 * }
61 * channel.send(nextElement) // can fail anyway,
62 * // but at least we tried to avoid the computation
63 * }
64 * ```
65 *
66 * However, in many cases, even that can be achieved more idiomatically by cancelling the coroutine producing the
67 * elements to send.
68 * See [produce] for a way to launch a coroutine that produces elements and cancels itself when the channel is
69 * closed.
70 *
71 * [isClosedForSend] can also be used for assertions and diagnostics to verify the expected state of the channel.
72 *
73 * @see SendChannel.trySend
74 * @see SendChannel.close
75 * @see ReceiveChannel.cancel
76 */
77 @DelicateCoroutinesApi
78 public val isClosedForSend: Boolean
79
80 /**
81 * Sends the specified [element] to this channel.
82 *
83 * This function suspends if it does not manage to pass the element to the channel's buffer
84 * (or directly the receiving side if there's no buffer),
85 * and it can be cancelled with or without having successfully passed the element.
86 * See the "Suspending and cancellation" section below for details.
87 * If the channel is [closed][close], an exception is thrown (see below).
88 *
89 * ```
90 * val channel = Channel<Int>()
91 * launch {
92 * check(channel.receive() == 5)
93 * }
94 * channel.send(5) // suspends until 5 is received
95 * ```
96 *
97 * ## Suspending and cancellation
98 *
99 * If the [BufferOverflow] strategy of this channel is [BufferOverflow.SUSPEND],
100 * this function may suspend.
101 * The exact scenarios differ depending on the channel's capacity:
102 * - If the channel is [rendezvous][RENDEZVOUS],
103 * the sender will be suspended until the receiver calls [ReceiveChannel.receive].
104 * - If the channel is [unlimited][UNLIMITED] or [conflated][CONFLATED],
105 * the sender will never be suspended even with the [BufferOverflow.SUSPEND] strategy.
106 * - If the channel is buffered (either [BUFFERED] or uses a non-default buffer capacity),
107 * the sender will be suspended until the buffer has free space.
108 *
109 * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this
110 * suspending function is waiting, this function immediately resumes with [CancellationException].
111 * There is a **prompt cancellation guarantee**: even if [send] managed to send the element, but was cancelled
112 * while suspended, [CancellationException] will be thrown. See [suspendCancellableCoroutine] for low-level details.
113 *
114 * Because of the prompt cancellation guarantee, an exception does not always mean a failure to deliver the element.
115 * See the "Undelivered elements" section in the [Channel] documentation
116 * for details on handling undelivered elements.
117 *
118 * Note that this function does not check for cancellation when it is not suspended.
119 * Use [ensureActive] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed:
120 *
121 * ```
122 * // because of UNLIMITED, sending to this channel never suspends
123 * val channel = Channel<Int>(Channel.UNLIMITED)
124 * val job = launch {
125 * while (isActive) {
126 * channel.send(42)
127 * }
128 * // the loop exits when the job is cancelled
129 * }
130 * ```
131 *
132 * This isn't needed if other cancellable functions are called inside the loop, like [delay].
133 *
134 * ## Sending to a closed channel
135 *
136 * If a channel was [closed][close] before [send] was called and no cause was specified,
137 * an [ClosedSendChannelException] will be thrown from [send].
138 * If a channel was [closed][close] with a cause before [send] was called,
139 * then [send] will rethrow the same (in the `===` sense) exception that was passed to [close].
140 *
141 * In both cases, it is guaranteed that the element was not delivered to the consumer,
142 * and the `onUndeliveredElement` callback will be called.
143 * See the "Undelivered elements" section in the [Channel] documentation
144 * for details on handling undelivered elements.
145 *
146 * [Closing][close] a channel _after_ this function suspends does not cause this suspended [send] invocation
147 * to abort: although subsequent invocations of [send] fail, the existing ones will continue to completion,
148 * unless the sending coroutine is cancelled.
149 *
150 * ## Related
151 *
152 * This function can be used in [select] invocations with the [onSend] clause.
153 * Use [trySend] to try sending to this channel without waiting and throwing.
154 */
sendnull155 public suspend fun send(element: E)
156
157 /**
158 * Clause for the [select] expression of the [send] suspending function that selects when the element that is
159 * specified as the parameter is sent to the channel.
160 * When the clause is selected, the reference to this channel is passed into the corresponding block.
161 *
162 * The [select] invocation fails with an exception if the channel [is closed for `send`][isClosedForSend] before
163 * the [select] suspends (see the "Sending to a closed channel" section of [send]).
164 *
165 * Example:
166 * ```
167 * val sendChannels = List(4) { index ->
168 * Channel<Int>(onUndeliveredElement = {
169 * println("Undelivered element $it for $index")
170 * }).also { channel ->
171 * // launch a consumer for this channel
172 * launch {
173 * withTimeout(1.seconds) {
174 * println("Consumer $index receives: ${channel.receive()}")
175 * }
176 * }
177 * }
178 * }
179 * val element = 42
180 * select {
181 * for (channel in sendChannels) {
182 * channel.onSend(element) {
183 * println("Sent to channel $it")
184 * }
185 * }
186 * }
187 * ```
188 * Here, we start a [select] expression that waits for exactly one of the four [onSend] invocations
189 * to successfully send the element to the receiver,
190 * and the other three will instead invoke the `onUndeliveredElement` callback.
191 * See the "Undelivered elements" section in the [Channel] documentation
192 * for details on handling undelivered elements.
193 *
194 * Like [send], [onSend] obeys the rules of prompt cancellation:
195 * [select] may finish with a [CancellationException] even if the element was successfully sent.
196 */
197 public val onSend: SelectClause2<E, SendChannel<E>>
198
199 /**
200 * Attempts to add the specified [element] to this channel without waiting.
201 *
202 * [trySend] never suspends and never throws exceptions.
203 * Instead, it returns a [ChannelResult] that encapsulates the result of the operation.
204 * This makes it different from [send], which can suspend and throw exceptions.
205 *
206 * If this channel is currently full and cannot receive new elements at the time or is [closed][close],
207 * this function returns a result that indicates [a failure][ChannelResult.isFailure].
208 * In this case, it is guaranteed that the element was not delivered to the consumer and the
209 * `onUndeliveredElement` callback, if one is provided during the [Channel]'s construction, does *not* get called.
210 *
211 * [trySend] can be used as a non-`suspend` alternative to [send] in cases where it's known beforehand
212 * that the channel's buffer can not overflow.
213 * ```
214 * class Coordinates(val x: Int, val y: Int)
215 * // A channel for a single subscriber that stores the latest mouse position update.
216 * // If more than one subscriber is expected, consider using a `StateFlow` instead.
217 * val mousePositionUpdates = Channel<Coordinates>(Channel.CONFLATED)
218 * // Notifies the subscriber about the new mouse position.
219 * // If the subscriber is slow, the intermediate updates are dropped.
220 * fun moveMouse(coordinates: Coordinates) {
221 * val result = mousePositionUpdates.trySend(coordinates)
222 * if (result.isClosed) {
223 * error("Mouse position is no longer being processed")
224 * }
225 * }
226 * ```
227 */
228 public fun trySend(element: E): ChannelResult<Unit>
229
230 /**
231 * Closes this channel so that subsequent attempts to [send] to it fail.
232 *
233 * Returns `true` if the channel was not closed previously and the call to this function closed it.
234 * If the channel was already closed, this function does nothing and returns `false`.
235 *
236 * The existing elements in the channel remain there, and likewise,
237 * the calls to [send] an [onSend] that have suspended before [close] was called will not be affected.
238 * Only the subsequent calls to [send], [trySend], or [onSend] will fail.
239 * [isClosedForSend] will start returning `true` immediately after this function is called.
240 *
241 * Once all the existing elements are received, the channel will be considered closed for `receive` as well.
242 * This means that [receive][ReceiveChannel.receive] will also start throwing exceptions.
243 * At that point, [isClosedForReceive][ReceiveChannel.isClosedForReceive] will start returning `true`.
244 *
245 * If the [cause] is non-null, it will be thrown from all the subsequent attempts to [send] to this channel,
246 * as well as from all the attempts to [receive][ReceiveChannel.receive] from the channel after no elements remain.
247 *
248 * If the [cause] is null, the channel is considered to have completed normally.
249 * All subsequent calls to [send] will throw a [ClosedSendChannelException],
250 * whereas calling [receive][ReceiveChannel.receive] will throw a [ClosedReceiveChannelException]
251 * after there are no more elements.
252 *
253 * ```
254 * val channel = Channel<Int>()
255 * channel.send(1)
256 * channel.close()
257 * try {
258 * channel.send(2)
259 * error("The channel is closed, so this line is never reached")
260 * } catch (e: ClosedSendChannelException) {
261 * // expected
262 * }
263 * ```
264 */
265 public fun close(cause: Throwable? = null): Boolean
266
267 /**
268 * Registers a [handler] that is synchronously invoked once the channel is [closed][close]
269 * or the receiving side of this channel is [cancelled][ReceiveChannel.cancel].
270 * Only one handler can be attached to a channel during its lifetime.
271 * The `handler` is invoked when [isClosedForSend] starts to return `true`.
272 * If the channel is closed already, the handler is invoked immediately.
273 *
274 * The meaning of `cause` that is passed to the handler:
275 * - `null` if the channel was [closed][close] normally with `cause = null`.
276 * - Instance of [CancellationException] if the channel was [cancelled][ReceiveChannel.cancel] normally
277 * without the corresponding argument.
278 * - The cause of `close` or `cancel` otherwise.
279 *
280 * ### Execution context and exception safety
281 *
282 * The [handler] is executed as part of the closing or cancelling operation,
283 * and only after the channel reaches its final state.
284 * This means that if the handler throws an exception or hangs,
285 * the channel will still be successfully closed or cancelled.
286 * Unhandled exceptions from [handler] are propagated to the closing or cancelling operation's caller.
287 *
288 * Example of usage:
289 * ```
290 * val events = Channel<Event>(Channel.UNLIMITED)
291 * callbackBasedApi.registerCallback { event ->
292 * events.trySend(event)
293 * .onClosed { /* channel is already closed, but the callback hasn't stopped yet */ }
294 * }
295 *
296 * val uiUpdater = uiScope.launch(Dispatchers.Main) {
297 * events.consume { /* handle events */ }
298 * }
299 * // Stop the callback after the channel is closed or cancelled
300 * events.invokeOnClose { callbackBasedApi.stop() }
301 * ```
302 *
303 * **Stability note.** This function constitutes a stable API surface, with the only exception being
304 * that an [IllegalStateException] is thrown when multiple handlers are registered.
305 * This restriction could be lifted in the future.
306 *
307 * @throws UnsupportedOperationException if the underlying channel does not support [invokeOnClose].
308 * Implementation note: currently, [invokeOnClose] is unsupported only by Rx-like integrations.
309 *
310 * @throws IllegalStateException if another handler was already registered
311 */
312 public fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
313
314 /**
315 * **Deprecated** offer method.
316 *
317 * This method was deprecated in the favour of [trySend].
318 * It has proven itself as the most error-prone method in Channel API:
319 *
320 * - `Boolean` return type creates the false sense of security, implying that `false`
321 * is returned instead of throwing an exception.
322 * - It was used mostly from non-suspending APIs where CancellationException triggered
323 * internal failures in the application (the most common source of bugs).
324 * - Due to signature and explicit `if (ch.offer(...))` checks it was easy to
325 * oversee such error during code review.
326 * - Its name was not aligned with the rest of the API and tried to mimic Java's queue instead.
327 *
328 * **NB** Automatic migration provides best-effort for the user experience, but requires removal
329 * or adjusting of the code that relied on the exception handling.
330 * The complete replacement has a more verbose form:
331 * ```
332 * channel.trySend(element)
333 * .onClosed { throw it ?: ClosedSendChannelException("Channel was closed normally") }
334 * .isSuccess
335 * ```
336 *
337 * See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context.
338 *
339 * @suppress **Deprecated**.
340 */
341 @Deprecated(
342 level = DeprecationLevel.ERROR,
343 message = "Deprecated in the favour of 'trySend' method",
344 replaceWith = ReplaceWith("trySend(element).isSuccess")
345 ) // Warning since 1.5.0, error since 1.6.0, not hidden until 1.8+ because API is quite widespread
346 public fun offer(element: E): Boolean {
347 val result = trySend(element)
348 if (result.isSuccess) return true
349 throw recoverStackTrace(result.exceptionOrNull() ?: return false)
350 }
351 }
352
353 /**
354 * Receiver's interface to a [Channel].
355 *
356 * Combined, [SendChannel] and [ReceiveChannel] define the complete [Channel] interface.
357 */
358 public interface ReceiveChannel<out E> {
359 /**
360 * Returns `true` if the sending side of this channel was [closed][SendChannel.close]
361 * and all previously sent items were already received (which also happens for [cancelled][cancel] channels).
362 *
363 * Note that if this property returns `false`,
364 * it does not guarantee that a subsequent call to [receive] will succeed,
365 * as the channel can be concurrently cancelled or closed right after the check.
366 * For such scenarios, [receiveCatching] is the more robust solution:
367 * if the channel is closed, instead of throwing an exception, [receiveCatching] returns a result that allows
368 * querying it.
369 *
370 * ```
371 * // DANGER! THIS CHECK IS NOT RELIABLE!
372 * if (!channel.isClosedForReceive) {
373 * channel.receive() // can still fail!
374 * } else {
375 * println("Can not receive: the channel is closed")
376 * null
377 * }
378 * // DO THIS INSTEAD:
379 * channel.receiveCatching().onClosed {
380 * println("Can not receive: the channel is closed")
381 * }.getOrNull()
382 * ```
383 *
384 * The primary intended usage of this property is for assertions and diagnostics to verify the expected state of
385 * the channel.
386 * Using it in production code is discouraged.
387 *
388 * @see ReceiveChannel.receiveCatching
389 * @see ReceiveChannel.cancel
390 * @see SendChannel.close
391 */
392 @DelicateCoroutinesApi
393 public val isClosedForReceive: Boolean
394
395 /**
396 * Returns `true` if the channel contains no elements and isn't [closed for `receive`][isClosedForReceive].
397 *
398 * If [isEmpty] returns `true`, it means that calling [receive] at exactly the same moment would suspend.
399 * However, calling [receive] immediately after checking [isEmpty] may or may not suspend, as new elements
400 * could have been added or removed or the channel could have been closed for `receive` between the two invocations.
401 * Consider using [tryReceive] in cases when suspensions are undesirable:
402 *
403 * ```
404 * // DANGER! THIS CHECK IS NOT RELIABLE!
405 * while (!channel.isEmpty) {
406 * // can still suspend if other `receive` happens in parallel!
407 * val element = channel.receive()
408 * println(element)
409 * }
410 * // DO THIS INSTEAD:
411 * while (true) {
412 * val element = channel.tryReceive().getOrNull() ?: break
413 * println(element)
414 * }
415 * ```
416 */
417 @ExperimentalCoroutinesApi
418 public val isEmpty: Boolean
419
420 /**
421 * Retrieves an element, removing it from the channel.
422 *
423 * This function suspends if the channel is empty, waiting until an element is available.
424 * If the channel is [closed for `receive`][isClosedForReceive], an exception is thrown (see below).
425 * ```
426 * val channel = Channel<Int>()
427 * launch {
428 * val element = channel.receive() // suspends until 5 is available
429 * check(element == 5)
430 * }
431 * channel.send(5)
432 * ```
433 *
434 * ## Suspending and cancellation
435 *
436 * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this
437 * suspending function is waiting, this function immediately resumes with [CancellationException].
438 * There is a **prompt cancellation guarantee**: even if [receive] managed to retrieve the element from the channel,
439 * but was cancelled while suspended, [CancellationException] will be thrown, and, if
440 * the channel has an `onUndeliveredElement` callback installed, the retrieved element will be passed to it.
441 * See the "Undelivered elements" section in the [Channel] documentation
442 * for details on handling undelivered elements.
443 * See [suspendCancellableCoroutine] for the low-level details of prompt cancellation.
444 *
445 * Note that this function does not check for cancellation when it manages to immediately receive an element without
446 * suspending.
447 * Use [ensureActive] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed:
448 *
449 * ```
450 * val channel = Channel<Int>()
451 * launch { // a very fast producer
452 * while (true) {
453 * channel.send(42)
454 * }
455 * }
456 * val consumer = launch { // a slow consumer
457 * while (isActive) {
458 * val element = channel.receive()
459 * // some slow computation involving `element`
460 * }
461 * }
462 * delay(100.milliseconds)
463 * consumer.cancelAndJoin()
464 * ```
465 *
466 * ## Receiving from a closed channel
467 *
468 * - Attempting to [receive] from a [closed][SendChannel.close] channel while there are still some elements
469 * will successfully retrieve an element from the channel.
470 * - When a channel is [closed][SendChannel.close] and there are no elements remaining,
471 * the channel becomes [closed for `receive`][isClosedForReceive].
472 * After that,
473 * [receive] will rethrow the same (in the `===` sense) exception that was passed to [SendChannel.close],
474 * or [ClosedReceiveChannelException] if none was given.
475 *
476 * ## Related
477 *
478 * This function can be used in [select] invocations with the [onReceive] clause.
479 * Use [tryReceive] to try receiving from this channel without waiting and throwing.
480 * Use [receiveCatching] to receive from this channel without throwing.
481 */
receivenull482 public suspend fun receive(): E
483
484 /**
485 * Clause for the [select] expression of the [receive] suspending function that selects with the element
486 * received from the channel.
487 *
488 * The [select] invocation fails with an exception if the channel [is closed for `receive`][isClosedForReceive]
489 * at any point, even if other [select] clauses could still work.
490 *
491 * Example:
492 * ```
493 * class ScreenSize(val width: Int, val height: Int)
494 * class MouseClick(val x: Int, val y: Int)
495 * val screenResizes = Channel<ScreenSize>(Channel.CONFLATED)
496 * val mouseClicks = Channel<MouseClick>(Channel.CONFLATED)
497 *
498 * launch(Dispatchers.Main) {
499 * while (true) {
500 * select {
501 * screenResizes.onReceive { newSize ->
502 * // update the UI to the new screen size
503 * }
504 * mouseClicks.onReceive { click ->
505 * // react to a mouse click
506 * }
507 * }
508 * }
509 * }
510 * ```
511 *
512 * Like [receive], [onReceive] obeys the rules of prompt cancellation:
513 * [select] may finish with a [CancellationException] even if an element was successfully retrieved,
514 * in which case the `onUndeliveredElement` callback will be called.
515 */
516 public val onReceive: SelectClause1<E>
517
518 /**
519 * Retrieves an element, removing it from the channel.
520 *
521 * A difference from [receive] is that this function encapsulates a failure in its return value instead of throwing
522 * an exception.
523 * However, it will still throw [CancellationException] if the coroutine calling [receiveCatching] is cancelled.
524 *
525 * It is guaranteed that the only way this function can return a [failed][ChannelResult.isFailure] result is when
526 * the channel is [closed for `receive`][isClosedForReceive], so [ChannelResult.isClosed] is also true.
527 *
528 * This function suspends if the channel is empty, waiting until an element is available or the channel becomes
529 * closed.
530 * ```
531 * val channel = Channel<Int>()
532 * launch {
533 * while (true) {
534 * val result = channel.receiveCatching() // suspends
535 * when (val element = result.getOrNull()) {
536 * null -> break // the channel is closed
537 * else -> check(element == 5)
538 * }
539 * }
540 * }
541 * channel.send(5)
542 * ```
543 *
544 * ## Suspending and cancellation
545 *
546 * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this
547 * suspending function is waiting, this function immediately resumes with [CancellationException].
548 * There is a **prompt cancellation guarantee**: even if [receiveCatching] managed to retrieve the element from the
549 * channel, but was cancelled while suspended, [CancellationException] will be thrown, and, if
550 * the channel has an `onUndeliveredElement` callback installed, the retrieved element will be passed to it.
551 * See the "Undelivered elements" section in the [Channel] documentation
552 * for details on handling undelivered elements.
553 * See [suspendCancellableCoroutine] for the low-level details of prompt cancellation.
554 *
555 * Note that this function does not check for cancellation when it manages to immediately receive an element without
556 * suspending.
557 * Use [ensureActive] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed:
558 *
559 * ```
560 * val channel = Channel<Int>()
561 * launch { // a very fast producer
562 * while (true) {
563 * channel.send(42)
564 * }
565 * }
566 * val consumer = launch { // a slow consumer
567 * while (isActive) {
568 * val element = channel.receiveCatching().getOrNull() ?: break
569 * // some slow computation involving `element`
570 * }
571 * }
572 * delay(100.milliseconds)
573 * consumer.cancelAndJoin()
574 * ```
575 *
576 * ## Receiving from a closed channel
577 *
578 * - Attempting to [receiveCatching] from a [closed][SendChannel.close] channel while there are still some elements
579 * will successfully retrieve an element from the channel.
580 * - When a channel is [closed][SendChannel.close] and there are no elements remaining,
581 * the channel becomes [closed for `receive`][isClosedForReceive].
582 * After that, [receiveCatching] will return a result with [ChannelResult.isClosed] set.
583 * [ChannelResult.exceptionOrNull] will be the exact (in the `===` sense) exception
584 * that was passed to [SendChannel.close],
585 * or `null` if none was given.
586 *
587 * ## Related
588 *
589 * This function can be used in [select] invocations with the [onReceiveCatching] clause.
590 * Use [tryReceive] to try receiving from this channel without waiting and throwing.
591 * Use [receive] to receive from this channel and throw exceptions on error.
592 */
593 public suspend fun receiveCatching(): ChannelResult<E>
594
595 /**
596 * Clause for the [select] expression of the [receiveCatching] suspending function that selects
597 * with a [ChannelResult] when an element is retrieved or the channel gets closed.
598 *
599 * Like [receiveCatching], [onReceiveCatching] obeys the rules of prompt cancellation:
600 * [select] may finish with a [CancellationException] even if an element was successfully retrieved,
601 * in which case the `onUndeliveredElement` callback will be called.
602 */
603 // TODO: think of an example of when this could be useful
604 public val onReceiveCatching: SelectClause1<ChannelResult<E>>
605
606 /**
607 * Attempts to retrieve an element without waiting, removing it from the channel.
608 *
609 * - When the channel is non-empty, a [successful][ChannelResult.isSuccess] result is returned,
610 * and [ChannelResult.getOrNull] returns the retrieved element.
611 * - When the channel is empty, a [failed][ChannelResult.isFailure] result is returned.
612 * - When the channel is already [closed for `receive`][isClosedForReceive],
613 * returns the ["channel is closed"][ChannelResult.isClosed] result.
614 * If the channel was [closed][SendChannel.close] with a cause (for example, [cancelled][cancel]),
615 * [ChannelResult.exceptionOrNull] contains the cause.
616 *
617 * This function is useful when implementing on-demand allocation of resources to be stored in the channel:
618 *
619 * ```
620 * val resourcePool = Channel<Resource>(maxResources)
621 *
622 * suspend fun withResource(block: (Resource) -> Unit) {
623 * val result = resourcePool.tryReceive()
624 * val resource = result.getOrNull()
625 * ?: tryCreateNewResource() // try to create a new resource
626 * ?: resourcePool.receive() // could not create: actually wait for the resource
627 * try {
628 * block(resource)
629 * } finally {
630 * resourcePool.trySend(resource)
631 * }
632 * }
633 * ```
634 */
635 public fun tryReceive(): ChannelResult<E>
636
637 /**
638 * Returns a new iterator to receive elements from this channel using a `for` loop.
639 * Iteration completes normally when the channel [is closed for `receive`][isClosedForReceive] without a cause and
640 * throws the exception passed to [close][SendChannel.close] if there was one.
641 *
642 * Instances of [ChannelIterator] are not thread-safe and shall not be used from concurrent coroutines.
643 *
644 * Example:
645 *
646 * ```
647 * val channel = produce<Int> {
648 * repeat(1000) {
649 * send(it)
650 * }
651 * }
652 * for (v in channel) {
653 * println(v)
654 * }
655 * ```
656 *
657 * Note that if an early return happens from the `for` loop, the channel does not get cancelled.
658 * To forbid sending new elements after the iteration is completed, use [consumeEach] or
659 * call [cancel] manually.
660 */
661 public operator fun iterator(): ChannelIterator<E>
662
663 /**
664 * [Closes][SendChannel.close] the channel for new elements and removes all existing ones.
665 *
666 * A [cause] can be used to specify an error message or to provide other details on
667 * the cancellation reason for debugging purposes.
668 * If the cause is not specified, then an instance of [CancellationException] with a
669 * default message is created to [close][SendChannel.close] the channel.
670 *
671 * If the channel was already [closed][SendChannel.close],
672 * [cancel] only has the effect of removing all elements from the channel.
673 *
674 * Immediately after the invocation of this function,
675 * [isClosedForReceive] and, on the [SendChannel] side, [isClosedForSend][SendChannel.isClosedForSend]
676 * start returning `true`.
677 * Any attempt to send to or receive from this channel will lead to a [CancellationException].
678 * This also applies to the existing senders and receivers that are suspended at the time of the call:
679 * they will be resumed with a [CancellationException] immediately after [cancel] is called.
680 *
681 * If the channel has an `onUndeliveredElement` callback installed, this function will invoke it for each of the
682 * elements still in the channel, since these elements will be inaccessible otherwise.
683 * If the callback is not installed, these elements will simply be removed from the channel for garbage collection.
684 *
685 * ```
686 * val channel = Channel<Int>()
687 * channel.send(1)
688 * channel.send(2)
689 * channel.cancel()
690 * channel.trySend(3) // returns ChannelResult.isClosed
691 * for (element in channel) { println(element) } // prints nothing
692 * ```
693 *
694 * [consume] and [consumeEach] are convenient shorthands for cancelling the channel after the single consumer
695 * has finished processing.
696 */
697 public fun cancel(cause: CancellationException? = null)
698
699 /**
700 * @suppress This method implements old version of JVM ABI. Use [cancel].
701 */
702 @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
703 public fun cancel(): Unit = cancel(null)
704
705 /**
706 * @suppress This method has bad semantics when cause is not a [CancellationException]. Use [cancel].
707 */
708 @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
709 public fun cancel(cause: Throwable? = null): Boolean
710
711 /**
712 * **Deprecated** poll method.
713 *
714 * This method was deprecated in the favour of [tryReceive].
715 * It has proven itself as error-prone method in Channel API:
716 *
717 * - Nullable return type creates the false sense of security, implying that `null`
718 * is returned instead of throwing an exception.
719 * - It was used mostly from non-suspending APIs where CancellationException triggered
720 * internal failures in the application (the most common source of bugs).
721 * - Its name was not aligned with the rest of the API and tried to mimic Java's queue instead.
722 *
723 * See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context.
724 *
725 * ### Replacement note
726 *
727 * The replacement `tryReceive().getOrNull()` is a default that ignores all close exceptions and
728 * proceeds with `null`, while `poll` throws an exception if the channel was closed with an exception.
729 * Replacement with the very same 'poll' semantics is `tryReceive().onClosed { if (it != null) throw it }.getOrNull()`
730 *
731 * @suppress **Deprecated**.
732 */
733 @Deprecated(
734 level = DeprecationLevel.ERROR,
735 message = "Deprecated in the favour of 'tryReceive'. " +
736 "Please note that the provided replacement does not rethrow channel's close cause as 'poll' did, " +
737 "for the precise replacement please refer to the 'poll' documentation",
738 replaceWith = ReplaceWith("tryReceive().getOrNull()")
739 ) // Warning since 1.5.0, error since 1.6.0, not hidden until 1.8+ because API is quite widespread
740 public fun poll(): E? {
741 val result = tryReceive()
742 if (result.isSuccess) return result.getOrThrow()
743 throw recoverStackTrace(result.exceptionOrNull() ?: return null)
744 }
745
746 /**
747 * This function was deprecated since 1.3.0 and is no longer recommended to use
748 * or to implement in subclasses.
749 *
750 * It had the following pitfalls:
751 * - Didn't allow to distinguish 'null' as "closed channel" from "null as a value"
752 * - Was throwing if the channel has failed even though its signature may suggest it returns 'null'
753 * - It didn't really belong to core channel API and can be exposed as an extension instead.
754 *
755 * ### Replacement note
756 *
757 * The replacement `receiveCatching().getOrNull()` is a safe default that ignores all close exceptions and
758 * proceeds with `null`, while `receiveOrNull` throws an exception if the channel was closed with an exception.
759 * Replacement with the very same `receiveOrNull` semantics is `receiveCatching().onClosed { if (it != null) throw it }.getOrNull()`.
760 *
761 * @suppress **Deprecated**
762 */
763 @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
764 @LowPriorityInOverloadResolution
765 @Deprecated(
766 message = "Deprecated in favor of 'receiveCatching'. " +
767 "Please note that the provided replacement does not rethrow channel's close cause as 'receiveOrNull' did, " +
768 "for the detailed replacement please refer to the 'receiveOrNull' documentation",
769 level = DeprecationLevel.ERROR,
770 replaceWith = ReplaceWith("receiveCatching().getOrNull()")
771 ) // Warning since 1.3.0, error in 1.5.0, cannot be hidden due to deprecated extensions
receiveOrNullnull772 public suspend fun receiveOrNull(): E? = receiveCatching().getOrNull()
773
774 /**
775 * This function was deprecated since 1.3.0 and is no longer recommended to use
776 * or to implement in subclasses.
777 * See [receiveOrNull] documentation.
778 *
779 * @suppress **Deprecated**: in favor of onReceiveCatching extension.
780 */
781 @Suppress("DEPRECATION_ERROR")
782 @Deprecated(
783 message = "Deprecated in favor of onReceiveCatching extension",
784 level = DeprecationLevel.ERROR,
785 replaceWith = ReplaceWith("onReceiveCatching")
786 ) // Warning since 1.3.0, error in 1.5.0, will be hidden or removed in 1.7.0
787 public val onReceiveOrNull: SelectClause1<E?> get() = (this as BufferedChannel<E>).onReceiveOrNull
788 }
789
790 /**
791 * A discriminated union representing a channel operation result.
792 * It encapsulates the knowledge of whether the operation succeeded, failed with an option to retry,
793 * or failed because the channel was closed.
794 *
795 * If the operation was [successful][isSuccess], [T] is the result of the operation:
796 * for example, for [ReceiveChannel.receiveCatching] and [ReceiveChannel.tryReceive],
797 * it is the element received from the channel, and for [Channel.trySend], it is [Unit],
798 * as the channel does not receive anything in return for sending a channel.
799 * This value can be retrieved with [getOrNull] or [getOrThrow].
800 *
801 * If the operation [failed][isFailure], it does not necessarily mean that the channel itself is closed.
802 * For example, [ReceiveChannel.receiveCatching] and [ReceiveChannel.tryReceive] can fail because the channel is empty,
803 * and [Channel.trySend] can fail because the channel is full.
804 *
805 * If the operation [failed][isFailure] because the channel was closed for that operation, [isClosed] returns `true`.
806 * The opposite is also true: if [isClosed] returns `true`, then the channel is closed for that operation
807 * ([ReceiveChannel.isClosedForReceive] or [SendChannel.isClosedForSend]).
808 * In this case, retrying the operation is meaningless: once closed, the channel will remain closed.
809 * The [exceptionOrNull] function returns the reason the channel was closed, if any was given.
810 *
811 * Manually obtaining a [ChannelResult] instance is not supported.
812 * See the documentation for [ChannelResult]-returning functions for usage examples.
813 */
814 @JvmInline
815 public value class ChannelResult<out T>
816 @PublishedApi internal constructor(@PublishedApi internal val holder: Any?) {
817 /**
818 * Whether the operation succeeded.
819 *
820 * If this returns `true`, the operation was successful.
821 * In this case, [getOrNull] and [getOrThrow] can be used to retrieve the value.
822 *
823 * If this returns `false`, the operation failed.
824 * [isClosed] can be used to determine whether the operation failed because the channel was closed
825 * (and therefore retrying the operation is meaningless).
826 *
827 * ```
828 * val result = channel.tryReceive()
829 * if (result.isSuccess) {
830 * println("Successfully received the value ${result.getOrThrow()}")
831 * } else {
832 * println("Failed to receive the value.")
833 * if (result.isClosed) {
834 * println("The channel is closed.")
835 * if (result.exceptionOrNull() != null) {
836 * println("The reason: ${result.exceptionOrNull()}")
837 * }
838 * }
839 * }
840 * ```
841 *
842 * [isFailure] is a shorthand for `!isSuccess`.
843 * [getOrNull] can simplify [isSuccess] followed by [getOrThrow] into just one check if [T] is known
844 * to be non-nullable.
845 */
846 public val isSuccess: Boolean get() = holder !is Failed
847
848 /**
849 * Whether the operation failed.
850 *
851 * A shorthand for `!isSuccess`. See [isSuccess] for more details.
852 */
853 public val isFailure: Boolean get() = holder is Failed
854
855 /**
856 * Whether the operation failed because the channel was closed.
857 *
858 * If this returns `true`, the channel was closed for the operation that returned this result.
859 * In this case, retrying the operation is meaningless: once closed, the channel will remain closed.
860 * [isSuccess] will return `false`.
861 * [exceptionOrNull] can be used to determine the reason the channel was [closed][SendChannel.close]
862 * if one was given.
863 *
864 * If this returns `false`, subsequent attempts to perform the same operation may succeed.
865 *
866 * ```
867 * val result = channel.trySend(42)
868 * if (result.isClosed) {
869 * println("The channel is closed.")
870 * if (result.exceptionOrNull() != null) {
871 * println("The reason: ${result.exceptionOrNull()}")
872 * }
873 * }
874 */
875 public val isClosed: Boolean get() = holder is Closed
876
877 /**
878 * Returns the encapsulated [T] if the operation succeeded, or `null` if it failed.
879 *
880 * For non-nullable [T], the following code can be used to handle the result:
881 * ```
882 * val result = channel.tryReceive()
883 * val value = result.getOrNull()
884 * if (value == null) {
885 * if (result.isClosed) {
886 * println("The channel is closed.")
887 * if (result.exceptionOrNull() != null) {
888 * println("The reason: ${result.exceptionOrNull()}")
889 * }
890 * }
891 * return
892 * }
893 * println("Successfully received the value $value")
894 * ```
895 *
896 * If [T] is nullable, [getOrThrow] together with [isSuccess] is a more reliable way to handle the result.
897 */
898 @Suppress("UNCHECKED_CAST")
899 public fun getOrNull(): T? = if (holder !is Failed) holder as T else null
900
901 /**
902 * Returns the encapsulated [T] if the operation succeeded, or throws the encapsulated exception if it failed.
903 *
904 * Example:
905 * ```
906 * val result = channel.tryReceive()
907 * if (result.isSuccess) {
908 * println("Successfully received the value ${result.getOrThrow()}")
909 * }
910 * ```
911 *
912 * @throws IllegalStateException if the operation failed, but the channel was not closed with a cause.
913 */
914 public fun getOrThrow(): T {
915 @Suppress("UNCHECKED_CAST")
916 if (holder !is Failed) return holder as T
917 if (holder is Closed) {
918 check(holder.cause != null) { "Trying to call 'getOrThrow' on a channel closed without a cause" }
919 throw holder.cause
920 }
921 error("Trying to call 'getOrThrow' on a failed result of a non-closed channel")
922 }
923
924 /**
925 * Returns the exception with which the channel was closed, or `null` if the channel was not closed or was closed
926 * without a cause.
927 *
928 * [exceptionOrNull] can only return a non-`null` value if [isClosed] is `true`,
929 * but even if [isClosed] is `true`,
930 * [exceptionOrNull] can still return `null` if the channel was closed without a cause.
931 *
932 * ```
933 * val result = channel.tryReceive()
934 * if (result.isClosed) {
935 * // Now we know not to retry the operation later.
936 * // Check if the channel was closed with a cause and rethrow the exception:
937 * result.exceptionOrNull()?.let { throw it }
938 * // Otherwise, the channel was closed without a cause.
939 * }
940 * ```
941 */
942 public fun exceptionOrNull(): Throwable? = (holder as? Closed)?.cause
943
944 internal open class Failed {
945 override fun toString(): String = "Failed"
946 }
947
948 internal class Closed(@JvmField val cause: Throwable?): Failed() {
949 override fun equals(other: Any?): Boolean = other is Closed && cause == other.cause
950 override fun hashCode(): Int = cause.hashCode()
951 override fun toString(): String = "Closed($cause)"
952 }
953
954 /**
955 * @suppress **This is internal API and it is subject to change.**
956 */
957 @InternalCoroutinesApi
958 public companion object {
959 private val failed = Failed()
960
961 @InternalCoroutinesApi
962 public fun <E> success(value: E): ChannelResult<E> =
963 ChannelResult(value)
964
965 @InternalCoroutinesApi
966 public fun <E> failure(): ChannelResult<E> =
967 ChannelResult(failed)
968
969 @InternalCoroutinesApi
970 public fun <E> closed(cause: Throwable?): ChannelResult<E> =
971 ChannelResult(Closed(cause))
972 }
973
974 public override fun toString(): String =
975 when (holder) {
976 is Closed -> holder.toString()
977 else -> "Value($holder)"
978 }
979 }
980
981 /**
982 * Returns the encapsulated value if the operation [succeeded][ChannelResult.isSuccess], or the
983 * result of [onFailure] function for [ChannelResult.exceptionOrNull] otherwise.
984 *
985 * A shorthand for `if (isSuccess) getOrNull() else onFailure(exceptionOrNull())`.
986 *
987 * @see ChannelResult.getOrNull
988 * @see ChannelResult.exceptionOrNull
989 */
990 @OptIn(ExperimentalContracts::class)
getOrElsenull991 public inline fun <T> ChannelResult<T>.getOrElse(onFailure: (exception: Throwable?) -> T): T {
992 contract {
993 callsInPlace(onFailure, InvocationKind.AT_MOST_ONCE)
994 }
995 @Suppress("UNCHECKED_CAST")
996 return if (holder is ChannelResult.Failed) onFailure(exceptionOrNull()) else holder as T
997 }
998
999 /**
1000 * Performs the given [action] on the encapsulated value if the operation [succeeded][ChannelResult.isSuccess].
1001 * Returns the original `ChannelResult` unchanged.
1002 *
1003 * A shorthand for `this.also { if (isSuccess) action(getOrThrow()) }`.
1004 */
1005 @OptIn(ExperimentalContracts::class)
onSuccessnull1006 public inline fun <T> ChannelResult<T>.onSuccess(action: (value: T) -> Unit): ChannelResult<T> {
1007 contract {
1008 callsInPlace(action, InvocationKind.AT_MOST_ONCE)
1009 }
1010 @Suppress("UNCHECKED_CAST")
1011 if (holder !is ChannelResult.Failed) action(holder as T)
1012 return this
1013 }
1014
1015 /**
1016 * Performs the given [action] if the operation [failed][ChannelResult.isFailure].
1017 * The result of [ChannelResult.exceptionOrNull] is passed to the [action] parameter.
1018 *
1019 * Returns the original `ChannelResult` unchanged.
1020 *
1021 * A shorthand for `this.also { if (isFailure) action(exceptionOrNull()) }`.
1022 */
1023 @OptIn(ExperimentalContracts::class)
onFailurenull1024 public inline fun <T> ChannelResult<T>.onFailure(action: (exception: Throwable?) -> Unit): ChannelResult<T> {
1025 contract {
1026 callsInPlace(action, InvocationKind.AT_MOST_ONCE)
1027 }
1028 if (holder is ChannelResult.Failed) action(exceptionOrNull())
1029 return this
1030 }
1031
1032 /**
1033 * Performs the given [action] if the operation failed because the channel was [closed][ChannelResult.isClosed] for
1034 * that operation.
1035 * The result of [ChannelResult.exceptionOrNull] is passed to the [action] parameter.
1036 *
1037 * It is guaranteed that if action is invoked, then the channel is either [closed for send][Channel.isClosedForSend]
1038 * or is [closed for receive][Channel.isClosedForReceive] depending on the failed operation.
1039 *
1040 * Returns the original `ChannelResult` unchanged.
1041 *
1042 * A shorthand for `this.also { if (isClosed) action(exceptionOrNull()) }`.
1043 */
1044 @OptIn(ExperimentalContracts::class)
onClosednull1045 public inline fun <T> ChannelResult<T>.onClosed(action: (exception: Throwable?) -> Unit): ChannelResult<T> {
1046 contract {
1047 callsInPlace(action, InvocationKind.AT_MOST_ONCE)
1048 }
1049 if (holder is ChannelResult.Closed) action(exceptionOrNull())
1050 return this
1051 }
1052
1053 /**
1054 * Iterator for a [ReceiveChannel].
1055 * Instances of this interface are *not thread-safe* and shall not be used from concurrent coroutines.
1056 */
1057 public interface ChannelIterator<out E> {
1058 /**
1059 * Prepare an element for retrieval by the invocation of [next].
1060 *
1061 * - If the element that was retrieved by an earlier [hasNext] call was not yet consumed by [next], returns `true`.
1062 * - If the channel has an element available, returns `true` and removes it from the channel.
1063 * This element will be returned by the subsequent invocation of [next].
1064 * - If the channel is [closed for receiving][ReceiveChannel.isClosedForReceive] without a cause, returns `false`.
1065 * - If the channel is closed with a cause, throws the original [close][SendChannel.close] cause exception.
1066 * - If the channel is not closed but does not contain an element,
1067 * suspends until either an element is sent to the channel or the channel gets closed.
1068 *
1069 * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this
1070 * suspending function is waiting, this function immediately resumes with [CancellationException].
1071 * There is a **prompt cancellation guarantee**: even if [hasNext] retrieves the element from the channel during
1072 * its operation, but was cancelled while suspended, [CancellationException] will be thrown.
1073 * See [suspendCancellableCoroutine] for low-level details.
1074 *
1075 * Because of the prompt cancellation guarantee, some values retrieved from the channel can become lost.
1076 * See the "Undelivered elements" section in the [Channel] documentation
1077 * for details on handling undelivered elements.
1078 *
1079 * Note that this function does not check for cancellation when it is not suspended, that is,
1080 * if the next element is immediately available.
1081 * Use [ensureActive] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
1082 */
hasNextnull1083 public suspend operator fun hasNext(): Boolean
1084
1085 @Deprecated(message = "Since 1.3.0, binary compatibility with versions <= 1.2.x", level = DeprecationLevel.HIDDEN)
1086 @Suppress("INAPPLICABLE_JVM_NAME")
1087 @JvmName("next")
1088 public suspend fun next0(): E {
1089 /*
1090 * Before 1.3.0 the "next()" could have been used without invoking "hasNext" first and there were code samples
1091 * demonstrating this behavior, so we preserve this logic for full binary backwards compatibility with previously
1092 * compiled code.
1093 */
1094 if (!hasNext()) throw ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
1095 return next()
1096 }
1097
1098 /**
1099 * Retrieves the element removed from the channel by the preceding call to [hasNext], or
1100 * throws an [IllegalStateException] if [hasNext] was not invoked.
1101 *
1102 * This method can only be used together with [hasNext]:
1103 * ```
1104 * while (iterator.hasNext()) {
1105 * val element = iterator.next()
1106 * // ... handle the element ...
1107 * }
1108 * ```
1109 *
1110 * A more idiomatic way to iterate over a channel is to use a `for` loop:
1111 * ```
1112 * for (element in channel) {
1113 * // ... handle the element ...
1114 * }
1115 * ```
1116 *
1117 * This method never throws if [hasNext] returned `true`.
1118 * If [hasNext] threw the cause with which the channel was closed, this method will rethrow the same exception.
1119 * If [hasNext] returned `false` because the channel was closed without a cause, this method throws
1120 * a [ClosedReceiveChannelException].
1121 */
nextnull1122 public operator fun next(): E
1123 }
1124
1125 /**
1126 * Channel is a non-blocking primitive for communication between a sender (via [SendChannel]) and a receiver (via [ReceiveChannel]).
1127 * Conceptually, a channel is similar to `java.util.concurrent.BlockingQueue`,
1128 * but it has suspending operations instead of blocking ones and can be [closed][SendChannel.close].
1129 *
1130 * ### Channel capacity
1131 *
1132 * Most ways to create a [Channel] (in particular, the `Channel()` factory function) allow specifying a capacity,
1133 * which determines how elements are buffered in the channel.
1134 * There are several predefined constants for the capacity that have special behavior:
1135 *
1136 * - [Channel.RENDEZVOUS] (or 0) creates a _rendezvous_ channel, which does not have a buffer at all.
1137 * Instead, the sender and the receiver must rendezvous (meet):
1138 * [SendChannel.send] suspends until another coroutine invokes [ReceiveChannel.receive], and vice versa.
1139 * - [Channel.CONFLATED] creates a buffer for a single element and automatically changes the
1140 * [buffer overflow strategy][BufferOverflow] to [BufferOverflow.DROP_OLDEST].
1141 * - [Channel.UNLIMITED] creates a channel with an unlimited buffer, which never suspends the sender.
1142 * - [Channel.BUFFERED] creates a channel with a buffer whose size depends on
1143 * the [buffer overflow strategy][BufferOverflow].
1144 *
1145 * See each constant's documentation for more details.
1146 *
1147 * If the capacity is positive but less than [Channel.UNLIMITED], the channel has a buffer with the specified capacity.
1148 * It is safe to construct a channel with a large buffer, as memory is only allocated gradually as elements are added.
1149 *
1150 * Constructing a channel with a negative capacity not equal to a predefined constant is not allowed
1151 * and throws an [IllegalArgumentException].
1152 *
1153 * ### Buffer overflow
1154 *
1155 * Some ways to create a [Channel] also expose a [BufferOverflow] parameter (by convention, `onBufferOverflow`),
1156 * which does not affect the receiver but determines the behavior of the sender when the buffer is full.
1157 * The options include [suspending][BufferOverflow.SUSPEND] until there is space in the buffer,
1158 * [dropping the oldest element][BufferOverflow.DROP_OLDEST] to make room for the new one, or
1159 * [dropping the element to be sent][BufferOverflow.DROP_LATEST]. See the [BufferOverflow] documentation.
1160 *
1161 * By convention, the default value for [BufferOverflow] whenever it can not be configured is [BufferOverflow.SUSPEND].
1162 *
1163 * See the [Channel.RENDEZVOUS], [Channel.CONFLATED], and [Channel.UNLIMITED] documentation for a description of how
1164 * they interact with the [BufferOverflow] parameter.
1165 *
1166 * ### Prompt cancellation guarantee
1167 *
1168 * All suspending functions with channels provide **prompt cancellation guarantee**.
1169 * If the job was cancelled while send or receive function was suspended, it will not resume successfully, even if it
1170 * already changed the channel's state, but throws a [CancellationException].
1171 * With a single-threaded [dispatcher][CoroutineDispatcher] like [Dispatchers.Main], this gives a
1172 * guarantee that the coroutine promptly reacts to the cancellation of its [Job] and does not resume its execution.
1173 *
1174 * > **Prompt cancellation guarantee** for channel operations was added in `kotlinx.coroutines` version `1.4.0`
1175 * > and has replaced the channel-specific atomic cancellation that was not consistent with other suspending functions.
1176 * > The low-level mechanics of prompt cancellation are explained in the [suspendCancellableCoroutine] documentation.
1177 *
1178 * ### Undelivered elements
1179 *
1180 * As a result of the prompt cancellation guarantee, when a closeable resource
1181 * (like an open file or a handle to another native resource) is transferred via a channel,
1182 * it can be successfully extracted from the channel,
1183 * but still be lost if the receiving operation is cancelled in parallel.
1184 *
1185 * The `Channel()` factory function has the optional parameter `onUndeliveredElement`.
1186 * When that parameter is set, the corresponding function is called once for each element
1187 * that was sent to the channel with the call to the [send][SendChannel.send] function but failed to be delivered,
1188 * which can happen in the following cases:
1189 *
1190 * - When an element is dropped due to the limited buffer capacity.
1191 * This can happen when the overflow strategy is [BufferOverflow.DROP_LATEST] or [BufferOverflow.DROP_OLDEST].
1192 * - When the sending operations like [send][SendChannel.send] or [onSend][SendChannel.onSend]
1193 * throw an exception because it was cancelled
1194 * before it had a chance to actually send the element
1195 * or because the channel was [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel].
1196 * - When the receiving operations like [receive][ReceiveChannel.receive],
1197 * [onReceive][ReceiveChannel.onReceive], or [hasNext][ChannelIterator.hasNext]
1198 * throw an exception after retrieving the element from the channel
1199 * because of being cancelled before the code following them had a chance to resume.
1200 * - When the channel was [cancelled][ReceiveChannel.cancel], in which case `onUndeliveredElement` is called on every
1201 * remaining element in the channel's buffer.
1202 *
1203 * Note that `onUndeliveredElement` is called synchronously in an arbitrary context.
1204 * It should be fast, non-blocking, and should not throw exceptions.
1205 * Any exception thrown by `onUndeliveredElement` is wrapped into an internal runtime exception
1206 * which is either rethrown from the caller method or handed off to the exception handler in the current context
1207 * (see [CoroutineExceptionHandler]) when one is available.
1208 *
1209 * A typical usage for `onUndeliveredElement` is to close a resource that is being transferred via the channel. The
1210 * following code pattern guarantees that opened resources are closed even if the producer, the consumer,
1211 * and/or the channel are cancelled. Resources are never lost.
1212 *
1213 * ```
1214 * // Create a channel with an onUndeliveredElement block that closes a resource
1215 * val channel = Channel<Resource>(onUndeliveredElement = { resource -> resource.close() })
1216 *
1217 * // Producer code
1218 * val resourceToSend = openResource()
1219 * channel.send(resourceToSend)
1220 *
1221 * // Consumer code
1222 * val resourceReceived = channel.receive()
1223 * try {
1224 * // work with received resource
1225 * } finally {
1226 * resourceReceived.close()
1227 * }
1228 * ```
1229 *
1230 * > Note that if any work happens between `openResource()` and `channel.send(...)`,
1231 * > it is your responsibility to ensure that resource gets closed in case this additional code fails.
1232 */
1233 public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
1234 /**
1235 * Constants for the channel factory function `Channel()`.
1236 */
1237 public companion object Factory {
1238 /**
1239 * An unlimited buffer capacity.
1240 *
1241 * `Channel(UNLIMITED)` creates a channel with an unlimited buffer, which never suspends the sender.
1242 * The total amount of elements that can be sent to the channel is limited only by the available memory.
1243 *
1244 * If [BufferOverflow] is specified for the channel, it is completely ignored,
1245 * as the channel never suspends the sender.
1246 *
1247 * ```
1248 * val channel = Channel<Int>(Channel.UNLIMITED)
1249 * repeat(1000) {
1250 * channel.trySend(it)
1251 * }
1252 * repeat(1000) {
1253 * check(channel.tryReceive().getOrNull() == it)
1254 * }
1255 * ```
1256 */
1257 public const val UNLIMITED: Int = Int.MAX_VALUE
1258
1259 /**
1260 * The zero buffer capacity.
1261 *
1262 * For the default [BufferOverflow] value of [BufferOverflow.SUSPEND],
1263 * `Channel(RENDEZVOUS)` creates a channel without a buffer.
1264 * An element is transferred from the sender to the receiver only when [send] and [receive] invocations meet
1265 * in time (that is, they _rendezvous_),
1266 * so [send] suspends until another coroutine invokes [receive],
1267 * and [receive] suspends until another coroutine invokes [send].
1268 *
1269 * ```
1270 * val channel = Channel<Int>(Channel.RENDEZVOUS)
1271 * check(channel.trySend(5).isFailure) // sending fails: no receiver is waiting
1272 * launch(start = CoroutineStart.UNDISPATCHED) {
1273 * val element = channel.receive() // suspends
1274 * check(element == 3)
1275 * }
1276 * check(channel.trySend(3).isSuccess) // sending succeeds: receiver is waiting
1277 * ```
1278 *
1279 * If a different [BufferOverflow] is specified,
1280 * `Channel(RENDEZVOUS)` creates a channel with a buffer of size 1:
1281 *
1282 * ```
1283 * val channel = Channel<Int>(0, onBufferOverflow = BufferOverflow.DROP_OLDEST)
1284 * // None of the calls suspend, since the buffer overflow strategy is not SUSPEND
1285 * channel.send(1)
1286 * channel.send(2)
1287 * channel.send(3)
1288 * check(channel.receive() == 3)
1289 * ```
1290 */
1291 public const val RENDEZVOUS: Int = 0
1292
1293 /**
1294 * A single-element buffer with conflating behavior.
1295 *
1296 * Specifying [CONFLATED] as the capacity in the `Channel(...)` factory function is equivalent to
1297 * creating a channel with a buffer of size 1 and a [BufferOverflow] strategy of [BufferOverflow.DROP_OLDEST]:
1298 * `Channel(1, onBufferOverflow = BufferOverflow.DROP_OLDEST)`.
1299 * Such a channel buffers at most one element and conflates all subsequent `send` and `trySend` invocations
1300 * so that the receiver always gets the last element sent, **losing** the previously sent elements:
1301 * see the "Undelivered elements" section in the [Channel] documentation.
1302 * [Sending][send] to this channel never suspends, and [trySend] always succeeds.
1303 *
1304 * ```
1305 * val channel = Channel<Int>(Channel.CONFLATED)
1306 * channel.send(1)
1307 * channel.send(2)
1308 * channel.send(3)
1309 * check(channel.receive() == 3)
1310 * ```
1311 *
1312 * Specifying a [BufferOverflow] other than [BufferOverflow.SUSPEND] is not allowed with [CONFLATED], and
1313 * an [IllegalArgumentException] is thrown if such a combination is used.
1314 * For creating a conflated channel that instead keeps the existing element in the channel and throws out
1315 * the new one, use `Channel(1, onBufferOverflow = BufferOverflow.DROP_LATEST)`.
1316 */
1317 public const val CONFLATED: Int = -1
1318
1319 /**
1320 * A channel capacity marker that is substituted by the default buffer capacity.
1321 *
1322 * When passed as a parameter to the `Channel(...)` factory function, the default buffer capacity is used.
1323 * For [BufferOverflow.SUSPEND] (the default buffer overflow strategy), the default capacity is 64,
1324 * but on the JVM it can be overridden by setting the [DEFAULT_BUFFER_PROPERTY_NAME] system property.
1325 * The overridden value is used for all channels created with a default buffer capacity,
1326 * including those created in third-party libraries.
1327 *
1328 * ```
1329 * val channel = Channel<Int>(Channel.BUFFERED)
1330 * repeat(100) {
1331 * channel.trySend(it)
1332 * }
1333 * channel.close()
1334 * // The check can fail if the default buffer capacity is changed
1335 * check(channel.toList() == (0..<64).toList())
1336 * ```
1337 *
1338 * If a different [BufferOverflow] is specified, `Channel(BUFFERED)` creates a channel with a buffer of size 1:
1339 *
1340 * ```
1341 * val channel = Channel<Int>(Channel.BUFFERED, onBufferOverflow = BufferOverflow.DROP_OLDEST)
1342 * channel.send(1)
1343 * channel.send(2)
1344 * channel.send(3)
1345 * channel.close()
1346 * check(channel.toList() == listOf(3))
1347 * ```
1348 */
1349 public const val BUFFERED: Int = -2
1350
1351 // only for internal use, cannot be used with Channel(...)
1352 internal const val OPTIONAL_CHANNEL = -3
1353
1354 /**
1355 * Name of the JVM system property for the default channel capacity (64 by default).
1356 *
1357 * See [BUFFERED] for details on how this property is used.
1358 *
1359 * Setting this property affects the default channel capacity for channel constructors,
1360 * channel-backed coroutines and flow operators that imply channel usage,
1361 * including ones defined in 3rd-party libraries.
1362 *
1363 * Usage of this property is highly discouraged and is intended to be used as a last-ditch effort
1364 * as an immediate measure for hot fixes and duct-taping.
1365 */
1366 @DelicateCoroutinesApi
1367 public const val DEFAULT_BUFFER_PROPERTY_NAME: String = "kotlinx.coroutines.channels.defaultBuffer"
1368
1369 internal val CHANNEL_DEFAULT_CAPACITY = systemProp(DEFAULT_BUFFER_PROPERTY_NAME,
1370 64, 1, UNLIMITED - 1
1371 )
1372 }
1373 }
1374
1375 /**
1376 * Creates a channel. See the [Channel] interface documentation for details.
1377 *
1378 * This function is the most flexible way to create a channel.
1379 * It allows specifying the channel's capacity, buffer overflow strategy, and an optional function to call
1380 * to handle undelivered elements.
1381 *
1382 * ```
1383 * val allocatedResources = HashSet<Int>()
1384 * // An autocloseable resource that must be closed when it is no longer needed
1385 * class Resource(val id: Int): AutoCloseable {
1386 * init {
1387 * allocatedResources.add(id)
1388 * }
1389 * override fun close() {
1390 * allocatedResources.remove(id)
1391 * }
1392 * }
1393 * // A channel with a 15-element buffer that drops the oldest element on buffer overflow
1394 * // and closes the elements that were not delivered to the consumer
1395 * val channel = Channel<Resource>(
1396 * capacity = 15,
1397 * onBufferOverflow = BufferOverflow.DROP_OLDEST,
1398 * onUndeliveredElement = { element -> element.close() }
1399 * )
1400 * // A sender's view of the channel
1401 * val sendChannel: SendChannel<Resource> = channel
1402 * repeat(100) {
1403 * sendChannel.send(Resource(it))
1404 * }
1405 * sendChannel.close()
1406 * // A receiver's view of the channel
1407 * val receiveChannel: ReceiveChannel<Resource> = channel
1408 * val receivedResources = receiveChannel.toList()
1409 * // Check that the last 15 sent resources were received
1410 * check(receivedResources.map { it.id } == (85 until 100).toList())
1411 * // Close the resources that were successfully received
1412 * receivedResources.forEach { it.close() }
1413 * // The dropped resources were closed by the channel itself
1414 * check(allocatedResources.isEmpty())
1415 * ```
1416 *
1417 * For a full explanation of every parameter and their interaction, see the [Channel] interface documentation.
1418 *
1419 * @param capacity either a positive channel capacity or one of the constants defined in [Channel.Factory].
1420 * See the "Channel capacity" section in the [Channel] documentation.
1421 * @param onBufferOverflow configures an action on buffer overflow.
1422 * See the "Buffer overflow" section in the [Channel] documentation.
1423 * @param onUndeliveredElement a function that is called when element was sent but was not delivered to the consumer.
1424 * See the "Undelivered elements" section in the [Channel] documentation.
1425 * @throws IllegalArgumentException when [capacity] < -2
1426 */
Channelnull1427 public fun <E> Channel(
1428 capacity: Int = RENDEZVOUS,
1429 onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
1430 onUndeliveredElement: ((E) -> Unit)? = null
1431 ): Channel<E> =
1432 when (capacity) {
1433 RENDEZVOUS -> {
1434 if (onBufferOverflow == BufferOverflow.SUSPEND)
1435 BufferedChannel(RENDEZVOUS, onUndeliveredElement) // an efficient implementation of rendezvous channel
1436 else
1437 ConflatedBufferedChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
1438 }
1439 CONFLATED -> {
1440 require(onBufferOverflow == BufferOverflow.SUSPEND) {
1441 "CONFLATED capacity cannot be used with non-default onBufferOverflow"
1442 }
1443 ConflatedBufferedChannel(1, BufferOverflow.DROP_OLDEST, onUndeliveredElement)
1444 }
1445 UNLIMITED -> BufferedChannel(UNLIMITED, onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows
1446 BUFFERED -> { // uses default capacity with SUSPEND
1447 if (onBufferOverflow == BufferOverflow.SUSPEND) BufferedChannel(CHANNEL_DEFAULT_CAPACITY, onUndeliveredElement)
1448 else ConflatedBufferedChannel(1, onBufferOverflow, onUndeliveredElement)
1449 }
1450 else -> {
1451 if (onBufferOverflow === BufferOverflow.SUSPEND) BufferedChannel(capacity, onUndeliveredElement)
1452 else ConflatedBufferedChannel(capacity, onBufferOverflow, onUndeliveredElement)
1453 }
1454 }
1455
1456 @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.0, binary compatibility with earlier versions")
Channelnull1457 public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> = Channel(capacity)
1458
1459 /**
1460 * Indicates an attempt to [send][SendChannel.send] to a [closed-for-sending][SendChannel.isClosedForSend] channel
1461 * that was [closed][SendChannel.close] without a cause.
1462 *
1463 * If a cause was provided, that cause is thrown from [send][SendChannel.send] instead of this exception.
1464 * In particular, if the channel was closed because it was [cancelled][ReceiveChannel.cancel],
1465 * this exception will never be thrown: either the `cause` of the cancellation is thrown,
1466 * or a new [CancellationException] gets constructed to be thrown from [SendChannel.send].
1467 *
1468 * This exception is a subclass of [IllegalStateException], because the sender should not attempt to send to a closed
1469 * channel after it itself has [closed][SendChannel.close] it, and indicates an error on the part of the programmer.
1470 * Usually, this exception can be avoided altogether by restructuring the code.
1471 */
1472 public class ClosedSendChannelException(message: String?) : IllegalStateException(message)
1473
1474 /**
1475 * Indicates an attempt to [receive][ReceiveChannel.receive] from a
1476 * [closed-for-receiving][ReceiveChannel.isClosedForReceive] channel
1477 * that was [closed][SendChannel.close] without a cause.
1478 *
1479 * If a clause was provided, that clause is thrown from [receive][ReceiveChannel.receive] instead of this exception.
1480 * In particular, if the channel was closed because it was [cancelled][ReceiveChannel.cancel],
1481 * this exception will never be thrown: either the `cause` of the cancellation is thrown,
1482 * or a new [CancellationException] gets constructed to be thrown from [ReceiveChannel.receive].
1483 *
1484 * This exception is a subclass of [NoSuchElementException] to be consistent with plain collections.
1485 */
1486 public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message)
1487