1 /*
2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4 @file:JvmMultifileClass
5 @file:JvmName("ChannelsKt")
6 @file:OptIn(ExperimentalContracts::class)
7
8 package kotlinx.coroutines.channels
9
10 import kotlinx.coroutines.*
11 import kotlinx.coroutines.selects.*
12 import kotlin.contracts.*
13 import kotlin.jvm.*
14
/**
 * Default "Channel was closed" message text.
 *
 * NOTE(review): presumably the message attached to exceptions raised for closed channels;
 * the usages are not visible in this part of the file, so confirm against the callers.
 */
internal const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
16
17
18 // -------- Operations on BroadcastChannel --------
19
/**
 * Opens a subscription to this [BroadcastChannel], runs [block] with that subscription as its receiver
 * and always invokes [cancel][ReceiveChannel.cancel] on the subscription afterwards, no matter whether
 * [block] returns normally or throws.
 *
 * **Note: This API is obsolete since 1.5.0 and deprecated for removal since 1.7.0**
 * It is replaced with [SharedFlow][kotlinx.coroutines.flow.SharedFlow].
 *
 * Safe to remove in 1.9.0 as was inline before.
 */
@ObsoleteCoroutinesApi
@Suppress("DEPRECATION")
@Deprecated(level = DeprecationLevel.WARNING, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
public inline fun <E, R> BroadcastChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
    val subscription = openSubscription()
    return try {
        subscription.block()
    } finally {
        // Runs on every exit path so the subscription never outlives the block.
        subscription.cancel()
    }
}
40
/**
 * This function is deprecated in the favour of [ReceiveChannel.receiveCatching].
 *
 * It is considered error-prone for the following reasons:
 * * It throws if the channel has failed, even though its signature may suggest that it returns `null`
 * * It is easy to forget that exception handling still has to be explicit
 * * During code reviews and code reading, the intentions of the code are frequently unclear:
 *   are potential exceptions ignored deliberately or not?
 *
 * @suppress doc
 */
@Deprecated(
    "Deprecated in the favour of 'receiveCatching'",
    ReplaceWith("receiveCatching().getOrNull()"),
    DeprecationLevel.HIDDEN
) // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0
@Suppress("EXTENSION_SHADOWED_BY_MEMBER", "DEPRECATION_ERROR")
public suspend fun <E : Any> ReceiveChannel<E>.receiveOrNull(): E? {
    // View the channel with a nullable element type; this extension requires `E : Any`,
    // so the call below presumably binds to the channel's own `receiveOrNull` instead.
    val nullableChannel = this as ReceiveChannel<E?>
    @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
    return nullableChannel.receiveOrNull()
}
62
/**
 * This function is deprecated in the favour of [ReceiveChannel.onReceiveCatching].
 *
 * It returns the `onReceiveOrNull` select clause of this channel viewed with a nullable element type.
 */
@Deprecated(
    "Deprecated in the favour of 'onReceiveCatching'",
    level = DeprecationLevel.HIDDEN
) // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0
@Suppress("DEPRECATION_ERROR")
public fun <E : Any> ReceiveChannel<E>.onReceiveOrNull(): SelectClause1<E?> {
    // Same channel instance, only the static element type is widened to `E?`.
    val nullableChannel = this as ReceiveChannel<E?>
    return nullableChannel.onReceiveOrNull
}
74
/**
 * Runs [block] on this channel and always [cancels][ReceiveChannel.cancel] the channel after the block
 * has finished, which makes sure that all elements of the channel are consumed.
 * If [block] throws, the thrown exception is handed to the cancellation as its cause and is then rethrown.
 *
 * The operation is _terminal_.
 */
public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    // Stays null unless the block fails; read by the finally clause below.
    var failure: Throwable? = null
    return try {
        block()
    } catch (thrown: Throwable) {
        failure = thrown
        throw thrown
    } finally {
        cancelConsumed(failure)
    }
}
95
/**
 * Invokes [action] for every element received from this channel and [cancels][ReceiveChannel.cancel]
 * the channel once the iteration is over.
 * Use a regular `for` loop instead when the channel has to be iterated without being consumed.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit): Unit =
    consume {
        // Explicit form of `for (e in this)`.
        val elements = iterator()
        while (elements.hasNext()) action(elements.next())
    }
108
/**
 * Collects every element received from this channel into a [List], in the order of reception.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <E> ReceiveChannel<E>.toList(): List<E> {
    val source = this
    return buildList {
        source.consumeEach { element ->
            add(element)
        }
    }
}
120
/**
 * Subscribes to this [BroadcastChannel] and invokes [action] for each element received through
 * that subscription.
 *
 * **Note: This API is obsolete since 1.5.0 and deprecated for removal since 1.7.0**
 */
@Deprecated(level = DeprecationLevel.WARNING, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
@Suppress("DEPRECATION")
public suspend inline fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit): Unit =
    consume {
        // `this` is the subscription channel here; iterate it until it is exhausted.
        val received = iterator()
        while (received.hasNext()) action(received.next())
    }
132
133
/**
 * Cancels this channel on behalf of a consumer that has finished.
 *
 * A `null` [cause] is passed to `cancel` as `null`, a [CancellationException] is passed through unchanged,
 * and any other throwable is wrapped into a new [CancellationException] that keeps it as the cause.
 */
@PublishedApi
internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) {
    val cancellation: CancellationException? = when (cause) {
        null -> null
        is CancellationException -> cause
        else -> CancellationException("Channel was consumed, consumer had failed", cause)
    }
    cancel(cancellation)
}
140
141