1 /*
2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 @file:OptIn(ExperimentalContracts::class)
6 package kotlinx.coroutines
7
8 import kotlinx.cinterop.*
9 import platform.posix.*
10 import kotlin.contracts.*
11 import kotlin.coroutines.*
12 import kotlin.native.concurrent.*
13
14 /**
15 * Runs new coroutine and **blocks** current thread _interruptibly_ until its completion.
16 * This function should not be used from coroutine. It is designed to bridge regular blocking code
17 * to libraries that are written in suspending style, to be used in `main` functions and in tests.
18 *
19 * The default [CoroutineDispatcher] for this builder in an implementation of [EventLoop] that processes continuations
20 * in this blocked thread until the completion of this coroutine.
21 * See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`.
22 *
23 * When [CoroutineDispatcher] is explicitly specified in the [context], then the new coroutine runs in the context of
24 * the specified dispatcher while the current thread is blocked. If the specified dispatcher implements [EventLoop]
25 * interface and this `runBlocking` invocation is performed from inside of the this event loop's thread, then
26 * this event loop is processed using its [processNextEvent][EventLoop.processNextEvent] method until coroutine completes.
27 *
28 * If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and
29 * this `runBlocking` invocation throws [InterruptedException].
30 *
31 * See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
32 *
33 * @param context context of the coroutine. The default value is an implementation of [EventLoop].
34 * @param block the coroutine code.
35 */
runBlockingnull36 public actual fun <T> runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
37 contract {
38 callsInPlace(block, InvocationKind.EXACTLY_ONCE)
39 }
40 val contextInterceptor = context[ContinuationInterceptor]
41 val eventLoop: EventLoop?
42 val newContext: CoroutineContext
43 if (contextInterceptor == null) {
44 // create or use private event loop if no dispatcher is specified
45 eventLoop = ThreadLocalEventLoop.eventLoop
46 newContext = GlobalScope.newCoroutineContext(context + eventLoop)
47 } else {
48 // See if context's interceptor is an event loop that we shall use (to support TestContext)
49 // or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
50 eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
51 ?: ThreadLocalEventLoop.currentOrNull()
52 newContext = GlobalScope.newCoroutineContext(context)
53 }
54 val coroutine = BlockingCoroutine<T>(newContext, eventLoop)
55 coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
56 return coroutine.joinBlocking()
57 }
58
59 private class BlockingCoroutine<T>(
60 parentContext: CoroutineContext,
61 private val eventLoop: EventLoop?
62 ) : AbstractCoroutine<T>(parentContext, true, true) {
63 private val joinWorker = Worker.current
64
65 override val isScopedCoroutine: Boolean get() = true
66
afterCompletionnull67 override fun afterCompletion(state: Any?) {
68 // wake up blocked thread
69 if (joinWorker != Worker.current) {
70 // Unpark waiting worker
71 joinWorker.executeAfter(0L, {}) // send an empty task to unpark the waiting event loop
72 }
73 }
74
75 @Suppress("UNCHECKED_CAST")
joinBlockingnull76 fun joinBlocking(): T {
77 try {
78 eventLoop?.incrementUseCount()
79 while (true) {
80 var parkNanos: Long
81 // Workaround for bug in BE optimizer that cannot eliminate boxing here
82 if (eventLoop != null) {
83 parkNanos = eventLoop.processNextEvent()
84 } else {
85 parkNanos = Long.MAX_VALUE
86 }
87 // note: processNextEvent may lose unpark flag, so check if completed before parking
88 if (isCompleted) break
89 joinWorker.park(parkNanos / 1000L, true)
90 }
91 } finally { // paranoia
92 eventLoop?.decrementUseCount()
93 }
94 // now return result
95 val state = state.unboxState()
96 (state as? CompletedExceptionally)?.let { throw it.cause }
97 return state as T
98 }
99 }
100