• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.internal.*
9 import kotlin.coroutines.*
10 import kotlin.jvm.*
11 
12 /**
13  * Extended by [CoroutineDispatcher] implementations that have event loop inside and can
14  * be asked to process next event from their event queue.
15  *
16  * It may optionally implement [Delay] interface and support time-scheduled tasks.
17  * It is created or pigged back onto (see [ThreadLocalEventLoop])
18  * by `runBlocking` and by [Dispatchers.Unconfined].
19  *
20  * @suppress **This an internal API and should not be used from general code.**
21  */
22 internal abstract class EventLoop : CoroutineDispatcher() {
23     /**
24      * Counts the number of nested `runBlocking` and [Dispatchers.Unconfined] that use this event loop.
25      */
26     private var useCount = 0L
27 
28     /**
29      * Set to true on any use by `runBlocking`, because it potentially leaks this loop to other threads, so
30      * this instance must be properly shutdown. We don't need to shutdown event loop that was used solely
31      * by [Dispatchers.Unconfined] -- it can be left as [ThreadLocalEventLoop] and reused next time.
32      */
33     private var shared = false
34 
35     /**
36      * Queue used by [Dispatchers.Unconfined] tasks.
37      * These tasks are thread-local for performance and take precedence over the rest of the queue.
38      */
39     private var unconfinedQueue: ArrayDeque<DispatchedTask<*>>? = null
40 
41     /**
42      * Processes next event in this event loop.
43      *
44      * The result of this function is to be interpreted like this:
45      * * `<= 0` -- there are potentially more events for immediate processing;
46      * * `> 0` -- a number of nanoseconds to wait for next scheduled event;
47      * * [Long.MAX_VALUE] -- no more events.
48      *
49      * **NOTE**: Must be invoked only from the event loop's thread
50      *          (no check for performance reasons, may be added in the future).
51      */
52     open fun processNextEvent(): Long {
53         if (!processUnconfinedEvent()) return Long.MAX_VALUE
54         return 0
55     }
56 
57     protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty
58 
59     protected open val nextTime: Long
60         get() {
61             val queue = unconfinedQueue ?: return Long.MAX_VALUE
62             return if (queue.isEmpty()) Long.MAX_VALUE else 0L
63         }
64 
65     fun processUnconfinedEvent(): Boolean {
66         val queue = unconfinedQueue ?: return false
67         val task = queue.removeFirstOrNull() ?: return false
68         task.run()
69         return true
70     }
71     /**
72      * Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context
73      * parameter should call [processNextEvent] for this event loop (otherwise, it will process thread-local one).
74      * By default, event loop implementation is thread-local and should not processed in the context
75      * (current thread's event loop should be processed instead).
76      */
77     open fun shouldBeProcessedFromContext(): Boolean = false
78 
79     /**
80      * Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded]
81      * into the current event loop.
82      */
83     fun dispatchUnconfined(task: DispatchedTask<*>) {
84         val queue = unconfinedQueue ?:
85             ArrayDeque<DispatchedTask<*>>().also { unconfinedQueue = it }
86         queue.addLast(task)
87     }
88 
89     val isActive: Boolean
90         get() = useCount > 0
91 
92     val isUnconfinedLoopActive: Boolean
93         get() = useCount >= delta(unconfined = true)
94 
95     // May only be used from the event loop's thread
96     val isUnconfinedQueueEmpty: Boolean
97         get() = unconfinedQueue?.isEmpty() ?: true
98 
99     private fun delta(unconfined: Boolean) =
100         if (unconfined) (1L shl 32) else 1L
101 
102     fun incrementUseCount(unconfined: Boolean = false) {
103         useCount += delta(unconfined)
104         if (!unconfined) shared = true
105     }
106 
107     fun decrementUseCount(unconfined: Boolean = false) {
108         useCount -= delta(unconfined)
109         if (useCount > 0) return
110         assert { useCount == 0L } // "Extra decrementUseCount"
111         if (shared) {
112             // shut it down and remove from ThreadLocalEventLoop
113             shutdown()
114         }
115     }
116 
117     final override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
118         parallelism.checkParallelism()
119         return this
120     }
121 
122     open fun shutdown() {}
123 }
124 
125 internal object ThreadLocalEventLoop {
126     private val ref = commonThreadLocal<EventLoop?>(Symbol("ThreadLocalEventLoop"))
127 
128     internal val eventLoop: EventLoop
<lambda>null129         get() = ref.get() ?: createEventLoop().also { ref.set(it) }
130 
currentOrNullnull131     internal fun currentOrNull(): EventLoop? =
132         ref.get()
133 
134     internal fun resetEventLoop() {
135         ref.set(null)
136     }
137 
setEventLoopnull138     internal fun setEventLoop(eventLoop: EventLoop) {
139         ref.set(eventLoop)
140     }
141 }
142 
143 private val DISPOSED_TASK = Symbol("REMOVED_TASK")
144 
145 // results for scheduleImpl
146 private const val SCHEDULE_OK = 0
147 private const val SCHEDULE_COMPLETED = 1
148 private const val SCHEDULE_DISPOSED = 2
149 
150 private const val MS_TO_NS = 1_000_000L
151 private const val MAX_MS = Long.MAX_VALUE / MS_TO_NS
152 
153 /**
154  * First-line overflow protection -- limit maximal delay.
155  * Delays longer than this one (~146 years) are considered to be delayed "forever".
156  */
157 private const val MAX_DELAY_NS = Long.MAX_VALUE / 2
158 
delayToNanosnull159 internal fun delayToNanos(timeMillis: Long): Long = when {
160     timeMillis <= 0 -> 0L
161     timeMillis >= MAX_MS -> Long.MAX_VALUE
162     else -> timeMillis * MS_TO_NS
163 }
164 
delayNanosToMillisnull165 internal fun delayNanosToMillis(timeNanos: Long): Long =
166     timeNanos / MS_TO_NS
167 
168 private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY")
169 
170 private typealias Queue<T> = LockFreeTaskQueueCore<T>
171 
172 internal expect abstract class EventLoopImplPlatform() : EventLoop {
173     // Called to unpark this event loop's thread
174     protected fun unpark()
175 
176     // Called to reschedule to DefaultExecutor when this event loop is complete
177     protected fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask)
178 }
179 
180 internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
181     // null | CLOSED_EMPTY | task | Queue<Runnable>
182     private val _queue = atomic<Any?>(null)
183 
184     // Allocated only only once
185     private val _delayed = atomic<DelayedTaskQueue?>(null)
186 
187     private val _isCompleted = atomic(false)
188     private var isCompleted
189         get() = _isCompleted.value
190         set(value) { _isCompleted.value = value }
191 
192     override val isEmpty: Boolean get() {
193         if (!isUnconfinedQueueEmpty) return false
194         val delayed = _delayed.value
195         if (delayed != null && !delayed.isEmpty) return false
196         return when (val queue = _queue.value) {
197             null -> true
198             is Queue<*> -> queue.isEmpty
199             else -> queue === CLOSED_EMPTY
200         }
201     }
202 
203     override val nextTime: Long
204         get() {
205             if (super.nextTime == 0L) return 0L
206             val queue = _queue.value
207             when {
208                 queue === null -> {} // empty queue -- proceed
209                 queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue
210                 queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed
211                 else -> return 0 // non-empty queue
212             }
213             val nextDelayedTask = _delayed.value?.peek() ?: return Long.MAX_VALUE
214             return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0)
215         }
216 
shutdownnull217     override fun shutdown() {
218         // Clean up thread-local reference here -- this event loop is shutting down
219         ThreadLocalEventLoop.resetEventLoop()
220         // We should signal that this event loop should not accept any more tasks
221         // and process queued events (that could have been added after last processNextEvent)
222         isCompleted = true
223         closeQueue()
224         // complete processing of all queued tasks
225         while (processNextEvent() <= 0) { /* spin */ }
226         // reschedule the rest of delayed tasks
227         rescheduleAllDelayed()
228     }
229 
scheduleResumeAfterDelaynull230     override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
231         val timeNanos = delayToNanos(timeMillis)
232         if (timeNanos < MAX_DELAY_NS) {
233             val now = nanoTime()
234             DelayedResumeTask(now + timeNanos, continuation).also { task ->
235                 /*
236                  * Order is important here: first we schedule the heap and only then
237                  * publish it to continuation. Otherwise, `DelayedResumeTask` would
238                  * have to know how to be disposed of even when it wasn't scheduled yet.
239                  */
240                 schedule(now, task)
241                 continuation.disposeOnCancellation(task)
242             }
243         }
244     }
245 
scheduleInvokeOnTimeoutnull246     protected fun scheduleInvokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
247         val timeNanos = delayToNanos(timeMillis)
248         return if (timeNanos < MAX_DELAY_NS) {
249             val now = nanoTime()
250             DelayedRunnableTask(now + timeNanos, block).also { task ->
251                 schedule(now, task)
252             }
253         } else {
254             NonDisposableHandle
255         }
256     }
257 
processNextEventnull258     override fun processNextEvent(): Long {
259         // unconfined events take priority
260         if (processUnconfinedEvent()) return 0
261         // queue all delayed tasks that are due to be executed
262         val delayed = _delayed.value
263         if (delayed != null && !delayed.isEmpty) {
264             val now = nanoTime()
265             while (true) {
266                 // make sure that moving from delayed to queue removes from delayed only after it is added to queue
267                 // to make sure that 'isEmpty' and `nextTime` that check both of them
268                 // do not transiently report that both delayed and queue are empty during move
269                 delayed.removeFirstIf {
270                     if (it.timeToExecute(now)) {
271                         enqueueImpl(it)
272                     } else
273                         false
274                 } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
275             }
276         }
277         // then process one event from queue
278         val task = dequeue()
279         if (task != null) {
280             platformAutoreleasePool { task.run() }
281             return 0
282         }
283         return nextTime
284     }
285 
dispatchnull286     final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)
287 
288     open fun enqueue(task: Runnable) {
289         if (enqueueImpl(task)) {
290             // todo: we should unpark only when this delayed task became first in the queue
291             unpark()
292         } else {
293             DefaultExecutor.enqueue(task)
294         }
295     }
296 
297     @Suppress("UNCHECKED_CAST")
enqueueImplnull298     private fun enqueueImpl(task: Runnable): Boolean {
299         _queue.loop { queue ->
300             if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
301             when (queue) {
302                 null -> if (_queue.compareAndSet(null, task)) return true
303                 is Queue<*> -> {
304                     when ((queue as Queue<Runnable>).addLast(task)) {
305                         Queue.ADD_SUCCESS -> return true
306                         Queue.ADD_CLOSED -> return false
307                         Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
308                     }
309                 }
310                 else -> when {
311                     queue === CLOSED_EMPTY -> return false
312                     else -> {
313                         // update to full-blown queue to add one more
314                         val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
315                         newQueue.addLast(queue as Runnable)
316                         newQueue.addLast(task)
317                         if (_queue.compareAndSet(queue, newQueue)) return true
318                     }
319                 }
320             }
321         }
322     }
323 
324     @Suppress("UNCHECKED_CAST")
dequeuenull325     private fun dequeue(): Runnable? {
326         _queue.loop { queue ->
327             when (queue) {
328                 null -> return null
329                 is Queue<*> -> {
330                     val result = (queue as Queue<Runnable>).removeFirstOrNull()
331                     if (result !== Queue.REMOVE_FROZEN) return result as Runnable?
332                     _queue.compareAndSet(queue, queue.next())
333                 }
334                 else -> when {
335                     queue === CLOSED_EMPTY -> return null
336                     else -> if (_queue.compareAndSet(queue, null)) return queue as Runnable
337                 }
338             }
339         }
340     }
341 
closeQueuenull342     private fun closeQueue() {
343         assert { isCompleted }
344         _queue.loop { queue ->
345             when (queue) {
346                 null -> if (_queue.compareAndSet(null, CLOSED_EMPTY)) return
347                 is Queue<*> -> {
348                     queue.close()
349                     return
350                 }
351                 else -> when {
352                     queue === CLOSED_EMPTY -> return
353                     else -> {
354                         // update to full-blown queue to close
355                         val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
356                         newQueue.addLast(queue as Runnable)
357                         if (_queue.compareAndSet(queue, newQueue)) return
358                     }
359                 }
360             }
361         }
362 
363     }
364 
schedulenull365     fun schedule(now: Long, delayedTask: DelayedTask) {
366         when (scheduleImpl(now, delayedTask)) {
367             SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
368             SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
369             SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
370             else -> error("unexpected result")
371         }
372     }
373 
shouldUnparknull374     private fun shouldUnpark(task: DelayedTask): Boolean = _delayed.value?.peek() === task
375 
376     private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
377         if (isCompleted) return SCHEDULE_COMPLETED
378         val delayedQueue = _delayed.value ?: run {
379             _delayed.compareAndSet(null, DelayedTaskQueue(now))
380             _delayed.value!!
381         }
382         return delayedTask.scheduleTask(now, delayedQueue, this)
383     }
384 
385     // It performs "hard" shutdown for test cleanup purposes
resetAllnull386     protected fun resetAll() {
387         _queue.value = null
388         _delayed.value = null
389     }
390 
391     // This is a "soft" (normal) shutdown
rescheduleAllDelayednull392     private fun rescheduleAllDelayed() {
393         val now = nanoTime()
394         while (true) {
395             /*
396              * `removeFirstOrNull` below is the only operation on DelayedTask & ThreadSafeHeap that is not
397              * synchronized on DelayedTask itself. All other operation are synchronized both on
398              * DelayedTask & ThreadSafeHeap instances (in this order). It is still safe, because `dispose`
399              * first removes DelayedTask from the heap (under synchronization) then
400              * assign "_heap = DISPOSED_TASK", so there cannot be ever a race to _heap reference update.
401              */
402             val delayedTask = _delayed.value?.removeFirstOrNull() ?: break
403             reschedule(now, delayedTask)
404         }
405     }
406 
407     internal abstract class DelayedTask(
408         /**
409          * This field can be only modified in [scheduleTask] before putting this DelayedTask
410          * into heap to avoid overflow and corruption of heap data structure.
411          */
412         @JvmField var nanoTime: Long
413     ) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode, SynchronizedObject() {
414         @Volatile
415         private var _heap: Any? = null // null | ThreadSafeHeap | DISPOSED_TASK
416 
417         override var heap: ThreadSafeHeap<*>?
418             get() = _heap as? ThreadSafeHeap<*>
419             set(value) {
420                 require(_heap !== DISPOSED_TASK) // this can never happen, it is always checked before adding/removing
421                 _heap = value
422             }
423 
424         override var index: Int = -1
425 
compareTonull426         override fun compareTo(other: DelayedTask): Int {
427             val dTime = nanoTime - other.nanoTime
428             return when {
429                 dTime > 0 -> 1
430                 dTime < 0 -> -1
431                 else -> 0
432             }
433         }
434 
timeToExecutenull435         fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L
436 
437         fun scheduleTask(now: Long, delayed: DelayedTaskQueue, eventLoop: EventLoopImplBase): Int = synchronized<Int>(this) {
438             if (_heap === DISPOSED_TASK) return SCHEDULE_DISPOSED // don't add -- was already disposed
439             delayed.addLastIf(this) { firstTask ->
440                 if (eventLoop.isCompleted) return SCHEDULE_COMPLETED // non-local return from scheduleTask
441                 /**
442                  * We are about to add new task and we have to make sure that [DelayedTaskQueue]
443                  * invariant is maintained. The code in this lambda is additionally executed under
444                  * the lock of [DelayedTaskQueue] and working with [DelayedTaskQueue.timeNow] here is thread-safe.
445                  */
446                 if (firstTask == null) {
447                     /**
448                      * When adding the first delayed task we simply update queue's [DelayedTaskQueue.timeNow] to
449                      * the current now time even if that means "going backwards in time". This makes the structure
450                      * self-correcting in spite of wild jumps in `nanoTime()` measurements once all delayed tasks
451                      * are removed from the delayed queue for execution.
452                      */
453                     delayed.timeNow = now
454                 } else {
455                     /**
456                      * Carefully update [DelayedTaskQueue.timeNow] so that it does not sweep past first's tasks time
457                      * and only goes forward in time. We cannot let it go backwards in time or invariant can be
458                      * violated for tasks that were already scheduled.
459                      */
460                     val firstTime = firstTask.nanoTime
461                     // compute min(now, firstTime) using a wrap-safe check
462                     val minTime = if (firstTime - now >= 0) now else firstTime
463                     // update timeNow only when going forward in time
464                     if (minTime - delayed.timeNow > 0) delayed.timeNow = minTime
465                 }
466                 /**
467                  * Here [DelayedTaskQueue.timeNow] was already modified and we have to double-check that newly added
468                  * task does not violate [DelayedTaskQueue] invariant because of that. Note also that this scheduleTask
469                  * function can be called to reschedule from one queue to another and this might be another reason
470                  * where new task's time might now violate invariant.
471                  * We correct invariant violation (if any) by simply changing this task's time to now.
472                  */
473                 if (nanoTime - delayed.timeNow < 0) nanoTime = delayed.timeNow
474                 true
475             }
476             return SCHEDULE_OK
477         }
478 
<lambda>null479         final override fun dispose(): Unit = synchronized(this) {
480             val heap = _heap
481             if (heap === DISPOSED_TASK) return // already disposed
482             (heap as? DelayedTaskQueue)?.remove(this) // remove if it is in heap (first)
483             _heap = DISPOSED_TASK // never add again to any heap
484         }
485 
toStringnull486         override fun toString(): String = "Delayed[nanos=$nanoTime]"
487     }
488 
489     private inner class DelayedResumeTask(
490         nanoTime: Long,
491         private val cont: CancellableContinuation<Unit>
492     ) : DelayedTask(nanoTime) {
493         override fun run() { with(cont) { resumeUndispatched(Unit) } }
494         override fun toString(): String = super.toString() + cont.toString()
495     }
496 
497     private class DelayedRunnableTask(
498         nanoTime: Long,
499         private val block: Runnable
500     ) : DelayedTask(nanoTime) {
runnull501         override fun run() { block.run() }
toStringnull502         override fun toString(): String = super.toString() + block.toString()
503     }
504 
505     /**
506      * Delayed task queue maintains stable time-comparision invariant despite potential wraparounds in
507      * long nano time measurements by maintaining last observed [timeNow]. It protects the integrity of the
508      * heap data structure in spite of potential non-monotonicity of `nanoTime()` source.
509      * The invariant is that for every scheduled [DelayedTask]:
510      *
511      * ```
512      * delayedTask.nanoTime - timeNow >= 0
513      * ```
514      *
515      * So the comparison of scheduled tasks via [DelayedTask.compareTo] is always stable as
516      * scheduled [DelayedTask.nanoTime] can be at most [Long.MAX_VALUE] apart. This invariant is maintained when
517      * new tasks are added by [DelayedTask.scheduleTask] function and it cannot be violated when tasks are removed
518      * (so there is nothing special to do there).
519      */
520     internal class DelayedTaskQueue(
521         @JvmField var timeNow: Long
522     ) : ThreadSafeHeap<DelayedTask>()
523 }
524 
525 internal expect fun createEventLoop(): EventLoop
526 
527 internal expect fun nanoTime(): Long
528 
529 internal expect object DefaultExecutor {
530     fun enqueue(task: Runnable)
531 }
532 
533 /**
534  * Used by Darwin targets to wrap a [Runnable.run] call in an Objective-C Autorelease Pool. It is a no-op on JVM, JS and
535  * non-Darwin native targets.
536  *
537  * Coroutines on Darwin targets can call into the Objective-C world, where a callee may push a to-be-returned object to
538  * the Autorelease Pool, so as to avoid a premature ARC release before it reaches the caller. This means the pool must
539  * be eventually drained to avoid leaks. Since Kotlin Coroutines does not use [NSRunLoop], which provides automatic
540  * pool management, it must manage the pool creation and pool drainage manually.
541  */
platformAutoreleasePoolnull542 internal expect inline fun platformAutoreleasePool(crossinline block: () -> Unit)
543