1 /*
<lambda>null2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.channels
6
7 import kotlinx.coroutines.*
8 import kotlin.coroutines.*
9
/**
 * Scope for [produce][CoroutineScope.produce] coroutine builder.
 *
 * **Note: This is an experimental API.** Behaviour of producers that work as children in a parent scope with respect
 * to cancellation and error handling may change in the future.
 */
@ExperimentalCoroutinesApi
public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
    /**
     * A reference to the channel that this coroutine [sends][send] elements to.
     * It is provided for convenience, so that the code in the coroutine can refer
     * to the channel as `channel` as opposed to `this`.
     * All the [SendChannel] functions on this interface delegate to
     * the channel instance returned by this property.
     */
    val channel: SendChannel<E>
}
27
/**
 * Suspends the current coroutine until the channel is either [closed][SendChannel.close] or
 * [cancelled][ReceiveChannel.cancel], and invokes the given [block] before resuming the coroutine.
 *
 * Note that when the producer's channel is cancelled, this function resumes with a cancellation exception,
 * so code placed after a call to this function is not executed in case of cancellation.
 * That is why the clean-up code is passed as a lambda parameter: [block] is run from a `finally` clause,
 * i.e. both when the suspension completes normally and when it completes with an exception.
 *
 * Example of usage:
 * ```
 * val callbackEventsStream = produce {
 *     val disposable = registerChannelInCallback(channel)
 *     awaitClose { disposable.dispose() }
 * }
 * ```
 *
 * @param block the action to run once the wait is over (does nothing by default).
 * @throws IllegalStateException if the [Job] of the calling coroutine's context is not this producer scope.
 */
@ExperimentalCoroutinesApi
public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
    // The job found in the caller's context must be this very producer.
    check(kotlin.coroutines.coroutineContext[Job] === this) {
        "awaitClose() can only be invoked from the producer context"
    }
    try {
        suspendCancellableCoroutine<Unit> { cont ->
            // Resume the suspended producer from the close handler registered on this channel.
            invokeOnClose {
                cont.resume(Unit)
            }
        }
    } finally {
        // Runs on normal resumption as well as on an exceptional one.
        block()
    }
}
57
/**
 * Launches a new coroutine that produces a stream of values by sending them to a channel
 * and returns a reference to the coroutine as a [ReceiveChannel]. The returned
 * object can be used to [receive][ReceiveChannel.receive] the elements produced by this coroutine.
 *
 * The scope of the coroutine is a [ProducerScope], which implements
 * both [CoroutineScope] and [SendChannel], so the coroutine can invoke
 * [send][SendChannel.send] directly. The channel is [closed][SendChannel.close]
 * when the coroutine completes.
 * The running coroutine is cancelled when its receive channel is [cancelled][ReceiveChannel.cancel].
 *
 * The coroutine context is inherited from the [CoroutineScope]; additional context elements can be specified
 * with the [context] argument.
 * If the context has neither a dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
 * The parent job is inherited from the [CoroutineScope] as well, but it can also be overridden
 * with a corresponding [coroutineContext] element.
 *
 * Uncaught exceptions in this coroutine close the channel with this exception as a cause and
 * the resulting channel becomes _failed_, so that any attempt to receive from such a channel throws an exception.
 *
 * The kind of the resulting channel depends on the specified [capacity] parameter.
 * See the [Channel] interface documentation for details.
 *
 * See [newCoroutineContext] for a description of the debugging facilities that are available
 * for a newly created coroutine.
 *
 * **Note: This is an experimental API.** Behaviour of producers that work as children in a parent scope with respect
 * to cancellation and error handling may change in the future.
 *
 * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
 * @param capacity capacity of the channel's buffer (no buffer by default).
 * @param block the coroutine code.
 */
@ExperimentalCoroutinesApi
public fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
    // The channel is built first and the coroutine context second.
    val buffer = Channel<E>(capacity)
    val producer = ProducerCoroutine(newCoroutineContext(context), buffer)
        // The producer serves as the receiver of [block] when it is started.
        .also { it.start(CoroutineStart.DEFAULT, it, block) }
    return producer
}
101
/**
 * **This is an internal API and should not be used from general code.**
 * The `onCompletion` parameter will be redesigned.
 * If you have to use the `onCompletion` parameter, please report to https://github.com/Kotlin/kotlinx.coroutines/issues/.
 * As a temporary solution, [invokeOnCompletion][Job.invokeOnCompletion] can be used instead:
 * ```
 * fun <E> ReceiveChannel<E>.myOperator(): ReceiveChannel<E> = GlobalScope.produce(Dispatchers.Unconfined) {
 *     coroutineContext[Job]?.invokeOnCompletion { consumes() }
 * }
 * ```
 * @suppress
 */
@InternalCoroutinesApi
public fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    onCompletion: CompletionHandler? = null,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
    // The channel is built first and the coroutine context second.
    val buffer = Channel<E>(capacity)
    val producer = ProducerCoroutine(newCoroutineContext(context), buffer)
    // The optional handler is registered before the coroutine is started.
    onCompletion?.let { completionHandler -> producer.invokeOnCompletion(handler = completionHandler) }
    producer.start(CoroutineStart.DEFAULT, producer, block)
    return producer
}
128
/**
 * Coroutine implementation used by the `produce` builders in this file: a [ChannelCoroutine]
 * that is also exposed to the coroutine body as a [ProducerScope].
 *
 * It closes the underlying channel both when the coroutine completes and when it is cancelled.
 */
internal open class ProducerCoroutine<E>(
    parentContext: CoroutineContext,
    channel: Channel<E>
) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E> {

    // Explicit override that simply forwards to the superclass implementation.
    override val isActive: Boolean get() = super.isActive

    /** Closes the underlying channel without a cause. */
    override fun onCompleted(value: Unit) {
        _channel.close()
    }

    /**
     * Closes the underlying channel with [cause]. When that `close` call returns `false`
     * and the exception was not already [handled], [cause] is passed on to [handleCoroutineException].
     */
    override fun onCancelled(cause: Throwable, handled: Boolean) {
        // The channel is closed unconditionally; only the follow-up reporting is conditional.
        val closed = _channel.close(cause)
        if (closed || handled) return
        handleCoroutineException(context, cause)
    }
}
144