• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.coroutines.channels.*
8 import kotlinx.coroutines.internal.*
9 import kotlin.coroutines.*
10 import kotlin.native.concurrent.*
11 
/**
 * Creates a [CloseableCoroutineDispatcher] that is backed by a [WorkerDispatcher] with the given [name].
 *
 * @param name the name passed to the underlying worker.
 * @throws IllegalStateException if `multithreadingSupported` is `false`.
 */
@ExperimentalCoroutinesApi
public actual fun newSingleThreadContext(name: String): CloseableCoroutineDispatcher {
    // Fail fast when the runtime cannot run coroutines on multiple threads.
    check(multithreadingSupported) { "This API is only supported for experimental K/N memory model" }
    return WorkerDispatcher(name)
}
17 
/**
 * Creates a [CloseableCoroutineDispatcher] that is backed by a [MultiWorkerDispatcher] with [nThreads] workers.
 *
 * @param nThreads the number of workers to start; must be at least one.
 * @param name the name prefix handed to the dispatcher for its workers.
 * @throws IllegalStateException if `multithreadingSupported` is `false`.
 * @throws IllegalArgumentException if [nThreads] is less than one.
 */
public actual fun newFixedThreadPoolContext(nThreads: Int, name: String): CloseableCoroutineDispatcher {
    // The platform check comes first, exactly as before, so its error wins over the argument check.
    check(multithreadingSupported) { "This API is only supported for experimental K/N memory model" }
    if (nThreads < 1) {
        throw IllegalArgumentException("Expected at least one thread, but got: $nThreads")
    }
    return MultiWorkerDispatcher(name, nThreads)
}
23 
/**
 * A closeable dispatcher that runs everything on one dedicated [Worker] started with the given name.
 *
 * It also implements [Delay]: delayed resumptions and timeouts are posted to the same worker
 * through `Worker.executeAfter`.
 */
internal class WorkerDispatcher(name: String) : CloseableCoroutineDispatcher(), Delay {
    private val worker = Worker.start(name = name)

    /** Posts [block] to the worker with a zero delay. */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        worker.executeAfter(0L) { block.run() }
    }

    /** Schedules [continuation] to be resumed on the worker after [timeMillis] milliseconds. */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        worker.executeAfter(timeMillis.toMicrosSafe()) {
            with(continuation) { resumeUndispatched(Unit) }
        }
    }

    /**
     * Schedules [block] to run on the worker after [timeMillis] milliseconds.
     *
     * @return a handle whose disposal turns the scheduled job into a no-op and releases [block].
     */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
        // Workers don't have an API to cancel a sent "executeAfter" block, but we are trying
        // to control the damage and reduce reachable objects by nulling out `block`
        // that may retain a lot of references, leaving only an empty shell after a timely disposal.
        // This is a class and not an object with `block` in a closure because that would defeat the purpose.
        class DisposableBlock(block: Runnable) : DisposableHandle, Function0<Unit> {
            private val disposableHolder = AtomicReference<Runnable?>(block)

            override fun invoke() {
                // Runs nothing if the handle was disposed before the delay elapsed.
                disposableHolder.value?.run()
            }

            override fun dispose() {
                disposableHolder.value = null
            }
        }

        val disposableBlock = DisposableBlock(block)
        worker.executeAfter(timeMillis.toMicrosSafe(), disposableBlock)
        return disposableBlock
    }

    /** Requests termination of the worker and waits for it to complete. */
    override fun close() {
        worker.requestTermination().result // Note: calling "result" blocks
    }

    /**
     * Converts a delay in milliseconds to microseconds, clamping to the `0..Long.MAX_VALUE` range.
     *
     * Non-positive delays map to `0` (run as soon as possible) and delays whose microsecond value
     * would not fit into a [Long] map to [Long.MAX_VALUE]. The bound is checked before multiplying
     * because a wrapped product can still be a positive number larger than the input, so comparing
     * the product with the receiver does not reliably detect overflow.
     */
    private fun Long.toMicrosSafe(): Long = when {
        this <= 0L -> 0L
        this > Long.MAX_VALUE / 1000L -> Long.MAX_VALUE
        else -> this * 1000L
    }
}
68 
/**
 * A closeable dispatcher that hands tasks to a fixed set of [Worker]s through a shared channel.
 *
 * Each worker is named `"$name-<index>"` and, once started, runs [workerRunLoop], which keeps
 * taking tasks from the queue until the queue's iteration ends.
 */
private class MultiWorkerDispatcher(name: String, workersCount: Int) : CloseableCoroutineDispatcher() {
    private val tasksQueue = Channel<Runnable>(Channel.UNLIMITED)
    private val workers = Array(workersCount) { index -> Worker.start(name = "$name-$index") }

    init {
        // Kick off the task-processing loop on every worker.
        for (worker in workers) {
            worker.executeAfter(0L) { workerRunLoop() }
        }
    }

    /** Blocks the calling worker, running queued tasks one by one until the queue iteration finishes. */
    private fun workerRunLoop() {
        runBlocking {
            val pending = tasksQueue.iterator()
            while (pending.hasNext()) {
                // TODO error handling
                pending.next().run()
            }
        }
    }

    /** Enqueues [block] for execution by one of the workers; the result of the send attempt is not inspected. */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // TODO handle rejections
        tasksQueue.trySend(block)
    }

    /** Closes the task queue, then requests termination of each worker in turn and waits for it. */
    override fun close() {
        tasksQueue.close()
        workers.forEach { worker -> worker.requestTermination().result }
    }
}
94