• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 @file:JvmMultifileClass
2 @file:JvmName("ChannelsKt")
3 @file:Suppress("unused")
4 
5 package kotlinx.coroutines.channels
6 
7 import kotlinx.coroutines.*
8 import kotlin.coroutines.*
9 import kotlin.jvm.*
10 
11 /**
12  * Opens subscription to this [BroadcastChannel] and makes sure that the given [block] consumes all elements
13  * from it by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block.
14  *
15  * **Note: This API is obsolete since 1.5.0 and deprecated for removal since 1.7.0**
16  * It is replaced with [SharedFlow][kotlinx.coroutines.flow.SharedFlow].
17  *
18  * Safe to remove in 1.9.0 as was inline before.
19  */
20 @ObsoleteCoroutinesApi
21 @Suppress("DEPRECATION_ERROR")
22 @Deprecated(level = DeprecationLevel.ERROR, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
23 public inline fun <E, R> BroadcastChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
24     val channel = openSubscription()
25     try {
26         return channel.block()
27     } finally {
28         channel.cancel()
29     }
30 }
31 
32 /**
33  * Subscribes to this [BroadcastChannel] and performs the specified action for each received element.
34  *
35  * **Note: This API is obsolete since 1.5.0 and deprecated for removal since 1.7.0**
36  */
37 @Deprecated(level = DeprecationLevel.ERROR, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
38 @Suppress("DEPRECATION", "DEPRECATION_ERROR")
consumeEachnull39 public suspend inline fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit): Unit =
40     consume {
41         for (element in this) action(element)
42     }
43 
44 /** @suppress **/
45 @PublishedApi // Binary compatibility
consumesAllnull46 internal fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler =
47     { cause: Throwable? ->
48         var exception: Throwable? = null
49         for (channel in channels)
50             try {
51                 channel.cancelConsumed(cause)
52             } catch (e: Throwable) {
53                 if (exception == null) {
54                     exception = e
55                 } else {
56                     exception.addSuppressed(e)
57                 }
58             }
59         exception?.let { throw it }
60     }
61 
62 /** @suppress **/
63 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
<lambda>null64 public suspend fun <E> ReceiveChannel<E>.elementAt(index: Int): E = consume {
65     if (index < 0)
66         throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.")
67     var count = 0
68     for (element in this) {
69         @Suppress("UNUSED_CHANGED_VALUE") // KT-47628
70         if (index == count++)
71             return element
72     }
73     throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.")
74 }
75 
76 /** @suppress **/
77 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
elementAtOrNullnull78 public suspend fun <E> ReceiveChannel<E>.elementAtOrNull(index: Int): E? =
79     consume {
80         if (index < 0)
81             return null
82         var count = 0
83         for (element in this) {
84             if (index == count++)
85                 return element
86         }
87         return null
88     }
89 
90 /** @suppress **/
91 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
firstnull92 public suspend fun <E> ReceiveChannel<E>.first(): E =
93     consume {
94         val iterator = iterator()
95         if (!iterator.hasNext())
96             throw NoSuchElementException("ReceiveChannel is empty.")
97         return iterator.next()
98     }
99 
100 /** @suppress **/
101 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
firstOrNullnull102 public suspend fun <E> ReceiveChannel<E>.firstOrNull(): E? =
103     consume {
104         val iterator = iterator()
105         if (!iterator.hasNext())
106             return null
107         return iterator.next()
108     }
109 
110 /** @suppress **/
111 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
indexOfnull112 public suspend fun <E> ReceiveChannel<E>.indexOf(element: E): Int {
113     var index = 0
114     consumeEach {
115         if (element == it)
116             return index
117         index++
118     }
119     return -1
120 }
121 
122 /** @suppress **/
123 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
lastnull124 public suspend fun <E> ReceiveChannel<E>.last(): E =
125     consume {
126         val iterator = iterator()
127         if (!iterator.hasNext())
128             throw NoSuchElementException("ReceiveChannel is empty.")
129         var last = iterator.next()
130         while (iterator.hasNext())
131             last = iterator.next()
132         return last
133     }
134 
135 /** @suppress **/
136 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
lastIndexOfnull137 public suspend fun <E> ReceiveChannel<E>.lastIndexOf(element: E): Int {
138     var lastIndex = -1
139     var index = 0
140     consumeEach {
141         if (element == it)
142             lastIndex = index
143         index++
144     }
145     return lastIndex
146 }
147 
148 /** @suppress **/
149 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
lastOrNullnull150 public suspend fun <E> ReceiveChannel<E>.lastOrNull(): E? =
151     consume {
152         val iterator = iterator()
153         if (!iterator.hasNext())
154             return null
155         var last = iterator.next()
156         while (iterator.hasNext())
157             last = iterator.next()
158         return last
159     }
160 
161 /** @suppress **/
162 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
singlenull163 public suspend fun <E> ReceiveChannel<E>.single(): E =
164     consume {
165         val iterator = iterator()
166         if (!iterator.hasNext())
167             throw NoSuchElementException("ReceiveChannel is empty.")
168         val single = iterator.next()
169         if (iterator.hasNext())
170             throw IllegalArgumentException("ReceiveChannel has more than one element.")
171         return single
172     }
173 
174 /** @suppress **/
175 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
singleOrNullnull176 public suspend fun <E> ReceiveChannel<E>.singleOrNull(): E? =
177     consume {
178         val iterator = iterator()
179         if (!iterator.hasNext())
180             return null
181         val single = iterator.next()
182         if (iterator.hasNext())
183             return null
184         return single
185     }
186 
187 /** @suppress **/
188 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
dropnull189 public fun <E> ReceiveChannel<E>.drop(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
190     GlobalScope.produce(context, onCompletion = consumes()) {
191         require(n >= 0) { "Requested element count $n is less than zero." }
192         var remaining: Int = n
193         if (remaining > 0)
194             for (e in this@drop) {
195                 remaining--
196                 if (remaining == 0)
197                     break
198             }
199         for (e in this@drop) {
200             send(e)
201         }
202     }
203 
204 /** @suppress **/
205 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
dropWhilenull206 public fun <E> ReceiveChannel<E>.dropWhile(
207     context: CoroutineContext = Dispatchers.Unconfined,
208     predicate: suspend (E) -> Boolean
209 ): ReceiveChannel<E> =
210     GlobalScope.produce(context, onCompletion = consumes()) {
211         for (e in this@dropWhile) {
212             if (!predicate(e)) {
213                 send(e)
214                 break
215             }
216         }
217         for (e in this@dropWhile) {
218             send(e)
219         }
220     }
221 
222 @PublishedApi
filternull223 internal fun <E> ReceiveChannel<E>.filter(
224     context: CoroutineContext = Dispatchers.Unconfined,
225     predicate: suspend (E) -> Boolean
226 ): ReceiveChannel<E> =
227     GlobalScope.produce(context, onCompletion = consumes()) {
228         for (e in this@filter) {
229             if (predicate(e)) send(e)
230         }
231     }
232 
233 /** @suppress **/
234 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
filterIndexednull235 public fun <E> ReceiveChannel<E>.filterIndexed(
236     context: CoroutineContext = Dispatchers.Unconfined,
237     predicate: suspend (index: Int, E) -> Boolean
238 ): ReceiveChannel<E> =
239     GlobalScope.produce(context, onCompletion = consumes()) {
240         var index = 0
241         for (e in this@filterIndexed) {
242             if (predicate(index++, e)) send(e)
243         }
244     }
245 
246 /** @suppress **/
247 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
filterNotnull248 public fun <E> ReceiveChannel<E>.filterNot(
249     context: CoroutineContext = Dispatchers.Unconfined,
250     predicate: suspend (E) -> Boolean
251 ): ReceiveChannel<E> =
252     filter(context) { !predicate(it) }
253 
254 @PublishedApi
255 @Suppress("UNCHECKED_CAST")
filterNotNullnull256 internal fun <E : Any> ReceiveChannel<E?>.filterNotNull(): ReceiveChannel<E> =
257     filter { it != null } as ReceiveChannel<E>
258 
259 /** @suppress **/
260 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
filterNotNullTonull261 public suspend fun <E : Any, C : MutableCollection<in E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
262     consumeEach {
263         if (it != null) destination.add(it)
264     }
265     return destination
266 }
267 
268 /** @suppress **/
269 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
filterNotNullTonull270 public suspend fun <E : Any, C : SendChannel<E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
271     consumeEach {
272         if (it != null) destination.send(it)
273     }
274     return destination
275 }
276 
277 /** @suppress **/
278 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
takenull279 public fun <E> ReceiveChannel<E>.take(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
280     GlobalScope.produce(context, onCompletion = consumes()) {
281         if (n == 0) return@produce
282         require(n >= 0) { "Requested element count $n is less than zero." }
283         var remaining: Int = n
284         for (e in this@take) {
285             send(e)
286             remaining--
287             if (remaining == 0)
288                 return@produce
289         }
290     }
291 
292 /** @suppress **/
293 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
takeWhilenull294 public fun <E> ReceiveChannel<E>.takeWhile(
295     context: CoroutineContext = Dispatchers.Unconfined,
296     predicate: suspend (E) -> Boolean
297 ): ReceiveChannel<E> =
298     GlobalScope.produce(context, onCompletion = consumes()) {
299         for (e in this@takeWhile) {
300             if (!predicate(e)) return@produce
301             send(e)
302         }
303     }
304 
305 @PublishedApi
toChannelnull306 internal suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.toChannel(destination: C): C {
307     consumeEach {
308         destination.send(it)
309     }
310     return destination
311 }
312 
313 @PublishedApi
toCollectionnull314 internal suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.toCollection(destination: C): C {
315     consumeEach {
316         destination.add(it)
317     }
318     return destination
319 }
320 
321 /** @suppress **/
322 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
toMapnull323 public suspend fun <K, V> ReceiveChannel<Pair<K, V>>.toMap(): Map<K, V> =
324     toMap(LinkedHashMap())
325 
326 @PublishedApi
327 internal suspend fun <K, V, M : MutableMap<in K, in V>> ReceiveChannel<Pair<K, V>>.toMap(destination: M): M {
328     consumeEach {
329         destination += it
330     }
331     return destination
332 }
333 
334 /** @suppress **/
335 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
toMutableListnull336 public suspend fun <E> ReceiveChannel<E>.toMutableList(): MutableList<E> =
337     toCollection(ArrayList())
338 
339 /** @suppress **/
340 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
341 public suspend fun <E> ReceiveChannel<E>.toSet(): Set<E> =
342     this.toMutableSet()
343 
344 /** @suppress **/
345 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
346 public fun <E, R> ReceiveChannel<E>.flatMap(
347     context: CoroutineContext = Dispatchers.Unconfined,
348     transform: suspend (E) -> ReceiveChannel<R>
349 ): ReceiveChannel<R> =
350     GlobalScope.produce(context, onCompletion = consumes()) {
351         for (e in this@flatMap) {
352             transform(e).toChannel(this)
353         }
354     }
355 
356 @PublishedApi
mapnull357 internal fun <E, R> ReceiveChannel<E>.map(
358     context: CoroutineContext = Dispatchers.Unconfined,
359     transform: suspend (E) -> R
360 ): ReceiveChannel<R> =
361     GlobalScope.produce(context, onCompletion = consumes()) {
362         consumeEach {
363             send(transform(it))
364         }
365     }
366 
367 @PublishedApi
mapIndexednull368 internal fun <E, R> ReceiveChannel<E>.mapIndexed(
369     context: CoroutineContext = Dispatchers.Unconfined,
370     transform: suspend (index: Int, E) -> R
371 ): ReceiveChannel<R> =
372     GlobalScope.produce(context, onCompletion = consumes()) {
373         var index = 0
374         for (e in this@mapIndexed) {
375             send(transform(index++, e))
376         }
377     }
378 
379 /** @suppress **/
380 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
mapIndexedNotNullnull381 public fun <E, R : Any> ReceiveChannel<E>.mapIndexedNotNull(
382     context: CoroutineContext = Dispatchers.Unconfined,
383     transform: suspend (index: Int, E) -> R?
384 ): ReceiveChannel<R> =
385     mapIndexed(context, transform).filterNotNull()
386 
387 /** @suppress **/
388 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
389 public fun <E, R : Any> ReceiveChannel<E>.mapNotNull(
390     context: CoroutineContext = Dispatchers.Unconfined,
391     transform: suspend (E) -> R?
392 ): ReceiveChannel<R> =
393     map(context, transform).filterNotNull()
394 
395 /** @suppress **/
396 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
397 public fun <E> ReceiveChannel<E>.withIndex(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<IndexedValue<E>> =
398     GlobalScope.produce(context, onCompletion = consumes()) {
399         var index = 0
400         for (e in this@withIndex) {
401             send(IndexedValue(index++, e))
402         }
403     }
404 
405 /** @suppress **/
406 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
distinctnull407 public fun <E> ReceiveChannel<E>.distinct(): ReceiveChannel<E> =
408     this.distinctBy { it }
409 
410 @PublishedApi
distinctBynull411 internal fun <E, K> ReceiveChannel<E>.distinctBy(
412     context: CoroutineContext = Dispatchers.Unconfined,
413     selector: suspend (E) -> K
414 ): ReceiveChannel<E> =
415     GlobalScope.produce(context, onCompletion = consumes()) {
416         val keys = HashSet<K>()
417         for (e in this@distinctBy) {
418             val k = selector(e)
419             if (k !in keys) {
420                 send(e)
421                 keys += k
422             }
423         }
424     }
425 
426 @PublishedApi
toMutableSetnull427 internal suspend fun <E> ReceiveChannel<E>.toMutableSet(): MutableSet<E> =
428     toCollection(LinkedHashSet())
429 
430 /** @suppress **/
431 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
432 public suspend fun <E> ReceiveChannel<E>.any(): Boolean =
433     consume {
434         return iterator().hasNext()
435     }
436 
437 /** @suppress **/
438 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
countnull439 public suspend fun <E> ReceiveChannel<E>.count(): Int {
440     var count = 0
441     consumeEach { count++ }
442     return count
443 }
444 
445 /** @suppress **/
446 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
maxWithnull447 public suspend fun <E> ReceiveChannel<E>.maxWith(comparator: Comparator<in E>): E? =
448     consume {
449         val iterator = iterator()
450         if (!iterator.hasNext()) return null
451         var max = iterator.next()
452         while (iterator.hasNext()) {
453             val e = iterator.next()
454             if (comparator.compare(max, e) < 0) max = e
455         }
456         return max
457     }
458 
459 /** @suppress **/
460 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
minWithnull461 public suspend fun <E> ReceiveChannel<E>.minWith(comparator: Comparator<in E>): E? =
462     consume {
463         val iterator = iterator()
464         if (!iterator.hasNext()) return null
465         var min = iterator.next()
466         while (iterator.hasNext()) {
467             val e = iterator.next()
468             if (comparator.compare(min, e) > 0) min = e
469         }
470         return min
471     }
472 
473 /** @suppress **/
474 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
nonenull475 public suspend fun <E> ReceiveChannel<E>.none(): Boolean =
476     consume {
477         return !iterator().hasNext()
478     }
479 
480 /** @suppress **/
481 @Deprecated(message = "Left for binary compatibility", level = DeprecationLevel.HIDDEN)
requireNoNullsnull482 public fun <E : Any> ReceiveChannel<E?>.requireNoNulls(): ReceiveChannel<E> =
483     map { it ?: throw IllegalArgumentException("null element found in $this.") }
484 
485 /** @suppress **/
486 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
zipnull487 public infix fun <E, R> ReceiveChannel<E>.zip(other: ReceiveChannel<R>): ReceiveChannel<Pair<E, R>> =
488     zip(other) { t1, t2 -> t1 to t2 }
489 
490 @PublishedApi // Binary compatibility
zipnull491 internal fun <E, R, V> ReceiveChannel<E>.zip(
492     other: ReceiveChannel<R>,
493     context: CoroutineContext = Dispatchers.Unconfined,
494     transform: (a: E, b: R) -> V
495 ): ReceiveChannel<V> =
496     GlobalScope.produce(context, onCompletion = consumesAll(this, other)) {
497         val otherIterator = other.iterator()
498         this@zip.consumeEach { element1 ->
499             if (!otherIterator.hasNext()) return@consumeEach
500             val element2 = otherIterator.next()
501             send(transform(element1, element2))
502         }
503     }
504 
505 @PublishedApi // Binary compatibility
causenull506 internal fun ReceiveChannel<*>.consumes(): CompletionHandler = { cause: Throwable? ->
507     cancelConsumed(cause)
508 }
509