1 @file:JvmMultifileClass
2 @file:JvmName("ChannelsKt")
3 @file:OptIn(ExperimentalContracts::class)
4
5 package kotlinx.coroutines.channels
6
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.selects.*
9 import kotlin.contracts.*
10 import kotlin.jvm.*
11
12 internal const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
13
14
15 // -------- Operations on BroadcastChannel --------
16
17 /**
18 * This function is deprecated in the favour of [ReceiveChannel.receiveCatching].
19 *
20 * This function is considered error-prone for the following reasons;
21 * - Is throwing if the channel has failed even though its signature may suggest it returns 'null'
22 * - It is easy to forget that exception handling still have to be explicit
23 * - During code reviews and code reading, intentions of the code are frequently unclear:
24 * are potential exceptions ignored deliberately or not?
25 *
26 * @suppress doc
27 */
28 @Deprecated(
29 "Deprecated in the favour of 'receiveCatching'",
30 ReplaceWith("receiveCatching().getOrNull()"),
31 DeprecationLevel.HIDDEN
32 ) // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0
33 @Suppress("EXTENSION_SHADOWED_BY_MEMBER", "DEPRECATION_ERROR")
receiveOrNullnull34 public suspend fun <E : Any> ReceiveChannel<E>.receiveOrNull(): E? {
35 @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
36 return (this as ReceiveChannel<E?>).receiveOrNull()
37 }
38
39 /**
40 * This function is deprecated in the favour of [ReceiveChannel.onReceiveCatching]
41 */
42 @Deprecated(
43 "Deprecated in the favour of 'onReceiveCatching'",
44 level = DeprecationLevel.HIDDEN
45 ) // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0
46 @Suppress("DEPRECATION_ERROR")
onReceiveOrNullnull47 public fun <E : Any> ReceiveChannel<E>.onReceiveOrNull(): SelectClause1<E?> {
48 return (this as ReceiveChannel<E?>).onReceiveOrNull
49 }
50
51 /**
52 * Executes the [block] and then [cancels][ReceiveChannel.cancel] the channel.
53 *
54 * It is guaranteed that, after invoking this operation, the channel will be [cancelled][ReceiveChannel.cancel], so
55 * the operation is _terminal_.
56 * If the [block] finishes with an exception, that exception will be used for cancelling the channel and rethrown.
57 *
58 * This function is useful for building more complex terminal operators while ensuring that the producers stop sending
59 * new elements to the channel.
60 *
61 * Example:
62 * ```
63 * suspend fun <E> ReceiveChannel<E>.consumeFirst(): E =
64 * consume { return receive() }
65 * // Launch a coroutine that constantly sends new values
66 * val channel = produce(Dispatchers.Default) {
67 * var i = 0
68 * while (true) {
69 * // Will fail with a `CancellationException`
70 * // after `consumeFirst` finishes.
71 * send(i++)
72 * }
73 * }
74 * // Grab the first value and discard everything else
75 * val firstElement = channel.consumeFirst()
76 * check(firstElement == 0)
77 * // *Note*: some elements could be lost in the channel!
78 * ```
79 *
80 * In this example, the channel will get closed, and the producer coroutine will finish its work after the first
81 * element is obtained.
82 * If `consumeFirst` was implemented as `for (e in this) { return e }` instead, the producer coroutine would be active
83 * until it was cancelled some other way.
84 *
85 * [consume] does not guarantee that new elements will not enter the channel after [block] finishes executing, so
86 * some channel elements may be lost.
87 * Use the `onUndeliveredElement` parameter of a manually created [Channel] to define what should happen with these
88 * elements during [ReceiveChannel.cancel].
89 */
consumenull90 public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
91 contract {
92 callsInPlace(block, InvocationKind.EXACTLY_ONCE)
93 }
94 var cause: Throwable? = null
95 try {
96 return block()
97 } catch (e: Throwable) {
98 cause = e
99 throw e
100 } finally {
101 cancelConsumed(cause)
102 }
103 }
104
105 /**
106 * Performs the given [action] for each received element and [cancels][ReceiveChannel.cancel] the channel afterward.
107 *
108 * This function stops processing elements when either the channel is [closed][SendChannel.close],
109 * the coroutine in which the collection is performed gets cancelled and there are no readily available elements in the
110 * channel's buffer,
111 * [action] fails with an exception,
112 * or an early return from [action] happens.
113 * If the [action] finishes with an exception, that exception will be used for cancelling the channel and rethrown.
114 * If the channel is [closed][SendChannel.close] with a cause, this cause will be rethrown from [consumeEach].
115 *
116 * When the channel does not need to be closed after iterating over its elements,
117 * a regular `for` loop (`for (element in channel)`) should be used instead.
118 *
119 * The operation is _terminal_.
120 * This function [consumes][ReceiveChannel.consume] the elements of the original [ReceiveChannel].
121 *
122 * This function is useful in cases when this channel is only expected to have a single consumer that decides when
123 * the producer may stop.
124 * Example:
125 *
126 * ```
127 * val channel = Channel<Int>(1)
128 * // Launch several procedures that create values
129 * repeat(5) {
130 * launch(Dispatchers.Default) {
131 * while (true) {
132 * channel.send(Random.nextInt(40, 50))
133 * }
134 * }
135 * }
136 * // Launch the exclusive consumer
137 * val result = run {
138 * channel.consumeEach {
139 * if (it == 42) {
140 * println("Found the answer")
141 * return@run it // forcibly stop collection
142 * }
143 * }
144 * // *Note*: some elements could be lost in the channel!
145 * }
146 * check(result == 42)
147 * ```
148 *
149 * In this example, several coroutines put elements into a single channel, and a single consumer processes the elements.
150 * Once it finds the elements it's looking for, it stops [consumeEach] by making an early return.
151 *
152 * **Pitfall**: even though the name says "each", some elements could be left unprocessed if they are added after
153 * this function decided to close the channel.
154 * In this case, the elements will simply be lost.
155 * If the elements of the channel are resources that must be closed (like file handles, sockets, etc.),
156 * an `onUndeliveredElement` must be passed to the [Channel] on construction.
157 * It will be called for each element left in the channel at the point of cancellation.
158 */
consumeEachnull159 public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit): Unit =
160 consume {
161 for (e in this) action(e)
162 }
163
164 /**
165 * Returns a [List] containing all the elements sent to this channel, preserving their order.
166 *
167 * This function will attempt to receive elements and put them into the list until the channel is
168 * [closed][SendChannel.close].
169 * Calling [toList] on channels that are not eventually closed is always incorrect:
170 * - It will suspend indefinitely if the channel is not closed, but no new elements arrive.
171 * - If new elements do arrive and the channel is not eventually closed, [toList] will use more and more memory
172 * until exhausting it.
173 *
174 * If the channel is [closed][SendChannel.close] with a cause, [toList] will rethrow that cause.
175 *
176 * The operation is _terminal_.
177 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
178 *
179 * Example:
180 * ```
181 * val values = listOf(1, 5, 2, 9, 3, 3, 1)
182 * // start a new coroutine that creates a channel,
183 * // sends elements to it, and closes it
184 * // once the coroutine's body finishes
185 * val channel = produce {
186 * values.forEach { send(it) }
187 * }
188 * check(channel.toList() == values)
189 * ```
190 */
<lambda>null191 public suspend fun <E> ReceiveChannel<E>.toList(): List<E> = buildList {
192 consumeEach {
193 add(it)
194 }
195 }
196
197 @PublishedApi
cancelConsumednull198 internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) {
199 cancel(cause?.let {
200 it as? CancellationException ?: CancellationException("Channel was consumed, consumer had failed", it)
201 })
202 }
203
204