<lambda>null1 package kotlinx.coroutines
2
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.internal.*
5 import kotlin.concurrent.Volatile
6 import kotlin.coroutines.*
7 import kotlin.jvm.*
8
9 /**
10 * Extended by [CoroutineDispatcher] implementations that have event loop inside and can
11 * be asked to process next event from their event queue.
12 *
13 * It may optionally implement [Delay] interface and support time-scheduled tasks.
14 * It is created or pigged back onto (see [ThreadLocalEventLoop])
15 * by `runBlocking` and by [Dispatchers.Unconfined].
16 *
17 * @suppress **This an internal API and should not be used from general code.**
18 */
19 internal abstract class EventLoop : CoroutineDispatcher() {
20 /**
21 * Counts the number of nested `runBlocking` and [Dispatchers.Unconfined] that use this event loop.
22 */
23 private var useCount = 0L
24
25 /**
26 * Set to true on any use by `runBlocking`, because it potentially leaks this loop to other threads, so
27 * this instance must be properly shutdown. We don't need to shutdown event loop that was used solely
28 * by [Dispatchers.Unconfined] -- it can be left as [ThreadLocalEventLoop] and reused next time.
29 */
30 private var shared = false
31
32 /**
33 * Queue used by [Dispatchers.Unconfined] tasks.
34 * These tasks are thread-local for performance and take precedence over the rest of the queue.
35 */
36 private var unconfinedQueue: ArrayDeque<DispatchedTask<*>>? = null
37
38 /**
39 * Processes next event in this event loop.
40 *
41 * The result of this function is to be interpreted like this:
42 * - `<= 0` -- there are potentially more events for immediate processing;
43 * - `> 0` -- a number of nanoseconds to wait for next scheduled event;
44 * - [Long.MAX_VALUE] -- no more events.
45 *
46 * **NOTE**: Must be invoked only from the event loop's thread
47 * (no check for performance reasons, may be added in the future).
48 */
49 open fun processNextEvent(): Long {
50 if (!processUnconfinedEvent()) return Long.MAX_VALUE
51 return 0
52 }
53
54 protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty
55
56 protected open val nextTime: Long
57 get() {
58 val queue = unconfinedQueue ?: return Long.MAX_VALUE
59 return if (queue.isEmpty()) Long.MAX_VALUE else 0L
60 }
61
62 fun processUnconfinedEvent(): Boolean {
63 val queue = unconfinedQueue ?: return false
64 val task = queue.removeFirstOrNull() ?: return false
65 task.run()
66 return true
67 }
68 /**
69 * Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context
70 * parameter should call [processNextEvent] for this event loop (otherwise, it will process thread-local one).
71 * By default, event loop implementation is thread-local and should not processed in the context
72 * (current thread's event loop should be processed instead).
73 */
74 open fun shouldBeProcessedFromContext(): Boolean = false
75
76 /**
77 * Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded]
78 * into the current event loop.
79 */
80 fun dispatchUnconfined(task: DispatchedTask<*>) {
81 val queue = unconfinedQueue ?:
82 ArrayDeque<DispatchedTask<*>>().also { unconfinedQueue = it }
83 queue.addLast(task)
84 }
85
86 val isActive: Boolean
87 get() = useCount > 0
88
89 val isUnconfinedLoopActive: Boolean
90 get() = useCount >= delta(unconfined = true)
91
92 // May only be used from the event loop's thread
93 val isUnconfinedQueueEmpty: Boolean
94 get() = unconfinedQueue?.isEmpty() ?: true
95
96 private fun delta(unconfined: Boolean) =
97 if (unconfined) (1L shl 32) else 1L
98
99 fun incrementUseCount(unconfined: Boolean = false) {
100 useCount += delta(unconfined)
101 if (!unconfined) shared = true
102 }
103
104 fun decrementUseCount(unconfined: Boolean = false) {
105 useCount -= delta(unconfined)
106 if (useCount > 0) return
107 assert { useCount == 0L } // "Extra decrementUseCount"
108 if (shared) {
109 // shut it down and remove from ThreadLocalEventLoop
110 shutdown()
111 }
112 }
113
114 final override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
115 parallelism.checkParallelism()
116 return namedOrThis(name) // Single-threaded, short-circuit
117 }
118
119 open fun shutdown() {}
120 }
121
122 internal object ThreadLocalEventLoop {
123 private val ref = commonThreadLocal<EventLoop?>(Symbol("ThreadLocalEventLoop"))
124
125 internal val eventLoop: EventLoop
<lambda>null126 get() = ref.get() ?: createEventLoop().also { ref.set(it) }
127
currentOrNullnull128 internal fun currentOrNull(): EventLoop? =
129 ref.get()
130
131 internal fun resetEventLoop() {
132 ref.set(null)
133 }
134
setEventLoopnull135 internal fun setEventLoop(eventLoop: EventLoop) {
136 ref.set(eventLoop)
137 }
138 }
139
140 private val DISPOSED_TASK = Symbol("REMOVED_TASK")
141
142 // results for scheduleImpl
143 private const val SCHEDULE_OK = 0
144 private const val SCHEDULE_COMPLETED = 1
145 private const val SCHEDULE_DISPOSED = 2
146
147 private const val MS_TO_NS = 1_000_000L
148 private const val MAX_MS = Long.MAX_VALUE / MS_TO_NS
149
150 /**
151 * First-line overflow protection -- limit maximal delay.
152 * Delays longer than this one (~146 years) are considered to be delayed "forever".
153 */
154 private const val MAX_DELAY_NS = Long.MAX_VALUE / 2
155
delayToNanosnull156 internal fun delayToNanos(timeMillis: Long): Long = when {
157 timeMillis <= 0 -> 0L
158 timeMillis >= MAX_MS -> Long.MAX_VALUE
159 else -> timeMillis * MS_TO_NS
160 }
161
delayNanosToMillisnull162 internal fun delayNanosToMillis(timeNanos: Long): Long =
163 timeNanos / MS_TO_NS
164
165 private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY")
166
167 private typealias Queue<T> = LockFreeTaskQueueCore<T>
168
169 internal expect abstract class EventLoopImplPlatform() : EventLoop {
170 // Called to unpark this event loop's thread
171 protected fun unpark()
172
173 // Called to reschedule to DefaultExecutor when this event loop is complete
174 protected fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask)
175 }
176
177 internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
178 // null | CLOSED_EMPTY | task | Queue<Runnable>
179 private val _queue = atomic<Any?>(null)
180
181 // Allocated only only once
182 private val _delayed = atomic<DelayedTaskQueue?>(null)
183
184 private val _isCompleted = atomic(false)
185 private var isCompleted
186 get() = _isCompleted.value
187 set(value) { _isCompleted.value = value }
188
189 override val isEmpty: Boolean get() {
190 if (!isUnconfinedQueueEmpty) return false
191 val delayed = _delayed.value
192 if (delayed != null && !delayed.isEmpty) return false
193 return when (val queue = _queue.value) {
194 null -> true
195 is Queue<*> -> queue.isEmpty
196 else -> queue === CLOSED_EMPTY
197 }
198 }
199
200 override val nextTime: Long
201 get() {
202 if (super.nextTime == 0L) return 0L
203 val queue = _queue.value
204 when {
205 queue === null -> {} // empty queue -- proceed
206 queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue
207 queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed
208 else -> return 0 // non-empty queue
209 }
210 val nextDelayedTask = _delayed.value?.peek() ?: return Long.MAX_VALUE
211 return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0)
212 }
213
shutdownnull214 override fun shutdown() {
215 // Clean up thread-local reference here -- this event loop is shutting down
216 ThreadLocalEventLoop.resetEventLoop()
217 // We should signal that this event loop should not accept any more tasks
218 // and process queued events (that could have been added after last processNextEvent)
219 isCompleted = true
220 closeQueue()
221 // complete processing of all queued tasks
222 while (processNextEvent() <= 0) { /* spin */ }
223 // reschedule the rest of delayed tasks
224 rescheduleAllDelayed()
225 }
226
scheduleResumeAfterDelaynull227 override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
228 val timeNanos = delayToNanos(timeMillis)
229 if (timeNanos < MAX_DELAY_NS) {
230 val now = nanoTime()
231 DelayedResumeTask(now + timeNanos, continuation).also { task ->
232 /*
233 * Order is important here: first we schedule the heap and only then
234 * publish it to continuation. Otherwise, `DelayedResumeTask` would
235 * have to know how to be disposed of even when it wasn't scheduled yet.
236 */
237 schedule(now, task)
238 continuation.disposeOnCancellation(task)
239 }
240 }
241 }
242
scheduleInvokeOnTimeoutnull243 protected fun scheduleInvokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
244 val timeNanos = delayToNanos(timeMillis)
245 return if (timeNanos < MAX_DELAY_NS) {
246 val now = nanoTime()
247 DelayedRunnableTask(now + timeNanos, block).also { task ->
248 schedule(now, task)
249 }
250 } else {
251 NonDisposableHandle
252 }
253 }
254
processNextEventnull255 override fun processNextEvent(): Long {
256 // unconfined events take priority
257 if (processUnconfinedEvent()) return 0
258 // queue all delayed tasks that are due to be executed
259 enqueueDelayedTasks()
260 // then process one event from queue
261 val task = dequeue()
262 if (task != null) {
263 platformAutoreleasePool { task.run() }
264 return 0
265 }
266 return nextTime
267 }
268
dispatchnull269 final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)
270
271 open fun enqueue(task: Runnable) {
272 // are there some delayed tasks that should execute before this one? If so, move them to the queue first.
273 enqueueDelayedTasks()
274 if (enqueueImpl(task)) {
275 // todo: we should unpark only when this delayed task became first in the queue
276 unpark()
277 } else {
278 DefaultExecutor.enqueue(task)
279 }
280 }
281
282 @Suppress("UNCHECKED_CAST")
enqueueImplnull283 private fun enqueueImpl(task: Runnable): Boolean {
284 _queue.loop { queue ->
285 if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
286 when (queue) {
287 null -> if (_queue.compareAndSet(null, task)) return true
288 is Queue<*> -> {
289 when ((queue as Queue<Runnable>).addLast(task)) {
290 Queue.ADD_SUCCESS -> return true
291 Queue.ADD_CLOSED -> return false
292 Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
293 }
294 }
295 else -> when {
296 queue === CLOSED_EMPTY -> return false
297 else -> {
298 // update to full-blown queue to add one more
299 val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
300 newQueue.addLast(queue as Runnable)
301 newQueue.addLast(task)
302 if (_queue.compareAndSet(queue, newQueue)) return true
303 }
304 }
305 }
306 }
307 }
308
309 @Suppress("UNCHECKED_CAST")
dequeuenull310 private fun dequeue(): Runnable? {
311 _queue.loop { queue ->
312 when (queue) {
313 null -> return null
314 is Queue<*> -> {
315 val result = (queue as Queue<Runnable>).removeFirstOrNull()
316 if (result !== Queue.REMOVE_FROZEN) return result as Runnable?
317 _queue.compareAndSet(queue, queue.next())
318 }
319 else -> when {
320 queue === CLOSED_EMPTY -> return null
321 else -> if (_queue.compareAndSet(queue, null)) return queue as Runnable
322 }
323 }
324 }
325 }
326
327 /** Move all delayed tasks that are due to the main queue. */
enqueueDelayedTasksnull328 private fun enqueueDelayedTasks() {
329 val delayed = _delayed.value
330 if (delayed != null && !delayed.isEmpty) {
331 val now = nanoTime()
332 while (true) {
333 // make sure that moving from delayed to queue removes from delayed only after it is added to queue
334 // to make sure that 'isEmpty' and `nextTime` that check both of them
335 // do not transiently report that both delayed and queue are empty during move
336 delayed.removeFirstIf {
337 if (it.timeToExecute(now)) {
338 enqueueImpl(it)
339 } else
340 false
341 } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
342 }
343 }
344 }
345
closeQueuenull346 private fun closeQueue() {
347 assert { isCompleted }
348 _queue.loop { queue ->
349 when (queue) {
350 null -> if (_queue.compareAndSet(null, CLOSED_EMPTY)) return
351 is Queue<*> -> {
352 queue.close()
353 return
354 }
355 else -> when {
356 queue === CLOSED_EMPTY -> return
357 else -> {
358 // update to full-blown queue to close
359 val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
360 newQueue.addLast(queue as Runnable)
361 if (_queue.compareAndSet(queue, newQueue)) return
362 }
363 }
364 }
365 }
366
367 }
368
schedulenull369 fun schedule(now: Long, delayedTask: DelayedTask) {
370 when (scheduleImpl(now, delayedTask)) {
371 SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
372 SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
373 SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
374 else -> error("unexpected result")
375 }
376 }
377
shouldUnparknull378 private fun shouldUnpark(task: DelayedTask): Boolean = _delayed.value?.peek() === task
379
380 private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
381 if (isCompleted) return SCHEDULE_COMPLETED
382 val delayedQueue = _delayed.value ?: run {
383 _delayed.compareAndSet(null, DelayedTaskQueue(now))
384 _delayed.value!!
385 }
386 return delayedTask.scheduleTask(now, delayedQueue, this)
387 }
388
389 // It performs "hard" shutdown for test cleanup purposes
resetAllnull390 protected fun resetAll() {
391 _queue.value = null
392 _delayed.value = null
393 }
394
395 // This is a "soft" (normal) shutdown
rescheduleAllDelayednull396 private fun rescheduleAllDelayed() {
397 val now = nanoTime()
398 while (true) {
399 /*
400 * `removeFirstOrNull` below is the only operation on DelayedTask & ThreadSafeHeap that is not
401 * synchronized on DelayedTask itself. All other operation are synchronized both on
402 * DelayedTask & ThreadSafeHeap instances (in this order). It is still safe, because `dispose`
403 * first removes DelayedTask from the heap (under synchronization) then
404 * assign "_heap = DISPOSED_TASK", so there cannot be ever a race to _heap reference update.
405 */
406 val delayedTask = _delayed.value?.removeFirstOrNull() ?: break
407 reschedule(now, delayedTask)
408 }
409 }
410
411 internal abstract class DelayedTask(
412 /**
413 * This field can be only modified in [scheduleTask] before putting this DelayedTask
414 * into heap to avoid overflow and corruption of heap data structure.
415 */
416 @JvmField var nanoTime: Long
417 ) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode, SynchronizedObject() {
418 @Volatile
419 private var _heap: Any? = null // null | ThreadSafeHeap | DISPOSED_TASK
420
421 override var heap: ThreadSafeHeap<*>?
422 get() = _heap as? ThreadSafeHeap<*>
423 set(value) {
424 require(_heap !== DISPOSED_TASK) // this can never happen, it is always checked before adding/removing
425 _heap = value
426 }
427
428 override var index: Int = -1
429
compareTonull430 override fun compareTo(other: DelayedTask): Int {
431 val dTime = nanoTime - other.nanoTime
432 return when {
433 dTime > 0 -> 1
434 dTime < 0 -> -1
435 else -> 0
436 }
437 }
438
timeToExecutenull439 fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L
440
441 fun scheduleTask(now: Long, delayed: DelayedTaskQueue, eventLoop: EventLoopImplBase): Int = synchronized<Int>(this) {
442 if (_heap === DISPOSED_TASK) return SCHEDULE_DISPOSED // don't add -- was already disposed
443 delayed.addLastIf(this) { firstTask ->
444 if (eventLoop.isCompleted) return SCHEDULE_COMPLETED // non-local return from scheduleTask
445 /**
446 * We are about to add new task and we have to make sure that [DelayedTaskQueue]
447 * invariant is maintained. The code in this lambda is additionally executed under
448 * the lock of [DelayedTaskQueue] and working with [DelayedTaskQueue.timeNow] here is thread-safe.
449 */
450 if (firstTask == null) {
451 /**
452 * When adding the first delayed task we simply update queue's [DelayedTaskQueue.timeNow] to
453 * the current now time even if that means "going backwards in time". This makes the structure
454 * self-correcting in spite of wild jumps in `nanoTime()` measurements once all delayed tasks
455 * are removed from the delayed queue for execution.
456 */
457 delayed.timeNow = now
458 } else {
459 /**
460 * Carefully update [DelayedTaskQueue.timeNow] so that it does not sweep past first's tasks time
461 * and only goes forward in time. We cannot let it go backwards in time or invariant can be
462 * violated for tasks that were already scheduled.
463 */
464 val firstTime = firstTask.nanoTime
465 // compute min(now, firstTime) using a wrap-safe check
466 val minTime = if (firstTime - now >= 0) now else firstTime
467 // update timeNow only when going forward in time
468 if (minTime - delayed.timeNow > 0) delayed.timeNow = minTime
469 }
470 /**
471 * Here [DelayedTaskQueue.timeNow] was already modified and we have to double-check that newly added
472 * task does not violate [DelayedTaskQueue] invariant because of that. Note also that this scheduleTask
473 * function can be called to reschedule from one queue to another and this might be another reason
474 * where new task's time might now violate invariant.
475 * We correct invariant violation (if any) by simply changing this task's time to now.
476 */
477 if (nanoTime - delayed.timeNow < 0) nanoTime = delayed.timeNow
478 true
479 }
480 return SCHEDULE_OK
481 }
482
<lambda>null483 final override fun dispose(): Unit = synchronized(this) {
484 val heap = _heap
485 if (heap === DISPOSED_TASK) return // already disposed
486 (heap as? DelayedTaskQueue)?.remove(this) // remove if it is in heap (first)
487 _heap = DISPOSED_TASK // never add again to any heap
488 }
489
toStringnull490 override fun toString(): String = "Delayed[nanos=$nanoTime]"
491 }
492
493 private inner class DelayedResumeTask(
494 nanoTime: Long,
495 private val cont: CancellableContinuation<Unit>
496 ) : DelayedTask(nanoTime) {
497 override fun run() { with(cont) { resumeUndispatched(Unit) } }
498 override fun toString(): String = super.toString() + cont.toString()
499 }
500
501 private class DelayedRunnableTask(
502 nanoTime: Long,
503 private val block: Runnable
504 ) : DelayedTask(nanoTime) {
runnull505 override fun run() { block.run() }
toStringnull506 override fun toString(): String = super.toString() + block.toString()
507 }
508
509 /**
510 * Delayed task queue maintains stable time-comparision invariant despite potential wraparounds in
511 * long nano time measurements by maintaining last observed [timeNow]. It protects the integrity of the
512 * heap data structure in spite of potential non-monotonicity of `nanoTime()` source.
513 * The invariant is that for every scheduled [DelayedTask]:
514 *
515 * ```
516 * delayedTask.nanoTime - timeNow >= 0
517 * ```
518 *
519 * So the comparison of scheduled tasks via [DelayedTask.compareTo] is always stable as
520 * scheduled [DelayedTask.nanoTime] can be at most [Long.MAX_VALUE] apart. This invariant is maintained when
521 * new tasks are added by [DelayedTask.scheduleTask] function and it cannot be violated when tasks are removed
522 * (so there is nothing special to do there).
523 */
524 internal class DelayedTaskQueue(
525 @JvmField var timeNow: Long
526 ) : ThreadSafeHeap<DelayedTask>()
527 }
528
529 internal expect fun createEventLoop(): EventLoop
530
531 internal expect fun nanoTime(): Long
532
533 internal expect object DefaultExecutor {
534 fun enqueue(task: Runnable)
535 }
536
537 /**
538 * Used by Darwin targets to wrap a [Runnable.run] call in an Objective-C Autorelease Pool. It is a no-op on JVM, JS and
539 * non-Darwin native targets.
540 *
541 * Coroutines on Darwin targets can call into the Objective-C world, where a callee may push a to-be-returned object to
542 * the Autorelease Pool, so as to avoid a premature ARC release before it reaches the caller. This means the pool must
543 * be eventually drained to avoid leaks. Since Kotlin Coroutines does not use [NSRunLoop], which provides automatic
544 * pool management, it must manage the pool creation and pool drainage manually.
545 */
platformAutoreleasePoolnull546 internal expect inline fun platformAutoreleasePool(crossinline block: () -> Unit)
547