/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5 package kotlinx.coroutines.flow.internal
6
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.flow.*
10 import kotlinx.coroutines.internal.*
11 import kotlinx.coroutines.intrinsics.*
12 import kotlin.coroutines.*
13 import kotlin.coroutines.intrinsics.*
14 import kotlinx.coroutines.flow.internal.unsafeFlow as flow
15
/**
 * Runs [block] inside a newly created [CoroutineScope] and returns the block's result.
 *
 * This builder behaves like [coroutineScope], with one difference: cancellation is *tied* between
 * the scope and its children, so the scope itself is cancelled when one of its children becomes cancelled.
 *
 * For example:
 * ```
 * flowScope {
 *     launch {
 *         throw CancellationException()
 *     }
 * } // <- CE will be rethrown here
 * ```
 */
internal suspend fun <R> flowScope(@BuilderInference block: suspend CoroutineScope.() -> R): R =
    suspendCoroutineUninterceptedOrReturn { continuation ->
        // The scope is built from the caller's own context and continuation.
        FlowCoroutine(continuation.context, continuation).let { scope ->
            // The scope coroutine doubles as the receiver of the block.
            scope.startUndispatchedOrReturn(scope, block)
        }
    }
35
/**
 * Builds a flow whose [block] runs with a [CoroutineScope] receiver for each collector;
 * the collector itself is passed to [block] as its argument.
 *
 * Shorthand for:
 * ```
 * flow {
 *     flowScope {
 *         ...
 *     }
 * }
 * ```
 * with an additional constraint on cancellation.
 * To cancel a child without cancelling the scope itself, `cancel(ChildCancelledException())` should be used.
 */
internal fun <R> scopedFlow(@BuilderInference block: suspend CoroutineScope.(FlowCollector<R>) -> Unit): Flow<R> =
    flow {
        // Capture the collector before entering the scope, where the receiver becomes the CoroutineScope.
        val collector = this
        flowScope { block(collector) }
    }
53
/**
 * Starts a [FlowProduceCoroutine] that runs [block] as a producer and returns it as a [ReceiveChannel].
 *
 * @param context additional context passed to [newCoroutineContext] to build the coroutine's context.
 * @param capacity capacity handed to the [Channel] factory; defaults to `0`.
 * @param block producer body, invoked with the started coroutine as its [ProducerScope] receiver.
 * @return the started coroutine, exposed through its [ReceiveChannel] interface.
 */
internal fun <T> CoroutineScope.flowProduce(
    context: CoroutineContext,
    capacity: Int = 0,
    @BuilderInference block: suspend ProducerScope<T>.() -> Unit
): ReceiveChannel<T> {
    // The channel is created first, before the new coroutine context is computed.
    val buffer = Channel<T>(capacity)
    return FlowProduceCoroutine(newCoroutineContext(context), buffer).also { producer ->
        // The coroutine is both the thing being started and the receiver of the block.
        producer.start(CoroutineStart.ATOMIC, producer, block)
    }
}
65
/**
 * [ScopeCoroutine] used by `flowScope`, with a custom reaction to the cancellation of a child.
 *
 * A child cancelled with [ChildCancelledException] leaves this coroutine untouched;
 * any other cause is forwarded to `cancelImpl`, and that call's result is returned.
 */
private class FlowCoroutine<T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
    public override fun childCancelled(cause: Throwable): Boolean = when (cause) {
        // Reported as `true` without cancelling this coroutine.
        is ChildCancelledException -> true
        // Every other cause goes through cancelImpl.
        else -> cancelImpl(cause)
    }
}
75
/**
 * [ProducerCoroutine] used by `flowProduce`, with a custom reaction to the cancellation of a child.
 *
 * A child cancelled with [ChildCancelledException] leaves this coroutine untouched;
 * any other cause is forwarded to `cancelImpl`, and that call's result is returned.
 */
private class FlowProduceCoroutine<T>(
    parentContext: CoroutineContext,
    channel: Channel<T>
) : ProducerCoroutine<T>(parentContext, channel) {
    public override fun childCancelled(cause: Throwable): Boolean = when (cause) {
        // Reported as `true` without cancelling this coroutine.
        is ChildCancelledException -> true
        // Every other cause goes through cancelImpl.
        else -> cancelImpl(cause)
    }
}
85