• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.flow.internal
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.flow.*
10 import kotlinx.coroutines.internal.*
11 import kotlinx.coroutines.intrinsics.*
12 import kotlin.coroutines.*
13 import kotlin.coroutines.intrinsics.*
14 import kotlinx.coroutines.flow.internal.unsafeFlow as flow
15 
16 /**
17  * Creates a [CoroutineScope] and calls the specified suspend block with this scope.
18  * This builder is similar to [coroutineScope] with the only exception that it *ties* lifecycle of children
19  * and itself regarding the cancellation, thus being cancelled when one of the children becomes cancelled.
20  *
21  * For example:
22  * ```
23  * flowScope {
24  *     launch {
25  *         throw CancellationException()
26  *     }
27  * } // <- CE will be rethrown here
28  * ```
29  */
30 internal suspend fun <R> flowScope(@BuilderInference block: suspend CoroutineScope.() -> R): R =
31     suspendCoroutineUninterceptedOrReturn { uCont ->
32         val coroutine = FlowCoroutine(uCont.context, uCont)
33         coroutine.startUndispatchedOrReturn(coroutine, block)
34     }
35 
36 /**
37  * Creates a flow that also provides a [CoroutineScope] for each collector
38  * Shorthand for:
39  * ```
40  * flow {
41  *     flowScope {
42  *         ...
43  *     }
44  * }
45  * ```
46  * with additional constraint on cancellation.
47  * To cancel child without cancelling itself, `cancel(ChildCancelledException())` should be used.
48  */
scopedFlownull49 internal fun <R> scopedFlow(@BuilderInference block: suspend CoroutineScope.(FlowCollector<R>) -> Unit): Flow<R> =
50     flow {
51         flowScope { block(this@flow) }
52     }
53 
flowProducenull54 internal fun <T> CoroutineScope.flowProduce(
55     context: CoroutineContext,
56     capacity: Int = 0,
57     @BuilderInference block: suspend ProducerScope<T>.() -> Unit
58 ): ReceiveChannel<T> {
59     val channel = Channel<T>(capacity)
60     val newContext = newCoroutineContext(context)
61     val coroutine = FlowProduceCoroutine(newContext, channel)
62     coroutine.start(CoroutineStart.ATOMIC, coroutine, block)
63     return coroutine
64 }
65 
66 private class FlowCoroutine<T>(
67     context: CoroutineContext,
68     uCont: Continuation<T>
69 ) : ScopeCoroutine<T>(context, uCont) {
childCancellednull70     public override fun childCancelled(cause: Throwable): Boolean {
71         if (cause is ChildCancelledException) return true
72         return cancelImpl(cause)
73     }
74 }
75 
76 private class FlowProduceCoroutine<T>(
77     parentContext: CoroutineContext,
78     channel: Channel<T>
79 ) : ProducerCoroutine<T>(parentContext, channel) {
childCancellednull80     public override fun childCancelled(cause: Throwable): Boolean {
81         if (cause is ChildCancelledException) return true
82         return cancelImpl(cause)
83     }
84 }
85