• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 @file:OptIn(ExperimentalContracts::class, ObsoleteWorkersApi::class)
2 package kotlinx.coroutines
3 
4 import kotlinx.cinterop.*
5 import kotlin.contracts.*
6 import kotlin.coroutines.*
7 import kotlin.native.concurrent.*
8 
9 /**
10  * Runs a new coroutine and **blocks** the current thread _interruptibly_ until its completion.
11  *
12  * It is designed to bridge regular blocking code to libraries that are written in suspending style, to be used in
13  * `main` functions and in tests.
14  *
15  * Calling [runBlocking] from a suspend function is redundant.
16  * For example, the following code is incorrect:
17  * ```
18  * suspend fun loadConfiguration() {
19  *     // DO NOT DO THIS:
20  *     val data = runBlocking { // <- redundant and blocks the thread, do not do that
21  *         fetchConfigurationData() // suspending function
 *     }
 * }
23  * ```
24  *
25  * Here, instead of releasing the thread on which `loadConfiguration` runs if `fetchConfigurationData` suspends, it will
26  * block, potentially leading to thread starvation issues.
27  *
28  * The default [CoroutineDispatcher] for this builder is an internal implementation of event loop that processes continuations
29  * in this blocked thread until the completion of this coroutine.
30  * See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`.
31  *
32  * When [CoroutineDispatcher] is explicitly specified in the [context], then the new coroutine runs in the context of
33  * the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another `runBlocking`,
34  * then this invocation uses the outer event loop.
35  *
36  * If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and
37  * this `runBlocking` invocation throws [InterruptedException].
38  *
39  * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available
40  * for a newly created coroutine.
41  *
42  * @param context the context of the coroutine. The default value is an event loop on the current thread.
43  * @param block the coroutine code.
44  */
public actual fun <T> runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    val requestedInterceptor = context[ContinuationInterceptor]
    val loop: EventLoop?
    val blockingContext: CoroutineContext
    if (requestedInterceptor != null) {
        // A dispatcher was supplied. Prefer it when it is itself an event loop that wants to be
        // processed from the context (to support TestContext); otherwise reuse the thread-local
        // event loop if one already exists, so that it is not blocked. Never create one here.
        val interceptorLoop = (requestedInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
        loop = interceptorLoop ?: ThreadLocalEventLoop.currentOrNull()
        blockingContext = GlobalScope.newCoroutineContext(context)
    } else {
        // No dispatcher was supplied: create or reuse the private thread-local event loop and
        // install it as the dispatcher of the new coroutine.
        val threadLoop = ThreadLocalEventLoop.eventLoop
        loop = threadLoop
        blockingContext = GlobalScope.newCoroutineContext(context + threadLoop)
    }
    val blockingCoroutine = BlockingCoroutine<T>(blockingContext, loop)
    // The registered check keeps reporting "termination forbidden" until this call has finished,
    // whether it finishes normally or with an exception.
    var finished = false
    ThreadLocalKeepAlive.addCheck { !finished }
    return try {
        @Suppress("LEAKED_IN_PLACE_LAMBDA") // Contract is preserved, invoked immediately or throws
        blockingCoroutine.start(CoroutineStart.DEFAULT, blockingCoroutine, block)
        blockingCoroutine.joinBlocking()
    } finally {
        finished = true
    }
}
74 
/**
 * Thread-local registry of "still in use" checks for the current [Worker].
 *
 * While at least one registered check returns `true`, a task is re-scheduled on the worker at a
 * fixed delay so that it keeps running; once no check passes any more, re-scheduling stops.
 */
@ThreadLocal
private object ThreadLocalKeepAlive {
    /** If any of these checks passes, this means this [Worker] is still used. */
    private val checks = mutableListOf<() -> Boolean>()

    /** Whether the worker currently tries to keep itself alive. */
    private var keepAliveLoopActive = false

    /**
     * Adds another stopgap that must be passed before the [Worker] can be terminated.
     *
     * @param terminationForbidden returns `true` for as long as the worker must stay alive; it is
     * dropped from the registry the first time it is evaluated and returns `false`.
     */
    fun addCheck(terminationForbidden: () -> Boolean) {
        checks.add(terminationForbidden)
        // Start the keep-alive loop only if it is not already scheduled.
        if (!keepAliveLoopActive) keepAlive()
    }

    /**
     * Send a ping to the worker to prevent it from terminating while this coroutine is running,
     * ensuring that continuations don't get dropped and forgotten.
     */
    private fun keepAlive() {
        // Only keep the checks that still forbid the termination. This is done in place, so no
        // intermediate lists are allocated on each tick.
        checks.retainAll { it() }
        // If there are no checks left, we no longer keep the worker alive, it can be terminated.
        keepAliveLoopActive = checks.isNotEmpty()
        if (keepAliveLoopActive) {
            Worker.current.executeAfter(afterMicroseconds = 100_000) {
                keepAlive()
            }
        }
    }
}
105 
/**
 * Coroutine used by `runBlocking`: the worker that created it blocks in [joinBlocking] until the
 * coroutine completes, processing events of [eventLoop] (when there is one) in the meantime.
 */
private class BlockingCoroutine<T>(
    parentContext: CoroutineContext,
    private val eventLoop: EventLoop?
) : AbstractCoroutine<T>(parentContext, true, true) {
    /** The worker on which this coroutine was created; it is the one that parks in [joinBlocking]. */
    private val joinWorker = Worker.current

    override val isScopedCoroutine: Boolean get() = true

    /** Wakes up the blocked worker when completion happens on a different worker. */
    override fun afterCompletion(state: Any?) {
        // Completion on the joining worker itself needs no wake-up.
        if (joinWorker == Worker.current) return
        // Send an empty task to unpark the waiting event loop.
        joinWorker.executeAfter(0L) {}
    }

    /**
     * Blocks the calling worker until this coroutine is completed, then returns its result or
     * throws the cause of its exceptional completion.
     */
    @Suppress("UNCHECKED_CAST")
    fun joinBlocking(): T {
        try {
            eventLoop?.incrementUseCount()
            while (true) {
                var parkNanos: Long
                // Workaround for bug in BE optimizer that cannot eliminate boxing here:
                // keep the explicit if/else assignment rather than an elvis expression.
                if (eventLoop != null) {
                    parkNanos = eventLoop.processNextEvent()
                } else {
                    parkNanos = Long.MAX_VALUE
                }
                // processNextEvent may lose the unpark flag, so check for completion before parking.
                if (isCompleted) break
                val parkMicros = parkNanos / 1000L
                joinWorker.park(parkMicros, true)
            }
        } finally { // paranoia
            eventLoop?.decrementUseCount()
        }
        // The coroutine is complete: unwrap its final state into a result or an exception.
        val finalState = state.unboxState()
        if (finalState is CompletedExceptionally) throw finalState.cause
        return finalState as T
    }
}
147