1 @file:OptIn(ExperimentalContracts::class, ObsoleteWorkersApi::class)
2 package kotlinx.coroutines
3
4 import kotlinx.cinterop.*
5 import kotlin.contracts.*
6 import kotlin.coroutines.*
7 import kotlin.native.concurrent.*
8
9 /**
10 * Runs a new coroutine and **blocks** the current thread _interruptibly_ until its completion.
11 *
12 * It is designed to bridge regular blocking code to libraries that are written in suspending style, to be used in
13 * `main` functions and in tests.
14 *
15 * Calling [runBlocking] from a suspend function is redundant.
16 * For example, the following code is incorrect:
17 * ```
18 * suspend fun loadConfiguration() {
19 * // DO NOT DO THIS:
20 * val data = runBlocking { // <- redundant and blocks the thread, do not do that
21 * fetchConfigurationData() // suspending function
22 * }
23 * ```
24 *
25 * Here, instead of releasing the thread on which `loadConfiguration` runs if `fetchConfigurationData` suspends, it will
26 * block, potentially leading to thread starvation issues.
27 *
28 * The default [CoroutineDispatcher] for this builder is an internal implementation of event loop that processes continuations
29 * in this blocked thread until the completion of this coroutine.
30 * See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`.
31 *
32 * When [CoroutineDispatcher] is explicitly specified in the [context], then the new coroutine runs in the context of
33 * the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another `runBlocking`,
34 * then this invocation uses the outer event loop.
35 *
36 * If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and
37 * this `runBlocking` invocation throws [InterruptedException].
38 *
39 * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available
40 * for a newly created coroutine.
41 *
42 * @param context the context of the coroutine. The default value is an event loop on the current thread.
43 * @param block the coroutine code.
44 */
runBlockingnull45 public actual fun <T> runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
46 contract {
47 callsInPlace(block, InvocationKind.EXACTLY_ONCE)
48 }
49 val contextInterceptor = context[ContinuationInterceptor]
50 val eventLoop: EventLoop?
51 val newContext: CoroutineContext
52 if (contextInterceptor == null) {
53 // create or use private event loop if no dispatcher is specified
54 eventLoop = ThreadLocalEventLoop.eventLoop
55 newContext = GlobalScope.newCoroutineContext(context + eventLoop)
56 } else {
57 // See if context's interceptor is an event loop that we shall use (to support TestContext)
58 // or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
59 eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
60 ?: ThreadLocalEventLoop.currentOrNull()
61 newContext = GlobalScope.newCoroutineContext(context)
62 }
63 val coroutine = BlockingCoroutine<T>(newContext, eventLoop)
64 var completed = false
65 ThreadLocalKeepAlive.addCheck { !completed }
66 try {
67 @Suppress("LEAKED_IN_PLACE_LAMBDA") // Contract is preserved, invoked immediately or throws
68 coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
69 return coroutine.joinBlocking()
70 } finally {
71 completed = true
72 }
73 }
74
75 @ThreadLocal
76 private object ThreadLocalKeepAlive {
77 /** If any of these checks passes, this means this [Worker] is still used. */
78 private var checks = mutableListOf<() -> Boolean>()
79
80 /** Whether the worker currently tries to keep itself alive. */
81 private var keepAliveLoopActive = false
82
83 /** Adds another stopgap that must be passed before the [Worker] can be terminated. */
addChecknull84 fun addCheck(terminationForbidden: () -> Boolean) {
85 checks.add(terminationForbidden)
86 if (!keepAliveLoopActive) keepAlive()
87 }
88
89 /**
90 * Send a ping to the worker to prevent it from terminating while this coroutine is running,
91 * ensuring that continuations don't get dropped and forgotten.
92 */
keepAlivenull93 private fun keepAlive() {
94 // only keep the checks that still forbid the termination
95 checks = checks.filter { it() }.toMutableList()
96 // if there are no checks left, we no longer keep the worker alive, it can be terminated
97 keepAliveLoopActive = checks.isNotEmpty()
98 if (keepAliveLoopActive) {
99 Worker.current.executeAfter(afterMicroseconds = 100_000) {
100 keepAlive()
101 }
102 }
103 }
104 }
105
106 private class BlockingCoroutine<T>(
107 parentContext: CoroutineContext,
108 private val eventLoop: EventLoop?
109 ) : AbstractCoroutine<T>(parentContext, true, true) {
110 private val joinWorker = Worker.current
111
112 override val isScopedCoroutine: Boolean get() = true
113
afterCompletionnull114 override fun afterCompletion(state: Any?) {
115 // wake up blocked thread
116 if (joinWorker != Worker.current) {
117 // Unpark waiting worker
118 joinWorker.executeAfter(0L, {}) // send an empty task to unpark the waiting event loop
119 }
120 }
121
122 @Suppress("UNCHECKED_CAST")
joinBlockingnull123 fun joinBlocking(): T {
124 try {
125 eventLoop?.incrementUseCount()
126 while (true) {
127 var parkNanos: Long
128 // Workaround for bug in BE optimizer that cannot eliminate boxing here
129 if (eventLoop != null) {
130 parkNanos = eventLoop.processNextEvent()
131 } else {
132 parkNanos = Long.MAX_VALUE
133 }
134 // note: processNextEvent may lose unpark flag, so check if completed before parking
135 if (isCompleted) break
136 joinWorker.park(parkNanos / 1000L, true)
137 }
138 } finally { // paranoia
139 eventLoop?.decrementUseCount()
140 }
141 // now return result
142 val state = state.unboxState()
143 (state as? CompletedExceptionally)?.let { throw it.cause }
144 return state as T
145 }
146 }
147