<lambda>null1 @file:JvmMultifileClass
2 @file:JvmName("ChannelsKt")
3 @file:Suppress("unused")
4
5 package kotlinx.coroutines.channels
6
7 import kotlinx.coroutines.*
8 import kotlin.coroutines.*
9 import kotlin.jvm.*
10
/**
 * Opens a subscription on this [BroadcastChannel], runs [block] with that subscription as its receiver and
 * always invokes [cancel][ReceiveChannel.cancel] on the subscription afterwards, whether [block] returned
 * normally or threw.
 *
 * **Note: This API is obsolete since 1.5.0 and deprecated for removal since 1.7.0**
 * It is replaced with [SharedFlow][kotlinx.coroutines.flow.SharedFlow].
 *
 * Safe to remove in 1.9.0 as was inline before.
 *
 * @param block the code that consumes the opened subscription.
 * @return whatever [block] returns.
 */
@ObsoleteCoroutinesApi
@Suppress("DEPRECATION_ERROR")
@Deprecated(level = DeprecationLevel.ERROR, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
public inline fun <E, R> BroadcastChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
    val subscription = openSubscription()
    return try {
        subscription.block()
    } finally {
        // Runs on both the normal and the exceptional path.
        subscription.cancel()
    }
}
31
/**
 * Subscribes to this [BroadcastChannel] via [consume] and invokes [action] for every element received
 * from the subscription.
 *
 * **Note: This API is obsolete since 1.5.0 and deprecated for removal since 1.7.0**
 *
 * @param action callback invoked once per received element, in arrival order.
 */
@Deprecated(level = DeprecationLevel.ERROR, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
@Suppress("DEPRECATION", "DEPRECATION_ERROR")
public suspend inline fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit) {
    consume {
        // Explicit iterator form of `for (element in this)`.
        val elements = iterator()
        while (elements.hasNext()) {
            action(elements.next())
        }
    }
}
43
/**
 * Builds a completion handler that calls `cancelConsumed(cause)` on every channel in [channels].
 *
 * A failure on one channel does not stop the remaining ones from being processed: the first exception
 * is remembered, later ones are attached to it as suppressed, and the first one is rethrown at the end.
 *
 * @suppress
 */
@PublishedApi // Binary compatibility
internal fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler =
    { cause: Throwable? ->
        var firstFailure: Throwable? = null
        channels.forEach { channel ->
            try {
                channel.cancelConsumed(cause)
            } catch (e: Throwable) {
                val earlier = firstFailure
                if (earlier == null) {
                    firstFailure = e
                } else {
                    earlier.addSuppressed(e)
                }
            }
        }
        val failure = firstFailure
        if (failure != null) throw failure
    }
61
/**
 * Returns the element at the zero-based position [index], reading the channel inside [consume].
 *
 * Throws [IndexOutOfBoundsException] when [index] is negative (without receiving anything) or when the
 * channel ends before that position is reached.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.elementAt(index: Int): E = consume {
    if (index >= 0) {
        var position = 0
        for (element in this) {
            if (position == index)
                return element
            position++
        }
    }
    // Reached for a negative index as well as for a channel that is too short.
    throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.")
}
75
/**
 * Returns the element at the zero-based position [index], or `null` when [index] is negative or the
 * channel ends before that position is reached. The channel is read inside [consume].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.elementAtOrNull(index: Int): E? =
    consume {
        if (index >= 0) {
            var position = 0
            for (element in this) {
                if (position == index)
                    return element
                position++
            }
        }
        null
    }
89
/**
 * Returns the first element received from the channel, read inside [consume].
 *
 * Throws [NoSuchElementException] when the channel yields no element.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.first(): E =
    consume {
        val elements = iterator()
        if (elements.hasNext()) {
            elements.next()
        } else {
            throw NoSuchElementException("ReceiveChannel is empty.")
        }
    }
99
/**
 * Returns the first element received from the channel, or `null` when the channel yields no element.
 * The channel is read inside [consume].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.firstOrNull(): E? =
    consume {
        val elements = iterator()
        if (elements.hasNext()) elements.next() else null
    }
109
/**
 * Returns the zero-based position of the first received element equal to [element], or `-1` when no
 * received element matches. Elements are read through [consumeEach].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.indexOf(element: E): Int {
    var position = 0
    consumeEach { candidate ->
        // Non-local return: leaves indexOf as soon as a match is seen.
        if (element == candidate) {
            return position
        }
        position += 1
    }
    return -1
}
121
/**
 * Returns the last element received from the channel, read inside [consume].
 *
 * Throws [NoSuchElementException] when the channel yields no element.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.last(): E =
    consume {
        val elements = iterator()
        if (!elements.hasNext())
            throw NoSuchElementException("ReceiveChannel is empty.")
        // hasNext() was just confirmed, so the first next() is safe; keep overwriting until exhausted.
        var latest: E
        do {
            latest = elements.next()
        } while (elements.hasNext())
        latest
    }
134
/**
 * Returns the zero-based position of the last received element equal to [element], or `-1` when no
 * received element matches. All elements are read through [consumeEach].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.lastIndexOf(element: E): Int {
    var position = 0
    var lastMatch = -1
    consumeEach { candidate ->
        if (element == candidate) {
            lastMatch = position
        }
        position += 1
    }
    return lastMatch
}
147
/**
 * Returns the last element received from the channel, or `null` when the channel yields no element.
 * The channel is read inside [consume].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.lastOrNull(): E? =
    consume {
        val elements = iterator()
        // Stays null only if the loop body never runs, i.e. nothing was received.
        var latest: E? = null
        while (elements.hasNext()) {
            latest = elements.next()
        }
        latest
    }
160
/**
 * Returns the only element received from the channel, read inside [consume].
 *
 * Throws [NoSuchElementException] when the channel yields no element and [IllegalArgumentException]
 * when a second element is available.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.single(): E =
    consume {
        val elements = iterator()
        if (!elements.hasNext())
            throw NoSuchElementException("ReceiveChannel is empty.")
        elements.next().also {
            // Probe for a second element only after the first one has been taken.
            if (elements.hasNext())
                throw IllegalArgumentException("ReceiveChannel has more than one element.")
        }
    }
173
/**
 * Returns the only element received from the channel, or `null` when the channel yields no element or
 * more than one element. The channel is read inside [consume].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.singleOrNull(): E? =
    consume {
        val elements = iterator()
        if (!elements.hasNext())
            return null
        val only = elements.next()
        // A second available element disqualifies the first one.
        if (elements.hasNext()) null else only
    }
186
/**
 * Returns a channel that skips the first [n] elements of this channel and forwards the rest.
 *
 * The producer runs in [GlobalScope] with [context]; [consumes] is registered as its `onCompletion`
 * handler. A negative [n] fails the `require` check inside the producer.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E> ReceiveChannel<E>.drop(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
    GlobalScope.produce(context, onCompletion = consumes()) {
        require(n >= 0) { "Requested element count $n is less than zero." }
        var toSkip = n
        if (toSkip > 0) {
            // Skipping phase: receive and discard until the counter hits zero or the source ends.
            for (ignored in this@drop) {
                toSkip--
                if (toSkip == 0) break
            }
        }
        // Forwarding phase: a fresh iteration over whatever is left.
        for (element in this@drop) {
            send(element)
        }
    }
203
/**
 * Returns a channel that discards leading elements for which [predicate] is `true` and forwards the
 * first non-matching element together with everything after it.
 *
 * The producer runs in [GlobalScope] with [context]; [consumes] is registered as its `onCompletion`
 * handler.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E> ReceiveChannel<E>.dropWhile(
    context: CoroutineContext = Dispatchers.Unconfined,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E> =
    GlobalScope.produce(context, onCompletion = consumes()) {
        for (element in this@dropWhile) {
            if (predicate(element)) continue
            // First element that fails the predicate: forward it and stop testing.
            send(element)
            break
        }
        // The remaining elements are forwarded without consulting the predicate.
        for (element in this@dropWhile) {
            send(element)
        }
    }
221
/**
 * Returns a channel that forwards only the elements of this channel for which [predicate] is `true`.
 *
 * The producer runs in [GlobalScope] with [context]; [consumes] is registered as its `onCompletion`
 * handler.
 */
@PublishedApi
internal fun <E> ReceiveChannel<E>.filter(
    context: CoroutineContext = Dispatchers.Unconfined,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E> =
    GlobalScope.produce(context, onCompletion = consumes()) {
        for (element in this@filter) {
            if (!predicate(element)) continue
            send(element)
        }
    }
232
/**
 * Returns a channel that forwards only the elements for which [predicate] is `true`; the predicate also
 * receives the zero-based position of the element in this channel.
 *
 * The producer runs in [GlobalScope] with [context]; [consumes] is registered as its `onCompletion`
 * handler.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E> ReceiveChannel<E>.filterIndexed(
    context: CoroutineContext = Dispatchers.Unconfined,
    predicate: suspend (index: Int, E) -> Boolean
): ReceiveChannel<E> =
    GlobalScope.produce(context, onCompletion = consumes()) {
        var position = 0
        for (element in this@filterIndexed) {
            val accepted = predicate(position, element)
            // The position advances for every element, accepted or not.
            position++
            if (accepted) send(element)
        }
    }
245
/**
 * Returns a channel that forwards only the elements for which [predicate] is `false`, by delegating to
 * [filter] with the negated predicate.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E> ReceiveChannel<E>.filterNot(
    context: CoroutineContext = Dispatchers.Unconfined,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E> {
    val negated: suspend (E) -> Boolean = { element -> !predicate(element) }
    return filter(context, negated)
}
253
/**
 * Returns a channel that forwards only the non-null elements of this channel, built on [filter].
 */
@PublishedApi
@Suppress("UNCHECKED_CAST")
internal fun <E : Any> ReceiveChannel<E?>.filterNotNull(): ReceiveChannel<E> {
    val withoutNulls = filter { element -> element != null }
    // The filter above lets no null through, so narrowing the element type is safe.
    return withoutNulls as ReceiveChannel<E>
}
258
/**
 * Adds every non-null element received from this channel to [destination] and returns [destination].
 * Elements are read through [consumeEach].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E : Any, C : MutableCollection<in E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
    consumeEach { element ->
        element?.let { destination.add(it) }
    }
    return destination
}
267
/**
 * Sends every non-null element received from this channel to [destination] and returns [destination].
 * Elements are read through [consumeEach].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E : Any, C : SendChannel<E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
    consumeEach { element ->
        element?.let { destination.send(it) }
    }
    return destination
}
276
/**
 * Returns a channel that forwards at most the first [n] elements of this channel.
 *
 * The producer runs in [GlobalScope] with [context]; [consumes] is registered as its `onCompletion`
 * handler. With `n == 0` the producer finishes without receiving anything; a negative [n] fails the
 * `require` check inside the producer.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E> ReceiveChannel<E>.take(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
    GlobalScope.produce(context, onCompletion = consumes()) {
        if (n == 0) return@produce
        require(n >= 0) { "Requested element count $n is less than zero." }
        var delivered = 0
        for (element in this@take) {
            send(element)
            delivered++
            // Stop right after the n-th element instead of waiting for another one.
            if (delivered == n) break
        }
    }
291
/**
 * Returns a channel that forwards elements of this channel for as long as [predicate] is `true` and
 * stops at the first element for which it is `false` (that element is not forwarded).
 *
 * The producer runs in [GlobalScope] with [context]; [consumes] is registered as its `onCompletion`
 * handler.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E> ReceiveChannel<E>.takeWhile(
    context: CoroutineContext = Dispatchers.Unconfined,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E> =
    GlobalScope.produce(context, onCompletion = consumes()) {
        for (element in this@takeWhile) {
            if (predicate(element)) {
                send(element)
            } else {
                break
            }
        }
    }
304
/**
 * Sends every element received from this channel to [destination] and returns [destination].
 * Elements are read through [consumeEach].
 */
@PublishedApi
internal suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.toChannel(destination: C): C =
    destination.also { sink ->
        consumeEach { element -> sink.send(element) }
    }
312
/**
 * Adds every element received from this channel to [destination] and returns [destination].
 * Elements are read through [consumeEach].
 */
@PublishedApi
internal suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.toCollection(destination: C): C =
    destination.also { sink ->
        consumeEach { element -> sink.add(element) }
    }
320
/**
 * Collects the received key/value pairs into a new [LinkedHashMap] and returns it.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <K, V> ReceiveChannel<Pair<K, V>>.toMap(): Map<K, V> {
    val collected = LinkedHashMap<K, V>()
    return toMap(collected)
}
325
/**
 * Puts every received key/value pair into [destination] and returns [destination].
 * A later pair with the same key replaces the earlier entry, as with any map `put`.
 */
@PublishedApi
internal suspend fun <K, V, M : MutableMap<in K, in V>> ReceiveChannel<Pair<K, V>>.toMap(destination: M): M {
    consumeEach { (key, value) ->
        destination.put(key, value)
    }
    return destination
}
333
/**
 * Collects every received element into a new [ArrayList], in arrival order, and returns it.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.toMutableList(): MutableList<E> {
    val collected = ArrayList<E>()
    return toCollection(collected)
}
338
/**
 * Collects the received elements into a set by delegating to [toMutableSet] and exposing the result
 * through the read-only [Set] type.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.toSet(): Set<E> {
    return toMutableSet()
}
343
/**
 * Returns a channel that, for each element of this channel, obtains a channel from [transform] and
 * forwards all of that channel's elements before moving on to the next element.
 *
 * The producer runs in [GlobalScope] with [context]; [consumes] is registered as its `onCompletion`
 * handler.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E, R> ReceiveChannel<E>.flatMap(
    context: CoroutineContext = Dispatchers.Unconfined,
    transform: suspend (E) -> ReceiveChannel<R>
): ReceiveChannel<R> =
    GlobalScope.produce(context, onCompletion = consumes()) {
        // The producer scope itself is the send side of the resulting channel.
        val output = this
        for (element in this@flatMap) {
            val inner = transform(element)
            inner.toChannel(output)
        }
    }
355
/**
 * Returns a channel that forwards the result of applying [transform] to each element of this channel.
 *
 * The producer runs in [GlobalScope] with [context]; [consumes] is registered as its `onCompletion`
 * handler, and the source is read through [consumeEach].
 */
@PublishedApi
internal fun <E, R> ReceiveChannel<E>.map(
    context: CoroutineContext = Dispatchers.Unconfined,
    transform: suspend (E) -> R
): ReceiveChannel<R> =
    GlobalScope.produce(context, onCompletion = consumes()) {
        this@map.consumeEach { element ->
            val mapped = transform(element)
            send(mapped)
        }
    }
366
/**
 * Returns a channel that forwards the result of applying [transform] to each element of this channel
 * together with its zero-based position.
 *
 * The producer runs in [GlobalScope] with [context]; [consumes] is registered as its `onCompletion`
 * handler.
 */
@PublishedApi
internal fun <E, R> ReceiveChannel<E>.mapIndexed(
    context: CoroutineContext = Dispatchers.Unconfined,
    transform: suspend (index: Int, E) -> R
): ReceiveChannel<R> =
    GlobalScope.produce(context, onCompletion = consumes()) {
        var position = 0
        for (element in this@mapIndexed) {
            val mapped = transform(position, element)
            position++
            send(mapped)
        }
    }
378
/**
 * Applies [transform] to each element and its zero-based position via [mapIndexed], then drops the
 * `null` results via [filterNotNull].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E, R : Any> ReceiveChannel<E>.mapIndexedNotNull(
    context: CoroutineContext = Dispatchers.Unconfined,
    transform: suspend (index: Int, E) -> R?
): ReceiveChannel<R> {
    val mapped: ReceiveChannel<R?> = mapIndexed(context, transform)
    return mapped.filterNotNull()
}
386
/**
 * Applies [transform] to each element via [map], then drops the `null` results via [filterNotNull].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E, R : Any> ReceiveChannel<E>.mapNotNull(
    context: CoroutineContext = Dispatchers.Unconfined,
    transform: suspend (E) -> R?
): ReceiveChannel<R> {
    val mapped: ReceiveChannel<R?> = map(context, transform)
    return mapped.filterNotNull()
}
394
/**
 * Returns a channel that wraps each element of this channel into an [IndexedValue] carrying its
 * zero-based position.
 *
 * The producer runs in [GlobalScope] with [context]; [consumes] is registered as its `onCompletion`
 * handler.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E> ReceiveChannel<E>.withIndex(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<IndexedValue<E>> =
    GlobalScope.produce(context, onCompletion = consumes()) {
        var position = 0
        for (element in this@withIndex) {
            val indexed = IndexedValue(position, element)
            position++
            send(indexed)
        }
    }
404
/**
 * Returns a channel that forwards only the first occurrence of each element, by delegating to
 * [distinctBy] with the element itself as the key.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E> ReceiveChannel<E>.distinct(): ReceiveChannel<E> {
    return distinctBy { element -> element }
}
409
/**
 * Returns a channel that forwards an element only if the key computed by [selector] has not been seen
 * on an earlier element. Seen keys are kept in a [HashSet] for the lifetime of the producer.
 *
 * The producer runs in [GlobalScope] with [context]; [consumes] is registered as its `onCompletion`
 * handler.
 */
@PublishedApi
internal fun <E, K> ReceiveChannel<E>.distinctBy(
    context: CoroutineContext = Dispatchers.Unconfined,
    selector: suspend (E) -> K
): ReceiveChannel<E> =
    GlobalScope.produce(context, onCompletion = consumes()) {
        val seenKeys = HashSet<K>()
        for (element in this@distinctBy) {
            val key = selector(element)
            if (key in seenKeys) continue
            // The key is recorded only after the element has been sent.
            send(element)
            seenKeys.add(key)
        }
    }
425
/**
 * Collects every received element into a new [LinkedHashSet] and returns it.
 */
@PublishedApi
internal suspend fun <E> ReceiveChannel<E>.toMutableSet(): MutableSet<E> {
    val collected = LinkedHashSet<E>()
    return toCollection(collected)
}
429
/**
 * Returns `true` when the channel has at least one element to offer, checked with a single
 * `hasNext()` call inside [consume].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.any(): Boolean =
    consume {
        val elements = iterator()
        elements.hasNext()
    }
436
/**
 * Returns the number of elements received from the channel, read through [consumeEach].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.count(): Int {
    var total = 0
    consumeEach { _ ->
        total += 1
    }
    return total
}
444
/**
 * Returns the greatest received element according to [comparator], or `null` when the channel yields
 * no element. Among elements that compare as equal, the earliest one is kept. The channel is read
 * inside [consume].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.maxWith(comparator: Comparator<in E>): E? =
    consume {
        val elements = iterator()
        if (!elements.hasNext()) return null
        var best = elements.next()
        while (elements.hasNext()) {
            val candidate = elements.next()
            // Strict comparison: only a strictly greater candidate replaces the current best.
            val candidateIsGreater = comparator.compare(best, candidate) < 0
            if (candidateIsGreater) best = candidate
        }
        best
    }
458
/**
 * Returns the smallest received element according to [comparator], or `null` when the channel yields
 * no element. Among elements that compare as equal, the earliest one is kept. The channel is read
 * inside [consume].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.minWith(comparator: Comparator<in E>): E? =
    consume {
        val elements = iterator()
        if (!elements.hasNext()) return null
        var best = elements.next()
        while (elements.hasNext()) {
            val candidate = elements.next()
            // Strict comparison: only a strictly smaller candidate replaces the current best.
            val candidateIsSmaller = comparator.compare(best, candidate) > 0
            if (candidateIsSmaller) best = candidate
        }
        best
    }
472
/**
 * Returns `true` when the channel has no element to offer, checked with a single `hasNext()` call
 * inside [consume].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.none(): Boolean =
    consume {
        val elements = iterator()
        !elements.hasNext()
    }
479
/**
 * Returns a channel, built with [map], that forwards the elements of this channel unchanged and throws
 * [IllegalArgumentException] from the mapping step when a `null` element is encountered.
 *
 * @suppress
 */
@Deprecated(message = "Left for binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E : Any> ReceiveChannel<E?>.requireNoNulls(): ReceiveChannel<E> =
    map { element ->
        // `this` in the message is the source channel, not the mapped one.
        if (element == null) throw IllegalArgumentException("null element found in $this.")
        element
    }
484
/**
 * Pairs elements of this channel with elements of [other] position by position, by delegating to the
 * transforming `zip` overload with a [Pair]-building transform.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public infix fun <E, R> ReceiveChannel<E>.zip(other: ReceiveChannel<R>): ReceiveChannel<Pair<E, R>> {
    return zip(other, transform = { left, right -> Pair(left, right) })
}
489
/**
 * Returns a channel that forwards `transform(a, b)` for each element `a` of this channel paired with the
 * next available element `b` of [other].
 *
 * The producer runs in [GlobalScope] with [context]; a [consumesAll] handler covering both this channel
 * and [other] is registered as its `onCompletion` handler.
 */
@PublishedApi // Binary compatibility
internal fun <E, R, V> ReceiveChannel<E>.zip(
    other: ReceiveChannel<R>,
    context: CoroutineContext = Dispatchers.Unconfined,
    transform: (a: E, b: R) -> V
): ReceiveChannel<V> =
    GlobalScope.produce(context, onCompletion = consumesAll(this, other)) {
        val rightElements = other.iterator()
        this@zip.consumeEach { left ->
            // When `other` has nothing more to offer, this element is skipped without producing output.
            // NOTE(review): the lambda simply finishes here, so consumeEach presumably keeps reading the
            // remaining elements of the receiver rather than stopping early - confirm if relied upon.
            if (rightElements.hasNext()) {
                val right = rightElements.next()
                send(transform(left, right))
            }
        }
    }
504
/**
 * Builds a completion handler that calls `cancelConsumed(cause)` on this channel with the cause it is
 * invoked with.
 */
@PublishedApi // Binary compatibility
internal fun ReceiveChannel<*>.consumes(): CompletionHandler {
    val source = this
    return { cause: Throwable? ->
        source.cancelConsumed(cause)
    }
}
509