package kotlinx.coroutines.channels

import kotlinx.coroutines.*
import kotlin.coroutines.*
import kotlinx.coroutines.flow.*

/**
 * Scope for the [produce][CoroutineScope.produce], [callbackFlow] and [channelFlow] builders.
 *
 * The scope is both a [CoroutineScope] and a [SendChannel], so code running inside one of these
 * builders can call [send] and the other [SendChannel] functions directly on `this`.
 */
public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
    /**
     * A reference to the channel this coroutine [sends][send] elements to.
     * It is provided for convenience, so that the code in the coroutine can refer
     * to the channel as `channel` as opposed to `this`.
     * All the [SendChannel] functions on this interface delegate to
     * the channel instance returned by this property.
     */
    public val channel: SendChannel<E>
}

/**
 * Suspends the current coroutine until the channel is either
 * [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel].
 *
 * The given [block] will be executed unconditionally before this function returns.
 * `awaitClose { cleanup() }` is a convenient shorthand for the often useful form
 * `try { awaitClose() } finally { cleanup() }`.
 *
 * This function can only be invoked directly inside the same coroutine that is its receiver.
 * Specifying the receiver of [awaitClose] explicitly is most probably a mistake.
 *
 * This suspending function is cancellable: if the [Job] of the current coroutine is [cancelled][CoroutineScope.cancel]
 * while this suspending function is waiting, this function immediately resumes with [CancellationException].
 * There is a **prompt cancellation guarantee**: even if this function is ready to return, but was cancelled
 * while suspended, [CancellationException] will be thrown. See [suspendCancellableCoroutine] for low-level details.
 *
 * Example of usage:
 * ```
 * val callbackEventsStream = produce {
 *     val disposable = registerChannelInCallback(channel)
 *     awaitClose { disposable.dispose() }
 * }
 * ```
 *
 * Internally, [awaitClose] is implemented using [SendChannel.invokeOnClose].
 * Currently, every channel can have at most one [SendChannel.invokeOnClose] handler.
 * This means that calling [awaitClose] several times in a row or combining it with other [SendChannel.invokeOnClose]
 * invocations is prohibited.
 * An [IllegalStateException] will be thrown if this rule is broken.
 *
 * **Pitfall**: when used in [produce], if the channel is [cancelled][ReceiveChannel.cancel], [awaitClose] can either
 * return normally or throw a [CancellationException] due to a race condition.
 * The reason is that, for [produce], cancelling the channel and cancelling the coroutine of the [ProducerScope] is
 * done simultaneously.
 *
 * @param block the cleanup action; it runs in a `finally` clause, so it is executed whether the wait
 * finishes normally or with an exception.
 * @throws IllegalStateException if invoked from outside the [ProducerScope] (by leaking `this` outside the producer
 * coroutine).
 * @throws IllegalStateException if this channel already has a [SendChannel.invokeOnClose] handler registered.
 */
public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
    // Fully qualified on purpose: an unqualified `coroutineContext` would resolve to the receiver's
    // `CoroutineScope.coroutineContext` instead of the context of the coroutine that is calling us.
    check(kotlin.coroutines.coroutineContext[Job] === this) { "awaitClose() can only be invoked from the producer context" }
    try {
        suspendCancellableCoroutine<Unit> { cont ->
            // Resume the waiting coroutine once the channel is closed or cancelled.
            invokeOnClose {
                cont.resume(Unit)
            }
        }
    } finally {
        // Run the cleanup on every exit path, including cancellation of the wait above.
        block()
    }
}

/**
 * Launches a new coroutine to produce a stream of values by sending them to a channel
 * and returns a reference to the coroutine as a [ReceiveChannel]. This resulting
 * object can be used to [receive][ReceiveChannel.receive] elements produced by this coroutine.
 *
 * The scope of the coroutine contains the [ProducerScope] interface, which implements
 * both [CoroutineScope] and [SendChannel], so that the coroutine can invoke [send][SendChannel.send] directly.
 *
 * The kind of the resulting channel depends on the specified [capacity] parameter.
 * See the [Channel] interface documentation for details.
 * By default, an unbuffered channel is created.
 * If an invalid [capacity] value is specified, an [IllegalArgumentException] is thrown.
 *
 * ### Behavior on termination
 *
 * The channel is [closed][SendChannel.close] when the coroutine completes.
 *
 * ```
 * val values = listOf(1, 2, 3, 4)
 * val channel = produce<Int> {
 *     for (value in values) {
 *         send(value)
 *     }
 * }
 * check(channel.toList() == values)
 * ```
 *
 * The running coroutine is cancelled when the channel is [cancelled][ReceiveChannel.cancel].
 *
 * ```
 * val channel = produce<Int> {
 *     send(1)
 *     send(2)
 *     try {
 *         send(3) // will throw CancellationException
 *     } catch (e: CancellationException) {
 *         println("The channel was cancelled!")
 *         throw e // always rethrow CancellationException
 *     }
 * }
 * check(channel.receive() == 1)
 * check(channel.receive() == 2)
 * channel.cancel()
 * ```
 *
 * If this coroutine finishes with an exception, it will close the channel with that exception as the cause,
 * so after receiving all the existing elements,
 * all further attempts to receive from it will throw the exception with which the coroutine finished.
 *
 * ```
 * val produceJob = Job()
 * // create and populate a channel with a buffer
 * val channel = produce<Int>(produceJob, capacity = Channel.UNLIMITED) {
 *     repeat(5) { send(it) }
 *     throw TestException()
 * }
 * produceJob.join() // wait for `produce` to fail
 * check(produceJob.isCancelled)
 * // prints 0, 1, 2, 3, 4, then throws `TestException`
 * for (value in channel) { println(value) }
 * ```
 *
 * When the coroutine is cancelled via structured concurrency and not the `cancel` function,
 * the channel does not automatically close until the coroutine completes,
 * so it is possible that some elements will be sent even after the coroutine is cancelled:
 *
 * ```
 * val parentScope = CoroutineScope(Dispatchers.Default)
 * val channel = parentScope.produce<Int>(capacity = Channel.UNLIMITED) {
 *     repeat(5) {
 *         send(it)
 *     }
 *     parentScope.cancel()
 *     // suspending after this point would fail, but sending succeeds
 *     send(-1)
 * }
 * for (c in channel) {
 *     println(c) // 0, 1, 2, 3, 4, -1
 * } // throws a `CancellationException` after reaching -1
 * ```
 *
 * Note that cancelling `produce` via structured concurrency closes the channel with a cause.
 *
 * The behavior around coroutine cancellation and error handling is experimental and may change in a future release.
 *
 * ### Coroutine context
 *
 * The coroutine context is inherited from this [CoroutineScope]. Additional context elements can be specified with the [context] argument.
 * If the context does not have any dispatcher or other [ContinuationInterceptor], then [Dispatchers.Default] is used.
 * The parent job is inherited from the [CoroutineScope] as well, but it can also be overridden
 * with a corresponding [context] element.
 *
 * See [newCoroutineContext] for a description of debugging facilities available for newly created coroutines.
 *
 * ### Undelivered elements
 *
 * Some values that [produce] creates may be lost:
 *
 * ```
 * val channel = produce(Dispatchers.Default, capacity = 5) {
 *     repeat(100) {
 *         send(it)
 *         println("Sent $it")
 *     }
 * }
 * channel.cancel() // no elements can be received after this!
 * ```
 *
 * There is no way to recover these lost elements.
 * If this is unsuitable, please create a [Channel] manually and pass the `onUndeliveredElement` callback to the
 * constructor: [Channel(onUndeliveredElement = ...)][Channel].
 *
 * ### Usage example
 *
 * ```
 * /* Generate random integers until we find the square root of 9801.
 *    To calculate whether the given number is that square root,
 *    use several coroutines that separately process these integers.
 *    Alternatively, we may randomly give up during value generation.
 *    `produce` is used to generate the integers and put them into a
 *    channel, from which the square-computing coroutines take them. */
 * val parentScope = CoroutineScope(SupervisorJob())
 * val channel = parentScope.produce<Int>(
 *     Dispatchers.IO,
 *     capacity = 16 // buffer of size 16
 * ) {
 *     // this code will run on Dispatchers.IO
 *     while (true) {
 *         val request = run {
 *             // simulate waiting for the next request
 *             delay(5.milliseconds)
 *             val randomInt = Random.nextInt(-1, 100)
 *             if (randomInt == -1) {
 *                 // external termination request received
 *                 println("Producer: no longer accepting requests")
 *                 return@produce
 *             }
 *             println("Producer: sending a request ($randomInt)")
 *             randomInt
 *         }
 *         send(request)
 *     }
 * }
 * // Launch consumers
 * repeat(4) {
 *     launch(Dispatchers.Default) {
 *         for (request in channel) {
 *             // simulate processing a request
 *             delay(25.milliseconds)
 *             println("Consumer $it: received a request ($request)")
 *             if (request * request == 9801) {
 *                 println("Consumer $it found the square root of 9801!")
 *                 /* the work is done, the producer may finish.
 *                    the internal termination request will cancel
 *                    the producer on the next suspension point. */
 *                 channel.cancel()
 *             }
 *         }
 *     }
 * }
 * ```
 *
 * **Note: This is an experimental API.** Behavior of producers that work as children in a parent scope with respect
 *        to cancellation and error handling may change in the future.
 */
@ExperimentalCoroutinesApi
public fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.RENDEZVOUS,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> =
    // Delegate to the fully parameterized overload: suspend on overflow, default start, no completion handler.
    produce(context, capacity, BufferOverflow.SUSPEND, CoroutineStart.DEFAULT, onCompletion = null, block = block)

/**
 * **This is an internal API and should not be used from general code.**
 * The `onCompletion` parameter will be redesigned.
 * If you have to use the `onCompletion` operator, please report to https://github.com/Kotlin/kotlinx.coroutines/issues/.
 * As a temporary solution, [invokeOnCompletion][Job.invokeOnCompletion] can be used instead:
 * ```
 * fun <E> ReceiveChannel<E>.myOperator(): ReceiveChannel<E> = GlobalScope.produce(Dispatchers.Unconfined) {
 *     coroutineContext[Job]?.invokeOnCompletion { consumes() }
 * }
 * ```
 *
 * This overload forwards to the internal `produce` implementation with [BufferOverflow.SUSPEND]
 * and passes [start] and [onCompletion] through unchanged.
 * @suppress
 */
@InternalCoroutinesApi
public fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    // Same value as the previous literal `0`; named to match the public `produce` overload.
    capacity: Int = Channel.RENDEZVOUS,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    onCompletion: CompletionHandler? = null,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> =
    produce(context, capacity, BufferOverflow.SUSPEND, start, onCompletion, block)

/**
 * Internal version of `produce` that is maximally flexible, but is not exposed through public API
 * (too many params).
 *
 * Creates a [Channel] with the given [capacity] and [onBufferOverflow] policy, wraps it in a
 * [ProducerCoroutine] whose context is derived via [newCoroutineContext], and starts [block] in it.
 *
 * @param context additional context elements passed to [newCoroutineContext].
 * @param capacity capacity of the backing channel.
 * @param onBufferOverflow overflow policy of the backing channel.
 * @param start how the coroutine is started.
 * @param onCompletion optional handler; when non-null it is registered on the coroutine before it is started.
 * @param block the producer body, run with the new coroutine as its [ProducerScope] receiver.
 * @return the started coroutine, exposed as the [ReceiveChannel] side of the backing channel.
 */
internal fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    // Same value as the previous literal `0`; named to match the public `produce` overload.
    capacity: Int = Channel.RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    onCompletion: CompletionHandler? = null,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
    val channel = Channel<E>(capacity, onBufferOverflow)
    val newContext = newCoroutineContext(context)
    val coroutine = ProducerCoroutine(newContext, channel)
    // Register the handler before starting so it is already in place when the coroutine begins running.
    if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
    // The coroutine serves as its own receiver: `block` runs with it as the ProducerScope.
    coroutine.start(start, coroutine, block)
    return coroutine
}

/**
 * The coroutine created by `produce`: a [ChannelCoroutine] that also serves as the [ProducerScope]
 * receiver of the producer block and closes the backing channel when the coroutine finishes.
 *
 * NOTE(review): the third positional constructor argument (`true`) is presumably the
 * "initialize parent job" flag of [ChannelCoroutine] — confirm against its declaration.
 */
private class ProducerCoroutine<E>(
    parentContext: CoroutineContext, channel: Channel<E>
) : ChannelCoroutine<E>(parentContext, channel, true, active = true), ProducerScope<E> {
    /** Delegates to the superclass implementation of `isActive`. */
    override val isActive: Boolean
        get() = super.isActive

    /** Normal completion: close the channel without a cause. */
    override fun onCompleted(value: Unit) {
        _channel.close()
    }

    /**
     * Completion with [cause]: close the channel with that cause.
     *
     * If `close` reports `false` (per the [SendChannel.close] contract, this call did not close the
     * channel) and the exception was not [handled], it is passed to [handleCoroutineException]
     * so that it is not silently lost.
     */
    override fun onCancelled(cause: Throwable, handled: Boolean) {
        val processed = _channel.close(cause)
        if (!processed && !handled) handleCoroutineException(context, cause)
    }
}