• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 @file:JvmMultifileClass
5 @file:JvmName("ChannelsKt")
6 @file:OptIn(ExperimentalContracts::class)
7 
8 package kotlinx.coroutines.channels
9 
10 import kotlinx.coroutines.*
11 import kotlinx.coroutines.selects.*
12 import kotlin.contracts.*
13 import kotlin.jvm.*
14 
15 internal const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
16 
17 
18 // -------- Operations on BroadcastChannel --------
19 
20 /**
21  * Opens subscription to this [BroadcastChannel] and makes sure that the given [block] consumes all elements
22  * from it by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block.
23  *
24  * **Note: This API is obsolete since 1.5.0 and deprecated for removal since 1.7.0**
25  * It is replaced with [SharedFlow][kotlinx.coroutines.flow.SharedFlow].
26  *
27  * Safe to remove in 1.9.0 as was inline before.
28  */
29 @ObsoleteCoroutinesApi
30 @Suppress("DEPRECATION")
31 @Deprecated(level = DeprecationLevel.WARNING, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
consumenull32 public inline fun <E, R> BroadcastChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
33     val channel = openSubscription()
34     try {
35         return channel.block()
36     } finally {
37         channel.cancel()
38     }
39 }
40 
41 /**
42  * This function is deprecated in the favour of [ReceiveChannel.receiveCatching].
43  *
44  * This function is considered error-prone for the following reasons;
45  * * Is throwing if the channel has failed even though its signature may suggest it returns 'null'
46  * * It is easy to forget that exception handling still have to be explicit
47  * * During code reviews and code reading, intentions of the code are frequently unclear:
48  *   are potential exceptions ignored deliberately or not?
49  *
50  * @suppress doc
51  */
52 @Deprecated(
53     "Deprecated in the favour of 'receiveCatching'",
54     ReplaceWith("receiveCatching().getOrNull()"),
55     DeprecationLevel.HIDDEN
56 ) // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0
57 @Suppress("EXTENSION_SHADOWED_BY_MEMBER", "DEPRECATION_ERROR")
receiveOrNullnull58 public suspend fun <E : Any> ReceiveChannel<E>.receiveOrNull(): E? {
59     @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
60     return (this as ReceiveChannel<E?>).receiveOrNull()
61 }
62 
63 /**
64  * This function is deprecated in the favour of [ReceiveChannel.onReceiveCatching]
65  */
66 @Deprecated(
67     "Deprecated in the favour of 'onReceiveCatching'",
68     level = DeprecationLevel.HIDDEN
69 )  // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0
70 @Suppress("DEPRECATION_ERROR")
onReceiveOrNullnull71 public fun <E : Any> ReceiveChannel<E>.onReceiveOrNull(): SelectClause1<E?> {
72     return (this as ReceiveChannel<E?>).onReceiveOrNull
73 }
74 
75 /**
76  * Makes sure that the given [block] consumes all elements from the given channel
77  * by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block.
78  *
79  * The operation is _terminal_.
80  */
consumenull81 public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
82     contract {
83         callsInPlace(block, InvocationKind.EXACTLY_ONCE)
84     }
85     var cause: Throwable? = null
86     try {
87         return block()
88     } catch (e: Throwable) {
89         cause = e
90         throw e
91     } finally {
92         cancelConsumed(cause)
93     }
94 }
95 
96 /**
97  * Performs the given [action] for each received element and [cancels][ReceiveChannel.cancel]
98  * the channel after the execution of the block.
99  * If you need to iterate over the channel without consuming it, a regular `for` loop should be used instead.
100  *
101  * The operation is _terminal_.
102  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
103  */
consumeEachnull104 public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit): Unit =
105     consume {
106         for (e in this) action(e)
107     }
108 
109 /**
110  * Returns a [List] containing all elements.
111  *
112  * The operation is _terminal_.
113  * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
114  */
<lambda>null115 public suspend fun <E> ReceiveChannel<E>.toList(): List<E> = buildList {
116     consumeEach {
117         add(it)
118     }
119 }
120 
121 /**
122  * Subscribes to this [BroadcastChannel] and performs the specified action for each received element.
123  *
124  * **Note: This API is obsolete since 1.5.0 and deprecated for removal since 1.7.0**
125  */
126 @Deprecated(level = DeprecationLevel.WARNING, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
127 @Suppress("DEPRECATION")
consumeEachnull128 public suspend inline fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit): Unit =
129     consume {
130         for (element in this) action(element)
131     }
132 
133 
134 @PublishedApi
cancelConsumednull135 internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) {
136     cancel(cause?.let {
137         it as? CancellationException ?: CancellationException("Channel was consumed, consumer had failed", it)
138     })
139 }
140 
141