• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 @file:JvmMultifileClass
2 @file:JvmName("ChannelsKt")
3 @file:OptIn(ExperimentalContracts::class)
4 
5 package kotlinx.coroutines.channels
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.selects.*
9 import kotlin.contracts.*
10 import kotlin.jvm.*
11 
/**
 * Message text `"Channel was closed"`.
 *
 * NOTE(review): not referenced in the visible part of this file; presumably the default message for
 * close-related exceptions raised elsewhere in the module — confirm at the use sites.
 */
internal const val DEFAULT_CLOSE_MESSAGE: String = "Channel was closed"
13 
14 
// -------- Operations on ReceiveChannel --------
16 
/**
 * Deprecated in favour of [ReceiveChannel.receiveCatching].
 *
 * This function is considered error-prone for the following reasons:
 * - It throws if the channel has failed, even though its signature may suggest that it returns `null`.
 * - It is easy to forget that exception handling still has to be explicit.
 * - During code reviews and code reading, the intent of the code is frequently unclear:
 *   are potential exceptions ignored deliberately or not?
 *
 * The implementation simply forwards to the member `receiveOrNull` of this channel viewed as a
 * `ReceiveChannel<E?>`.
 *
 * @suppress doc
 */
@Deprecated(
    "Deprecated in the favour of 'receiveCatching'",
    ReplaceWith("receiveCatching().getOrNull()"),
    DeprecationLevel.HIDDEN
) // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0
@Suppress("EXTENSION_SHADOWED_BY_MEMBER", "DEPRECATION_ERROR")
public suspend fun <E : Any> ReceiveChannel<E>.receiveOrNull(): E? {
    // View the channel with a nullable element type so that the member (not this extension) is selected.
    val nullableView = this as ReceiveChannel<E?>
    @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
    return nullableView.receiveOrNull()
}
38 
/**
 * Deprecated in favour of [ReceiveChannel.onReceiveCatching].
 *
 * Forwards to the member `onReceiveOrNull` select clause of this channel viewed as a `ReceiveChannel<E?>`.
 */
@Deprecated(
    "Deprecated in the favour of 'onReceiveCatching'",
    level = DeprecationLevel.HIDDEN
)  // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0
@Suppress("DEPRECATION_ERROR")
public fun <E : Any> ReceiveChannel<E>.onReceiveOrNull(): SelectClause1<E?> =
    (this as ReceiveChannel<E?>).onReceiveOrNull
50 
/**
 * Runs [block] with this channel as its receiver and [cancels][ReceiveChannel.cancel] the channel once the block
 * is done, no matter how it finishes.
 *
 * The operation is _terminal_: after this call the channel is guaranteed to be [cancelled][ReceiveChannel.cancel].
 * When [block] throws, the thrown exception is used to cancel the channel and is then rethrown to the caller.
 *
 * Use this function as a building block for more complex terminal operators that must make sure the producers
 * stop sending new elements to the channel.
 *
 * Example:
 * ```
 * suspend fun <E> ReceiveChannel<E>.consumeFirst(): E =
 *    consume { return receive() }
 * // Launch a coroutine that constantly sends new values
 * val channel = produce(Dispatchers.Default) {
 *     var i = 0
 *     while (true) {
 *         // Will fail with a `CancellationException`
 *         // after `consumeFirst` finishes.
 *         send(i++)
 *     }
 * }
 * // Grab the first value and discard everything else
 * val firstElement = channel.consumeFirst()
 * check(firstElement == 0)
 * // *Note*: some elements could be lost in the channel!
 * ```
 *
 * Here the channel gets closed as soon as the first element is obtained, which lets the producer coroutine finish
 * its work.
 * Had `consumeFirst` been written as `for (e in this) { return e }`, the producer coroutine would stay active
 * until it was cancelled in some other way.
 *
 * [consume] gives no guarantee that new elements will not enter the channel after [block] has finished, so
 * some channel elements may be lost.
 * To decide what happens to such elements during [ReceiveChannel.cancel], pass the `onUndeliveredElement`
 * parameter when creating the [Channel] manually.
 */
public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    // Remains null when [block] completes normally or leaves through a non-local return.
    var failure: Throwable? = null
    return try {
        block()
    } catch (thrown: Throwable) {
        failure = thrown
        throw thrown
    } finally {
        // Runs on every exit path, so the channel is always cancelled.
        cancelConsumed(failure)
    }
}
104 
/**
 * Invokes [action] on every received element and [cancels][ReceiveChannel.cancel] the channel afterward.
 *
 * Processing stops as soon as one of the following happens:
 * - the channel is [closed][SendChannel.close];
 * - the coroutine performing the collection gets cancelled and there are no readily available elements in the
 *   channel's buffer;
 * - [action] fails with an exception;
 * - an early return from [action] happens.
 *
 * If [action] finishes with an exception, that exception is used for cancelling the channel and is rethrown.
 * If the channel is [closed][SendChannel.close] with a cause, this cause is rethrown from [consumeEach].
 *
 * When the channel does not need to be closed after iterating over its elements,
 * use a regular `for` loop (`for (element in channel)`) instead.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] the elements of the original [ReceiveChannel].
 *
 * It is useful when the channel is expected to have a single consumer, and that consumer decides when
 * the producer may stop.
 * Example:
 *
 * ```
 * val channel = Channel<Int>(1)
 * // Launch several procedures that create values
 * repeat(5) {
 *     launch(Dispatchers.Default) {
 *         while (true) {
 *             channel.send(Random.nextInt(40, 50))
 *         }
 *     }
 * }
 * // Launch the exclusive consumer
 * val result = run {
 *     channel.consumeEach {
 *         if (it == 42) {
 *             println("Found the answer")
 *             return@run it // forcibly stop collection
 *         }
 *     }
 *     // *Note*: some elements could be lost in the channel!
 * }
 * check(result == 42)
 * ```
 *
 * In this example, several coroutines put elements into a single channel, and a single consumer processes them.
 * Once the consumer finds the element it is looking for, it stops [consumeEach] by making an early return.
 *
 * **Pitfall**: despite the word "each" in the name, some elements may be left unprocessed if they are added after
 * this function has decided to close the channel.
 * Such elements are simply lost.
 * If the elements of the channel are resources that must be closed (like file handles, sockets, etc.),
 * an `onUndeliveredElement` must be passed to the [Channel] on construction.
 * It will be called for each element left in the channel at the point of cancellation.
 */
public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit) {
    consume {
        for (element in this) {
            action(element)
        }
    }
}
163 
/**
 * Collects all the elements sent to this channel into a [List], keeping them in the order they were received.
 *
 * Elements are received and appended to the list until the channel is [closed][SendChannel.close].
 * Calling [toList] on a channel that is never closed is always incorrect:
 * - If no new elements arrive and the channel is not closed, the call suspends indefinitely.
 * - If new elements keep arriving and the channel is not eventually closed, [toList] uses more and more memory
 *   until it is exhausted.
 *
 * If the channel is [closed][SendChannel.close] with a cause, [toList] rethrows that cause.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 *
 * Example:
 * ```
 * val values = listOf(1, 5, 2, 9, 3, 3, 1)
 * // start a new coroutine that creates a channel,
 * // sends elements to it, and closes it
 * // once the coroutine's body finishes
 * val channel = produce {
 *     values.forEach { send(it) }
 * }
 * check(channel.toList() == values)
 * ```
 */
public suspend fun <E> ReceiveChannel<E>.toList(): List<E> {
    return buildList {
        // Inside the builder `this` is the list under construction, so name the channel explicitly.
        this@toList.consumeEach { element ->
            add(element)
        }
    }
}
196 
/**
 * Cancels this channel, translating [cause] into the [CancellationException] passed to `cancel`:
 * - `null` is passed through as `null`;
 * - a [CancellationException] is passed through unchanged;
 * - any other throwable is wrapped in a new [CancellationException] that keeps it as the cause.
 *
 * Marked [PublishedApi] because the public inline [consume] calls it.
 */
@PublishedApi
internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) {
    val cancellation: CancellationException? = when (cause) {
        null -> null
        is CancellationException -> cause
        else -> CancellationException("Channel was consumed, consumer had failed", cause)
    }
    cancel(cancellation)
}
203 
204