/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines

import kotlinx.coroutines.channels.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.native.concurrent.*

/**
 * Creates a dispatcher that runs everything on one dedicated [Worker] started with the given [name].
 *
 * @throws IllegalStateException if [multithreadingSupported] is `false`.
 */
@ExperimentalCoroutinesApi
public actual fun newSingleThreadContext(name: String): CloseableCoroutineDispatcher {
    check(multithreadingSupported) { "This API is only supported for experimental K/N memory model" }
    return WorkerDispatcher(name)
}

/**
 * Creates a dispatcher backed by [nThreads] workers named `"$name-<index>"`.
 *
 * @throws IllegalStateException if [multithreadingSupported] is `false`.
 * @throws IllegalArgumentException if [nThreads] is less than one.
 */
public actual fun newFixedThreadPoolContext(nThreads: Int, name: String): CloseableCoroutineDispatcher {
    check(multithreadingSupported) { "This API is only supported for experimental K/N memory model" }
    require(nThreads >= 1) { "Expected at least one thread, but got: $nThreads" }
    return MultiWorkerDispatcher(name, nThreads)
}

/**
 * A dispatcher (and [Delay] implementation) that forwards every task to a single [Worker]
 * via `executeAfter`.
 */
internal class WorkerDispatcher(name: String) : CloseableCoroutineDispatcher(), Delay {
    private val worker = Worker.start(name = name)

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        worker.executeAfter(0L) { block.run() }
    }

    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        worker.executeAfter(timeMillis.toMicrosSafe()) {
            with(continuation) { resumeUndispatched(Unit) }
        }
    }

    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
        // Workers don't have an API to cancel sent "executeAfter" block, but we are trying
        // to control the damage and reduce reachable objects by nulling out `block`
        // that may retain a lot of references, and leaving only an empty shell after a timely disposal
        // This is a class and not an object with `block` in a closure because that would defeat the purpose.
        class DisposableBlock(block: Runnable) : DisposableHandle, Function0<Unit> {
            private val disposableHolder = AtomicReference<Runnable?>(block)

            override fun invoke() {
                disposableHolder.value?.run()
            }

            override fun dispose() {
                disposableHolder.value = null
            }
        }

        val disposableBlock = DisposableBlock(block)
        worker.executeAfter(timeMillis.toMicrosSafe(), disposableBlock)
        return disposableBlock
    }

    override fun close() {
        worker.requestTermination().result // Note: calling "result" blocks
    }

    /**
     * Converts milliseconds to microseconds, saturating instead of overflowing.
     *
     * - Non-positive values become `0`, so the task is scheduled without delay.
     * - Values whose product with 1000 does not fit in a [Long] become [Long.MAX_VALUE].
     *
     * The bound is checked by division before multiplying: comparing the product with the
     * receiver after the fact is not a reliable overflow test, because a wrapped product can
     * still be positive and larger than the receiver.
     */
    private fun Long.toMicrosSafe(): Long = when {
        this <= 0L -> 0L
        this > Long.MAX_VALUE / 1000L -> Long.MAX_VALUE
        else -> this * 1000L
    }
}

/**
 * A dispatcher that fans tasks out to a fixed set of workers through a shared unlimited channel.
 * Each worker runs [workerRunLoop], pulling tasks from the channel until it is closed.
 */
private class MultiWorkerDispatcher(name: String, workersCount: Int) : CloseableCoroutineDispatcher() {
    private val tasksQueue = Channel<Runnable>(Channel.UNLIMITED)
    private val workers = Array(workersCount) { Worker.start(name = "$name-$it") }

    init {
        workers.forEach { w -> w.executeAfter(0L) { workerRunLoop() } }
    }

    private fun workerRunLoop() = runBlocking {
        for (task in tasksQueue) {
            // TODO error handling: an exception thrown by `task.run()` escapes this loop
            task.run()
        }
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // TODO handle rejections: the result of `trySend` is discarded, so a block that
        // the channel does not accept (e.g. after `close()`) is never run
        tasksQueue.trySend(block)
    }

    override fun close() {
        tasksQueue.close()
        workers.forEach { it.requestTermination().result }
    }
}