<lambda>null1 package kotlinx.coroutines.channels
2
3 import kotlinx.coroutines.*
4 import kotlin.coroutines.*
5 import kotlinx.coroutines.flow.*
6
/**
 * Receiver scope of the [produce][CoroutineScope.produce], [callbackFlow] and [channelFlow] builders.
 *
 * It is both a [CoroutineScope] and a [SendChannel] of elements of type [E].
 */
public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
    /**
     * The channel that this coroutine [sends][send] its elements to.
     *
     * The property exists for convenience only: code running in the coroutine may write
     * `channel` where it would otherwise have to write `this`.
     * Every [SendChannel] function available on this interface delegates to
     * the channel instance returned here.
     */
    public val channel: SendChannel<E>
}
20
/**
 * Suspends the calling coroutine until the channel has been either
 * [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel].
 *
 * The supplied [block] is executed unconditionally before this function returns, which makes
 * `awaitClose { cleanup() }` a convenient shorthand for the frequently needed form
 * `try { awaitClose() } finally { cleanup() }`.
 *
 * This function may only be called directly inside the very coroutine that is its receiver.
 * Passing the receiver of [awaitClose] explicitly is most probably a mistake.
 *
 * This suspending function is cancellable: if the [Job] of the current coroutine is [cancelled][CoroutineScope.cancel]
 * while this function is waiting, it immediately resumes with [CancellationException].
 * There is a **prompt cancellation guarantee**: even if this function is ready to return, but was cancelled
 * while suspended, [CancellationException] will be thrown. See [suspendCancellableCoroutine] for low-level details.
 *
 * Example of usage:
 * ```
 * val callbackEventsStream = produce {
 *     val disposable = registerChannelInCallback(channel)
 *     awaitClose { disposable.dispose() }
 * }
 * ```
 *
 * Under the hood, [awaitClose] relies on [SendChannel.invokeOnClose].
 * At present, a channel can have at most one [SendChannel.invokeOnClose] handler.
 * Consequently, calling [awaitClose] several times in a row, or mixing it with other
 * [SendChannel.invokeOnClose] invocations, is prohibited.
 * Breaking this rule results in an [IllegalStateException].
 *
 * **Pitfall**: when used in [produce], if the channel is [cancelled][ReceiveChannel.cancel], [awaitClose] can either
 * return normally or throw a [CancellationException] due to a race condition.
 * The reason is that, for [produce], cancelling the channel and cancelling the coroutine of the [ProducerScope]
 * happen simultaneously.
 *
 * @param block cleanup action that is run in a `finally` clause once the wait is over, regardless of its outcome.
 * @throws IllegalStateException if invoked from outside the [ProducerScope] (by leaking `this` outside the producer
 * coroutine).
 * @throws IllegalStateException if this channel already has a [SendChannel.invokeOnClose] handler registered.
 */
public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
    // The fully qualified name is deliberate: it reads the context of the *calling* coroutine,
    // whereas an unqualified `coroutineContext` would resolve to the receiver scope's own property.
    val callerJob = kotlin.coroutines.coroutineContext[Job]
    check(callerJob === this) { "awaitClose() can only be invoked from the producer context" }
    try {
        suspendCancellableCoroutine<Unit> { continuation ->
            // Resume the waiting coroutine as soon as the channel reports that it was closed.
            invokeOnClose { continuation.resume(Unit) }
        }
    } finally {
        // Runs on normal resumption as well as when the suspension above throws.
        block()
    }
}
72
73 /**
74 * Launches a new coroutine to produce a stream of values by sending them to a channel
75 * and returns a reference to the coroutine as a [ReceiveChannel]. This resulting
76 * object can be used to [receive][ReceiveChannel.receive] elements produced by this coroutine.
77 *
78 * The scope of the coroutine contains the [ProducerScope] interface, which implements
79 * both [CoroutineScope] and [SendChannel], so that the coroutine can invoke [send][SendChannel.send] directly.
80 *
81 * The kind of the resulting channel depends on the specified [capacity] parameter.
82 * See the [Channel] interface documentation for details.
83 * By default, an unbuffered channel is created.
84 * If an invalid [capacity] value is specified, an [IllegalArgumentException] is thrown.
85 *
86 * ### Behavior on termination
87 *
88 * The channel is [closed][SendChannel.close] when the coroutine completes.
89 *
90 * ```
91 * val values = listOf(1, 2, 3, 4)
92 * val channel = produce<Int> {
93 * for (value in values) {
94 * send(value)
95 * }
96 * }
97 * check(channel.toList() == values)
98 * ```
99 *
100 * The running coroutine is cancelled when the channel is [cancelled][ReceiveChannel.cancel].
101 *
102 * ```
103 * val channel = produce<Int> {
104 * send(1)
105 * send(2)
106 * try {
107 * send(3) // will throw CancellationException
108 * } catch (e: CancellationException) {
 *         println("The channel was cancelled!")
110 * throw e // always rethrow CancellationException
111 * }
112 * }
113 * check(channel.receive() == 1)
114 * check(channel.receive() == 2)
115 * channel.cancel()
116 * ```
117 *
118 * If this coroutine finishes with an exception, it will close the channel with that exception as the cause,
119 * so after receiving all the existing elements,
120 * all further attempts to receive from it will throw the exception with which the coroutine finished.
121 *
122 * ```
123 * val produceJob = Job()
124 * // create and populate a channel with a buffer
125 * val channel = produce<Int>(produceJob, capacity = Channel.UNLIMITED) {
126 * repeat(5) { send(it) }
127 * throw TestException()
128 * }
129 * produceJob.join() // wait for `produce` to fail
130 * check(produceJob.isCancelled == true)
131 * // prints 0, 1, 2, 3, 4, then throws `TestException`
132 * for (value in channel) { println(value) }
133 * ```
134 *
135 * When the coroutine is cancelled via structured concurrency and not the `cancel` function,
136 * the channel does not automatically close until the coroutine completes,
137 * so it is possible that some elements will be sent even after the coroutine is cancelled:
138 *
139 * ```
140 * val parentScope = CoroutineScope(Dispatchers.Default)
141 * val channel = parentScope.produce<Int>(capacity = Channel.UNLIMITED) {
142 * repeat(5) {
143 * send(it)
144 * }
145 * parentScope.cancel()
146 * // suspending after this point would fail, but sending succeeds
147 * send(-1)
148 * }
149 * for (c in channel) {
150 * println(c) // 0, 1, 2, 3, 4, -1
151 * } // throws a `CancellationException` exception after reaching -1
152 * ```
153 *
154 * Note that cancelling `produce` via structured concurrency closes the channel with a cause.
155 *
156 * The behavior around coroutine cancellation and error handling is experimental and may change in a future release.
157 *
158 * ### Coroutine context
159 *
160 * The coroutine context is inherited from this [CoroutineScope]. Additional context elements can be specified with the [context] argument.
161 * If the context does not have any dispatcher or other [ContinuationInterceptor], then [Dispatchers.Default] is used.
162 * The parent job is inherited from the [CoroutineScope] as well, but it can also be overridden
163 * with a corresponding [context] element.
164 *
165 * See [newCoroutineContext] for a description of debugging facilities available for newly created coroutines.
166 *
167 * ### Undelivered elements
168 *
169 * Some values that [produce] creates may be lost:
170 *
171 * ```
172 * val channel = produce(Dispatchers.Default, capacity = 5) {
173 * repeat(100) {
174 * send(it)
175 * println("Sent $it")
176 * }
177 * }
178 * channel.cancel() // no elements can be received after this!
179 * ```
180 *
181 * There is no way to recover these lost elements.
182 * If this is unsuitable, please create a [Channel] manually and pass the `onUndeliveredElement` callback to the
183 * constructor: [Channel(onUndeliveredElement = ...)][Channel].
184 *
185 * ### Usage example
186 *
187 * ```
188 * /* Generate random integers until we find the square root of 9801.
189 * To calculate whether the given number is that square root,
190 * use several coroutines that separately process these integers.
191 * Alternatively, we may randomly give up during value generation.
192 * `produce` is used to generate the integers and put them into a
193 * channel, from which the square-computing coroutines take them. */
194 * val parentScope = CoroutineScope(SupervisorJob())
195 * val channel = parentScope.produce<Int>(
196 * Dispatchers.IO,
197 * capacity = 16 // buffer of size 16
198 * ) {
199 * // this code will run on Dispatchers.IO
200 * while (true) {
201 * val request = run {
202 * // simulate waiting for the next request
203 * delay(5.milliseconds)
204 * val randomInt = Random.nextInt(-1, 100)
205 * if (randomInt == -1) {
206 * // external termination request received
207 * println("Producer: no longer accepting requests")
208 * return@produce
209 * }
210 * println("Producer: sending a request ($randomInt)")
211 * randomInt
212 * }
213 * send(request)
214 * }
215 * }
216 * // Launch consumers
217 * repeat(4) {
218 * launch(Dispatchers.Default) {
219 * for (request in channel) {
220 * // simulate processing a request
221 * delay(25.milliseconds)
222 * println("Consumer $it: received a request ($request)")
223 * if (request * request == 9801) {
224 * println("Consumer $it found the square root of 9801!")
225 * /* the work is done, the producer may finish.
226 * the internal termination request will cancel
227 * the producer on the next suspension point. */
228 * channel.cancel()
229 * }
230 * }
231 * }
232 * }
233 * ```
234 *
235 * **Note: This is an experimental api.** Behaviour of producers that work as children in a parent scope with respect
236 * to cancellation and error handling may change in the future.
237 */
@ExperimentalCoroutinesApi
public fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.RENDEZVOUS,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
    // Forward to the internal, fully parameterized builder, pinning every option
    // that this public overload does not expose.
    return produce(
        context = context,
        capacity = capacity,
        onBufferOverflow = BufferOverflow.SUSPEND,
        start = CoroutineStart.DEFAULT,
        onCompletion = null,
        block = block
    )
}
245
246 /**
247 * **This is an internal API and should not be used from general code.**
248 * The `onCompletion` parameter will be redesigned.
249 * If you have to use the `onCompletion` operator, please report to https://github.com/Kotlin/kotlinx.coroutines/issues/.
250 * As a temporary solution, [invokeOnCompletion][Job.invokeOnCompletion] can be used instead:
251 * ```
252 * fun <E> ReceiveChannel<E>.myOperator(): ReceiveChannel<E> = GlobalScope.produce(Dispatchers.Unconfined) {
253 * coroutineContext[Job]?.invokeOnCompletion { consumes() }
254 * }
255 * ```
256 * @suppress
257 */
@InternalCoroutinesApi
public fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    onCompletion: CompletionHandler? = null,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
    // Forward to the internal builder; this overload always uses the suspending overflow strategy.
    return produce(
        context = context,
        capacity = capacity,
        onBufferOverflow = BufferOverflow.SUSPEND,
        start = start,
        onCompletion = onCompletion,
        block = block
    )
}
267
/*
 * The most flexible flavour of `produce`. It is kept internal because its parameter list
 * is too long to be exposed through the public API.
 *
 * Steps: create the backing channel, wrap it in a ProducerCoroutine built on the new
 * coroutine context, optionally register the completion handler, start the coroutine
 * with itself as the receiver of `block`, and hand the coroutine back as the receive side.
 */
internal fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    onCompletion: CompletionHandler? = null,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
    // The channel is created first, before the coroutine context is computed.
    val backingChannel = Channel<E>(capacity, onBufferOverflow)
    val producer = ProducerCoroutine(newCoroutineContext(context), backingChannel)
    // The handler is registered before the coroutine is started.
    onCompletion?.let { completionHandler -> producer.invokeOnCompletion(handler = completionHandler) }
    producer.start(start, producer, block)
    return producer
}
284
/**
 * Coroutine behind `produce`: a [ChannelCoroutine] that also serves as the [ProducerScope]
 * handed to the producer block, and that closes its channel once the coroutine finishes.
 */
private class ProducerCoroutine<E>(
    parentContext: CoroutineContext,
    channel: Channel<E>
) : ChannelCoroutine<E>(parentContext, channel, true, active = true), ProducerScope<E> {
    // NOTE(review): presumably overridden to settle which inherited `isActive` is used — confirm.
    override val isActive: Boolean
        get() = super.isActive

    /** Normal completion: close the channel without a cause. */
    override fun onCompleted(value: Unit) {
        _channel.close()
    }

    /**
     * Cancellation or failure: close the channel with [cause]. The exception is passed to
     * [handleCoroutineException] only when this call did not close the channel and the
     * exception was not already [handled].
     */
    override fun onCancelled(cause: Throwable, handled: Boolean) {
        val closedByThisCall = _channel.close(cause)
        if (closedByThisCall || handled) return
        handleCoroutineException(context, cause)
    }
}
300