1 /*
2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines
6
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.internal.*
10 import kotlin.coroutines.*
11 import kotlin.native.concurrent.*
12 import kotlin.time.*
13 import kotlin.time.Duration.Companion.milliseconds
14
newFixedThreadPoolContextnull15 public actual fun newFixedThreadPoolContext(nThreads: Int, name: String): CloseableCoroutineDispatcher {
16 require(nThreads >= 1) { "Expected at least one thread, but got: $nThreads" }
17 return MultiWorkerDispatcher(name, nThreads)
18 }
19
20 @OptIn(ExperimentalTime::class)
21 internal class WorkerDispatcher(name: String) : CloseableCoroutineDispatcher(), Delay {
22 private val worker = Worker.start(name = name)
23
dispatchnull24 override fun dispatch(context: CoroutineContext, block: Runnable) {
25 worker.executeAfter(0L) { block.run() }
26 }
27
scheduleResumeAfterDelaynull28 override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
29 val handle = schedule(timeMillis, Runnable {
30 with(continuation) { resumeUndispatched(Unit) }
31 })
32 continuation.disposeOnCancellation(handle)
33 }
34
invokeOnTimeoutnull35 override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
36 schedule(timeMillis, block)
37
38 private fun schedule(timeMillis: Long, block: Runnable): DisposableHandle {
39 // Workers don't have an API to cancel sent "executeAfter" block, but we are trying
40 // to control the damage and reduce reachable objects by nulling out `block`
41 // that may retain a lot of references, and leaving only an empty shell after a timely disposal
42 // This is a class and not an object with `block` in a closure because that would defeat the purpose.
43 class DisposableBlock(block: Runnable) : DisposableHandle, Function0<Unit> {
44 private val disposableHolder = AtomicReference<Runnable?>(block)
45
46 override fun invoke() {
47 disposableHolder.value?.run()
48 }
49
50 override fun dispose() {
51 disposableHolder.value = null
52 }
53
54 fun isDisposed() = disposableHolder.value == null
55 }
56
57 fun Worker.runAfterDelay(block: DisposableBlock, targetMoment: TimeMark) {
58 if (block.isDisposed()) return
59 val durationUntilTarget = -targetMoment.elapsedNow()
60 val quantum = 100.milliseconds
61 if (durationUntilTarget > quantum) {
62 executeAfter(quantum.inWholeMicroseconds) { runAfterDelay(block, targetMoment) }
63 } else {
64 executeAfter(maxOf(0, durationUntilTarget.inWholeMicroseconds), block)
65 }
66 }
67
68 val disposableBlock = DisposableBlock(block)
69 val targetMoment = TimeSource.Monotonic.markNow() + timeMillis.milliseconds
70 worker.runAfterDelay(disposableBlock, targetMoment)
71 return disposableBlock
72 }
73
closenull74 override fun close() {
75 worker.requestTermination().result // Note: calling "result" blocks
76 }
77 }
78
79 private class MultiWorkerDispatcher(
80 private val name: String,
81 workersCount: Int
82 ) : CloseableCoroutineDispatcher() {
83 private val tasksQueue = Channel<Runnable>(Channel.UNLIMITED)
84 private val availableWorkers = Channel<CancellableContinuation<Runnable>>(Channel.UNLIMITED)
<lambda>null85 private val workerPool = OnDemandAllocatingPool(workersCount) {
86 Worker.start(name = "$name-$it").apply {
87 executeAfter { workerRunLoop() }
88 }
89 }
90
91 /**
92 * (number of tasks - number of workers) * 2 + (1 if closed)
93 */
94 private val tasksAndWorkersCounter = atomic(0L)
95
isClosednull96 private inline fun Long.isClosed() = this and 1L == 1L
97 private inline fun Long.hasTasks() = this >= 2
98 private inline fun Long.hasWorkers() = this < 0
99
100 private fun workerRunLoop() = runBlocking {
101 while (true) {
102 val state = tasksAndWorkersCounter.getAndUpdate {
103 if (it.isClosed() && !it.hasTasks()) return@runBlocking
104 it - 2
105 }
106 if (state.hasTasks()) {
107 // we promised to process a task, and there are some
108 tasksQueue.receive().run()
109 } else {
110 try {
111 suspendCancellableCoroutine {
112 val result = availableWorkers.trySend(it)
113 checkChannelResult(result)
114 }.run()
115 } catch (e: CancellationException) {
116 /** we are cancelled from [close] and thus will never get back to this branch of code,
117 but there may still be pending work, so we can't just exit here. */
118 }
119 }
120 }
121 }
122
123 // a worker that promised to be here and should actually arrive, so we wait for it in a blocking manner.
obtainWorkernull124 private fun obtainWorker(): CancellableContinuation<Runnable> =
125 availableWorkers.tryReceive().getOrNull() ?: runBlocking { availableWorkers.receive() }
126
dispatchnull127 override fun dispatch(context: CoroutineContext, block: Runnable) {
128 val state = tasksAndWorkersCounter.getAndUpdate {
129 if (it.isClosed())
130 throw IllegalStateException("Dispatcher $name was closed, attempted to schedule: $block")
131 it + 2
132 }
133 if (state.hasWorkers()) {
134 // there are workers that have nothing to do, let's grab one of them
135 obtainWorker().resume(block)
136 } else {
137 workerPool.allocate()
138 // no workers are available, we must queue the task
139 val result = tasksQueue.trySend(block)
140 checkChannelResult(result)
141 }
142 }
143
closenull144 override fun close() {
145 tasksAndWorkersCounter.getAndUpdate { if (it.isClosed()) it else it or 1L }
146 val workers = workerPool.close() // no new workers will be created
147 while (true) {
148 // check if there are workers that await tasks in their personal channels, we need to wake them up
149 val state = tasksAndWorkersCounter.getAndUpdate {
150 if (it.hasWorkers()) it + 2 else it
151 }
152 if (!state.hasWorkers())
153 break
154 obtainWorker().cancel()
155 }
156 /*
157 * Here we cannot avoid waiting on `.result`, otherwise it will lead
158 * to a native memory leak, including a pthread handle.
159 */
160 val requests = workers.map { it.requestTermination() }
161 requests.map { it.result }
162 }
163
checkChannelResultnull164 private fun checkChannelResult(result: ChannelResult<*>) {
165 if (!result.isSuccess)
166 throw IllegalStateException(
167 "Internal invariants of $this were violated, please file a bug to kotlinx.coroutines",
168 result.exceptionOrNull()
169 )
170 }
171 }
172