/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines

import kotlinx.atomicfu.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.jvm.*
import kotlin.native.concurrent.*

/**
 * Extended by [CoroutineDispatcher] implementations that have an event loop inside and can
 * be asked to process the next event from their event queue.
 *
 * It may optionally implement the [Delay] interface and support time-scheduled tasks.
 * It is created or piggybacked onto (see [ThreadLocalEventLoop])
 * by `runBlocking` and by [Dispatchers.Unconfined].
 *
 * @suppress **This is an internal API and should not be used from general code.**
 */
internal abstract class EventLoop : CoroutineDispatcher() {
    /**
     * Counts the number of nested `runBlocking` calls and [Dispatchers.Unconfined] usages that use this event loop.
     */
    private var useCount = 0L

    /**
     * Set to true on any use by `runBlocking`, because it potentially leaks this loop to other threads, so
     * this instance must be properly shut down. We don't need to shut down an event loop that was used solely
     * by [Dispatchers.Unconfined] -- it can be left as [ThreadLocalEventLoop] and reused next time.
     */
    private var shared = false

    /**
     * Queue used by [Dispatchers.Unconfined] tasks.
     * These tasks are thread-local for performance and take precedence over the rest of the queue.
     */
    private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null

    /**
     * Processes the next event in this event loop.
     *
     * The result of this function is to be interpreted as follows:
     * * `<= 0` -- there are potentially more events for immediate processing;
     * * `> 0` -- the number of nanoseconds to wait for the next scheduled event;
     * * [Long.MAX_VALUE] -- no more events.
     *
     * **NOTE**: Must be invoked only from the event loop's thread
     *          (there is no check for performance reasons; one may be added in the future).
     */
    public open fun processNextEvent(): Long {
        if (!processUnconfinedEvent()) return Long.MAX_VALUE
        return 0
    }
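
    // Illustrative sketch (not part of the original source): a driver running on this event
    // loop's thread could interpret the return value of processNextEvent() roughly like this,
    // where `shouldKeepRunning()` and `parkNanos()` are hypothetical placeholders:
    //
    //     while (shouldKeepRunning()) {
    //         val waitNanos = eventLoop.processNextEvent()
    //         if (waitNanos == Long.MAX_VALUE) break      // no more events at all
    //         if (waitNanos > 0) parkNanos(waitNanos)     // wait until the next scheduled event is due
    //         // waitNanos <= 0 -> more events may be ready for immediate processing; loop again
    //     }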

    protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty

    protected open val nextTime: Long
        get() {
            val queue = unconfinedQueue ?: return Long.MAX_VALUE
            return if (queue.isEmpty) Long.MAX_VALUE else 0L
        }

    public fun processUnconfinedEvent(): Boolean {
        val queue = unconfinedQueue ?: return false
        val task = queue.removeFirstOrNull() ?: return false
        task.run()
        return true
    }
    /**
     * Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context
     * parameter should call [processNextEvent] for this event loop (otherwise, it will process the thread-local one).
     * By default, the event loop implementation is thread-local and should not be processed in the context
     * (the current thread's event loop should be processed instead).
     */
    public open fun shouldBeProcessedFromContext(): Boolean = false

    /**
     * Dispatches a task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded]
     * into the current event loop.
     */
    public fun dispatchUnconfined(task: DispatchedTask<*>) {
        val queue = unconfinedQueue ?:
            ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it }
        queue.addLast(task)
    }

    public val isActive: Boolean
        get() = useCount > 0

    public val isUnconfinedLoopActive: Boolean
        get() = useCount >= delta(unconfined = true)

    // May only be used from the event loop's thread
    public val isUnconfinedQueueEmpty: Boolean
        get() = unconfinedQueue?.isEmpty ?: true

    private fun delta(unconfined: Boolean) =
        if (unconfined) (1L shl 32) else 1L
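
    // Note on the packed counter (an observation derived from `delta`, not stated in the
    // original comments): `useCount` keeps two counts in a single Long. Uses by `runBlocking`
    // add 1 to the low 32 bits, while uses by Dispatchers.Unconfined add `1L shl 32` to the
    // high 32 bits. That is why `isUnconfinedLoopActive` compares against
    // `delta(unconfined = true)`: for example, one runBlocking use plus one Unconfined use
    // yields useCount == 0x0000_0001_0000_0001, which is >= (1L shl 32).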

    fun incrementUseCount(unconfined: Boolean = false) {
        useCount += delta(unconfined)
        if (!unconfined) shared = true
    }

    fun decrementUseCount(unconfined: Boolean = false) {
        useCount -= delta(unconfined)
        if (useCount > 0) return
        assert { useCount == 0L } // "Extra decrementUseCount"
        if (shared) {
            // shut it down and remove from ThreadLocalEventLoop
            shutdown()
        }
    }

    final override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        parallelism.checkParallelism()
        return this
    }

    open fun shutdown() {}
}

@ThreadLocal
internal object ThreadLocalEventLoop {
    private val ref = CommonThreadLocal<EventLoop?>()

    internal val eventLoop: EventLoop
        get() = ref.get() ?: createEventLoop().also { ref.set(it) }

    internal fun currentOrNull(): EventLoop? =
        ref.get()

    internal fun resetEventLoop() {
        ref.set(null)
    }

    internal fun setEventLoop(eventLoop: EventLoop) {
        ref.set(eventLoop)
    }
}
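
// Illustrative sketch (not part of the original source): a runBlocking-style builder typically
// reuses or creates the current thread's event loop and marks it used for the duration of the call:
//
//     val eventLoop = ThreadLocalEventLoop.eventLoop   // existing loop or a freshly created one
//     eventLoop.incrementUseCount()
//     try {
//         // drive eventLoop.processNextEvent() here until the blocking job completes
//     } finally {
//         eventLoop.decrementUseCount()                // may trigger shutdown() once unused and shared
//     }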

@SharedImmutable
private val DISPOSED_TASK = Symbol("REMOVED_TASK")

// results for scheduleImpl
private const val SCHEDULE_OK = 0
private const val SCHEDULE_COMPLETED = 1
private const val SCHEDULE_DISPOSED = 2

private const val MS_TO_NS = 1_000_000L
private const val MAX_MS = Long.MAX_VALUE / MS_TO_NS

/**
 * First-line overflow protection -- limits the maximal delay.
 * Delays longer than this one (~146 years) are considered to be delayed "forever".
 */
private const val MAX_DELAY_NS = Long.MAX_VALUE / 2

internal fun delayToNanos(timeMillis: Long): Long = when {
    timeMillis <= 0 -> 0L
    timeMillis >= MAX_MS -> Long.MAX_VALUE
    else -> timeMillis * MS_TO_NS
}
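
// Worked example of the conversion above (derived from the constants, not from the original
// comments): MAX_MS == Long.MAX_VALUE / 1_000_000, so delayToNanos(10) == 10_000_000L,
// delayToNanos(-5) == 0L, and delayToNanos(Long.MAX_VALUE) == Long.MAX_VALUE, which keeps the
// multiplication by MS_TO_NS from overflowing for in-range values.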

internal fun delayNanosToMillis(timeNanos: Long): Long =
    timeNanos / MS_TO_NS

@SharedImmutable
private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY")

private typealias Queue<T> = LockFreeTaskQueueCore<T>

internal expect abstract class EventLoopImplPlatform() : EventLoop {
    // Called to unpark this event loop's thread
    protected fun unpark()

    // Called to reschedule to DefaultExecutor when this event loop is complete
    protected fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask)
}

internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
    // null | CLOSED_EMPTY | task | Queue<Runnable>
    private val _queue = atomic<Any?>(null)

    // Allocated only once
    private val _delayed = atomic<DelayedTaskQueue?>(null)

    private val _isCompleted = atomic(false)
    private var isCompleted
        get() = _isCompleted.value
        set(value) { _isCompleted.value = value }

    override val isEmpty: Boolean get() {
        if (!isUnconfinedQueueEmpty) return false
        val delayed = _delayed.value
        if (delayed != null && !delayed.isEmpty) return false
        return when (val queue = _queue.value) {
            null -> true
            is Queue<*> -> queue.isEmpty
            else -> queue === CLOSED_EMPTY
        }
    }

    protected override val nextTime: Long
        get() {
            if (super.nextTime == 0L) return 0L
            val queue = _queue.value
            when {
                queue === null -> {} // empty queue -- proceed
                queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue
                queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed
                else -> return 0 // non-empty queue
            }
            val nextDelayedTask = _delayed.value?.peek() ?: return Long.MAX_VALUE
            return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0)
        }

    override fun shutdown() {
        // Clean up thread-local reference here -- this event loop is shutting down
        ThreadLocalEventLoop.resetEventLoop()
        // We should signal that this event loop should not accept any more tasks
        // and process queued events (that could have been added after last processNextEvent)
        isCompleted = true
        closeQueue()
        // complete processing of all queued tasks
        while (processNextEvent() <= 0) { /* spin */ }
        // reschedule the rest of delayed tasks
        rescheduleAllDelayed()
    }

    public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val timeNanos = delayToNanos(timeMillis)
        if (timeNanos < MAX_DELAY_NS) {
            val now = nanoTime()
            DelayedResumeTask(now + timeNanos, continuation).also { task ->
                /*
                 * Order is important here: first we schedule the task onto the heap and only then
                 * publish it to the continuation. Otherwise, `DelayedResumeTask` would
                 * have to know how to be disposed of even when it wasn't scheduled yet.
                 */
                schedule(now, task)
                continuation.disposeOnCancellation(task)
            }
        }
    }

    protected fun scheduleInvokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
        val timeNanos = delayToNanos(timeMillis)
        return if (timeNanos < MAX_DELAY_NS) {
            val now = nanoTime()
            DelayedRunnableTask(now + timeNanos, block).also { task ->
                schedule(now, task)
            }
        } else {
            NonDisposableHandle
        }
    }
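
    // Note (not in the original source): when the requested timeout is at least MAX_DELAY_NS the task
    // is treated as delayed "forever" (see MAX_DELAY_NS above), so nothing is ever scheduled and
    // NonDisposableHandle is returned because there is nothing to dispose.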

    override fun processNextEvent(): Long {
        // unconfined events take priority
        if (processUnconfinedEvent()) return 0
        // queue all delayed tasks that are due to be executed
        val delayed = _delayed.value
        if (delayed != null && !delayed.isEmpty) {
            val now = nanoTime()
            while (true) {
                // make sure that moving a task from the delayed queue removes it from `delayed` only after
                // it was added to the task queue, so that `isEmpty` and `nextTime`, which check both of them,
                // do not transiently report that both the delayed queue and the task queue are empty during the move
                delayed.removeFirstIf {
                    if (it.timeToExecute(now)) {
                        enqueueImpl(it)
                    } else
                        false
                } ?: break // quit the loop when there is nothing more to remove, or enqueueImpl returns false because isCompleted is set
            }
        }
        // then process one event from the queue
        val task = dequeue()
        if (task != null) {
            platformAutoreleasePool { task.run() }
            return 0
        }
        return nextTime
    }

    public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)

    open fun enqueue(task: Runnable) {
        if (enqueueImpl(task)) {
            // todo: we should unpark only when this delayed task became first in the queue
            unpark()
        } else {
            DefaultExecutor.enqueue(task)
        }
    }

    @Suppress("UNCHECKED_CAST")
    private fun enqueueImpl(task: Runnable): Boolean {
        _queue.loop { queue ->
            if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
            when (queue) {
                null -> if (_queue.compareAndSet(null, task)) return true
                is Queue<*> -> {
                    when ((queue as Queue<Runnable>).addLast(task)) {
                        Queue.ADD_SUCCESS -> return true
                        Queue.ADD_CLOSED -> return false
                        Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
                    }
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return false
                    else -> {
                        // update to full-blown queue to add one more
                        val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
                        newQueue.addLast(queue as Runnable)
                        newQueue.addLast(task)
                        if (_queue.compareAndSet(queue, newQueue)) return true
                    }
                }
            }
        }
    }
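
    // Summary of the `_queue` state machine exercised above (restating the states listed at the
    // declaration of `_queue`):
    //   null          --addLast--> a single Runnable      (CAS from null to the task itself)
    //   Runnable      --addLast--> Queue with two tasks   (upgrade to a LockFreeTaskQueueCore)
    //   Queue         --addLast--> Queue                  (ADD_FROZEN retries on queue.next())
    //   CLOSED_EMPTY  --addLast--> rejected: returns false and the caller falls back to DefaultExecutor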

    @Suppress("UNCHECKED_CAST")
    private fun dequeue(): Runnable? {
        _queue.loop { queue ->
            when (queue) {
                null -> return null
                is Queue<*> -> {
                    val result = (queue as Queue<Runnable>).removeFirstOrNull()
                    if (result !== Queue.REMOVE_FROZEN) return result as Runnable?
                    _queue.compareAndSet(queue, queue.next())
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return null
                    else -> if (_queue.compareAndSet(queue, null)) return queue as Runnable
                }
            }
        }
    }

    private fun closeQueue() {
        assert { isCompleted }
        _queue.loop { queue ->
            when (queue) {
                null -> if (_queue.compareAndSet(null, CLOSED_EMPTY)) return
                is Queue<*> -> {
                    queue.close()
                    return
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return
                    else -> {
                        // update to full-blown queue to close
                        val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
                        newQueue.addLast(queue as Runnable)
                        if (_queue.compareAndSet(queue, newQueue)) return
                    }
                }
            }
        }
    }

    public fun schedule(now: Long, delayedTask: DelayedTask) {
        when (scheduleImpl(now, delayedTask)) {
            SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
            SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
            SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
            else -> error("unexpected result")
        }
    }

    private fun shouldUnpark(task: DelayedTask): Boolean = _delayed.value?.peek() === task
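
    // Note (not in the original source): `shouldUnpark` above is true only when the newly scheduled
    // task became the head of the delayed queue -- only then can it shorten how long the event loop's
    // thread is allowed to sleep, so only then does `schedule` need to unpark() it.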

    private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
        if (isCompleted) return SCHEDULE_COMPLETED
        val delayedQueue = _delayed.value ?: run {
            // either this CAS installs the queue or another thread already did; the value is non-null afterwards
            _delayed.compareAndSet(null, DelayedTaskQueue(now))
            _delayed.value!!
        }
        return delayedTask.scheduleTask(now, delayedQueue, this)
    }

    // It performs a "hard" shutdown for test cleanup purposes
    protected fun resetAll() {
        _queue.value = null
        _delayed.value = null
    }

    // This is a "soft" (normal) shutdown
    private fun rescheduleAllDelayed() {
        val now = nanoTime()
        while (true) {
            /*
             * `removeFirstOrNull` below is the only operation on DelayedTask & ThreadSafeHeap that is not
             * synchronized on DelayedTask itself. All other operations are synchronized both on
             * DelayedTask & ThreadSafeHeap instances (in this order). It is still safe, because `dispose`
             * first removes the DelayedTask from the heap (under synchronization) and then
             * assigns `_heap = DISPOSED_TASK`, so there can never be a race on the _heap reference update.
             */
            val delayedTask = _delayed.value?.removeFirstOrNull() ?: break
            reschedule(now, delayedTask)
        }
    }

    internal abstract class DelayedTask(
        /**
         * This field can only be modified in [scheduleTask] before this DelayedTask is put
         * into the heap, to avoid overflow and corruption of the heap data structure.
         */
        @JvmField var nanoTime: Long
    ) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode {
        @Volatile
        private var _heap: Any? = null // null | ThreadSafeHeap | DISPOSED_TASK

        override var heap: ThreadSafeHeap<*>?
            get() = _heap as? ThreadSafeHeap<*>
            set(value) {
                require(_heap !== DISPOSED_TASK) // this can never happen, it is always checked before adding/removing
                _heap = value
            }

        override var index: Int = -1

        override fun compareTo(other: DelayedTask): Int {
            val dTime = nanoTime - other.nanoTime
            return when {
                dTime > 0 -> 1
                dTime < 0 -> -1
                else -> 0
            }
        }

        fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L
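
        // Both compareTo and timeToExecute above compare times via subtraction rather than with
        // `<`/`>` directly, so the ordering stays correct even if `nanoTime()` values wrap around
        // Long.MAX_VALUE, as long as the two times are less than 2^63 nanoseconds apart -- which
        // the DelayedTaskQueue invariant below guarantees for scheduled tasks.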

        @Synchronized
        fun scheduleTask(now: Long, delayed: DelayedTaskQueue, eventLoop: EventLoopImplBase): Int {
            if (_heap === DISPOSED_TASK) return SCHEDULE_DISPOSED // don't add -- was already disposed
            delayed.addLastIf(this) { firstTask ->
                if (eventLoop.isCompleted) return SCHEDULE_COMPLETED // non-local return from scheduleTask
                /**
                 * We are about to add a new task and we have to make sure that the [DelayedTaskQueue]
                 * invariant is maintained. The code in this lambda is additionally executed under
                 * the lock of [DelayedTaskQueue], so working with [DelayedTaskQueue.timeNow] here is thread-safe.
                 */
                if (firstTask == null) {
                    /**
                     * When adding the first delayed task we simply update the queue's [DelayedTaskQueue.timeNow] to
                     * the current time, even if that means "going backwards in time". This makes the structure
                     * self-correcting in spite of wild jumps in `nanoTime()` measurements once all delayed tasks
                     * are removed from the delayed queue for execution.
                     */
                    delayed.timeNow = now
                } else {
                    /**
                     * Carefully update [DelayedTaskQueue.timeNow] so that it does not sweep past the first task's time
                     * and only goes forward in time. We cannot let it go backwards in time or the invariant could be
                     * violated for tasks that were already scheduled.
                     */
                    val firstTime = firstTask.nanoTime
                    // compute min(now, firstTime) using a wrap-safe check
                    val minTime = if (firstTime - now >= 0) now else firstTime
                    // update timeNow only when going forward in time
                    if (minTime - delayed.timeNow > 0) delayed.timeNow = minTime
                }
                /**
                 * Here [DelayedTaskQueue.timeNow] was already modified and we have to double-check that the newly added
                 * task does not violate the [DelayedTaskQueue] invariant because of that. Note also that this scheduleTask
                 * function can be called to reschedule a task from one queue to another, which is another case
                 * where the new task's time might violate the invariant.
                 * We correct an invariant violation (if any) by simply changing this task's time to `timeNow`.
                 */
                if (nanoTime - delayed.timeNow < 0) nanoTime = delayed.timeNow
                true
            }
            return SCHEDULE_OK
        }

        @Synchronized
        final override fun dispose() {
            val heap = _heap
            if (heap === DISPOSED_TASK) return // already disposed
            @Suppress("UNCHECKED_CAST")
            (heap as? DelayedTaskQueue)?.remove(this) // remove if it is in heap (first)
            _heap = DISPOSED_TASK // never add again to any heap
        }

        override fun toString(): String = "Delayed[nanos=$nanoTime]"
    }

    private inner class DelayedResumeTask(
        nanoTime: Long,
        private val cont: CancellableContinuation<Unit>
    ) : DelayedTask(nanoTime) {
        override fun run() { with(cont) { resumeUndispatched(Unit) } }
        override fun toString(): String = super.toString() + cont.toString()
    }

    private class DelayedRunnableTask(
        nanoTime: Long,
        private val block: Runnable
    ) : DelayedTask(nanoTime) {
        override fun run() { block.run() }
        override fun toString(): String = super.toString() + block.toString()
    }

    /**
     * The delayed task queue maintains a stable time-comparison invariant despite potential wraparounds in
     * long nano time measurements by maintaining the last observed [timeNow]. It protects the integrity of the
     * heap data structure in spite of potential non-monotonicity of the `nanoTime()` source.
     * The invariant is that for every scheduled [DelayedTask]:
     *
     * ```
     * delayedTask.nanoTime - timeNow >= 0
     * ```
     *
     * So the comparison of scheduled tasks via [DelayedTask.compareTo] is always stable, as
     * scheduled [DelayedTask.nanoTime] values can be at most [Long.MAX_VALUE] apart. This invariant is maintained when
     * new tasks are added by the [DelayedTask.scheduleTask] function and it cannot be violated when tasks are removed
     * (so there is nothing special to do there).
     */
    internal class DelayedTaskQueue(
        @JvmField var timeNow: Long
    ) : ThreadSafeHeap<DelayedTask>()
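
    // Worked example of the invariant above (illustrative numbers, not from the original source):
    // if timeNow == 100 and a backwards jump of nanoTime() makes a newly scheduled task compute
    // nanoTime == 90, then scheduleTask bumps the task's time up to timeNow (100) before insertion,
    // so `delayedTask.nanoTime - timeNow >= 0` keeps holding and compareTo's subtraction-based
    // ordering remains stable.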
}

internal expect fun createEventLoop(): EventLoop

internal expect fun nanoTime(): Long

internal expect object DefaultExecutor {
    public fun enqueue(task: Runnable)
}

/**
 * Used by Darwin targets to wrap a [Runnable.run] call in an Objective-C Autorelease Pool. It is a no-op on JVM, JS and
 * non-Darwin native targets.
 *
 * Coroutines on Darwin targets can call into the Objective-C world, where a callee may push a to-be-returned object to
 * the Autorelease Pool, so as to avoid a premature ARC release before it reaches the caller. This means the pool must
 * be eventually drained to avoid leaks. Since Kotlin Coroutines does not use [NSRunLoop], which provides automatic
 * pool management, it must manage the pool creation and pool drainage manually.
 */
internal expect inline fun platformAutoreleasePool(crossinline block: () -> Unit)
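
// Hedged sketch (not part of this file): per the note above this is a no-op on JVM, JS and
// non-Darwin native targets, so the corresponding `actual` declarations in those source sets
// can reasonably look like:
//
//     internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block()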