1 /*
<lambda>null2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4 @file:JvmMultifileClass
5 @file:JvmName("ChannelsKt")
6 @file:Suppress("DEPRECATION_ERROR")
7
8 package kotlinx.coroutines.channels
9
10 import kotlinx.coroutines.*
11 import kotlinx.coroutines.selects.*
12 import kotlin.coroutines.*
13 import kotlin.jvm.*
14
15 internal const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
16
17
18 // -------- Operations on BroadcastChannel --------
19
20 /**
21 * Opens subscription to this [BroadcastChannel] and makes sure that the given [block] consumes all elements
22 * from it by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block.
23 *
24 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
25 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
26 */
27 @ObsoleteCoroutinesApi
28 public inline fun <E, R> BroadcastChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
29 val channel = openSubscription()
30 try {
31 return channel.block()
32 } finally {
33 channel.cancel()
34 }
35 }
36
37 /**
38 * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
39 * or returns `null` if the channel is [closed][Channel.isClosedForReceive].
40 *
41 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
42 * function is suspended, this function immediately resumes with [CancellationException].
43 * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
44 * suspended, it will not resume successfully. If the `receiveOrNull` call threw [CancellationException] there is no way
45 * to tell if some element was already received from the channel or not. See [Channel] documentation for details.
46 *
47 * Note, that this function does not check for cancellation when it is not suspended.
48 * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
49 *
50 * This extension is defined only for channels on non-null types, so that generic functions defined using
51 * these extensions do not accidentally confuse `null` value and a normally closed channel, leading to hard
52 * to find bugs.
53 */
54 @Suppress("EXTENSION_SHADOWED_BY_MEMBER")
55 @ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.x
receiveOrNullnull56 public suspend fun <E : Any> ReceiveChannel<E>.receiveOrNull(): E? {
57 @Suppress("DEPRECATION", "UNCHECKED_CAST")
58 return (this as ReceiveChannel<E?>).receiveOrNull()
59 }
60
61 /**
62 * Clause for [select] expression of [receiveOrNull] suspending function that selects with the element that
63 * is received from the channel or selects with `null` if the channel
64 * [isClosedForReceive][ReceiveChannel.isClosedForReceive] without cause. The [select] invocation fails with
65 * the original [close][SendChannel.close] cause exception if the channel has _failed_.
66 *
67 * This extension is defined only for channels on non-null types, so that generic functions defined using
68 * these extensions do not accidentally confuse `null` value and a normally closed channel, leading to hard
69 * to find bugs.
70 **/
71 @ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.x
onReceiveOrNullnull72 public fun <E : Any> ReceiveChannel<E>.onReceiveOrNull(): SelectClause1<E?> {
73 @Suppress("DEPRECATION", "UNCHECKED_CAST")
74 return (this as ReceiveChannel<E?>).onReceiveOrNull
75 }
76
77 /**
78 * Subscribes to this [BroadcastChannel] and performs the specified action for each received element.
79 *
80 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
81 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
82 */
83 @ObsoleteCoroutinesApi
consumeEachnull84 public suspend inline fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit): Unit =
85 consume {
86 for (element in this) action(element)
87 }
88
89 // -------- Operations on ReceiveChannel --------
90
91 /**
92 * Returns a [CompletionHandler] that invokes [cancel][ReceiveChannel.cancel] on the [ReceiveChannel]
93 * with the corresponding cause. See also [ReceiveChannel.consume].
94 *
95 * **WARNING**: It is planned that in the future a second invocation of this method
96 * on an channel that is already being consumed is going to fail fast, that it
97 * immediately throws an [IllegalStateException].
98 * See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/167)
99 * for details.
100 *
101 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
102 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
103 */
104 @Deprecated(
105 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
106 level = DeprecationLevel.ERROR
107 )
causenull108 public fun ReceiveChannel<*>.consumes(): CompletionHandler = { cause: Throwable? ->
109 cancelConsumed(cause)
110 }
111
112 @PublishedApi
cancelConsumednull113 internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) {
114 cancel(cause?.let {
115 it as? CancellationException ?: CancellationException("Channel was consumed, consumer had failed", it)
116 })
117 }
118
119 /**
120 * Returns a [CompletionHandler] that invokes [cancel][ReceiveChannel.cancel] on all the
121 * specified [ReceiveChannel] instances with the corresponding cause.
122 * See also [ReceiveChannel.consumes()] for a version on one channel.
123 *
124 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
125 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
126 */
127 @Deprecated(
128 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
129 level = DeprecationLevel.ERROR
130 )
consumesAllnull131 public fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler =
132 { cause: Throwable? ->
133 var exception: Throwable? = null
134 for (channel in channels)
135 try {
136 channel.cancelConsumed(cause)
137 } catch (e: Throwable) {
138 if (exception == null) {
139 exception = e
140 } else {
141 exception.addSuppressedThrowable(e)
142 }
143 }
144 exception?.let { throw it }
145 }
146
147 /**
148 * Makes sure that the given [block] consumes all elements from the given channel
149 * by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block.
150 *
151 * The operation is _terminal_.
152 */
consumenull153 public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
154 var cause: Throwable? = null
155 try {
156 return block()
157 } catch (e: Throwable) {
158 cause = e
159 throw e
160 } finally {
161 cancelConsumed(cause)
162 }
163 }
164
165 /**
166 * Performs the given [action] for each received element and [cancels][ReceiveChannel.cancel]
167 * the channel after the execution of the block.
168 * If you need to iterate over the channel without consuming it, a regular `for` loop should be used instead.
169 *
170 * The operation is _terminal_.
171 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
172 */
consumeEachnull173 public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit): Unit =
174 consume {
175 for (e in this) action(e)
176 }
177
178 /**
179 * Performs the given [action] for each received element.
180 *
181 * The operation is _terminal_.
182 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
183 *
184 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
185 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
186 */
187 @Deprecated(
188 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
189 level = DeprecationLevel.ERROR
190 )
consumeEachIndexednull191 public suspend inline fun <E> ReceiveChannel<E>.consumeEachIndexed(action: (IndexedValue<E>) -> Unit) {
192 var index = 0
193 consumeEach {
194 action(IndexedValue(index++, it))
195 }
196 }
197
198 /**
199 * Returns an element at the given [index] or throws an [IndexOutOfBoundsException] if the [index] is out of bounds of this channel.
200 *
201 * The operation is _terminal_.
202 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
203 *
204 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
205 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
206 */
207 @Deprecated(
208 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
209 level = DeprecationLevel.ERROR
210 )
elementAtnull211 public suspend fun <E> ReceiveChannel<E>.elementAt(index: Int): E =
212 elementAtOrElse(index) { throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.") }
213
214 /**
215 * Returns an element at the given [index] or the result of calling the [defaultValue] function if the [index] is out of bounds of this channel.
216 *
217 * The operation is _terminal_.
218 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
219 *
220 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
221 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
222 */
223 @Deprecated(
224 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
225 level = DeprecationLevel.ERROR
226 )
elementAtOrElsenull227 public suspend inline fun <E> ReceiveChannel<E>.elementAtOrElse(index: Int, defaultValue: (Int) -> E): E =
228 consume {
229 if (index < 0)
230 return defaultValue(index)
231 var count = 0
232 for (element in this) {
233 if (index == count++)
234 return element
235 }
236 return defaultValue(index)
237 }
238
239 /**
240 * Returns an element at the given [index] or `null` if the [index] is out of bounds of this channel.
241 *
242 * The operation is _terminal_.
243 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
244 *
245 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
246 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
247 */
248 @Deprecated(
249 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
250 level = DeprecationLevel.ERROR
251 )
elementAtOrNullnull252 public suspend fun <E> ReceiveChannel<E>.elementAtOrNull(index: Int): E? =
253 consume {
254 if (index < 0)
255 return null
256 var count = 0
257 for (element in this) {
258 if (index == count++)
259 return element
260 }
261 return null
262 }
263
264 /**
265 * Returns the first element matching the given [predicate], or `null` if no such element was found.
266 *
267 * The operation is _terminal_.
268 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
269 *
270 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
271 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
272 */
273 @Deprecated(
274 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
275 level = DeprecationLevel.ERROR
276 )
findnull277 public suspend inline fun <E> ReceiveChannel<E>.find(predicate: (E) -> Boolean): E? =
278 firstOrNull(predicate)
279
280 /**
281 * Returns the last element matching the given [predicate], or `null` if no such element was found.
282 *
283 * The operation is _terminal_.
284 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
285 *
286 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
287 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
288 */
289 @Deprecated(
290 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
291 level = DeprecationLevel.ERROR
292 )
293 public suspend inline fun <E> ReceiveChannel<E>.findLast(predicate: (E) -> Boolean): E? =
294 lastOrNull(predicate)
295
296 /**
297 * Returns first element.
298 * @throws [NoSuchElementException] if the channel is empty.
299 *
300 * The operation is _terminal_.
301 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
302 *
303 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
304 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
305 */
306 @Deprecated(
307 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
308 level = DeprecationLevel.ERROR
309 )
310 public suspend fun <E> ReceiveChannel<E>.first(): E =
311 consume {
312 val iterator = iterator()
313 if (!iterator.hasNext())
314 throw NoSuchElementException("ReceiveChannel is empty.")
315 return iterator.next()
316 }
317
318 /**
319 * Returns the first element matching the given [predicate].
320 * @throws [NoSuchElementException] if no such element is found.
321 *
322 * The operation is _terminal_.
323 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
324 *
325 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
326 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
327 */
328 @Deprecated(
329 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
330 level = DeprecationLevel.ERROR
331 )
firstnull332 public suspend inline fun <E> ReceiveChannel<E>.first(predicate: (E) -> Boolean): E {
333 consumeEach {
334 if (predicate(it)) return it
335 }
336 throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
337 }
338
339 /**
340 * Returns the first element, or `null` if the channel is empty.
341 *
342 * The operation is _terminal_.
343 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
344 *
345 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
346 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
347 */
348 @Deprecated(
349 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
350 level = DeprecationLevel.ERROR
351 )
firstOrNullnull352 public suspend fun <E> ReceiveChannel<E>.firstOrNull(): E? =
353 consume {
354 val iterator = iterator()
355 if (!iterator.hasNext())
356 return null
357 return iterator.next()
358 }
359
360 /**
361 * Returns the first element matching the given [predicate], or `null` if element was not found.
362 *
363 * The operation is _terminal_.
364 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
365 *
366 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
367 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
368 */
369 @Deprecated(
370 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
371 level = DeprecationLevel.ERROR
372 )
firstOrNullnull373 public suspend inline fun <E> ReceiveChannel<E>.firstOrNull(predicate: (E) -> Boolean): E? {
374 consumeEach {
375 if (predicate(it)) return it
376 }
377 return null
378 }
379
380 /**
381 * Returns first index of [element], or -1 if the channel does not contain element.
382 *
383 * The operation is _terminal_.
384 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
385 *
386 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
387 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
388 */
389 @Deprecated(
390 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
391 level = DeprecationLevel.ERROR
392 )
indexOfnull393 public suspend fun <E> ReceiveChannel<E>.indexOf(element: E): Int {
394 var index = 0
395 consumeEach {
396 if (element == it)
397 return index
398 index++
399 }
400 return -1
401 }
402
403 /**
404 * Returns index of the first element matching the given [predicate], or -1 if the channel does not contain such element.
405 *
406 * The operation is _terminal_.
407 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
408 *
409 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
410 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
411 */
412 @Deprecated(
413 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
414 level = DeprecationLevel.ERROR
415 )
indexOfFirstnull416 public suspend inline fun <E> ReceiveChannel<E>.indexOfFirst(predicate: (E) -> Boolean): Int {
417 var index = 0
418 consumeEach {
419 if (predicate(it))
420 return index
421 index++
422 }
423 return -1
424 }
425
426 /**
427 * Returns index of the last element matching the given [predicate], or -1 if the channel does not contain such element.
428 *
429 * The operation is _terminal_.
430 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
431 *
432 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
433 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
434 */
435 @Deprecated(
436 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
437 level = DeprecationLevel.ERROR
438 )
indexOfLastnull439 public suspend inline fun <E> ReceiveChannel<E>.indexOfLast(predicate: (E) -> Boolean): Int {
440 var lastIndex = -1
441 var index = 0
442 consumeEach {
443 if (predicate(it))
444 lastIndex = index
445 index++
446 }
447 return lastIndex
448 }
449
450 /**
451 * Returns the last element.
452 * @throws [NoSuchElementException] if the channel is empty.
453 *
454 * The operation is _terminal_.
455 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
456 *
457 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
458 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
459 */
460 @Deprecated(
461 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
462 level = DeprecationLevel.ERROR
463 )
lastnull464 public suspend fun <E> ReceiveChannel<E>.last(): E =
465 consume {
466 val iterator = iterator()
467 if (!iterator.hasNext())
468 throw NoSuchElementException("ReceiveChannel is empty.")
469 var last = iterator.next()
470 while (iterator.hasNext())
471 last = iterator.next()
472 return last
473 }
474
475 /**
476 * Returns the last element matching the given [predicate].
477 * @throws [NoSuchElementException] if no such element is found.
478 *
479 * The operation is _terminal_.
480 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
481 *
482 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
483 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
484 */
485 @Deprecated(
486 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
487 level = DeprecationLevel.ERROR
488 )
lastnull489 public suspend inline fun <E> ReceiveChannel<E>.last(predicate: (E) -> Boolean): E {
490 var last: E? = null
491 var found = false
492 consumeEach {
493 if (predicate(it)) {
494 last = it
495 found = true
496 }
497 }
498 if (!found) throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
499 @Suppress("UNCHECKED_CAST")
500 return last as E
501 }
502
503 /**
504 * Returns last index of [element], or -1 if the channel does not contain element.
505 *
506 * The operation is _terminal_.
507 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
508 *
509 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
510 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
511 */
512 @Deprecated(
513 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
514 level = DeprecationLevel.ERROR
515 )
lastIndexOfnull516 public suspend fun <E> ReceiveChannel<E>.lastIndexOf(element: E): Int {
517 var lastIndex = -1
518 var index = 0
519 consumeEach {
520 if (element == it)
521 lastIndex = index
522 index++
523 }
524 return lastIndex
525 }
526
527 /**
528 * Returns the last element, or `null` if the channel is empty.
529 *
530 * The operation is _terminal_.
531 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
532 *
533 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
534 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
535 */
536 @Deprecated(
537 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
538 level = DeprecationLevel.ERROR
539 )
lastOrNullnull540 public suspend fun <E> ReceiveChannel<E>.lastOrNull(): E? =
541 consume {
542 val iterator = iterator()
543 if (!iterator.hasNext())
544 return null
545 var last = iterator.next()
546 while (iterator.hasNext())
547 last = iterator.next()
548 return last
549 }
550
551 /**
552 * Returns the last element matching the given [predicate], or `null` if no such element was found.
553 *
554 * The operation is _terminal_.
555 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
556 *
557 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
558 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
559 */
560 @Deprecated(
561 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
562 level = DeprecationLevel.ERROR
563 )
lastOrNullnull564 public suspend inline fun <E> ReceiveChannel<E>.lastOrNull(predicate: (E) -> Boolean): E? {
565 var last: E? = null
566 consumeEach {
567 if (predicate(it)) {
568 last = it
569 }
570 }
571 return last
572 }
573
574 /**
575 * Returns the single element, or throws an exception if the channel is empty or has more than one element.
576 *
577 * The operation is _terminal_.
578 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
579 *
580 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
581 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
582 */
583 @Deprecated(
584 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
585 level = DeprecationLevel.ERROR
586 )
singlenull587 public suspend fun <E> ReceiveChannel<E>.single(): E =
588 consume {
589 val iterator = iterator()
590 if (!iterator.hasNext())
591 throw NoSuchElementException("ReceiveChannel is empty.")
592 val single = iterator.next()
593 if (iterator.hasNext())
594 throw IllegalArgumentException("ReceiveChannel has more than one element.")
595 return single
596 }
597
598 /**
599 * Returns the single element matching the given [predicate], or throws exception if there is no or more than one matching element.
600 *
601 * The operation is _terminal_.
602 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
603 *
604 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
605 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
606 */
607 @Deprecated(
608 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
609 level = DeprecationLevel.ERROR
610 )
singlenull611 public suspend inline fun <E> ReceiveChannel<E>.single(predicate: (E) -> Boolean): E {
612 var single: E? = null
613 var found = false
614 consumeEach {
615 if (predicate(it)) {
616 if (found) throw IllegalArgumentException("ReceiveChannel contains more than one matching element.")
617 single = it
618 found = true
619 }
620 }
621 if (!found) throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
622 @Suppress("UNCHECKED_CAST")
623 return single as E
624 }
625
626 /**
627 * Returns single element, or `null` if the channel is empty or has more than one element.
628 *
629 * The operation is _terminal_.
630 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
631 *
632 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
633 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
634 */
635 @Deprecated(
636 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
637 level = DeprecationLevel.ERROR
638 )
singleOrNullnull639 public suspend fun <E> ReceiveChannel<E>.singleOrNull(): E? =
640 consume {
641 val iterator = iterator()
642 if (!iterator.hasNext())
643 return null
644 val single = iterator.next()
645 if (iterator.hasNext())
646 return null
647 return single
648 }
649
650 /**
651 * Returns the single element matching the given [predicate], or `null` if element was not found or more than one element was found.
652 *
653 * The operation is _terminal_.
654 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
655 *
656 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
657 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
658 */
659 @Deprecated(
660 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
661 level = DeprecationLevel.ERROR
662 )
singleOrNullnull663 public suspend inline fun <E> ReceiveChannel<E>.singleOrNull(predicate: (E) -> Boolean): E? {
664 var single: E? = null
665 var found = false
666 consumeEach {
667 if (predicate(it)) {
668 if (found) return null
669 single = it
670 found = true
671 }
672 }
673 if (!found) return null
674 return single
675 }
676
677 /**
678 * Returns a channel containing all elements except first [n] elements.
679 *
680 * The operation is _intermediate_ and _stateless_.
681 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
682 *
683 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
684 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
685 */
686 @Deprecated(
687 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
688 level = DeprecationLevel.ERROR
689 )
dropnull690 public fun <E> ReceiveChannel<E>.drop(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
691 GlobalScope.produce(context, onCompletion = consumes()) {
692 require(n >= 0) { "Requested element count $n is less than zero." }
693 var remaining: Int = n
694 if (remaining > 0)
695 for (e in this@drop) {
696 remaining--
697 if (remaining == 0)
698 break
699 }
700 for (e in this@drop) {
701 send(e)
702 }
703 }
704
705 /**
706 * Returns a channel containing all elements except first elements that satisfy the given [predicate].
707 *
708 * The operation is _intermediate_ and _stateless_.
709 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
710 *
711 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
712 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
713 */
714 @Deprecated(
715 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
716 level = DeprecationLevel.ERROR
717 )
dropWhilenull718 public fun <E> ReceiveChannel<E>.dropWhile(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
719 GlobalScope.produce(context, onCompletion = consumes()) {
720 for (e in this@dropWhile) {
721 if (!predicate(e)) {
722 send(e)
723 break
724 }
725 }
726 for (e in this@dropWhile) {
727 send(e)
728 }
729 }
730
731 /**
732 * Returns a channel containing only elements matching the given [predicate].
733 *
734 * The operation is _intermediate_ and _stateless_.
735 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
736 *
737 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
738 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
739 */
740 @Deprecated(
741 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
742 level = DeprecationLevel.ERROR
743 )
filternull744 public fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
745 GlobalScope.produce(context, onCompletion = consumes()) {
746 for (e in this@filter) {
747 if (predicate(e)) send(e)
748 }
749 }
750
751 /**
752 * Returns a channel containing only elements matching the given [predicate].
753 * @param [predicate] function that takes the index of an element and the element itself
754 * and returns the result of predicate evaluation on the element.
755 *
756 * The operation is _intermediate_ and _stateless_.
757 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
758 *
759 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
760 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
761 */
762 @Deprecated(
763 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
764 level = DeprecationLevel.ERROR
765 )
filterIndexednull766 public fun <E> ReceiveChannel<E>.filterIndexed(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (index: Int, E) -> Boolean): ReceiveChannel<E> =
767 GlobalScope.produce(context, onCompletion = consumes()) {
768 var index = 0
769 for (e in this@filterIndexed) {
770 if (predicate(index++, e)) send(e)
771 }
772 }
773
774 /**
775 * Appends all elements matching the given [predicate] to the given [destination].
776 * @param [predicate] function that takes the index of an element and the element itself
777 * and returns the result of predicate evaluation on the element.
778 *
779 * The operation is _terminal_.
780 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
781 *
782 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
783 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
784 */
785 @Deprecated(
786 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
787 level = DeprecationLevel.ERROR
788 )
filterIndexedTonull789 public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C {
790 consumeEachIndexed { (index, element) ->
791 if (predicate(index, element)) destination.add(element)
792 }
793 return destination
794 }
795
796 /**
797 * Appends all elements matching the given [predicate] to the given [destination].
798 * @param [predicate] function that takes the index of an element and the element itself
799 * and returns the result of predicate evaluation on the element.
800 *
801 * The operation is _terminal_.
802 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
803 *
804 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
805 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
806 */
807 @Deprecated(
808 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
809 level = DeprecationLevel.ERROR
810 )
filterIndexedTonull811 public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C {
812 consumeEachIndexed { (index, element) ->
813 if (predicate(index, element)) destination.send(element)
814 }
815 return destination
816 }
817
818 /**
819 * Returns a channel containing all elements not matching the given [predicate].
820 *
821 * The operation is _intermediate_ and _stateless_.
822 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
823 *
824 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
825 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
826 */
827 @Deprecated(
828 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
829 level = DeprecationLevel.ERROR
830 )
filterNotnull831 public fun <E> ReceiveChannel<E>.filterNot(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
832 filter(context) { !predicate(it) }
833
834 /**
835 * Returns a channel containing all elements that are not `null`.
836 *
837 * The operation is _intermediate_ and _stateless_.
838 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
839 *
840 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
841 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
842 */
843 @Deprecated(
844 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
845 level = DeprecationLevel.ERROR
846 )
847 @Suppress("UNCHECKED_CAST")
filterNotNullnull848 public fun <E : Any> ReceiveChannel<E?>.filterNotNull(): ReceiveChannel<E> =
849 filter { it != null } as ReceiveChannel<E>
850
851 /**
852 * Appends all elements that are not `null` to the given [destination].
853 *
854 * The operation is _terminal_.
855 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
856 *
857 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
858 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
859 */
860 @Deprecated(
861 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
862 level = DeprecationLevel.ERROR
863 )
filterNotNullTonull864 public suspend fun <E : Any, C : MutableCollection<in E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
865 consumeEach {
866 if (it != null) destination.add(it)
867 }
868 return destination
869 }
870
871 /**
872 * Appends all elements that are not `null` to the given [destination].
873 *
874 * The operation is _terminal_.
875 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
876 *
877 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
878 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
879 */
880 @Deprecated(
881 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
882 level = DeprecationLevel.ERROR
883 )
filterNotNullTonull884 public suspend fun <E : Any, C : SendChannel<E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
885 consumeEach {
886 if (it != null) destination.send(it)
887 }
888 return destination
889 }
890
891 /**
892 * Appends all elements not matching the given [predicate] to the given [destination].
893 *
894 * The operation is _terminal_.
895 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
896 *
897 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
898 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
899 */
900 @Deprecated(
901 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
902 level = DeprecationLevel.ERROR
903 )
filterNotTonull904 public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C {
905 consumeEach {
906 if (!predicate(it)) destination.add(it)
907 }
908 return destination
909 }
910
911 /**
912 * Appends all elements not matching the given [predicate] to the given [destination].
913 *
914 * The operation is _terminal_.
915 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
916 *
917 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
918 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
919 */
920 @Deprecated(
921 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
922 level = DeprecationLevel.ERROR
923 )
filterNotTonull924 public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C {
925 consumeEach {
926 if (!predicate(it)) destination.send(it)
927 }
928 return destination
929 }
930
931 /**
932 * Appends all elements matching the given [predicate] to the given [destination].
933 *
934 * The operation is _terminal_.
935 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
936 *
937 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
938 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
939 */
940 @Deprecated(
941 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
942 level = DeprecationLevel.ERROR
943 )
filterTonull944 public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C {
945 consumeEach {
946 if (predicate(it)) destination.add(it)
947 }
948 return destination
949 }
950
951 /**
952 * Appends all elements matching the given [predicate] to the given [destination].
953 *
954 * The operation is _terminal_.
955 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
956 *
957 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
958 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
959 */
960 @Deprecated(
961 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
962 level = DeprecationLevel.ERROR
963 )
filterTonull964 public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C {
965 consumeEach {
966 if (predicate(it)) destination.send(it)
967 }
968 return destination
969 }
970
971 /**
972 * Returns a channel containing first [n] elements.
973 *
974 * The operation is _intermediate_ and _stateless_.
975 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
976 *
977 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
978 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
979 */
980 @Deprecated(
981 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
982 level = DeprecationLevel.ERROR
983 )
takenull984 public fun <E> ReceiveChannel<E>.take(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
985 GlobalScope.produce(context, onCompletion = consumes()) {
986 if (n == 0) return@produce
987 require(n >= 0) { "Requested element count $n is less than zero." }
988 var remaining: Int = n
989 for (e in this@take) {
990 send(e)
991 remaining--
992 if (remaining == 0)
993 return@produce
994 }
995 }
996
997 /**
998 * Returns a channel containing first elements satisfying the given [predicate].
999 *
1000 * The operation is _intermediate_ and _stateless_.
1001 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1002 *
1003 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1004 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1005 */
1006 @Deprecated(
1007 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1008 level = DeprecationLevel.ERROR
1009 )
takeWhilenull1010 public fun <E> ReceiveChannel<E>.takeWhile(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
1011 GlobalScope.produce(context, onCompletion = consumes()) {
1012 for (e in this@takeWhile) {
1013 if (!predicate(e)) return@produce
1014 send(e)
1015 }
1016 }
1017
1018 /**
1019 * Returns a [Map] containing key-value pairs provided by [transform] function
1020 * applied to elements of the given channel.
1021 *
1022 * If any of two pairs would have the same key the last one gets added to the map.
1023 *
1024 * The returned map preserves the entry iteration order of the original channel.
1025 *
1026 * The operation is _terminal_.
1027 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1028 *
1029 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1030 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1031 */
1032 @Deprecated(
1033 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1034 level = DeprecationLevel.ERROR
1035 )
associatenull1036 public suspend inline fun <E, K, V> ReceiveChannel<E>.associate(transform: (E) -> Pair<K, V>): Map<K, V> =
1037 associateTo(LinkedHashMap(), transform)
1038
1039 /**
1040 * Returns a [Map] containing the elements from the given channel indexed by the key
1041 * returned from [keySelector] function applied to each element.
1042 *
1043 * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
1044 *
1045 * The returned map preserves the entry iteration order of the original channel.
1046 *
1047 * The operation is _terminal_.
1048 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1049 *
1050 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1051 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1052 */
1053 @Deprecated(
1054 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1055 level = DeprecationLevel.ERROR
1056 )
1057 public suspend inline fun <E, K> ReceiveChannel<E>.associateBy(keySelector: (E) -> K): Map<K, E> =
1058 associateByTo(LinkedHashMap(), keySelector)
1059
1060 /**
1061 * Returns a [Map] containing the values provided by [valueTransform] and indexed by [keySelector] functions applied to elements of the given channel.
1062 *
1063 * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
1064 *
1065 * The returned map preserves the entry iteration order of the original channel.
1066 *
1067 * The operation is _terminal_.
1068 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1069 *
1070 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1071 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1072 */
1073 @Deprecated(
1074 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1075 level = DeprecationLevel.ERROR
1076 )
1077 public suspend inline fun <E, K, V> ReceiveChannel<E>.associateBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, V> =
1078 associateByTo(LinkedHashMap(), keySelector, valueTransform)
1079
1080 /**
1081 * Populates and returns the [destination] mutable map with key-value pairs,
1082 * where key is provided by the [keySelector] function applied to each element of the given channel
1083 * and value is the element itself.
1084 *
1085 * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
1086 *
1087 * The operation is _terminal_.
1088 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1089 *
1090 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1091 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1092 */
1093 @Deprecated(
1094 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1095 level = DeprecationLevel.ERROR
1096 )
1097 public suspend inline fun <E, K, M : MutableMap<in K, in E>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K): M {
1098 consumeEach {
1099 destination.put(keySelector(it), it)
1100 }
1101 return destination
1102 }
1103
1104 /**
1105 * Populates and returns the [destination] mutable map with key-value pairs,
1106 * where key is provided by the [keySelector] function and
1107 * and value is provided by the [valueTransform] function applied to elements of the given channel.
1108 *
1109 * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
1110 *
1111 * The operation is _terminal_.
1112 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1113 *
1114 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1115 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1116 */
1117 @Deprecated(
1118 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1119 level = DeprecationLevel.ERROR
1120 )
associateByTonull1121 public suspend inline fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M {
1122 consumeEach {
1123 destination.put(keySelector(it), valueTransform(it))
1124 }
1125 return destination
1126 }
1127
1128 /**
1129 * Populates and returns the [destination] mutable map with key-value pairs
1130 * provided by [transform] function applied to each element of the given channel.
1131 *
1132 * If any of two pairs would have the same key the last one gets added to the map.
1133 *
1134 * The operation is _terminal_.
1135 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1136 *
1137 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1138 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1139 */
1140 @Deprecated(
1141 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1142 level = DeprecationLevel.ERROR
1143 )
associateTonull1144 public suspend inline fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateTo(destination: M, transform: (E) -> Pair<K, V>): M {
1145 consumeEach {
1146 destination += transform(it)
1147 }
1148 return destination
1149 }
1150
1151 /**
1152 * Send each element of the original channel
1153 * and appends the results to the given [destination].
1154 *
1155 * The operation is _terminal_.
1156 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1157 *
1158 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1159 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1160 */
1161 @Deprecated(
1162 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1163 level = DeprecationLevel.ERROR
1164 )
toChannelnull1165 public suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.toChannel(destination: C): C {
1166 consumeEach {
1167 destination.send(it)
1168 }
1169 return destination
1170 }
1171
1172 /**
1173 * Appends all elements to the given [destination] collection.
1174 *
1175 * The operation is _terminal_.
1176 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1177 *
1178 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1179 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1180 */
1181 @Deprecated(
1182 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1183 level = DeprecationLevel.ERROR
1184 )
toCollectionnull1185 public suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.toCollection(destination: C): C {
1186 consumeEach {
1187 destination.add(it)
1188 }
1189 return destination
1190 }
1191
1192 /**
1193 * Returns a [List] containing all elements.
1194 *
1195 * The operation is _terminal_.
1196 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1197 */
toListnull1198 public suspend fun <E> ReceiveChannel<E>.toList(): List<E> =
1199 this.toMutableList()
1200
1201 /**
1202 * Returns a [Map] filled with all elements of this channel.
1203 *
1204 * The operation is _terminal_.
1205 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1206 *
1207 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1208 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1209 */
1210 @Deprecated(
1211 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1212 level = DeprecationLevel.ERROR
1213 )
1214 public suspend fun <K, V> ReceiveChannel<Pair<K, V>>.toMap(): Map<K, V> =
1215 toMap(LinkedHashMap())
1216
1217 /**
1218 * Returns a [MutableMap] filled with all elements of this channel.
1219 *
1220 * The operation is _terminal_.
1221 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1222 *
1223 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1224 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1225 */
1226 @Deprecated(
1227 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1228 level = DeprecationLevel.ERROR
1229 )
1230 public suspend fun <K, V, M : MutableMap<in K, in V>> ReceiveChannel<Pair<K, V>>.toMap(destination: M): M {
1231 consumeEach {
1232 destination += it
1233 }
1234 return destination
1235 }
1236
1237 /**
1238 * Returns a [MutableList] filled with all elements of this channel.
1239 *
1240 * The operation is _terminal_.
1241 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1242 *
1243 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1244 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1245 */
1246 @Deprecated(
1247 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1248 level = DeprecationLevel.ERROR
1249 )
toMutableListnull1250 public suspend fun <E> ReceiveChannel<E>.toMutableList(): MutableList<E> =
1251 toCollection(ArrayList())
1252
1253 /**
1254 * Returns a [Set] of all elements.
1255 *
1256 * The returned set preserves the element iteration order of the original channel.
1257 *
1258 * The operation is _terminal_.
1259 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1260 *
1261 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1262 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1263 */
1264 @Deprecated(
1265 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1266 level = DeprecationLevel.ERROR
1267 )
1268 public suspend fun <E> ReceiveChannel<E>.toSet(): Set<E> =
1269 this.toMutableSet()
1270
1271 /**
1272 * Returns a single channel of all elements from results of [transform] function being invoked on each element of original channel.
1273 *
1274 * The operation is _intermediate_ and _stateless_.
1275 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1276 *
1277 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1278 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1279 */
1280 @Deprecated(
1281 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1282 level = DeprecationLevel.ERROR
1283 )
1284 public fun <E, R> ReceiveChannel<E>.flatMap(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> ReceiveChannel<R>): ReceiveChannel<R> =
1285 GlobalScope.produce(context, onCompletion = consumes()) {
1286 for (e in this@flatMap) {
1287 transform(e).toChannel(this)
1288 }
1289 }
1290
1291 /**
1292 * Groups elements of the original channel by the key returned by the given [keySelector] function
1293 * applied to each element and returns a map where each group key is associated with a list of corresponding elements.
1294 *
1295 * The returned map preserves the entry iteration order of the keys produced from the original channel.
1296 *
1297 * The operation is _terminal_.
1298 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1299 *
1300 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1301 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1302 */
1303 @Deprecated(
1304 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1305 level = DeprecationLevel.ERROR
1306 )
groupBynull1307 public suspend inline fun <E, K> ReceiveChannel<E>.groupBy(keySelector: (E) -> K): Map<K, List<E>> =
1308 groupByTo(LinkedHashMap(), keySelector)
1309
1310 /**
1311 * Groups values returned by the [valueTransform] function applied to each element of the original channel
1312 * by the key returned by the given [keySelector] function applied to the element
1313 * and returns a map where each group key is associated with a list of corresponding values.
1314 *
1315 * The returned map preserves the entry iteration order of the keys produced from the original channel.
1316 *
1317 * The operation is _terminal_.
1318 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1319 *
1320 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1321 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1322 */
1323 @Deprecated(
1324 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1325 level = DeprecationLevel.ERROR
1326 )
1327 public suspend inline fun <E, K, V> ReceiveChannel<E>.groupBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, List<V>> =
1328 groupByTo(LinkedHashMap(), keySelector, valueTransform)
1329
1330 /**
1331 * Groups elements of the original channel by the key returned by the given [keySelector] function
1332 * applied to each element and puts to the [destination] map each group key associated with a list of corresponding elements.
1333 *
1334 * @return The [destination] map.
1335 *
1336 * The operation is _terminal_.
1337 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1338 *
1339 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1340 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1341 */
1342 @Deprecated(
1343 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1344 level = DeprecationLevel.ERROR
1345 )
1346 public suspend inline fun <E, K, M : MutableMap<in K, MutableList<E>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K): M {
1347 consumeEach {
1348 val key = keySelector(it)
1349 val list = destination.getOrPut(key) { ArrayList() }
1350 list.add(it)
1351 }
1352 return destination
1353 }
1354
1355 /**
1356 * Groups values returned by the [valueTransform] function applied to each element of the original channel
1357 * by the key returned by the given [keySelector] function applied to the element
1358 * and puts to the [destination] map each group key associated with a list of corresponding values.
1359 *
1360 * @return The [destination] map.
1361 *
1362 * The operation is _terminal_.
1363 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1364 *
1365 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1366 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1367 */
1368 @Deprecated(
1369 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1370 level = DeprecationLevel.ERROR
1371 )
groupByTonull1372 public suspend inline fun <E, K, V, M : MutableMap<in K, MutableList<V>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M {
1373 consumeEach {
1374 val key = keySelector(it)
1375 val list = destination.getOrPut(key) { ArrayList() }
1376 list.add(valueTransform(it))
1377 }
1378 return destination
1379 }
1380
1381 /**
1382 * Returns a channel containing the results of applying the given [transform] function
1383 * to each element in the original channel.
1384 *
1385 * The operation is _intermediate_ and _stateless_.
1386 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1387 */
1388 @Deprecated(
1389 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1390 level = DeprecationLevel.ERROR
1391 )
mapnull1392 public fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
1393 GlobalScope.produce(context, onCompletion = consumes()) {
1394 consumeEach {
1395 send(transform(it))
1396 }
1397 }
1398
1399 /**
1400 * Returns a channel containing the results of applying the given [transform] function
1401 * to each element and its index in the original channel.
1402 * @param [transform] function that takes the index of an element and the element itself
1403 * and returns the result of the transform applied to the element.
1404 *
1405 * The operation is _intermediate_ and _stateless_.
1406 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1407 *
1408 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1409 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1410 */
1411 @Deprecated(
1412 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1413 level = DeprecationLevel.ERROR
1414 )
mapIndexednull1415 public fun <E, R> ReceiveChannel<E>.mapIndexed(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (index: Int, E) -> R): ReceiveChannel<R> =
1416 GlobalScope.produce(context, onCompletion = consumes()) {
1417 var index = 0
1418 for (e in this@mapIndexed) {
1419 send(transform(index++, e))
1420 }
1421 }
1422
1423 /**
1424 * Returns a channel containing only the non-null results of applying the given [transform] function
1425 * to each element and its index in the original channel.
1426 * @param [transform] function that takes the index of an element and the element itself
1427 * and returns the result of the transform applied to the element.
1428 *
1429 * The operation is _intermediate_ and _stateless_.
1430 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1431 *
1432 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1433 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1434 */
1435 @Deprecated(
1436 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1437 level = DeprecationLevel.ERROR
1438 )
mapIndexedNotNullnull1439 public fun <E, R : Any> ReceiveChannel<E>.mapIndexedNotNull(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (index: Int, E) -> R?): ReceiveChannel<R> =
1440 mapIndexed(context, transform).filterNotNull()
1441
1442 /**
1443 * Applies the given [transform] function to each element and its index in the original channel
1444 * and appends only the non-null results to the given [destination].
1445 * @param [transform] function that takes the index of an element and the element itself
1446 * and returns the result of the transform applied to the element.
1447 *
1448 * The operation is _terminal_.
1449 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1450 *
1451 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1452 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1453 */
1454 @Deprecated(
1455 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1456 level = DeprecationLevel.ERROR
1457 )
1458 public suspend inline fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C {
1459 consumeEachIndexed { (index, element) ->
1460 transform(index, element)?.let { destination.add(it) }
1461 }
1462 return destination
1463 }
1464
1465 /**
1466 * Applies the given [transform] function to each element and its index in the original channel
1467 * and appends only the non-null results to the given [destination].
1468 * @param [transform] function that takes the index of an element and the element itself
1469 * and returns the result of the transform applied to the element.
1470 *
1471 * The operation is _terminal_.
1472 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1473 *
1474 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1475 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1476 */
1477 @Deprecated(
1478 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1479 level = DeprecationLevel.ERROR
1480 )
mapIndexedNotNullTonull1481 public suspend inline fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C {
1482 consumeEachIndexed { (index, element) ->
1483 transform(index, element)?.let { destination.send(it) }
1484 }
1485 return destination
1486 }
1487
1488 /**
1489 * Applies the given [transform] function to each element and its index in the original channel
1490 * and appends the results to the given [destination].
1491 * @param [transform] function that takes the index of an element and the element itself
1492 * and returns the result of the transform applied to the element.
1493 *
1494 * The operation is _terminal_.
1495 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1496 *
1497 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1498 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1499 */
1500 @Deprecated(
1501 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1502 level = DeprecationLevel.ERROR
1503 )
mapIndexedTonull1504 public suspend inline fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C {
1505 var index = 0
1506 consumeEach {
1507 destination.add(transform(index++, it))
1508 }
1509 return destination
1510 }
1511
1512 /**
1513 * Applies the given [transform] function to each element and its index in the original channel
1514 * and appends the results to the given [destination].
1515 * @param [transform] function that takes the index of an element and the element itself
1516 * and returns the result of the transform applied to the element.
1517 *
1518 * The operation is _terminal_.
1519 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1520 *
1521 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1522 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1523 */
1524 @Deprecated(
1525 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1526 level = DeprecationLevel.ERROR
1527 )
mapIndexedTonull1528 public suspend inline fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C {
1529 var index = 0
1530 consumeEach {
1531 destination.send(transform(index++, it))
1532 }
1533 return destination
1534 }
1535
1536 /**
1537 * Returns a channel containing only the non-null results of applying the given [transform] function
1538 * to each element in the original channel.
1539 *
1540 * The operation is _intermediate_ and _stateless_.
1541 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1542 *
1543 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1544 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1545 */
1546 @Deprecated(
1547 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1548 level = DeprecationLevel.ERROR
1549 )
mapNotNullnull1550 public fun <E, R : Any> ReceiveChannel<E>.mapNotNull(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> R?): ReceiveChannel<R> =
1551 map(context, transform).filterNotNull()
1552
1553 /**
1554 * Applies the given [transform] function to each element in the original channel
1555 * and appends only the non-null results to the given [destination].
1556 *
1557 * The operation is _terminal_.
1558 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1559 *
1560 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1561 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1562 */
1563 @Deprecated(
1564 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1565 level = DeprecationLevel.ERROR
1566 )
1567 public suspend inline fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C {
1568 consumeEach {
1569 transform(it)?.let { destination.add(it) }
1570 }
1571 return destination
1572 }
1573
1574 /**
1575 * Applies the given [transform] function to each element in the original channel
1576 * and appends only the non-null results to the given [destination].
1577 *
1578 * The operation is _terminal_.
1579 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1580 *
1581 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1582 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1583 */
1584 @Deprecated(
1585 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1586 level = DeprecationLevel.ERROR
1587 )
mapNotNullTonull1588 public suspend inline fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C {
1589 consumeEach {
1590 transform(it)?.let { destination.send(it) }
1591 }
1592 return destination
1593 }
1594
1595 /**
1596 * Applies the given [transform] function to each element of the original channel
1597 * and appends the results to the given [destination].
1598 *
1599 * The operation is _terminal_.
1600 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1601 *
1602 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1603 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1604 */
1605 @Deprecated(
1606 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1607 level = DeprecationLevel.ERROR
1608 )
mapTonull1609 public suspend inline fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C {
1610 consumeEach {
1611 destination.add(transform(it))
1612 }
1613 return destination
1614 }
1615
1616 /**
1617 * Applies the given [transform] function to each element of the original channel
1618 * and appends the results to the given [destination].
1619 *
1620 * The operation is _terminal_.
1621 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1622 *
1623 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1624 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1625 */
1626 @Deprecated(
1627 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1628 level = DeprecationLevel.ERROR
1629 )
mapTonull1630 public suspend inline fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C {
1631 consumeEach {
1632 destination.send(transform(it))
1633 }
1634 return destination
1635 }
1636
1637 /**
1638 * Returns a channel of [IndexedValue] for each element of the original channel.
1639 *
1640 * The operation is _intermediate_ and _stateless_.
1641 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1642 *
1643 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1644 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1645 */
1646 @Deprecated(
1647 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1648 level = DeprecationLevel.ERROR
1649 )
withIndexnull1650 public fun <E> ReceiveChannel<E>.withIndex(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<IndexedValue<E>> =
1651 GlobalScope.produce(context, onCompletion = consumes()) {
1652 var index = 0
1653 for (e in this@withIndex) {
1654 send(IndexedValue(index++, e))
1655 }
1656 }
1657
1658 /**
1659 * Returns a channel containing only distinct elements from the given channel.
1660 *
1661 * The elements in the resulting channel are in the same order as they were in the source channel.
1662 *
1663 * The operation is _intermediate_ and _stateful_.
1664 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1665 *
1666 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1667 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1668 */
1669 @Deprecated(
1670 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1671 level = DeprecationLevel.ERROR
1672 )
distinctnull1673 public fun <E> ReceiveChannel<E>.distinct(): ReceiveChannel<E> =
1674 this.distinctBy { it }
1675
1676 /**
1677 * Returns a channel containing only elements from the given channel
1678 * having distinct keys returned by the given [selector] function.
1679 *
1680 * The elements in the resulting channel are in the same order as they were in the source channel.
1681 *
1682 * The operation is _intermediate_ and _stateful_.
1683 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1684 *
1685 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1686 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1687 */
1688 @Deprecated(
1689 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1690 level = DeprecationLevel.ERROR
1691 )
distinctBynull1692 public fun <E, K> ReceiveChannel<E>.distinctBy(context: CoroutineContext = Dispatchers.Unconfined, selector: suspend (E) -> K): ReceiveChannel<E> =
1693 GlobalScope.produce(context, onCompletion = consumes()) {
1694 val keys = HashSet<K>()
1695 for (e in this@distinctBy) {
1696 val k = selector(e)
1697 if (k !in keys) {
1698 send(e)
1699 keys += k
1700 }
1701 }
1702 }
1703
1704 /**
1705 * Returns a mutable set containing all distinct elements from the given channel.
1706 *
1707 * The returned set preserves the element iteration order of the original channel.
1708 *
1709 * The operation is _terminal_.
1710 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1711 *
1712 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1713 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1714 */
1715 @Deprecated(
1716 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1717 level = DeprecationLevel.ERROR
1718 )
toMutableSetnull1719 public suspend fun <E> ReceiveChannel<E>.toMutableSet(): MutableSet<E> =
1720 toCollection(LinkedHashSet())
1721
1722 /**
1723 * Returns `true` if all elements match the given [predicate].
1724 *
1725 * The operation is _terminal_.
1726 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1727 *
1728 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1729 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1730 */
1731 @Deprecated(
1732 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1733 level = DeprecationLevel.ERROR
1734 )
1735 public suspend inline fun <E> ReceiveChannel<E>.all(predicate: (E) -> Boolean): Boolean {
1736 consumeEach {
1737 if (!predicate(it)) return false
1738 }
1739 return true
1740 }
1741
1742 /**
1743 * Returns `true` if channel has at least one element.
1744 *
1745 * The operation is _terminal_.
1746 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1747 *
1748 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1749 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1750 */
1751 @Deprecated(
1752 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1753 level = DeprecationLevel.ERROR
1754 )
anynull1755 public suspend fun <E> ReceiveChannel<E>.any(): Boolean =
1756 consume {
1757 return iterator().hasNext()
1758 }
1759
1760 /**
1761 * Returns `true` if at least one element matches the given [predicate].
1762 *
1763 * The operation is _terminal_.
1764 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1765 *
1766 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1767 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1768 */
1769 @Deprecated(
1770 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1771 level = DeprecationLevel.ERROR
1772 )
anynull1773 public suspend inline fun <E> ReceiveChannel<E>.any(predicate: (E) -> Boolean): Boolean {
1774 consumeEach {
1775 if (predicate(it)) return true
1776 }
1777 return false
1778 }
1779
1780 /**
1781 * Returns the number of elements in this channel.
1782 *
1783 * The operation is _terminal_.
1784 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1785 *
1786 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1787 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1788 */
1789 @Deprecated(
1790 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1791 level = DeprecationLevel.ERROR
1792 )
countnull1793 public suspend fun <E> ReceiveChannel<E>.count(): Int {
1794 var count = 0
1795 consumeEach { count++ }
1796 return count
1797 }
1798
1799 /**
1800 * Returns the number of elements matching the given [predicate].
1801 *
1802 * The operation is _terminal_.
1803 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1804 *
1805 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1806 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1807 */
1808 @Deprecated(
1809 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1810 level = DeprecationLevel.ERROR
1811 )
countnull1812 public suspend inline fun <E> ReceiveChannel<E>.count(predicate: (E) -> Boolean): Int {
1813 var count = 0
1814 consumeEach {
1815 if (predicate(it)) count++
1816 }
1817 return count
1818 }
1819
1820 /**
1821 * Accumulates value starting with [initial] value and applying [operation] from left to right to current accumulator value and each element.
1822 *
1823 * The operation is _terminal_.
1824 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1825 *
1826 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1827 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1828 */
1829 @Deprecated(
1830 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1831 level = DeprecationLevel.ERROR
1832 )
foldnull1833 public suspend inline fun <E, R> ReceiveChannel<E>.fold(initial: R, operation: (acc: R, E) -> R): R {
1834 var accumulator = initial
1835 consumeEach {
1836 accumulator = operation(accumulator, it)
1837 }
1838 return accumulator
1839 }
1840
1841 /**
1842 * Accumulates value starting with [initial] value and applying [operation] from left to right
1843 * to current accumulator value and each element with its index in the original channel.
1844 * @param [operation] function that takes the index of an element, current accumulator value
1845 * and the element itself, and calculates the next accumulator value.
1846 *
1847 * The operation is _terminal_.
1848 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1849 *
1850 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1851 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1852 */
1853 @Deprecated(
1854 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1855 level = DeprecationLevel.ERROR
1856 )
foldIndexednull1857 public suspend inline fun <E, R> ReceiveChannel<E>.foldIndexed(initial: R, operation: (index: Int, acc: R, E) -> R): R {
1858 var index = 0
1859 var accumulator = initial
1860 consumeEach {
1861 accumulator = operation(index++, accumulator, it)
1862 }
1863 return accumulator
1864 }
1865
1866 /**
1867 * Returns the first element yielding the largest value of the given function or `null` if there are no elements.
1868 *
1869 * The operation is _terminal_.
1870 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1871 *
1872 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1873 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1874 */
1875 @Deprecated(
1876 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1877 level = DeprecationLevel.ERROR
1878 )
maxBynull1879 public suspend inline fun <E, R : Comparable<R>> ReceiveChannel<E>.maxBy(selector: (E) -> R): E? =
1880 consume {
1881 val iterator = iterator()
1882 if (!iterator.hasNext()) return null
1883 var maxElem = iterator.next()
1884 var maxValue = selector(maxElem)
1885 while (iterator.hasNext()) {
1886 val e = iterator.next()
1887 val v = selector(e)
1888 if (maxValue < v) {
1889 maxElem = e
1890 maxValue = v
1891 }
1892 }
1893 return maxElem
1894 }
1895
1896 /**
1897 * Returns the first element having the largest value according to the provided [comparator] or `null` if there are no elements.
1898 *
1899 * The operation is _terminal_.
1900 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1901 *
1902 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1903 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1904 */
1905 @Deprecated(
1906 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1907 level = DeprecationLevel.ERROR
1908 )
maxWithnull1909 public suspend fun <E> ReceiveChannel<E>.maxWith(comparator: Comparator<in E>): E? =
1910 consume {
1911 val iterator = iterator()
1912 if (!iterator.hasNext()) return null
1913 var max = iterator.next()
1914 while (iterator.hasNext()) {
1915 val e = iterator.next()
1916 if (comparator.compare(max, e) < 0) max = e
1917 }
1918 return max
1919 }
1920
1921 /**
1922 * Returns the first element yielding the smallest value of the given function or `null` if there are no elements.
1923 *
1924 * The operation is _terminal_.
1925 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1926 *
1927 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1928 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1929 */
1930 @Deprecated(
1931 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1932 level = DeprecationLevel.ERROR
1933 )
minBynull1934 public suspend inline fun <E, R : Comparable<R>> ReceiveChannel<E>.minBy(selector: (E) -> R): E? =
1935 consume {
1936 val iterator = iterator()
1937 if (!iterator.hasNext()) return null
1938 var minElem = iterator.next()
1939 var minValue = selector(minElem)
1940 while (iterator.hasNext()) {
1941 val e = iterator.next()
1942 val v = selector(e)
1943 if (minValue > v) {
1944 minElem = e
1945 minValue = v
1946 }
1947 }
1948 return minElem
1949 }
1950
1951 /**
1952 * Returns the first element having the smallest value according to the provided [comparator] or `null` if there are no elements.
1953 *
1954 * The operation is _terminal_.
1955 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1956 *
1957 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1958 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1959 */
1960 @Deprecated(
1961 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1962 level = DeprecationLevel.ERROR
1963 )
minWithnull1964 public suspend fun <E> ReceiveChannel<E>.minWith(comparator: Comparator<in E>): E? =
1965 consume {
1966 val iterator = iterator()
1967 if (!iterator.hasNext()) return null
1968 var min = iterator.next()
1969 while (iterator.hasNext()) {
1970 val e = iterator.next()
1971 if (comparator.compare(min, e) > 0) min = e
1972 }
1973 return min
1974 }
1975
1976 /**
1977 * Returns `true` if the channel has no elements.
1978 *
1979 * The operation is _terminal_.
1980 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1981 *
1982 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1983 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1984 */
1985 @Deprecated(
1986 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
1987 level = DeprecationLevel.ERROR
1988 )
nonenull1989 public suspend fun <E> ReceiveChannel<E>.none(): Boolean =
1990 consume {
1991 return !iterator().hasNext()
1992 }
1993
1994 /**
1995 * Returns `true` if no elements match the given [predicate].
1996 *
1997 * The operation is _terminal_.
1998 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1999 *
2000 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
2001 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
2002 */
2003 @Deprecated(
2004 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
2005 level = DeprecationLevel.ERROR
2006 )
nonenull2007 public suspend inline fun <E> ReceiveChannel<E>.none(predicate: (E) -> Boolean): Boolean {
2008 consumeEach {
2009 if (predicate(it)) return false
2010 }
2011 return true
2012 }
2013
2014 /**
2015 * Accumulates value starting with the first element and applying [operation] from left to right to current accumulator value and each element.
2016 *
2017 * The operation is _terminal_.
2018 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
2019 *
2020 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
2021 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
2022 */
2023 @Deprecated(
2024 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
2025 level = DeprecationLevel.ERROR
2026 )
reducenull2027 public suspend inline fun <S, E : S> ReceiveChannel<E>.reduce(operation: (acc: S, E) -> S): S =
2028 consume {
2029 val iterator = this.iterator()
2030 if (!iterator.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.")
2031 var accumulator: S = iterator.next()
2032 while (iterator.hasNext()) {
2033 accumulator = operation(accumulator, iterator.next())
2034 }
2035 return accumulator
2036 }
2037
2038 /**
2039 * Accumulates value starting with the first element and applying [operation] from left to right
2040 * to current accumulator value and each element with its index in the original channel.
2041 * @param [operation] function that takes the index of an element, current accumulator value
2042 * and the element itself and calculates the next accumulator value.
2043 *
2044 * The operation is _terminal_.
2045 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
2046 *
2047 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
2048 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
2049 */
2050 @Deprecated(
2051 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
2052 level = DeprecationLevel.ERROR
2053 )
reduceIndexednull2054 public suspend inline fun <S, E : S> ReceiveChannel<E>.reduceIndexed(operation: (index: Int, acc: S, E) -> S): S =
2055 consume {
2056 val iterator = this.iterator()
2057 if (!iterator.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.")
2058 var index = 1
2059 var accumulator: S = iterator.next()
2060 while (iterator.hasNext()) {
2061 accumulator = operation(index++, accumulator, iterator.next())
2062 }
2063 return accumulator
2064 }
2065
2066 /**
2067 * Returns the sum of all values produced by [selector] function applied to each element in the channel.
2068 *
2069 * The operation is _terminal_.
2070 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
2071 *
2072 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
2073 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
2074 */
2075 @Deprecated(
2076 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
2077 level = DeprecationLevel.ERROR
2078 )
sumBynull2079 public suspend inline fun <E> ReceiveChannel<E>.sumBy(selector: (E) -> Int): Int {
2080 var sum = 0
2081 consumeEach {
2082 sum += selector(it)
2083 }
2084 return sum
2085 }
2086
2087 /**
2088 * Returns the sum of all values produced by [selector] function applied to each element in the channel.
2089 *
2090 * The operation is _terminal_.
2091 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
2092 *
2093 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
2094 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
2095 */
2096 @Deprecated(
2097 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
2098 level = DeprecationLevel.ERROR
2099 )
sumByDoublenull2100 public suspend inline fun <E> ReceiveChannel<E>.sumByDouble(selector: (E) -> Double): Double {
2101 var sum = 0.0
2102 consumeEach {
2103 sum += selector(it)
2104 }
2105 return sum
2106 }
2107
2108 /**
2109 * Returns an original collection containing all the non-`null` elements, throwing an [IllegalArgumentException] if there are any `null` elements.
2110 *
2111 * The operation is _intermediate_ and _stateless_.
2112 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
2113 *
2114 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
2115 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
2116 */
2117 @Deprecated(
2118 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
2119 level = DeprecationLevel.ERROR
2120 )
requireNoNullsnull2121 public fun <E : Any> ReceiveChannel<E?>.requireNoNulls(): ReceiveChannel<E> =
2122 map { it ?: throw IllegalArgumentException("null element found in $this.") }
2123
2124 /**
2125 * Splits the original channel into pair of lists,
2126 * where *first* list contains elements for which [predicate] yielded `true`,
2127 * while *second* list contains elements for which [predicate] yielded `false`.
2128 *
2129 * The operation is _terminal_.
2130 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
2131 *
2132 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
2133 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
2134 */
2135 @Deprecated(
2136 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
2137 level = DeprecationLevel.ERROR
2138 )
partitionnull2139 public suspend inline fun <E> ReceiveChannel<E>.partition(predicate: (E) -> Boolean): Pair<List<E>, List<E>> {
2140 val first = ArrayList<E>()
2141 val second = ArrayList<E>()
2142 consumeEach {
2143 if (predicate(it)) {
2144 first.add(it)
2145 } else {
2146 second.add(it)
2147 }
2148 }
2149 return Pair(first, second)
2150 }
2151
2152 /**
2153 * Returns a channel of pairs built from elements of both channels with same indexes.
2154 * Resulting channel has length of shortest input channel.
2155 *
2156 * The operation is _intermediate_ and _stateless_.
2157 * This function [consumes][ReceiveChannel.consume] all elements of both the original [ReceiveChannel] and the `other` one.
2158 *
2159 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
2160 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
2161 */
2162 @Deprecated(
2163 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
2164 level = DeprecationLevel.ERROR
2165 )
zipnull2166 public infix fun <E, R> ReceiveChannel<E>.zip(other: ReceiveChannel<R>): ReceiveChannel<Pair<E, R>> =
2167 zip(other) { t1, t2 -> t1 to t2 }
2168
2169 /**
2170 * Returns a channel of values built from elements of both collections with same indexes using provided [transform]. Resulting channel has length of shortest input channels.
2171 *
2172 * The operation is _intermediate_ and _stateless_.
2173 * This function [consumes][consume] all elements of both the original [ReceiveChannel] and the `other` one.
2174 *
2175 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
2176 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
2177 */
2178 @Deprecated(
2179 message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
2180 level = DeprecationLevel.ERROR
2181 )
zipnull2182 public fun <E, R, V> ReceiveChannel<E>.zip(other: ReceiveChannel<R>, context: CoroutineContext = Dispatchers.Unconfined, transform: (a: E, b: R) -> V): ReceiveChannel<V> =
2183 GlobalScope.produce(context, onCompletion = consumesAll(this, other)) {
2184 val otherIterator = other.iterator()
2185 this@zip.consumeEach { element1 ->
2186 if (!otherIterator.hasNext()) return@consumeEach
2187 val element2 = otherIterator.next()
2188 send(transform(element1, element2))
2189 }
2190 }
2191