1 /*
<lambda>null2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4 @file:JvmMultifileClass
5 @file:JvmName("ChannelsKt")
6 @file:Suppress("unused")
7
8 package kotlinx.coroutines.channels
9
10 import kotlinx.coroutines.*
11 import kotlin.coroutines.*
12 import kotlin.jvm.*
13
14 /** @suppress **/
15 @PublishedApi // Binary compatibility
16 internal fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler =
17 { cause: Throwable? ->
18 var exception: Throwable? = null
19 for (channel in channels)
20 try {
21 channel.cancelConsumed(cause)
22 } catch (e: Throwable) {
23 if (exception == null) {
24 exception = e
25 } else {
26 exception.addSuppressedThrowable(e)
27 }
28 }
29 exception?.let { throw it }
30 }
31
32 /** @suppress **/
33 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
<lambda>null34 public suspend fun <E> ReceiveChannel<E>.elementAt(index: Int): E = consume {
35 if (index < 0)
36 throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.")
37 var count = 0
38 for (element in this) {
39 @Suppress("UNUSED_CHANGED_VALUE") // KT-47628
40 if (index == count++)
41 return element
42 }
43 throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.")
44 }
45
46 /** @suppress **/
47 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
elementAtOrNullnull48 public suspend fun <E> ReceiveChannel<E>.elementAtOrNull(index: Int): E? =
49 consume {
50 if (index < 0)
51 return null
52 var count = 0
53 for (element in this) {
54 if (index == count++)
55 return element
56 }
57 return null
58 }
59
60 /** @suppress **/
61 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
firstnull62 public suspend fun <E> ReceiveChannel<E>.first(): E =
63 consume {
64 val iterator = iterator()
65 if (!iterator.hasNext())
66 throw NoSuchElementException("ReceiveChannel is empty.")
67 return iterator.next()
68 }
69
70 /** @suppress **/
71 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
firstOrNullnull72 public suspend fun <E> ReceiveChannel<E>.firstOrNull(): E? =
73 consume {
74 val iterator = iterator()
75 if (!iterator.hasNext())
76 return null
77 return iterator.next()
78 }
79
80 /** @suppress **/
81 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
indexOfnull82 public suspend fun <E> ReceiveChannel<E>.indexOf(element: E): Int {
83 var index = 0
84 consumeEach {
85 if (element == it)
86 return index
87 index++
88 }
89 return -1
90 }
91
92 /** @suppress **/
93 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
lastnull94 public suspend fun <E> ReceiveChannel<E>.last(): E =
95 consume {
96 val iterator = iterator()
97 if (!iterator.hasNext())
98 throw NoSuchElementException("ReceiveChannel is empty.")
99 var last = iterator.next()
100 while (iterator.hasNext())
101 last = iterator.next()
102 return last
103 }
104
105 /** @suppress **/
106 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
lastIndexOfnull107 public suspend fun <E> ReceiveChannel<E>.lastIndexOf(element: E): Int {
108 var lastIndex = -1
109 var index = 0
110 consumeEach {
111 if (element == it)
112 lastIndex = index
113 index++
114 }
115 return lastIndex
116 }
117
118 /** @suppress **/
119 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
lastOrNullnull120 public suspend fun <E> ReceiveChannel<E>.lastOrNull(): E? =
121 consume {
122 val iterator = iterator()
123 if (!iterator.hasNext())
124 return null
125 var last = iterator.next()
126 while (iterator.hasNext())
127 last = iterator.next()
128 return last
129 }
130
131 /** @suppress **/
132 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
singlenull133 public suspend fun <E> ReceiveChannel<E>.single(): E =
134 consume {
135 val iterator = iterator()
136 if (!iterator.hasNext())
137 throw NoSuchElementException("ReceiveChannel is empty.")
138 val single = iterator.next()
139 if (iterator.hasNext())
140 throw IllegalArgumentException("ReceiveChannel has more than one element.")
141 return single
142 }
143
144 /** @suppress **/
145 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
singleOrNullnull146 public suspend fun <E> ReceiveChannel<E>.singleOrNull(): E? =
147 consume {
148 val iterator = iterator()
149 if (!iterator.hasNext())
150 return null
151 val single = iterator.next()
152 if (iterator.hasNext())
153 return null
154 return single
155 }
156
157 /** @suppress **/
158 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
dropnull159 public fun <E> ReceiveChannel<E>.drop(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
160 GlobalScope.produce(context, onCompletion = consumes()) {
161 require(n >= 0) { "Requested element count $n is less than zero." }
162 var remaining: Int = n
163 if (remaining > 0)
164 for (e in this@drop) {
165 remaining--
166 if (remaining == 0)
167 break
168 }
169 for (e in this@drop) {
170 send(e)
171 }
172 }
173
174 /** @suppress **/
175 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
dropWhilenull176 public fun <E> ReceiveChannel<E>.dropWhile(
177 context: CoroutineContext = Dispatchers.Unconfined,
178 predicate: suspend (E) -> Boolean
179 ): ReceiveChannel<E> =
180 GlobalScope.produce(context, onCompletion = consumes()) {
181 for (e in this@dropWhile) {
182 if (!predicate(e)) {
183 send(e)
184 break
185 }
186 }
187 for (e in this@dropWhile) {
188 send(e)
189 }
190 }
191
192 @PublishedApi
filternull193 internal fun <E> ReceiveChannel<E>.filter(
194 context: CoroutineContext = Dispatchers.Unconfined,
195 predicate: suspend (E) -> Boolean
196 ): ReceiveChannel<E> =
197 GlobalScope.produce(context, onCompletion = consumes()) {
198 for (e in this@filter) {
199 if (predicate(e)) send(e)
200 }
201 }
202
203 /** @suppress **/
204 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
filterIndexednull205 public fun <E> ReceiveChannel<E>.filterIndexed(
206 context: CoroutineContext = Dispatchers.Unconfined,
207 predicate: suspend (index: Int, E) -> Boolean
208 ): ReceiveChannel<E> =
209 GlobalScope.produce(context, onCompletion = consumes()) {
210 var index = 0
211 for (e in this@filterIndexed) {
212 if (predicate(index++, e)) send(e)
213 }
214 }
215
216 /** @suppress **/
217 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
filterNotnull218 public fun <E> ReceiveChannel<E>.filterNot(
219 context: CoroutineContext = Dispatchers.Unconfined,
220 predicate: suspend (E) -> Boolean
221 ): ReceiveChannel<E> =
222 filter(context) { !predicate(it) }
223
224 @PublishedApi
225 @Suppress("UNCHECKED_CAST")
filterNotNullnull226 internal fun <E : Any> ReceiveChannel<E?>.filterNotNull(): ReceiveChannel<E> =
227 filter { it != null } as ReceiveChannel<E>
228
229 /** @suppress **/
230 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
filterNotNullTonull231 public suspend fun <E : Any, C : MutableCollection<in E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
232 consumeEach {
233 if (it != null) destination.add(it)
234 }
235 return destination
236 }
237
238 /** @suppress **/
239 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
filterNotNullTonull240 public suspend fun <E : Any, C : SendChannel<E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
241 consumeEach {
242 if (it != null) destination.send(it)
243 }
244 return destination
245 }
246
247 /** @suppress **/
248 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
takenull249 public fun <E> ReceiveChannel<E>.take(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
250 GlobalScope.produce(context, onCompletion = consumes()) {
251 if (n == 0) return@produce
252 require(n >= 0) { "Requested element count $n is less than zero." }
253 var remaining: Int = n
254 for (e in this@take) {
255 send(e)
256 remaining--
257 if (remaining == 0)
258 return@produce
259 }
260 }
261
262 /** @suppress **/
263 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
takeWhilenull264 public fun <E> ReceiveChannel<E>.takeWhile(
265 context: CoroutineContext = Dispatchers.Unconfined,
266 predicate: suspend (E) -> Boolean
267 ): ReceiveChannel<E> =
268 GlobalScope.produce(context, onCompletion = consumes()) {
269 for (e in this@takeWhile) {
270 if (!predicate(e)) return@produce
271 send(e)
272 }
273 }
274
275 @PublishedApi
toChannelnull276 internal suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.toChannel(destination: C): C {
277 consumeEach {
278 destination.send(it)
279 }
280 return destination
281 }
282
283 @PublishedApi
toCollectionnull284 internal suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.toCollection(destination: C): C {
285 consumeEach {
286 destination.add(it)
287 }
288 return destination
289 }
290
291 /** @suppress **/
292 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
toMapnull293 public suspend fun <K, V> ReceiveChannel<Pair<K, V>>.toMap(): Map<K, V> =
294 toMap(LinkedHashMap())
295
296 @PublishedApi
297 internal suspend fun <K, V, M : MutableMap<in K, in V>> ReceiveChannel<Pair<K, V>>.toMap(destination: M): M {
298 consumeEach {
299 destination += it
300 }
301 return destination
302 }
303
304 /** @suppress **/
305 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
toMutableListnull306 public suspend fun <E> ReceiveChannel<E>.toMutableList(): MutableList<E> =
307 toCollection(ArrayList())
308
309 /** @suppress **/
310 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
311 public suspend fun <E> ReceiveChannel<E>.toSet(): Set<E> =
312 this.toMutableSet()
313
314 /** @suppress **/
315 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
316 public fun <E, R> ReceiveChannel<E>.flatMap(
317 context: CoroutineContext = Dispatchers.Unconfined,
318 transform: suspend (E) -> ReceiveChannel<R>
319 ): ReceiveChannel<R> =
320 GlobalScope.produce(context, onCompletion = consumes()) {
321 for (e in this@flatMap) {
322 transform(e).toChannel(this)
323 }
324 }
325
326 @PublishedApi
mapnull327 internal fun <E, R> ReceiveChannel<E>.map(
328 context: CoroutineContext = Dispatchers.Unconfined,
329 transform: suspend (E) -> R
330 ): ReceiveChannel<R> =
331 GlobalScope.produce(context, onCompletion = consumes()) {
332 consumeEach {
333 send(transform(it))
334 }
335 }
336
337 @PublishedApi
mapIndexednull338 internal fun <E, R> ReceiveChannel<E>.mapIndexed(
339 context: CoroutineContext = Dispatchers.Unconfined,
340 transform: suspend (index: Int, E) -> R
341 ): ReceiveChannel<R> =
342 GlobalScope.produce(context, onCompletion = consumes()) {
343 var index = 0
344 for (e in this@mapIndexed) {
345 send(transform(index++, e))
346 }
347 }
348
349 /** @suppress **/
350 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
mapIndexedNotNullnull351 public fun <E, R : Any> ReceiveChannel<E>.mapIndexedNotNull(
352 context: CoroutineContext = Dispatchers.Unconfined,
353 transform: suspend (index: Int, E) -> R?
354 ): ReceiveChannel<R> =
355 mapIndexed(context, transform).filterNotNull()
356
357 /** @suppress **/
358 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
359 public fun <E, R : Any> ReceiveChannel<E>.mapNotNull(
360 context: CoroutineContext = Dispatchers.Unconfined,
361 transform: suspend (E) -> R?
362 ): ReceiveChannel<R> =
363 map(context, transform).filterNotNull()
364
365 /** @suppress **/
366 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
367 public fun <E> ReceiveChannel<E>.withIndex(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<IndexedValue<E>> =
368 GlobalScope.produce(context, onCompletion = consumes()) {
369 var index = 0
370 for (e in this@withIndex) {
371 send(IndexedValue(index++, e))
372 }
373 }
374
375 /** @suppress **/
376 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
distinctnull377 public fun <E> ReceiveChannel<E>.distinct(): ReceiveChannel<E> =
378 this.distinctBy { it }
379
380 @PublishedApi
distinctBynull381 internal fun <E, K> ReceiveChannel<E>.distinctBy(
382 context: CoroutineContext = Dispatchers.Unconfined,
383 selector: suspend (E) -> K
384 ): ReceiveChannel<E> =
385 GlobalScope.produce(context, onCompletion = consumes()) {
386 val keys = HashSet<K>()
387 for (e in this@distinctBy) {
388 val k = selector(e)
389 if (k !in keys) {
390 send(e)
391 keys += k
392 }
393 }
394 }
395
396 @PublishedApi
toMutableSetnull397 internal suspend fun <E> ReceiveChannel<E>.toMutableSet(): MutableSet<E> =
398 toCollection(LinkedHashSet())
399
400 /** @suppress **/
401 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
402 public suspend fun <E> ReceiveChannel<E>.any(): Boolean =
403 consume {
404 return iterator().hasNext()
405 }
406
407 /** @suppress **/
408 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
countnull409 public suspend fun <E> ReceiveChannel<E>.count(): Int {
410 var count = 0
411 consumeEach { count++ }
412 return count
413 }
414
415 /** @suppress **/
416 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
maxWithnull417 public suspend fun <E> ReceiveChannel<E>.maxWith(comparator: Comparator<in E>): E? =
418 consume {
419 val iterator = iterator()
420 if (!iterator.hasNext()) return null
421 var max = iterator.next()
422 while (iterator.hasNext()) {
423 val e = iterator.next()
424 if (comparator.compare(max, e) < 0) max = e
425 }
426 return max
427 }
428
429 /** @suppress **/
430 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
minWithnull431 public suspend fun <E> ReceiveChannel<E>.minWith(comparator: Comparator<in E>): E? =
432 consume {
433 val iterator = iterator()
434 if (!iterator.hasNext()) return null
435 var min = iterator.next()
436 while (iterator.hasNext()) {
437 val e = iterator.next()
438 if (comparator.compare(min, e) > 0) min = e
439 }
440 return min
441 }
442
443 /** @suppress **/
444 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
nonenull445 public suspend fun <E> ReceiveChannel<E>.none(): Boolean =
446 consume {
447 return !iterator().hasNext()
448 }
449
450 /** @suppress **/
451 @Deprecated(message = "Left for binary compatibility", level = DeprecationLevel.HIDDEN)
requireNoNullsnull452 public fun <E : Any> ReceiveChannel<E?>.requireNoNulls(): ReceiveChannel<E> =
453 map { it ?: throw IllegalArgumentException("null element found in $this.") }
454
455 /** @suppress **/
456 @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
zipnull457 public infix fun <E, R> ReceiveChannel<E>.zip(other: ReceiveChannel<R>): ReceiveChannel<Pair<E, R>> =
458 zip(other) { t1, t2 -> t1 to t2 }
459
460 @PublishedApi // Binary compatibility
zipnull461 internal fun <E, R, V> ReceiveChannel<E>.zip(
462 other: ReceiveChannel<R>,
463 context: CoroutineContext = Dispatchers.Unconfined,
464 transform: (a: E, b: R) -> V
465 ): ReceiveChannel<V> =
466 GlobalScope.produce(context, onCompletion = consumesAll(this, other)) {
467 val otherIterator = other.iterator()
468 this@zip.consumeEach { element1 ->
469 if (!otherIterator.hasNext()) return@consumeEach
470 val element2 = otherIterator.next()
471 send(transform(element1, element2))
472 }
473 }
474
475 @PublishedApi // Binary compatibility
causenull476 internal fun ReceiveChannel<*>.consumes(): CompletionHandler = { cause: Throwable? ->
477 cancelConsumed(cause)
478 }
479