<lambda>null1package kotlinx.coroutines.flow.internal 2 3 import kotlinx.coroutines.* 4 import kotlinx.coroutines.flow.* 5 import kotlinx.coroutines.internal.ScopeCoroutine 6 import kotlin.coroutines.* 7 import kotlin.jvm.* 8 9 // Collector that ensures exception transparency and context preservation on a best-effort basis. 10 // See an explanation in SafeCollector JVM actualization. 11 internal expect class SafeCollector<T>( 12 collector: FlowCollector<T>, 13 collectContext: CoroutineContext 14 ) : FlowCollector<T> { 15 internal val collector: FlowCollector<T> 16 internal val collectContext: CoroutineContext 17 internal val collectContextSize: Int 18 public fun releaseIntercepted() 19 public override suspend fun emit(value: T) 20 } 21 22 @JvmName("checkContext") // For prettier stack traces checkContextnull23internal fun SafeCollector<*>.checkContext(currentContext: CoroutineContext) { 24 val result = currentContext.fold(0) fold@{ count, element -> 25 val key = element.key 26 val collectElement = collectContext[key] 27 if (key !== Job) { 28 return@fold if (element !== collectElement) Int.MIN_VALUE 29 else count + 1 30 } 31 32 val collectJob = collectElement as Job? 33 val emissionParentJob = (element as Job).transitiveCoroutineParent(collectJob) 34 /* 35 * Code like 36 * ``` 37 * coroutineScope { 38 * launch { 39 * emit(1) 40 * } 41 * 42 * launch { 43 * emit(2) 44 * } 45 * } 46 * ``` 47 * is prohibited because 'emit' is not thread-safe by default. Use 'channelFlow' instead if you need concurrent emission 48 * or want to switch context dynamically (e.g. with `withContext`). 49 * 50 * Note that collecting from another coroutine is allowed, e.g.: 51 * ``` 52 * coroutineScope { 53 * val channel = produce { 54 * collect { value -> 55 * send(value) 56 * } 57 * } 58 * channel.consumeEach { value -> 59 * emit(value) 60 * } 61 * } 62 * ``` 63 * is a completely valid. 64 */ 65 if (emissionParentJob !== collectJob) { 66 error( 67 "Flow invariant is violated:\n" + 68 "\t\tEmission from another coroutine is detected.\n" + 69 "\t\tChild of $emissionParentJob, expected child of $collectJob.\n" + 70 "\t\tFlowCollector is not thread-safe and concurrent emissions are prohibited.\n" + 71 "\t\tTo mitigate this restriction please use 'channelFlow' builder instead of 'flow'" 72 ) 73 } 74 75 /* 76 * If collect job is null (-> EmptyCoroutineContext, probably run from `suspend fun main`), then invariant is maintained 77 * (common transitive parent is "null"), but count check will fail, so just do not count job context element when 78 * flow is collected from EmptyCoroutineContext 79 */ 80 if (collectJob == null) count else count + 1 81 } 82 if (result != collectContextSize) { 83 error( 84 "Flow invariant is violated:\n" + 85 "\t\tFlow was collected in $collectContext,\n" + 86 "\t\tbut emission happened in $currentContext.\n" + 87 "\t\tPlease refer to 'flow' documentation or use 'flowOn' instead" 88 ) 89 } 90 } 91 transitiveCoroutineParentnull92internal tailrec fun Job?.transitiveCoroutineParent(collectJob: Job?): Job? { 93 if (this === null) return null 94 if (this === collectJob) return this 95 if (this !is ScopeCoroutine<*>) return this 96 return parent.transitiveCoroutineParent(collectJob) 97 } 98 99 /** 100 * An analogue of the [flow] builder that does not check the context of execution of the resulting flow. 101 * Used in our own operators where we trust the context of invocations. 102 */ 103 @PublishedApi unsafeFlownull104internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> { 105 return object : Flow<T> { 106 override suspend fun collect(collector: FlowCollector<T>) { 107 collector.block() 108 } 109 } 110 } 111