1 /*
<lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines
6
7 import kotlinx.coroutines.channels.*
8 import kotlinx.coroutines.internal.*
9 import kotlin.coroutines.*
10 import kotlin.native.concurrent.*
11
/**
 * Creates a coroutine dispatcher that runs all of its tasks on a single dedicated worker named [name].
 *
 * The returned dispatcher owns the worker and should be closed once it is no longer needed.
 *
 * @throws IllegalStateException when `multithreadingSupported` is `false`.
 */
@ExperimentalCoroutinesApi
public actual fun newSingleThreadContext(name: String): CloseableCoroutineDispatcher {
    check(multithreadingSupported) { "This API is only supported for experimental K/N memory model" }
    return WorkerDispatcher(name)
}
17
/**
 * Creates a coroutine dispatcher that distributes its tasks over [nThreads] workers
 * named `"$name-<index>"`.
 *
 * The returned dispatcher owns the workers and should be closed once it is no longer needed.
 *
 * @throws IllegalStateException when `multithreadingSupported` is `false`.
 * @throws IllegalArgumentException when [nThreads] is less than one.
 */
public actual fun newFixedThreadPoolContext(nThreads: Int, name: String): CloseableCoroutineDispatcher {
    // The memory-model check intentionally precedes argument validation.
    check(multithreadingSupported) { "This API is only supported for experimental K/N memory model" }
    require(nThreads >= 1) { "Expected at least one thread, but got: $nThreads"}
    return MultiWorkerDispatcher(name, nThreads)
}
23
/**
 * A [CloseableCoroutineDispatcher] that executes every dispatched task on one dedicated [Worker].
 *
 * It also implements [Delay]: delayed resumptions and timeouts are scheduled through the
 * worker's own `executeAfter` queue.
 */
internal class WorkerDispatcher(name: String) : CloseableCoroutineDispatcher(), Delay {
    private val worker = Worker.start(name = name)

    /** Enqueues [block] on the worker with no delay. */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        worker.executeAfter(0L) { block.run() }
    }

    /** Schedules [continuation] to be resumed on the worker after [timeMillis] milliseconds. */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        worker.executeAfter(timeMillis.toMicrosSafe()) {
            // Already running on the worker, so resume in place instead of dispatching again.
            with(continuation) { resumeUndispatched(Unit) }
        }
    }

    /**
     * Schedules [block] to run on the worker after [timeMillis] milliseconds.
     *
     * @return a handle whose `dispose()` drops the reference to [block]; if it is disposed before
     * the timer fires, the scheduled invocation becomes a no-op.
     */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
        // Workers don't have an API to cancel a submitted "executeAfter" block, so the best we can do
        // is limit the damage: null out `block` (which may retain a lot of references) on disposal
        // and leave only an empty shell in the worker's queue.
        // This is a class and not an object with `block` in a closure because that would defeat the purpose.
        class DisposableBlock(block: Runnable) : DisposableHandle, Function0<Unit> {
            private val disposableHolder = AtomicReference<Runnable?>(block)

            override fun invoke() {
                disposableHolder.value?.run()
            }

            override fun dispose() {
                disposableHolder.value = null
            }
        }

        val disposableBlock = DisposableBlock(block)
        worker.executeAfter(timeMillis.toMicrosSafe(), disposableBlock)
        return disposableBlock
    }

    /** Requests termination of the worker and blocks until the request has completed. */
    override fun close() {
        worker.requestTermination().result // Note: calling "result" blocks
    }

    /**
     * Converts milliseconds to microseconds without overflowing.
     *
     * Non-positive durations are treated as already elapsed and map to `0`;
     * durations too large to be represented in microseconds saturate at [Long.MAX_VALUE].
     */
    private fun Long.toMicrosSafe(): Long {
        val microsPerMilli = 1000L
        return when {
            this <= 0L -> 0L
            // Compare before multiplying: a wrapped product cannot be detected reliably afterwards.
            this > Long.MAX_VALUE / microsPerMilli -> Long.MAX_VALUE
            else -> this * microsPerMilli
        }
    }
}
68
/**
 * A [CloseableCoroutineDispatcher] backed by a fixed set of `workersCount` workers that all
 * consume tasks from one shared unbounded queue.
 *
 * Workers are named `"$name-<index>"` and are started eagerly on construction.
 */
private class MultiWorkerDispatcher(private val name: String, workersCount: Int) : CloseableCoroutineDispatcher() {
    private val tasksQueue = Channel<Runnable>(Channel.UNLIMITED)
    private val workers = Array(workersCount) { Worker.start(name = "$name-$it") }

    init {
        // Park every worker in the run loop so that it starts draining the shared queue.
        workers.forEach { w -> w.executeAfter(0L) { workerRunLoop() } }
    }

    /** Runs queued tasks one by one until [tasksQueue] is closed and drained. */
    private fun workerRunLoop() = runBlocking {
        for (task in tasksQueue) {
            // TODO error handling
            task.run()
        }
    }

    /**
     * Enqueues [block] for execution by one of the workers.
     *
     * @throws IllegalStateException if the task could not be enqueued, which for an unlimited
     * queue means that this dispatcher has already been closed.
     */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // Silently dropping a rejected task would leave its coroutine suspended forever,
        // so surface the rejection to the caller instead.
        if (!tasksQueue.trySend(block).isSuccess) {
            throw IllegalStateException("Dispatcher $name was closed, attempted to schedule: $block")
        }
    }

    /**
     * Closes the task queue, then requests termination of every worker, blocking on each request in turn.
     */
    override fun close() {
        tasksQueue.close()
        workers.forEach { it.requestTermination().result }
    }
}
94