/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.flow.internal

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.internal.ScopeCoroutine
import kotlin.coroutines.*
import kotlin.jvm.*

/**
 * Platform-specific ([expect]) collector that wraps another [FlowCollector] together with
 * the [CoroutineContext] the flow is being collected in.
 *
 * The members exposed here are what [checkContext] relies on; the actual behaviour of
 * the class is supplied by the platform `actual` declarations, which are not part of this file.
 */
internal expect class SafeCollector<T>(
    collector: FlowCollector<T>,
    collectContext: CoroutineContext
) : FlowCollector<T> {
    /** The wrapped collector. */
    internal val collector: FlowCollector<T>

    /** The context the flow is collected in; emissions are validated against it by [checkContext]. */
    internal val collectContext: CoroutineContext

    /** The element count that [checkContext] expects the emission context to match. */
    internal val collectContextSize: Int

    // NOTE(review): semantics are defined by the platform `actual` implementations — not visible here.
    public fun releaseIntercepted()
}

/**
 * Verifies that [currentContext] (the context an emission happens in) is compatible with
 * the [SafeCollector.collectContext] of the receiver.
 *
 * Every element of [currentContext] is compared *by identity* with the element stored under
 * the same key in `collectContext`:
 *  - for any key other than [Job], a mismatch poisons the running count with [Int.MIN_VALUE];
 *  - for the [Job] key, the emitting job is first resolved through
 *    [transitiveCoroutineParent] and must then be the very same instance as the collecting job.
 *
 * Finally, the number of matched elements must equal [SafeCollector.collectContextSize].
 *
 * @param currentContext the coroutine context in which the emission is taking place.
 * @throws IllegalStateException if the emission comes from a different coroutine, or if
 * the two contexts do not consist of the same elements.
 */
@JvmName("checkContext") // For prettier stack traces
internal fun SafeCollector<*>.checkContext(currentContext: CoroutineContext) {
    val matchedCount = currentContext.fold(0) { matched, emitted ->
        val key = emitted.key
        val collected = collectContext[key]
        if (key !== Job) {
            // Ordinary elements must be the same instance on both sides; otherwise make
            // the count permanently negative so that the final size comparison fails.
            if (emitted === collected) matched + 1 else Int.MIN_VALUE
        } else {
            val collectJob = collected as Job?
            val emissionParentJob = (emitted as Job).transitiveCoroutineParent(collectJob)
            /*
             * Emitting concurrently from child coroutines, e.g.
             * ```
             * coroutineScope {
             *     launch {
             *         emit(1)
             *     }
             *
             *     launch {
             *         emit(2)
             *     }
             * }
             * ```
             * is prohibited because 'emit' is not thread-safe by default. Use 'channelFlow' instead
             * if you need concurrent emission or want to switch context dynamically
             * (e.g. with `withContext`).
             *
             * Collecting from another coroutine, on the other hand, is completely valid:
             * ```
             * coroutineScope {
             *     val channel = produce {
             *         collect { value ->
             *             send(value)
             *         }
             *     }
             *     channel.consumeEach { value ->
             *         emit(value)
             *     }
             * }
             * ```
             */
            check(emissionParentJob === collectJob) {
                "Flow invariant is violated:\n" +
                    "\t\tEmission from another coroutine is detected.\n" +
                    "\t\tChild of $emissionParentJob, expected child of $collectJob.\n" +
                    "\t\tFlowCollector is not thread-safe and concurrent emissions are prohibited.\n" +
                    "\t\tTo mitigate this restriction please use 'channelFlow' builder instead of 'flow'"
            }

            /*
             * A null collect job means the flow is collected from EmptyCoroutineContext (probably
             * from `suspend fun main`). The invariant still holds (the common transitive parent is
             * "null"), but counting the job element would break the size comparison below, so the
             * job is simply not counted in that case.
             */
            if (collectJob == null) matched else matched + 1
        }
    }
    check(matchedCount == collectContextSize) {
        "Flow invariant is violated:\n" +
            "\t\tFlow was collected in $collectContext,\n" +
            "\t\tbut emission happened in $currentContext.\n" +
            "\t\tPlease refer to 'flow' documentation or use 'flowOn' instead"
    }
}

/**
 * Walks up the chain of [ScopeCoroutine] parents starting from the receiver.
 *
 * The walk stops and returns:
 *  - `null` when the receiver is `null`;
 *  - the receiver itself when it is the same instance as [collectJob];
 *  - the receiver itself when it is not a [ScopeCoroutine].
 *
 * Otherwise the search continues with the receiver's `parent`.
 *
 * @param collectJob the job of the collecting coroutine, or `null` if there is none.
 * @return the first job in the chain that satisfies one of the stop conditions above.
 */
internal tailrec fun Job?.transitiveCoroutineParent(collectJob: Job?): Job? =
    when {
        this === null -> null
        this === collectJob -> this
        this !is ScopeCoroutine<*> -> this
        else -> parent.transitiveCoroutineParent(collectJob)
    }

/**
 * An analogue of the [flow] builder that does not check the context of execution of the resulting flow.
 * Used in our own operators where we trust the context of invocations.
 *
 * Collecting the returned flow simply runs [block] with the supplied collector as its receiver.
 */
@PublishedApi
internal inline fun <T> unsafeFlow(
    @BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit
): Flow<T> = object : Flow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        collector.block()
    }
}