1 /*
2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 @file:JvmMultifileClass
6 @file:JvmName("BuildersKt")
7 @file:OptIn(ExperimentalContracts::class)
8
9 package kotlinx.coroutines
10
11 import java.util.concurrent.locks.*
12 import kotlin.contracts.*
13 import kotlin.coroutines.*
14
/**
 * Starts a new coroutine and **blocks** the current thread _interruptibly_ until that coroutine completes.
 * Do not call this function from a coroutine: it exists to bridge regular blocking code to libraries
 * that are written in suspending style, and is meant to be used in `main` functions and in tests.
 *
 * Unless a [CoroutineDispatcher] is supplied, this builder uses an internal implementation of an event loop
 * that processes continuations in the blocked thread until this coroutine completes.
 * See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`.
 *
 * When a [CoroutineDispatcher] is explicitly specified in the [context], the new coroutine runs in the context of
 * that dispatcher while the current thread is blocked. If the specified dispatcher is the event loop of another
 * `runBlocking`, then this invocation uses the outer event loop.
 *
 * If the blocked thread is interrupted (see [Thread.interrupt]), the coroutine job is cancelled and
 * this `runBlocking` invocation throws [InterruptedException].
 *
 * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of the debugging facilities
 * that are available for a newly created coroutine.
 *
 * @param context the context of the coroutine. The default value is an event loop on the current thread.
 * @param block the coroutine code.
 */
@Throws(InterruptedException::class)
public actual fun <T> runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    val interceptor = context[ContinuationInterceptor]
    val loop: EventLoop?
    val blockingContext: CoroutineContext
    if (interceptor != null) {
        // An interceptor was supplied: use it as the event loop when it is one that shall be processed
        // from this context (to support TestContext); otherwise reuse the thread-local event loop if
        // one already exists, so that it is not blocked -- but never create a new one here.
        loop = (interceptor as? EventLoop)?.takeIf { candidate -> candidate.shouldBeProcessedFromContext() }
            ?: ThreadLocalEventLoop.currentOrNull()
        blockingContext = GlobalScope.newCoroutineContext(context)
    } else {
        // No dispatcher was specified: create or reuse the private thread-local event loop
        // and install it into the coroutine context.
        val privateLoop = ThreadLocalEventLoop.eventLoop
        loop = privateLoop
        blockingContext = GlobalScope.newCoroutineContext(context + privateLoop)
    }
    val blocking = BlockingCoroutine<T>(blockingContext, Thread.currentThread(), loop)
    blocking.start(CoroutineStart.DEFAULT, blocking, block)
    return blocking.joinBlocking()
}
61
/**
 * Coroutine implementation behind `runBlocking`: the thread that calls [joinBlocking] is blocked until
 * this coroutine completes, processing events of [eventLoop] (when there is one) in the meantime.
 *
 * @param parentContext the context this coroutine is created with.
 * @param blockedThread the thread to wake up once this coroutine completes.
 * @param eventLoop the event loop to process while blocked, or `null` to simply park until completion.
 */
private class BlockingCoroutine<T>(
    parentContext: CoroutineContext,
    private val blockedThread: Thread,
    private val eventLoop: EventLoop?
) : AbstractCoroutine<T>(parentContext, true, true) {

    override val isScopedCoroutine: Boolean
        get() = true

    /**
     * Wakes up [blockedThread] after completion, unless the completion happened on that very thread.
     */
    override fun afterCompletion(state: Any?) {
        // no wake-up is needed when the blocked thread itself is the one running this callback
        if (Thread.currentThread() == blockedThread) return
        unpark(blockedThread)
    }

    /**
     * Blocks the calling thread until this coroutine completes, then returns its result or
     * throws the cause it completed exceptionally with.
     *
     * If the calling thread is found to be interrupted, this coroutine is cancelled with an
     * [InterruptedException], and that same exception is thrown to the caller.
     */
    @Suppress("UNCHECKED_CAST")
    fun joinBlocking(): T {
        registerTimeLoopThread()
        try {
            eventLoop?.incrementUseCount()
            try {
                while (true) {
                    @Suppress("DEPRECATION")
                    if (Thread.interrupted()) {
                        // cancel the coroutine with the very exception that is reported to the caller
                        val interruption = InterruptedException()
                        cancelCoroutine(interruption)
                        throw interruption
                    }
                    // without an event loop there is nothing to process: park with the largest timeout
                    val parkTimeoutNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
                    // note: processing the next event may lose the unpark flag,
                    // so check for completion before parking
                    if (isCompleted) break
                    parkNanos(this, parkTimeoutNanos)
                }
            } finally {
                // paranoia: always undo incrementUseCount(), even on interruption or failure
                eventLoop?.decrementUseCount()
            }
        } finally {
            // paranoia: always undo registerTimeLoopThread()
            unregisterTimeLoopThread()
        }
        // the coroutine has completed: rethrow its failure cause or hand back its result
        val finalState = this.state.unboxState()
        if (finalState is CompletedExceptionally) throw finalState.cause
        return finalState as T
    }
}
102