• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.channels
6 
7 import kotlinx.coroutines.*
8 import kotlin.coroutines.*
9 
/**
 * Receiver scope of the [produce][CoroutineScope.produce] coroutine builder.
 *
 * The scope is a [CoroutineScope] and a [SendChannel] at the same time.
 *
 * **Note: This is an experimental api.** Behaviour of producers that work as children in a parent scope with respect
 *        to cancellation and error handling may change in the future.
 */
@ExperimentalCoroutinesApi
public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
    /**
     * The channel that this coroutine [sends][send] elements to.
     * It exists for convenience, so that code inside the coroutine can say
     * `channel` as opposed to `this`.
     * All the [SendChannel] functions on this interface delegate to
     * the channel instance returned by this property.
     */
    val channel: SendChannel<E>
}
27 
/**
 * Suspends the current coroutine until the channel is either [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel]
 * and invokes the given [block] before resuming the coroutine.
 *
 * Note that when the producer channel is cancelled this function resumes with a cancellation exception,
 * so code placed after a call to this function is not executed in case of cancellation.
 * That is why the cleanup code is passed as a lambda parameter: [block] is invoked from a `finally` section,
 * so it runs on every exit path from the suspension.
 *
 * Example of usage:
 * ```
 * val callbackEventsStream = produce {
 *     val disposable = registerChannelInCallback(channel)
 *     awaitClose { disposable.dispose() }
 * }
 * ```
 *
 * @param block cleanup action invoked once the wait is over; does nothing by default.
 * @throws IllegalStateException if the [Job] in the context of the calling coroutine is not this producer scope.
 */
@ExperimentalCoroutinesApi
public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
    // The fully qualified name reads the context of the calling coroutine
    // rather than the `coroutineContext` property of the receiver scope.
    check(kotlin.coroutines.coroutineContext[Job] === this) { "awaitClose() can only be invoked from the producer context" }
    try {
        suspendCancellableCoroutine<Unit> { cont ->
            // Wake the suspended producer up as soon as the channel's close handler fires.
            invokeOnClose {
                cont.resume(Unit)
            }
        }
    } finally {
        block()
    }
}
57 
/**
 * Launches a new coroutine that produces a stream of values by sending them to a channel
 * and returns a reference to the coroutine as a [ReceiveChannel]. The returned
 * object can be used to [receive][ReceiveChannel.receive] the elements produced by this coroutine.
 *
 * The scope of the coroutine is a [ProducerScope], which implements
 * both [CoroutineScope] and [SendChannel], so the coroutine can invoke
 * [send][SendChannel.send] directly. The channel is [closed][SendChannel.close]
 * when the coroutine completes.
 * The running coroutine is cancelled when its receive channel is [cancelled][ReceiveChannel.cancel].
 *
 * The coroutine context is inherited from the [CoroutineScope]; additional context elements can be specified
 * with the [context] argument.
 * If the context has neither a dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
 * The parent job is inherited from the [CoroutineScope] as well, but it can also be overridden
 * with a corresponding [coroutineContext] element.
 *
 * Uncaught exceptions in this coroutine close the channel with the exception as the cause, and
 * the resulting channel becomes _failed_, so that any attempt to receive from such a channel throws an exception.
 *
 * The kind of the resulting channel depends on the specified [capacity] parameter.
 * See the [Channel] interface documentation for details.
 *
 * See [newCoroutineContext] for a description of the debugging facilities that are available for a newly created coroutine.
 *
 * **Note: This is an experimental api.** Behaviour of producers that work as children in a parent scope with respect
 *        to cancellation and error handling may change in the future.
 *
 * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
 * @param capacity capacity of the channel's buffer (no buffer by default).
 * @param block the coroutine code.
 */
@ExperimentalCoroutinesApi
public fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
    // The channel is created before the context, exactly as in the step-by-step form.
    val target = Channel<E>(capacity)
    return ProducerCoroutine(newCoroutineContext(context), target).also { producer ->
        // The coroutine serves as the receiver of its own block.
        producer.start(CoroutineStart.DEFAULT, producer, block)
    }
}
101 
/**
 * **This is an internal API and should not be used from general code.**
 * The `onCompletion` parameter will be redesigned.
 * If you have to use the `onCompletion` operator, please report to https://github.com/Kotlin/kotlinx.coroutines/issues/.
 * As a temporary solution, [invokeOnCompletion][Job.invokeOnCompletion] can be used instead:
 * ```
 * fun <E> ReceiveChannel<E>.myOperator(): ReceiveChannel<E> = GlobalScope.produce(Dispatchers.Unconfined) {
 *     coroutineContext[Job]?.invokeOnCompletion { consumes() }
 * }
 * ```
 * @suppress
 */
@InternalCoroutinesApi
public fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    onCompletion: CompletionHandler? = null,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
    // The channel is created before the context, exactly as in the step-by-step form.
    val target = Channel<E>(capacity)
    return ProducerCoroutine(newCoroutineContext(context), target).also { producer ->
        // The optional completion handler is registered before the coroutine is started.
        onCompletion?.let { handler -> producer.invokeOnCompletion(handler = handler) }
        producer.start(CoroutineStart.DEFAULT, producer, block)
    }
}
128 
/**
 * Coroutine behind the `produce` builders: a [ChannelCoroutine] created in the active state
 * that also serves as the [ProducerScope] of its own block.
 *
 * On completion it closes the underlying `_channel`, passing the cancellation cause when there is one.
 */
internal open class ProducerCoroutine<E>(
    parentContext: CoroutineContext,
    channel: Channel<E>
) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E> {
    // Explicit override that forwards to the superclass implementation.
    override val isActive: Boolean get() = super.isActive

    /** Closes the underlying channel without a cause. */
    override fun onCompleted(value: Unit) {
        _channel.close()
    }

    /**
     * Closes the underlying channel with [cause]. When that `close` call returns `false`
     * and the exception was not [handled], the cause is passed to [handleCoroutineException].
     */
    override fun onCancelled(cause: Throwable, handled: Boolean) {
        // `close` is always invoked first; the remaining checks only decide about reporting.
        if (_channel.close(cause) || handled) return
        handleCoroutineException(context, cause)
    }
}
144