• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 @file:OptIn(ExperimentalContracts::class)
6 package kotlinx.coroutines
7 
8 import kotlinx.cinterop.*
9 import platform.posix.*
10 import kotlin.contracts.*
11 import kotlin.coroutines.*
12 import kotlin.native.concurrent.*
13 
/**
 * Starts a new coroutine and **blocks** the calling thread until that coroutine completes, then returns its result.
 * This function should not be called from a coroutine. It exists to bridge regular blocking code
 * to libraries written in suspending style, and is meant for `main` functions and tests.
 *
 * When [context] carries no [ContinuationInterceptor], the coroutine is dispatched on the thread-local
 * [EventLoop] (created if it does not exist yet), and this blocked thread processes that loop until the
 * coroutine completes.
 *
 * When [context] does specify an interceptor, the coroutine runs in that context while the current thread
 * stays blocked. If the interceptor is itself an [EventLoop] that reports it should be processed from the
 * context, that loop is processed here via [processNextEvent][EventLoop.processNextEvent]; otherwise an already
 * existing thread-local event loop, if any, is processed so that it is not starved (none is created).
 *
 * See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
 *
 * @param context context of the coroutine; without an interceptor, the thread-local [EventLoop] is added to it.
 * @param block the coroutine code.
 * @return the value produced by [block].
 */
public actual fun <T> runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    val loop: EventLoop?
    val blockingContext: CoroutineContext
    when (val interceptor = context[ContinuationInterceptor]) {
        null -> {
            // no dispatcher requested: create or reuse this thread's private event loop
            loop = ThreadLocalEventLoop.eventLoop
            blockingContext = GlobalScope.newCoroutineContext(context + loop)
        }
        else -> {
            // Prefer the interceptor when it is an event loop we are supposed to drive (to support TestContext);
            // otherwise fall back to an existing thread-local loop to avoid blocking it, without creating one.
            val ownLoop = (interceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
            loop = ownLoop ?: ThreadLocalEventLoop.currentOrNull()
            blockingContext = GlobalScope.newCoroutineContext(context)
        }
    }
    val blocking = BlockingCoroutine<T>(blockingContext, loop)
    // the keep-alive check holds for as long as this invocation has not left the try block
    var finished = false
    ThreadLocalKeepAlive.addCheck { !finished }
    return try {
        blocking.start(CoroutineStart.DEFAULT, blocking, block)
        blocking.joinBlocking()
    } finally {
        finished = true
    }
}
64 
/**
 * Per-thread registry of "still in use" checks. While at least one registered check returns `true`,
 * a task is periodically re-posted to the current [Worker].
 */
@ThreadLocal
private object ThreadLocalKeepAlive {
    /** Registered checks; as long as any of them returns `true`, this [Worker] is considered still in use. */
    private var checks = mutableListOf<() -> Boolean>()

    /** `true` while a keep-alive ping is scheduled on the worker. */
    private var keepAliveLoopActive = false

    /**
     * Registers [terminationForbidden] as one more condition that must stop holding before the [Worker]
     * may be terminated, and starts the keep-alive loop unless it is already running.
     */
    fun addCheck(terminationForbidden: () -> Boolean) {
        checks.add(terminationForbidden)
        if (keepAliveLoopActive) return
        keepAlive()
    }

    /**
     * Evaluates every registered check once, drops the ones that no longer forbid termination and,
     * if any remain, schedules itself on the current worker again. The ping keeps the worker from
     * terminating while a coroutine is running, so that continuations don't get dropped and forgotten.
     */
    private fun keepAlive() {
        // evaluate each check exactly once, in registration order, keeping only those that still hold
        val stillForbidding = checks.filterTo(mutableListOf<() -> Boolean>()) { forbidsTermination ->
            forbidsTermination()
        }
        checks = stillForbidding
        keepAliveLoopActive = stillForbidding.isNotEmpty()
        // with nothing left to guard, stop pinging so that the worker can be terminated
        if (!keepAliveLoopActive) return
        Worker.current.executeAfter(afterMicroseconds = 100_000) {
            keepAlive()
        }
    }
}
95 
/**
 * Coroutine whose completion is awaited by blocking the worker that created it (see [joinBlocking]).
 *
 * @param parentContext context passed on to [AbstractCoroutine].
 * @param eventLoop event loop to process while waiting, or `null` to only park the worker.
 */
private class BlockingCoroutine<T>(
    parentContext: CoroutineContext,
    private val eventLoop: EventLoop?
) : AbstractCoroutine<T>(parentContext, true, true) {
    /** Worker on which this coroutine was constructed; it is the one that parks in [joinBlocking]. */
    private val joinWorker = Worker.current

    override val isScopedCoroutine: Boolean get() = true

    /** Wakes up the blocked worker when completion happens on a different worker. */
    override fun afterCompletion(state: Any?) {
        // completing on the joining worker itself: there is nobody else to unpark
        if (joinWorker == Worker.current) return
        // send an empty task to unpark the waiting event loop
        joinWorker.executeAfter(0L) {}
    }

    /**
     * Blocks until this coroutine is completed, processing [eventLoop] (when present) in the meantime,
     * and then returns the result or rethrows the cause of an exceptional completion.
     */
    @Suppress("UNCHECKED_CAST")
    fun joinBlocking(): T {
        try {
            eventLoop?.incrementUseCount()
            while (true) {
                val timeoutNanos: Long
                // Workaround for bug in BE optimizer that cannot eliminate boxing here:
                // keep this as a plain if/else statement rather than an elvis expression
                if (eventLoop == null) {
                    timeoutNanos = Long.MAX_VALUE
                } else {
                    timeoutNanos = eventLoop.processNextEvent()
                }
                // note: processNextEvent may lose unpark flag, so check if completed before parking
                if (isCompleted) break
                joinWorker.park(timeoutNanos / 1000L, true)
            }
        } finally { // paranoia
            eventLoop?.decrementUseCount()
        }
        // completed: hand the outcome over to the caller
        val finalState = state.unboxState()
        if (finalState is CompletedExceptionally) throw finalState.cause
        return finalState as T
    }
}
137