1 /*
<lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines
6
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.internal.*
9 import kotlin.coroutines.*
10 import kotlin.jvm.*
11
/**
 * Base class for [CoroutineDispatcher] implementations that run an event loop internally and
 * can be asked to process the next event from their event queue.
 *
 * An implementation may additionally implement the [Delay] interface to support time-scheduled tasks.
 * Instances are created by, or piggybacked onto (see [ThreadLocalEventLoop]),
 * `runBlocking` and [Dispatchers.Unconfined].
 *
 * @suppress **This is an internal API and should not be used from general code.**
 */
internal abstract class EventLoop : CoroutineDispatcher() {
    /**
     * Combined counter of the nested `runBlocking` and [Dispatchers.Unconfined] users of this event loop.
     * Every regular use contributes `1` and every unconfined use contributes `1L shl 32` (see [delta]).
     */
    private var useCount = 0L

    /**
     * Becomes `true` on any use by `runBlocking`, because such a use potentially leaks this loop to other
     * threads, so the instance has to be properly shut down. An event loop that was used solely by
     * [Dispatchers.Unconfined] does not need a shutdown -- it can stay in [ThreadLocalEventLoop]
     * and be reused next time.
     */
    private var shared = false

    /**
     * Tasks dispatched by [Dispatchers.Unconfined].
     * They are thread-local for performance and take precedence over the rest of the queue.
     */
    private var unconfinedQueue: ArrayDeque<DispatchedTask<*>>? = null

    /**
     * Processes the next event in this event loop.
     *
     * The returned value is to be interpreted like this:
     * * `<= 0` -- there are potentially more events for immediate processing;
     * * `> 0` -- a number of nanoseconds to wait for the next scheduled event;
     * * [Long.MAX_VALUE] -- no more events.
     *
     * **NOTE**: Must be invoked only from the event loop's thread
     * (no check for performance reasons, may be added in the future).
     */
    open fun processNextEvent(): Long =
        if (processUnconfinedEvent()) 0L else Long.MAX_VALUE

    /** `true` when this loop has nothing queued; the base implementation only looks at the unconfined queue. */
    protected open val isEmpty: Boolean
        get() = isUnconfinedQueueEmpty

    /**
     * `0` when an unconfined task is waiting to be processed, [Long.MAX_VALUE] otherwise.
     */
    protected open val nextTime: Long
        get() = if (isUnconfinedQueueEmpty) Long.MAX_VALUE else 0L

    /**
     * Runs a single task from the unconfined queue.
     *
     * @return `true` if a task was taken from the queue and run, `false` if there was nothing to run.
     */
    fun processUnconfinedEvent(): Boolean {
        val nextTask = unconfinedQueue?.removeFirstOrNull() ?: return false
        nextTask.run()
        return true
    }

    /**
     * Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context
     * parameter should call [processNextEvent] for this event loop (otherwise, it will process the thread-local one).
     * By default, an event loop implementation is thread-local and should not be processed in the context
     * (the current thread's event loop should be processed instead).
     */
    open fun shouldBeProcessedFromContext(): Boolean = false

    /**
     * Dispatches a task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded]
     * into the current event loop.
     */
    fun dispatchUnconfined(task: DispatchedTask<*>) {
        var pending = unconfinedQueue
        if (pending == null) {
            // the queue is created lazily on the first unconfined dispatch
            pending = ArrayDeque()
            unconfinedQueue = pending
        }
        pending.addLast(task)
    }

    /** `true` while at least one use (of any kind) is registered. */
    val isActive: Boolean
        get() = useCount > 0

    /** `true` while at least one unconfined use is registered. */
    val isUnconfinedLoopActive: Boolean
        get() = useCount >= delta(unconfined = true)

    // May only be used from the event loop's thread
    val isUnconfinedQueueEmpty: Boolean
        get() {
            val pending = unconfinedQueue
            return pending == null || pending.isEmpty()
        }

    /** Amount by which [useCount] changes for a single use of the given kind. */
    private fun delta(unconfined: Boolean): Long = when {
        unconfined -> 1L shl 32
        else -> 1L
    }

    /** Registers one more use of this loop; a regular (not unconfined) use also marks the loop as shared. */
    fun incrementUseCount(unconfined: Boolean = false) {
        useCount += delta(unconfined)
        if (!unconfined) shared = true
    }

    /** Unregisters one use of this loop and calls [shutdown] when the last use of a shared loop is gone. */
    fun decrementUseCount(unconfined: Boolean = false) {
        useCount -= delta(unconfined)
        if (useCount <= 0) {
            assert { useCount == 0L } // "Extra decrementUseCount"
            // shut it down and remove from ThreadLocalEventLoop
            if (shared) shutdown()
        }
    }

    /** Validates [parallelism] and returns this same dispatcher. */
    final override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        parallelism.checkParallelism()
        return this
    }

    /** Does nothing by default. */
    open fun shutdown() {}
}
124
/**
 * Holder of the event loop associated with the current thread.
 */
internal object ThreadLocalEventLoop {
    private val ref = commonThreadLocal<EventLoop?>(Symbol("ThreadLocalEventLoop"))

    /** The current thread's event loop; one is created via [createEventLoop] and stored when none is set yet. */
    internal val eventLoop: EventLoop
        get() {
            val current = ref.get()
            if (current != null) return current
            val created = createEventLoop()
            ref.set(created)
            return created
        }

    /** Returns the stored event loop, or `null` when none is set. */
    internal fun currentOrNull(): EventLoop? {
        return ref.get()
    }

    /** Clears the stored event loop. */
    internal fun resetEventLoop() {
        ref.set(null)
    }

    /** Stores [eventLoop] as the current event loop. */
    internal fun setEventLoop(eventLoop: EventLoop) {
        ref.set(eventLoop)
    }
}
142
// Marker stored in DelayedTask._heap once the task is disposed, so that it is never added to any heap again
private val DISPOSED_TASK = Symbol("REMOVED_TASK")

// results for scheduleImpl
private const val SCHEDULE_OK = 0 // the task was added to the delayed queue
private const val SCHEDULE_COMPLETED = 1 // the event loop is already completed; `schedule` calls `reschedule` instead
private const val SCHEDULE_DISPOSED = 2 // the task was already disposed, nothing was added

// Number of nanoseconds in one millisecond
private const val MS_TO_NS = 1_000_000L
// Millisecond values at or above this bound are converted to Long.MAX_VALUE by delayToNanos
private const val MAX_MS = Long.MAX_VALUE / MS_TO_NS

/**
 * First-line overflow protection -- limit maximal delay.
 * Delays longer than this one (~146 years) are considered to be delayed "forever":
 * `scheduleResumeAfterDelay` and `scheduleInvokeOnTimeout` only schedule a task when the delay is below this bound.
 */
private const val MAX_DELAY_NS = Long.MAX_VALUE / 2
158
/**
 * Converts a delay in milliseconds to nanoseconds.
 *
 * Non-positive delays become `0`, and delays of [MAX_MS] milliseconds or more become [Long.MAX_VALUE],
 * so that the multiplication never overflows.
 */
internal fun delayToNanos(timeMillis: Long): Long {
    if (timeMillis <= 0) return 0L
    if (timeMillis >= MAX_MS) return Long.MAX_VALUE
    return timeMillis * MS_TO_NS
}
164
/**
 * Converts a delay in nanoseconds to whole milliseconds using integer division by [MS_TO_NS].
 */
internal fun delayNanosToMillis(timeNanos: Long): Long {
    return timeNanos / MS_TO_NS
}
167
// Marker that closeQueue stores in EventLoopImplBase._queue when the queue holds no task at the moment of closing
private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY")

// Short name for the queue type that EventLoopImplBase._queue is upgraded to when a second task is enqueued
private typealias Queue<T> = LockFreeTaskQueueCore<T>
171
/**
 * Platform-specific part of the event loop implementation. Each platform provides the actual
 * [unpark] and [reschedule] operations that [EventLoopImplBase] relies on.
 */
internal expect abstract class EventLoopImplPlatform() : EventLoop {
    // Called to unpark this event loop's thread
    // (EventLoopImplBase calls it after a task was enqueued or a delayed task became first in the delayed queue)
    protected fun unpark()

    // Called to reschedule to DefaultExecutor when this event loop is complete
    // `now` is the nanoTime() reading taken by the caller, `delayedTask` is the task to hand over
    protected fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask)
}
179
/**
 * Event loop implementation that, on top of the unconfined queue inherited from [EventLoop], keeps
 * a queue of dispatched [Runnable] tasks ([_queue]) and a time-ordered heap of [DelayedTask]s ([_delayed]).
 *
 * Tasks arrive through [dispatch]/[enqueue] and [schedule]; [processNextEvent] moves due delayed tasks
 * into the task queue and runs one task per call. Once [shutdown] has marked the loop as completed,
 * [enqueue] forwards tasks to [DefaultExecutor] and [schedule] hands delayed tasks to the platform `reschedule`.
 */
internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
    // null | CLOSED_EMPTY | task | Queue<Runnable>
    private val _queue = atomic<Any?>(null)

    // Allocated only once
    private val _delayed = atomic<DelayedTaskQueue?>(null)

    // Set to true by shutdown(); after that enqueueImpl and scheduleImpl refuse new tasks
    private val _isCompleted = atomic(false)
    private var isCompleted
        get() = _isCompleted.value
        set(value) { _isCompleted.value = value }

    // Empty only when the unconfined queue, the delayed queue and the task queue are all empty
    override val isEmpty: Boolean get() {
        if (!isUnconfinedQueueEmpty) return false
        val delayed = _delayed.value
        if (delayed != null && !delayed.isEmpty) return false
        return when (val queue = _queue.value) {
            null -> true
            is Queue<*> -> queue.isEmpty
            else -> queue === CLOSED_EMPTY // any other value is a single pending task
        }
    }

    // 0 when something can be processed immediately, otherwise nanoseconds until the first delayed task
    // is due (never negative), or Long.MAX_VALUE when there is nothing to wait for
    override val nextTime: Long
        get() {
            if (super.nextTime == 0L) return 0L
            val queue = _queue.value
            when {
                queue === null -> {} // empty queue -- proceed
                queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue
                queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed
                else -> return 0 // non-empty queue
            }
            val nextDelayedTask = _delayed.value?.peek() ?: return Long.MAX_VALUE
            return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0)
        }

    /**
     * Completes this event loop: marks it as completed, closes the task queue, runs everything that is
     * still processable right away and hands the remaining delayed tasks over via `reschedule`.
     */
    override fun shutdown() {
        // Clean up thread-local reference here -- this event loop is shutting down
        ThreadLocalEventLoop.resetEventLoop()
        // We should signal that this event loop should not accept any more tasks
        // and process queued events (that could have been added after last processNextEvent)
        isCompleted = true
        closeQueue()
        // complete processing of all queued tasks
        while (processNextEvent() <= 0) { /* spin */ }
        // reschedule the rest of delayed tasks
        rescheduleAllDelayed()
    }

    /**
     * Schedules resumption of [continuation] after [timeMillis] milliseconds.
     * Nothing is scheduled when the converted delay is not below [MAX_DELAY_NS] (a "forever" delay).
     */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val timeNanos = delayToNanos(timeMillis)
        if (timeNanos < MAX_DELAY_NS) {
            val now = nanoTime()
            DelayedResumeTask(now + timeNanos, continuation).also { task ->
                /*
                 * Order is important here: first we schedule the heap and only then
                 * publish it to continuation. Otherwise, `DelayedResumeTask` would
                 * have to know how to be disposed of even when it wasn't scheduled yet.
                 */
                schedule(now, task)
                continuation.disposeOnCancellation(task)
            }
        }
    }

    /**
     * Schedules [block] to run after [timeMillis] milliseconds and returns a handle that disposes the scheduled task.
     * For a delay that is not below [MAX_DELAY_NS] nothing is scheduled and [NonDisposableHandle] is returned.
     */
    protected fun scheduleInvokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
        val timeNanos = delayToNanos(timeMillis)
        return if (timeNanos < MAX_DELAY_NS) {
            val now = nanoTime()
            DelayedRunnableTask(now + timeNanos, block).also { task ->
                schedule(now, task)
            }
        } else {
            NonDisposableHandle
        }
    }

    /**
     * Processes one event: an unconfined task if there is one, otherwise moves all due delayed tasks
     * into the task queue and runs one task from that queue.
     *
     * @return `0` if a task was run, otherwise the value of [nextTime].
     */
    override fun processNextEvent(): Long {
        // unconfined events take priority
        if (processUnconfinedEvent()) return 0
        // queue all delayed tasks that are due to be executed
        val delayed = _delayed.value
        if (delayed != null && !delayed.isEmpty) {
            val now = nanoTime()
            while (true) {
                // make sure that moving from delayed to queue removes from delayed only after it is added to queue
                // to make sure that 'isEmpty' and `nextTime` that check both of them
                // do not transiently report that both delayed and queue are empty during move
                delayed.removeFirstIf {
                    if (it.timeToExecute(now)) {
                        enqueueImpl(it)
                    } else
                        false
                } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
            }
        }
        // then process one event from queue
        val task = dequeue()
        if (task != null) {
            platformAutoreleasePool { task.run() }
            return 0
        }
        return nextTime
    }

    // The context is ignored: every dispatched block goes through enqueue
    final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)

    /**
     * Adds [task] to this loop's queue and unparks the loop's thread; when the loop no longer
     * accepts tasks, the task is forwarded to [DefaultExecutor] instead.
     */
    open fun enqueue(task: Runnable) {
        if (enqueueImpl(task)) {
            // todo: we should unpark only when this delayed task became first in the queue
            unpark()
        } else {
            DefaultExecutor.enqueue(task)
        }
    }

    /**
     * Tries to add [task] to [_queue], retrying until one of the state transitions succeeds.
     *
     * @return `true` if the task was added, `false` if this loop is completed or its queue is closed.
     */
    @Suppress("UNCHECKED_CAST")
    private fun enqueueImpl(task: Runnable): Boolean {
        _queue.loop { queue ->
            if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
            when (queue) {
                null -> if (_queue.compareAndSet(null, task)) return true // store the single task directly
                is Queue<*> -> {
                    when ((queue as Queue<Runnable>).addLast(task)) {
                        Queue.ADD_SUCCESS -> return true
                        Queue.ADD_CLOSED -> return false
                        Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next()) // switch to the next queue instance and retry
                    }
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return false
                    else -> {
                        // update to full-blown queue to add one more
                        val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
                        newQueue.addLast(queue as Runnable)
                        newQueue.addLast(task)
                        if (_queue.compareAndSet(queue, newQueue)) return true
                    }
                }
            }
        }
    }

    /**
     * Removes and returns the next task from [_queue], or returns `null` when there is none.
     */
    @Suppress("UNCHECKED_CAST")
    private fun dequeue(): Runnable? {
        _queue.loop { queue ->
            when (queue) {
                null -> return null
                is Queue<*> -> {
                    val result = (queue as Queue<Runnable>).removeFirstOrNull()
                    if (result !== Queue.REMOVE_FROZEN) return result as Runnable?
                    _queue.compareAndSet(queue, queue.next()) // switch to the next queue instance and retry
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return null
                    else -> if (_queue.compareAndSet(queue, null)) return queue as Runnable // take the single stored task
                }
            }
        }
    }

    /**
     * Closes [_queue]; expects [isCompleted] to be already set.
     * An absent queue becomes [CLOSED_EMPTY], an existing [Queue] is closed.
     */
    private fun closeQueue() {
        assert { isCompleted }
        _queue.loop { queue ->
            when (queue) {
                null -> if (_queue.compareAndSet(null, CLOSED_EMPTY)) return
                is Queue<*> -> {
                    queue.close()
                    return
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return
                    else -> {
                        // update to full-blown queue to close
                        // NOTE(review): the new queue is installed without calling close(); presumably this is safe
                        // because enqueueImpl checks isCompleted before touching the queue -- confirm
                        val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
                        newQueue.addLast(queue as Runnable)
                        if (_queue.compareAndSet(queue, newQueue)) return
                    }
                }
            }
        }

    }

    /**
     * Schedules [delayedTask] in this loop. [now] is the caller's reading of `nanoTime()`.
     * The loop's thread is unparked when the task became the first one in the delayed queue;
     * when the loop is already completed, the task is passed to `reschedule` instead.
     */
    fun schedule(now: Long, delayedTask: DelayedTask) {
        when (scheduleImpl(now, delayedTask)) {
            SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
            SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
            SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
            else -> error("unexpected result")
        }
    }

    // Unpark is needed only when the given task is now the first one in the delayed queue
    private fun shouldUnpark(task: DelayedTask): Boolean = _delayed.value?.peek() === task

    // Returns one of the SCHEDULE_* codes; creates the delayed queue on first use
    private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
        if (isCompleted) return SCHEDULE_COMPLETED
        val delayedQueue = _delayed.value ?: run {
            // the result of compareAndSet is ignored: whichever queue ended up installed is used
            _delayed.compareAndSet(null, DelayedTaskQueue(now))
            _delayed.value!!
        }
        return delayedTask.scheduleTask(now, delayedQueue, this)
    }

    // It performs "hard" shutdown for test cleanup purposes
    protected fun resetAll() {
        _queue.value = null
        _delayed.value = null
    }

    // This is a "soft" (normal) shutdown
    private fun rescheduleAllDelayed() {
        val now = nanoTime()
        while (true) {
            /*
             * `removeFirstOrNull` below is the only operation on DelayedTask & ThreadSafeHeap that is not
             * synchronized on DelayedTask itself. All other operation are synchronized both on
             * DelayedTask & ThreadSafeHeap instances (in this order). It is still safe, because `dispose`
             * first removes DelayedTask from the heap (under synchronization) then
             * assign "_heap = DISPOSED_TASK", so there cannot be ever a race to _heap reference update.
             */
            val delayedTask = _delayed.value?.removeFirstOrNull() ?: break
            reschedule(now, delayedTask)
        }
    }

    /**
     * A task that becomes due at [nanoTime]. It is a node of [DelayedTaskQueue] and doubles as
     * the [DisposableHandle] that removes it from that queue.
     */
    internal abstract class DelayedTask(
        /**
         * This field can be only modified in [scheduleTask] before putting this DelayedTask
         * into heap to avoid overflow and corruption of heap data structure.
         */
        @JvmField var nanoTime: Long
    ) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode, SynchronizedObject() {
        @Volatile
        private var _heap: Any? = null // null | ThreadSafeHeap | DISPOSED_TASK

        // Typed view of _heap: reads as null when _heap holds null or DISPOSED_TASK
        override var heap: ThreadSafeHeap<*>?
            get() = _heap as? ThreadSafeHeap<*>
            set(value) {
                require(_heap !== DISPOSED_TASK) // this can never happen, it is always checked before adding/removing
                _heap = value
            }

        override var index: Int = -1

        // Orders tasks by the sign of the difference of their times rather than by comparing the times directly
        override fun compareTo(other: DelayedTask): Int {
            val dTime = nanoTime - other.nanoTime
            return when {
                dTime > 0 -> 1
                dTime < 0 -> -1
                else -> 0
            }
        }

        // true when this task's time is not after `now` (difference-based check, same as in compareTo)
        fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L

        /**
         * Adds this task to the [delayed] queue, adjusting [DelayedTaskQueue.timeNow] and, if needed, this task's
         * [nanoTime] so that the queue invariant holds.
         *
         * @return [SCHEDULE_DISPOSED] if this task was already disposed, [SCHEDULE_COMPLETED] if [eventLoop]
         * is completed (the task is not added), [SCHEDULE_OK] otherwise.
         */
        fun scheduleTask(now: Long, delayed: DelayedTaskQueue, eventLoop: EventLoopImplBase): Int = synchronized<Int>(this) {
            if (_heap === DISPOSED_TASK) return SCHEDULE_DISPOSED // don't add -- was already disposed
            delayed.addLastIf(this) { firstTask ->
                if (eventLoop.isCompleted) return SCHEDULE_COMPLETED // non-local return from scheduleTask
                /**
                 * We are about to add new task and we have to make sure that [DelayedTaskQueue]
                 * invariant is maintained. The code in this lambda is additionally executed under
                 * the lock of [DelayedTaskQueue] and working with [DelayedTaskQueue.timeNow] here is thread-safe.
                 */
                if (firstTask == null) {
                    /**
                     * When adding the first delayed task we simply update queue's [DelayedTaskQueue.timeNow] to
                     * the current now time even if that means "going backwards in time". This makes the structure
                     * self-correcting in spite of wild jumps in `nanoTime()` measurements once all delayed tasks
                     * are removed from the delayed queue for execution.
                     */
                    delayed.timeNow = now
                } else {
                    /**
                     * Carefully update [DelayedTaskQueue.timeNow] so that it does not sweep past first's tasks time
                     * and only goes forward in time. We cannot let it go backwards in time or invariant can be
                     * violated for tasks that were already scheduled.
                     */
                    val firstTime = firstTask.nanoTime
                    // compute min(now, firstTime) using a wrap-safe check
                    val minTime = if (firstTime - now >= 0) now else firstTime
                    // update timeNow only when going forward in time
                    if (minTime - delayed.timeNow > 0) delayed.timeNow = minTime
                }
                /**
                 * Here [DelayedTaskQueue.timeNow] was already modified and we have to double-check that newly added
                 * task does not violate [DelayedTaskQueue] invariant because of that. Note also that this scheduleTask
                 * function can be called to reschedule from one queue to another and this might be another reason
                 * where new task's time might now violate invariant.
                 * We correct invariant violation (if any) by simply changing this task's time to now.
                 */
                if (nanoTime - delayed.timeNow < 0) nanoTime = delayed.timeNow
                true
            }
            return SCHEDULE_OK
        }

        // Removes this task from its queue (if it is in one) and marks it so that it is never scheduled again
        final override fun dispose(): Unit = synchronized(this) {
            val heap = _heap
            if (heap === DISPOSED_TASK) return // already disposed
            (heap as? DelayedTaskQueue)?.remove(this) // remove if it is in heap (first)
            _heap = DISPOSED_TASK // never add again to any heap
        }

        override fun toString(): String = "Delayed[nanos=$nanoTime]"
    }

    // Delayed task that resumes the given continuation with Unit via resumeUndispatched when run
    private inner class DelayedResumeTask(
        nanoTime: Long,
        private val cont: CancellableContinuation<Unit>
    ) : DelayedTask(nanoTime) {
        override fun run() { with(cont) { resumeUndispatched(Unit) } }
        override fun toString(): String = super.toString() + cont.toString()
    }

    // Delayed task that runs the given block when run
    private class DelayedRunnableTask(
        nanoTime: Long,
        private val block: Runnable
    ) : DelayedTask(nanoTime) {
        override fun run() { block.run() }
        override fun toString(): String = super.toString() + block.toString()
    }

    /**
     * Delayed task queue maintains stable time-comparison invariant despite potential wraparounds in
     * long nano time measurements by maintaining last observed [timeNow]. It protects the integrity of the
     * heap data structure in spite of potential non-monotonicity of `nanoTime()` source.
     * The invariant is that for every scheduled [DelayedTask]:
     *
     * ```
     * delayedTask.nanoTime - timeNow >= 0
     * ```
     *
     * So the comparison of scheduled tasks via [DelayedTask.compareTo] is always stable as
     * scheduled [DelayedTask.nanoTime] can be at most [Long.MAX_VALUE] apart. This invariant is maintained when
     * new tasks are added by [DelayedTask.scheduleTask] function and it cannot be violated when tasks are removed
     * (so there is nothing special to do there).
     */
    internal class DelayedTaskQueue(
        @JvmField var timeNow: Long
    ) : ThreadSafeHeap<DelayedTask>()
}
524
/**
 * Creates a new platform-specific [EventLoop].
 * [ThreadLocalEventLoop] calls it when the current thread has no event loop yet.
 */
internal expect fun createEventLoop(): EventLoop
526
/**
 * Platform-specific time source, in nanoseconds, used for scheduling delayed tasks.
 * Callers in this file only rely on differences between readings and tolerate wraparound
 * and non-monotonic jumps (see [EventLoopImplBase.DelayedTaskQueue]).
 */
internal expect fun nanoTime(): Long
528
/**
 * Platform-specific fallback executor.
 * [EventLoopImplBase.enqueue] forwards a task here when the event loop itself no longer accepts it.
 */
internal expect object DefaultExecutor {
    // Accepts a task for execution outside of the event loop that rejected it
    fun enqueue(task: Runnable)
}
532
/**
 * Used by Darwin targets to wrap a [Runnable.run] call in an Objective-C Autorelease Pool. It is a no-op on JVM, JS and
 * non-Darwin native targets.
 *
 * Coroutines on Darwin targets can call into the Objective-C world, where a callee may push a to-be-returned object to
 * the Autorelease Pool, so as to avoid a premature ARC release before it reaches the caller. This means the pool must
 * be eventually drained to avoid leaks. Since Kotlin Coroutines does not use [NSRunLoop], which provides automatic
 * pool management, it must manage the pool creation and pool drainage manually.
 *
 * @param block the code to run inside the pool; [EventLoopImplBase.processNextEvent] passes the `run` call of
 * the dequeued task.
 */
internal expect inline fun platformAutoreleasePool(crossinline block: () -> Unit)
543