/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

@file:OptIn(ExperimentalContracts::class)

package kotlinx.coroutines

import kotlinx.cinterop.*
import platform.posix.*
import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.native.concurrent.*

/**
 * Runs new coroutine and **blocks** current thread until its completion.
 * This function should not be used from coroutine. It is designed to bridge regular blocking code
 * to libraries that are written in suspending style, to be used in `main` functions and in tests.
 *
 * The default [CoroutineDispatcher] for this builder is an implementation of [EventLoop] that processes continuations
 * in this blocked thread until the completion of this coroutine.
 * See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`.
 *
 * When [CoroutineDispatcher] is explicitly specified in the [context], then the new coroutine runs in the context of
 * the specified dispatcher while the current thread is blocked. If the specified dispatcher implements [EventLoop]
 * interface and this `runBlocking` invocation is performed from inside this event loop's thread, then
 * this event loop is processed using its [processNextEvent][EventLoop.processNextEvent] method until coroutine completes.
 *
 * This implementation contains no interruption handling: the blocked thread is released only once
 * the coroutine has completed. If the coroutine completes exceptionally, the cause of the failure is rethrown
 * to the caller.
 *
 * See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
 *
 * @param context context of the coroutine. The default value is an implementation of [EventLoop].
 * @param block the coroutine code.
 * @return the value produced by [block].
 */
public actual fun <T> runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    val contextInterceptor = context[ContinuationInterceptor]
    val eventLoop: EventLoop?
    val newContext: CoroutineContext
    if (contextInterceptor == null) {
        // create or use private event loop if no dispatcher is specified
        eventLoop = ThreadLocalEventLoop.eventLoop
        newContext = GlobalScope.newCoroutineContext(context + eventLoop)
    } else {
        // See if context's interceptor is an event loop that we shall use (to support TestContext)
        // or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
        eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
            ?: ThreadLocalEventLoop.currentOrNull()
        newContext = GlobalScope.newCoroutineContext(context)
    }
    // T cannot be inferred from the constructor arguments, so it is spelled out explicitly.
    val coroutine = BlockingCoroutine<T>(newContext, eventLoop)
    // The keep-alive check registered below reports `true` (termination forbidden) until this flag is set.
    var completed = false
    ThreadLocalKeepAlive.addCheck { !completed }
    try {
        coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
        return coroutine.joinBlocking()
    } finally {
        // Set on both normal and exceptional exit so that the check is dropped on the next keep-alive pass.
        completed = true
    }
}

/**
 * Per-thread registry of "still in use" checks that keeps re-scheduling a task on the current [Worker]
 * for as long as at least one registered check returns `true`.
 */
@ThreadLocal
private object ThreadLocalKeepAlive {
    /** If any of these checks passes, this means this [Worker] is still used. */
    private val checks = mutableListOf<() -> Boolean>()

    /** Whether the worker currently tries to keep itself alive. */
    private var keepAliveLoopActive = false

    /**
     * Adds another stopgap that must be passed before the [Worker] can be terminated.
     *
     * Starts the keep-alive loop if it is not already running.
     *
     * @param terminationForbidden returns `true` while the worker must be kept alive.
     */
    fun addCheck(terminationForbidden: () -> Boolean) {
        checks.add(terminationForbidden)
        if (!keepAliveLoopActive) keepAlive()
    }

    /**
     * Send a ping to the worker to prevent it from terminating while this coroutine is running,
     * ensuring that continuations don't get dropped and forgotten.
     */
    private fun keepAlive() {
        // only keep the checks that still forbid the termination
        checks.retainAll { it() }
        // if there are no checks left, we no longer keep the worker alive, it can be terminated
        keepAliveLoopActive = checks.isNotEmpty()
        if (keepAliveLoopActive) {
            // re-evaluate the remaining checks after 100_000 microseconds
            Worker.current.executeAfter(afterMicroseconds = 100_000) {
                keepAlive()
            }
        }
    }
}

/**
 * Coroutine whose completion is awaited by blocking the [Worker] that created it.
 *
 * @param parentContext context passed on to [AbstractCoroutine].
 * @param eventLoop event loop to process while waiting in [joinBlocking], or `null` to only park.
 */
private class BlockingCoroutine<T>(
    parentContext: CoroutineContext,
    private val eventLoop: EventLoop?
) : AbstractCoroutine<T>(parentContext, true, true) {
    /** The worker on which this coroutine was created; it is the one parked in [joinBlocking]. */
    private val joinWorker = Worker.current

    override val isScopedCoroutine: Boolean get() = true

    /**
     * Wakes up the waiting worker when completion happens on a different worker.
     * When completion happens on [joinWorker] itself, nothing is sent.
     */
    override fun afterCompletion(state: Any?) {
        // wake up blocked thread
        if (joinWorker != Worker.current) {
            // Unpark waiting worker
            joinWorker.executeAfter(0L, {}) // send an empty task to unpark the waiting event loop
        }
    }

    /**
     * Blocks the current thread until this coroutine completes, processing [eventLoop] events
     * (when an event loop is present) between parks.
     *
     * @return the result of the coroutine.
     * @throws Throwable the cause of the failure if the coroutine completed exceptionally.
     */
    @Suppress("UNCHECKED_CAST")
    fun joinBlocking(): T {
        try {
            eventLoop?.incrementUseCount()
            while (true) {
                var parkNanos: Long
                // Workaround for bug in BE optimizer that cannot eliminate boxing here
                if (eventLoop != null) {
                    parkNanos = eventLoop.processNextEvent()
                } else {
                    parkNanos = Long.MAX_VALUE
                }
                // note: processNextEvent may lose unpark flag, so check if completed before parking
                if (isCompleted) break
                // convert the nanosecond delay to microseconds before parking
                joinWorker.park(parkNanos / 1000L, true)
            }
        } finally { // paranoia
            eventLoop?.decrementUseCount()
        }
        // now return result
        val state = state.unboxState()
        (state as? CompletedExceptionally)?.let { throw it.cause }
        return state as T
    }
}