• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.flow.internal
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.flow.*
9 import kotlinx.coroutines.internal.ScopeCoroutine
10 import kotlin.coroutines.*
11 import kotlin.jvm.*
12 
/**
 * Platform-specific (`expect`) [FlowCollector] that is constructed from another [collector] together with
 * the [collectContext] of the collection, exposing both so that emissions can be validated against that
 * context (see [checkContext] in this file).
 *
 * NOTE(review): the actual emission behaviour lives in the platform `actual` classes, which are not
 * visible from this declaration.
 *
 * @param collector the collector this instance is created for.
 * @param collectContext the context the flow is being collected in.
 */
internal expect class SafeCollector<T>(
    collector: FlowCollector<T>,
    collectContext: CoroutineContext
) : FlowCollector<T> {
    // The collector passed to the constructor.
    internal val collector: FlowCollector<T>
    // The context of the collection; `checkContext` compares every emission context against it.
    internal val collectContext: CoroutineContext
    // `checkContext` requires the number of matching emission-context elements to equal this value;
    // presumably the number of elements in `collectContext` -- confirm against the `actual` classes.
    internal val collectContextSize: Int
    // NOTE(review): semantics are defined by the `actual` implementations and cannot be determined here.
    public fun releaseIntercepted()
}
22 
/**
 * Verifies that an emission performed in [currentContext] is compatible with the context this
 * collector was created for, and fails with [IllegalStateException] otherwise.
 *
 * Every element of [currentContext] is looked up by key in `collectContext`:
 *  - any element other than the [Job] must be the very same instance as its counterpart;
 *  - the [Job] element must resolve, via [transitiveCoroutineParent], to the collecting job.
 *
 * Finally the number of matched elements must equal `collectContextSize`.
 *
 * @param currentContext the coroutine context in which the emission is happening.
 * @throws IllegalStateException if the emission comes from another coroutine or from a different context.
 */
@JvmName("checkContext") // For prettier stack traces
internal fun SafeCollector<*>.checkContext(currentContext: CoroutineContext) {
    val matchedElements = currentContext.fold(0) { matched, emissionElement ->
        val key = emissionElement.key
        val collectElement = collectContext[key]
        if (key === Job) {
            val collectJob = collectElement as Job?
            val emissionParent = (emissionElement as Job).transitiveCoroutineParent(collectJob)
            /*
             * Concurrent emission such as
             * ```
             * coroutineScope {
             *     launch {
             *         emit(1)
             *     }
             *
             *     launch {
             *         emit(2)
             *     }
             * }
             * ```
             * is prohibited because 'emit' is not thread-safe by default. Use 'channelFlow' instead if you
             * need concurrent emission or want to switch context dynamically (e.g. with `withContext`).
             *
             * Collecting from another coroutine, on the other hand, is completely valid, e.g.:
             * ```
             * coroutineScope {
             *     val channel = produce {
             *         collect { value ->
             *             send(value)
             *         }
             *     }
             *     channel.consumeEach { value ->
             *         emit(value)
             *     }
             * }
             * ```
             */
            check(emissionParent === collectJob) {
                "Flow invariant is violated:\n" +
                    "\t\tEmission from another coroutine is detected.\n" +
                    "\t\tChild of $emissionParent, expected child of $collectJob.\n" +
                    "\t\tFlowCollector is not thread-safe and concurrent emissions are prohibited.\n" +
                    "\t\tTo mitigate this restriction please use 'channelFlow' builder instead of 'flow'"
            }

            /*
             * A null collect job means the flow is collected from EmptyCoroutineContext (probably from
             * `suspend fun main`). The invariant holds (the common transitive parent is "null"), but counting
             * the emission job would break the size comparison below, so the job element is not counted.
             */
            if (collectJob == null) matched else matched + 1
        } else if (emissionElement === collectElement) {
            matched + 1
        } else {
            // A mismatching element poisons the counter so that the size comparison below fails.
            Int.MIN_VALUE
        }
    }
    check(matchedElements == collectContextSize) {
        "Flow invariant is violated:\n" +
            "\t\tFlow was collected in $collectContext,\n" +
            "\t\tbut emission happened in $currentContext.\n" +
            "\t\tPlease refer to 'flow' documentation or use 'flowOn' instead"
    }
}
92 
/**
 * Walks up the parent chain of the receiver for as long as the current job is a [ScopeCoroutine],
 * and returns the job at which the walk stops:
 *  - `null` when the receiver (or a parent reached during the walk) is `null`;
 *  - [collectJob] itself as soon as it is encountered;
 *  - otherwise the first job that is not a [ScopeCoroutine].
 *
 * @param collectJob the job of the collecting coroutine; reaching it terminates the walk.
 */
internal tailrec fun Job?.transitiveCoroutineParent(collectJob: Job?): Job? = when {
    this === null -> null
    this === collectJob -> this
    this !is ScopeCoroutine<*> -> this
    // Scoped coroutines are looked through: continue with their parent.
    else -> parent.transitiveCoroutineParent(collectJob)
}
99 
/**
 * Creates a [Flow] whose `collect` simply runs [block] on the supplied collector.
 *
 * This is a counterpart of the [flow] builder that performs no checks of the execution context;
 * it is meant for our own operators, where the context of invocations is trusted.
 *
 * @param block the body executed with the collector as its receiver on every collection.
 */
@PublishedApi
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> =
    object : Flow<T> {
        override suspend fun collect(collector: FlowCollector<T>) {
            block(collector)
        }
    }
112