package kotlinx.coroutines

import kotlinx.atomicfu.*
import kotlinx.coroutines.internal.*
import kotlin.concurrent.Volatile
import kotlin.coroutines.*
import kotlin.jvm.*

/**
 * Extended by [CoroutineDispatcher] implementations that have an event loop inside and can
 * be asked to process the next event from their event queue.
 *
 * It may optionally implement the [Delay] interface and support time-scheduled tasks.
 * It is created or piggy-backed onto (see [ThreadLocalEventLoop])
 * by `runBlocking` and by [Dispatchers.Unconfined].
 *
 * @suppress **This is an internal API and should not be used from general code.**
 */
internal abstract class EventLoop : CoroutineDispatcher() {
    /**
     * Counts the number of nested `runBlocking` and [Dispatchers.Unconfined] that use this event loop.
     */
    private var useCount = 0L

    /**
     * Set to true on any use by `runBlocking`, because it potentially leaks this loop to other threads, so
     * this instance must be properly shut down. We don't need to shut down an event loop that was used solely
     * by [Dispatchers.Unconfined] -- it can be left as [ThreadLocalEventLoop] and reused next time.
     */
    private var shared = false

    /**
     * Queue used by [Dispatchers.Unconfined] tasks.
     * These tasks are thread-local for performance and take precedence over the rest of the queue.
     */
    private var unconfinedQueue: ArrayDeque<DispatchedTask<*>>? = null

    /**
     * Processes the next event in this event loop.
     *
     * The result of this function is to be interpreted like this:
     * - `<= 0` -- there are potentially more events for immediate processing;
     * - `> 0` -- a number of nanoseconds to wait for the next scheduled event;
     * - [Long.MAX_VALUE] -- no more events.
     *
     * **NOTE**: Must be invoked only from the event loop's thread
     *          (no check for performance reasons, may be added in the future).
     */
    open fun processNextEvent(): Long {
        if (!processUnconfinedEvent()) return Long.MAX_VALUE
        return 0
    }
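
    /*
     * A minimal sketch (not part of this file) of how a blocking driver, such as a `runBlocking`-style
     * loop, can interpret the [processNextEvent] contract above. `parkNanos` is a hypothetical
     * platform-specific parking primitive used only for illustration; a real driver would also break
     * out of the loop once its own blocking job completes.
     *
     * ```
     * val loop = ThreadLocalEventLoop.eventLoop
     * loop.incrementUseCount()
     * try {
     *     while (true) {
     *         val waitNanos = loop.processNextEvent()
     *         if (waitNanos == Long.MAX_VALUE) break       // no more events in this loop
     *         if (waitNanos > 0) parkNanos(waitNanos)      // wait until the next scheduled event is due
     *         // waitNanos <= 0 -- keep processing immediately
     *     }
     * } finally {
     *     loop.decrementUseCount()
     * }
     * ```
     */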

    protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty

    protected open val nextTime: Long
        get() {
            val queue = unconfinedQueue ?: return Long.MAX_VALUE
            return if (queue.isEmpty()) Long.MAX_VALUE else 0L
        }

    fun processUnconfinedEvent(): Boolean {
        val queue = unconfinedQueue ?: return false
        val task = queue.removeFirstOrNull() ?: return false
        task.run()
        return true
    }

    /**
     * Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context
     * parameter should call [processNextEvent] for this event loop (otherwise, it will process the thread-local one).
     * By default, the event loop implementation is thread-local and should not be processed in the context
     * (the current thread's event loop should be processed instead).
     */
    open fun shouldBeProcessedFromContext(): Boolean = false

    /**
     * Dispatches a task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded]
     * into the current event loop.
     */
    fun dispatchUnconfined(task: DispatchedTask<*>) {
        val queue = unconfinedQueue ?:
            ArrayDeque<DispatchedTask<*>>().also { unconfinedQueue = it }
        queue.addLast(task)
    }
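
    /*
     * A sketch (illustrative only, not part of this file) of how the unconfined dispatch path ties
     * [CoroutineDispatcher.isDispatchNeeded], [dispatchUnconfined] and [processUnconfinedEvent] together.
     * When a dispatcher reports `isDispatchNeeded == false` (as `Dispatchers.Unconfined` does), the
     * resumption is not handed to another thread; it is queued on the current thread's event loop and
     * drained in a loop, which bounds stack depth for nested unconfined resumptions:
     *
     * ```
     * val loop = ThreadLocalEventLoop.eventLoop
     * if (loop.isUnconfinedLoopActive) {
     *     loop.dispatchUnconfined(task)           // already draining -- enqueue and return to avoid recursion
     * } else {
     *     loop.incrementUseCount(unconfined = true)
     *     try {
     *         task.run()                          // run the first task eagerly
     *         while (loop.processUnconfinedEvent()) { }  // drain nested unconfined tasks
     *     } finally {
     *         loop.decrementUseCount(unconfined = true)
     *     }
     * }
     * ```
     */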

    val isActive: Boolean
        get() = useCount > 0

    val isUnconfinedLoopActive: Boolean
        get() = useCount >= delta(unconfined = true)

    // May only be used from the event loop's thread
    val isUnconfinedQueueEmpty: Boolean
        get() = unconfinedQueue?.isEmpty() ?: true

    private fun delta(unconfined: Boolean) =
        if (unconfined) (1L shl 32) else 1L
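
    /*
     * Note on the encoding above (a descriptive comment derived from the code rather than stated elsewhere):
     * `useCount` packs two counters into one `Long`. Unconfined uses contribute `1L shl 32` each (the upper
     * 32 bits), while `runBlocking` uses contribute `1` each (the lower 32 bits). For example, after one
     * `runBlocking` use and one unconfined use, `useCount == (1L shl 32) + 1`, so `isActive` is true
     * (`useCount > 0`) and `isUnconfinedLoopActive` is true (`useCount >= 1L shl 32`). After the unconfined
     * use is decremented, `useCount == 1` and `isUnconfinedLoopActive` becomes false.
     */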

    fun incrementUseCount(unconfined: Boolean = false) {
        useCount += delta(unconfined)
        if (!unconfined) shared = true
    }

    fun decrementUseCount(unconfined: Boolean = false) {
        useCount -= delta(unconfined)
        if (useCount > 0) return
        assert { useCount == 0L } // "Extra decrementUseCount"
        if (shared) {
            // shut it down and remove from ThreadLocalEventLoop
            shutdown()
        }
    }

    final override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
        parallelism.checkParallelism()
        return namedOrThis(name) // Single-threaded, short-circuit
    }

    open fun shutdown() {}
}

internal object ThreadLocalEventLoop {
    private val ref = commonThreadLocal<EventLoop?>(Symbol("ThreadLocalEventLoop"))

    internal val eventLoop: EventLoop
        get() = ref.get() ?: createEventLoop().also { ref.set(it) }

    internal fun currentOrNull(): EventLoop? =
        ref.get()

    internal fun resetEventLoop() {
        ref.set(null)
    }

    internal fun setEventLoop(eventLoop: EventLoop) {
        ref.set(eventLoop)
    }
}

private val DISPOSED_TASK = Symbol("REMOVED_TASK")

// results for scheduleImpl
private const val SCHEDULE_OK = 0
private const val SCHEDULE_COMPLETED = 1
private const val SCHEDULE_DISPOSED = 2

private const val MS_TO_NS = 1_000_000L
private const val MAX_MS = Long.MAX_VALUE / MS_TO_NS

/**
 * First-line overflow protection -- limit maximal delay.
 * Delays longer than this one (~146 years) are considered to be delayed "forever".
 */
private const val MAX_DELAY_NS = Long.MAX_VALUE / 2

internal fun delayToNanos(timeMillis: Long): Long = when {
    timeMillis <= 0 -> 0L
    timeMillis >= MAX_MS -> Long.MAX_VALUE
    else -> timeMillis * MS_TO_NS
}

internal fun delayNanosToMillis(timeNanos: Long): Long =
    timeNanos / MS_TO_NS
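
/*
 * A worked example of the overflow protection above (a descriptive note derived from the constants):
 * MAX_MS = Long.MAX_VALUE / 1_000_000 ~= 9.2e12 ms (~292 years), so any millisecond delay at or above it
 * saturates to Long.MAX_VALUE nanoseconds instead of overflowing in `timeMillis * MS_TO_NS`.
 * MAX_DELAY_NS = Long.MAX_VALUE / 2 ~= 4.6e18 ns (~146 years); callers such as `scheduleResumeAfterDelay`
 * below only schedule a task when `timeNanos < MAX_DELAY_NS`, so the saturated Long.MAX_VALUE value also
 * falls into the "delayed forever" bucket and is never scheduled.
 */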

private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY")

private typealias Queue<T> = LockFreeTaskQueueCore<T>

internal expect abstract class EventLoopImplPlatform() : EventLoop {
    // Called to unpark this event loop's thread
    protected fun unpark()

    // Called to reschedule to DefaultExecutor when this event loop is complete
    protected fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask)
}

internal abstract class EventLoopImplBase : EventLoopImplPlatform(), Delay {
    // null | CLOSED_EMPTY | task | Queue<Runnable>
    private val _queue = atomic<Any?>(null)

    // Allocated only once
    private val _delayed = atomic<DelayedTaskQueue?>(null)

    private val _isCompleted = atomic(false)
    private var isCompleted
        get() = _isCompleted.value
        set(value) { _isCompleted.value = value }

    override val isEmpty: Boolean get() {
        if (!isUnconfinedQueueEmpty) return false
        val delayed = _delayed.value
        if (delayed != null && !delayed.isEmpty) return false
        return when (val queue = _queue.value) {
            null -> true
            is Queue<*> -> queue.isEmpty
            else -> queue === CLOSED_EMPTY
        }
    }

    override val nextTime: Long
        get() {
            if (super.nextTime == 0L) return 0L
            val queue = _queue.value
            when {
                queue === null -> {} // empty queue -- proceed
                queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue
                queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed
                else -> return 0 // non-empty queue
            }
            val nextDelayedTask = _delayed.value?.peek() ?: return Long.MAX_VALUE
            return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0)
        }

    override fun shutdown() {
        // Clean up the thread-local reference here -- this event loop is shutting down
        ThreadLocalEventLoop.resetEventLoop()
        // We should signal that this event loop should not accept any more tasks
        // and process queued events (that could have been added after the last processNextEvent)
        isCompleted = true
        closeQueue()
        // complete processing of all queued tasks
        while (processNextEvent() <= 0) { /* spin */ }
        // reschedule the rest of the delayed tasks
        rescheduleAllDelayed()
    }

    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val timeNanos = delayToNanos(timeMillis)
        if (timeNanos < MAX_DELAY_NS) {
            val now = nanoTime()
            DelayedResumeTask(now + timeNanos, continuation).also { task ->
                /*
                 * Order is important here: first we schedule the task into the heap and only then
                 * publish it to the continuation. Otherwise, `DelayedResumeTask` would
                 * have to know how to be disposed of even when it wasn't scheduled yet.
                 */
                schedule(now, task)
                continuation.disposeOnCancellation(task)
            }
        }
    }
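
    /*
     * For orientation, a simplified sketch (not the exact library code) of how a suspending
     * `delay(timeMillis)` call reaches [scheduleResumeAfterDelay] on an event loop that implements [Delay]:
     *
     * ```
     * suspend fun delaySketch(timeMillis: Long) {
     *     if (timeMillis <= 0) return                          // zero or negative delays complete immediately
     *     suspendCancellableCoroutine<Unit> { cont ->
     *         // the Delay implementation from the continuation's context, e.g. this event loop
     *         val delay = cont.context[ContinuationInterceptor] as? Delay
     *         delay?.scheduleResumeAfterDelay(timeMillis, cont)
     *     }
     * }
     * ```
     *
     * The scheduled [DelayedResumeTask] resumes the continuation when its time comes, and
     * `disposeOnCancellation` removes the task from the heap if the coroutine is cancelled first.
     */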

    protected fun scheduleInvokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
        val timeNanos = delayToNanos(timeMillis)
        return if (timeNanos < MAX_DELAY_NS) {
            val now = nanoTime()
            DelayedRunnableTask(now + timeNanos, block).also { task ->
                schedule(now, task)
            }
        } else {
            NonDisposableHandle
        }
    }

    override fun processNextEvent(): Long {
        // unconfined events take priority
        if (processUnconfinedEvent()) return 0
        // queue all delayed tasks that are due to be executed
        enqueueDelayedTasks()
        // then process one event from the queue
        val task = dequeue()
        if (task != null) {
            platformAutoreleasePool { task.run() }
            return 0
        }
        return nextTime
    }

    final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)

    open fun enqueue(task: Runnable) {
        // are there some delayed tasks that should execute before this one? If so, move them to the queue first.
        enqueueDelayedTasks()
        if (enqueueImpl(task)) {
            // todo: we should unpark only when this delayed task became first in the queue
            unpark()
        } else {
            DefaultExecutor.enqueue(task)
        }
    }

    @Suppress("UNCHECKED_CAST")
    private fun enqueueImpl(task: Runnable): Boolean {
        _queue.loop { queue ->
            if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
            when (queue) {
                null -> if (_queue.compareAndSet(null, task)) return true
                is Queue<*> -> {
                    when ((queue as Queue<Runnable>).addLast(task)) {
                        Queue.ADD_SUCCESS -> return true
                        Queue.ADD_CLOSED -> return false
                        Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
                    }
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return false
                    else -> {
                        // update to a full-blown queue to add one more task
                        val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
                        newQueue.addLast(queue as Runnable)
                        newQueue.addLast(task)
                        if (_queue.compareAndSet(queue, newQueue)) return true
                    }
                }
            }
        }
    }
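
    /*
     * A descriptive note on the `_queue` state machine implemented by [enqueueImpl] and [dequeue]
     * (derived from the code above, not stated elsewhere). `_queue` holds one of four states:
     *
     *   null          -- empty; a single task is CAS-ed in directly to avoid allocating a queue
     *   task          -- exactly one pending Runnable; adding a second one upgrades to Queue<Runnable>
     *   Queue<*>      -- a lock-free multi-element queue; a frozen copy is replaced via `queue.next()`
     *   CLOSED_EMPTY  -- terminal state set by [closeQueue]; further enqueueImpl attempts fail and
     *                    [enqueue] falls back to DefaultExecutor
     *
     * All transitions are performed with compareAndSet inside `_queue.loop { ... }`, so a lost race
     * simply retries against the freshly read state.
     */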

    @Suppress("UNCHECKED_CAST")
    private fun dequeue(): Runnable? {
        _queue.loop { queue ->
            when (queue) {
                null -> return null
                is Queue<*> -> {
                    val result = (queue as Queue<Runnable>).removeFirstOrNull()
                    if (result !== Queue.REMOVE_FROZEN) return result as Runnable?
                    _queue.compareAndSet(queue, queue.next())
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return null
                    else -> if (_queue.compareAndSet(queue, null)) return queue as Runnable
                }
            }
        }
    }
    /** Moves all delayed tasks that are due into the main queue. */
    private fun enqueueDelayedTasks() {
        val delayed = _delayed.value
        if (delayed != null && !delayed.isEmpty) {
            val now = nanoTime()
            while (true) {
                // Moving a task from `delayed` to `queue` must remove it from `delayed` only after it was
                // added to `queue`, so that `isEmpty` and `nextTime`, which check both of them,
                // do not transiently report that both `delayed` and `queue` are empty during the move.
                delayed.removeFirstIf {
                    if (it.timeToExecute(now)) {
                        enqueueImpl(it)
                    } else
                        false
                } ?: break // quit the loop when there is nothing more to remove or enqueueImpl returns false on `isCompleted`
            }
        }
    }

    private fun closeQueue() {
        assert { isCompleted }
        _queue.loop { queue ->
            when (queue) {
                null -> if (_queue.compareAndSet(null, CLOSED_EMPTY)) return
                is Queue<*> -> {
                    queue.close()
                    return
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return
                    else -> {
                        // update to full-blown queue to close
                        val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
                        newQueue.addLast(queue as Runnable)
                        if (_queue.compareAndSet(queue, newQueue)) return
                    }
                }
            }
        }
    }

    fun schedule(now: Long, delayedTask: DelayedTask) {
        when (scheduleImpl(now, delayedTask)) {
            SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
            SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
            SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
            else -> error("unexpected result")
        }
    }

    private fun shouldUnpark(task: DelayedTask): Boolean = _delayed.value?.peek() === task

    private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
        if (isCompleted) return SCHEDULE_COMPLETED
        val delayedQueue = _delayed.value ?: run {
            _delayed.compareAndSet(null, DelayedTaskQueue(now))
            _delayed.value!!
        }
        return delayedTask.scheduleTask(now, delayedQueue, this)
    }

    // It performs "hard" shutdown for test cleanup purposes
    protected fun resetAll() {
        _queue.value = null
        _delayed.value = null
    }

    // This is a "soft" (normal) shutdown
    private fun rescheduleAllDelayed() {
        val now = nanoTime()
        while (true) {
            /*
             * `removeFirstOrNull` below is the only operation on DelayedTask & ThreadSafeHeap that is not
             * synchronized on DelayedTask itself. All other operations are synchronized both on
             * DelayedTask & ThreadSafeHeap instances (in this order). It is still safe, because `dispose`
             * first removes DelayedTask from the heap (under synchronization) and only then
             * assigns `_heap = DISPOSED_TASK`, so there can never be a race on the `_heap` reference update.
             */
            val delayedTask = _delayed.value?.removeFirstOrNull() ?: break
            reschedule(now, delayedTask)
        }
    }

    internal abstract class DelayedTask(
        /**
         * This field can only be modified in [scheduleTask] before putting this DelayedTask
         * into the heap to avoid overflow and corruption of the heap data structure.
         */
        @JvmField var nanoTime: Long
    ) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode, SynchronizedObject() {
        @Volatile
        private var _heap: Any? = null // null | ThreadSafeHeap | DISPOSED_TASK

        override var heap: ThreadSafeHeap<*>?
            get() = _heap as? ThreadSafeHeap<*>
            set(value) {
                require(_heap !== DISPOSED_TASK) // this can never happen, it is always checked before adding/removing
                _heap = value
            }

        override var index: Int = -1

        override fun compareTo(other: DelayedTask): Int {
            val dTime = nanoTime - other.nanoTime
            return when {
                dTime > 0 -> 1
                dTime < 0 -> -1
                else -> 0
            }
        }

        fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L
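
        /*
         * A descriptive note on the two comparisons above (derived from the code): both use the signed
         * difference of times rather than comparing the values directly, so they remain correct even if
         * the `nanoTime()` source wraps around the Long range. For example, with
         * `now = Long.MIN_VALUE + 10` and `nanoTime = Long.MAX_VALUE - 10`, the difference `now - nanoTime`
         * overflows to a small positive number (21), so the task is correctly considered due, whereas a
         * plain `now >= nanoTime` would say it is not. This is valid as long as the two timestamps are
         * less than [Long.MAX_VALUE] apart, which the [DelayedTaskQueue] invariant below guarantees.
         */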

        fun scheduleTask(now: Long, delayed: DelayedTaskQueue, eventLoop: EventLoopImplBase): Int = synchronized<Int>(this) {
            if (_heap === DISPOSED_TASK) return SCHEDULE_DISPOSED // don't add -- was already disposed
            delayed.addLastIf(this) { firstTask ->
                if (eventLoop.isCompleted) return SCHEDULE_COMPLETED // non-local return from scheduleTask
                /**
                 * We are about to add a new task and we have to make sure that the [DelayedTaskQueue]
                 * invariant is maintained. The code in this lambda is additionally executed under
                 * the lock of [DelayedTaskQueue] and working with [DelayedTaskQueue.timeNow] here is thread-safe.
                 */
                if (firstTask == null) {
                    /**
                     * When adding the first delayed task we simply update the queue's [DelayedTaskQueue.timeNow] to
                     * the current now time even if that means "going backwards in time". This makes the structure
                     * self-correcting in spite of wild jumps in `nanoTime()` measurements once all delayed tasks
                     * are removed from the delayed queue for execution.
                     */
                    delayed.timeNow = now
                } else {
                    /**
                     * Carefully update [DelayedTaskQueue.timeNow] so that it does not sweep past the first task's time
                     * and only goes forward in time. We cannot let it go backwards in time or the invariant can be
                     * violated for tasks that were already scheduled.
                     */
                    val firstTime = firstTask.nanoTime
                    // compute min(now, firstTime) using a wrap-safe check
                    val minTime = if (firstTime - now >= 0) now else firstTime
                    // update timeNow only when going forward in time
                    if (minTime - delayed.timeNow > 0) delayed.timeNow = minTime
                }
                /**
                 * Here [DelayedTaskQueue.timeNow] was already modified and we have to double-check that the newly
                 * added task does not violate the [DelayedTaskQueue] invariant because of that. Note also that this
                 * scheduleTask function can be called to reschedule a task from one queue to another, and this might
                 * be another reason why the new task's time might violate the invariant.
                 * We correct the invariant violation (if any) by simply changing this task's time to [DelayedTaskQueue.timeNow].
                 */
                if (nanoTime - delayed.timeNow < 0) nanoTime = delayed.timeNow
                true
            }
            return SCHEDULE_OK
        }

        final override fun dispose(): Unit = synchronized(this) {
            val heap = _heap
            if (heap === DISPOSED_TASK) return // already disposed
            (heap as? DelayedTaskQueue)?.remove(this) // remove if it is in heap (first)
            _heap = DISPOSED_TASK // never add again to any heap
        }

        override fun toString(): String = "Delayed[nanos=$nanoTime]"
    }

    private inner class DelayedResumeTask(
        nanoTime: Long,
        private val cont: CancellableContinuation<Unit>
    ) : DelayedTask(nanoTime) {
        override fun run() { with(cont) { resumeUndispatched(Unit) } }
        override fun toString(): String = super.toString() + cont.toString()
    }

    private class DelayedRunnableTask(
        nanoTime: Long,
        private val block: Runnable
    ) : DelayedTask(nanoTime) {
        override fun run() { block.run() }
        override fun toString(): String = super.toString() + block.toString()
    }

    /**
     * Delayed task queue maintains a stable time-comparison invariant despite potential wraparounds in
     * long nano time measurements by maintaining the last observed [timeNow]. It protects the integrity of the
     * heap data structure in spite of potential non-monotonicity of the `nanoTime()` source.
     * The invariant is that for every scheduled [DelayedTask]:
     *
     * ```
     * delayedTask.nanoTime - timeNow >= 0
     * ```
     *
     * So the comparison of scheduled tasks via [DelayedTask.compareTo] is always stable as
     * scheduled [DelayedTask.nanoTime] can be at most [Long.MAX_VALUE] apart. This invariant is maintained when
     * new tasks are added by the [DelayedTask.scheduleTask] function and it cannot be violated when tasks are removed
     * (so there is nothing special to do there).
     */
    internal class DelayedTaskQueue(
        @JvmField var timeNow: Long
    ) : ThreadSafeHeap<DelayedTask>()
}

internal expect fun createEventLoop(): EventLoop

internal expect fun nanoTime(): Long

internal expect object DefaultExecutor {
    fun enqueue(task: Runnable)
}

/**
 * Used by Darwin targets to wrap a [Runnable.run] call in an Objective-C Autorelease Pool. It is a no-op on JVM, JS and
 * non-Darwin native targets.
 *
 * Coroutines on Darwin targets can call into the Objective-C world, where a callee may push a to-be-returned object to
 * the Autorelease Pool, so as to avoid a premature ARC release before it reaches the caller. This means the pool must
 * be eventually drained to avoid leaks. Since Kotlin Coroutines does not use [NSRunLoop], which provides automatic
 * pool management, it must manage the pool creation and pool drainage manually.
 */
internal expect inline fun platformAutoreleasePool(crossinline block: () -> Unit)
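
/*
 * A minimal sketch of what the non-Darwin `actual` for the declaration above can look like (illustrative,
 * assuming the usual expect/actual split across source sets; the Darwin `actual` instead wraps the call
 * in an autorelease pool that is drained after the block returns):
 *
 * ```
 * // JVM / JS / non-Darwin native source set
 * internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block()
 * ```
 */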
547