• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 @file:JvmMultifileClass
6 @file:JvmName("BuildersKt")
7 @file:OptIn(ExperimentalContracts::class)
8 
9 package kotlinx.coroutines
10 
11 import java.util.concurrent.locks.*
12 import kotlin.contracts.*
13 import kotlin.coroutines.*
14 
15 /**
16  * Runs a new coroutine and **blocks** the current thread _interruptibly_ until its completion.
17  * This function should not be used from a coroutine. It is designed to bridge regular blocking code
18  * to libraries that are written in suspending style, to be used in `main` functions and in tests.
19  *
20  * The default [CoroutineDispatcher] for this builder is an internal implementation of event loop that processes continuations
21  * in this blocked thread until the completion of this coroutine.
22  * See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`.
23  *
24  * When [CoroutineDispatcher] is explicitly specified in the [context], then the new coroutine runs in the context of
25  * the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another `runBlocking`,
26  * then this invocation uses the outer event loop.
27  *
28  * If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and
29  * this `runBlocking` invocation throws [InterruptedException].
30  *
31  * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available
32  * for a newly created coroutine.
33  *
34  * @param context the context of the coroutine. The default value is an event loop on the current thread.
35  * @param block the coroutine code.
36  */
37 @Throws(InterruptedException::class)
runBlockingnull38 public actual fun <T> runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
39     contract {
40         callsInPlace(block, InvocationKind.EXACTLY_ONCE)
41     }
42     val currentThread = Thread.currentThread()
43     val contextInterceptor = context[ContinuationInterceptor]
44     val eventLoop: EventLoop?
45     val newContext: CoroutineContext
46     if (contextInterceptor == null) {
47         // create or use private event loop if no dispatcher is specified
48         eventLoop = ThreadLocalEventLoop.eventLoop
49         newContext = GlobalScope.newCoroutineContext(context + eventLoop)
50     } else {
51         // See if context's interceptor is an event loop that we shall use (to support TestContext)
52         // or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
53         eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
54             ?: ThreadLocalEventLoop.currentOrNull()
55         newContext = GlobalScope.newCoroutineContext(context)
56     }
57     val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
58     coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
59     return coroutine.joinBlocking()
60 }
61 
62 private class BlockingCoroutine<T>(
63     parentContext: CoroutineContext,
64     private val blockedThread: Thread,
65     private val eventLoop: EventLoop?
66 ) : AbstractCoroutine<T>(parentContext, true, true) {
67 
68     override val isScopedCoroutine: Boolean get() = true
69 
afterCompletionnull70     override fun afterCompletion(state: Any?) {
71         // wake up blocked thread
72         if (Thread.currentThread() != blockedThread)
73             unpark(blockedThread)
74     }
75 
76     @Suppress("UNCHECKED_CAST")
joinBlockingnull77     fun joinBlocking(): T {
78         registerTimeLoopThread()
79         try {
80             eventLoop?.incrementUseCount()
81             try {
82                 while (true) {
83                     @Suppress("DEPRECATION")
84                     if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }
85                     val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
86                     // note: process next even may loose unpark flag, so check if completed before parking
87                     if (isCompleted) break
88                     parkNanos(this, parkNanos)
89                 }
90             } finally { // paranoia
91                 eventLoop?.decrementUseCount()
92             }
93         } finally { // paranoia
94             unregisterTimeLoopThread()
95         }
96         // now return result
97         val state = this.state.unboxState()
98         (state as? CompletedExceptionally)?.let { throw it.cause }
99         return state as T
100     }
101 }
102