• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 @file:JvmMultifileClass
5 @file:JvmName("ChannelsKt")
6 @file:Suppress("unused")
7 
8 package kotlinx.coroutines.channels
9 
10 import kotlinx.coroutines.*
11 import kotlin.coroutines.*
12 import kotlin.jvm.*
13 
/**
 * Builds a [CompletionHandler] that calls `cancelConsumed(cause)` on every channel in [channels].
 *
 * A failure on one channel does not stop the remaining channels from being processed: the first
 * failure is remembered, later failures are attached to it as suppressed, and the first failure
 * is rethrown once all channels have been visited.
 *
 * @suppress
 */
@PublishedApi // Binary compatibility
internal fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler = { cause: Throwable? ->
    var firstFailure: Throwable? = null
    channels.forEach { channel ->
        try {
            channel.cancelConsumed(cause)
        } catch (failure: Throwable) {
            val primary = firstFailure
            if (primary != null) {
                primary.addSuppressedThrowable(failure)
            } else {
                firstFailure = failure
            }
        }
    }
    // Rethrow only after every channel had its chance to be cancelled.
    val pending = firstFailure
    if (pending != null) throw pending
}
31 
/**
 * Returns the element received at position [index] (zero-based), processing the channel inside [consume].
 *
 * @throws IndexOutOfBoundsException if [index] is negative or the channel ends before reaching [index].
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.elementAt(index: Int): E = consume {
    if (index >= 0) {
        var position = 0
        val elements = iterator()
        while (elements.hasNext()) {
            val candidate = elements.next()
            if (position == index) return candidate
            position++
        }
    }
    // Reached for a negative index as well as for a channel that ended too early.
    throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.")
}
45 
/**
 * Returns the element received at position [index] (zero-based), or `null` when [index] is negative
 * or the channel ends before reaching it. The channel is processed inside [consume].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.elementAtOrNull(index: Int): E? = consume {
    if (index >= 0) {
        var position = 0
        val elements = iterator()
        while (elements.hasNext()) {
            val candidate = elements.next()
            if (position == index) return candidate
            position++
        }
    }
    null
}
59 
/**
 * Returns the first element received from the channel, processing the channel inside [consume].
 *
 * @throws NoSuchElementException if the channel yields no elements.
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.first(): E = consume {
    val elements = iterator()
    if (elements.hasNext()) {
        elements.next()
    } else {
        throw NoSuchElementException("ReceiveChannel is empty.")
    }
}
69 
/**
 * Returns the first element received from the channel, or `null` if the channel yields no elements.
 * The channel is processed inside [consume].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.firstOrNull(): E? = consume {
    val elements = iterator()
    if (elements.hasNext()) elements.next() else null
}
79 
/**
 * Returns the zero-based position of the first received element equal to [element],
 * or `-1` if no such element is received. Elements are read with [consumeEach].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.indexOf(element: E): Int {
    var position = 0
    consumeEach { candidate ->
        // Non-local return: leaves the whole function as soon as a match is seen.
        if (element == candidate) return position
        position += 1
    }
    return -1
}
91 
/**
 * Returns the last element received from the channel, processing the channel inside [consume].
 *
 * @throws NoSuchElementException if the channel yields no elements.
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.last(): E = consume {
    val elements = iterator()
    if (!elements.hasNext()) throw NoSuchElementException("ReceiveChannel is empty.")
    var latest = elements.next()
    while (elements.hasNext()) {
        latest = elements.next()
    }
    latest
}
104 
/**
 * Returns the zero-based position of the last received element equal to [element],
 * or `-1` if no such element is received. All elements are read with [consumeEach].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.lastIndexOf(element: E): Int {
    var position = 0
    var lastMatch = -1
    consumeEach { candidate ->
        if (element == candidate) lastMatch = position
        position += 1
    }
    return lastMatch
}
117 
/**
 * Returns the last element received from the channel, or `null` if the channel yields no elements.
 * The channel is processed inside [consume].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.lastOrNull(): E? = consume {
    val elements = iterator()
    if (!elements.hasNext()) return null
    var latest = elements.next()
    while (elements.hasNext()) {
        latest = elements.next()
    }
    latest
}
130 
/**
 * Returns the only element received from the channel, processing the channel inside [consume].
 *
 * @throws NoSuchElementException if the channel yields no elements.
 * @throws IllegalArgumentException if the channel yields more than one element.
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.single(): E = consume {
    val elements = iterator()
    if (!elements.hasNext()) throw NoSuchElementException("ReceiveChannel is empty.")
    val only = elements.next()
    if (elements.hasNext()) throw IllegalArgumentException("ReceiveChannel has more than one element.")
    only
}
143 
/**
 * Returns the only element received from the channel, or `null` if the channel yields
 * no elements or more than one element. The channel is processed inside [consume].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.singleOrNull(): E? = consume {
    val elements = iterator()
    if (!elements.hasNext()) return null
    val only = elements.next()
    if (elements.hasNext()) null else only
}
156 
/**
 * Returns a channel, produced in [GlobalScope] with the given [context], that skips the first [n]
 * elements of this channel and forwards the rest. This channel's `consumes()` handler is
 * registered as the producer's completion handler.
 *
 * The check that [n] is non-negative runs inside the producer, not at call time.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E> ReceiveChannel<E>.drop(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
    GlobalScope.produce(context, onCompletion = consumes()) {
        require(n >= 0) { "Requested element count $n is less than zero." }
        if (n > 0) {
            // Phase 1: receive and discard up to n elements.
            var skipped = 0
            for (ignored in this@drop) {
                skipped++
                if (skipped == n) break
            }
        }
        // Phase 2: forward whatever is left.
        for (element in this@drop) send(element)
    }
173 
/**
 * Returns a channel, produced in [GlobalScope] with the given [context], that discards leading
 * elements while [predicate] returns `true` and forwards the first rejected element together with
 * everything after it. This channel's `consumes()` handler is registered as the producer's
 * completion handler.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E> ReceiveChannel<E>.dropWhile(
    context: CoroutineContext = Dispatchers.Unconfined,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E> =
    GlobalScope.produce(context, onCompletion = consumes()) {
        val skipping = this@dropWhile.iterator()
        while (skipping.hasNext()) {
            val element = skipping.next()
            if (predicate(element)) continue
            // First element that fails the predicate: forward it and stop testing.
            send(element)
            break
        }
        for (element in this@dropWhile) send(element)
    }
191 
/**
 * Returns a channel, produced in [GlobalScope] with the given [context], that forwards only the
 * elements for which [predicate] returns `true`. This channel's `consumes()` handler is registered
 * as the producer's completion handler.
 */
@PublishedApi
internal fun <E> ReceiveChannel<E>.filter(
    context: CoroutineContext = Dispatchers.Unconfined,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E> =
    GlobalScope.produce(context, onCompletion = consumes()) {
        val source = this@filter.iterator()
        while (source.hasNext()) {
            val element = source.next()
            if (!predicate(element)) continue
            send(element)
        }
    }
202 
/**
 * Returns a channel, produced in [GlobalScope] with the given [context], that forwards only the
 * elements for which [predicate] returns `true`; the predicate receives the zero-based position of
 * the element together with the element. This channel's `consumes()` handler is registered as the
 * producer's completion handler.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E> ReceiveChannel<E>.filterIndexed(
    context: CoroutineContext = Dispatchers.Unconfined,
    predicate: suspend (index: Int, E) -> Boolean
): ReceiveChannel<E> =
    GlobalScope.produce(context, onCompletion = consumes()) {
        var nextPosition = 0
        for (element in this@filterIndexed) {
            val position = nextPosition++
            if (predicate(position, element)) {
                send(element)
            }
        }
    }
215 
/**
 * Returns a channel that forwards only the elements for which [predicate] returns `false`.
 * Delegates to [filter] with the negated predicate and the same [context].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E> ReceiveChannel<E>.filterNot(
    context: CoroutineContext = Dispatchers.Unconfined,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E> {
    val negated: suspend (E) -> Boolean = { element -> !predicate(element) }
    return filter(context, negated)
}
223 
/**
 * Returns a channel that forwards only the non-null elements of this channel.
 * Built on [filter]; the result is cast to the non-null element type.
 */
@PublishedApi
@Suppress("UNCHECKED_CAST")
internal fun <E : Any> ReceiveChannel<E?>.filterNotNull(): ReceiveChannel<E> {
    val withoutNulls: ReceiveChannel<E?> = filter { element -> element != null }
    // Unchecked by the compiler, but the filter above has removed every null.
    return withoutNulls as ReceiveChannel<E>
}
228 
/**
 * Adds every non-null element received from this channel to the [destination] collection
 * and returns [destination]. Elements are read with [consumeEach].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E : Any, C : MutableCollection<in E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
    consumeEach { element ->
        // Skip nulls; `return@consumeEach` moves on to the next element.
        if (element == null) return@consumeEach
        destination.add(element)
    }
    return destination
}
237 
/**
 * Sends every non-null element received from this channel to the [destination] channel
 * and returns [destination]. Elements are read with [consumeEach].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E : Any, C : SendChannel<E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
    consumeEach { element ->
        // Skip nulls; `return@consumeEach` moves on to the next element.
        if (element == null) return@consumeEach
        destination.send(element)
    }
    return destination
}
246 
/**
 * Returns a channel, produced in [GlobalScope] with the given [context], that forwards at most the
 * first [n] elements of this channel. This channel's `consumes()` handler is registered as the
 * producer's completion handler.
 *
 * When [n] is zero nothing is received from this channel. The check that [n] is non-negative runs
 * inside the producer, not at call time.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E> ReceiveChannel<E>.take(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
    GlobalScope.produce(context, onCompletion = consumes()) {
        if (n != 0) {
            require(n >= 0) { "Requested element count $n is less than zero." }
            var sent = 0
            for (element in this@take) {
                send(element)
                // Stop right after the n-th element, without trying to receive another one.
                if (++sent == n) break
            }
        }
    }
261 
/**
 * Returns a channel, produced in [GlobalScope] with the given [context], that forwards elements of
 * this channel until [predicate] returns `false` for the first time; that element and everything
 * after it are not forwarded. This channel's `consumes()` handler is registered as the producer's
 * completion handler.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E> ReceiveChannel<E>.takeWhile(
    context: CoroutineContext = Dispatchers.Unconfined,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E> =
    GlobalScope.produce(context, onCompletion = consumes()) {
        val source = this@takeWhile.iterator()
        while (source.hasNext()) {
            val element = source.next()
            if (predicate(element)) {
                send(element)
            } else {
                break
            }
        }
    }
274 
/**
 * Sends every element received from this channel to the [destination] channel and returns
 * [destination]. Elements are read with [consumeEach].
 */
@PublishedApi
internal suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.toChannel(destination: C): C =
    destination.also { target ->
        consumeEach { element -> target.send(element) }
    }
282 
/**
 * Adds every element received from this channel to the [destination] collection and returns
 * [destination]. Elements are read with [consumeEach].
 */
@PublishedApi
internal suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.toCollection(destination: C): C =
    destination.also { target ->
        consumeEach { element -> target.add(element) }
    }
290 
/**
 * Collects the key/value pairs received from this channel into a new [LinkedHashMap]
 * by delegating to the `toMap(destination)` overload.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <K, V> ReceiveChannel<Pair<K, V>>.toMap(): Map<K, V> {
    val result = LinkedHashMap<K, V>()
    return toMap(result)
}
295 
/**
 * Puts every key/value pair received from this channel into the [destination] map and returns
 * [destination]. Pairs are read with [consumeEach]; a later pair with the same key overwrites
 * the earlier entry, as with any map `put`.
 */
@PublishedApi
internal suspend fun <K, V, M : MutableMap<in K, in V>> ReceiveChannel<Pair<K, V>>.toMap(destination: M): M {
    consumeEach { entry ->
        destination.put(entry.first, entry.second)
    }
    return destination
}
303 
/**
 * Collects all elements received from this channel into a new [ArrayList]
 * by delegating to [toCollection].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.toMutableList(): MutableList<E> {
    val result = ArrayList<E>()
    return toCollection(result)
}
308 
/**
 * Collects all elements received from this channel into a set by delegating to [toMutableSet].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.toSet(): Set<E> {
    return toMutableSet()
}
313 
/**
 * Returns a channel, produced in [GlobalScope] with the given [context], that applies [transform]
 * to each element of this channel and forwards all elements of each resulting channel, one
 * resulting channel after another. This channel's `consumes()` handler is registered as the
 * producer's completion handler.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E, R> ReceiveChannel<E>.flatMap(
    context: CoroutineContext = Dispatchers.Unconfined,
    transform: suspend (E) -> ReceiveChannel<R>
): ReceiveChannel<R> =
    GlobalScope.produce(context, onCompletion = consumes()) {
        // The producer scope itself is the channel that inner elements are copied into.
        val output: SendChannel<R> = this
        for (element in this@flatMap) {
            val inner = transform(element)
            inner.toChannel(output)
        }
    }
325 
/**
 * Returns a channel, produced in [GlobalScope] with the given [context], that forwards the result
 * of applying [transform] to each element of this channel. Elements are read with [consumeEach],
 * and this channel's `consumes()` handler is registered as the producer's completion handler.
 */
@PublishedApi
internal fun <E, R> ReceiveChannel<E>.map(
    context: CoroutineContext = Dispatchers.Unconfined,
    transform: suspend (E) -> R
): ReceiveChannel<R> =
    GlobalScope.produce(context, onCompletion = consumes()) {
        this@map.consumeEach { element ->
            val mapped = transform(element)
            send(mapped)
        }
    }
336 
/**
 * Returns a channel, produced in [GlobalScope] with the given [context], that forwards the result
 * of applying [transform] to each element of this channel together with its zero-based position.
 * This channel's `consumes()` handler is registered as the producer's completion handler.
 */
@PublishedApi
internal fun <E, R> ReceiveChannel<E>.mapIndexed(
    context: CoroutineContext = Dispatchers.Unconfined,
    transform: suspend (index: Int, E) -> R
): ReceiveChannel<R> =
    GlobalScope.produce(context, onCompletion = consumes()) {
        var nextPosition = 0
        val source = this@mapIndexed.iterator()
        while (source.hasNext()) {
            val element = source.next()
            val position = nextPosition++
            send(transform(position, element))
        }
    }
348 
/**
 * Returns a channel with the non-null results of applying [transform] to each element of this
 * channel together with its zero-based position. Composed from [mapIndexed] followed by
 * [filterNotNull]; [context] is passed to [mapIndexed] only.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E, R : Any> ReceiveChannel<E>.mapIndexedNotNull(
    context: CoroutineContext = Dispatchers.Unconfined,
    transform: suspend (index: Int, E) -> R?
): ReceiveChannel<R> {
    val mapped: ReceiveChannel<R?> = mapIndexed(context, transform)
    return mapped.filterNotNull()
}
356 
/**
 * Returns a channel with the non-null results of applying [transform] to each element of this
 * channel. Composed from [map] followed by [filterNotNull]; [context] is passed to [map] only.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E, R : Any> ReceiveChannel<E>.mapNotNull(
    context: CoroutineContext = Dispatchers.Unconfined,
    transform: suspend (E) -> R?
): ReceiveChannel<R> {
    val mapped: ReceiveChannel<R?> = map(context, transform)
    return mapped.filterNotNull()
}
364 
/**
 * Returns a channel, produced in [GlobalScope] with the given [context], that wraps each element
 * of this channel into an [IndexedValue] carrying its zero-based position. This channel's
 * `consumes()` handler is registered as the producer's completion handler.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E> ReceiveChannel<E>.withIndex(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<IndexedValue<E>> =
    GlobalScope.produce(context, onCompletion = consumes()) {
        var nextPosition = 0
        val source = this@withIndex.iterator()
        while (source.hasNext()) {
            val element = source.next()
            val indexed = IndexedValue(nextPosition++, element)
            send(indexed)
        }
    }
374 
/**
 * Returns a channel that forwards only the first occurrence of each distinct element.
 * Delegates to [distinctBy], using the element itself as the key.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E> ReceiveChannel<E>.distinct(): ReceiveChannel<E> {
    return distinctBy { element -> element }
}
379 
/**
 * Returns a channel, produced in [GlobalScope] with the given [context], that forwards only the
 * elements whose key, as computed by [selector], has not been seen before. Seen keys are kept in
 * a [HashSet] for the lifetime of the producer. This channel's `consumes()` handler is registered
 * as the producer's completion handler.
 */
@PublishedApi
internal fun <E, K> ReceiveChannel<E>.distinctBy(
    context: CoroutineContext = Dispatchers.Unconfined,
    selector: suspend (E) -> K
): ReceiveChannel<E> =
    GlobalScope.produce(context, onCompletion = consumes()) {
        val seenKeys = HashSet<K>()
        for (element in this@distinctBy) {
            val key = selector(element)
            if (seenKeys.contains(key)) continue
            // The key is recorded only after the element has been sent.
            send(element)
            seenKeys.add(key)
        }
    }
395 
/**
 * Collects all elements received from this channel into a new [LinkedHashSet]
 * by delegating to [toCollection].
 */
@PublishedApi
internal suspend fun <E> ReceiveChannel<E>.toMutableSet(): MutableSet<E> {
    val result = LinkedHashSet<E>()
    return toCollection(result)
}
399 
/**
 * Returns `true` if the channel yields at least one element.
 * The check is a single `hasNext()` call made inside [consume].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.any(): Boolean = consume {
    iterator().hasNext()
}
406 
/**
 * Returns the number of elements received from this channel. Elements are read with [consumeEach].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.count(): Int {
    var total = 0
    consumeEach { _ -> total += 1 }
    return total
}
414 
/**
 * Returns the greatest element received from this channel according to [comparator], or `null`
 * if the channel yields no elements. When several elements compare as equal maxima, the one
 * received first is returned. The channel is processed inside [consume].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.maxWith(comparator: Comparator<in E>): E? = consume {
    val elements = iterator()
    if (!elements.hasNext()) return null
    var best = elements.next()
    while (elements.hasNext()) {
        val candidate = elements.next()
        // Replace only on a strictly greater candidate.
        if (comparator.compare(best, candidate) < 0) {
            best = candidate
        }
    }
    best
}
428 
/**
 * Returns the smallest element received from this channel according to [comparator], or `null`
 * if the channel yields no elements. When several elements compare as equal minima, the one
 * received first is returned. The channel is processed inside [consume].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.minWith(comparator: Comparator<in E>): E? = consume {
    val elements = iterator()
    if (!elements.hasNext()) return null
    var best = elements.next()
    while (elements.hasNext()) {
        val candidate = elements.next()
        // Replace only on a strictly smaller candidate.
        if (comparator.compare(best, candidate) > 0) {
            best = candidate
        }
    }
    best
}
442 
/**
 * Returns `true` if the channel yields no elements.
 * The check is a single `hasNext()` call made inside [consume].
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.none(): Boolean = consume {
    !iterator().hasNext()
}
449 
/**
 * Returns a channel, built with [map], that forwards the elements of this channel unchanged and
 * fails with [IllegalArgumentException] when a `null` element is encountered. The failure is
 * raised while elements are being mapped, not when this function is called.
 *
 * @suppress
 */
@Deprecated(message = "Left for binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E : Any> ReceiveChannel<E?>.requireNoNulls(): ReceiveChannel<E> {
    return map { element ->
        element ?: throw IllegalArgumentException("null element found in $this.")
    }
}
454 
/**
 * Returns a channel of pairs built from elements of this channel and [other] received at the same
 * position. Delegates to the `zip` overload that takes a transform, with the default context.
 *
 * @suppress
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public infix fun <E, R> ReceiveChannel<E>.zip(other: ReceiveChannel<R>): ReceiveChannel<Pair<E, R>> {
    return zip(other) { left, right -> left to right }
}
459 
/**
 * Returns a channel, produced in [GlobalScope] with the given [context], that forwards the result
 * of applying [transform] to elements of this channel and [other] received at the same position.
 * The result ends with the shorter of the two channels.
 *
 * The completion handler built by [consumesAll] is registered for both this channel and [other].
 *
 * @param other the channel whose elements are paired with this channel's elements.
 * @param context the coroutine context the producer runs in.
 * @param transform combines one element from each channel into an output element.
 */
@PublishedApi // Binary compatibility
internal fun <E, R, V> ReceiveChannel<E>.zip(
    other: ReceiveChannel<R>,
    context: CoroutineContext = Dispatchers.Unconfined,
    transform: (a: E, b: R) -> V
): ReceiveChannel<V> =
    GlobalScope.produce(context, onCompletion = consumesAll(this, other)) {
        val otherIterator = other.iterator()
        this@zip.consumeEach { element1 ->
            // Once `other` is exhausted no further pair can be formed, so finish the producer.
            // Returning only from the lambda would keep receiving (and discarding) elements of
            // this channel, which never terminates if this channel is never closed.
            if (!otherIterator.hasNext()) return@produce
            val element2 = otherIterator.next()
            send(transform(element1, element2))
        }
    }
474 
/**
 * Builds a [CompletionHandler] that calls `cancelConsumed` on this channel,
 * passing along the cause the handler is invoked with.
 */
@PublishedApi // Binary compatibility
internal fun ReceiveChannel<*>.consumes(): CompletionHandler {
    val channel = this
    return { cause: Throwable? -> channel.cancelConsumed(cause) }
}
479