• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 @file:JvmMultifileClass
5 @file:JvmName("ChannelsKt")
6 @file:Suppress("DEPRECATION_ERROR")
7 
8 package kotlinx.coroutines.channels
9 
10 import kotlinx.coroutines.*
11 import kotlinx.coroutines.selects.*
12 import kotlin.coroutines.*
13 import kotlin.jvm.*
14 
15 internal const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
16 
17 
18 // -------- Operations on BroadcastChannel --------
19 
20 /**
21  * Opens subscription to this [BroadcastChannel] and makes sure that the given [block] consumes all elements
22  * from it by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block.
23  *
24  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
25  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
26  */
27 @ObsoleteCoroutinesApi
28 public inline fun <E, R> BroadcastChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
29     val channel = openSubscription()
30     try {
31         return channel.block()
32     } finally {
33         channel.cancel()
34     }
35 }
36 
37 /**
38  * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
39  * or returns `null` if the channel is [closed][Channel.isClosedForReceive].
40  *
41  * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
42  * function is suspended, this function immediately resumes with [CancellationException].
43  * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
44  * suspended, it will not resume successfully. If the `receiveOrNull` call threw [CancellationException] there is no way
45  * to tell if some element was already received from the channel or not. See [Channel] documentation for details.
46  *
47  * Note, that this function does not check for cancellation when it is not suspended.
48  * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
49  *
50  * This extension is defined only for channels on non-null types, so that generic functions defined using
51  * these extensions do not accidentally confuse `null` value and a normally closed channel, leading to hard
52  * to find bugs.
53  */
54 @Suppress("EXTENSION_SHADOWED_BY_MEMBER")
55 @ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.x
receiveOrNullnull56 public suspend fun <E : Any> ReceiveChannel<E>.receiveOrNull(): E? {
57     @Suppress("DEPRECATION", "UNCHECKED_CAST")
58     return (this as ReceiveChannel<E?>).receiveOrNull()
59 }
60 
61 /**
62  * Clause for [select] expression of [receiveOrNull] suspending function that selects with the element that
63  * is received from the channel or selects with `null` if the channel
64  * [isClosedForReceive][ReceiveChannel.isClosedForReceive] without cause. The [select] invocation fails with
65  * the original [close][SendChannel.close] cause exception if the channel has _failed_.
66  *
67  * This extension is defined only for channels on non-null types, so that generic functions defined using
68  * these extensions do not accidentally confuse `null` value and a normally closed channel, leading to hard
69  * to find bugs.
70  **/
71 @ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.x
onReceiveOrNullnull72 public fun <E : Any> ReceiveChannel<E>.onReceiveOrNull(): SelectClause1<E?> {
73     @Suppress("DEPRECATION", "UNCHECKED_CAST")
74     return (this as ReceiveChannel<E?>).onReceiveOrNull
75 }
76 
77 /**
78  * Subscribes to this [BroadcastChannel] and performs the specified action for each received element.
79  *
80  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
81  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
82  */
83 @ObsoleteCoroutinesApi
consumeEachnull84 public suspend inline fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit): Unit =
85     consume {
86         for (element in this) action(element)
87     }
88 
89 // -------- Operations on ReceiveChannel --------
90 
91 /**
92  * Returns a [CompletionHandler] that invokes [cancel][ReceiveChannel.cancel] on the [ReceiveChannel]
93  * with the corresponding cause. See also [ReceiveChannel.consume].
94  *
95  * **WARNING**: It is planned that in the future a second invocation of this method
96  * on an channel that is already being consumed is going to fail fast, that it
97  * immediately throws an [IllegalStateException].
98  * See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/167)
99  * for details.
100  *
101  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
102  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
103  */
104 @Deprecated(
105     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
106     level = DeprecationLevel.ERROR
107 )
causenull108 public fun ReceiveChannel<*>.consumes(): CompletionHandler = { cause: Throwable? ->
109     cancelConsumed(cause)
110 }
111 
112 @PublishedApi
cancelConsumednull113 internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) {
114     cancel(cause?.let {
115         it as? CancellationException ?: CancellationException("Channel was consumed, consumer had failed", it)
116     })
117 }
118 
119 /**
120  * Returns a [CompletionHandler] that invokes [cancel][ReceiveChannel.cancel] on all the
121  * specified [ReceiveChannel] instances with the corresponding cause.
122  * See also [ReceiveChannel.consumes()] for a version on one channel.
123  *
124  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
125  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
126  */
127 @Deprecated(
128     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
129     level = DeprecationLevel.ERROR
130 )
consumesAllnull131 public fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler =
132     { cause: Throwable? ->
133         var exception: Throwable? = null
134         for (channel in channels)
135             try {
136                 channel.cancelConsumed(cause)
137             } catch (e: Throwable) {
138                 if (exception == null) {
139                     exception = e
140                 } else {
141                     exception.addSuppressedThrowable(e)
142                 }
143             }
144         exception?.let { throw it }
145     }
146 
147 /**
148  * Makes sure that the given [block] consumes all elements from the given channel
149  * by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block.
150  *
151  * The operation is _terminal_.
152  */
consumenull153 public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
154     var cause: Throwable? = null
155     try {
156         return block()
157     } catch (e: Throwable) {
158         cause = e
159         throw e
160     } finally {
161         cancelConsumed(cause)
162     }
163 }
164 
165 /**
166  * Performs the given [action] for each received element and [cancels][ReceiveChannel.cancel]
167  * the channel after the execution of the block.
168  * If you need to iterate over the channel without consuming it, a regular `for` loop should be used instead.
169  *
170  * The operation is _terminal_.
171  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
172  */
consumeEachnull173 public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit): Unit =
174     consume {
175         for (e in this) action(e)
176     }
177 
178 /**
179  * Performs the given [action] for each received element.
180  *
181  * The operation is _terminal_.
182  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
183  *
184  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
185  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
186  */
187 @Deprecated(
188     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
189     level = DeprecationLevel.ERROR
190 )
consumeEachIndexednull191 public suspend inline fun <E> ReceiveChannel<E>.consumeEachIndexed(action: (IndexedValue<E>) -> Unit) {
192     var index = 0
193     consumeEach {
194         action(IndexedValue(index++, it))
195     }
196 }
197 
198 /**
199  * Returns an element at the given [index] or throws an [IndexOutOfBoundsException] if the [index] is out of bounds of this channel.
200  *
201  * The operation is _terminal_.
202  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
203  *
204  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
205  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
206  */
207 @Deprecated(
208     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
209     level = DeprecationLevel.ERROR
210 )
elementAtnull211 public suspend fun <E> ReceiveChannel<E>.elementAt(index: Int): E =
212     elementAtOrElse(index) { throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.") }
213 
214 /**
215  * Returns an element at the given [index] or the result of calling the [defaultValue] function if the [index] is out of bounds of this channel.
216  *
217  * The operation is _terminal_.
218  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
219  *
220  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
221  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
222  */
223 @Deprecated(
224     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
225     level = DeprecationLevel.ERROR
226 )
elementAtOrElsenull227 public suspend inline fun <E> ReceiveChannel<E>.elementAtOrElse(index: Int, defaultValue: (Int) -> E): E =
228     consume {
229         if (index < 0)
230             return defaultValue(index)
231         var count = 0
232         for (element in this) {
233             if (index == count++)
234                 return element
235         }
236         return defaultValue(index)
237     }
238 
239 /**
240  * Returns an element at the given [index] or `null` if the [index] is out of bounds of this channel.
241  *
242  * The operation is _terminal_.
243  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
244  *
245  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
246  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
247  */
248 @Deprecated(
249     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
250     level = DeprecationLevel.ERROR
251 )
elementAtOrNullnull252 public suspend fun <E> ReceiveChannel<E>.elementAtOrNull(index: Int): E? =
253     consume {
254         if (index < 0)
255             return null
256         var count = 0
257         for (element in this) {
258             if (index == count++)
259                 return element
260         }
261         return null
262     }
263 
264 /**
265  * Returns the first element matching the given [predicate], or `null` if no such element was found.
266  *
267  * The operation is _terminal_.
268  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
269  *
270  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
271  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
272  */
273 @Deprecated(
274     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
275     level = DeprecationLevel.ERROR
276 )
findnull277 public suspend inline fun <E> ReceiveChannel<E>.find(predicate: (E) -> Boolean): E? =
278     firstOrNull(predicate)
279 
280 /**
281  * Returns the last element matching the given [predicate], or `null` if no such element was found.
282  *
283  * The operation is _terminal_.
284  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
285  *
286  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
287  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
288  */
289 @Deprecated(
290     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
291     level = DeprecationLevel.ERROR
292 )
293 public suspend inline fun <E> ReceiveChannel<E>.findLast(predicate: (E) -> Boolean): E? =
294     lastOrNull(predicate)
295 
296 /**
297  * Returns first element.
298  * @throws [NoSuchElementException] if the channel is empty.
299  *
300  * The operation is _terminal_.
301  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
302  *
303  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
304  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
305  */
306 @Deprecated(
307     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
308     level = DeprecationLevel.ERROR
309 )
310 public suspend fun <E> ReceiveChannel<E>.first(): E =
311     consume {
312         val iterator = iterator()
313         if (!iterator.hasNext())
314             throw NoSuchElementException("ReceiveChannel is empty.")
315         return iterator.next()
316     }
317 
318 /**
319  * Returns the first element matching the given [predicate].
320  * @throws [NoSuchElementException] if no such element is found.
321  *
322  * The operation is _terminal_.
323  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
324  *
325  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
326  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
327  */
328 @Deprecated(
329     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
330     level = DeprecationLevel.ERROR
331 )
firstnull332 public suspend inline fun <E> ReceiveChannel<E>.first(predicate: (E) -> Boolean): E {
333     consumeEach {
334         if (predicate(it)) return it
335     }
336     throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
337 }
338 
339 /**
340  * Returns the first element, or `null` if the channel is empty.
341  *
342  * The operation is _terminal_.
343  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
344  *
345  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
346  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
347  */
348 @Deprecated(
349     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
350     level = DeprecationLevel.ERROR
351 )
firstOrNullnull352 public suspend fun <E> ReceiveChannel<E>.firstOrNull(): E? =
353     consume {
354         val iterator = iterator()
355         if (!iterator.hasNext())
356             return null
357         return iterator.next()
358     }
359 
360 /**
361  * Returns the first element matching the given [predicate], or `null` if element was not found.
362  *
363  * The operation is _terminal_.
364  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
365  *
366  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
367  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
368  */
369 @Deprecated(
370     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
371     level = DeprecationLevel.ERROR
372 )
firstOrNullnull373 public suspend inline fun <E> ReceiveChannel<E>.firstOrNull(predicate: (E) -> Boolean): E? {
374     consumeEach {
375         if (predicate(it)) return it
376     }
377     return null
378 }
379 
380 /**
381  * Returns first index of [element], or -1 if the channel does not contain element.
382  *
383  * The operation is _terminal_.
384  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
385  *
386  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
387  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
388  */
389 @Deprecated(
390     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
391     level = DeprecationLevel.ERROR
392 )
indexOfnull393 public suspend fun <E> ReceiveChannel<E>.indexOf(element: E): Int {
394     var index = 0
395     consumeEach {
396         if (element == it)
397             return index
398         index++
399     }
400     return -1
401 }
402 
403 /**
404  * Returns index of the first element matching the given [predicate], or -1 if the channel does not contain such element.
405  *
406  * The operation is _terminal_.
407  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
408  *
409  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
410  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
411  */
412 @Deprecated(
413     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
414     level = DeprecationLevel.ERROR
415 )
indexOfFirstnull416 public suspend inline fun <E> ReceiveChannel<E>.indexOfFirst(predicate: (E) -> Boolean): Int {
417     var index = 0
418     consumeEach {
419         if (predicate(it))
420             return index
421         index++
422     }
423     return -1
424 }
425 
426 /**
427  * Returns index of the last element matching the given [predicate], or -1 if the channel does not contain such element.
428  *
429  * The operation is _terminal_.
430  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
431  *
432  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
433  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
434  */
435 @Deprecated(
436     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
437     level = DeprecationLevel.ERROR
438 )
indexOfLastnull439 public suspend inline fun <E> ReceiveChannel<E>.indexOfLast(predicate: (E) -> Boolean): Int {
440     var lastIndex = -1
441     var index = 0
442     consumeEach {
443         if (predicate(it))
444             lastIndex = index
445         index++
446     }
447     return lastIndex
448 }
449 
450 /**
451  * Returns the last element.
452  * @throws [NoSuchElementException] if the channel is empty.
453  *
454  * The operation is _terminal_.
455  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
456  *
457  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
458  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
459  */
460 @Deprecated(
461     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
462     level = DeprecationLevel.ERROR
463 )
lastnull464 public suspend fun <E> ReceiveChannel<E>.last(): E =
465     consume {
466         val iterator = iterator()
467         if (!iterator.hasNext())
468             throw NoSuchElementException("ReceiveChannel is empty.")
469         var last = iterator.next()
470         while (iterator.hasNext())
471             last = iterator.next()
472         return last
473     }
474 
475 /**
476  * Returns the last element matching the given [predicate].
477  * @throws [NoSuchElementException] if no such element is found.
478  *
479  * The operation is _terminal_.
480  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
481  *
482  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
483  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
484  */
485 @Deprecated(
486     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
487     level = DeprecationLevel.ERROR
488 )
lastnull489 public suspend inline fun <E> ReceiveChannel<E>.last(predicate: (E) -> Boolean): E {
490     var last: E? = null
491     var found = false
492     consumeEach {
493         if (predicate(it)) {
494             last = it
495             found = true
496         }
497     }
498     if (!found) throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
499     @Suppress("UNCHECKED_CAST")
500     return last as E
501 }
502 
503 /**
504  * Returns last index of [element], or -1 if the channel does not contain element.
505  *
506  * The operation is _terminal_.
507  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
508  *
509  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
510  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
511  */
512 @Deprecated(
513     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
514     level = DeprecationLevel.ERROR
515 )
lastIndexOfnull516 public suspend fun <E> ReceiveChannel<E>.lastIndexOf(element: E): Int {
517     var lastIndex = -1
518     var index = 0
519     consumeEach {
520         if (element == it)
521             lastIndex = index
522         index++
523     }
524     return lastIndex
525 }
526 
527 /**
528  * Returns the last element, or `null` if the channel is empty.
529  *
530  * The operation is _terminal_.
531  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
532  *
533  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
534  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
535  */
536 @Deprecated(
537     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
538     level = DeprecationLevel.ERROR
539 )
lastOrNullnull540 public suspend fun <E> ReceiveChannel<E>.lastOrNull(): E? =
541     consume {
542         val iterator = iterator()
543         if (!iterator.hasNext())
544             return null
545         var last = iterator.next()
546         while (iterator.hasNext())
547             last = iterator.next()
548         return last
549     }
550 
551 /**
552  * Returns the last element matching the given [predicate], or `null` if no such element was found.
553  *
554  * The operation is _terminal_.
555  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
556  *
557  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
558  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
559  */
560 @Deprecated(
561     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
562     level = DeprecationLevel.ERROR
563 )
lastOrNullnull564 public suspend inline fun <E> ReceiveChannel<E>.lastOrNull(predicate: (E) -> Boolean): E? {
565     var last: E? = null
566     consumeEach {
567         if (predicate(it)) {
568             last = it
569         }
570     }
571     return last
572 }
573 
574 /**
575  * Returns the single element, or throws an exception if the channel is empty or has more than one element.
576  *
577  * The operation is _terminal_.
578  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
579  *
580  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
581  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
582  */
583 @Deprecated(
584     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
585     level = DeprecationLevel.ERROR
586 )
singlenull587 public suspend fun <E> ReceiveChannel<E>.single(): E =
588     consume {
589         val iterator = iterator()
590         if (!iterator.hasNext())
591             throw NoSuchElementException("ReceiveChannel is empty.")
592         val single = iterator.next()
593         if (iterator.hasNext())
594             throw IllegalArgumentException("ReceiveChannel has more than one element.")
595         return single
596     }
597 
598 /**
599  * Returns the single element matching the given [predicate], or throws exception if there is no or more than one matching element.
600  *
601  * The operation is _terminal_.
602  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
603  *
604  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
605  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
606  */
607 @Deprecated(
608     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
609     level = DeprecationLevel.ERROR
610 )
singlenull611 public suspend inline fun <E> ReceiveChannel<E>.single(predicate: (E) -> Boolean): E {
612     var single: E? = null
613     var found = false
614     consumeEach {
615         if (predicate(it)) {
616             if (found) throw IllegalArgumentException("ReceiveChannel contains more than one matching element.")
617             single = it
618             found = true
619         }
620     }
621     if (!found) throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
622     @Suppress("UNCHECKED_CAST")
623     return single as E
624 }
625 
626 /**
627  * Returns single element, or `null` if the channel is empty or has more than one element.
628  *
629  * The operation is _terminal_.
630  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
631  *
632  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
633  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
634  */
635 @Deprecated(
636     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
637     level = DeprecationLevel.ERROR
638 )
singleOrNullnull639 public suspend fun <E> ReceiveChannel<E>.singleOrNull(): E? =
640     consume {
641         val iterator = iterator()
642         if (!iterator.hasNext())
643             return null
644         val single = iterator.next()
645         if (iterator.hasNext())
646             return null
647         return single
648     }
649 
650 /**
651  * Returns the single element matching the given [predicate], or `null` if element was not found or more than one element was found.
652  *
653  * The operation is _terminal_.
654  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
655  *
656  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
657  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
658  */
659 @Deprecated(
660     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
661     level = DeprecationLevel.ERROR
662 )
singleOrNullnull663 public suspend inline fun <E> ReceiveChannel<E>.singleOrNull(predicate: (E) -> Boolean): E? {
664     var single: E? = null
665     var found = false
666     consumeEach {
667         if (predicate(it)) {
668             if (found) return null
669             single = it
670             found = true
671         }
672     }
673     if (!found) return null
674     return single
675 }
676 
677 /**
678  * Returns a channel containing all elements except first [n] elements.
679  *
680  * The operation is _intermediate_ and _stateless_.
681  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
682  *
683  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
684  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
685  */
686 @Deprecated(
687     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
688     level = DeprecationLevel.ERROR
689 )
dropnull690 public fun <E> ReceiveChannel<E>.drop(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
691     GlobalScope.produce(context, onCompletion = consumes()) {
692         require(n >= 0) { "Requested element count $n is less than zero." }
693         var remaining: Int = n
694         if (remaining > 0)
695             for (e in this@drop) {
696                 remaining--
697                 if (remaining == 0)
698                     break
699             }
700         for (e in this@drop) {
701             send(e)
702         }
703     }
704 
705 /**
706  * Returns a channel containing all elements except first elements that satisfy the given [predicate].
707  *
708  * The operation is _intermediate_ and _stateless_.
709  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
710  *
711  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
712  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
713  */
714 @Deprecated(
715     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
716     level = DeprecationLevel.ERROR
717 )
dropWhilenull718 public fun <E> ReceiveChannel<E>.dropWhile(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
719     GlobalScope.produce(context, onCompletion = consumes()) {
720         for (e in this@dropWhile) {
721             if (!predicate(e)) {
722                 send(e)
723                 break
724             }
725         }
726         for (e in this@dropWhile) {
727             send(e)
728         }
729     }
730 
731 /**
732  * Returns a channel containing only elements matching the given [predicate].
733  *
734  * The operation is _intermediate_ and _stateless_.
735  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
736  *
737  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
738  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
739  */
740 @Deprecated(
741     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
742     level = DeprecationLevel.ERROR
743 )
filternull744 public fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
745     GlobalScope.produce(context, onCompletion = consumes()) {
746         for (e in this@filter) {
747             if (predicate(e)) send(e)
748         }
749     }
750 
751 /**
752  * Returns a channel containing only elements matching the given [predicate].
753  * @param [predicate] function that takes the index of an element and the element itself
754  * and returns the result of predicate evaluation on the element.
755  *
756  * The operation is _intermediate_ and _stateless_.
757  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
758  *
759  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
760  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
761  */
762 @Deprecated(
763     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
764     level = DeprecationLevel.ERROR
765 )
filterIndexednull766 public fun <E> ReceiveChannel<E>.filterIndexed(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (index: Int, E) -> Boolean): ReceiveChannel<E> =
767     GlobalScope.produce(context, onCompletion = consumes()) {
768         var index = 0
769         for (e in this@filterIndexed) {
770             if (predicate(index++, e)) send(e)
771         }
772     }
773 
774 /**
775  * Appends all elements matching the given [predicate] to the given [destination].
776  * @param [predicate] function that takes the index of an element and the element itself
777  * and returns the result of predicate evaluation on the element.
778  *
779  * The operation is _terminal_.
780  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
781  *
782  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
783  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
784  */
785 @Deprecated(
786     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
787     level = DeprecationLevel.ERROR
788 )
filterIndexedTonull789 public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C {
790     consumeEachIndexed { (index, element) ->
791         if (predicate(index, element)) destination.add(element)
792     }
793     return destination
794 }
795 
796 /**
797  * Appends all elements matching the given [predicate] to the given [destination].
798  * @param [predicate] function that takes the index of an element and the element itself
799  * and returns the result of predicate evaluation on the element.
800  *
801  * The operation is _terminal_.
802  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
803  *
804  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
805  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
806  */
807 @Deprecated(
808     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
809     level = DeprecationLevel.ERROR
810 )
filterIndexedTonull811 public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C {
812     consumeEachIndexed { (index, element) ->
813         if (predicate(index, element)) destination.send(element)
814     }
815     return destination
816 }
817 
818 /**
819  * Returns a channel containing all elements not matching the given [predicate].
820  *
821  * The operation is _intermediate_ and _stateless_.
822  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
823  *
824  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
825  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
826  */
827 @Deprecated(
828     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
829     level = DeprecationLevel.ERROR
830 )
filterNotnull831 public fun <E> ReceiveChannel<E>.filterNot(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
832     filter(context) { !predicate(it) }
833 
834 /**
835  * Returns a channel containing all elements that are not `null`.
836  *
837  * The operation is _intermediate_ and _stateless_.
838  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
839  *
840  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
841  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
842  */
843 @Deprecated(
844     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
845     level = DeprecationLevel.ERROR
846 )
847 @Suppress("UNCHECKED_CAST")
filterNotNullnull848 public fun <E : Any> ReceiveChannel<E?>.filterNotNull(): ReceiveChannel<E> =
849     filter { it != null } as ReceiveChannel<E>
850 
851 /**
852  * Appends all elements that are not `null` to the given [destination].
853  *
854  * The operation is _terminal_.
855  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
856  *
857  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
858  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
859  */
860 @Deprecated(
861     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
862     level = DeprecationLevel.ERROR
863 )
filterNotNullTonull864 public suspend fun <E : Any, C : MutableCollection<in E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
865     consumeEach {
866         if (it != null) destination.add(it)
867     }
868     return destination
869 }
870 
871 /**
872  * Appends all elements that are not `null` to the given [destination].
873  *
874  * The operation is _terminal_.
875  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
876  *
877  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
878  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
879  */
880 @Deprecated(
881     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
882     level = DeprecationLevel.ERROR
883 )
filterNotNullTonull884 public suspend fun <E : Any, C : SendChannel<E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
885     consumeEach {
886         if (it != null) destination.send(it)
887     }
888     return destination
889 }
890 
891 /**
892  * Appends all elements not matching the given [predicate] to the given [destination].
893  *
894  * The operation is _terminal_.
895  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
896  *
897  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
898  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
899  */
900 @Deprecated(
901     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
902     level = DeprecationLevel.ERROR
903 )
filterNotTonull904 public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C {
905     consumeEach {
906         if (!predicate(it)) destination.add(it)
907     }
908     return destination
909 }
910 
911 /**
912  * Appends all elements not matching the given [predicate] to the given [destination].
913  *
914  * The operation is _terminal_.
915  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
916  *
917  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
918  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
919  */
920 @Deprecated(
921     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
922     level = DeprecationLevel.ERROR
923 )
filterNotTonull924 public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C {
925     consumeEach {
926         if (!predicate(it)) destination.send(it)
927     }
928     return destination
929 }
930 
931 /**
932  * Appends all elements matching the given [predicate] to the given [destination].
933  *
934  * The operation is _terminal_.
935  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
936  *
937  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
938  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
939  */
940 @Deprecated(
941     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
942     level = DeprecationLevel.ERROR
943 )
filterTonull944 public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C {
945     consumeEach {
946         if (predicate(it)) destination.add(it)
947     }
948     return destination
949 }
950 
951 /**
952  * Appends all elements matching the given [predicate] to the given [destination].
953  *
954  * The operation is _terminal_.
955  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
956  *
957  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
958  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
959  */
960 @Deprecated(
961     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
962     level = DeprecationLevel.ERROR
963 )
filterTonull964 public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C {
965     consumeEach {
966         if (predicate(it)) destination.send(it)
967     }
968     return destination
969 }
970 
971 /**
972  * Returns a channel containing first [n] elements.
973  *
974  * The operation is _intermediate_ and _stateless_.
975  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
976  *
977  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
978  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
979  */
980 @Deprecated(
981     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
982     level = DeprecationLevel.ERROR
983 )
takenull984 public fun <E> ReceiveChannel<E>.take(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
985     GlobalScope.produce(context, onCompletion = consumes()) {
986         if (n == 0) return@produce
987         require(n >= 0) { "Requested element count $n is less than zero." }
988         var remaining: Int = n
989         for (e in this@take) {
990             send(e)
991             remaining--
992             if (remaining == 0)
993                 return@produce
994         }
995     }
996 
997 /**
998  * Returns a channel containing first elements satisfying the given [predicate].
999  *
1000  * The operation is _intermediate_ and _stateless_.
1001  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1002  *
1003  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1004  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1005  */
1006 @Deprecated(
1007     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1008     level = DeprecationLevel.ERROR
1009 )
takeWhilenull1010 public fun <E> ReceiveChannel<E>.takeWhile(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
1011     GlobalScope.produce(context, onCompletion = consumes()) {
1012         for (e in this@takeWhile) {
1013             if (!predicate(e)) return@produce
1014             send(e)
1015         }
1016     }
1017 
1018 /**
1019  * Returns a [Map] containing key-value pairs provided by [transform] function
1020  * applied to elements of the given channel.
1021  *
1022  * If any of two pairs would have the same key the last one gets added to the map.
1023  *
1024  * The returned map preserves the entry iteration order of the original channel.
1025  *
1026  * The operation is _terminal_.
1027  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1028  *
1029  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1030  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1031  */
1032 @Deprecated(
1033     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1034     level = DeprecationLevel.ERROR
1035 )
associatenull1036 public suspend inline fun <E, K, V> ReceiveChannel<E>.associate(transform: (E) -> Pair<K, V>): Map<K, V> =
1037     associateTo(LinkedHashMap(), transform)
1038 
1039 /**
1040  * Returns a [Map] containing the elements from the given channel indexed by the key
1041  * returned from [keySelector] function applied to each element.
1042  *
1043  * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
1044  *
1045  * The returned map preserves the entry iteration order of the original channel.
1046  *
1047  * The operation is _terminal_.
1048  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1049  *
1050  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1051  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1052  */
1053 @Deprecated(
1054     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1055     level = DeprecationLevel.ERROR
1056 )
1057 public suspend inline fun <E, K> ReceiveChannel<E>.associateBy(keySelector: (E) -> K): Map<K, E> =
1058     associateByTo(LinkedHashMap(), keySelector)
1059 
1060 /**
1061  * Returns a [Map] containing the values provided by [valueTransform] and indexed by [keySelector] functions applied to elements of the given channel.
1062  *
1063  * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
1064  *
1065  * The returned map preserves the entry iteration order of the original channel.
1066  *
1067  * The operation is _terminal_.
1068  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1069  *
1070  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1071  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1072  */
1073 @Deprecated(
1074     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1075     level = DeprecationLevel.ERROR
1076 )
1077 public suspend inline fun <E, K, V> ReceiveChannel<E>.associateBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, V> =
1078     associateByTo(LinkedHashMap(), keySelector, valueTransform)
1079 
1080 /**
1081  * Populates and returns the [destination] mutable map with key-value pairs,
1082  * where key is provided by the [keySelector] function applied to each element of the given channel
1083  * and value is the element itself.
1084  *
1085  * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
1086  *
1087  * The operation is _terminal_.
1088  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1089  *
1090  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1091  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1092  */
1093 @Deprecated(
1094     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1095     level = DeprecationLevel.ERROR
1096 )
1097 public suspend inline fun <E, K, M : MutableMap<in K, in E>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K): M {
1098     consumeEach {
1099         destination.put(keySelector(it), it)
1100     }
1101     return destination
1102 }
1103 
1104 /**
1105  * Populates and returns the [destination] mutable map with key-value pairs,
1106  * where key is provided by the [keySelector] function and
1107  * and value is provided by the [valueTransform] function applied to elements of the given channel.
1108  *
1109  * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
1110  *
1111  * The operation is _terminal_.
1112  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1113  *
1114  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1115  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1116  */
1117 @Deprecated(
1118     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1119     level = DeprecationLevel.ERROR
1120 )
associateByTonull1121 public suspend inline fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M {
1122     consumeEach {
1123         destination.put(keySelector(it), valueTransform(it))
1124     }
1125     return destination
1126 }
1127 
1128 /**
1129  * Populates and returns the [destination] mutable map with key-value pairs
1130  * provided by [transform] function applied to each element of the given channel.
1131  *
1132  * If any of two pairs would have the same key the last one gets added to the map.
1133  *
1134  * The operation is _terminal_.
1135  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1136  *
1137  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1138  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1139  */
1140 @Deprecated(
1141     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1142     level = DeprecationLevel.ERROR
1143 )
associateTonull1144 public suspend inline fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateTo(destination: M, transform: (E) -> Pair<K, V>): M {
1145     consumeEach {
1146         destination += transform(it)
1147     }
1148     return destination
1149 }
1150 
1151 /**
1152  * Send each element of the original channel
1153  * and appends the results to the given [destination].
1154  *
1155  * The operation is _terminal_.
1156  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1157  *
1158  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1159  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1160  */
1161 @Deprecated(
1162     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1163     level = DeprecationLevel.ERROR
1164 )
toChannelnull1165 public suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.toChannel(destination: C): C {
1166     consumeEach {
1167         destination.send(it)
1168     }
1169     return destination
1170 }
1171 
1172 /**
1173  * Appends all elements to the given [destination] collection.
1174  *
1175  * The operation is _terminal_.
1176  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1177  *
1178  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1179  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1180  */
1181 @Deprecated(
1182     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1183     level = DeprecationLevel.ERROR
1184 )
toCollectionnull1185 public suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.toCollection(destination: C): C {
1186     consumeEach {
1187         destination.add(it)
1188     }
1189     return destination
1190 }
1191 
1192 /**
1193  * Returns a [List] containing all elements.
1194  *
1195  * The operation is _terminal_.
1196  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1197  */
toListnull1198 public suspend fun <E> ReceiveChannel<E>.toList(): List<E> =
1199     this.toMutableList()
1200 
1201 /**
1202  * Returns a [Map] filled with all elements of this channel.
1203  *
1204  * The operation is _terminal_.
1205  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1206  *
1207  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1208  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1209  */
1210 @Deprecated(
1211     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1212     level = DeprecationLevel.ERROR
1213 )
1214 public suspend fun <K, V> ReceiveChannel<Pair<K, V>>.toMap(): Map<K, V> =
1215     toMap(LinkedHashMap())
1216 
1217 /**
1218  * Returns a [MutableMap] filled with all elements of this channel.
1219  *
1220  * The operation is _terminal_.
1221  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1222  *
1223  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1224  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1225  */
1226 @Deprecated(
1227     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1228     level = DeprecationLevel.ERROR
1229 )
1230 public suspend fun <K, V, M : MutableMap<in K, in V>> ReceiveChannel<Pair<K, V>>.toMap(destination: M): M {
1231     consumeEach {
1232         destination += it
1233     }
1234     return destination
1235 }
1236 
1237 /**
1238  * Returns a [MutableList] filled with all elements of this channel.
1239  *
1240  * The operation is _terminal_.
1241  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1242  *
1243  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1244  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1245  */
1246 @Deprecated(
1247     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1248     level = DeprecationLevel.ERROR
1249 )
toMutableListnull1250 public suspend fun <E> ReceiveChannel<E>.toMutableList(): MutableList<E> =
1251     toCollection(ArrayList())
1252 
1253 /**
1254  * Returns a [Set] of all elements.
1255  *
1256  * The returned set preserves the element iteration order of the original channel.
1257  *
1258  * The operation is _terminal_.
1259  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1260  *
1261  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1262  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1263  */
1264 @Deprecated(
1265     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1266     level = DeprecationLevel.ERROR
1267 )
1268 public suspend fun <E> ReceiveChannel<E>.toSet(): Set<E> =
1269     this.toMutableSet()
1270 
1271 /**
1272  * Returns a single channel of all elements from results of [transform] function being invoked on each element of original channel.
1273  *
1274  * The operation is _intermediate_ and _stateless_.
1275  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1276  *
1277  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1278  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1279  */
1280 @Deprecated(
1281     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1282     level = DeprecationLevel.ERROR
1283 )
1284 public fun <E, R> ReceiveChannel<E>.flatMap(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> ReceiveChannel<R>): ReceiveChannel<R> =
1285     GlobalScope.produce(context, onCompletion = consumes()) {
1286         for (e in this@flatMap) {
1287             transform(e).toChannel(this)
1288         }
1289     }
1290 
1291 /**
1292  * Groups elements of the original channel by the key returned by the given [keySelector] function
1293  * applied to each element and returns a map where each group key is associated with a list of corresponding elements.
1294  *
1295  * The returned map preserves the entry iteration order of the keys produced from the original channel.
1296  *
1297  * The operation is _terminal_.
1298  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1299  *
1300  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1301  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1302  */
1303 @Deprecated(
1304     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1305     level = DeprecationLevel.ERROR
1306 )
groupBynull1307 public suspend inline fun <E, K> ReceiveChannel<E>.groupBy(keySelector: (E) -> K): Map<K, List<E>> =
1308     groupByTo(LinkedHashMap(), keySelector)
1309 
1310 /**
1311  * Groups values returned by the [valueTransform] function applied to each element of the original channel
1312  * by the key returned by the given [keySelector] function applied to the element
1313  * and returns a map where each group key is associated with a list of corresponding values.
1314  *
1315  * The returned map preserves the entry iteration order of the keys produced from the original channel.
1316  *
1317  * The operation is _terminal_.
1318  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1319  *
1320  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1321  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1322  */
1323 @Deprecated(
1324     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1325     level = DeprecationLevel.ERROR
1326 )
1327 public suspend inline fun <E, K, V> ReceiveChannel<E>.groupBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, List<V>> =
1328     groupByTo(LinkedHashMap(), keySelector, valueTransform)
1329 
1330 /**
1331  * Groups elements of the original channel by the key returned by the given [keySelector] function
1332  * applied to each element and puts to the [destination] map each group key associated with a list of corresponding elements.
1333  *
1334  * @return The [destination] map.
1335  *
1336  * The operation is _terminal_.
1337  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1338  *
1339  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1340  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1341  */
1342 @Deprecated(
1343     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1344     level = DeprecationLevel.ERROR
1345 )
1346 public suspend inline fun <E, K, M : MutableMap<in K, MutableList<E>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K): M {
1347     consumeEach {
1348         val key = keySelector(it)
1349         val list = destination.getOrPut(key) { ArrayList() }
1350         list.add(it)
1351     }
1352     return destination
1353 }
1354 
1355 /**
1356  * Groups values returned by the [valueTransform] function applied to each element of the original channel
1357  * by the key returned by the given [keySelector] function applied to the element
1358  * and puts to the [destination] map each group key associated with a list of corresponding values.
1359  *
1360  * @return The [destination] map.
1361  *
1362  * The operation is _terminal_.
1363  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1364  *
1365  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1366  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1367  */
1368 @Deprecated(
1369     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1370     level = DeprecationLevel.ERROR
1371 )
groupByTonull1372 public suspend inline fun <E, K, V, M : MutableMap<in K, MutableList<V>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M {
1373     consumeEach {
1374         val key = keySelector(it)
1375         val list = destination.getOrPut(key) { ArrayList() }
1376         list.add(valueTransform(it))
1377     }
1378     return destination
1379 }
1380 
1381 /**
1382  * Returns a channel containing the results of applying the given [transform] function
1383  * to each element in the original channel.
1384  *
1385  * The operation is _intermediate_ and _stateless_.
1386  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1387  */
1388 @Deprecated(
1389     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1390     level = DeprecationLevel.ERROR
1391 )
mapnull1392 public fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
1393     GlobalScope.produce(context, onCompletion = consumes()) {
1394         consumeEach {
1395             send(transform(it))
1396         }
1397     }
1398 
1399 /**
1400  * Returns a channel containing the results of applying the given [transform] function
1401  * to each element and its index in the original channel.
1402  * @param [transform] function that takes the index of an element and the element itself
1403  * and returns the result of the transform applied to the element.
1404  *
1405  * The operation is _intermediate_ and _stateless_.
1406  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1407  *
1408  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1409  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1410  */
1411 @Deprecated(
1412     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1413     level = DeprecationLevel.ERROR
1414 )
mapIndexednull1415 public fun <E, R> ReceiveChannel<E>.mapIndexed(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (index: Int, E) -> R): ReceiveChannel<R> =
1416     GlobalScope.produce(context, onCompletion = consumes()) {
1417         var index = 0
1418         for (e in this@mapIndexed) {
1419             send(transform(index++, e))
1420         }
1421     }
1422 
1423 /**
1424  * Returns a channel containing only the non-null results of applying the given [transform] function
1425  * to each element and its index in the original channel.
1426  * @param [transform] function that takes the index of an element and the element itself
1427  * and returns the result of the transform applied to the element.
1428  *
1429  * The operation is _intermediate_ and _stateless_.
1430  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1431  *
1432  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1433  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1434  */
1435 @Deprecated(
1436     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1437     level = DeprecationLevel.ERROR
1438 )
mapIndexedNotNullnull1439 public fun <E, R : Any> ReceiveChannel<E>.mapIndexedNotNull(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (index: Int, E) -> R?): ReceiveChannel<R> =
1440     mapIndexed(context, transform).filterNotNull()
1441 
1442 /**
1443  * Applies the given [transform] function to each element and its index in the original channel
1444  * and appends only the non-null results to the given [destination].
1445  * @param [transform] function that takes the index of an element and the element itself
1446  * and returns the result of the transform applied to the element.
1447  *
1448  * The operation is _terminal_.
1449  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1450  *
1451  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1452  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1453  */
1454 @Deprecated(
1455     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1456     level = DeprecationLevel.ERROR
1457 )
1458 public suspend inline fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C {
1459     consumeEachIndexed { (index, element) ->
1460         transform(index, element)?.let { destination.add(it) }
1461     }
1462     return destination
1463 }
1464 
1465 /**
1466  * Applies the given [transform] function to each element and its index in the original channel
1467  * and appends only the non-null results to the given [destination].
1468  * @param [transform] function that takes the index of an element and the element itself
1469  * and returns the result of the transform applied to the element.
1470  *
1471  * The operation is _terminal_.
1472  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1473  *
1474  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1475  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1476  */
1477 @Deprecated(
1478     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1479     level = DeprecationLevel.ERROR
1480 )
mapIndexedNotNullTonull1481 public suspend inline fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C {
1482     consumeEachIndexed { (index, element) ->
1483         transform(index, element)?.let { destination.send(it) }
1484     }
1485     return destination
1486 }
1487 
1488 /**
1489  * Applies the given [transform] function to each element and its index in the original channel
1490  * and appends the results to the given [destination].
1491  * @param [transform] function that takes the index of an element and the element itself
1492  * and returns the result of the transform applied to the element.
1493  *
1494  * The operation is _terminal_.
1495  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1496  *
1497  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1498  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1499  */
1500 @Deprecated(
1501     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1502     level = DeprecationLevel.ERROR
1503 )
mapIndexedTonull1504 public suspend inline fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C {
1505     var index = 0
1506     consumeEach {
1507         destination.add(transform(index++, it))
1508     }
1509     return destination
1510 }
1511 
1512 /**
1513  * Applies the given [transform] function to each element and its index in the original channel
1514  * and appends the results to the given [destination].
1515  * @param [transform] function that takes the index of an element and the element itself
1516  * and returns the result of the transform applied to the element.
1517  *
1518  * The operation is _terminal_.
1519  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1520  *
1521  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1522  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1523  */
1524 @Deprecated(
1525     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1526     level = DeprecationLevel.ERROR
1527 )
mapIndexedTonull1528 public suspend inline fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C {
1529     var index = 0
1530     consumeEach {
1531         destination.send(transform(index++, it))
1532     }
1533     return destination
1534 }
1535 
1536 /**
1537  * Returns a channel containing only the non-null results of applying the given [transform] function
1538  * to each element in the original channel.
1539  *
1540  * The operation is _intermediate_ and _stateless_.
1541  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1542  *
1543  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1544  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1545  */
1546 @Deprecated(
1547     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1548     level = DeprecationLevel.ERROR
1549 )
mapNotNullnull1550 public fun <E, R : Any> ReceiveChannel<E>.mapNotNull(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> R?): ReceiveChannel<R> =
1551     map(context, transform).filterNotNull()
1552 
1553 /**
1554  * Applies the given [transform] function to each element in the original channel
1555  * and appends only the non-null results to the given [destination].
1556  *
1557  * The operation is _terminal_.
1558  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1559  *
1560  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1561  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1562  */
1563 @Deprecated(
1564     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1565     level = DeprecationLevel.ERROR
1566 )
1567 public suspend inline fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C {
1568     consumeEach {
1569         transform(it)?.let { destination.add(it) }
1570     }
1571     return destination
1572 }
1573 
1574 /**
1575  * Applies the given [transform] function to each element in the original channel
1576  * and appends only the non-null results to the given [destination].
1577  *
1578  * The operation is _terminal_.
1579  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1580  *
1581  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1582  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1583  */
1584 @Deprecated(
1585     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1586     level = DeprecationLevel.ERROR
1587 )
mapNotNullTonull1588 public suspend inline fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C {
1589     consumeEach {
1590         transform(it)?.let { destination.send(it) }
1591     }
1592     return destination
1593 }
1594 
1595 /**
1596  * Applies the given [transform] function to each element of the original channel
1597  * and appends the results to the given [destination].
1598  *
1599  * The operation is _terminal_.
1600  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1601  *
1602  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1603  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1604  */
1605 @Deprecated(
1606     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1607     level = DeprecationLevel.ERROR
1608 )
mapTonull1609 public suspend inline fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C {
1610     consumeEach {
1611         destination.add(transform(it))
1612     }
1613     return destination
1614 }
1615 
1616 /**
1617  * Applies the given [transform] function to each element of the original channel
1618  * and appends the results to the given [destination].
1619  *
1620  * The operation is _terminal_.
1621  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1622  *
1623  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1624  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1625  */
1626 @Deprecated(
1627     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1628     level = DeprecationLevel.ERROR
1629 )
mapTonull1630 public suspend inline fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C {
1631     consumeEach {
1632         destination.send(transform(it))
1633     }
1634     return destination
1635 }
1636 
1637 /**
1638  * Returns a channel of [IndexedValue] for each element of the original channel.
1639  *
1640  * The operation is _intermediate_ and _stateless_.
1641  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1642  *
1643  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1644  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1645  */
1646 @Deprecated(
1647     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1648     level = DeprecationLevel.ERROR
1649 )
withIndexnull1650 public fun <E> ReceiveChannel<E>.withIndex(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<IndexedValue<E>> =
1651     GlobalScope.produce(context, onCompletion = consumes()) {
1652         var index = 0
1653         for (e in this@withIndex) {
1654             send(IndexedValue(index++, e))
1655         }
1656     }
1657 
1658 /**
1659  * Returns a channel containing only distinct elements from the given channel.
1660  *
1661  * The elements in the resulting channel are in the same order as they were in the source channel.
1662  *
1663  * The operation is _intermediate_ and _stateful_.
1664  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1665  *
1666  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1667  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1668  */
1669 @Deprecated(
1670     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1671     level = DeprecationLevel.ERROR
1672 )
distinctnull1673 public fun <E> ReceiveChannel<E>.distinct(): ReceiveChannel<E> =
1674     this.distinctBy { it }
1675 
1676 /**
1677  * Returns a channel containing only elements from the given channel
1678  * having distinct keys returned by the given [selector] function.
1679  *
1680  * The elements in the resulting channel are in the same order as they were in the source channel.
1681  *
1682  * The operation is _intermediate_ and _stateful_.
1683  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1684  *
1685  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1686  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1687  */
1688 @Deprecated(
1689     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1690     level = DeprecationLevel.ERROR
1691 )
distinctBynull1692 public fun <E, K> ReceiveChannel<E>.distinctBy(context: CoroutineContext = Dispatchers.Unconfined, selector: suspend (E) -> K): ReceiveChannel<E> =
1693     GlobalScope.produce(context, onCompletion = consumes()) {
1694         val keys = HashSet<K>()
1695         for (e in this@distinctBy) {
1696             val k = selector(e)
1697             if (k !in keys) {
1698                 send(e)
1699                 keys += k
1700             }
1701         }
1702     }
1703 
1704 /**
1705  * Returns a mutable set containing all distinct elements from the given channel.
1706  *
1707  * The returned set preserves the element iteration order of the original channel.
1708  *
1709  * The operation is _terminal_.
1710  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1711  *
1712  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1713  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1714  */
1715 @Deprecated(
1716     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1717     level = DeprecationLevel.ERROR
1718 )
toMutableSetnull1719 public suspend fun <E> ReceiveChannel<E>.toMutableSet(): MutableSet<E> =
1720     toCollection(LinkedHashSet())
1721 
1722 /**
1723  * Returns `true` if all elements match the given [predicate].
1724  *
1725  * The operation is _terminal_.
1726  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1727  *
1728  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1729  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1730  */
1731 @Deprecated(
1732     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1733     level = DeprecationLevel.ERROR
1734 )
1735 public suspend inline fun <E> ReceiveChannel<E>.all(predicate: (E) -> Boolean): Boolean {
1736     consumeEach {
1737         if (!predicate(it)) return false
1738     }
1739     return true
1740 }
1741 
1742 /**
1743  * Returns `true` if channel has at least one element.
1744  *
1745  * The operation is _terminal_.
1746  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1747  *
1748  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1749  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1750  */
1751 @Deprecated(
1752     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1753     level = DeprecationLevel.ERROR
1754 )
anynull1755 public suspend fun <E> ReceiveChannel<E>.any(): Boolean =
1756     consume {
1757         return iterator().hasNext()
1758     }
1759 
1760 /**
1761  * Returns `true` if at least one element matches the given [predicate].
1762  *
1763  * The operation is _terminal_.
1764  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1765  *
1766  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1767  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1768  */
1769 @Deprecated(
1770     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1771     level = DeprecationLevel.ERROR
1772 )
anynull1773 public suspend inline fun <E> ReceiveChannel<E>.any(predicate: (E) -> Boolean): Boolean {
1774     consumeEach {
1775         if (predicate(it)) return true
1776     }
1777     return false
1778 }
1779 
1780 /**
1781  * Returns the number of elements in this channel.
1782  *
1783  * The operation is _terminal_.
1784  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1785  *
1786  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1787  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1788  */
1789 @Deprecated(
1790     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1791     level = DeprecationLevel.ERROR
1792 )
countnull1793 public suspend fun <E> ReceiveChannel<E>.count(): Int {
1794     var count = 0
1795     consumeEach { count++ }
1796     return count
1797 }
1798 
1799 /**
1800  * Returns the number of elements matching the given [predicate].
1801  *
1802  * The operation is _terminal_.
1803  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1804  *
1805  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1806  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1807  */
1808 @Deprecated(
1809     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1810     level = DeprecationLevel.ERROR
1811 )
countnull1812 public suspend inline fun <E> ReceiveChannel<E>.count(predicate: (E) -> Boolean): Int {
1813     var count = 0
1814     consumeEach {
1815         if (predicate(it)) count++
1816     }
1817     return count
1818 }
1819 
1820 /**
1821  * Accumulates value starting with [initial] value and applying [operation] from left to right to current accumulator value and each element.
1822  *
1823  * The operation is _terminal_.
1824  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1825  *
1826  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1827  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1828  */
1829 @Deprecated(
1830     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1831     level = DeprecationLevel.ERROR
1832 )
foldnull1833 public suspend inline fun <E, R> ReceiveChannel<E>.fold(initial: R, operation: (acc: R, E) -> R): R {
1834     var accumulator = initial
1835     consumeEach {
1836         accumulator = operation(accumulator, it)
1837     }
1838     return accumulator
1839 }
1840 
1841 /**
1842  * Accumulates value starting with [initial] value and applying [operation] from left to right
1843  * to current accumulator value and each element with its index in the original channel.
1844  * @param [operation] function that takes the index of an element, current accumulator value
1845  * and the element itself, and calculates the next accumulator value.
1846  *
1847  * The operation is _terminal_.
1848  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1849  *
1850  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1851  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1852  */
1853 @Deprecated(
1854     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1855     level = DeprecationLevel.ERROR
1856 )
foldIndexednull1857 public suspend inline fun <E, R> ReceiveChannel<E>.foldIndexed(initial: R, operation: (index: Int, acc: R, E) -> R): R {
1858     var index = 0
1859     var accumulator = initial
1860     consumeEach {
1861         accumulator = operation(index++, accumulator, it)
1862     }
1863     return accumulator
1864 }
1865 
1866 /**
1867  * Returns the first element yielding the largest value of the given function or `null` if there are no elements.
1868  *
1869  * The operation is _terminal_.
1870  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1871  *
1872  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1873  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1874  */
1875 @Deprecated(
1876     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1877     level = DeprecationLevel.ERROR
1878 )
maxBynull1879 public suspend inline fun <E, R : Comparable<R>> ReceiveChannel<E>.maxBy(selector: (E) -> R): E? =
1880     consume {
1881         val iterator = iterator()
1882         if (!iterator.hasNext()) return null
1883         var maxElem = iterator.next()
1884         var maxValue = selector(maxElem)
1885         while (iterator.hasNext()) {
1886             val e = iterator.next()
1887             val v = selector(e)
1888             if (maxValue < v) {
1889                 maxElem = e
1890                 maxValue = v
1891             }
1892         }
1893         return maxElem
1894     }
1895 
1896 /**
1897  * Returns the first element having the largest value according to the provided [comparator] or `null` if there are no elements.
1898  *
1899  * The operation is _terminal_.
1900  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1901  *
1902  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1903  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1904  */
1905 @Deprecated(
1906     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1907     level = DeprecationLevel.ERROR
1908 )
maxWithnull1909 public suspend fun <E> ReceiveChannel<E>.maxWith(comparator: Comparator<in E>): E? =
1910     consume {
1911         val iterator = iterator()
1912         if (!iterator.hasNext()) return null
1913         var max = iterator.next()
1914         while (iterator.hasNext()) {
1915             val e = iterator.next()
1916             if (comparator.compare(max, e) < 0) max = e
1917         }
1918         return max
1919     }
1920 
1921 /**
1922  * Returns the first element yielding the smallest value of the given function or `null` if there are no elements.
1923  *
1924  * The operation is _terminal_.
1925  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1926  *
1927  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1928  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1929  */
1930 @Deprecated(
1931     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1932     level = DeprecationLevel.ERROR
1933 )
minBynull1934 public suspend inline fun <E, R : Comparable<R>> ReceiveChannel<E>.minBy(selector: (E) -> R): E? =
1935     consume {
1936         val iterator = iterator()
1937         if (!iterator.hasNext()) return null
1938         var minElem = iterator.next()
1939         var minValue = selector(minElem)
1940         while (iterator.hasNext()) {
1941             val e = iterator.next()
1942             val v = selector(e)
1943             if (minValue > v) {
1944                 minElem = e
1945                 minValue = v
1946             }
1947         }
1948         return minElem
1949     }
1950 
1951 /**
1952  * Returns the first element having the smallest value according to the provided [comparator] or `null` if there are no elements.
1953  *
1954  * The operation is _terminal_.
1955  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1956  *
1957  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1958  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1959  */
1960 @Deprecated(
1961     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1962     level = DeprecationLevel.ERROR
1963 )
minWithnull1964 public suspend fun <E> ReceiveChannel<E>.minWith(comparator: Comparator<in E>): E? =
1965     consume {
1966         val iterator = iterator()
1967         if (!iterator.hasNext()) return null
1968         var min = iterator.next()
1969         while (iterator.hasNext()) {
1970             val e = iterator.next()
1971             if (comparator.compare(min, e) > 0) min = e
1972         }
1973         return min
1974     }
1975 
1976 /**
1977  * Returns `true` if the channel has no elements.
1978  *
1979  * The operation is _terminal_.
1980  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1981  *
1982  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1983  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1984  */
1985 @Deprecated(
1986     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1987     level = DeprecationLevel.ERROR
1988 )
nonenull1989 public suspend fun <E> ReceiveChannel<E>.none(): Boolean =
1990     consume {
1991         return !iterator().hasNext()
1992     }
1993 
1994 /**
1995  * Returns `true` if no elements match the given [predicate].
1996  *
1997  * The operation is _terminal_.
1998  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1999  *
2000  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
2001  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
2002  */
2003 @Deprecated(
2004     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
2005     level = DeprecationLevel.ERROR
2006 )
nonenull2007 public suspend inline fun <E> ReceiveChannel<E>.none(predicate: (E) -> Boolean): Boolean {
2008     consumeEach {
2009         if (predicate(it)) return false
2010     }
2011     return true
2012 }
2013 
2014 /**
2015  * Accumulates value starting with the first element and applying [operation] from left to right to current accumulator value and each element.
2016  *
2017  * The operation is _terminal_.
2018  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
2019  *
2020  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
2021  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
2022  */
2023 @Deprecated(
2024     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
2025     level = DeprecationLevel.ERROR
2026 )
reducenull2027 public suspend inline fun <S, E : S> ReceiveChannel<E>.reduce(operation: (acc: S, E) -> S): S =
2028     consume {
2029         val iterator = this.iterator()
2030         if (!iterator.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.")
2031         var accumulator: S = iterator.next()
2032         while (iterator.hasNext()) {
2033             accumulator = operation(accumulator, iterator.next())
2034         }
2035         return accumulator
2036     }
2037 
2038 /**
2039  * Accumulates value starting with the first element and applying [operation] from left to right
2040  * to current accumulator value and each element with its index in the original channel.
2041  * @param [operation] function that takes the index of an element, current accumulator value
2042  * and the element itself and calculates the next accumulator value.
2043  *
2044  * The operation is _terminal_.
2045  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
2046  *
2047  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
2048  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
2049  */
2050 @Deprecated(
2051     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
2052     level = DeprecationLevel.ERROR
2053 )
reduceIndexednull2054 public suspend inline fun <S, E : S> ReceiveChannel<E>.reduceIndexed(operation: (index: Int, acc: S, E) -> S): S =
2055     consume {
2056         val iterator = this.iterator()
2057         if (!iterator.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.")
2058         var index = 1
2059         var accumulator: S = iterator.next()
2060         while (iterator.hasNext()) {
2061             accumulator = operation(index++, accumulator, iterator.next())
2062         }
2063         return accumulator
2064     }
2065 
2066 /**
2067  * Returns the sum of all values produced by [selector] function applied to each element in the channel.
2068  *
2069  * The operation is _terminal_.
2070  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
2071  *
2072  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
2073  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
2074  */
2075 @Deprecated(
2076     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
2077     level = DeprecationLevel.ERROR
2078 )
sumBynull2079 public suspend inline fun <E> ReceiveChannel<E>.sumBy(selector: (E) -> Int): Int {
2080     var sum = 0
2081     consumeEach {
2082         sum += selector(it)
2083     }
2084     return sum
2085 }
2086 
2087 /**
2088  * Returns the sum of all values produced by [selector] function applied to each element in the channel.
2089  *
2090  * The operation is _terminal_.
2091  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
2092  *
2093  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
2094  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
2095  */
2096 @Deprecated(
2097     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
2098     level = DeprecationLevel.ERROR
2099 )
sumByDoublenull2100 public suspend inline fun <E> ReceiveChannel<E>.sumByDouble(selector: (E) -> Double): Double {
2101     var sum = 0.0
2102     consumeEach {
2103         sum += selector(it)
2104     }
2105     return sum
2106 }
2107 
2108 /**
2109  * Returns an original collection containing all the non-`null` elements, throwing an [IllegalArgumentException] if there are any `null` elements.
2110  *
2111  * The operation is _intermediate_ and _stateless_.
2112  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
2113  *
2114  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
2115  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
2116  */
2117 @Deprecated(
2118     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
2119     level = DeprecationLevel.ERROR
2120 )
requireNoNullsnull2121 public fun <E : Any> ReceiveChannel<E?>.requireNoNulls(): ReceiveChannel<E> =
2122     map { it ?: throw IllegalArgumentException("null element found in $this.") }
2123 
2124 /**
2125  * Splits the original channel into pair of lists,
2126  * where *first* list contains elements for which [predicate] yielded `true`,
2127  * while *second* list contains elements for which [predicate] yielded `false`.
2128  *
2129  * The operation is _terminal_.
2130  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
2131  *
2132  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
2133  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
2134  */
2135 @Deprecated(
2136     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
2137     level = DeprecationLevel.ERROR
2138 )
partitionnull2139 public suspend inline fun <E> ReceiveChannel<E>.partition(predicate: (E) -> Boolean): Pair<List<E>, List<E>> {
2140     val first = ArrayList<E>()
2141     val second = ArrayList<E>()
2142     consumeEach {
2143         if (predicate(it)) {
2144             first.add(it)
2145         } else {
2146             second.add(it)
2147         }
2148     }
2149     return Pair(first, second)
2150 }
2151 
2152 /**
2153  * Returns a channel of pairs built from elements of both channels with same indexes.
2154  * Resulting channel has length of shortest input channel.
2155  *
2156  * The operation is _intermediate_ and _stateless_.
2157  * This function [consumes][ReceiveChannel.consume] all elements of both the original [ReceiveChannel] and the `other` one.
2158  *
2159  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
2160  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
2161  */
2162 @Deprecated(
2163     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
2164     level = DeprecationLevel.ERROR
2165 )
zipnull2166 public infix fun <E, R> ReceiveChannel<E>.zip(other: ReceiveChannel<R>): ReceiveChannel<Pair<E, R>> =
2167     zip(other) { t1, t2 -> t1 to t2 }
2168 
2169 /**
2170  * Returns a channel of values built from elements of both collections with same indexes using provided [transform]. Resulting channel has length of shortest input channels.
2171  *
2172  * The operation is _intermediate_ and _stateless_.
2173  * This function [consumes][consume] all elements of both the original [ReceiveChannel] and the `other` one.
2174  *
2175  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
2176  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
2177  */
2178 @Deprecated(
2179     message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
2180     level = DeprecationLevel.ERROR
2181 )
zipnull2182 public fun <E, R, V> ReceiveChannel<E>.zip(other: ReceiveChannel<R>, context: CoroutineContext = Dispatchers.Unconfined, transform: (a: E, b: R) -> V): ReceiveChannel<V> =
2183     GlobalScope.produce(context, onCompletion = consumesAll(this, other)) {
2184         val otherIterator = other.iterator()
2185         this@zip.consumeEach { element1 ->
2186             if (!otherIterator.hasNext()) return@consumeEach
2187             val element2 = otherIterator.next()
2188             send(transform(element1, element2))
2189         }
2190     }
2191