• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.internal.*
10 import kotlin.coroutines.*
11 import kotlin.native.concurrent.*
12 import kotlin.time.*
13 import kotlin.time.Duration.Companion.milliseconds
14 
/**
 * Native implementation of `newFixedThreadPoolContext`: returns a [MultiWorkerDispatcher]
 * constructed with [name] and [nThreads] as its worker count.
 *
 * @param nThreads requested number of threads; must be at least 1.
 * @param name name handed to the created dispatcher.
 * @throws IllegalArgumentException if [nThreads] is less than 1.
 */
public actual fun newFixedThreadPoolContext(nThreads: Int, name: String): CloseableCoroutineDispatcher {
    if (nThreads < 1) {
        throw IllegalArgumentException("Expected at least one thread, but got: $nThreads")
    }
    return MultiWorkerDispatcher(name = name, workersCount = nThreads)
}
19 
/**
 * A closeable dispatcher that runs everything on a single dedicated [Worker],
 * which is started when the dispatcher is constructed.
 *
 * It also implements [Delay]: delayed resumptions and timeouts are posted to the same worker
 * through `executeAfter`.
 */
@OptIn(ExperimentalTime::class)
internal class WorkerDispatcher(name: String) : CloseableCoroutineDispatcher(), Delay {
    private val worker = Worker.start(name = name)

    /** Posts [block] to the worker with a zero delay. */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        worker.executeAfter(0L) { block.run() }
    }

    /**
     * Schedules an undispatched resumption of [continuation] after [timeMillis] milliseconds and
     * registers the scheduled task for disposal when the continuation is cancelled.
     */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val resumption = Runnable {
            with(continuation) { resumeUndispatched(Unit) }
        }
        continuation.disposeOnCancellation(schedule(timeMillis, resumption))
    }

    /** Schedules [block] to run on the worker after [timeMillis] milliseconds; the returned handle disposes it. */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
        return schedule(timeMillis, block)
    }

    /**
     * The unit of work handed to `executeAfter`, doubling as the [DisposableHandle] returned to the caller.
     *
     * Workers offer no API to cancel a block that was already sent via `executeAfter`. To limit the damage,
     * the wrapped task is kept in a nullable atomic slot: disposing clears the slot, so only an empty shell
     * stays reachable instead of everything the task retains. The task is deliberately a constructor
     * argument of a class rather than a value captured by a closure, since a capturing closure
     * would defeat that purpose.
     */
    private class DisposableBlock(block: Runnable) : DisposableHandle, Function0<Unit> {
        private val taskSlot = AtomicReference<Runnable?>(block)

        /** Runs the wrapped task unless it has been disposed. */
        override fun invoke() {
            val pending = taskSlot.value ?: return
            pending.run()
        }

        /** Drops the reference to the wrapped task. */
        override fun dispose() {
            taskSlot.value = null
        }

        fun isDisposed(): Boolean = taskSlot.value == null
    }

    /**
     * Arranges for [block] to run on the worker once [timeMillis] milliseconds have passed.
     *
     * The wait is split into steps of at most 100 milliseconds. Each step re-checks whether the task
     * was disposed and stops rescheduling if so, which bounds how long a disposed task lingers
     * in the worker's queue.
     */
    private fun schedule(timeMillis: Long, block: Runnable): DisposableHandle {
        fun Worker.runAfterDelay(task: DisposableBlock, deadline: TimeMark) {
            if (task.isDisposed()) return
            // elapsedNow() is negative while the deadline is still ahead, so negating yields the time left
            val remaining = -deadline.elapsedNow()
            val step = 100.milliseconds
            if (remaining <= step) {
                // final hop: run the task itself, clamping an already-passed deadline to "now"
                executeAfter(remaining.inWholeMicroseconds.coerceAtLeast(0L), task)
            } else {
                // still far away: wake up after one step and re-evaluate
                executeAfter(step.inWholeMicroseconds) { runAfterDelay(task, deadline) }
            }
        }

        val scheduled = DisposableBlock(block)
        val wakeUpAt = TimeSource.Monotonic.markNow() + timeMillis.milliseconds
        worker.runAfterDelay(scheduled, wakeUpAt)
        return scheduled
    }

    /** Requests termination of the worker and blocks until that request completes. */
    override fun close() {
        // Reading "result" is what blocks the caller
        worker.requestTermination().result
    }
}
78 
/**
 * A closeable dispatcher that runs tasks on [Worker]s obtained from an [OnDemandAllocatingPool]
 * created with `workersCount`.
 *
 * Each started worker executes [workerRunLoop]. A submitted task is either handed directly to a worker
 * that is parked waiting for work, or placed into a shared queue from which workers pull.
 * The single atomic counter [tasksAndWorkersCounter] decides which of the two happens.
 */
private class MultiWorkerDispatcher(
    private val name: String,
    workersCount: Int
) : CloseableCoroutineDispatcher() {
    // Tasks submitted while no worker was parked
    private val tasksQueue = Channel<Runnable>(Channel.UNLIMITED)

    // Continuations of workers that are parked in `workerRunLoop`, each awaiting a task
    private val availableWorkers = Channel<CancellableContinuation<Runnable>>(Channel.UNLIMITED)

    // Every worker created by the pool is named "<name>-<argument>" and immediately enters the run loop
    private val workerPool = OnDemandAllocatingPool(workersCount) { workerId ->
        val started = Worker.start(name = "$name-$workerId")
        started.executeAfter { workerRunLoop() }
        started
    }

    /**
     * (number of tasks - number of workers) * 2 + (1 if closed)
     *
     * That is: the lowest bit is the "closed" flag, a value of 2 or more means tasks are queued,
     * and a negative value means workers are parked waiting for tasks.
     */
    private val tasksAndWorkersCounter = atomic(0L)

    private inline fun Long.isClosed() = (this and 1L) == 1L
    private inline fun Long.hasTasks() = this >= 2L
    private inline fun Long.hasWorkers() = this < 0L

    /**
     * Body of every worker. On each iteration the worker subtracts 2 from the counter and then either
     * takes a task from [tasksQueue] (if the previous counter value showed queued tasks) or parks itself
     * in [availableWorkers] until a task is handed over. The loop exits once the dispatcher is closed
     * and the counter shows no queued tasks.
     */
    private fun workerRunLoop() = runBlocking {
        while (true) {
            val previous = tasksAndWorkersCounter.getAndUpdate { counter ->
                if (counter.isClosed() && !counter.hasTasks()) return@runBlocking
                counter - 2
            }
            if (!previous.hasTasks()) {
                // nothing was queued: park until `dispatch` resumes us with a task
                try {
                    val handedOver = suspendCancellableCoroutine<Runnable> { parked ->
                        checkChannelResult(availableWorkers.trySend(parked))
                    }
                    handedOver.run()
                } catch (e: CancellationException) {
                    // We were cancelled from `close` and so will never reach this branch again,
                    // but work may still be pending, so the loop must go on instead of exiting here.
                }
            } else {
                // we promised to process a task, and there are some
                tasksQueue.receive().run()
            }
        }
    }

    /**
     * Returns the continuation of a parked worker. Callers have already observed through the counter that
     * a worker promised to park, so if it has not arrived yet this waits for it in a blocking manner.
     */
    private fun obtainWorker(): CancellableContinuation<Runnable> {
        val alreadyParked = availableWorkers.tryReceive().getOrNull()
        if (alreadyParked != null) return alreadyParked
        return runBlocking { availableWorkers.receive() }
    }

    /**
     * Submits [block] for execution: adds 2 to the counter, then either resumes a parked worker with
     * the task or asks the pool to allocate and enqueues the task.
     *
     * @throws IllegalStateException if the dispatcher has been closed.
     */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        val previous = tasksAndWorkersCounter.getAndUpdate { counter ->
            check(!counter.isClosed()) { "Dispatcher $name was closed, attempted to schedule: $block" }
            counter + 2
        }
        if (previous.hasWorkers()) {
            // there are workers that have nothing to do, let's grab one of them
            obtainWorker().resume(block)
            return
        }
        workerPool.allocate()
        // no workers are available, we must queue the task
        checkChannelResult(tasksQueue.trySend(block))
    }

    /**
     * Marks the dispatcher as closed, closes the worker pool, cancels every parked worker,
     * then requests termination of all workers and blocks until each request completes.
     */
    override fun close() {
        // set the "closed" bit; a no-op on the value if it is already set
        tasksAndWorkersCounter.getAndUpdate { it or 1L }
        val startedWorkers = workerPool.close() // no new workers will be created
        // While there are workers that await tasks in their personal continuations, claim one
        // (adding 2 to the counter) and wake it up by cancelling it.
        while (tasksAndWorkersCounter.getAndUpdate { if (it.hasWorkers()) it + 2 else it }.hasWorkers()) {
            obtainWorker().cancel()
        }
        /*
         * Waiting on `.result` cannot be avoided here: skipping it would lead
         * to a native memory leak, including a pthread handle.
         * Termination is requested from all workers first, and only then awaited.
         */
        val terminations = startedWorkers.map { it.requestTermination() }
        terminations.forEach { it.result }
    }

    /**
     * Verifies that a channel operation succeeded.
     *
     * @throws IllegalStateException if [result] is not successful, carrying the result's exception (if any) as the cause.
     */
    private fun checkChannelResult(result: ChannelResult<*>) {
        if (result.isSuccess) return
        throw IllegalStateException(
            "Internal invariants of $this were violated, please file a bug to kotlinx.coroutines",
            result.exceptionOrNull()
        )
    }
}
172