• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.flow.internal
2 
3 import kotlinx.coroutines.*
4 import kotlinx.coroutines.flow.*
5 import kotlinx.coroutines.internal.ScopeCoroutine
6 import kotlin.coroutines.*
7 import kotlin.jvm.*
8 
/**
 * A [FlowCollector] wrapper that enforces exception transparency and context preservation
 * on a best-effort basis.
 *
 * This is only the common `expect` declaration; the rationale and the real logic live in the
 * platform actualizations (see the explanation in the JVM actualization of `SafeCollector`).
 */
internal expect class SafeCollector<T>(
    collector: FlowCollector<T>,
    collectContext: CoroutineContext
) : FlowCollector<T> {
    /** The wrapped collector passed to the constructor. */
    internal val collector: FlowCollector<T>

    /** The coroutine context the flow is collected in; emission contexts are validated against it. */
    internal val collectContext: CoroutineContext

    /**
     * The value the context check compares its count of matching elements against.
     * Presumably the number of elements in [collectContext] -- confirm against the actualizations.
     */
    internal val collectContextSize: Int

    // NOTE(review): the contract of releaseIntercepted is defined by the platform actualizations,
    // it cannot be derived from this declaration alone.
    public fun releaseIntercepted()

    public override suspend fun emit(value: T)
}
21 
/**
 * Verifies that an emission performed in [currentContext] is compatible with the context
 * this collector was created with.
 *
 * Every element of [currentContext] is compared with the element stored under the same key in
 * the collect context:
 *  - any element other than [Job] must be the very same instance, otherwise the running count is
 *    poisoned with [Int.MIN_VALUE] so that the final size comparison is guaranteed to fail;
 *  - for the [Job] element, the emitting job is resolved through [transitiveCoroutineParent] and
 *    must end up being the collecting job.
 *
 * @param currentContext the coroutine context in which the emission is taking place.
 * @throws IllegalStateException if the emission comes from another coroutine, or if the number of
 * matching elements is not equal to `collectContextSize`.
 */
@JvmName("checkContext") // For prettier stack traces
internal fun SafeCollector<*>.checkContext(currentContext: CoroutineContext) {
    val matchedElements = currentContext.fold(0) { matched, emissionElement ->
        val elementKey = emissionElement.key
        val collectedElement = collectContext[elementKey]
        when {
            // Ordinary context elements: identity match or poison the counter.
            elementKey !== Job ->
                if (emissionElement === collectedElement) matched + 1 else Int.MIN_VALUE

            else -> {
                val collectJob = collectedElement as Job?
                val emissionParentJob = (emissionElement as Job).transitiveCoroutineParent(collectJob)
                /*
                 * Emitting from sibling coroutines, such as
                 * ```
                 * coroutineScope {
                 *     launch {
                 *         emit(1)
                 *     }
                 *
                 *     launch {
                 *         emit(2)
                 *     }
                 * }
                 * ```
                 * is prohibited because 'emit' is not thread-safe by default. 'channelFlow' should be
                 * used instead when concurrent emission or a dynamic context switch
                 * (e.g. with `withContext`) is required.
                 *
                 * Collecting from another coroutine, on the other hand, is completely valid:
                 * ```
                 * coroutineScope {
                 *     val channel = produce {
                 *         collect { value ->
                 *             send(value)
                 *         }
                 *     }
                 *     channel.consumeEach { value ->
                 *         emit(value)
                 *     }
                 * }
                 * ```
                 */
                check(emissionParentJob === collectJob) {
                    "Flow invariant is violated:\n" +
                        "\t\tEmission from another coroutine is detected.\n" +
                        "\t\tChild of $emissionParentJob, expected child of $collectJob.\n" +
                        "\t\tFlowCollector is not thread-safe and concurrent emissions are prohibited.\n" +
                        "\t\tTo mitigate this restriction please use 'channelFlow' builder instead of 'flow'"
                }

                /*
                 * A null collect job means the flow is collected from EmptyCoroutineContext (probably
                 * from `suspend fun main`). The invariant holds (the common transitive parent is "null"),
                 * but counting the job would break the size comparison below, so it is not counted.
                 */
                if (collectJob == null) matched else matched + 1
            }
        }
    }
    check(matchedElements == collectContextSize) {
        "Flow invariant is violated:\n" +
            "\t\tFlow was collected in $collectContext,\n" +
            "\t\tbut emission happened in $currentContext.\n" +
            "\t\tPlease refer to 'flow' documentation or use 'flowOn' instead"
    }
}
91 
/**
 * Walks up the chain of parents starting at this job and returns the first job at which the walk
 * stops: `null` when the receiver is `null`, [collectJob] itself when it is reached, or the first
 * job that is not a [ScopeCoroutine]. Only [ScopeCoroutine] instances are stepped through.
 *
 * @param collectJob the job of the collecting coroutine; the walk stops as soon as it is reached.
 * @return the job the walk stopped at, or `null` if the chain ended without one.
 */
internal tailrec fun Job?.transitiveCoroutineParent(collectJob: Job?): Job? = when {
    this === null -> null
    this === collectJob -> this
    // Scope coroutines are transparent for this check: continue with their parent.
    this is ScopeCoroutine<*> -> parent.transitiveCoroutineParent(collectJob)
    else -> this
}
98 
/**
 * A counterpart of the [flow] builder that skips the execution-context check of the resulting flow.
 * Intended for the library's own operators, where the context of invocations is trusted.
 *
 * @param block the body that is run with the downstream collector as its receiver on every collection.
 * @return a [Flow] whose `collect` invokes [block] directly on the supplied collector.
 */
@PublishedApi
internal inline fun <T> unsafeFlow(
    @BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit
): Flow<T> = object : Flow<T> {
    // No wrapping collector here: the supplied collector is handed to the block as is.
    override suspend fun collect(collector: FlowCollector<T>) {
        collector.block()
    }
}
111